JoinEnrichment 2.0.0
- Bundle
- org.apache.nifi | nifi-standard-nar
- Description
- Joins together Records from two different FlowFiles where one FlowFile, the 'original', contains arbitrary records and the second FlowFile, the 'enrichment', contains additional data that should be used to enrich the first. See Additional Details for more information on how to configure this processor and the different use cases that it aims to accomplish.
- Tags
- combine, enrichment, fork, join, merge, record, recordpath, sql, streams, wrap
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for JoinEnrichment 2.0.0
Introduction
The JoinEnrichment processor is designed to be used in conjunction with the ForkEnrichment Processor. Used together, they provide a powerful mechanism for transforming data into a separate request payload for gathering enrichment data, gathering that enrichment data, optionally transforming the enrichment data, and finally joining together the original payload with the enrichment data.
Typical Dataflow
A typical dataflow begins with a ForkEnrichment processor, which takes in a FlowFile and produces two copies of it: one routed to the “original” relationship and the other to the “enrichment” relationship. Each copy will have its own set of attributes added to it.
The “original” FlowFile is routed to the JoinEnrichment processor, while the “enrichment” FlowFile is routed in a different direction. Each of these FlowFiles will have an attribute named “enrichment.group.id” with the same value, and the JoinEnrichment processor uses this information to correlate the two FlowFiles. An “enrichment.role” attribute will also be added to each FlowFile, but with a different value: the FlowFile routed to “original” will have an enrichment.role of ORIGINAL, while the FlowFile routed to “enrichment” will have an enrichment.role of ENRICHMENT.
The Processors that make up the “enrichment” path will vary from use case to use case. In this example, we use the JoltTransformJSON processor to transform our original payload into the payload that is expected by our web service. We then use the InvokeHTTP processor to gather enrichment data that is relevant to our use case. Other common processors to use in this path include QueryRecord, UpdateRecord, ReplaceText, JoltTransformRecord, and ScriptedTransformRecord. It is also common to transform the response from the web service that is invoked via InvokeHTTP using one or more of these processors.
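As a minimal, hypothetical sketch of such a transformation, a QueryRecord processor in the enrichment path could reduce each original record to just the identifier that a lookup service needs before InvokeHTTP is called. QueryRecord exposes the incoming FlowFile as a table named FLOWFILE; the id column here simply mirrors the examples later in this document and is illustrative only:

SELECT id
FROM FLOWFILE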
After the enrichment data has been gathered, it does us little good unless we are able to somehow combine our enrichment data back with our original payload. To achieve this, we use the JoinEnrichment processor. It is responsible for combining records from both the “original” FlowFile and the “enrichment” FlowFile.
The JoinEnrichment Processor is configured with a separate RecordReader for the “original” FlowFile and for the “enrichment” FlowFile. This means that the original data and the enrichment data can have entirely different schemas and can even be in different data formats. For example, our original payload may be CSV data, while our enrichment data is a JSON payload. Because we make use of RecordReaders, this is entirely okay. The Processor also requires a RecordWriter to use for writing out the enriched payload (i.e., the payload that contains the join of both the “original” and the “enrichment” data).
The JoinEnrichment Processor offers different strategies for how to combine the original records with the enrichment data. Each of these is explained here in some detail.
Wrapper
The Wrapper strategy is the default. Each record in the original payload is expected to have a corresponding record in the enrichment payload. The output record will be a record with two fields: “original” and “enrichment”. Each of these will contain the n-th record from the corresponding FlowFile. For example, if the original FlowFile has the following content:

id, name, age
28021, John Doe, 55
832, Jane Doe, 22
29201, Jake Doe, 23
555, Joseph Doe, 2
And our enrichment FlowFile has the following content:
id, email
28021, john.doe@nifi.apache.org
832, jane.doe@nifi.apache.org
29201, jake.doe@nifi.apache.org
This strategy would produce output that looks like this (assuming a JSON Writer):
[ { "original": { "id": 28021, "name": "John Doe", "age": 55 }, "enrichment": { "id": 28021, "email": "john.doe@nifi.apache.org" } }, { "original": { "id": 832, "name": "Jane Doe", "age": 22 }, "enrichment": { "id": 832, "email": "jane.doe@nifi.apache.org" } }, { "original": { "id": 29201, "name": "Jake Doe", "age": 23 }, "enrichment": { "id": 29201, "email": "jake.doe@nifi.apache.org" } }, { "original": { "id": 555, "name": "Joseph Doe", "age": 2 }, "enrichment": null } ]
With this strategy, the first record of the original FlowFile is coupled together with the first record of the enrichment FlowFile, the second record of the original FlowFile with the second record of the enrichment FlowFile, and so on. If one of the FlowFiles has more records than the other, a null value will be used.
Insert Enrichment Fields
The “Insert Enrichment Fields” strategy inserts all the fields of the “enrichment” record into the original record. The records are correlated by their index in the FlowFile. That is, the first record in the “enrichment” FlowFile is inserted into the first record in the “original” FlowFile. The second record of the “enrichment” FlowFile is inserted into the second record of the “original” FlowFile and so on.
When this strategy is selected, the “Record Path” property is required. The Record Path is evaluated against the “original” record. Consider, for example, the following content for the “original” FlowFile:
[ { "purchase": { "customer": { "loyaltyId": 48202, "firstName": "John", "lastName": "Doe" }, "total": 48.28, "items": [ { "itemDescription": "book", "price": 24.14, "quantity": 2 } ] } }, { "purchase": { "customer": { "loyaltyId": 5512, "firstName": "Jane", "lastName": "Doe" }, "total": 121.44, "items": [ { "itemDescription": "book", "price": 28.15, "quantity": 4 }, { "itemDescription": "inkpen", "price": 4.42, "quantity": 2 } ] } } ]
Joined using the following enrichment content:
[ { "customerDetails": { "id": 48202, "phone": "555-555-5555", "email": "john.doe@nifi.apache.org" } }, { "customerDetails": { "id": 5512, "phone": "555-555-5511", "email": "jane.doe@nifi.apache.org" } } ]
Let us then consider that a Record Path is used with a value of “/purchase/customer”. This would yield the following results:
[ { "purchase": { "customer": { "loyaltyId": 48202, "firstName": "John", "lastName": "Doe", "customerDetails": { "id": 48202, "phone": "555-555-5555", "email": "john.doe@nifi.apache.org" } }, "total": 48.28, "items": [ { "itemDescription": "book", "price": 24.14, "quantity": 2 } ] } }, { "purchase": { "customer": { "loyaltyId": 5512, "firstName": "Jane", "lastName": "Doe", "customerDetails": { "id": 5512, "phone": "555-555-5511", "email": "jane.doe@nifi.apache.org" } }, "total": 121.44, "items": [ { "itemDescription": "book", "price": 28.15, "quantity": 4 }, { "itemDescription": "inkpen", "price": 4.42, "quantity": 2 } ] } } ]
SQL
The SQL strategy provides an important capability that differs from the others: it allows the records in the “original” FlowFile and the records in the “enrichment” FlowFile to be correlated in ways other than by index. That is, the SQL-based strategy doesn’t necessarily correlate the first record of the original FlowFile with the first record of the enrichment FlowFile. Instead, it allows the records to be correlated using standard SQL JOIN expressions.
A common use case for this is creating a payload to query some web service. The response contains identifiers along with additional information for enrichment, but the order of the records in the enrichment data may not correspond to the order of the records in the original.
As an example, consider the following original payload, in CSV:
id, name, age
28021, John Doe, 55
832, Jane Doe, 22
29201, Jake Doe, 23
555, Joseph Doe, 2
Additionally, consider the following payload for the enrichment data:
customer_id, customer_email, customer_name, customer_since
555, joseph.doe@nifi.apache.org, Joe Doe, 08/Dec/14
832, jane.doe@nifi.apache.org, Mrs. Doe, 14/Nov/14
28021, john.doe@nifi.apache.org, John Doe, 22/Jan/22
When making use of the SQL strategy, we must provide a SQL SELECT statement to combine both our original data and our enrichment data into a single FlowFile. To do this, we treat our original FlowFile as its own table with the name “original”, while we treat the enrichment data as its own table with the name “enrichment”.
Given this, we might combine all the data using a simple query such as:
SELECT o.*, e.* FROM original o JOIN enrichment e ON o.id = e.customer_id
And this would provide the following output:
id, name, age, customer_id, customer_email, customer_name, customer_since
28021, John Doe, 55, 28021, john.doe@nifi.apache.org, John Doe, 22/Jan/22
832, Jane Doe, 22, 832, jane.doe@nifi.apache.org, Mrs. Doe, 14/Nov/14
555, Joseph Doe, 2, 555, joseph.doe@nifi.apache.org, Joe Doe, 08/Dec/14
Note that in this case, the record for Jake Doe was removed because we used a JOIN, rather than an OUTER JOIN. We could instead use a LEFT OUTER JOIN to ensure that we retain all records from the original FlowFile and simply provide null values for any missing records in the enrichment:
SELECT o.*, e.* FROM original o LEFT OUTER JOIN enrichment e ON o.id = e.customer_id
Which would produce the following output:
id, name, age, customer_id, customer_email, customer_name, customer_since
28021, John Doe, 55, 28021, john.doe@nifi.apache.org, John Doe, 22/Jan/22
832, Jane Doe, 22, 832, jane.doe@nifi.apache.org, Mrs. Doe, 14/Nov/14
29201, Jake Doe, 23,,,,
555, Joseph Doe, 2, 555, joseph.doe@nifi.apache.org, Joe Doe, 08/Dec/14
But SQL is far more expressive than this, allowing us to write far more powerful queries. In this case, we probably don’t want both the “id” and “customer_id” fields, or both the “name” and “customer_name” fields. Let’s consider, though, that the enrichment provides the customer’s preferred name instead of their legal name. We might also want to drop the customer_since column, as it doesn’t make sense for our use case. We might then change our SQL to the following:
SELECT o.id, o.name, e.customer_name AS preferred_name, o.age, e.customer_email AS email FROM original o LEFT OUTER JOIN enrichment e ON o.id = e.customer_id
And this will produce a more convenient output:
id, name, preferred_name, age, email
28021, John Doe, John Doe, 55, john.doe@nifi.apache.org
832, Jane Doe, Mrs. Doe, 22, jane.doe@nifi.apache.org
29201, Jake Doe,, 23,
555, Joseph Doe, Joe Doe, 2, joseph.doe@nifi.apache.org
So we can see tremendous power from the SQL strategy. However, there is a very important consideration to take into account when using it.
WARNING: While the SQL strategy provides great power, it may require significant amounts of heap. Depending on the query, the SQL engine may need to buffer the contents of the entire “enrichment” FlowFile in memory, in Java’s heap. Additionally, if the Processor is scheduled with multiple concurrent tasks, each of those tasks may hold the entire contents of the enrichment FlowFile in memory. This can lead to heap exhaustion and cause stability problems or OutOfMemoryErrors.
There are a couple of options that will help to mitigate these concerns.
- Split the data into smaller chunks. It is generally ill-advised to split record-oriented data into many tiny FlowFiles, as NiFi tends to perform best with larger FlowFiles; the sweet spot for NiFi tends to be around 300 KB to 3 MB in size. So we do not want to break a large FlowFile with 100,000 records into 100,000 FlowFiles, each with 1 record. It may be advantageous, though, to break that FlowFile into 100 FlowFiles of 1,000 records each, or 10 FlowFiles of 10,000 records each, before the ForkEnrichment processor. This typically results in a smaller amount of enrichment data, so that we don’t need to hold as much in memory.
- Before the JoinEnrichment processor, trim the enrichment data to remove any fields that are not desirable. In the example above, we may have used QueryRecord, UpdateRecord, JoltTransformRecord, or updated our schema in order to remove the “customer_since” field from the enrichment dataset. Because we didn’t make use of the field, we could easily remove it before the JoinEnrichment in order to reduce the size of the enrichment FlowFile and thereby reduce the amount of data held in memory.
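As a sketch of that trimming step (assuming the enrichment payload shown in the SQL example above), a QueryRecord processor placed before the JoinEnrichment could project away the unused column. QueryRecord exposes the incoming FlowFile as a table named FLOWFILE:

SELECT customer_id, customer_email, customer_name
FROM FLOWFILE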
It is also worth noting that the SQL strategy may reorder the records within the FlowFile, so an ORDER BY clause may be necessary if the ordering is important.
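For example (a sketch only, reusing the earlier query), the ordering of the original records can be preserved by ordering on the original id column:

SELECT o.id, o.name, e.customer_name AS preferred_name, o.age, e.customer_email AS email
FROM original o
LEFT OUTER JOIN enrichment e ON o.id = e.customer_id
ORDER BY o.id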
Additional Memory Considerations
In addition to the warning above about using the SQL Join Strategy, there is another consideration to keep in mind in order to limit the amount of information that this Processor must keep in memory. While the Processor does not store the contents of all FlowFiles in memory, it does hold all FlowFiles’ attributes in memory. As a result, the following points should be kept in mind when using this Processor.
- Avoid large attributes. FlowFile attributes should not be used to hold FlowFile content; attributes are intended to be small, generally on the order of 100-200 characters. If there are any large attributes, it is recommended that they be removed by using the UpdateAttribute processor before the ForkEnrichment processor.
- Avoid large numbers of attributes. While it is important to avoid creating large FlowFile attributes, it is just as important to avoid creating large numbers of attributes. Keeping 30 small attributes on a FlowFile is perfectly fine. Storing 300 attributes, on the other hand, may occupy a significant amount of heap.
- Limit backpressure. The JoinEnrichment Processor will pull into its own memory all the incoming FlowFiles. As a result, it will be helpful to avoid providing a huge number of FlowFiles to the Processor at any given time. This can be done by setting the backpressure limits to a smaller value. For example, in our example above, the ForkEnrichment Processor is connected directly to the JoinEnrichment Processor. We may want to limit the backpressure on this connection to 500 or 1,000 instead of the default 10,000. Doing so will limit the number of FlowFiles that are allowed to be loaded into the JoinEnrichment Processor at one time.
More Complex Joining Strategies
This Processor offers several strategies that can be used for correlating data and joining records from two different FlowFiles into a single FlowFile. However, there are times when users may require more powerful capabilities than what is offered. We might, for example, want to use the information in an enrichment record to determine whether to null out a value in the corresponding original record.
For such use cases, the recommended approach is to use the Wrapper strategy or the SQL strategy to combine the original and enrichment FlowFiles into a single FlowFile. Then, connect the “joined” relationship of this Processor to the most appropriate processor for further processing the data. For example, consider that we use the Wrapper strategy to produce output that looks like this:
{ "original": { "id": 482028, "name": "John Doe", "ssn": "555-55-5555", "phone": "555-555-5555", "email": "john.doe@nifi.apache.org" }, "enrichment": { "country": "UK", "allowsPII": false } }
We might then use the ScriptedTransformRecord processor with a JSON RecordReader and a JSON RecordSetWriter to transform this. Using Groovy, our transformation may look something like this:
import org.apache.nifi.serialization.record.Record

Record original = (Record) record.getValue("original")
Record enrichment = (Record) record.getValue("enrichment")

if (Boolean.TRUE != enrichment?.getAsBoolean("allowsPII")) {
    original.setValue("ssn", null)
    original.setValue("phone", null)
    original.setValue("email", null)
}

return original
Which will produce for us the following output:
{ "id": 482028, "name": "John Doe", "ssn": null, "phone": null, "email": null }
In this way, we have used information from the enrichment record to selectively transform the original record. We then return the original record, dropping the enrichment record altogether. This opens up an enormous range of possibilities for transforming the original payload based on the content of the enrichment data that was fetched for it.
-
Default Decimal Precision
When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'precision' denoting the number of available digits is required. Generally, precision is defined by the column data type definition or the database engine's default. However, undefined precision (0) can be returned from some database engines. 'Default Decimal Precision' is used when writing those undefined-precision numbers.
- Display Name
- Default Decimal Precision
- Description
- When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'precision' denoting the number of available digits is required. Generally, precision is defined by the column data type definition or the database engine's default. However, undefined precision (0) can be returned from some database engines. 'Default Decimal Precision' is used when writing those undefined-precision numbers.
- API Name
- dbf-default-precision
- Default Value
- 10
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Default Decimal Scale
When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'scale' denoting the number of available decimal digits is required. Generally, scale is defined by the column data type definition or the database engine's default. However, when undefined precision (0) is returned, the scale can also be uncertain with some database engines. 'Default Decimal Scale' is used when writing those undefined numbers. If a value has more decimals than the specified scale, the value will be rounded up; e.g., 1.53 becomes 2 with a scale of 0, and 1.5 with a scale of 1.
- Display Name
- Default Decimal Scale
- Description
- When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'scale' denoting the number of available decimal digits is required. Generally, scale is defined by the column data type definition or the database engine's default. However, when undefined precision (0) is returned, the scale can also be uncertain with some database engines. 'Default Decimal Scale' is used when writing those undefined numbers. If a value has more decimals than the specified scale, the value will be rounded up; e.g., 1.53 becomes 2 with a scale of 0, and 1.5 with a scale of 1.
- API Name
- dbf-default-scale
- Default Value
- 0
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Enrichment Record Reader
The Record Reader for reading the 'enrichment' FlowFile
- Display Name
- Enrichment Record Reader
- Description
- The Record Reader for reading the 'enrichment' FlowFile
- API Name
- Enrichment Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Insertion Record Path
Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.
- Display Name
- Insertion Record Path
- Description
- Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.
- API Name
- Insertion Record Path
- Default Value
- /
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
- Dependencies
-
- Join Strategy is set to any of [Insert Enrichment Fields]
-
Join Strategy
Specifies how to join the two FlowFiles into a single FlowFile
- Display Name
- Join Strategy
- Description
- Specifies how to join the two FlowFiles into a single FlowFile
- API Name
- Join Strategy
- Default Value
- Wrapper
- Allowable Values
-
- Wrapper
- SQL
- Insert Enrichment Fields
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Maximum number of Bins
Specifies the maximum number of bins that can be held in memory at any one time
- Display Name
- Maximum number of Bins
- Description
- Specifies the maximum number of bins that can be held in memory at any one time
- API Name
- Maximum number of Bins
- Default Value
- 10000
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Original Record Reader
The Record Reader for reading the 'original' FlowFile
- Display Name
- Original Record Reader
- Description
- The Record Reader for reading the 'original' FlowFile
- API Name
- Original Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Record Writer
The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.
- Display Name
- Record Writer
- Description
- The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
SQL
The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.
- Display Name
- SQL
- Description
- The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.
- API Name
- SQL
- Default Value
- SELECT original.*, enrichment.* FROM original LEFT OUTER JOIN enrichment ON original.id = enrichment.id
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
- Dependencies
-
- Join Strategy is set to any of [SQL]
-
Timeout
Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.
- Display Name
- Timeout
- Description
- Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.
- API Name
- Timeout
- Default Value
- 10 min
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
System Resource Considerations
Resource | Description |
---|---|
MEMORY | This Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns. |
Relationships
Name | Description |
---|---|
original | Both of the incoming FlowFiles ('original' and 'enrichment') will be routed to this Relationship. I.e., this is the 'original' version of both of these FlowFiles. |
timeout | If one of the incoming FlowFiles (i.e., the 'original' FlowFile or the 'enrichment' FlowFile) arrives at this Processor but the other does not arrive within the configured Timeout period, the FlowFile that did arrive is routed to this relationship. |
failure | If both the 'original' and 'enrichment' FlowFiles arrive at the processor but there was a failure in joining the records, both of those FlowFiles will be routed to this relationship. |
joined | The resultant FlowFile with Records joined together from both the original and enrichment FlowFiles will be routed to this relationship |
Writes Attributes
Name | Description |
---|---|
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer |
record.count | The number of records in the FlowFile |