PartitionRecord 2.0.0
- Bundle
- org.apache.nifi | nifi-standard-nar
- Description
- Splits, or partitions, record-oriented data based on the configured fields in the data. One or more properties must be added. The name of the property is the name of an attribute to add. The value of the property is a RecordPath to evaluate against each Record. Two records will go to the same outbound FlowFile only if they have the same value for each of the given RecordPaths. Because we know that all records in a given output FlowFile have the same value for the fields that are specified by the RecordPath, an attribute is added for each field. See Additional Details on the Usage page for more information and examples.
- Tags
- bin, group, organize, partition, record, recordpath, rpath, segment, split
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for PartitionRecord 2.0.0
PartitionRecord
PartitionRecord allows the user to separate out records in a FlowFile such that each outgoing FlowFile consists only of records that are “alike.” To define what it means for two records to be alike, the Processor makes use of NiFi’s RecordPath DSL.
In order for the Processor to be valid, at least one user-defined property must be added to the Processor. The value of each property must be a valid RecordPath. Expression Language is supported and will be evaluated before attempting to compile the RecordPath. However, if Expression Language is used, the Processor is not able to validate the RecordPath beforehand, and FlowFiles may fail processing if the resulting RecordPath is not valid when it is used.
Once one or more RecordPaths have been added, those RecordPaths are evaluated against each Record in an incoming FlowFile. In order for Record A and Record B to be considered “like records,” both of them must have the same value for all RecordPaths that are configured. Only the values that are returned by the RecordPaths are held in Java’s heap. The records themselves are written immediately to the FlowFile content. This means that for most cases, heap usage is not a concern. However, if the RecordPath points to a large Record field that is different for each record in a FlowFile, then heap usage may be an important consideration. In such cases, SplitRecord may be useful to split a large FlowFile into smaller FlowFiles before partitioning.
Once a FlowFile has been written, we know that all the Records within that FlowFile have the same value for the fields that are described by the configured RecordPaths. As a result, we can promote those values to FlowFile Attributes. We do so by looking at the name of the property to which each RecordPath belongs. For example, if we have a property named `country` with a value of `/geo/country/name`, then each outbound FlowFile will have an attribute named `country` with the value of the `/geo/country/name` field. The addition of these attributes makes it very easy to perform tasks such as routing, or referencing the value in another Processor that can be used for configuring where to send the data, etc. However, for any RecordPath whose value is not a scalar value (i.e., the value is of type Array, Map, or Record), no attribute will be added.
Examples
To better understand how this Processor works, we will lay out a few examples. For the sake of these examples, let’s assume that our input data is JSON formatted and looks like this:
[ { "name": "John Doe", "dob": "11/30/1976", "favorites": [ "spaghetti", "basketball", "blue" ], "locations": { "home": { "number": 123, "street": "My Street", "city": "New York", "state": "NY", "country": "US" }, "work": { "number": 321, "street": "Your Street", "city": "New York", "state": "NY", "country": "US" } } }, { "name": "Jane Doe", "dob": "10/04/1979", "favorites": [ "spaghetti", "football", "red" ], "locations": { "home": { "number": 123, "street": "My Street", "city": "New York", "state": "NY", "country": "US" }, "work": { "number": 456, "street": "Our Street", "city": "New York", "state": "NY", "country": "US" } } }, { "name": "Jacob Doe", "dob": "04/02/2012", "favorites": [ "chocolate", "running", "yellow" ], "locations": { "home": { "number": 123, "street": "My Street", "city": "New York", "state": "NY", "country": "US" }, "work": null } }, { "name": "Janet Doe", "dob": "02/14/2007", "favorites": [ "spaghetti", "reading", "white" ], "locations": { "home": { "number": 1111, "street": "Far Away", "city": "San Francisco", "state": "CA", "country": "US" }, "work": null } } ]
Example 1 - Partition By Simple Field
For a simple case, let’s partition all the records based on the state that they live in. We can add a property named `state` with a value of `/locations/home/state`. The result will be that we will have two outbound FlowFiles. The first will contain an attribute with the name `state` and a value of `NY`. This FlowFile will consist of 3 records: John Doe, Jane Doe, and Jacob Doe. The second FlowFile will consist of a single record for Janet Doe and will contain an attribute named `state` that has a value of `CA`.
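The outcome of this example can be sketched as follows. This is only an illustrative summary, not a literal NiFi serialization format: each object below stands for one outbound FlowFile, showing the promoted attribute and which of the sample records it carries.
```json
[
  { "attributes": { "state": "NY", "record.count": "3" }, "records": [ "John Doe", "Jane Doe", "Jacob Doe" ] },
  { "attributes": { "state": "CA", "record.count": "1" }, "records": [ "Janet Doe" ] }
]
```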
Example 2 - Partition By Nullable Value
In the above example, there are three different values for the work location. If we use a RecordPath of `/locations/work/state` with a property name of `state`, then we will end up with two different FlowFiles. The first will contain records for John Doe and Jane Doe because they have the same value for the given RecordPath. This FlowFile will have an attribute named `state` with a value of `NY`.
The second FlowFile will contain the two records for Jacob Doe and Janet Doe, because the RecordPath will evaluate to `null` for both of them. This FlowFile will have no `state` attribute (unless such an attribute existed on the incoming FlowFile, in which case its value will be unaltered).
Example 3 - Partition By Multiple Values
Now let’s say that we want to partition records based on multiple different fields. We now add two properties to the PartitionRecord processor. The first property is named `home` and has a value of `/locations/home`. The second property is named `favorite.food` and has a value of `/favorites[0]` to reference the first element in the “favorites” array.
This will result in three different FlowFiles being created. The first FlowFile will contain records for John Doe and Jane Doe. It will contain an attribute named “favorite.food” with a value of “spaghetti.” However, because the `home` RecordPath points to a Record field rather than a scalar, no “home” attribute will be added. In this case, both of these records have the same value for the first element of the “favorites” array and the same value for the home address. Janet Doe has the same value for the first element in the “favorites” array but has a different home address. Similarly, Jacob Doe has the same home address but a different value for the favorite food.
The second FlowFile will consist of a single record: Jacob Doe. This FlowFile will have an attribute named “favorite.food” with a value of “chocolate.” The third FlowFile will consist of a single record: Janet Doe. This FlowFile will have an attribute named “favorite.food” with a value of “spaghetti.”
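Using the same informal sketch as before (again, not a literal NiFi format), the three outbound FlowFiles would look like this; only `favorite.food` is promoted to an attribute, since the `home` RecordPath yields a Record rather than a scalar value:
```json
[
  { "attributes": { "favorite.food": "spaghetti", "record.count": "2" }, "records": [ "John Doe", "Jane Doe" ] },
  { "attributes": { "favorite.food": "chocolate", "record.count": "1" }, "records": [ "Jacob Doe" ] },
  { "attributes": { "favorite.food": "spaghetti", "record.count": "1" }, "records": [ "Janet Doe" ] }
]
```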
-
Record Reader
Specifies the Controller Service to use for reading incoming data
- Display Name
- Record Reader
- Description
- Specifies the Controller Service to use for reading incoming data
- API Name
- record-reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Record Writer
Specifies the Controller Service to use for writing out the records
- Display Name
- Record Writer
- Description
- Specifies the Controller Service to use for writing out the records
- API Name
- record-writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.
Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as the value of the field in the Record that the RecordPath points to. Note that no attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).
- Name
- The name given to the dynamic property is the name of the attribute that will be used to denote the value of the associated RecordPath.
- Description
- Each dynamic property represents a RecordPath that will be evaluated against each record in an incoming FlowFile. When the value of the RecordPath is determined for a Record, an attribute is added to the outgoing FlowFile. The name of the attribute is the same as the name of this property. The value of the attribute is the same as the value of the field in the Record that the RecordPath points to. Note that no attribute will be added if the value returned for the RecordPath is null or is not a scalar value (i.e., the value is an Array, Map, or Record).
- Value
- A RecordPath that points to a field in the Record.
- Expression Language Scope
- FLOWFILE_ATTRIBUTES
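For instance (the property name, RecordPath, and value below are hypothetical, chosen only for illustration), a dynamic property named `customerId` with a value of `/customer/id` would add a `customerId` attribute to each outbound FlowFile, holding the scalar value shared by all records in that FlowFile:
```json
{
  "dynamicProperty": { "name": "customerId", "value": "/customer/id" },
  "outboundFlowFileAttributes": { "customerId": "42" }
}
```
If `/customer/id` evaluated to null or to an Array, Map, or Record, no `customerId` attribute would be added.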
Relationships
Name | Description |
---|---|
success | FlowFiles that are successfully partitioned will be routed to this relationship |
failure | If a FlowFile cannot be partitioned from the configured input format to the configured output format, the unchanged FlowFile will be routed to this relationship |
original | Once all records in an incoming FlowFile have been partitioned, the original FlowFile is routed to this relationship |
Writes Attributes
Name | Description |
---|---|
record.count | The number of records in an outgoing FlowFile |
mime.type | The MIME Type that the configured Record Writer indicates is appropriate |
fragment.identifier | All partitioned FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute |
fragment.index | A one-up number that indicates the ordering of the partitioned FlowFiles that were created from a single parent FlowFile |
fragment.count | The number of partitioned FlowFiles generated from the parent FlowFile |
segment.original.filename | The filename of the parent FlowFile |
<dynamic property name> | For each dynamic property that is added, an attribute may be added to the FlowFile. See the description for Dynamic Properties for more information. |
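As an illustration of the fragment attributes (the values, including the starting index, are hypothetical), three FlowFiles partitioned from a single parent named `input.json` would share one `fragment.identifier` and differ only in `fragment.index`:
```json
[
  { "fragment.identifier": "c1a9e2d0-…", "fragment.index": "0", "fragment.count": "3", "segment.original.filename": "input.json" },
  { "fragment.identifier": "c1a9e2d0-…", "fragment.index": "1", "fragment.count": "3", "segment.original.filename": "input.json" },
  { "fragment.identifier": "c1a9e2d0-…", "fragment.index": "2", "fragment.count": "3", "segment.original.filename": "input.json" }
]
```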
-
Separate records into separate FlowFiles so that all of the records in a FlowFile have the same value for a given field or set of fields.
- Description
- Separate records into separate FlowFiles so that all of the records in a FlowFile have the same value for a given field or set of fields.
- Keywords
- separate, split, partition, break apart, colocate, segregate, record, field, recordpath
- Configuration
Choose a RecordReader that is appropriate based on the format of the incoming data. Choose a RecordWriter that writes the data in the desired output format. Add a single additional property. The name of the property should describe the type of data that is being used to partition the data. The property's value should be a RecordPath that specifies which output FlowFile the Record belongs to. For example, if we want to separate records based on their `transactionType` field, we could add a new property named `transactionType`. The value of the property might be `/transaction/type`. An input FlowFile will then be separated into as few FlowFiles as possible such that each output FlowFile has the same value for the `transactionType` field.
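A minimal sketch of this configuration and its effect, assuming hypothetical incoming records that each carry a `transaction` field:
```json
{
  "dynamicProperty": { "transactionType": "/transaction/type" },
  "inputRecords": [
    { "transaction": { "type": "purchase", "total": 40 } },
    { "transaction": { "type": "refund", "total": 15 } },
    { "transaction": { "type": "purchase", "total": 250 } }
  ],
  "outboundFlowFiles": [
    { "attributes": { "transactionType": "purchase", "record.count": "2" } },
    { "attributes": { "transactionType": "refund", "record.count": "1" } }
  ]
}
```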
-
Separate records based on whether or not they adhere to specific criteria
- Description
- Separate records based on whether or not they adhere to specific criteria
- Keywords
- separate, split, partition, break apart, segregate, record, field, recordpath, criteria
- Configuration
Choose a RecordReader that is appropriate based on the format of the incoming data. Choose a RecordWriter that writes the data in the desired output format. Add a single additional property. The name of the property should describe the criteria. The property's value should be a RecordPath that returns `true` if the Record meets the criteria or `false` otherwise. For example, if we want to separate records based on whether or not they have a transaction total of more than $1,000, we could add a new property named `largeTransaction` with a value of `/transaction/total > 1000`. This will create two FlowFiles. In the first, all records will have a transaction total over `1000`. In the second, all records will have a transaction total less than or equal to `1000`. Each FlowFile will have an attribute named `largeTransaction` with a value of `true` or `false`.
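Continuing the same illustrative style, and taking the RecordPath above at face value, a batch of three hypothetical transactions would partition like this:
```json
{
  "dynamicProperty": { "largeTransaction": "/transaction/total > 1000" },
  "inputRecords": [
    { "transaction": { "total": 2500 } },
    { "transaction": { "total": 300 } },
    { "transaction": { "total": 999 } }
  ],
  "outboundFlowFiles": [
    { "attributes": { "largeTransaction": "true", "record.count": "1" } },
    { "attributes": { "largeTransaction": "false", "record.count": "2" } }
  ]
}
```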