MergeRecord 2.0.0
- Bundle
- org.apache.nifi | nifi-standard-nar
- Description
- This Processor merges together multiple record-oriented FlowFiles into a single FlowFile that contains all of the Records of the input FlowFiles. This Processor works by creating 'bins' and then adding FlowFiles to these bins until they are full. Once a bin is full, all of the FlowFiles will be combined into a single output FlowFile, and that FlowFile will be routed to the 'merged' Relationship. A bin will consist of potentially many 'like FlowFiles'. In order for two FlowFiles to be considered 'like FlowFiles', they must have the same Schema (as identified by the Record Reader) and, if the <Correlation Attribute Name> property is set, the same value for the specified attribute. See Processor Usage and Additional Details for more information. NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.
- Tags
- content, correlation, event, merge, record, stream
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for MergeRecord 2.0.0
MergeRecord
Introduction
The MergeRecord Processor allows the user to take many FlowFiles that consist of record-oriented data (any data format for which there is a Record Reader available) and combine the FlowFiles into one larger FlowFile. This may be preferable before pushing the data to a downstream system that prefers larger batches of data, such as HDFS, or in order to improve performance of a NiFi flow by reducing the number of FlowFiles that flow through the system (thereby reducing the contention placed on the FlowFile Repository, Provenance Repository, Content Repository, and FlowFile Queues).
The Processor creates several 'bins' to put the FlowFiles in. The maximum number of bins to use is 10 by default, but this can be changed by updating the value of the <Maximum Number of Bins> property. The number of bins is bound in order to avoid running out of Java heap space. Note: while the contents of a FlowFile are stored in the Content Repository and not in the Java heap space, the Processor must hold the FlowFile objects themselves in memory. As a result, these FlowFiles with their attributes can potentially take up a great deal of heap space and cause OutOfMemoryErrors to be thrown. In order to avoid this, if you expect to merge many small FlowFiles together, it is advisable to instead use a MergeRecord that merges no more than, say, 1,000 records into a bundle and then use a second MergeRecord to merge these small bundles into larger bundles. For example, to merge 1,000,000 records together, use a MergeRecord that uses a <Maximum Number of Records> of 1,000 and route the "merged" Relationship to a second MergeRecord that also sets the <Maximum Number of Records> to 1,000. The second MergeRecord will then merge 1,000 bundles of 1,000, which in effect produces bundles of 1,000,000.
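To make the arithmetic of this two-stage approach concrete, here is a small illustrative calculation (plain Java, not NiFi code) mirroring the numbers above:

```java
// Illustrative arithmetic only (not NiFi code): cascading two MergeRecord processors,
// each limited to 1,000 records per bundle, yields bundles of 1,000,000 records.
public class CascadedMergeMath {
    public static void main(String[] args) {
        long totalRecords = 1_000_000L;
        int maxRecordsPerBundle = 1_000;  // hypothetical <Maximum Number of Records> at each stage

        long afterStage1 = totalRecords / maxRecordsPerBundle;  // 1,000,000 records -> 1,000 bundles
        long afterStage2 = afterStage1 / maxRecordsPerBundle;   // 1,000 bundles -> 1 bundle of 1,000,000 records

        System.out.println("FlowFiles after first MergeRecord:  " + afterStage1);
        System.out.println("FlowFiles after second MergeRecord: " + afterStage2);
    }
}
```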
How FlowFiles are Binned
How the Processor determines which bin to place a FlowFile in depends on a few different configuration options. Firstly, the Merge Strategy is considered. The Merge Strategy can be set to one of two options: Bin Packing Algorithm, or Defragment. When the goal is to simply combine smaller FlowFiles into one larger FlowFile, the Bin Packing Algorithm should be used. This algorithm picks a bin based on whether the FlowFile can fit in the bin according to its size and the <Maximum Bin Size> property and whether the FlowFile is 'like' the other FlowFiles in the bin. What it means for two FlowFiles to be 'like FlowFiles' is discussed at the end of this section. The "Defragment" Merge Strategy can be used when records need to be explicitly assigned to the same bin. For example, if data is split apart using the SplitRecord Processor, each 'split' can be processed independently and later merged back together using this Processor with the Merge Strategy set to Defragment. In order for FlowFiles to be added to the same bin when using this configuration, the FlowFiles must have the same value for the "fragment.identifier" attribute. Each FlowFile with the same identifier must also have the same value for the "fragment.count" attribute (which indicates how many FlowFiles belong in the bin) and a unique value for the "fragment.index" attribute so that the FlowFiles can be ordered correctly.
In order to be added to the same bin, two FlowFiles must be 'like FlowFiles.' In order for two FlowFiles to be like FlowFiles, they must have the same schema, and if the <Correlation Attribute Name> property is set, they must have the same value for the specified attribute. For example, if the <Correlation Attribute Name> is set to "filename" then two FlowFiles must have the same value for the "filename" attribute in order to be binned together. If more than one attribute is needed in order to correlate two FlowFiles, it is recommended to use an UpdateAttribute processor before the MergeRecord processor and combine the attributes. For example, if the goal is to bin together two FlowFiles only if they have the same value for the "abc" attribute and the "xyz" attribute, then we could accomplish this by using UpdateAttribute and adding a property with name "correlation.attribute" and a value of "abc=${abc},xyz=${xyz}" and then setting MergeRecord's <Correlation Attribute Name> property to "correlation.attribute". It is often useful to bin together only Records that have the same value for some field. For example, if we have point-of-sale data, perhaps the desire is to bin together records that belong to the same store, as identified by the 'storeId' field. This can be accomplished by making use of the PartitionRecord Processor ahead of MergeRecord. This Processor will allow one or more fields to be configured as the partitioning criteria and will create attributes for those corresponding values. An UpdateAttribute processor could then be used, if necessary, to combine multiple attributes into a single correlation attribute, as described above. See the documentation for those processors for more details.
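Conceptually, two FlowFiles are 'like FlowFiles' when they agree on a bin key made up of their schema and, if configured, the value of the correlation attribute. The following minimal sketch shows that keying with hypothetical types and names; it is not NiFi's internal API:

```java
import java.util.Map;
import java.util.Objects;

// Illustrative sketch only: how a bin key could be derived from a FlowFile's schema and the
// optional <Correlation Attribute Name>. Types and names here are hypothetical, not NiFi's API.
public class LikeFlowFiles {

    // Two FlowFiles land in the same bin only if their BinKeys are equal.
    record BinKey(String schemaText, String correlationValue) { }

    static BinKey binKeyFor(String schemaText,
                            Map<String, String> flowFileAttributes,
                            String correlationAttributeName) {
        String correlationValue = correlationAttributeName == null
                ? null
                : flowFileAttributes.get(correlationAttributeName);
        return new BinKey(schemaText, correlationValue);
    }

    public static void main(String[] args) {
        // Correlating on a composite attribute built upstream by UpdateAttribute,
        // e.g. correlation.attribute = "abc=${abc},xyz=${xyz}"
        Map<String, String> a = Map.of("correlation.attribute", "abc=1,xyz=2");
        Map<String, String> b = Map.of("correlation.attribute", "abc=1,xyz=2");
        Map<String, String> c = Map.of("correlation.attribute", "abc=1,xyz=9");

        String schema = "schema-for-sales-records"; // placeholder: all three resolve to the same schema

        BinKey keyA = binKeyFor(schema, a, "correlation.attribute");
        BinKey keyB = binKeyFor(schema, b, "correlation.attribute");
        BinKey keyC = binKeyFor(schema, c, "correlation.attribute");

        System.out.println("a and b share a bin: " + Objects.equals(keyA, keyB)); // true
        System.out.println("a and c share a bin: " + Objects.equals(keyA, keyC)); // false
    }
}
```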
When a Bin is Merged
Above, we discussed how a bin is chosen for a given FlowFile. Once a bin has been created and FlowFiles added to it, we must have some way to determine when a bin is “full” so that we can bin those FlowFiles together into a “merged” FlowFile.
If the <Merge Strategy> property is set to "Bin Packing Algorithm" then the following rules will be evaluated. Firstly, in order for a bin to be full, both of the thresholds specified by the <Minimum Bin Size> and the <Minimum Number of Records> properties must be satisfied. If one of these properties is not set, then it is ignored. Secondly, if either the <Maximum Bin Size> or the <Maximum Number of Records> property is reached, then the bin is merged. That is, both of the minimum values must be reached but only one of the maximum values need be reached. Note that the <Maximum Number of Records> property is a "soft limit," meaning that all records in a given input FlowFile will be added to the same bin, and as a result the number of records may exceed the maximum configured number of records. Once this happens, though, no more Records will be added to that same bin from another FlowFile. If the <Max Bin Age> is reached for a bin, then the FlowFiles in that bin will be merged, even if the minimum bin size and minimum number of records have not yet been met. Finally, if the maximum number of bins have been created (as specified by the <Maximum Number of Bins> property), and some input FlowFiles cannot fit into any of the existing bins, then the oldest bin will be merged to make room. This is done because otherwise we would not be able to add any additional FlowFiles to the existing bins and would have to wait until the Max Bin Age is reached (if ever) in order to merge any FlowFiles.
If the <Merge Strategy> property is set to "Defragment" then a bin is full only when the number of FlowFiles in the bin is equal to the number specified by the "fragment.count" attribute of one of the FlowFiles in the bin. All FlowFiles that have this attribute must have the same value for this attribute, or else they will be routed to the "failure" relationship. It is not necessary that all FlowFiles have this value, but at least one FlowFile in the bin must have this value or the bin will never be complete. If all the necessary FlowFiles are not binned together by the point at which the bin times out (as specified by the <Max Bin Age> property), then the FlowFiles will all be routed to the "failure" relationship instead of being merged together.
Once a bin is merged into a single FlowFile, it can sometimes be useful to understand why exactly the bin was merged when it was. For example, if the maximum number of allowable bins is reached, a merged FlowFile may consist of far fewer records than expected. In order to help understand the behavior, the Processor will emit a JOIN Provenance Event when creating the merged FlowFile, and the JOIN event will include in it a "Details" field that explains why the bin was merged when it was. For example, the event will indicate "Records Merged due to: Bin is full" if the bin reached its minimum thresholds and no more subsequent FlowFiles were added to it. Or it may indicate "Records Merged due to: Maximum number of bins has been exceeded" if the bin was merged due to the configured maximum number of bins being filled and needing to free up space for a new bin.
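The Bin Packing completion rules above can be condensed into a small decision function. The sketch below is illustrative only, with hypothetical names; it is not the Processor's implementation, and the rule that merges the oldest bin when the maximum number of bins is exceeded applies across bins, so it is only noted in a comment:

```java
// Illustrative sketch of the Bin Packing Algorithm completion rules described above.
// Names and structure are hypothetical; this is not NiFi's implementation.
public class BinCompletionRules {

    static boolean isBinReadyToMerge(long binBytes, long binRecords, long binAgeMillis,
                                     Long minBytes, Long minRecords,   // both minimums must be met (null = not set, ignored)
                                     Long maxBytes, Long maxRecords,   // reaching either maximum completes the bin
                                     Long maxBinAgeMillis) {           // reaching the age completes the bin regardless
        // Max Bin Age overrides the minimum thresholds entirely.
        if (maxBinAgeMillis != null && binAgeMillis >= maxBinAgeMillis) {
            return true;
        }
        // Reaching either maximum completes the bin. The maximums are soft limits, so the
        // counts may already exceed them by the size of the last FlowFile that was added.
        if (maxBytes != null && binBytes >= maxBytes) {
            return true;
        }
        if (maxRecords != null && binRecords >= maxRecords) {
            return true;
        }
        // Otherwise, every configured minimum must be satisfied.
        boolean minBytesMet = minBytes == null || binBytes >= minBytes;
        boolean minRecordsMet = minRecords == null || binRecords >= minRecords;
        return minBytesMet && minRecordsMet;
        // Not modeled here: when the <Maximum Number of Bins> is exceeded, the oldest bin
        // is merged to make room; that rule applies across bins, not to a single bin.
    }

    public static void main(String[] args) {
        // Minimum of 3 records, maximum of 5 records, no size or age limits configured.
        System.out.println(isBinReadyToMerge(0, 2, 0, null, 3L, null, 5L, null)); // false: minimum not reached
        System.out.println(isBinReadyToMerge(0, 3, 0, null, 3L, null, 5L, null)); // true: minimum reached
        System.out.println(isBinReadyToMerge(0, 6, 0, null, 3L, null, 5L, null)); // true: maximum exceeded (soft limit)
    }
}
```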
When a Failure Occurs
When a bin is filled, the Processor is responsible for merging together all the records in those FlowFiles into a single FlowFile. If the Processor fails to do so for any reason (for example, a Record cannot be read from an input FlowFile), then all the FlowFiles in that bin are routed to the ‘failure’ Relationship. The Processor does not skip the single problematic FlowFile and merge the others. This behavior was chosen because of two different considerations. Firstly, without those problematic records, the bin may not truly be full, as the minimum bin size may not be reached without those records. Secondly, and more importantly, if the problematic FlowFile contains 100 “good” records before the problematic ones, those 100 records would already have been written to the “merged” FlowFile. We cannot un-write those records. If we were to then send those 100 records on and route the problematic FlowFile to ‘failure’ then in a situation where the “failure” relationship is eventually routed back to MergeRecord, we could end up continually duplicating those 100 successfully processed records.
Examples
To better understand how this Processor works, we will lay out a few examples. For the sake of simplicity of these examples, we will use CSV-formatted data and write the merged data as CSV-formatted data, but the format of the data is not really relevant, as long as there is a Record Reader that is capable of reading the data and a Record Writer capable of writing the data in the desired format.
Example 1 - Batching Together Many Small FlowFiles
When we want to batch together many small FlowFiles in order to create one larger FlowFile, we will accomplish this by using the “Bin Packing Algorithm” Merge Strategy. The idea here is to bundle together as many FlowFiles as we can within our minimum and maximum number of records and bin size. Consider that we have the following properties set:
Property Name | Property Value |
---|---|
Merge Strategy | Bin Packing Algorithm |
Minimum Number of Records | 3 |
Maximum Number of Records | 5 |

Also consider that we have the following data on the queue, with the schema indicating a Name and an Age field:

FlowFile ID | FlowFile Contents |
---|---|
1 | Mark, 33 |
2 | John, 45<br>Jane, 43 |
3 | Jake, 3 |
4 | Jan, 2 |

In this example, because we have not configured a Correlation Attribute, and because all FlowFiles have the same schema, the Processor will attempt to add all of these FlowFiles to the same bin. Because the Minimum Number of Records is 3 and the Maximum Number of Records is 5, all the FlowFiles will be added to the same bin. The output, then, is a single FlowFile with the following content:

Mark, 33
John, 45
Jane, 43
Jake, 3
Jan, 2
When the Processor runs, it will bin all the FlowFiles that it can get from the queue. After that, it will merge any bin that is “full enough.” So if we had only 3 FlowFiles on the queue, those 3 would have been added, and a new bin would have been created in the next iteration, once the 4th FlowFile showed up. However, if we had 8 FlowFiles queued up, only 5 would have been added to the first bin. The other 3 would have been added to a second bin, and that bin would then be merged since it reached the minimum threshold of 3 also.
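For reference, the example can be traced with a few lines of illustrative Java (again, not NiFi code), using the record counts from the table above:

```java
import java.util.List;

// Traces Example 1: four FlowFiles containing 1, 2, 1 and 1 records, with
// Minimum Number of Records = 3 and Maximum Number of Records = 5.
public class Example1Trace {
    public static void main(String[] args) {
        List<Integer> recordsPerFlowFile = List.of(1, 2, 1, 1); // FlowFiles 1-4 from the table above
        int minRecords = 3;

        int binRecords = 0;
        for (int count : recordsPerFlowFile) {
            binRecords += count;  // all FlowFiles are 'like FlowFiles', so they share one bin
            System.out.println("Bin now holds " + binRecords + " records");
        }
        boolean merged = binRecords >= minRecords; // 5 records: minimum (3) met, maximum (5) reached but not exceeded
        System.out.println("Bin merged into one FlowFile: " + merged);
    }
}
```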
-
Attribute Strategy
Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.
- Display Name
- Attribute Strategy
- Description
- Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.
- API Name
- Attribute Strategy
- Default Value
- Keep Only Common Attributes
- Allowable Values
-
- Keep Only Common Attributes
- Keep All Unique Attributes
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
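The difference between the two strategies can be shown with plain attribute maps. This is a hypothetical helper sketch, not NiFi's implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the two Attribute Strategy behaviors described above.
public class AttributeStrategies {

    // Keep only attributes that exist on every FlowFile in the bundle with the same value.
    static Map<String, String> keepOnlyCommon(List<Map<String, String>> flowFileAttributes) {
        Map<String, String> result = new HashMap<>(flowFileAttributes.get(0));
        for (Map<String, String> attrs : flowFileAttributes.subList(1, flowFileAttributes.size())) {
            result.keySet().retainAll(attrs.keySet());
            result.entrySet().removeIf(e -> !e.getValue().equals(attrs.get(e.getKey())));
        }
        return result;
    }

    // Keep every attribute from every FlowFile unless two FlowFiles disagree on its value.
    static Map<String, String> keepAllUnique(List<Map<String, String>> flowFileAttributes) {
        Map<String, String> result = new HashMap<>();
        Map<String, Boolean> conflicted = new HashMap<>();
        for (Map<String, String> attrs : flowFileAttributes) {
            for (Map.Entry<String, String> e : attrs.entrySet()) {
                String existing = result.get(e.getKey());
                if (existing != null && !existing.equals(e.getValue())) {
                    conflicted.put(e.getKey(), true);
                }
                result.putIfAbsent(e.getKey(), e.getValue());
            }
        }
        conflicted.keySet().forEach(result::remove);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> f1 = Map.of("source", "store-1", "region", "us-east", "batch", "a");
        Map<String, String> f2 = Map.of("source", "store-1", "region", "us-west");

        System.out.println(keepOnlyCommon(List.of(f1, f2))); // {source=store-1}
        System.out.println(keepAllUnique(List.of(f1, f2)));  // {batch=a, source=store-1}
    }
}
```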
-
Correlation Attribute Name
If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
- Display Name
- Correlation Attribute Name
- Description
- If specified, two FlowFiles will be binned together only if they have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
- API Name
- correlation-attribute-name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Max Bin Age
The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours
- Display Name
- Max Bin Age
- Description
- The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours
- API Name
- max-bin-age
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Maximum Bin Size
The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in the last input FlowFile.
- Display Name
- Maximum Bin Size
- Description
- The maximum size for the bundle. If not specified, there is no maximum. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of bytes in the last input FlowFile.
- API Name
- max-bin-size
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Maximum Number of Records
The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of records in the last input FlowFile.
- Display Name
- Maximum Number of Records
- Description
- The maximum number of Records to include in a bin. This is a 'soft limit' in that if a FlowFile is added to a bin, all records in that FlowFile will be added, so this limit may be exceeded by up to the number of records in the last input FlowFile.
- API Name
- max-records
- Default Value
- 1000
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Maximum Number of Bins
Specifies the maximum number of bins that can be held in memory at any one time. This number should not be smaller than the maximum number of concurrent threads for this Processor, or the bins that are created will often consist only of a single incoming FlowFile.
- Display Name
- Maximum Number of Bins
- Description
- Specifies the maximum number of bins that can be held in memory at any one time. This number should not be smaller than the maximum number of concurrent threads for this Processor, or the bins that are created will often consist only of a single incoming FlowFile.
- API Name
- max.bin.count
- Default Value
- 10
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Merge Strategy
Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles
- Display Name
- Merge Strategy
- Description
- Specifies the algorithm used to merge records. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles
- API Name
- merge-strategy
- Default Value
- Bin-Packing Algorithm
- Allowable Values
-
- Bin-Packing Algorithm
- Defragment
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Minimum Bin Size
The minimum size for the bin
- Display Name
- Minimum Bin Size
- Description
- The minimum size for the bin
- API Name
- min-bin-size
- Default Value
- 0 B
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Minimum Number of Records
The minimum number of records to include in a bin
- Display Name
- Minimum Number of Records
- Description
- The minimum number of records to include in a bin
- API Name
- min-records
- Default Value
- 1
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
-
Record Reader
Specifies the Controller Service to use for reading incoming data
- Display Name
- Record Reader
- Description
- Specifies the Controller Service to use for reading incoming data
- API Name
- record-reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Record Writer
Specifies the Controller Service to use for writing out the records
- Display Name
- Record Writer
- Description
- Specifies the Controller Service to use for writing out the records
- API Name
- record-writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
Relationships
Name | Description |
---|---|
merged | The FlowFile containing the merged records |
original | The FlowFiles that were used to create the bundle |
failure | If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure |
Reads Attributes
Name | Description |
---|---|
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute indicates how many FlowFiles should be expected in the given bundle. |
Writes Attributes
Name | Description |
---|---|
record.count | The merged FlowFile will have a 'record.count' attribute indicating the number of records that were written to the FlowFile. |
mime.type | The MIME Type indicated by the Record Writer |
merge.count | The number of FlowFiles that were merged into this bundle |
merge.bin.age | The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output |
merge.uuid | UUID of the merged FlowFile that will be added to the original FlowFiles attributes |
merge.completion.reason | This processor allows for several thresholds to be configured for merging FlowFiles. This attribute indicates which of the thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values and their meanings, see the Processor's usage documentation and the 'Additional Details' page. |
<Attributes from Record Writer> | Any Attribute that the configured Record Writer returns will be added to the FlowFile. |
-
Combine together many arbitrary Records in order to create a single, larger file
- Description
- Combine together many arbitrary Records in order to create a single, larger file
- Configuration
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type. Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type. Set "Merge Strategy" to `Bin-Packing Algorithm`. Set the "Minimum Bin Size" to desired file size of the merged output file. For example, a value of `1 MB` will result in not merging data until at least 1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of `0 B`. Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value to `10000` ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first). Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will merge whatever data it is, even if the "Minimum Bin Size" and "Minimum Number of Records" has not been reached. It is always recommended to set the value. A reasonable default might be `10 mins` if there is no other latency requirement. Connect the 'merged' Relationship to the next component in the flow. Auto-terminate the 'original' Relationship.
-
Combine together many Records that have the same value for a particular field in the data, in order to create a single, larger file
- Description
- Combine together many Records that have the same value for a particular field in the data, in order to create a single, larger file
- Keywords
- merge, combine, aggregate, like records, similar data
- Processor Configurations
-
org.apache.nifi.processors.standard.PartitionRecord
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type. Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type. Add a single additional property. The name of the property should describe the field on which the data is being merged together. The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to. For example, to merge together data that has the same value for the "productSku" field, add a property named `productSku` with a value of `/productSku`. Connect the "success" Relationship to MergeRecord. Auto-terminate the "original" Relationship.
org.apache.nifi.processors.standard.MergeRecord
Configure the "Record Reader" to specify a Record Reader that is appropriate for the incoming data type.
Configure the "Record Writer" to specify a Record Writer that is appropriate for the desired output data type.
Set "Merge Strategy" to `Bin-Packing Algorithm`.
Set the "Minimum Bin Size" to the desired file size of the merged output file. For example, a value of `1 MB` will result in not merging data until at least 1 MB of data is available (unless the Max Bin Age is reached first). If there is no desired minimum file size, leave the default value of `0 B`.
Set the "Minimum Number of Records" property to the minimum number of Records that should be included in the merged output file. For example, setting the value to `10000` ensures that the output file will have at least 10,000 Records in it (unless the Max Bin Age is reached first).
Set the "Maximum Number of Records" property to a value at least as large as the "Minimum Number of Records." If there is no need to limit the maximum number of records per file, this number can be set to a value that will never be reached, such as `1000000000`.
Set the "Max Bin Age" to specify the maximum amount of time to hold data before merging. This can be thought of as a "timeout" at which time the Processor will merge whatever data it has, even if the "Minimum Bin Size" and "Minimum Number of Records" have not been reached. It is always recommended to set this value. A reasonable default might be `10 mins` if there is no other latency requirement.
Set the value of the "Correlation Attribute Name" property to the name of the property that you added in the PartitionRecord Processor. For example, if merging data based on the "productSku" field, the property in PartitionRecord was named `productSku`, so the value of the "Correlation Attribute Name" property should be `productSku`.
Set the "Maximum Number of Bins" property to a value that is at least as large as the number of different values that will be present for the Correlation Attribute. For example, if you expect 1,000 different SKUs, set this value to at least `1001`. It is not advisable, though, to set the value above 10,000.
Connect the 'merged' Relationship to the next component in the flow. Auto-terminate the 'original' Relationship.