ScriptedTransformRecord 2.0.0
- Bundle
- org.apache.nifi | nifi-scripting-nar
- Description
- Provides the ability to evaluate a simple script against each record in an incoming FlowFile. The script may transform the record in some way, filter the record, or fork additional records. See Processor's Additional Details for more information.
- Tags
- filter, groovy, modify, record, script, transform, update
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ScriptedTransformRecord 2.0.0
Description
The ScriptedTransformRecord provides the ability to use a scripting language, such as Groovy, to quickly and easily update the contents of a Record. NiFi provides several different Processors that can be used to manipulate Records in different ways. Each of these processors has its pros and cons. The ScriptedTransformRecord is perhaps the most powerful and most versatile option. However, it is also the most error-prone, as it depends on writing custom scripts. It is also likely to yield the lowest performance, as processors and libraries written directly in Java are likely to perform better than interpreted scripts.
When creating a script, it is important to note that, unlike ExecuteScript, this Processor does not allow the script itself to expose Properties to be configured or define Relationships. This is a deliberate decision. If it is necessary to expose such configuration, the ExecuteScript processor should be used instead. By not exposing these elements, the script avoids the need to define a Class or implement methods with a specific method signature. Instead, the script can avoid any boilerplate code and focus purely on the task at hand.
The provided script is evaluated once for each Record that is encountered in the incoming FlowFile. Each time that the script is invoked, it is expected to return a Record object (see the note below regarding Return Values). That Record is then written using the configured Record Writer. If the script returns a null value, the Record will not be written. If the script returns an object that is not a Record, the incoming FlowFile will be routed to the failure relationship.
This processor maintains two Counters: “Records Transformed”, indicating the number of Records that were passed to the script and for which the script returned a Record, and “Records Dropped”, indicating the number of Records that were passed to the script and for which the script returned a value of null.
Variable Bindings
While the script provided to this Processor does not need to provide boilerplate code or implement any classes/interfaces, it does need some way to access the Records and other information necessary to perform its task. This is accomplished by using Variable Bindings. Each time that the script is invoked, the following variables are made available to it:
Variable Name | Description | Variable Class
---|---|---
record | The Record that is to be transformed. | Record
recordIndex | The zero-based index of the Record in the FlowFile. | Long (64-bit signed integer)
log | The Processor’s Logger. Anything that is logged to this logger will be written to the logs as if the Processor itself had logged it. Additionally, a bulletin will be created for any log message written to this logger (though by default, the Processor will hide any bulletins with a level below WARN). | ComponentLog
attributes | Map of key/value pairs that are the Attributes of the FlowFile. Both the keys and the values of this Map are of type String. This Map is immutable. Any attempt to modify it will result in an UnsupportedOperationException being thrown. | java.util.Map
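As an illustration, the following Groovy sketch uses all four bindings together. The attribute name “source.system” and the field name “processedBy” are hypothetical, chosen only to show the pattern:

// Log once per FlowFile, using the read-only attributes map.
if (recordIndex == 0) {
    log.info("Transforming FlowFile ${attributes['filename']}")
}
// The record itself is mutable, so a field can be set in place.
record.setValue('processedBy', attributes['source.system'])
return record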
Return Value
Each time that the script is invoked, it is expected to return a Record object or a Collection of Record objects. Those Records are then written using the configured Record Writer. If the script returns a null value, the Record will not be written. If the script returns an object that is not a Record or a Collection of Records, the incoming FlowFile will be routed to the failure relationship.
The Record that is provided to the script is mutable. Therefore, it is a common pattern to update the record object in the script and simply return that same record object.
Note: Depending on the scripting language, a script with no explicit return value may return null or may return the last value that was referenced. Because returning null will result in dropping the Record, and a non-Record return value will result in an Exception (and simply for the sake of clarity), it is important to ensure that the configured script has an explicit return value.
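To make this pitfall concrete, consider the following Groovy sketches. In Groovy, the value of the last evaluated expression is the script’s return value, and a call to a void method such as Record#setValue evaluates to null, so the first script would silently drop every Record:

// Risky: the last statement is a void method call, so this script returns null
// and every Record is dropped.
record.setValue('num', 42)

// Safe: end with an explicit return of the Record to be written.
record.setValue('num', 42)
return record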
Adding a New Field
A very common usage of Record-oriented processors is to allow the Record Reader to infer its schema and have the Record Writer inherit the Record’s schema. In this scenario, it is important to note that the Record Writer will inherit the schema of the first Record that it encounters. Therefore, if the configured script will add a new field to a Record, it is important to ensure that the field is added to all Records (with a null value where appropriate).
See the Adding New Fields example for more details.
Performance Considerations
NiFi offers many different processors for updating records in various ways. While each of these has its own pros and cons, performance is often an important consideration. It is generally the case that standard processors, such as UpdateRecord, will perform better than script-oriented processors. However, this may not always be the case. For situations when performance is critical, it is best to test both approaches to see which performs better.
A simple 5-minute benchmark was done to analyze the difference in performance. The script used simply modifies one field and returns the Record otherwise unmodified. The results are shown below. Note that no specifics are given regarding hardware, specifically because the results should not be used to garner expectations of absolute performance but rather to show relative performance between the different options.
Processor | Script Used | Records processed in 5 minutes
---|---|---
UpdateRecord | No script. User-defined Property added with name /num and value 42 | 50.1 million
ScriptedTransformRecord (Language: Groovy) | record.setValue("num", 42); record | 18.9 million
Remove First Record
The following script will remove the first Record from each FlowFile that it encounters.
Example Input (CSV):
name, num
Mark, 42
Felicia, 3720
Monica, -3
Example Output (CSV):
name, num
Felicia, 3720
Monica, -3
Example Script (Groovy):
return recordIndex == 0 ? null : record
Replace Field Value
The following script will replace any field in a Record if the value of that field is equal to the value of the “Value To Replace” attribute. The value of that field will be replaced with whatever value is in the “Replacement Value” attribute.
Example Input Content (JSON):
[ { "book": { "author": "John Doe", "date": "01/01/1980" } }, { "book": { "author": "Jane Doe", "date": "01/01/1990" } } ]
Example Input Attributes:
Attribute Name | Attribute Value
---|---
Value To Replace | Jane Doe
Replacement Value | Author Unknown
Example Output (JSON):
[ { "book": { "author": "John Doe", "date": "01/01/1980" } }, { "book": { "author": "Author Unknown", "date": "01/01/1990" } } ]
Example Script (Groovy):
def replace(rec) {
    rec.toMap().each { k, v ->
        // If the field value is equal to the attribute 'Value to Replace', then set the
        // field value to the 'Replacement Value' attribute.
        if (v?.toString()?.equals(attributes['Value to Replace'])) {
            rec.setValue(k, attributes['Replacement Value'])
        }

        // Call recursively if the value is a Record
        if (v instanceof org.apache.nifi.serialization.record.Record) {
            replace(v)
        }
    }
}

replace(record)
return record
Pass-through
The following script allows each Record to pass through without altering the Record in any way.
Example Input:
(any)
Example Output:
(identical to the input)
Example Script (Groovy):
record
Adding New Fields
The following script adds a new field named “favoriteColor” to all Records. Additionally, it sets an “isOdd” field to true on every Record whose zero-based index is odd.
It is important that all Records have the same schema. Since we want to set the “isOdd” field on Records 1 and 3, the schema for Records 0 and 2 must also account for this. As a result, we will add the field to all Records but use a null value for the Records whose index is even. See the Adding a New Field note above for more information.
Example Input Content (CSV):
name, favoriteFood
John Doe, Spaghetti
Jane Doe, Pizza
Jake Doe, Sushi
June Doe, Hamburger
Example Output (CSV):
name, favoriteFood, favoriteColor, isOdd
John Doe, Spaghetti, Blue,
Jane Doe, Pizza, Blue, true
Jake Doe, Sushi, Blue,
June Doe, Hamburger, Blue, true
Example Script (Groovy):
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;

// Always set favoriteColor to Blue.
// Because we are calling #setValue with a String as the field name, the field type will be inferred.
record.setValue("favoriteColor", "Blue")

// Set the 'isOdd' field to true if the record index is odd. Otherwise, set the 'isOdd' field to `null`.
// Because the value may be `null` for the first Record (in fact, it always will be for this particular case),
// we need to ensure that the Record Writer's schema is given the correct type for the field. As a result, we do not call
// #setValue with a String as the field name but rather pass a RecordField as the first argument, as the RecordField
// allows us to specify the type of the field.
// Also note that `RecordField` and `RecordFieldType` are imported above.
record.setValue(new RecordField("isOdd", RecordFieldType.BOOLEAN.getDataType()), recordIndex % 2 == 1 ? true : null)

return record
Fork Record
The following script returns each Record that it encounters, plus another Record, which is derived from the first, but where the ’num’ field is one less than the ’num’ field of the input.
Example Input (CSV):
name, num
Mark, 42
Felicia, 3720
Monica, -3
Example Output (CSV):
name, num
Mark, 42
Mark, 41
Felicia, 3720
Felicia, 3719
Monica, -3
Monica, -4
Example Script (Groovy):
import org.apache.nifi.serialization.record.*

def derivedValues = new HashMap(record.toMap())
derivedValues.put('num', derivedValues['num'] - 1)
derived = new MapRecord(record.schema, derivedValues)

return [record, derived]
-
Module Directory
Comma-separated list of paths to files and/or directories which contain modules required by the script.
- Display Name
- Module Directory
- Description
- Comma-separated list of paths to files and/or directories which contain modules required by the script.
- API Name
- Module Directory
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Record Reader
The Record Reader to use for parsing the incoming FlowFile into Records
- Display Name
- Record Reader
- Description
- The Record Reader to use for parsing the incoming FlowFile into Records
- API Name
- Record Reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Record Writer
The Record Writer to use for serializing Records after they have been transformed
- Display Name
- Record Writer
- Description
- The Record Writer to use for serializing Records after they have been transformed
- API Name
- Record Writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Script Body
Body of script to execute. Only one of Script File or Script Body may be used
- Display Name
- Script Body
- Description
- Body of script to execute. Only one of Script File or Script Body may be used
- API Name
- Script Body
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Script Language
The Language to use for the script
- Display Name
- Script Language
- Description
- The Language to use for the script
- API Name
- Script Engine
- Default Value
- Groovy
- Allowable Values
-
- Clojure
- Groovy
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Script File
Path to script file to execute. Only one of Script File or Script Body may be used
- Display Name
- Script File
- Description
- Path to script file to execute. Only one of Script File or Script Body may be used
- API Name
- Script File
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
Restricted
Required Permission | Explanation
---|---
execute code | Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.
Relationships
Name | Description
---|---
success | Each FlowFile that is successfully transformed will be routed to this Relationship
failure | Any FlowFile that cannot be transformed will be routed to this Relationship
Writes Attributes
Name | Description
---|---
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer
record.count | The number of records in the FlowFile
record.error.message | On failure, this attribute provides the error message encountered by the Reader or Writer