Wait

Description:

Routes incoming FlowFiles to the 'wait' relationship until a corresponding Notify processor stores a matching release signal in the distributed cache. When a matching release signal is found, the waiting FlowFile is routed to the 'success' relationship, with attributes copied from the FlowFile that produced the release signal in the Notify processor. The release signal entry is then removed from the cache. Waiting FlowFiles are routed to 'expired' if they exceed the Expiration Duration.

If you need to wait for more than one signal, specify the desired number of signals in the 'Target Signal Count' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. To wait for all fragments to be processed, connect the 'original' relationship to a Wait processor and the 'splits' relationship to a corresponding Notify processor. Configure both the Notify and Wait processors to use '${fragment.identifier}' as the value of 'Release Signal Identifier', and set 'Target Signal Count' in the Wait processor to '${fragment.count}'.

When the 'wait' relationship is looped back to this processor, it is recommended to configure a prioritizer (for instance, First In First Out) on that connection.
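The SplitText pattern described above can be illustrated with a minimal, single-threaded Python sketch. It assumes a plain dictionary in place of the distributed map cache, and the functions 'notify' and 'wait' as well as the counter name "default" are hypothetical. The real processors go through an atomic cache client and must cope with concurrent updates, which this sketch ignores.

```python
from collections import defaultdict

# Assumption: a plain dict stands in for the distributed map cache service.
# Keys are release signal identifiers; values map counter names to counts.
cache = {}


def notify(signal_id, counter_name="default", delta=1):
    """Hypothetical Notify step: increment one counter of a release signal."""
    counts = cache.setdefault(signal_id, defaultdict(int))
    counts[counter_name] += delta


def wait(attrs):
    """Hypothetical Wait step configured with the SplitText attributes.

    Returns 'success' (and removes the signal entry) once the total signal
    count reaches the target; otherwise returns 'wait'.
    """
    signal_id = attrs["fragment.identifier"]  # Release Signal Identifier
    target = int(attrs["fragment.count"])     # Target Signal Count
    counts = cache.get(signal_id)
    if counts is not None and sum(counts.values()) >= target:
        del cache[signal_id]                  # the signal entry is removed
        return "success"
    return "wait"


# The 'original' FlowFile of a split into 3 fragments goes to Wait;
# each of the 3 'splits' FlowFiles passes through Notify.
original = {"fragment.identifier": "abc-123", "fragment.count": "3"}
routes = [wait(original)]
for _ in range(3):
    notify(original["fragment.identifier"])
    routes.append(wait(original))
print(routes)  # ['wait', 'wait', 'wait', 'success']
```

The original FlowFile loops through 'wait' until the third fragment has been notified, which is why a prioritizer on the looping connection helps keep waiting FlowFiles in a predictable order.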

Tags:

map, cache, wait, hold, distributed, signal, release

Properties:

In the list below, the names of required properties appear in bold; all other properties are optional. The list also indicates each property's default value and whether it supports the NiFi Expression Language.

Release Signal Identifier
  Description: A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the release signal cache key.
  Supports Expression Language: true

Target Signal Count
  Default value: 1
  Description: A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the target signal count. The processor checks whether the signal count has reached this number. If Signal Counter Name is specified, the processor checks that particular counter; otherwise, it checks the total count in the signal.
  Supports Expression Language: true

Signal Counter Name
  Description: A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the signal counter name. If not specified, the processor checks the total count in the signal.
  Supports Expression Language: true

Wait Buffer Count
  Default value: 1
  Description: Specifies the maximum number of incoming FlowFiles that can be buffered while checking whether they can move forward. A larger buffer can improve performance, because grouping FlowFiles by signal identifier reduces the number of interactions with the cache service. Only one signal identifier is processed per processor execution.

Releasable FlowFile Count
  Default value: 1
  Description: A value, or the result of an Attribute Expression Language statement, which is evaluated against a FlowFile to determine the releasable FlowFile count, that is, how many FlowFiles can be released when the signal count reaches the Target Signal Count. Zero (0) has a special meaning: any number of FlowFiles can be released as long as the signal count matches the target.
  Supports Expression Language: true

Expiration Duration
  Default value: 10 min
  Description: Indicates the duration after which waiting FlowFiles are routed to the 'expired' relationship.

Distributed Cache Service
  Controller Service API: AtomicDistributedMapCacheClient
  Implementations: DistributedMapCacheClientService, RedisDistributedMapCacheClientService
  Description: The Controller Service that is used to check for release signals from a corresponding Notify processor.

Attribute Copy Mode
  Default value: keeporiginal
  Allowable values:
    • Replace if present: When cached attributes are copied onto released FlowFiles, they replace any matching attributes.
    • Keep original: Attributes on released FlowFiles are not overwritten by copied cached attributes.
  Description: Specifies how to handle attributes copied from FlowFiles entering the Notify processor.

Wait Mode
  Default value: wait
  Allowable values:
    • Transfer to wait relationship: Transfers a FlowFile to the 'wait' relationship when its release signal has not been notified yet. Moving waiting FlowFiles into the 'wait' relationship allows other incoming FlowFiles to be enqueued. It is recommended to set a prioritizer (for instance, First In First Out) on the 'wait' relationship.
    • Keep in the upstream connection: Transfers a FlowFile back to the upstream connection it came from when its release signal has not been notified yet. This mode keeps the upstream connection full, so that while back-pressure is active the upstream source processor is not scheduled, which limits incoming FlowFiles.
  Description: Specifies how to handle a FlowFile that is waiting for a notify signal.
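The following Python sketch models how the count-related properties, Wait Buffer Count, and Attribute Copy Mode might interact, based only on the property descriptions above. All function names and data shapes are hypothetical, the string "replace" for "Replace if present" is an assumption (only "keeporiginal" appears above as a default), and the real processor also updates the cached signal after a release, which this sketch omits.

```python
def signal_count(counts, counter_name=None):
    """Signal Counter Name set: check that counter; otherwise the total."""
    if counter_name:
        return counts.get(counter_name, 0)
    return sum(counts.values())


def release(waiting, counts, target, releasable=1, counter_name=None):
    """Split waiting FlowFiles into (released, still_waiting).

    A Releasable FlowFile Count of 0 means any number may be released once
    the signal count reaches the Target Signal Count.
    """
    if signal_count(counts, counter_name) < target:
        return [], list(waiting)
    n = len(waiting) if releasable == 0 else releasable
    return list(waiting[:n]), list(waiting[n:])


def take_batch(queue, buffer_count):
    """Wait Buffer Count: take up to buffer_count queued (signal_id, name)
    items that share the first item's identifier; the rest stay queued."""
    if not queue:
        return [], []
    first_id = queue[0][0]
    batch, remaining = [], []
    for item in queue:
        if item[0] == first_id and len(batch) < buffer_count:
            batch.append(item)
        else:
            remaining.append(item)
    return batch, remaining


def copy_attributes(flowfile_attrs, signal_attrs, mode="keeporiginal"):
    """Attribute Copy Mode ('replace' as a mode key is an assumption)."""
    if mode == "replace":
        return {**flowfile_attrs, **signal_attrs}  # cached values win
    return {**signal_attrs, **flowfile_attrs}      # existing values win


counts = {"success": 2, "failure": 1}
print(signal_count(counts))                                      # 3
print(signal_count(counts, "failure"))                           # 1
print(release(["a", "b", "c"], counts, target=3))                # (['a'], ['b', 'c'])
print(release(["a", "b", "c"], counts, target=3, releasable=0))  # (['a', 'b', 'c'], [])
print(take_batch([("s1", "a"), ("s2", "b"), ("s1", "c"), ("s1", "d")], 2))
print(copy_attributes({"k": "mine"}, {"k": "cached", "x": "1"}))  # {'k': 'mine', 'x': '1'}
```

Grouping by a single identifier per batch is what lets one cache lookup serve several buffered FlowFiles, which is the performance benefit the Wait Buffer Count description refers to.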

Relationships:

expired
  A FlowFile that has exceeded the configured Expiration Duration is routed to this relationship.
success
  A FlowFile with a matching release signal in the cache is routed to this relationship.
wait
  A FlowFile with no matching release signal in the cache is routed to this relationship.
failure
  When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles are routed to this relationship.
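The four relationships can be summarized as a single routing decision. This Python sketch is a simplification under stated assumptions: 'route', 'lookup', and 'CacheUnavailable' are hypothetical names, the cache is reduced to a lookup function returning a signal count or None, and the order in which the checks run is assumed rather than taken from the processor.

```python
import time


class CacheUnavailable(Exception):
    """Hypothetical stand-in for any error raised while reaching the cache."""


def route(signal_id, start_ms, lookup, target=1,
          expiration_ms=10 * 60 * 1000, now_ms=None):
    """Choose a relationship for one FlowFile (simplified; order assumed)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    if not signal_id:
        return "failure"   # identifier evaluated to null or empty
    if now_ms - start_ms > expiration_ms:
        return "expired"   # exceeded Expiration Duration (default 10 min)
    try:
        count = lookup(signal_id)  # None when no release signal is cached
    except CacheUnavailable:
        return "failure"   # cache could not be reached
    if count is not None and count >= target:
        return "success"
    return "wait"


def unreachable(_signal_id):
    raise CacheUnavailable("cache service down")


signals = {"abc-123": 3}
print(route("", 0, signals.get, now_ms=1))                   # failure
print(route("abc-123", 0, signals.get, target=3, now_ms=1))  # success
print(route("other", 0, signals.get, now_ms=1))              # wait
print(route("other", 0, signals.get, now_ms=700_000))        # expired
print(route("abc-123", 0, unreachable, now_ms=1))            # failure
```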

Reads Attributes:

None specified.

Writes Attributes:

wait.start.timestamp
  All FlowFiles receive a 'wait.start.timestamp' attribute, which records the epoch timestamp at which the FlowFile first entered this processor. It is used to determine the FlowFile's expiration time.
wait.counter.<counterName>
  If a signal exists when the processor runs, each count value in the signal is copied to a 'wait.counter.<counterName>' attribute.
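A short Python sketch of the two written attributes follows. The function 'write_wait_attributes' is hypothetical, and the millisecond unit for the timestamp is an assumption. The point it illustrates is that 'wait.start.timestamp' is set only on first entry, so a FlowFile looping through 'wait' keeps its original expiration clock.

```python
import time


def write_wait_attributes(attrs, signal_counts=None, now_ms=None):
    """Return a copy of attrs with the attributes this processor documents."""
    out = dict(attrs)
    # Set only the first time the FlowFile enters the processor, so looping
    # through 'wait' does not reset the expiration clock.
    if "wait.start.timestamp" not in out:
        if now_ms is None:
            now_ms = int(time.time() * 1000)  # assumption: milliseconds
        out["wait.start.timestamp"] = str(now_ms)
    # Copy each count in the signal, if one exists, as wait.counter.<name>.
    for name, count in (signal_counts or {}).items():
        out[f"wait.counter.{name}"] = str(count)
    return out


first = write_wait_attributes({"filename": "a.txt"}, now_ms=1_000)
later = write_wait_attributes(first, {"success": 2, "failure": 1}, now_ms=5_000)
print(later)
# {'filename': 'a.txt', 'wait.start.timestamp': '1000', 'wait.counter.success': '2', 'wait.counter.failure': '1'}
```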

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component requires an incoming relationship.

See Also:

DistributedMapCacheClientService, DistributedMapCacheServer, Notify