PublishAMQP 2.0.0
- Bundle
- org.apache.nifi | nifi-amqp-nar
- Description
- Creates an AMQP Message from the contents of a FlowFile and sends the message to an AMQP Exchange. In a typical AMQP exchange model, the message that is sent to the AMQP Exchange will be routed based on the 'Routing Key' to its final destination in the queue (the binding). If due to some misconfiguration the binding between the Exchange, Routing Key and Queue is not set up, the message will have no final destination and will return (i.e., the data will not make it to the queue). If that happens you will see a log in both app-log and bulletin stating to that effect, and the FlowFile will be routed to the 'failure' relationship.
- Tags
- amqp, message, publish, put, rabbit, send
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for PublishAMQP 2.0.0
PublishAMQP
Summary
This processor publishes the contents of the incoming FlowFile to an AMQP-based messaging system. At the time of writing this document the supported AMQP protocol version is v0.9.1.
The component is based on the RabbitMQ Client API. The following guide and tutorial may also help you brush up on some of the AMQP basics.
This processor does two things. It constructs an AMQP message by extracting the FlowFile contents (both body and attributes). Once the message is constructed, it is sent to an AMQP Exchange. AMQP properties are extracted from the FlowFile and converted to com.rabbitmq.client.AMQP.BasicProperties to be sent along with the message. Upon success, the incoming FlowFile is transferred to the success Relationship; upon failure, the FlowFile is penalized and transferred to the failure Relationship.
Where did my message go?
In a typical AMQP exchange model, a message sent to an AMQP Exchange is routed based on the Routing Key to its final destination in the Queue. This association is called a Binding. If, due to some misconfiguration, the binding between the Exchange, Routing Key and Queue is not set up, the message will have no final destination and will be returned (i.e., the data will not make it to the queue). If that happens, you will see a log message to that effect in both the app log and a bulletin. Fixing the binding (normally done by the AMQP administrator) will resolve the issue.
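The routing behaviour described above can be illustrated with a toy, in-memory model of a direct exchange. This is only a sketch: `DirectExchange`, `bind` and `publish` are hypothetical names invented for illustration, not part of NiFi or of any AMQP client library, and a real broker does considerably more.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of a direct exchange (hypothetical, for illustration only)."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing key -> bound queue names
        self.queues = defaultdict(list)    # queue name -> delivered message bodies

    def bind(self, queue, routing_key):
        """Record a binding from this exchange to a queue via a routing key."""
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        """Deliver body to every bound queue; return False if it is unroutable."""
        targets = self.bindings.get(routing_key, [])
        for queue in targets:
            self.queues[queue].append(body)
        return bool(targets)


exchange = DirectExchange()
exchange.bind("orders", "myKey")

routed = exchange.publish("myKey", b"hello")       # a binding exists
returned = not exchange.publish("typo", b"lost")   # no binding: message comes back
```

In this model the second publish finds no binding for its routing key, which corresponds to the case where the processor logs the returned message and the data never reaches a queue.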
AMQP Properties
Attributes extracted from the FlowFile are considered candidates for AMQP properties if their names are prefixed with amqp$ (e.g., amqp$contentType=text/xml). To enrich a message with additional AMQP properties, you may use the UpdateAttribute processor between the source processor and the PublishAMQP processor. The following standard AMQP properties are available: “amqp$contentType”, “amqp$contentEncoding”, “amqp$headers” (if ‘Headers Source’ is set to ‘Attribute “amqp$headers” Value’), “amqp$deliveryMode”, “amqp$priority”, “amqp$correlationId”, “amqp$replyTo”, “amqp$expiration”, “amqp$messageId”, “amqp$timestamp”, “amqp$type”, “amqp$userId”, “amqp$appId”, “amqp$clusterId”.
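As a rough illustration of the prefix convention, the sketch below picks the amqp$-prefixed attributes out of a plain dictionary. It is not the processor's implementation: `extract_amqp_properties` is a hypothetical helper, and silently ignoring unknown amqp$ names is an assumption made for the example.

```python
AMQP_PREFIX = "amqp$"

# Standard property names from the list above, without the prefix.
STANDARD_PROPERTIES = {
    "contentType", "contentEncoding", "headers", "deliveryMode", "priority",
    "correlationId", "replyTo", "expiration", "messageId", "timestamp",
    "type", "userId", "appId", "clusterId",
}


def extract_amqp_properties(attributes):
    """Return {property name: value} for attributes named amqp$<standard property>."""
    properties = {}
    for name, value in attributes.items():
        if name.startswith(AMQP_PREFIX):
            key = name[len(AMQP_PREFIX):]
            if key in STANDARD_PROPERTIES:  # assumption: unknown names are skipped
                properties[key] = value
    return properties


flowfile_attributes = {
    "filename": "order.xml",
    "amqp$contentType": "text/xml",
    "amqp$deliveryMode": "2",
    "amqp$unknown": "ignored",
}
props = extract_amqp_properties(flowfile_attributes)
```

Only the two recognised attributes end up in `props`; the ordinary `filename` attribute and the unrecognised `amqp$unknown` are left out.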
AMQP Message Headers Source
The headers attached to the AMQP message by the processor depend on the “Headers Source” property value.
- Attribute “amqp$headers” Value - The processor will read the single attribute “amqp$headers”, split it based on “Header Separator”, and then read the headers in key=value format.
- Attributes Matching Regex - The processor will pick FlowFile attributes by matching the regex provided in “Attributes To Headers Regular Expression”. The name of the attribute is used as the key of the header.
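The two header sources can be sketched as two small functions. Both function names are hypothetical, and several details are assumptions rather than documented behaviour: trimming whitespace, skipping entries without an `=`, and requiring the regex to match the whole attribute name.

```python
import re


def headers_from_attribute(value, separator=","):
    """Split an amqp$headers-style string into a dict of key=value pairs."""
    if len(separator) != 1:
        raise ValueError("Header Separator must be exactly one character")
    headers = {}
    for pair in value.split(separator):
        if "=" not in pair:
            continue  # assumption: entries without key=value shape are skipped
        key, _, val = pair.partition("=")
        headers[key.strip()] = val.strip()
    return headers


def headers_from_matching_attributes(attributes, pattern):
    """Select attributes whose names match the regex; the name becomes the header key."""
    regex = re.compile(pattern)
    # assumption: the whole attribute name must match the pattern
    return {name: value for name, value in attributes.items() if regex.fullmatch(name)}


by_attribute = headers_from_attribute("region=eu,tenant=acme")
by_regex = headers_from_matching_attributes(
    {"hdr.region": "eu", "hdr.tenant": "acme", "filename": "a.txt"},
    r"hdr\..*",
)
```

The first call models the single-attribute source with the default `,` separator; the second models the regex source, where the attribute names themselves are kept as header keys.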
Configuration Details
At the time of writing, this document defines only the essential configuration properties, which are suitable for most cases. Other properties will be defined later as this component progresses. Configuring PublishAMQP:
- Exchange Name - [OPTIONAL] the name of the AMQP exchange the messages will be sent to. Usually provided by the administrator (e.g., ‘amq.direct’). If left empty, the messages will be sent to the default AMQP exchange. More on AMQP Exchanges can be found here.
- Routing Key - [REQUIRED] the name of the routing key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., ‘myKey’). When messages are sent to the default exchange, this property corresponds to a destination queue name; otherwise a binding from the Exchange to a Queue via the Routing Key must be set (usually by the AMQP administrator). More on AMQP Exchanges and Bindings can be found here.
- Host Name - [REQUIRED] the name of the host where the AMQP broker is running. Usually provided by the administrator (e.g., ‘myhost.com’). Defaults to ‘localhost’.
- Port - [REQUIRED] the port number where the AMQP broker is running. Usually provided by the administrator (e.g., ‘2453’). Defaults to ‘5672’.
- User Name - [REQUIRED] the user name to connect to the AMQP broker. Usually provided by the administrator (e.g., ‘me’). Defaults to ‘guest’.
- Password - [REQUIRED] the password to use with the user name to connect to the AMQP broker. Usually provided by the administrator. Defaults to ‘guest’.
- Use Certificate Authentication - [OPTIONAL] use the SSL certificate common name for authentication rather than user name/password. This can only be used in conjunction with SSL. Defaults to ‘false’.
- Virtual Host - [OPTIONAL] the Virtual Host name, which segregates the AMQP system for enhanced security. Please refer to this blog for more details on Virtual Host.
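The settings listed above can be summarised as a small data structure. This is a sketch for orientation only: `PublishAmqpSettings` and its field names are hypothetical, the defaults are copied from the list above, and the port range check is an added assumption.

```python
from dataclasses import dataclass


@dataclass
class PublishAmqpSettings:
    """Hypothetical container mirroring the configuration list above."""

    routing_key: str                              # required
    exchange_name: str = ""                       # empty means the default exchange
    host_name: str = "localhost"
    port: int = 5672
    user_name: str = "guest"
    password: str = "guest"
    use_certificate_authentication: bool = False  # only meaningful together with SSL
    virtual_host: str = ""                        # optional

    def validate(self):
        """Check the constraints stated in the list; return self for chaining."""
        if not self.routing_key:
            raise ValueError("Routing Key is required")
        if not 0 < self.port < 65536:  # assumption: ordinary TCP port range
            raise ValueError("Port must be between 1 and 65535")
        return self


settings = PublishAmqpSettings(routing_key="myKey").validate()
```

With only the routing key supplied, every other field falls back to the default named in the list, and the empty exchange name stands for the default exchange.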
-
AMQP Version
AMQP Version. Currently only supports AMQP v0.9.1.
- Display Name
- AMQP Version
- Description
- AMQP Version. Currently only supports AMQP v0.9.1.
- API Name
- AMQP Version
- Default Value
- 0.9.1
- Allowable Values
-
- 0.9.1
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Brokers
A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.
- Display Name
- Brokers
- Description
- A comma-separated list of known AMQP Brokers in the format <host>:<port> (e.g., localhost:5672). If this is set, Host Name and Port are ignored. Only include hosts from the same AMQP cluster.
- API Name
- Brokers
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
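To make the `<host>:<port>` list format concrete, here is a minimal parsing sketch. `parse_brokers` is a hypothetical helper, not the processor's code, and tolerating whitespace around entries is an assumption.

```python
def parse_brokers(value):
    """Parse 'host1:5672,host2:5672' into a list of (host, port) tuples."""
    brokers = []
    for entry in value.split(","):
        entry = entry.strip()  # assumption: surrounding whitespace is tolerated
        if not entry:
            continue
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected <host>:<port>, got {entry!r}")
        brokers.append((host, int(port)))
    return brokers


brokers = parse_brokers("rabbit1.example.com:5672, rabbit2.example.com:5672")
```

An entry without a port, such as a bare `localhost`, raises a `ValueError` in this sketch instead of falling back to the Port property.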
-
Use Client Certificate Authentication
Authenticate using the SSL certificate rather than user name/password.
- Display Name
- Use Client Certificate Authentication
- Description
- Authenticate using the SSL certificate rather than user name/password.
- API Name
- cert-authentication
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Exchange Name
The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.
- Display Name
- Exchange Name
- Description
- The name of the AMQP Exchange the messages will be sent to. Usually provided by the AMQP administrator (e.g., 'amq.direct'). It is an optional property. If kept empty the messages will be sent to a default AMQP exchange.
- API Name
- Exchange Name
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
Header Separator
The character used to separate the key-value pairs of the headers. The value must be exactly one character; otherwise you will get an error message.
- Display Name
- Header Separator
- Description
- The character used to separate the key-value pairs of the headers. The value must be exactly one character; otherwise you will get an error message.
- API Name
- header.separator
- Default Value
- ,
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Headers Source is set to any of [AMQP_HEADERS_ATTRIBUTE]
-
Headers Pattern
Regular expression that will be evaluated against the FlowFile attributes to select the matching attributes and put as AMQP headers. Attribute name will be used as header key.
- Display Name
- Headers Pattern
- Description
- Regular expression that will be evaluated against the FlowFile attributes to select the matching attributes and put as AMQP headers. Attribute name will be used as header key.
- API Name
- Headers Pattern
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
- Dependencies
-
- Headers Source is set to any of [FLOWFILE_ATTRIBUTES]
-
Headers Source
The source of the headers which will be applied to the published message.
- Display Name
- Headers Source
- Description
- The source of the headers which will be applied to the published message.
- API Name
- Headers Source
- Default Value
- AMQP_HEADERS_ATTRIBUTE
- Allowable Values
-
- FlowFile Attributes
- AMQP Headers Attribute
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Host Name
Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.
- Display Name
- Host Name
- Description
- Network address of AMQP broker (e.g., localhost). If Brokers is set, then this property is ignored.
- API Name
- Host Name
- Default Value
- localhost
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Password
Password used for authentication and authorization.
- Display Name
- Password
- Description
- Password used for authentication and authorization.
- API Name
- Password
- Expression Language Scope
- Not Supported
- Sensitive
- true
- Required
- false
-
Port
Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.
- Display Name
- Port
- Description
- Numeric value identifying Port of AMQP broker (e.g., 5671). If Brokers is set, then this property is ignored.
- API Name
- Port
- Default Value
- 5672
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Routing Key
The name of the Routing Key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., 'myKey'). When messages are sent to the default exchange, this property corresponds to a destination queue name; otherwise a binding from the Exchange to a Queue via the Routing Key must be set (usually by the AMQP administrator).
- Display Name
- Routing Key
- Description
- The name of the Routing Key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., 'myKey'). When messages are sent to the default exchange, this property corresponds to a destination queue name; otherwise a binding from the Exchange to a Queue via the Routing Key must be set (usually by the AMQP administrator).
- API Name
- Routing Key
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
SSL Context Service
The SSL Context Service used to provide client certificate information for TLS/SSL connections.
- Display Name
- SSL Context Service
- Description
- The SSL Context Service used to provide client certificate information for TLS/SSL connections.
- API Name
- ssl-context-service
- Service Interface
- org.apache.nifi.ssl.SSLContextService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
User Name
User Name used for authentication and authorization.
- Display Name
- User Name
- Description
- User Name used for authentication and authorization.
- API Name
- User Name
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Virtual Host
Virtual Host name which segregates AMQP system for enhanced security.
- Display Name
- Virtual Host
- Description
- Virtual Host name which segregates AMQP system for enhanced security.
- API Name
- Virtual Host
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
System Resource Considerations
Resource | Description |
---|---|
MEMORY | An instance of this component can cause high usage of this system resource. Multiple instances or high concurrency settings may result in a degradation of performance. |
Relationships
Name | Description |
---|---|
failure | All FlowFiles that cannot be routed to the AMQP destination are routed to this relationship |
success | All FlowFiles that are sent to the AMQP destination are routed to this relationship |
Reads Attributes
Name | Description |
---|---|
amqp$appId | The App ID field to set on the AMQP Message |
amqp$contentEncoding | The Content Encoding to set on the AMQP Message |
amqp$contentType | The Content Type to set on the AMQP Message |
amqp$headers | The headers to set on the AMQP Message, if 'Headers Source' is set to use it. See the additional details of the processor. |
amqp$deliveryMode | The numeric indicator for the Message's Delivery Mode |
amqp$priority | The Message priority |
amqp$correlationId | The Message's Correlation ID |
amqp$replyTo | The value of the Message's Reply-To field |
amqp$expiration | The Message Expiration |
amqp$messageId | The unique ID of the Message |
amqp$timestamp | The timestamp of the Message, as the number of milliseconds since epoch |
amqp$type | The type of message |
amqp$userId | The ID of the user |
amqp$clusterId | The ID of the AMQP Cluster |