MergeContent 2.0.0
- Bundle
- org.apache.nifi | nifi-standard-nar
- Description
- Merges a Group of FlowFiles together based on a user-defined strategy and packages them into a single FlowFile. It is recommended that the Processor be configured with only a single incoming connection, as Groups of FlowFiles will not be created from FlowFiles in different connections. This processor updates the mime.type attribute as appropriate. NOTE: this processor should NOT be configured with Cron Driven for the Scheduling Strategy.
- Tags
- archive, concatenation, content, correlation, flowfile-stream, flowfile-stream-v3, merge, stream, tar, zip
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for MergeContent 2.0.0
MergeContent
Introduction
The MergeContent Processor provides the ability to combine many FlowFiles into a single FlowFile. There are many reasons that a dataflow designer may want to do this. For example, it may be helpful to create batches of data before sending to a downstream system, because the downstream system is better optimized for large files than for many tiny files. NiFi itself can also benefit from this, as NiFi operates best on “micro-batches,” where each FlowFile is several kilobytes to several megabytes in size.
The Processor creates several ‘bins’ to put the FlowFiles in. The maximum number of bins to use is set to 5 by default, but this can be changed by updating the value of the Maximum number of Bins property. The number of bins is bound in order to avoid running out of Java heap space. Note: while the contents of a FlowFile are stored in the Content Repository and not in the Java heap space, the Processor must hold the FlowFile objects themselves in memory. As a result, these FlowFiles with their attributes can potentially take up a great deal of heap space and cause OutOfMemoryErrors to be thrown. In order to avoid this, if you expect to merge many small FlowFiles together, it is advisable to instead use a MergeContent that merges no more than, say, 1,000 FlowFiles into a bundle and then use a second MergeContent to merge these small bundles into larger bundles. For example, to merge 1,000,000 FlowFiles together, use a MergeContent with a Maximum Number of Entries of 1,000 and route the “merged” Relationship to a second MergeContent that also sets the Maximum Number of Entries to 1,000. The second MergeContent will then merge 1,000 bundles of 1,000, which in effect produces bundles of 1,000,000.
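To make the two-stage arithmetic concrete, here is a minimal sketch (plain Python for illustration, not NiFi code; the counts are the example's assumptions):

```python
# Illustrative arithmetic only -- not NiFi code. Assumes each stage is a
# MergeContent with "Maximum Number of Entries" = 1,000.
total_flowfiles = 1_000_000
stage1_entries = 1_000   # first MergeContent: merges raw FlowFiles
stage2_entries = 1_000   # second MergeContent: merges stage-1 bundles

stage1_bundles = total_flowfiles // stage1_entries   # 1,000 bundles
stage2_bundles = stage1_bundles // stage2_entries    # 1 bundle of 1,000,000

# Each processor only ever holds on the order of 1,000 FlowFile objects
# per bin in heap, instead of one processor holding 1,000,000 at once.
print(stage1_bundles, stage2_bundles)  # 1000 1
```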
How FlowFiles are Binned
How the Processor determines which bin to place a FlowFile in depends on a few different configuration options. Firstly, the Merge Strategy is considered. The Merge Strategy can be set to one of two options: “Bin Packing Algorithm” or “Defragment”. When the goal is to simply combine smaller FlowFiles into one larger FlowFile, the Bin Packing Algorithm should be used. This algorithm picks a bin based on whether the FlowFile can fit in the bin according to its size and the Maximum Group Size property, and whether the FlowFile is ’like’ the other FlowFiles in the bin. What it means for two FlowFiles to be ’like FlowFiles’ is discussed at the end of this section.
The “Defragment” Merge Strategy can be used when FlowFiles need to be explicitly assigned to the same bin. For example, if data is split apart using the UnpackContent Processor, each unpacked FlowFile can be processed independently and later merged back together using this Processor with the Merge Strategy set to Defragment. In order for FlowFiles to be added to the same bin when using this configuration, the FlowFiles must have the same value for the “fragment.identifier” attribute. Each FlowFile with the same identifier must also have a unique value for the “fragment.index” attribute so that the FlowFiles can be ordered correctly. For a given “fragment.identifier”, at least one FlowFile must have the “fragment.count” attribute (which indicates how many FlowFiles belong in the bin). Other FlowFiles with the same identifier must have the same value for the “fragment.count” attribute, or they can omit this attribute.
NOTE: while there are valid use cases for breaking apart FlowFiles and later re-merging them, it is an antipattern to take a larger FlowFile, break it into a million tiny FlowFiles, and then re-merge them. Doing so can result in using huge amounts of Java heap and can result in Out Of Memory Errors. Additionally, it adds a large amount of load to the NiFi framework. This can result in increased CPU and disk utilization, and oftentimes an order of magnitude lower throughput and an order of magnitude higher latency. As an alternative, whenever possible, dataflows should be built to make use of Record-oriented processors, such as QueryRecord, PartitionRecord, UpdateRecord, LookupRecord, PublishKafkaRecord_2_6, etc.
In order to be added to the same bin, two FlowFiles must be ’like FlowFiles’. For two FlowFiles to be like FlowFiles, they must have the same schema, and if the Correlation Attribute Name property is set, they must have the same value for the specified attribute. For example, if the Correlation Attribute Name is set to “filename”, then two FlowFiles must have the same value for the “filename” attribute in order to be binned together. If more than one attribute is needed in order to correlate two FlowFiles, it is recommended to use an UpdateAttribute processor before the MergeContent processor and combine the attributes. For example, if the goal is to bin together two FlowFiles only if they have the same value for the “abc” attribute and the “xyz” attribute, then we could accomplish this by using UpdateAttribute and adding a property with name “correlation.attribute” and a value of “abc=${abc},xyz=${xyz}”, and then setting MergeContent’s Correlation Attribute Name property to “correlation.attribute”.
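As a rough sketch of this binning rule (illustrative Python, not NiFi's implementation; the attribute values are made up):

```python
from collections import defaultdict

# Hypothetical FlowFiles represented as attribute dictionaries.
flowfiles = [
    {"abc": "1", "xyz": "a", "filename": "f1"},
    {"abc": "1", "xyz": "a", "filename": "f2"},
    {"abc": "2", "xyz": "a", "filename": "f3"},
]

bins = defaultdict(list)
for ff in flowfiles:
    # Equivalent to UpdateAttribute setting
    # correlation.attribute = "abc=${abc},xyz=${xyz}"
    key = f"abc={ff['abc']},xyz={ff['xyz']}"
    bins[key].append(ff)

# f1 and f2 share a bin; f3 lands in its own bin.
for key, members in bins.items():
    print(key, [m["filename"] for m in members])
```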
When a Bin is Merged
Above, we discussed how a bin is chosen for a given FlowFile. Once a bin has been created and FlowFiles added to it, we must have some way to determine when a bin is “full” so that we can bin those FlowFiles together into a “merged” FlowFile.
If the Merge Strategy property is set to “Bin Packing Algorithm”, then the following rules will be evaluated. MergeContent exposes several different thresholds that can be used to create bins of an ideal size. For example, the user can specify the minimum number of FlowFiles that must be packaged together before merging will be performed. The minimum number of bytes can also be configured. Additionally, a maximum number of FlowFiles and bytes may be specified.
There are also two other conditions that will result in the contents of a Bin being merged together. The Max Bin Age property specifies the maximum amount of time that FlowFiles can be binned together before the bin is merged. This property should almost always be set, as it provides a means to set a timeout on a bin, so that even if data stops flowing to the Processor for a while (due to a problem with an upstream system, a source processor being stopped, etc.) the FlowFiles won’t remain stuck in the MergeContent processor indefinitely. Additionally, the processor exposes a property for the maximum number of Bins that should be used. For some use cases, this won’t matter much. However, if the Correlation Attribute Name property is set, this can be important. When an incoming FlowFile is to be placed in a Bin, the processor must find an appropriate Bin to place the FlowFile into, or else create a new one. If a Bin must be created, and the number of Bins that exist is greater than or equal to the value of the Maximum number of Bins property, then the oldest Bin will be merged to make room for the new one.
If the Merge Strategy property is set to “Defragment”, then a bin is full only when the number of FlowFiles in the bin is equal to the number specified by the “fragment.count” attribute of one of the FlowFiles in the bin. All FlowFiles that have this attribute must have the same value for it, or else they will be routed to the “failure” relationship. It is not necessary that all FlowFiles have this attribute, but at least one FlowFile in the bin must have it, or the bin will never be complete. If all the necessary FlowFiles are not binned together by the point at which the bin times out (as specified by the Max Bin Age property), then the FlowFiles will all be routed to the “failure” relationship instead of being merged together.
Whenever the contents of a Bin are merged, an attribute with the name “merge.reason” will be added to the merged FlowFile. The table below provides a listing of all possible values for this attribute, with an explanation of each.
Attribute Value | Explanation |
---|---|
MAX_BYTES_THRESHOLD_REACHED | The bin has reached the maximum number of bytes, as configured by the Maximum Group Size property. When this threshold is reached, the contents of the Bin will be merged together, even if the Minimum Number of Entries has not yet been reached. |
MAX_ENTRIES_THRESHOLD_REACHED | The bin has reached the maximum number of FlowFiles, as configured by the Maximum Number of Entries property. When this threshold is reached, the contents of the Bin will be merged together, even if the minimum number of bytes (Minimum Group Size) has not yet been reached. |
MIN_THRESHOLDS_REACHED | The bin has reached both the minimum number of bytes, as configured by the Minimum Group Size property, AND the minimum number of FlowFiles, as configured by the Minimum Number of Entries property. The bin has not reached the maximum number of bytes (Maximum Group Size) OR the maximum number of FlowFiles (Maximum Number of Entries). |
TIMEOUT | The Bin has reached the maximum age, as configured by the Max Bin Age property. If this threshold is reached, the contents of the Bin will be merged together, even if the Bin has not yet reached either of the minimum thresholds. Note that the age here is determined by when the Bin was created, NOT the age of the FlowFiles that reside within those Bins. As a result, if the Processor is stopped until it has 1 million FlowFiles queued, each one being 10 days old, but the Max Bin Age is set to “1 day,” the Max Bin Age will not be met for at least one full day, even though the FlowFiles themselves are much older than this threshold. If the Processor is stopped and restarted, all Bins are destroyed and recreated, and the timer is reset. |
BIN_MANAGER_FULL | If an incoming FlowFile does not fit into any of the existing Bins (either due to the Maximum thresholds set, or due to the Correlation Attribute being used, etc.), then a new Bin must be created for the incoming FlowFile. If the number of active Bins is already equal to the Maximum number of Bins property, the oldest Bin will be merged in order to make room for the new Bin. In that case, the Bin Manager is said to be full, and this value will be used. |
Note that the attribute value is minimally named, while the textual description is far more verbose. This is done for a few reasons. Firstly, storing a large value for the attribute can be more costly, utilizing more heap space and requiring more resources to process. Secondly, it’s more succinct, which makes it easier to talk about. Most importantly, though, it means that a processor such as RouteOnAttribute can be used, if necessary, to route based on the value of the attribute. In this way, the explanation can be further expanded or updated without changing the value of the attribute and without disturbing existing flows.
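The decision behind these values can be sketched roughly as follows (an illustrative simplification in Python, not the processor's actual code; parameter names mirror the properties documented below, and BIN_MANAGER_FULL is omitted because it is triggered by bin creation rather than by a bin's own thresholds):

```python
# Rough sketch of when a Bin-Packing bin is merged, and why.
def merge_reason(entry_count, byte_count, bin_age_seconds,
                 min_entries=1, min_bytes=0, max_entries=1000,
                 max_bytes=None, max_bin_age_seconds=None):
    if max_bytes is not None and byte_count >= max_bytes:
        return "MAX_BYTES_THRESHOLD_REACHED"    # Maximum Group Size reached
    if entry_count >= max_entries:
        return "MAX_ENTRIES_THRESHOLD_REACHED"  # Maximum Number of Entries reached
    if max_bin_age_seconds is not None and bin_age_seconds >= max_bin_age_seconds:
        return "TIMEOUT"                        # Max Bin Age reached
    if entry_count >= min_entries and byte_count >= min_bytes:
        return "MIN_THRESHOLDS_REACHED"         # both minimums satisfied
    return None                                 # bin stays open

print(merge_reason(entry_count=10, byte_count=2048, bin_age_seconds=30))
# -> 'MIN_THRESHOLDS_REACHED' with the default minimums
```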
-
Attribute Strategy
Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.
- Display Name
- Attribute Strategy
- Description
- Determines which FlowFile attributes should be added to the bundle. If 'Keep All Unique Attributes' is selected, any attribute on any FlowFile that gets bundled will be kept unless its value conflicts with the value from another FlowFile. If 'Keep Only Common Attributes' is selected, only the attributes that exist on all FlowFiles in the bundle, with the same value, will be preserved.
- API Name
- Attribute Strategy
- Default Value
- Keep Only Common Attributes
- Allowable Values
-
- Keep Only Common Attributes
- Keep All Unique Attributes
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Compression Level
Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is ignored
- Display Name
- Compression Level
- Description
- Specifies the compression level to use when using the Zip Merge Format; if not using the Zip Merge Format, this value is ignored
- API Name
- Compression Level
- Default Value
- 1
- Allowable Values
-
- 0
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Format is set to any of [ZIP]
-
Correlation Attribute Name
If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
- Display Name
- Correlation Attribute Name
- Description
- If specified, like FlowFiles will be binned together, where 'like FlowFiles' means FlowFiles that have the same value for this Attribute. If not specified, FlowFiles are bundled by the order in which they are pulled from the queue.
- API Name
- Correlation Attribute Name
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Strategy is set to any of [Bin-Packing Algorithm]
-
Delimiter Strategy
Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if the values of the properties should be used as the content.
- Display Name
- Delimiter Strategy
- Description
- Determines if Header, Footer, and Demarcator should point to files containing the respective content, or if the values of the properties should be used as the content.
- API Name
- Delimiter Strategy
- Default Value
- Do Not Use Delimiters
- Allowable Values
-
- Do Not Use Delimiters
- Filename
- Text
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Format is set to any of [Binary Concatenation]
-
Demarcator
Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.
- Display Name
- Demarcator
- Description
- Filename or text specifying the demarcator to use. If not specified, no demarcator is supplied.
- API Name
- Demarcator File
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Format is set to any of [Binary Concatenation]
- Delimiter Strategy is set to any of [Filename, Text]
-
Footer
Filename or text specifying the footer to use. If not specified, no footer is supplied.
- Display Name
- Footer
- Description
- Filename or text specifying the footer to use. If not specified, no footer is supplied.
- API Name
- Footer File
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Format is set to any of [Binary Concatenation]
- Delimiter Strategy is set to any of [Filename, Text]
-
Header
Filename or text specifying the header to use. If not specified, no header is supplied.
- Display Name
- Header
- Description
- Filename or text specifying the header to use. If not specified, no header is supplied.
- API Name
- Header File
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Format is set to any of [Binary Concatenation]
- Delimiter Strategy is set to any of [Filename, Text]
-
Keep Path
If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.
- Display Name
- Keep Path
- Description
- If using the Zip or Tar Merge Format, specifies whether or not the FlowFiles' paths should be included in their entry names.
- API Name
- Keep Path
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Format is set to any of [ZIP, TAR]
-
Max Bin Age
The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours
- Display Name
- Max Bin Age
- Description
- The maximum age of a Bin that will trigger a Bin to be complete. Expected format is <duration> <time unit> where <duration> is a positive integer and time unit is one of seconds, minutes, hours
- API Name
- Max Bin Age
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Maximum Group Size
The maximum size for the bundle. If not specified, there is no maximum.
- Display Name
- Maximum Group Size
- Description
- The maximum size for the bundle. If not specified, there is no maximum.
- API Name
- Maximum Group Size
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Strategy is set to any of [Bin-Packing Algorithm]
-
Maximum number of Bins
Specifies the maximum number of bins that can be held in memory at any one time
- Display Name
- Maximum number of Bins
- Description
- Specifies the maximum number of bins that can be held in memory at any one time
- API Name
- Maximum number of Bins
- Default Value
- 5
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Maximum Number of Entries
The maximum number of files to include in a bundle
- Display Name
- Maximum Number of Entries
- Description
- The maximum number of files to include in a bundle
- API Name
- Maximum Number of Entries
- Default Value
- 1000
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Strategy is set to any of [Bin-Packing Algorithm]
-
Merge Format
Determines the format that will be used to merge the content.
- Display Name
- Merge Format
- Description
- Determines the format that will be used to merge the content.
- API Name
- Merge Format
- Default Value
- Binary Concatenation
- Allowable Values
-
- TAR
- ZIP
- FlowFile Stream, v3
- FlowFile Stream, v2
- FlowFile Tar, v1
- Binary Concatenation
- Avro
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Merge Strategy
Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles
- Display Name
- Merge Strategy
- Description
- Specifies the algorithm used to merge content. The 'Defragment' algorithm combines fragments that are associated by attributes back into a single cohesive FlowFile. The 'Bin-Packing Algorithm' generates a FlowFile populated by arbitrarily chosen FlowFiles
- API Name
- Merge Strategy
- Default Value
- Bin-Packing Algorithm
- Allowable Values
-
- Bin-Packing Algorithm
- Defragment
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Metadata Strategy
For FlowFiles whose input format supports metadata (e.g., Avro), this property determines which metadata should be added to the bundle. If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile will not be merged.
- Display Name
- Metadata Strategy
- Description
- For FlowFiles whose input format supports metadata (e.g., Avro), this property determines which metadata should be added to the bundle. If 'Use First Metadata' is selected, the metadata keys/values from the first FlowFile to be bundled will be used. If 'Keep Only Common Metadata' is selected, only the metadata that exists on all FlowFiles in the bundle, with the same value, will be preserved. If 'Ignore Metadata' is selected, no metadata is transferred to the outgoing bundled FlowFile. If 'Do Not Merge Uncommon Metadata' is selected, any FlowFile whose metadata values do not match those of the first bundled FlowFile will not be merged.
- API Name
- mergecontent-metadata-strategy
- Default Value
- Do Not Merge Uncommon Metadata
- Allowable Values
-
- Use First Metadata
- Keep Only Common Metadata
- Do Not Merge Uncommon Metadata
- Ignore Metadata
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Format is set to any of [Avro]
-
Minimum Group Size
The minimum size for the bundle
- Display Name
- Minimum Group Size
- Description
- The minimum size for the bundle
- API Name
- Minimum Group Size
- Default Value
- 0 B
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Strategy is set to any of [Bin-Packing Algorithm]
-
Minimum Number of Entries
The minimum number of files to include in a bundle
- Display Name
- Minimum Number of Entries
- Description
- The minimum number of files to include in a bundle
- API Name
- Minimum Number of Entries
- Default Value
- 1
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Merge Strategy is set to any of [Bin-Packing Algorithm]
-
Tar Modified Time
If using the Tar Merge Format, specifies the modified timestamp to store in the Tar entry, either as an expression (e.g. ${file.lastModifiedTime}) or as a static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.
- Display Name
- Tar Modified Time
- Description
- If using the Tar Merge Format, specifies the modified timestamp to store in the Tar entry, either as an expression (e.g. ${file.lastModifiedTime}) or as a static value, both of which must match the ISO8601 format 'yyyy-MM-dd'T'HH:mm:ssZ'.
- API Name
- Tar Modified Time
- Default Value
- ${file.lastModifiedTime}
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Merge Format is set to any of [TAR]
System Resource Considerations
Resource | Description |
---|---|
MEMORY | While content is not stored in memory, the FlowFiles' attributes are. The configuration of MergeContent (Maximum Group Size, Maximum Number of Entries, Max Bin Age, Maximum number of Bins) will influence how much memory is used. If merging together many small FlowFiles, a two-stage approach may be necessary in order to avoid excessive use of memory. |
Relationships
Name | Description |
---|---|
merged | The FlowFile containing the merged content |
original | The FlowFiles that were used to create the bundle |
failure | If the bundle cannot be created, all FlowFiles that would have been used to create the bundle will be transferred to failure |
Reads Attributes
Name | Description |
---|---|
fragment.identifier | Applicable only if the <Merge Strategy> property is set to Defragment. All FlowFiles with the same value for this attribute will be bundled together. |
fragment.index | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates the order in which the fragments should be assembled. This attribute must be present on all FlowFiles when using the Defragment Merge Strategy and must be a unique (i.e., unique across all FlowFiles that have the same value for the "fragment.identifier" attribute) integer between 0 and the value of the fragment.count attribute. If two or more FlowFiles have the same value for the "fragment.identifier" attribute and the same value for the "fragment.index" attribute, the first FlowFile processed will be accepted and subsequent FlowFiles will not be accepted into the Bin. |
fragment.count | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute indicates how many FlowFiles should be expected in the given bundle. At least one FlowFile must have this attribute in the bundle. If multiple FlowFiles contain the "fragment.count" attribute in a given bundle, all must have the same value. |
segment.original.filename | Applicable only if the <Merge Strategy> property is set to Defragment. This attribute must be present on all FlowFiles with the same value for the fragment.identifier attribute. All FlowFiles in the same bundle must have the same value for this attribute. The value of this attribute will be used for the filename of the completed merged FlowFile. |
tar.permissions | Applicable only if the <Merge Format> property is set to TAR. The value of this attribute must be 3 characters; each character must be in the range 0 to 7 (inclusive) and indicates the file permissions that should be used for the FlowFile's TAR entry. If this attribute is missing or has an invalid value, the default value of 644 will be used |
Writes Attributes
Name | Description |
---|---|
filename | When more than one file is merged, the filename comes from the segment.original.filename attribute. If that attribute does not exist in the source FlowFiles, then the filename is set to the current system time in nanoseconds. A filename extension may then be applied: if the Merge Format is TAR, the filename will be appended with .tar; if the Merge Format is ZIP, the filename will be appended with .zip; if the Merge Format is FlowFileStream, the filename will be appended with .pkg |
merge.count | The number of FlowFiles that were merged into this bundle |
merge.bin.age | The age of the bin, in milliseconds, when it was merged and output. Effectively this is the greatest amount of time that any FlowFile in this bundle remained waiting in this processor before it was output |
merge.uuid | The UUID of the merged FlowFile; this attribute is also added to the FlowFiles routed to the 'original' relationship. |
merge.reason | This processor allows several thresholds to be configured for merging FlowFiles. This attribute indicates which of the thresholds resulted in the FlowFiles being merged. For an explanation of each of the possible values and their meanings, see the 'Additional Details' page in the Processor's usage documentation. |
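As a rough illustration of the filename behavior described in the table above (a sketch of the documented rules in Python, not the processor's source; the helper name is made up):

```python
import time

def merged_filename(segment_original_filename, merge_format):
    # Use segment.original.filename when present; otherwise fall back to
    # a nanosecond timestamp, per the filename row above.
    base = segment_original_filename or str(time.time_ns())
    extension = {
        "TAR": ".tar",
        "ZIP": ".zip",
        "FlowFile Stream, v3": ".pkg",
        "FlowFile Stream, v2": ".pkg",
    }.get(merge_format, "")
    return base + extension

print(merged_filename(None, "ZIP"))          # e.g. "1712345678901234567.zip"
print(merged_filename("report.txt", "TAR"))  # "report.txt.tar"
```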
-
Use Cases
Concatenate FlowFiles with textual content together in order to create fewer, larger FlowFiles.
- Description
- Concatenate FlowFiles with textual content together in order to create fewer, larger FlowFiles.
- Keywords
- concatenate, bundle, aggregate, bin, merge, combine, smash
- Configuration
"Merge Strategy" = "Bin Packing Algorithm" "Merge Format" = "Binary Concatenation" "Delimiter Strategy" = "Text" "Demarcator" = "\n" (a newline can be inserted by pressing Shift + Enter) "Minimum Number of Entries" = "1" "Maximum Number of Entries" = "500000000" "Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB" "Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB" "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller than the Max Bin Age. A reasonable value might be "5 mins"
-
Concatenate FlowFiles with binary content together in order to create fewer, larger FlowFiles.
- Description
- Concatenate FlowFiles with binary content together in order to create fewer, larger FlowFiles.
- Notes
- Not all binary data can be concatenated together. Whether or not this configuration is valid depends on the type of your data.
- Keywords
- concatenate, bundle, aggregate, bin, merge, combine, smash
- Configuration
"Merge Strategy" = "Bin Packing Algorithm" "Merge Format" = "Binary Concatenation" "Delimiter Strategy" = "Text" "Minimum Number of Entries" = "1" "Maximum Number of Entries" = "500000000" "Minimum Group Size" = the minimum amount of data to write to an output FlowFile. A reasonable value might be "128 MB" "Maximum Group Size" = the maximum amount of data to write to an output FlowFile. A reasonable value might be "256 MB" "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the FlowFile along even though it is smaller than the Max Bin Age. A reasonable value might be "5 mins"
-
Reassemble a FlowFile that was previously split apart into smaller FlowFiles by a processor such as SplitText, UnpackContent, SplitRecord, etc.
- Description
- Reassemble a FlowFile that was previously split apart into smaller FlowFiles by a processor such as SplitText, UnpackContent, SplitRecord, etc.
- Keywords
- reassemble, repack, merge, recombine
- Configuration
"Merge Strategy" = "Defragment" "Merge Format" = the value of Merge Format depends on the desired output format. If the file was previously zipped together and was split apart by UnpackContent, a Merge Format of "ZIP" makes sense. If it was previously a .tar file, a Merge Format of "TAR" makes sense. If the data is textual, "Binary Concatenation" can be used to combine the text into a single document. "Delimiter Strategy" = "Text" "Max Bin Age" = the maximum amount of time to wait for incoming data before timing out and transferring the fragments to 'failure'. A reasonable value might be "5 mins" For textual data, "Demarcator" should be set to a newline (\n), set by pressing Shift+Enter in the UI. For binary data, "Demarcator" should be left blank.