Wait

Description:

Routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the distributed cache from a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. The release signal entry is then removed from the cache.

The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex property of the corresponding Notify processor is set properly. If there are multiple release signals in the cache identified by the Release Signal Identifier, and the Notify processor is configured to copy the FlowFile attributes to the cache, then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles that produced the release signals in the cache (identified by Release Signal Identifier).

Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration.

If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use '${fragment.identifier}' as the value of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor.

It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop.
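The fragment-counting behavior described above can be illustrated with a small simulation. This is a minimal sketch, not NiFi code: SignalCache, notify, consume, and try_release are hypothetical names standing in for the distributed cache, the Notify processor, and the Wait processor's release check, and the sketch assumes that each Notify call adds exactly one signal.

```python
class SignalCache:
    """Hypothetical in-memory stand-in for the distributed map cache."""

    def __init__(self):
        self._signals = {}

    def notify(self, signal_id, delta=1):
        # What a Notify processor is assumed to do: add signals under a key.
        self._signals[signal_id] = self._signals.get(signal_id, 0) + delta

    def count(self, signal_id):
        return self._signals.get(signal_id, 0)

    def consume(self, signal_id, amount):
        # Decrease the signal count; drop the entry once nothing is left.
        remaining = self.count(signal_id) - amount
        if remaining > 0:
            self._signals[signal_id] = remaining
        else:
            self._signals.pop(signal_id, None)


def try_release(cache, signal_id, target_count):
    """Return 'success' once enough signals are cached, otherwise 'wait'."""
    if cache.count(signal_id) >= target_count:
        cache.consume(signal_id, target_count)
        return "success"
    return "wait"


# The 'original' FlowFile waits while three 'splits' pass through Notify.
cache = SignalCache()
fragment_identifier, fragment_count = "example-fragment-id", 3
routes = []
for _ in range(fragment_count):
    routes.append(try_release(cache, fragment_identifier, fragment_count))
    cache.notify(fragment_identifier)
routes.append(try_release(cache, fragment_identifier, fragment_count))
print(routes)  # ['wait', 'wait', 'wait', 'success']
```

In this sketch the waiting FlowFile loops through 'wait' until the third signal arrives, which mirrors setting 'Release Signal Identifier' to '${fragment.identifier}' and 'Target Signal Count' to '${fragment.count}'.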

Additional Details...

Tags:

map, cache, wait, hold, distributed, signal, release

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Name | Default Value | Allowable Values | Description
Release Signal Identifier | | | A value that specifies the key to a specific release signal cache. To decide whether the FlowFile being processed by the Wait processor should be sent to the 'success' or the 'wait' relationship, the processor checks the signals in the cache specified by this key. Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Target Signal Count | 1 | | The number of signals that need to be in the cache (specified by the Release Signal Identifier) in order for the FlowFile processed by the Wait processor to be sent to the 'success' relationship. If the number of signals in the cache has reached this number, the FlowFile is routed to the 'success' relationship and the number of signals in the cache is decreased by this value. If Signal Counter Name is specified, this processor checks that particular counter; otherwise it checks against the total number of signals in the cache. Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Signal Counter Name | | | Within the cache (specified by the Release Signal Identifier) the signals may belong to different counters. If this property is specified, the processor checks the number of signals in the cache that belong to this particular counter. If not specified, the processor checks the total number of signals in the cache. Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Wait Buffer Count | 1 | | Specifies the maximum number of incoming FlowFiles that can be buffered while checking whether they can move forward. A larger buffer gives better performance, because grouping FlowFiles by signal identifier reduces the number of interactions with the cache service. Only one signal identifier can be processed per processor execution.
Releasable FlowFile Count | 1 | | A value, or the result of an Attribute Expression Language statement, which will be evaluated against a FlowFile in order to determine the releasable FlowFile count. This specifies how many FlowFiles can be released when the signal count reaches the Target Signal Count. Zero (0) has a special meaning: any number of FlowFiles can be released as long as the signal count matches the target. Supports Expression Language: true (will be evaluated using flow file attributes and variable registry)
Expiration Duration | 10 min | | Indicates the duration after which waiting FlowFiles will be routed to the 'expired' relationship.
Distributed Cache Service | | Controller Service API: AtomicDistributedMapCacheClient. Implementations: HazelcastMapCacheClient, CouchbaseMapCacheClient, HBase_1_1_2_ClientMapCacheService, RedisDistributedMapCacheClientService, DistributedMapCacheClientService, HBase_2_ClientMapCacheService | The Controller Service that is used to check for release signals from a corresponding Notify processor.
Attribute Copy Mode | keeporiginal | • Replace if present: When cached attributes are copied onto released FlowFiles, they replace any matching attributes. • Keep original: Attributes on released FlowFiles are not overwritten by copied cached attributes. | Specifies how to handle attributes copied from FlowFiles entering the Notify processor.
Wait Mode | wait | • Transfer to wait relationship: Transfer a FlowFile to the 'wait' relationship when its release signal has not been notified yet. This mode allows other incoming FlowFiles to be enqueued by moving FlowFiles into the wait relationship. It is recommended to set a prioritizer (for instance First In First Out) on the 'wait' relationship. • Keep in the upstream connection: Return a FlowFile to the upstream connection it came from when its release signal has not been notified yet. This mode helps keep the upstream connection full, so that the upstream source processor is not scheduled while back-pressure is active, which limits incoming FlowFiles. | Specifies how to handle a FlowFile waiting for a notify signal.
Wait Penalty Duration | | | If configured, after a signal identifier has been processed without meeting the release criteria, the signal identifier is penalized and FlowFiles with that signal identifier will not be processed again for the specified period of time, so that the signal identifier does not block others from being processed. This can be useful when a Wait processor is expected to process multiple signal identifiers, each signal identifier has multiple FlowFiles, and the order in which FlowFiles are released within a signal identifier is important. The FlowFile order can be configured with Prioritizers. IMPORTANT: There is a limit on the number of queued signals that can be processed, and the Wait processor may not be able to check all queued signal ids. See additional details for the best practice.
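Two of the properties above, Signal Counter Name and Attribute Copy Mode, describe small decision rules that may be easier to follow as code. The following is a minimal sketch under stated assumptions, not the processor's implementation: signal_count and copy_cached_attributes are hypothetical helper names, counters are modeled as a plain dictionary, and the mode string "replace" is an assumed identifier for the 'Replace if present' option (only "keeporiginal" appears in the table).

```python
def signal_count(counters, counter_name=None):
    """Return the number compared against Target Signal Count.

    counters maps a counter name to its number of signals (assumed model).
    With a counter name, only that counter is checked; without one, the
    total across all counters is used.
    """
    if counter_name:
        return counters.get(counter_name, 0)
    return sum(counters.values())


def copy_cached_attributes(flowfile_attrs, cached_attrs, mode="keeporiginal"):
    """Merge attributes cached by Notify into a released FlowFile."""
    if mode == "replace":
        # Cached attributes win over matching FlowFile attributes.
        return {**flowfile_attrs, **cached_attrs}
    if mode == "keeporiginal":
        # Existing FlowFile attributes are not overwritten.
        return {**cached_attrs, **flowfile_attrs}
    raise ValueError(f"unknown attribute copy mode: {mode}")


counters = {"ok": 2, "error": 1}
print(signal_count(counters))        # 3
print(signal_count(counters, "ok"))  # 2

original = {"filename": "a.txt"}
cached = {"filename": "b.txt", "status": "done"}
print(copy_cached_attributes(original, cached)["filename"])             # a.txt
print(copy_cached_attributes(original, cached, "replace")["filename"])  # b.txt
```

In both modes the cached attribute 'status' is added to the FlowFile; the modes differ only in which side wins when the same attribute name exists on both.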

Relationships:

Name | Description
expired | A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship
success | A FlowFile with a matching release signal in the cache will be routed to this relationship
wait | A FlowFile with no matching release signal in the cache will be routed to this relationship
failure | When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship
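The four relationships can be read as one routing decision. The sketch below is illustrative only: choose_relationship and lookup_count are hypothetical names, an unreachable cache is modeled as a ConnectionError, and the order in which the checks run is an assumption rather than something this page states.

```python
def choose_relationship(signal_id, lookup_count, target_count,
                        waited_ms, expiration_ms):
    """Pick a relationship name for one FlowFile (assumed check order)."""
    # A null or empty Release Signal Identifier cannot be looked up.
    if not signal_id:
        return "failure"
    try:
        count = lookup_count(signal_id)
    except ConnectionError:
        # Stand-in for "the cache cannot be reached".
        return "failure"
    if waited_ms > expiration_ms:
        return "expired"
    if count >= target_count:
        return "success"
    return "wait"


def two_signals(_signal_id):
    """Hypothetical cache lookup that always reports two signals."""
    return 2


def unreachable(_signal_id):
    """Hypothetical cache lookup that always fails."""
    raise ConnectionError("cache unavailable")


ten_minutes_ms = 10 * 60 * 1000
print(choose_relationship("", two_signals, 1, 0, ten_minutes_ms))             # failure
print(choose_relationship("id-1", unreachable, 1, 0, ten_minutes_ms))         # failure
print(choose_relationship("id-1", two_signals, 2, 0, ten_minutes_ms))         # success
print(choose_relationship("id-1", two_signals, 3, 0, ten_minutes_ms))         # wait
print(choose_relationship("id-1", two_signals, 3, ten_minutes_ms + 1,
                          ten_minutes_ms))                                    # expired
```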

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
wait.start.timestamp | All FlowFiles will have an attribute 'wait.start.timestamp', which records the epoch timestamp at which the FlowFile first entered this processor. This is used to determine the expiration time of the FlowFile. This attribute is not written when the FlowFile is transferred to failure, expired or success
wait.counter.<counterName> | The name of each counter for which at least one signal has been present in the cache since the last time the cache was empty gets copied to the current FlowFile as an attribute.
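As a rough illustration of how these attributes might evolve while a FlowFile loops through the processor, consider the sketch below. It rests on several assumptions that this page does not confirm: update_wait_attributes is a hypothetical helper, the timestamp is modeled as epoch milliseconds, the attribute is dropped when the FlowFile leaves via failure, expired or success, and each 'wait.counter.<counterName>' attribute is given the counter's current count as its value.

```python
import time


def update_wait_attributes(attrs, relationship, counters, now_ms=None):
    """Return a copy of attrs with the Wait-related attributes applied."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    updated = dict(attrs)
    if relationship == "wait":
        # Keep the first timestamp so expiration is measured from entry.
        updated.setdefault("wait.start.timestamp", str(now_ms))
    else:
        # Assumed: not carried on failure, expired or success.
        updated.pop("wait.start.timestamp", None)
    for name, count in counters.items():
        # Assumed: the attribute value is the counter's signal count.
        updated[f"wait.counter.{name}"] = str(count)
    return updated


first = update_wait_attributes({}, "wait", {"ok": 2}, now_ms=1000)
second = update_wait_attributes(first, "wait", {"ok": 2}, now_ms=5000)
released = update_wait_attributes(second, "success", {"ok": 3}, now_ms=9000)
print(second["wait.start.timestamp"])      # 1000
print("wait.start.timestamp" in released)  # False
print(released["wait.counter.ok"])         # 3
```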

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component requires an incoming relationship.

System Resource Considerations:

None specified.

See Also:

DistributedMapCacheClientService, DistributedMapCacheServer, Notify