ConsumeAMQP 2.0.0

Bundle
org.apache.nifi | nifi-amqp-nar
Description
Consumes AMQP Messages from an AMQP Broker using the AMQP 0.9.1 protocol. Each message that is received from the AMQP Broker will be emitted as its own FlowFile to the 'success' relationship.
Tags
amqp, consume, get, message, rabbit, receive
Input Requirement
FORBIDDEN
Supports Sensitive Dynamic Properties
false
  • Additional Details for ConsumeAMQP 2.0.0

    ConsumeAMQP

    Summary

    This processor consumes messages from AMQP messaging queue and converts them to a FlowFile to be routed to the next component in the flow. At the time of writing this document the supported AMQP protocol version is v0.9.1.

    The component is based on RabbitMQ Client API The following guide and tutorial may also help you to brush up on some of the AMQP basics.

    This processor does two things. It constructs FlwFile by extracting information from the consumed AMQP message (both body and attributes). Once message is consumed a FlowFile is constructed. The message body is written to a FlowFile and its com.rabbitmq.client.AMQP.BasicProperties are transfered into the FlowFile as attributes. AMQP attribute names are prefixed with amqp$ prefix.

    AMQP Properties

    The following is the list of available standard AMQP properties which may come with the message: (“amqp$contentType”, “amqp$contentEncoding”, “amqp$headers”, “amqp$deliveryMode”, “amqp$priority”, “amqp$correlationId”, “amqp$replyTo”, “amqp$expiration”, “amqp$messageId”, “amqp$timestamp”, “amqp$type”, “amqp$userId”, “amqp$appId”, “amqp$clusterId”, “amqp$routingKey”)

    Configuration Details

    At the time of writing this document it only defines the essential configuration properties which are suitable for most cases. Other properties will be defined later as this component progresses. Configuring PublishAMQP:

    1. Queue - [REQUIRED] the name of AMQP queue the messages will be retrieved from. Usually provided by administrator (e.g., ‘amq.direct’)
    2. Host Name - [REQUIRED] the name of the host where AMQP broker is running. Usually provided by administrator ( e.g., ‘myhost.com’). Defaults to ’localhost’.
    3. Port - [REQUIRED] the port number where AMQP broker is running. Usually provided by the administrator (e.g., ' 2453’). Defaults to ‘5672’.
    4. User Name - [REQUIRED] user name to connect to AMQP broker. Usually provided by the administrator (e.g., ‘me’). Defaults to ‘guest’.
    5. Password - [REQUIRED] password to use with user name to connect to AMQP broker. Usually provided by the administrator. Defaults to ‘guest’.
    6. Use Certificate Authentication - [OPTIONAL] Use the SSL certificate common name for authentication rather than user name/password. This can only be used in conjunction with SSL. Defaults to ‘false’.
    7. Virtual Host - [OPTIONAL] Virtual Host name which segregates AMQP system for enhanced security. Please refer to this blog for more details on Virtual Host.
Properties
Relationships
Name Description
success All FlowFiles that are received from the AMQP queue are routed to this relationship
Writes Attributes
Name Description
amqp$appId The App ID field from the AMQP Message
amqp$contentEncoding The Content Encoding reported by the AMQP Message
amqp$contentType The Content Type reported by the AMQP Message
amqp$headers The headers present on the AMQP Message. Added only if processor is configured to output this attribute.
<Header Key Prefix>.<attribute> Each message header will be inserted with this attribute name, if processor is configured to output headers as attribute
amqp$deliveryMode The numeric indicator for the Message's Delivery Mode
amqp$priority The Message priority
amqp$correlationId The Message's Correlation ID
amqp$replyTo The value of the Message's Reply-To field
amqp$expiration The Message Expiration
amqp$messageId The unique ID of the Message
amqp$timestamp The timestamp of the Message, as the number of milliseconds since epoch
amqp$type The type of message
amqp$userId The ID of the user
amqp$clusterId The ID of the AMQP Cluster
amqp$routingKey The routingKey of the AMQP Message
amqp$exchange The exchange from which AMQP Message was received