This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information. NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.
merge, record, content, correlation, stream, event
In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.
Display Name | API Name | Default Value | Allowable Values | Description |
---|---|---|---|---|
Record Reader | record-reader | Controller Service API: RecordReaderFactory Implementations: CEFReader SyslogReader ReaderLookup ProtobufReader Syslog5424Reader CSVReader GrokReader WindowsEventLogReader ScriptedReader AvroReader ParquetReader JsonPathReader ExcelReader JsonTreeReader YamlTreeReader XMLReader | Specifies the Controller Service to use for reading incoming data | |
Record Writer | record-writer | Controller Service API: RecordSetWriterFactory Implementations: JsonRecordSetWriter RecordSetWriterLookup AvroRecordSetWriter XMLRecordSetWriter FreeFormTextRecordSetWriter CSVRecordSetWriter ParquetRecordSetWriter ScriptedRecordSetWriter | Specifies the Controller Service to use for writing out the records | |
Merge Strategy | merge-strategy | Bin-Packing Algorithm |
| Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles |
Correlation Attribute Name | correlation-attribute-name | If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue. | ||
Attribute Strategy | Attribute Strategy | Keep Only Common Attributes |
| Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved. |
Minimum Number of Records | min-records | 1 | The minimum number of records to include in a bin Supports Expression Language: true (will be evaluated using variable registry only) | |
Maximum Number of Records | max-records | 1000 | The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFIle is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of records in the last input FlowFile. Supports Expression Language: true (will be evaluated using variable registry only) | |
Minimum Bin Size | min-bin-size | 0 B | The minimum size of for the bin | |
Maximum Bin Size | max-bin-size | The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in last input FlowFile. | ||
Max Bin Age | max-bin-age | The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours | ||
Maximum Number of Bins | max.bin.count | 10 | Specifies the maximum number of bins that can be held in memory at any one time. This number should not be smaller than the maximum number of concurrent threads for this Processor, or the bins that are created will often consist only of a single incoming FlowFile. |
Name | Description |
---|---|
failure | If the bundle cannot be created, all FlowFiles that would have been used to created the bundle will be transferred to failure |
original | The FlowFiles that were used to create the bundle |
merged | The FlowFile containing the merged records |
Name | Description |
---|---|
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
Name | Description |
---|---|
record.count | The merged FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile. |
mime.type | The MIME Type indicated by the Record Writer |
merge.count | The number of FlowFiles that were merged into this bundle |
merge.bin.age | The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output |
merge.uuid | UUID of the merged FlowFile that will be added to the original FlowFiles attributes |
<Attributes from Record Writer> | Any Attribute that the configured Record Writer returns will be added to the FlowFile. |