Introduction

With any sufficiently complex system, the designers and maintainers must make tradeoffs. Apache NiFi is no exception.

NiFi is geared toward being run in an environment in which it is free to consume virtually all system resources, especially CPU, disk, and network bandwidth. It is designed in such a way that data is pulled from a source system, optionally filtered, routed, and transformed, before ultimately being published to zero or more destinations. Moreover, the architecture lends itself best to situations in which the source and destinations of a particular piece of data (FlowFile) are themselves loosely coupled.

As such, NiFi stores all FlowFile content on disk in order to be resilient across restarts. It provides backpressure so that data consumers avoid overwhelming the system if the data publishers/producers are not able to keep up for some amount of time. It provides the ability to assign more resources to individual parts of a dataflow (via the Concurrent Tasks configuration).

All of these design decisions have served NiFi well, making it a leading platform for data integration. However, there are some use cases which lend themselves better to a slightly different architecture than what is provided by traditional NiFi.

For example, some use cases are better served by an architecture in which data is not persisted across restarts. Instead of storing the data that has been received, the user relies on the fact that the data source is both persistent and replayable. In such a situation, it might make more sense to avoid persisting the data and instead source the data anew after restart. This has the advantage that data can potentially be held in memory instead of on disk, which can provide better performance. Additionally, it allows the processing to be moved from one machine to another without needing to worry about data loss.

Stateless NiFi provides a different Runtime Engine than traditional NiFi. It is a single-threaded runtime engine, in which data is not persisted across restarts. Additionally, the data that is sourced can be processed through the entire chain of processors in the dataflow before it is ever even acknowledged from the source. The README document for NiFi Stateless provides far more context as to the differences between traditional NiFi and Stateless NiFi, and you are encouraged to read through it in order to gain a better understanding of the different tradeoffs that were made for the Stateless architecture.

Both the traditional NiFi Runtime Engine and the Stateless NiFi Runtime Engine have their strengths and weaknesses. The ideal situation would be one in which users had the ability to easily choose which parts of their dataflow run Stateless and which parts run in the traditional NiFi Runtime Engine.

The ExecuteStateless Processor makes this possible.

Configuration

The most important configuration element of the ExecuteStateless Processor is the flow definition: that is, where to find the dataflow that is to be run using the Stateless Engine.

Flow Definition

The Processor allows the dataflow to be stored in a local file (i.e., a file local to the NiFi server), a URL that is accessible from the NiFi server, or in a NiFi Registry. Once the flow has been fetched, it is cached in the configured Working Directory for resiliency purposes. If NiFi or the ExecuteStateless Processor is stopped and restarted, we do not want to add a single point of failure by relying on some external service to be available. As a result, when the Processor is started, it will first attempt to fetch the flow from the configured location. If unable to do so, it will load the dataflow from the cache, if it is available.
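The fetch-then-fall-back behavior described above can be illustrated with a minimal sketch. This is not the Processor's actual implementation: the function name load_flow, the fetch callable, and the cache file name flow.json are hypothetical, and the sketch assumes that an unreachable source surfaces as an OSError.

```python
import json
import tempfile
from pathlib import Path


def load_flow(fetch, cache_file):
    """Return the flow definition, preferring the configured location.

    fetch: hypothetical callable that returns the flow as a dict or raises OSError.
    cache_file: hypothetical path of the cached copy inside the Working Directory.
    """
    try:
        flow = fetch()
    except OSError:
        # Source unavailable: fall back to the cached copy, if there is one.
        if cache_file.exists():
            return json.loads(cache_file.read_text())
        raise
    # Fetch succeeded: refresh the cache so a later outage can be tolerated.
    cache_file.write_text(json.dumps(flow))
    return flow


def unavailable():
    # Simulates a Registry or URL that cannot be reached.
    raise OSError("flow location not reachable")


with tempfile.TemporaryDirectory() as working_dir:
    cache = Path(working_dir) / "flow.json"
    # First start: the source is reachable, so the flow is fetched and cached.
    first = load_flow(lambda: {"name": "book-sales", "version": 3}, cache)
    # Later start: the source is down, so the cached copy is used instead.
    second = load_flow(unavailable, cache)
```

In this sketch, the second call succeeds only because the first call populated the cache. If no cached copy exists and the source is unreachable, the error is propagated.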

Ports

Depending on the dataflow that is to be run, it may obtain its data from some external source, such as a JMS Broker via the ConsumeJMS processor. Or, it may take in data from another point in the NiFi flow. To take in data from the NiFi flow, the dataflow must be created with an Input Port at the root level of the dataflow. The ExecuteStateless processor can then be configured with an incoming connection. When the processor is triggered to run, it will take one FlowFile from the incoming connection and enqueue it into the stateless dataflow for the configured Port. If the Processor is configured to have an incoming Connection, the Input Port property must also be configured, unless there is exactly one Input Port in the dataflow.
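The rule for choosing the Input Port can be sketched as follows. The function name resolve_input_port is hypothetical, and raising a ValueError for an unknown or missing port name is an assumption made for illustration, not a description of how the Processor reports an invalid configuration.

```python
def resolve_input_port(input_ports, configured_name=None):
    """Pick the Input Port that should receive the incoming FlowFile.

    input_ports: names of the root-level Input Ports in the stateless flow.
    configured_name: value of the Input Port property, or None if it is unset.
    """
    if configured_name is not None:
        # The property holds the port NAME, compared exactly (case sensitive).
        if configured_name not in input_ports:
            raise ValueError(f"no Input Port named {configured_name!r}")
        return configured_name
    if len(input_ports) == 1:
        # Exactly one Input Port: the property may be left unset.
        return input_ports[0]
    # Assumption: any other combination is treated as a configuration error.
    raise ValueError("the Input Port property must be set")


single = resolve_input_port(["In"])
chosen = resolve_input_port(["Orders", "Refunds"], "Refunds")
```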

Similarly, after completing its processing, the stateless flow may route data to one or more Output Ports. Data routed to these Output Ports will then be transferred to the output relationship of the ExecuteStateless Processor. Any FlowFile routed to the output relationship will also have an attribute added to it named "output.port.name" which can be used to route the data if necessary.

It is a common practice, however, to have ports that use a naming convention such as "success" and "failure." It may not make sense to have the dataflow take a FlowFile into its Input Port, perform some processing, route one FlowFile to "success" and another to "failure," and then have all of the FlowFiles transferred to the output relationship together. We are likely to want to consider the processing of the incoming FlowFile to be a failure if any FlowFile makes its way to the "failure" port. In such a case, we want nothing to go to the "output" relationship, and we want the incoming FlowFile instead to be routed to the "failure" relationship of ExecuteStateless. We can accomplish this by simply providing, in the "Failure Ports" property, a comma-separated list of the Output Ports in the dataflow that should be considered a "failure."

Success and Failure

If the ExecuteStateless Processor is configured with an incoming connection, the data will be transferred to one of three relationships: "original," "failure," or "timeout."

When the dataflow is triggered, it will have up to the configured amount of time to complete its processing. This time period is configured via the "Dataflow Timeout" property. If the dataflow has not completed in the allotted amount of time, the dataflow is canceled, and the input FlowFile is routed to the "timeout" relationship.

If any Processor within the dataflow throws an Exception that it does not handle, the dataflow is considered a failure, and the input FlowFile will be routed to the "failure" relationship.

Additionally, if any FlowFile is routed to one of the Ports whose name is defined in the "Failure Ports" property of ExecuteStateless, the dataflow is considered a failure. In this case, an attribute named "failure.port.name" is added to the FlowFile, as there may be multiple ports that are considered failures, and this can be used in order to differentiate between them.

Otherwise, the incoming FlowFile will be routed to the "original" relationship, and any FlowFiles routed to any Output Port of the dataflow will be transferred to the "output" relationship of the ExecuteStateless Processor. All FlowFiles transferred to the "output" relationship will also have an attribute named "output.port.name."
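The routing rules in this section can be summarized in a short sketch. It models FlowFiles as plain attribute dictionaries; the names parse_failure_ports and route are hypothetical, trimming whitespace around port names is an assumption, and the sketch assumes that "failure.port.name" is added to the input FlowFile that is routed to "failure."

```python
def parse_failure_ports(value):
    """Split the comma-separated "Failure Ports" value into a set of port names."""
    # Assumption: whitespace around each name is ignored.
    return {name.strip() for name in value.split(",") if name.strip()}


def route(input_attrs, outputs, failure_ports, timed_out=False, raised=False):
    """Decide which relationship each FlowFile is transferred to.

    input_attrs: attributes of the incoming FlowFile.
    outputs: list of (output_port_name, attributes) produced by the stateless flow.
    """
    if timed_out:
        return {"timeout": [input_attrs]}
    if raised:
        # A Processor in the dataflow threw an Exception that it did not handle.
        return {"failure": [input_attrs]}
    for port_name, _ in outputs:
        if port_name in failure_ports:
            # Nothing goes to "output"; the input FlowFile records the port name.
            return {"failure": [{**input_attrs, "failure.port.name": port_name}]}
    return {
        "original": [input_attrs],
        "output": [{**attrs, "output.port.name": port} for port, attrs in outputs],
    }


failure_ports = parse_failure_ports("Failure, Invalid")
outputs = [("Large Sales", {"filename": "large.json"}),
           ("Small Sales", {"filename": "small.json"})]
succeeded = route({"filename": "sales.csv"}, outputs, failure_ports)
failed = route({"filename": "sales.csv"}, outputs + [("Failure", {})], failure_ports)
```

Here, succeeded contains the input FlowFile under "original" and both results under "output," while failed contains only the input FlowFile under "failure."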

Designing Flows for Stateless

When designing a flow to use with Stateless, it is important to consider how the flow might want to receive its data and what it might want to do with the data once it is processed. One option is for the flow to fully encapsulate the source of data and all destinations. For example, it might have a ConsumeKafkaRecord processor, perform some processing, and then publish to another topic via PublishKafkaRecord.

Another option would be to build a flow that sources data from some external source and possibly performs some processing, but does not define the destination of the data. For example, the flow might consist of a ConsumeKafkaRecord_2_6 processor and perform some filtering and transformation, but stop short of publishing the data anywhere. Instead, it can transfer the data to an Output Port, which could then be used by ExecuteStateless to bring that data into the NiFi dataflow.

Similarly, a dataflow may not define where it receives its input from, and instead just use an Input Port, so that any dataflow can be built to source data, and then deliver it to this dataflow, which is responsible for preparing and delivering the data.

Finally, the dataflow may define neither the source nor the destination of the data. Instead, the dataflow will be built to use an Input Port, it will perform some filtering/routing/transformation, and finally provide its processing results to an Output Port.

Input Ports

When designing a Stateless dataflow, it is recommended to use zero or one Input Port. It is possible, however, to define multiple Input Ports. In this case, the ExecuteStateless Processor must be configured with the Input Port property in order to dictate which of those Input Ports the incoming data should be transferred to. Note that the property expects the NAME of the Port and not the identifier. It is also important to note that the name is case sensitive.

Output Ports

While it is recommended not to use more than one Input Port, it often makes sense to make use of multiple Output Ports. For example, consider a dataflow that takes in CSV data representing information about book sales. The dataflow then partitions the data into "large sales" and "small sales," performs some enrichment, and converts the results into JSON. This dataflow might have four different output ports: "Input CSV," "Large Sales," "Small Sales," and "Failure."

Parameters

When we build a dataflow, it is often important that we not run the flow with the exact same configuration in every situation. For example, if we are consuming from Kafka, we may want to parameterize the Kafka Brokers, and the name of the Topic. This is best done by making use of Parameters when building the dataflow.

Once some value has been parameterized, though, we must have some way of conveying values for those parameters to the ExecuteStateless Processor. To do this, we use user-defined properties. When configuring the ExecuteStateless Processor, in the Properties tab, we can click the '+' icon in the top-right. This allows us to add a custom property to the Processor. Whatever is used for the name and value of that property will be used as the name and value of a parameter in the flow.

For example, if our dataflow references a Parameter named "Kafka Topic" and we want to run our dataflow using a value of "book-sales," then we can add a property to ExecuteStateless with the name "Kafka Topic" and the value "book-sales."
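As a rough sketch of this mapping, the snippet below separates user-defined properties from the Processor's own properties and treats the remainder as parameters. The function name is hypothetical, the set of built-in property names lists only those mentioned in this document and is not complete, and the "Kafka Brokers" name and value are made up for illustration.

```python
# Property names of ExecuteStateless that are mentioned in this document.
# Assumption: this set is illustrative only and is not the full list.
BUILT_IN_PROPERTIES = {
    "Working Directory",
    "Input Port",
    "Failure Ports",
    "Dataflow Timeout",
    "Content Storage Strategy",
    "Max Input FlowFile Size",
}


def parameters_from_properties(properties):
    """Return the user-defined properties, which become flow parameters."""
    return {
        name: value
        for name, value in properties.items()
        if name not in BUILT_IN_PROPERTIES
    }


configured = {
    "Failure Ports": "Failure",
    "Kafka Topic": "book-sales",        # user-defined property
    "Kafka Brokers": "localhost:9092",  # user-defined property (hypothetical value)
}
parameters = parameters_from_properties(configured)
```

With this configuration, the dataflow would see a "Kafka Topic" parameter with the value "book-sales" and a "Kafka Brokers" parameter with the value "localhost:9092."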

Exposing the Dataflow

Now that we've discussed some considerations for building our dataflow, we must consider how we can expose the dataflow, or make the dataflow available to the ExecuteStateless processor.

We have three options for this. Firstly, we can right-click on the Process Group that we want to expose, and choose to add the Process Group to Version Control by adding it to the NiFi Registry. This is the recommended approach.

However, we can also right-click on the Process Group and instead choose to "Download flow definition." At this point, we can copy the flow definition JSON file to every node in our cluster. Or, alternatively, we can upload the flow definition to some location that is accessible via a URL from every node in the cluster. For example, we might choose to check the JSON file into a Git repository and provide the URL to that file to the processor.

It is worth noting that if we define the location of the dataflow to be some external URL or to live within the NiFi Registry, we do not want the Processor to depend on that external service being available and accessible. As a result, when the dataflow is downloaded, it will be cached in the configured Working Directory, and if the flow cannot be accessed at some later time, that cached version will be used.

It is also important to note that when using an external URL or NiFi Registry, if the Processor is stopped and started (or NiFi is restarted), it is possible that the dataflow could be different than the last time that it ran. Additionally, it's possible that two nodes in a cluster could potentially be running a different version of the flow if they downloaded the file at different times (or if a different file were copied to the nodes). When using NiFi Registry, this can be avoided by explicitly specifying the version of the flow to run.

Surfacing NiFi Concepts

Because this one processor is responsible for internally running an entire dataflow, there are several concepts that must be taken into consideration.

Data Provenance

Throughout the course of a dataflow, many different intermediate FlowFiles may be created, destroyed, transformed, delivered, and fetched. While traditional NiFi will emit Provenance events for each of these, this is not currently possible with the ExecuteStateless Processor. Because those intermediate FlowFiles are not available outside of the stateless flow, we cannot surface the Provenance Events that are emitted by the dataflow, such as SEND and RECEIVE events.

Any FlowFile that is transferred to the "output" relationship will be shown as a CREATE Provenance event if there is no input FlowFile. If there is an input FlowFile, those FlowFiles will be shown as FORK events, the child having forked from the incoming FlowFile.

Counters

If any component within the stateless dataflow adjusts a counter, the counters that are adjusted are surfaced as counters for the ExecuteStateless Processor. Consider a dataflow that takes in a single FlowFile and partitions it into two FlowFiles, which are then sent to different Output Ports. Also consider that the partitioning is performed by a PartitionRecord processor with name PartitionData and ID 167ed9c3-a954-3dba-b6fd-c2e1a4572287. Then, we may see a counter for the ExecuteStateless processor with a name "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)." This is because the PartitionRecord Processor updates a counter with the name "Records Processed." The additional name and ID of the Processor are added in order to give context.

The above-mentioned counter, though, will only be incremented for successful invocations of the dataflow. It may also be helpful to understand how many times the counter was updated for failed attempts. However, we don't want to combine the counters for successful and failed attempts, because that can lead to confusion. Therefore, if the PartitionRecord processor is successful and updates the counter, but the dataflow fails (for example, a FlowFile is then routed to a Failure Port), the ExecuteStateless processor will have two counters: "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287)" and "Records Processed - PartitionData (167ed9c3-a954-3dba-b6fd-c2e1a4572287) (Failed attempts)."
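The naming scheme for surfaced counters can be reproduced with a small sketch. The function name surfaced_counter_name and the use of a Counter to accumulate values are illustrative choices and do not describe how NiFi stores counters internally; the record counts are made up.

```python
from collections import Counter


def surfaced_counter_name(counter, processor_name, processor_id, succeeded):
    """Build the counter name as it appears on the ExecuteStateless Processor."""
    name = f"{counter} - {processor_name} ({processor_id})"
    # Counts from failed invocations are kept under a separate name.
    return name if succeeded else f"{name} (Failed attempts)"


PROCESSOR_ID = "167ed9c3-a954-3dba-b6fd-c2e1a4572287"
counters = Counter()

# One successful invocation that processed 10 records (illustrative numbers) ...
counters[surfaced_counter_name("Records Processed", "PartitionData", PROCESSOR_ID, True)] += 10
# ... and one invocation that processed 4 records before the dataflow failed.
counters[surfaced_counter_name("Records Processed", "PartitionData", PROCESSOR_ID, False)] += 4
```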

Bulletins

We must also consider how bulletins from Processors within the stateless flow get surfaced to the ExecuteStateless processor. This can be helpful for indicating that some concerning behavior is taking place. Any bulletin that is created while running the stateless flow that is at a level of WARNING or ERROR will result in a bulletin being created by the ExecuteStateless Processor (assuming that the ExecuteStateless Processor's Bulletin Level is set sufficiently high in its Settings tab).

Yielding

Similarly, if any Processor in the Stateless flow chooses to yield, the ExecuteStateless processor will yield. This is important if there are source or destination systems that the Stateless flow is unable to communicate with or that have no more data to offer, as this allows us to avoid constantly interacting with that external service, which could add significant load to it.

Performance Considerations

There are a few different performance-related considerations to take into account when configuring the ExecuteStateless Processor.

Content Storage Strategy

One of the most impactful configuration options for the Processor is the configuration of the "Content Storage Strategy" property. For performance reasons, the processor can be configured to hold all FlowFiles in memory. This includes incoming FlowFiles, as well as intermediate and output FlowFiles. This can be a significant performance improvement but comes with a significant risk. The content is stored on NiFi's heap. This is the same heap that is shared by all other ExecuteStateless flows and by NiFi's processors and the NiFi process itself. If the data is very large, it can quickly exhaust the heap, resulting in Out Of Memory Errors in NiFi. These, in turn, can result in poor performance, as well as instability of the NiFi process itself. For this reason, it is not recommended to use the "Store Content on Heap" option unless it is known that all FlowFiles will be small (less than a few MB). And in order to help safeguard against the case that the Processor receives an unexpectedly large FlowFile, the "Max Input FlowFile Size" property must be configured when storing data on the heap.

Alternatively, and by default, the "Content Storage Strategy" can be configured to store FlowFile content on disk. When this option is used, the content of all FlowFiles is stored in the configured Working Directory. It is important to note, however, that this data is not meant to be persisted across restarts. Instead, this simply provides the Stateless Engine with a way to avoid loading everything into memory. Upon restart, the data will be deleted instead of allowing FlowFiles to resume from where they left off.

Concurrent Tasks

As noted before, the Stateless Engine is single-threaded. However, the processor does allow the user to configure more than one concurrent task. In this situation, each thread/concurrent task will run its own instance of the dataflow. This functions in much the same way as if a single thread were run on each of many different computers. Any internal state that is stored by the processor, such as the creation of a client for interacting with another service, is not shared. Additionally, if any Processors are configured to run on Primary Node only, they will run for each instance.
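The isolation between concurrent tasks can be pictured with a toy model in which every task builds its own instance of the flow. The FlowInstance class, its client attribute, and the upper-casing "processing" step are hypothetical stand-ins and are not part of NiFi.

```python
import threading


class FlowInstance:
    """Hypothetical stand-in for one instantiated copy of the stateless dataflow."""

    def __init__(self):
        # Internal state, such as a client for an external service,
        # is created per instance and never shared between instances.
        self.client = object()

    def trigger(self, record):
        # Placeholder for running the record through the whole flow.
        return record.upper()


def run_task(records, results, lock):
    flow = FlowInstance()  # each concurrent task builds its own instance
    processed = [flow.trigger(record) for record in records]
    with lock:
        results.append((flow, processed))


results, lock = [], threading.Lock()
batches = [["a", "b"], ["c"], ["d", "e"]]
threads = [threading.Thread(target=run_task, args=(batch, results, lock))
           for batch in batches]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# Three tasks ran, so three separate clients were created.
distinct_clients = {id(flow.client) for flow, _ in results}
```

Because nothing is shared, each instance pays its own setup cost (for example, opening its own connection), which is worth keeping in mind when raising the number of concurrent tasks.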

Run Duration

This Processor supports the configuration of NiFi's Run Duration in the Scheduling tab. If the Processor is expected to process many small FlowFiles, it is recommended to configure this option so that the Processor has a Run Duration of "25 ms." Typically, adjusting the value beyond that offers little benefit, but adjusting from "0 ms" to "25 ms" can provide a very significant performance improvement for many dataflows, at the cost of potentially introducing up to 25 milliseconds worth of additional latency.