ConsumeAMQP 2.0.0
- Bundle
- org.apache.nifi | nifi-amqp-nar
- Description
- Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.
- Tags
- amqp, consume, get, message, rabbit, receive
- Input Requirement
- FORBIDDEN
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ConsumeAMQP 2.0.0
ConsumeAMQP
Summary
This processor consumes messages from an AMQP messaging queue and converts each one to a FlowFile to be routed to the next component in the flow. At the time of writing, the supported AMQP protocol version is v0.9.1.
The component is based on the RabbitMQ Client API. The following guide and tutorial may also help you brush up on some of the AMQP basics.
This processor does two things. It consumes a message from the AMQP queue, and it constructs a FlowFile from the information in that message (both body and attributes). The message body is written to the FlowFile content, and its com.rabbitmq.client.AMQP.BasicProperties are transferred into the FlowFile as attributes. AMQP attribute names are prefixed with amqp$.
AMQP Properties
The following is the list of available standard AMQP properties which may come with the message: (“amqp$contentType”, “amqp$contentEncoding”, “amqp$headers”, “amqp$deliveryMode”, “amqp$priority”, “amqp$correlationId”, “amqp$replyTo”, “amqp$expiration”, “amqp$messageId”, “amqp$timestamp”, “amqp$type”, “amqp$userId”, “amqp$appId”, “amqp$clusterId”, “amqp$routingKey”)
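As a rough illustration of the naming scheme described above, the sketch below prefixes a set of AMQP property values with amqp$. The helper name to_flowfile_attributes and the plain dictionary input are hypothetical; the processor itself reads com.rabbitmq.client.AMQP.BasicProperties in Java. Skipping unset properties is an assumption, not documented behavior.

```python
# Standard AMQP property names from the list above, without the "amqp$" prefix.
AMQP_PROPERTY_NAMES = (
    "contentType", "contentEncoding", "headers", "deliveryMode", "priority",
    "correlationId", "replyTo", "expiration", "messageId", "timestamp",
    "type", "userId", "appId", "clusterId", "routingKey",
)


def to_flowfile_attributes(properties):
    """Hypothetical helper: map AMQP property values to amqp$-prefixed attributes.

    Assumption: properties that are missing or None are skipped.
    """
    attributes = {}
    for name in AMQP_PROPERTY_NAMES:
        value = properties.get(name)
        if value is not None:
            attributes["amqp$" + name] = str(value)
    return attributes


example = to_flowfile_attributes(
    {"contentType": "application/json", "priority": 5, "replyTo": None}
)
print(example)
```

Attribute values are rendered as strings here because FlowFile attributes are string key/value pairs.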
Configuration Details
At the time of writing, the processor defines only the essential configuration properties, which are suitable for most cases. Other properties will be defined later as this component progresses. Configuring ConsumeAMQP:
- Queue - [REQUIRED] the name of the AMQP queue the messages will be retrieved from. Usually provided by the administrator (e.g., ‘amq.direct’).
- Host Name - [REQUIRED] the name of the host where the AMQP broker is running. Usually provided by the administrator (e.g., ‘myhost.com’). Defaults to ‘localhost’.
- Port - [REQUIRED] the port number where the AMQP broker is running. Usually provided by the administrator (e.g., ‘2453’). Defaults to ‘5672’.
- User Name - [REQUIRED] user name to connect to the AMQP broker. Usually provided by the administrator (e.g., ‘me’). Defaults to ‘guest’.
- Password - [REQUIRED] password to use with the user name to connect to the AMQP broker. Usually provided by the administrator. Defaults to ‘guest’.
- Use Client Certificate Authentication - [OPTIONAL] Use the SSL certificate common name for authentication rather than user name/password. This can only be used in conjunction with SSL. Defaults to ‘false’.
- Virtual Host - [OPTIONAL] Virtual Host name which segregates the AMQP system for enhanced security. Please refer to this blog for more details on Virtual Host.
-
AMQP Version
AMQP Version. Currently only supports AMQP v0.9.1.
- Display Name
- AMQP Version
- Description
- AMQP Version. Currently only supports AMQP v0.9.1.
- API Name
- AMQP Version
- Default Value
- 0.9.1
- Allowable Values
-
- 0.9.1
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Auto-Acknowledge Messages
If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. Auto-Acknowledge mode provides 'at-most-once' delivery semantics and is recommended only if losing messages is acceptable.
- Display Name
- Auto-Acknowledge Messages
- Description
- If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. Auto-Acknowledge mode provides 'at-most-once' delivery semantics and is recommended only if losing messages is acceptable.
- API Name
- auto.acknowledge
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
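The difference between the two acknowledgement modes can be sketched with a toy model. Nothing below is the processor's real code: the function consume, its parameters, and the in-memory "broker" are hypothetical, and the model covers only a crash that happens after delivery but before the NiFi session is committed.

```python
from collections import deque


def consume(queue, auto_ack, crash_before_commit):
    """Toy model of one consume attempt (illustrative only).

    Returns (messages committed to the flow, messages left on the broker).
    """
    broker = deque(queue)
    message = broker.popleft()  # the broker delivers one message
    # With auto-ack the broker forgets the message as soon as it is sent.
    unacked = [] if auto_ack else [message]
    committed = []
    if not crash_before_commit:
        committed.append(message)  # FlowFile transferred, session committed
        unacked.clear()            # manual mode: ack is sent only after commit
    # After a crash, the broker requeues anything still unacknowledged.
    broker.extendleft(reversed(unacked))
    return committed, list(broker)


# Auto-acknowledge plus a crash: "m1" is gone from both sides (at-most-once).
print(consume(["m1", "m2"], auto_ack=True, crash_before_commit=True))
# Non-auto-acknowledge plus a crash: "m1" is still on the broker for redelivery.
print(consume(["m1", "m2"], auto_ack=False, crash_before_commit=True))
```

In the non-auto-acknowledge case a duplicate can still appear if the crash falls between the session commit and the acknowledgement, which is why that mode is described as at-least-once rather than exactly-once.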
-
Batch Size
The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.
- Display Name
- Batch Size
- Description
- The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.
- API Name
- batch.size
- Default Value
- 10
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Brokers
A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.
- Display Name
- Brokers
- Description
- A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.
- API Name
- Brokers
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
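A minimal sketch of how a Brokers value in the <host>:<port> format could be interpreted, and how it takes precedence over Host Name and Port. The helper names parse_brokers and resolve_addresses are hypothetical, and the validation rules (digits-only port, no empty host) are assumptions rather than the processor's documented checks.

```python
def parse_brokers(brokers):
    """Hypothetical helper: split 'host1:5672,host2:5672' into (host, port) pairs."""
    addresses = []
    for entry in brokers.split(","):
        entry = entry.strip()
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        addresses.append((host, int(port)))
    return addresses


def resolve_addresses(brokers, host_name="localhost", port=5672):
    """If Brokers is set, Host Name and Port are ignored."""
    if brokers and brokers.strip():
        return parse_brokers(brokers)
    return [(host_name, port)]


print(resolve_addresses("rabbit1:5672, rabbit2:5672"))
print(resolve_addresses(None))
```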
-
Use Client Certificate Authentication
Authenticate using the SSL certificate rather than user name/password.
- Display Name
- Use Client Certificate Authentication
- Description
- Authenticate using the SSL certificate rather than user name/password.
- API Name
- cert-authentication
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Header Output Format
Defines how to output headers from the received message
- Display Name
- Header Output Format
- Description
- Defines how to output headers from the received message
- API Name
- header.format
- Default Value
- Comma-Separated String
- Allowable Values
-
- Comma-Separated String
- JSON String
- FlowFile Attributes
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Header Key Prefix
Text to be prefixed to header keys as they are added to the FlowFile attributes. The processor will append '.' to the value of this property.
- Display Name
- Header Key Prefix
- Description
- Text to be prefixed to header keys as they are added to the FlowFile attributes. The processor will append '.' to the value of this property.
- API Name
- header.key.prefix
- Default Value
- consume.amqp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Header Output Format is set to any of [FlowFile Attributes]
-
Header Separator
The character used to separate key-value pairs in the header string. The value must be exactly one character.
- Display Name
- Header Separator
- Description
- The character used to separate key-value pairs in the header string. The value must be exactly one character.
- API Name
- header.separator
- Default Value
- ,
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Header Output Format is set to any of [Comma-Separated String]
-
Host Name
Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.
- Display Name
- Host Name
- Description
- Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.
- API Name
- Host Name
- Default Value
- localhost
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Password
Password used for authentication and authorization.
- Display Name
- Password
- Description
- Password used for authentication and authorization.
- API Name
- Password
- Expression Language Scope
- Not Supported
- Sensitive
- true
- Required
- false
-
Port
Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.
- Display Name
- Port
- Description
- Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.
- API Name
- Port
- Default Value
- 5672
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Prefetch Count
The maximum number of unacknowledged messages for the consumer. If the consumer has this number of unacknowledged messages, the AMQP broker will no longer send new messages until the consumer acknowledges some of the messages already delivered to it. Allowed values: from 0 to 65535. 0 means no limit.
- Display Name
- Prefetch Count
- Description
- The maximum number of unacknowledged messages for the consumer. If the consumer has this number of unacknowledged messages, the AMQP broker will no longer send new messages until the consumer acknowledges some of the messages already delivered to it. Allowed values: from 0 to 65535. 0 means no limit.
- API Name
- prefetch.count
- Default Value
- 0
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
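The prefetch rule above amounts to simple arithmetic, sketched here as a toy function. The name deliverable and its parameters are hypothetical; real brokers apply this limit internally.

```python
def deliverable(queue_depth, unacked, prefetch_count):
    """Toy model: how many more messages a broker may push to the consumer.

    prefetch_count == 0 means no limit, as stated in the description above.
    """
    if not 0 <= prefetch_count <= 65535:
        raise ValueError("prefetch count must be between 0 and 65535")
    if prefetch_count == 0:
        return queue_depth
    return max(0, min(queue_depth, prefetch_count - unacked))


print(deliverable(queue_depth=100, unacked=3, prefetch_count=10))
print(deliverable(queue_depth=100, unacked=10, prefetch_count=10))
```

With a prefetch count of 10 and 10 messages still unacknowledged, the model returns 0: delivery pauses until the consumer acknowledges something.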
-
Queue
The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator.
- Display Name
- Queue
- Description
- The name of the existing AMQP Queue from which messages will be consumed. Usually pre-defined by AMQP administrator.
- API Name
- Queue
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Remove Curly Braces
If true, curly braces in the header will be automatically removed.
- Display Name
- Remove Curly Braces
- Description
- If true, curly braces in the header will be automatically removed.
- API Name
- remove.curly.braces
- Default Value
- False
- Allowable Values
-
- True
- False
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Header Output Format is set to any of [Comma-Separated String]
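The header-related properties (Header Output Format, Header Key Prefix, Header Separator, Remove Curly Braces) interact, so a sketch may help. The function format_headers is hypothetical, and the exact string shapes it produces (key=value pairs, surrounding braces, JSON spacing) are assumptions for illustration; check the processor's actual output before relying on them.

```python
import json


def format_headers(headers, output_format="Comma-Separated String",
                   separator=",", remove_curly_braces=False,
                   key_prefix="consume.amqp"):
    """Hypothetical rendering of AMQP message headers as FlowFile attributes."""
    if output_format == "Comma-Separated String":
        if len(separator) != 1:
            raise ValueError("Header Separator must be exactly one character")
        body = separator.join(f"{k}={v}" for k, v in headers.items())
        # Assumed shape: braces wrap the joined pairs unless removal is requested.
        text = body if remove_curly_braces else "{" + body + "}"
        return {"amqp$headers": text}
    if output_format == "JSON String":
        return {"amqp$headers": json.dumps(headers)}
    if output_format == "FlowFile Attributes":
        # The '.' between prefix and key is appended automatically.
        return {f"{key_prefix}.{k}": str(v) for k, v in headers.items()}
    raise ValueError(f"unknown Header Output Format: {output_format}")


headers = {"a": 1, "b": "x"}
print(format_headers(headers))
print(format_headers(headers, separator="|", remove_curly_braces=True))
print(format_headers(headers, "FlowFile Attributes"))
```

Note that Header Separator and Remove Curly Braces only apply to the Comma-Separated String format, and Header Key Prefix only applies to FlowFile Attributes, matching the dependencies listed for each property.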
-
SSL Context Service
The SSL Context Service used to provide client certificate information for TLS/SSL connections.
- Display Name
- SSL Context Service
- Description
- The SSL Context Service used to provide client certificate information for TLS/SSL connections.
- API Name
- ssl-context-service
- Service Interface
- org.apache.nifi.ssl.SSLContextService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
User Name
User Name used for authentication and authorization.
- Display Name
- User Name
- Description
- User Name used for authentication and authorization.
- API Name
- User Name
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Virtual Host
Virtual Host name which segregates AMQP system for enhanced security.
- Display Name
- Virtual Host
- Description
- Virtual Host name which segregates AMQP system for enhanced security.
- API Name
- Virtual Host
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
Relationships
Name | Description |
---|---|
success | All FlowFiles that are received from the AMQP queue are routed to this relationship |
Writes Attributes
Name | Description |
---|---|
amqp$appId | The App ID field from the AMQP Message |
amqp$contentEncoding | The Content Encoding reported by the AMQP Message |
amqp$contentType | The Content Type reported by the AMQP Message |
amqp$headers | The headers present on the AMQP Message. Added only if processor is configured to output this attribute. |
<Header Key Prefix>.<attribute> | Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute |
amqp$deliveryMode | The numeric indicator for the Message's Delivery Mode |
amqp$priority | The Message priority |
amqp$correlationId | The Message's Correlation ID |
amqp$replyTo | The value of the Message's Reply-To field |
amqp$expiration | The Message Expiration |
amqp$messageId | The unique ID of the Message |
amqp$timestamp | The timestamp of the Message, as the number of milliseconds since epoch |
amqp$type | The type of message |
amqp$userId | The ID of the user |
amqp$clusterId | The ID of the AMQP Cluster |
amqp$routingKey | The routingKey of the AMQP Message |
amqp$exchange | The exchange from which AMQP Message was received |