-
Processors
- AttributeRollingWindow
- AttributesToCSV
- AttributesToJSON
- CalculateRecordStats
- CaptureChangeMySQL
- CompressContent
- ConnectWebSocket
- ConsumeAMQP
- ConsumeAzureEventHub
- ConsumeElasticsearch
- ConsumeGCPubSub
- ConsumeIMAP
- ConsumeJMS
- ConsumeKafka
- ConsumeKinesisStream
- ConsumeMQTT
- ConsumePOP3
- ConsumeSlack
- ConsumeTwitter
- ConsumeWindowsEventLog
- ControlRate
- ConvertCharacterSet
- ConvertRecord
- CopyAzureBlobStorage_v12
- CopyS3Object
- CountText
- CryptographicHashContent
- DebugFlow
- DecryptContentAge
- DecryptContentPGP
- DeduplicateRecord
- DeleteAzureBlobStorage_v12
- DeleteAzureDataLakeStorage
- DeleteByQueryElasticsearch
- DeleteDynamoDB
- DeleteFile
- DeleteGCSObject
- DeleteGridFS
- DeleteMongo
- DeleteS3Object
- DeleteSFTP
- DeleteSQS
- DetectDuplicate
- DistributeLoad
- DuplicateFlowFile
- EncodeContent
- EncryptContentAge
- EncryptContentPGP
- EnforceOrder
- EvaluateJsonPath
- EvaluateXPath
- EvaluateXQuery
- ExecuteGroovyScript
- ExecuteProcess
- ExecuteScript
- ExecuteSQL
- ExecuteSQLRecord
- ExecuteStreamCommand
- ExtractAvroMetadata
- ExtractEmailAttachments
- ExtractEmailHeaders
- ExtractGrok
- ExtractHL7Attributes
- ExtractRecordSchema
- ExtractText
- FetchAzureBlobStorage_v12
- FetchAzureDataLakeStorage
- FetchBoxFile
- FetchDistributedMapCache
- FetchDropbox
- FetchFile
- FetchFTP
- FetchGCSObject
- FetchGoogleDrive
- FetchGridFS
- FetchS3Object
- FetchSFTP
- FetchSmb
- FilterAttribute
- FlattenJson
- ForkEnrichment
- ForkRecord
- GenerateFlowFile
- GenerateRecord
- GenerateTableFetch
- GeoEnrichIP
- GeoEnrichIPRecord
- GeohashRecord
- GetAsanaObject
- GetAwsPollyJobStatus
- GetAwsTextractJobStatus
- GetAwsTranscribeJobStatus
- GetAwsTranslateJobStatus
- GetAzureEventHub
- GetAzureQueueStorage_v12
- GetDynamoDB
- GetElasticsearch
- GetFile
- GetFTP
- GetGcpVisionAnnotateFilesOperationStatus
- GetGcpVisionAnnotateImagesOperationStatus
- GetHubSpot
- GetMongo
- GetMongoRecord
- GetS3ObjectMetadata
- GetSFTP
- GetShopify
- GetSmbFile
- GetSNMP
- GetSplunk
- GetSQS
- GetWorkdayReport
- GetZendesk
- HandleHttpRequest
- HandleHttpResponse
- IdentifyMimeType
- InvokeHTTP
- InvokeScriptedProcessor
- ISPEnrichIP
- JoinEnrichment
- JoltTransformJSON
- JoltTransformRecord
- JSLTTransformJSON
- JsonQueryElasticsearch
- ListAzureBlobStorage_v12
- ListAzureDataLakeStorage
- ListBoxFile
- ListDatabaseTables
- ListDropbox
- ListenFTP
- ListenHTTP
- ListenOTLP
- ListenSlack
- ListenSyslog
- ListenTCP
- ListenTrapSNMP
- ListenUDP
- ListenUDPRecord
- ListenWebSocket
- ListFile
- ListFTP
- ListGCSBucket
- ListGoogleDrive
- ListS3
- ListSFTP
- ListSmb
- LogAttribute
- LogMessage
- LookupAttribute
- LookupRecord
- MergeContent
- MergeRecord
- ModifyBytes
- ModifyCompression
- MonitorActivity
- MoveAzureDataLakeStorage
- Notify
- PackageFlowFile
- PaginatedJsonQueryElasticsearch
- ParseEvtx
- ParseNetflowv5
- ParseSyslog
- ParseSyslog5424
- PartitionRecord
- PublishAMQP
- PublishGCPubSub
- PublishJMS
- PublishKafka
- PublishMQTT
- PublishSlack
- PutAzureBlobStorage_v12
- PutAzureCosmosDBRecord
- PutAzureDataExplorer
- PutAzureDataLakeStorage
- PutAzureEventHub
- PutAzureQueueStorage_v12
- PutBigQuery
- PutBoxFile
- PutCloudWatchMetric
- PutDatabaseRecord
- PutDistributedMapCache
- PutDropbox
- PutDynamoDB
- PutDynamoDBRecord
- PutElasticsearchJson
- PutElasticsearchRecord
- PutEmail
- PutFile
- PutFTP
- PutGCSObject
- PutGoogleDrive
- PutGridFS
- PutKinesisFirehose
- PutKinesisStream
- PutLambda
- PutMongo
- PutMongoBulkOperations
- PutMongoRecord
- PutRecord
- PutRedisHashRecord
- PutS3Object
- PutSalesforceObject
- PutSFTP
- PutSmbFile
- PutSNS
- PutSplunk
- PutSplunkHTTP
- PutSQL
- PutSQS
- PutSyslog
- PutTCP
- PutUDP
- PutWebSocket
- PutZendeskTicket
- QueryAirtableTable
- QueryAzureDataExplorer
- QueryDatabaseTable
- QueryDatabaseTableRecord
- QueryRecord
- QuerySalesforceObject
- QuerySplunkIndexingStatus
- RemoveRecordField
- RenameRecordField
- ReplaceText
- ReplaceTextWithMapping
- RetryFlowFile
- RouteHL7
- RouteOnAttribute
- RouteOnContent
- RouteText
- RunMongoAggregation
- SampleRecord
- ScanAttribute
- ScanContent
- ScriptedFilterRecord
- ScriptedPartitionRecord
- ScriptedTransformRecord
- ScriptedValidateRecord
- SearchElasticsearch
- SegmentContent
- SendTrapSNMP
- SetSNMP
- SignContentPGP
- SplitAvro
- SplitContent
- SplitExcel
- SplitJson
- SplitPCAP
- SplitRecord
- SplitText
- SplitXml
- StartAwsPollyJob
- StartAwsTextractJob
- StartAwsTranscribeJob
- StartAwsTranslateJob
- StartGcpVisionAnnotateFilesOperation
- StartGcpVisionAnnotateImagesOperation
- TagS3Object
- TailFile
- TransformXml
- UnpackContent
- UpdateAttribute
- UpdateByQueryElasticsearch
- UpdateCounter
- UpdateDatabaseTable
- UpdateRecord
- ValidateCsv
- ValidateJson
- ValidateRecord
- ValidateXml
- VerifyContentMAC
- VerifyContentPGP
- Wait
-
Controller Services
- ADLSCredentialsControllerService
- ADLSCredentialsControllerServiceLookup
- AmazonGlueSchemaRegistry
- ApicurioSchemaRegistry
- AvroReader
- AvroRecordSetWriter
- AvroSchemaRegistry
- AWSCredentialsProviderControllerService
- AzureBlobStorageFileResourceService
- AzureCosmosDBClientService
- AzureDataLakeStorageFileResourceService
- AzureEventHubRecordSink
- AzureStorageCredentialsControllerService_v12
- AzureStorageCredentialsControllerServiceLookup_v12
- CEFReader
- ConfluentEncodedSchemaReferenceReader
- ConfluentEncodedSchemaReferenceWriter
- ConfluentSchemaRegistry
- CSVReader
- CSVRecordLookupService
- CSVRecordSetWriter
- DatabaseRecordLookupService
- DatabaseRecordSink
- DatabaseTableSchemaRegistry
- DBCPConnectionPool
- DBCPConnectionPoolLookup
- DistributedMapCacheLookupService
- ElasticSearchClientServiceImpl
- ElasticSearchLookupService
- ElasticSearchStringLookupService
- EmailRecordSink
- EmbeddedHazelcastCacheManager
- ExcelReader
- ExternalHazelcastCacheManager
- FreeFormTextRecordSetWriter
- GCPCredentialsControllerService
- GCSFileResourceService
- GrokReader
- HazelcastMapCacheClient
- HikariCPConnectionPool
- HttpRecordSink
- IPLookupService
- JettyWebSocketClient
- JettyWebSocketServer
- JMSConnectionFactoryProvider
- JndiJmsConnectionFactoryProvider
- JsonConfigBasedBoxClientService
- JsonPathReader
- JsonRecordSetWriter
- JsonTreeReader
- Kafka3ConnectionService
- KerberosKeytabUserService
- KerberosPasswordUserService
- KerberosTicketCacheUserService
- LoggingRecordSink
- MapCacheClientService
- MapCacheServer
- MongoDBControllerService
- MongoDBLookupService
- PropertiesFileLookupService
- ProtobufReader
- ReaderLookup
- RecordSetWriterLookup
- RecordSinkServiceLookup
- RedisConnectionPoolService
- RedisDistributedMapCacheClientService
- RestLookupService
- S3FileResourceService
- ScriptedLookupService
- ScriptedReader
- ScriptedRecordSetWriter
- ScriptedRecordSink
- SetCacheClientService
- SetCacheServer
- SimpleCsvFileLookupService
- SimpleDatabaseLookupService
- SimpleKeyValueLookupService
- SimpleRedisDistributedMapCacheClientService
- SimpleScriptedLookupService
- SiteToSiteReportingRecordSink
- SlackRecordSink
- SmbjClientProviderService
- StandardAsanaClientProviderService
- StandardAzureCredentialsControllerService
- StandardDropboxCredentialService
- StandardFileResourceService
- StandardHashiCorpVaultClientService
- StandardHttpContextMap
- StandardJsonSchemaRegistry
- StandardKustoIngestService
- StandardKustoQueryService
- StandardOauth2AccessTokenProvider
- StandardPGPPrivateKeyService
- StandardPGPPublicKeyService
- StandardPrivateKeyService
- StandardProxyConfigurationService
- StandardRestrictedSSLContextService
- StandardS3EncryptionService
- StandardSSLContextService
- StandardWebClientServiceProvider
- Syslog5424Reader
- SyslogReader
- UDPEventRecordSink
- VolatileSchemaCache
- WindowsEventLogReader
- XMLFileLookupService
- XMLReader
- XMLRecordSetWriter
- YamlTreeReader
- ZendeskRecordSink
ExecuteGroovyScript 2.0.0
- Bundle
- org.apache.nifi | nifi-groovyx-nar
- Description
- Experimental Extended Groovy script processor. The script is responsible for handling the incoming flow file (e.g., transferring it to SUCCESS or removing it) as well as any flow files created by the script. If the handling is incomplete or incorrect, the session will be rolled back.
- Tags
- groovy, groovyx, script
- Input Requirement
- ALLOWED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ExecuteGroovyScript 2.0.0
Groovy
Summary
This is the grooviest groovy script :)
Script Bindings:
Variable | Type | Description |
---|---|---|
`session` | org.apache.nifi.processor.ProcessSession | the session used to get, change, and transfer input files |
`context` | org.apache.nifi.processor.ProcessContext | the process context (rarely needed) |
`log` | org.apache.nifi.logging.ComponentLog | the logger for this processor instance |
`REL_SUCCESS` | org.apache.nifi.processor.Relationship | the success relationship |
`REL_FAILURE` | org.apache.nifi.processor.Relationship | the failure relationship |
`CTL` | java.util.HashMap<String, ControllerService> | map populated with the controller services defined by `CTL.*` processor properties. A `CTL.`-prefixed property can be linked to a controller service, giving the script access to that service without additional code. |
`SQL` | java.util.HashMap<String, groovy.sql.Sql> | map populated with `groovy.sql.Sql` objects connected to the databases defined by `SQL.*` processor properties. A `SQL.`-prefixed property can be linked only to a DBCPService. |
`RecordReader` | java.util.HashMap<String, RecordReaderFactory> | map populated with the controller services defined by `RecordReader.*` processor properties. A `RecordReader.`-prefixed property is to be linked to a RecordReaderFactory controller service instance. |
`RecordWriter` | java.util.HashMap<String, RecordSetWriterFactory> | map populated with the controller services defined by `RecordWriter.*` processor properties. A `RecordWriter.`-prefixed property is to be linked to a RecordSetWriterFactory controller service instance. |
Dynamic processor properties | org.apache.nifi.components.PropertyDescriptor | all processor properties not prefixed with `CTL.` or `SQL.` are bound to script variables |
SQL map details
Example: if you define a property `SQL.mydb` and link it to any DBCPService, you can access it from code:

```groovy
SQL.mydb.rows('select * from mytable')
```

The processor automatically borrows a connection from the DBCP service before executing the script and tries to handle the transaction:
database transactions are automatically rolled back on script exception and committed on success.
Alternatively, you can manage the transaction manually.
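For the manual route, the `groovy.sql.Sql` objects in the map support explicit transaction control. A minimal sketch, assuming a property `SQL.mydb` linked to a DBCPService and a hypothetical table `mytable`; it runs only inside the processor, not standalone:

```groovy
// manage the transaction manually with groovy.sql.Sql#withTransaction:
// the closure is committed on normal exit and rolled back if it throws
SQL.mydb.withTransaction {
    SQL.mydb.executeUpdate("update mytable set processed = 1 where id = ?", [42])
    SQL.mydb.executeInsert("insert into audit_log(note) values (?)", ['batch done'])
}
```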
NOTE: The script must not disconnect the connection.
SessionFile - flow file extension
The SessionFile (org.apache.nifi.processors.groovyx.flow.SessionFile) is the actual object returned by the session in the Extended Groovy processor.
This flow file is a container that references the session and the real flow file.
This allows a simplified syntax for working with file attributes and content:
set new attribute value

```groovy
flowFile.ATTRIBUTE_NAME = ATTRIBUTE_VALUE
flowFile.'mime.type' = 'text/xml'
flowFile.putAttribute("ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
// the same as: flowFile = session.putAttribute(flowFile, "ATTRIBUTE_NAME", ATTRIBUTE_VALUE)
```
remove attribute
```groovy
flowFile.ATTRIBUTE_NAME = null
// equivalent to: flowFile = session.removeAttribute(flowFile, "ATTRIBUTE_NAME")
```
get attribute value
```groovy
String a = flowFile.ATTRIBUTE_NAME
```
write content
```groovy
flowFile.write("UTF-8", "THE CharSequence to write into flow file replacing current content")
flowFile.write("UTF-8") { writer ->
    // do something with java.io.Writer...
}
flowFile.write { outStream ->
    // do something with the output stream...
}
flowFile.write { inStream, outStream ->
    // do something with the input and output streams...
}
```
get content
```groovy
InputStream i = flowFile.read()
def json = new groovy.json.JsonSlurper().parse(flowFile.read())
String text = flowFile.read().getText("UTF-8")
```
transfer flow file to the success relationship

```groovy
REL_SUCCESS << flowFile
flowFile.transfer(REL_SUCCESS)
// the same as: session.transfer(flowFile, REL_SUCCESS)
```
work with dbcp
```groovy
import groovy.sql.Sql

// define a property named `SQL.db` linked to a DBCPConnectionPool controller service
// (this example assumes an H2 database)

// read a value from the database with a prepared statement
// and assign it to the flow file attribute `db.yesterday`
def daysAdd = -1
def row = SQL.db.firstRow("select dateadd('DAY', ${daysAdd}, sysdate) as DB_DATE from dual")
flowFile.'db.yesterday' = row.DB_DATE

// to work with BLOBs and CLOBs in the database,
// cast parameters with groovy.sql.Sql.BLOB(Stream) and groovy.sql.Sql.CLOB(Reader)

// write the content of the flow file into a database blob
flowFile.read { rawIn ->
    def parms = [
        p_id  : flowFile.ID as Long,  // get the flow file attribute named `ID`
        p_data: Sql.BLOB(rawIn),      // use the input stream as a BLOB sql parameter
    ]
    SQL.db.executeUpdate(parms, "update mytable set data = :p_data where id = :p_id")
}
```
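The `RecordReader` and `RecordWriter` maps can be used in a similar way. The following is a minimal pass-through sketch, assuming properties named `RecordReader.reader` and `RecordWriter.writer` linked to a record reader and a record set writer factory (the property names are illustrative, and the factory method signatures shown match recent NiFi versions); it runs only inside the processor:

```groovy
flowFile.write { inStream, outStream ->
    // create a record reader for the incoming content
    def reader = RecordReader.reader.createRecordReader(flowFile.attributes, inStream, flowFile.size, log)
    // derive the write schema from the reader's schema and create a writer
    def schema = RecordWriter.writer.getSchema(flowFile.attributes, reader.schema)
    def writer = RecordWriter.writer.createWriter(log, schema, outStream, flowFile.attributes)
    writer.beginRecordSet()
    def record
    while ((record = reader.nextRecord()) != null) {
        writer.write(record)  // pass each record through unchanged
    }
    writer.finishRecordSet()
    writer.close()
    reader.close()
}
REL_SUCCESS << flowFile
```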
Handling processor start & stop
In the extended groovy processor you can catch the `start`, `stop`, and `unscheduled` events by providing the corresponding static methods:
```groovy
import org.apache.nifi.processor.ProcessContext
import java.util.concurrent.atomic.AtomicLong

class Const {
    static Date startTime = null;
    static AtomicLong triggerCount = null;
}

static onStart(ProcessContext context) {
    Const.startTime = new Date()
    Const.triggerCount = new AtomicLong(0)
    println "onStart $context ${Const.startTime}"
}

static onStop(ProcessContext context) {
    def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
    println "onStop $context executed ${Const.triggerCount} times during ${alive} seconds"
}

static onUnscheduled(ProcessContext context) {
    def alive = (System.currentTimeMillis() - Const.startTime.getTime()) / 1000
    println "onUnscheduled $context executed ${Const.triggerCount} times during ${alive} seconds"
}

flowFile.'trigger.count' = Const.triggerCount.incrementAndGet()
REL_SUCCESS << flowFile
```
-
Additional classpath
A classpath list separated by semicolons or commas. You can use masks like `*` or `*.jar` in the file name.
- Display Name
- Additional classpath
- Description
- A classpath list separated by semicolons or commas. You can use masks like `*` or `*.jar` in the file name.
- API Name
- groovyx-additional-classpath
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Failure strategy
What to do with unhandled exceptions. If you want to handle exceptions in the script code, keep the default value `rollback`. If `transfer to failure` is selected and an unhandled exception occurs, all flow files received from incoming queues in this session are transferred to the `failure` relationship with the additional attributes ERROR_MESSAGE and ERROR_STACKTRACE set. If `rollback` is selected and an unhandled exception occurs, all flow files received from incoming queues are penalized and returned. If the processor has no incoming connections, this property has no effect.
- Display Name
- Failure strategy
- Description
- What to do with unhandled exceptions. If you want to handle exceptions in the script code, keep the default value `rollback`. If `transfer to failure` is selected and an unhandled exception occurs, all flow files received from incoming queues in this session are transferred to the `failure` relationship with the additional attributes ERROR_MESSAGE and ERROR_STACKTRACE set. If `rollback` is selected and an unhandled exception occurs, all flow files received from incoming queues are penalized and returned. If the processor has no incoming connections, this property has no effect.
- API Name
- groovyx-failure-strategy
- Default Value
- rollback
- Allowable Values
-
- rollback
- transfer to failure
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
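With the default `rollback` strategy, error handling stays in the script itself. The following is a minimal sketch of catching an exception in code and routing the flow file to the failure relationship; the transformation and attribute name are illustrative, and the snippet runs only inside the processor:

```groovy
def flowFile = session.get()
if (flowFile == null) return
try {
    flowFile.write("UTF-8") { writer ->
        writer.write("processed")  // illustrative transformation
    }
    REL_SUCCESS << flowFile
} catch (Exception e) {
    // handle the error ourselves instead of letting the session roll back
    log.error("processing failed", e)
    flowFile.'error.message' = e.message ?: e.class.name
    REL_FAILURE << flowFile
}
```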
-
Script Body
Body of the script to execute. Only one of Script File or Script Body may be used.
- Display Name
- Script Body
- Description
- Body of the script to execute. Only one of Script File or Script Body may be used.
- API Name
- groovyx-script-body
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Script File
Path to the script file to execute. Only one of Script File or Script Body may be used.
- Display Name
- Script File
- Description
- Path to the script file to execute. Only one of Script File or Script Body may be used.
- API Name
- groovyx-script-file
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
A script engine property to update
Updates a script engine property specified by the dynamic property's key with the value specified by the dynamic property's value. Use the `CTL.` prefix to access controller services, `SQL.` to access DBCPService instances, `RecordReader.` to access RecordReaderFactory instances, or `RecordWriter.` to access RecordSetWriterFactory instances.
- Name
- A script engine property to update
- Description
- Updates a script engine property specified by the dynamic property's key with the value specified by the dynamic property's value. Use the `CTL.` prefix to access controller services, `SQL.` to access DBCPService instances, `RecordReader.` to access RecordReaderFactory instances, or `RecordWriter.` to access RecordSetWriterFactory instances.
- Value
- The value to set it to
- Expression Language Scope
- FLOWFILE_ATTRIBUTES
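As the bindings table notes, dynamic properties without a recognized prefix are bound to script variables; their values can also be read through the standard `context` API, which is useful when Expression Language must be resolved against a flow file. A sketch assuming a hypothetical dynamic property named `batchLimit`; it runs only inside the processor:

```groovy
// read a dynamic property value via the process context;
// evaluateAttributeExpressions resolves any Expression Language against the flow file
def limit = context.getProperty('batchLimit')?.evaluateAttributeExpressions(flowFile)?.value
log.info("batchLimit resolved to ${limit}")
```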
Required Permission | Explanation |
---|---|
execute code | Provides an operator the ability to execute arbitrary code, assuming all permissions that NiFi has. |
Name | Description |
---|---|
success | FlowFiles that were successfully processed |
failure | FlowFiles that failed to be processed |