Summary

This processor publishes the contents of the incoming FlowFile to an AMQP-based messaging system. At the time of writing this document the supported AMQP protocol version is v0.9.1.

The component is based on the RabbitMQ Client API. The following guide and tutorial may also help you brush up on some of the AMQP basics.

This processor does two things. First, it constructs an AMQP message from the FlowFile contents (both body and attributes). Second, once the message is constructed, it sends it to an AMQP Exchange. AMQP properties are extracted from the FlowFile attributes and converted to com.rabbitmq.client.AMQP.BasicProperties to be sent along with the message. On success, the incoming FlowFile is transferred to the success Relationship; on failure, the FlowFile is penalized and transferred to the failure Relationship.

Where did my message go?

In a typical AMQP exchange model, a message sent to an AMQP Exchange is routed, based on its Routing Key, to its final destination in a Queue. The association between the Exchange, the Routing Key and the Queue is called a Binding. If, due to some misconfiguration, that binding is not set up, the message has no final destination and is returned (i.e., the data will not make it to the queue). If that happens, you will see a message to that effect in both the application log and a bulletin. Fixing the binding (normally done by the AMQP administrator) resolves the issue.
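The routing rule described above can be sketched as a small in-memory model. This is only an illustration of how a direct binding resolves an Exchange and Routing Key to queues; it is not how the broker or this processor is implemented. The class name DirectRoutingSketch and the queue name 'orders' are made up for the example; 'amq.direct' and 'myKey' are the sample values used elsewhere in this document.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of direct routing: NOT the broker's implementation.
public class DirectRoutingSketch {
    // exchange name -> (routing key -> names of bound queues)
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    // Records a binding between an exchange, a routing key and a queue.
    public void bind(String exchange, String routingKey, String queue) {
        bindings.computeIfAbsent(exchange, k -> new HashMap<>())
                .computeIfAbsent(routingKey, k -> new ArrayList<>())
                .add(queue);
    }

    // Returns the queues that would receive the message.
    // An empty list means there is no binding, so the message has no destination.
    public List<String> route(String exchange, String routingKey) {
        return bindings.getOrDefault(exchange, Map.of())
                       .getOrDefault(routingKey, List.of());
    }

    public static void main(String[] args) {
        DirectRoutingSketch broker = new DirectRoutingSketch();
        broker.bind("amq.direct", "myKey", "orders"); // 'orders' is a hypothetical queue

        System.out.println(broker.route("amq.direct", "myKey"));   // prints [orders]
        System.out.println(broker.route("amq.direct", "typoKey")); // prints []
    }
}
```

The second lookup shows the misconfiguration case: the routing key has no binding, so no queue receives the message.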

AMQP Properties

Attributes extracted from the FlowFile are considered candidates for AMQP properties if their names are prefixed with amqp$ (e.g., amqp$contentType=text/xml). To enrich a message with additional AMQP properties, you may use an UpdateAttribute processor between the source processor and the PublishAMQP processor. The following standard AMQP properties are available: "amqp$contentType", "amqp$contentEncoding", "amqp$headers", "amqp$deliveryMode", "amqp$priority", "amqp$correlationId", "amqp$replyTo", "amqp$expiration", "amqp$messageId", "amqp$timestamp", "amqp$type", "amqp$userId", "amqp$appId", "amqp$clusterId".
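The prefix rule can be illustrated with a few lines of plain Java. This is a minimal sketch of selecting candidate attributes by the amqp$ prefix, assuming the FlowFile attributes are available as a simple string map; it is not the processor's actual code. The class and method names, and the 'filename' attribute value, are made up for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: selects attributes whose names start with "amqp$".
public class AmqpAttributeFilter {
    static final String PREFIX = "amqp$";

    // Returns the candidate AMQP properties with the prefix stripped from each name.
    public static Map<String, String> extractAmqpProperties(Map<String, String> attributes) {
        Map<String, String> candidates = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : attributes.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                candidates.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("filename", "order.xml");          // not prefixed, ignored
        attributes.put("amqp$contentType", "text/xml");   // candidate
        attributes.put("amqp$deliveryMode", "2");         // candidate

        System.out.println(extractAmqpProperties(attributes));
        // prints {contentType=text/xml, deliveryMode=2}
    }
}
```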

Configuration Details

At the time of writing, this component defines only the essential configuration properties, which are suitable for most cases. Other properties will be defined later as the component progresses. Configuring PublishAMQP:

  1. Exchange Name - [OPTIONAL] the name of the AMQP exchange the messages will be sent to. Usually provided by the administrator (e.g., 'amq.direct'). If left empty, the messages will be sent to the default AMQP exchange. More on AMQP Exchanges can be found here.
  2. Routing Key - [REQUIRED] the name of the routing key that will be used by AMQP to route messages from the exchange to the destination queue(s). Usually provided by the administrator (e.g., 'myKey'). When messages are sent to the default exchange, this property corresponds to the destination queue name; otherwise, a binding from the Exchange to a Queue via the Routing Key must be set up (usually by the AMQP administrator). More on AMQP Exchanges and Bindings can be found here.
  3. Host Name - [REQUIRED] the name of the host where the AMQP broker is running. Usually provided by the administrator (e.g., 'myhost.com'). Defaults to 'localhost'.
  4. Port - [REQUIRED] the port number on which the AMQP broker is listening. Usually provided by the administrator (e.g., '2453'). Defaults to '5672'.
  5. User Name - [REQUIRED] the user name used to connect to the AMQP broker. Usually provided by the administrator (e.g., 'me'). Defaults to 'guest'.
  6. Password - [REQUIRED] the password used with the user name to connect to the AMQP broker. Usually provided by the administrator. Defaults to 'guest'.
  7. Use Certificate Authentication - [OPTIONAL] whether or not to use the SSL certificate common name for authentication rather than user name/password. This can only be used in conjunction with SSL. Defaults to 'false'.
  8. Virtual Host - [OPTIONAL] the name of the Virtual Host, which segregates the AMQP system for enhanced security. Please refer to this blog for more details on Virtual Hosts.
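The defaults listed above can be summarized in a short sketch. It assumes the configuration is represented as a plain map keyed by the property names from the list, which is a simplification for illustration and not how NiFi stores processor properties. The class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: applies the documented defaults to a partial configuration.
public class PublishAmqpDefaults {

    // Returns the effective settings: documented defaults overridden by configured values.
    public static Map<String, String> withDefaults(Map<String, String> configured) {
        Map<String, String> effective = new LinkedHashMap<>();
        effective.put("Exchange Name", "");                        // empty means the default exchange
        effective.put("Host Name", "localhost");
        effective.put("Port", "5672");
        effective.put("User Name", "guest");
        effective.put("Password", "guest");
        effective.put("Use Certificate Authentication", "false");
        effective.putAll(configured);

        // Routing Key is required and has no documented default.
        String routingKey = effective.get("Routing Key");
        if (routingKey == null || routingKey.isEmpty()) {
            throw new IllegalArgumentException("Routing Key is required");
        }
        return effective;
    }

    public static void main(String[] args) {
        Map<String, String> configured = new LinkedHashMap<>();
        configured.put("Routing Key", "myKey");
        configured.put("Host Name", "myhost.com");

        Map<String, String> effective = withDefaults(configured);
        System.out.println(effective.get("Host Name")); // prints myhost.com
        System.out.println(effective.get("Port"));      // prints 5672
    }
}
```

With an empty Exchange Name, as in this example, the Routing Key value would be interpreted as the destination queue name, as described in item 2.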