Wait 2.0.0

Bundle
org.apache.nifi | nifi-standard-nar
Description
Routes incoming FlowFiles to the 'wait' relationship until a matching release signal is stored in the distributed cache from a corresponding Notify processor. When a matching release signal is identified, a waiting FlowFile is routed to the 'success' relationship. The release signal entry is then removed from the cache. The attributes of the FlowFile that produced the release signal are copied to the waiting FlowFile if the Attribute Cache Regex property of the corresponding Notify processor is set properly. If there are multiple release signals in the cache identified by the Release Signal Identifier, and the Notify processor is configured to copy the FlowFile attributes to the cache, then the FlowFile passing the Wait processor receives the union of the attributes of the FlowFiles that produced the release signals in the cache (identified by Release Signal Identifier). Waiting FlowFiles will be routed to 'expired' if they exceed the Expiration Duration. If you need to wait for more than one signal, specify the desired number of signals via the 'Target Signal Count' property. This is particularly useful with processors that split a source FlowFile into multiple fragments, such as SplitText. In order to wait for all fragments to be processed, connect the 'original' relationship to a Wait processor, and the 'splits' relationship to a corresponding Notify processor. Configure the Notify and Wait processors to use the '${fragment.identifier}' as the value of 'Release Signal Identifier', and specify '${fragment.count}' as the value of 'Target Signal Count' in the Wait processor.It is recommended to use a prioritizer (for instance First In First Out) when using the 'wait' relationship as a loop.
Tags
cache, distributed, hold, map, release, signal, wait
Input Requirement
REQUIRED
Supports Sensitive Dynamic Properties
false
  • Additional Details for Wait 2.0.0

    Wait

    Best practices to handle multiple signal ids at a Wait processor

    When a Wait processor is expected to process multiple signal ids, by configuring ‘Release Signal Identifier’ with a FlowFile attribute Expression Language, there are few things to consider in order to get the expected result. Processor configuration can vary based on your requirement. Also, you will need to have high level understanding on how Wait processor works:

    • The Wait processor only processes a single signal id at a time
    • How frequently the Wait processor runs is defined in the ‘Run Schedule’
    • Which FlowFile is processed is determined by a Prioritizer
    • Not limited to the Wait processor, but for all processors, the order of queued FlowFiles in a connection is undefined if no Prioritizer is set

    See following sections for common patterns

    Release any FlowFile as soon as its signal is notified

    This is the most common use case. FlowFiles are independent and can be released in any order.

    Important configurations:

    • Use FirstInFirstOutPrioritizer (FIFO) at ‘wait’ relationship (or the incoming connection if ‘Wait Mode’ is ‘Keep in the upstream connection)

    The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle.

    # of Wait run Notified Signals Queue Index (FIFO) FlowFile UUID Signal ID
    1 B 1 a A This FlowFile is processed. But its signal is not found, and will be re-queued at the end of the queue.
    2 b B
    3 c C
    2 B 1 b B This FlowFile is processed and since its signal is notified, this one will be released to ‘success’.
    2 c C
    3 a A
    3 1 c C This FlowFile will be processed at the next run.
    2 a A

    Release higher priority FlowFiles in each signal id

    Multiple FlowFiles share the same signal id, and the order of releasing a FlowFile is important.

    Important configurations:

    • Use a (or set of a) Prioritizer(s) suites your need other than FIFO, at ‘wait’ relationship (or the incoming connection if ‘Wait Mode’ is ‘Keep in the upstream connection), e.g. PriorityPrioritizer
    • Specify adequate ‘Wait Penalty Duration’, e.g. “3 sec”,
    • ‘Wait Penalty Duration’ should be grater than ‘Run Schedule’, e.g “3 sec” > “1 sec”
    • Increase ‘Run Duration’ to avoid the limitation of number of signal ids (see the note below)

    The following table illustrates the notified signal ids, queued FlowFiles and what will happen at each Wait run cycle. The example uses PriorityPrioritizer to control the order of processing FlowFiles within a signal id. If ‘Wait Penalty Duration’ is configured, Wait processor tracks unreleased signal ids and their penalty representing when they will be checked again.

    # of Wait run Notified Signals Signal Penalties Queue Index (via ‘priority’ attribute) FlowFile UUID Signal ID ‘priority’ attr
    1 (00:01) B 1 a-1 A 1 This FlowFile is processed. But its signal is not found. Penalized.
    2 b-1 B 1 Since a-1 and b-1 have the same priority ‘1’, b-1 may be processed before a-1. You can add another Prioritizer to define more specific ordering.
    3 b-2 B 2
    2 (00:02) B A (00:04) 1 a-1 A 1 This FlowFile is the first one according to the configured Prioritizer, but the signal id is penalized. So, this FlowFile is skipped at this execution.
    2 b-1 B 1 This FlowFile is processed.
    3 b-2 B 2
    3 (00:03) A (00:04) 1 a-1 A 1 This FlowFile is the first one but is still penalized.
    2 b-2 B 2 This FlowFile is processed, but its signal is not notified yet, thus will be penalized.
    4 (00:04) B (00:06) 1 a-1 A 1 This FlowFile is no longer penalized, and get processed. But its signal is not notified yet, thus will be penalized again.
    2 b-2 B 2

    The importance of ‘Run Duration’ when ‘Wait Penalty Duration’ is used

    There are limitation of number of signals can be checked based on the combination of ‘Run Schedule’ and ‘Wait Penalize Duration’. If this limitation is engaged, some FlowFiles may not be processed and remain in the ‘wait’ relationship even if their signal ids are notified. Let’s say Wait is configured with:

    • Run Schedule = 1 sec
    • Wait Penalize Duration = 3 sec
    • Release Signal Identifier = ${uuid}

    And there are 5 FlowFiles F1, F2 … F5 in the ‘wait’ relationship. Then the signal for F5 is notified. Wait will work as follows:

    • At 00:00 Wait checks the signal for F1, not found, and penalize F1 (till 00:03)
    • At 00:01 Wait checks the signal for F2, not found, and penalize F2 (till 00:04)
    • At 00:02 Wait checks the signal for F3, not found, and penalize F3 (till 00:05)
    • At 00:03 Wait checks the signal for F4, not found, and penalize F4 (till 00:06)
    • At 00:04 Wait checks the signal for F1 again, because it’s not penalized any longer

    Repeat above cycle, thus F5 will not be released until one of F1 … F4 is released.

    To mitigate such limitation, increasing ‘Run Duration’ is recommended. By increasing ‘Run Duration’, Wait processor can keep being scheduled for that duration. For example, with ‘Run Duration’ 500 ms, Wait should be able to loop through all 5 queued FlowFiles at a single run.

    Using counters

    A counter is basically a label to differentiate signals within the cache. (A cache in this context is a “container” that contains signals that have the same signal identifier.)

    Let’s suppose that there are the following signals in the cache (note, that these are not FlowFiles on the incoming (or wait) connection of the Wait processor, like in the examples above, but release signals stored in the cache.)

    Signal ID Signal Counter Name
    A counter_1
    A counter_1
    A counter_2

    In this state, the following FlowFile gets processed by the Wait processor, (the FlowFile has a signal_counter_name attribute and the Wait processor is configured to use the value of this attribute as the value of the Signal Counter Name property):

    FlowFile UUID Signal ID signal_counter_name
    a-1 A counter_3

    Despite the fact that the cache identified by Signal ID “A” has signals in it, the FlowFile above will be sent to the ' wait’ relationship, since there is no signal in the cache that belongs to the counter named “counter_3”.

    Let’s suppose, that the state of the cache is the same as above, and the following FlowFile gets processed by the Wait processor:

    FlowFile UUID Signal ID signal_counter_name
    a-2 A counter_1

    The FlowFile is transmitted to the ‘success’ relationship, since cache “A” has signals in it and there are signals that belong to “counter_1”. The outgoing FlowFile will have the following attributes and their values appended to it:

    • wait.counter.counter_1 : 2
    • wait.counter.counter_2 : 1
    • wait.counter.total : 3

    The key point here is that counters can be used to differentiate between signals within the cache. If counters are used, a new attribute will be appended to the FlowFile passing the Wait processor for each counter. If a large number of counters are used within a cache, the FlowFile passing the Wait processor will have a large number of attributes appended to it. To avoid that, it is recommended to use multiple caches with a few counters in each, instead of one cache with many counters.

    For example:

    • Cache identified by Release Signal ID “A” has counters: “counter_1” and “counter_2”
    • Cache identified by Release Signal ID “B” has counters: “counter_3” and “counter_4”
    • Cache identified by Release Signal ID “C” has counters: “counter_5” and “counter_6”

    (Counter names do not need to be unique between caches, the counter name(s) used in cache “A” could be reused in cache " B" and “C” as well.)

Properties
Relationships
Name Description
failure When the cache cannot be reached, or if the Release Signal Identifier evaluates to null or empty, FlowFiles will be routed to this relationship
success A FlowFile with a matching release signal in the cache will be routed to this relationship
wait A FlowFile with no matching release signal in the cache will be routed to this relationship
expired A FlowFile that has exceeded the configured Expiration Duration will be routed to this relationship
Writes Attributes
Name Description
wait.start.timestamp All FlowFiles will have an attribute 'wait.start.timestamp', which sets the initial epoch timestamp when the file first entered this processor. This is used to determine the expiration time of the FlowFile. This attribute is not written when the FlowFile is transferred to failure, expired or success
wait.counter.<counterName> The name of each counter for which at least one signal has been present in the cache since the last time the cache was empty gets copied to the current FlowFile as an attribute.
See Also