ConsumeKafka 2.5.0

Bundle
org.apache.nifi | nifi-kafka-nar
Description
Consumes messages from Apache Kafka via the Kafka Consumer API. The complementary NiFi processor for sending messages is PublishKafka. The Processor supports consumption of Kafka messages, optionally interpreted as NiFi records. Please note that, at this time, when consuming in record mode the Processor assumes that all records retrieved from a given partition have the same schema. In this mode, if any Kafka message cannot be parsed or written with the configured Record Reader or Record Writer, the contents of the message will be written to a separate FlowFile, and that FlowFile will be transferred to the 'parse.failure' relationship. Otherwise, each FlowFile is routed to the 'success' relationship and may contain many individual messages within the single FlowFile. A 'record.count' attribute is added to indicate how many messages are contained in the FlowFile. No two Kafka messages will be placed into the same FlowFile if they have different schemas, or if they have different values for a message header that is included by the <Headers to Add as Attributes> property.
Tags
Consume, Get, Ingest, Ingress, Kafka, PubSub, Record, Topic, avro, csv, json
Input Requirement
FORBIDDEN
Supports Sensitive Dynamic Properties
false
  • Additional Details for ConsumeKafka 2.5.0

    Output Strategies

    This processor offers multiple output strategies (configured via the processor property 'Output Strategy') for converting Kafka records into FlowFiles.

    • 'Use Content as Value' (the default) emits FlowFile records containing only the Kafka record value (see the example below).
    • 'Use Wrapper' emits FlowFile records containing the Kafka record key, value, and headers, as well as additional metadata from the Kafka record.
    • 'Inject Metadata' emits FlowFile records containing the Kafka record value, into which a sub-record field has been added to hold the metadata, headers, and key information.
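
    Here is an example of FlowFile content that is emitted by JsonRecordSetWriter when the default 'Use Content as Value' strategy is selected (a minimal sketch, reusing the sample account record from the examples further below; only the Kafka record value is written):

    [
      {
        "address": "1234 First Street",
        "zip": "12345",
        "account": {
          "name": "Acme",
          "number": "AC1234"
        }
      }
    ]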

    The record schema that is used when 'Use Wrapper' is selected is as follows (in Avro format):

    {
      "type": "record",
      "name": "nifiRecord",
      "namespace": "org.apache.nifi",
      "fields": [{
          "name": "key",
          "type": [{
              < Schema is determined by the Key Record Reader, or will be "string" or "bytes", depending on the "Key Format" property (see below for more details) >
            }, "null"]
        },
        {
          "name": "value",
          "type": [
            {
              < Schema is determined by the Record Reader >
            },
            "null"
          ]
        },
        {
          "name": "headers",
          "type": [
            { "type": "map", "values": "string", "default": {}},
            "null"]
        },
        {
          "name": "metadata",
          "type": [
            {
              "type": "record",
              "name": "metadataType",
              "fields": [
                { "name": "topic", "type": ["string", "null"] },
                { "name": "partition", "type": ["int", "null"] },
                { "name": "offset", "type": ["int", "null"] },
                { "name": "timestamp", "type": ["long", "null"] }
              ]
            },
            "null"
          ]
        }
      ]
    }
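
    For instance, with 'Key Format' set to 'String' (a hypothetical resolution of the placeholder above), the key field would read:

    {
      "name": "key",
      "type": ["string", "null"]
    }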
    

    The record schema that is used when 'Inject Metadata' is selected is as follows (in Avro format):

    {
      "type": "record",
      "name": "nifiRecord",
      "namespace": "org.apache.nifi",
      "fields": [
          < Fields as determined by the Record Reader for the Kafka message >,
        {
          "name": "kafkaMetadata",
          "type": [
            {
              "type": "record",
              "name": "metadataType",
              "fields": [
                { "name": "topic", "type": ["string", "null"] },
                { "name": "partition", "type": ["int", "null"] },
                { "name": "offset", "type": ["int", "null"] },
                { "name": "timestamp", "type": ["long", "null"] },
                {
                  "name": "headers",
                  "type": [ { "type": "map", "values": "string", "default": {}}, "null"]
                },
                {
                  "name": "key",
                  "type": [{
                    < Schema is determined by the Key Record Reader, or will be "string" or "bytes", depending on the "Key Format" property (see below for more details) >
                  }, "null"]
                }
              ]
            },
            "null"
          ]
        }
      ]
    }
    

    If the Output Strategy property is set to 'Use Wrapper' or 'Inject Metadata', an additional processor configuration property ('Key Format') is activated. This property specifies how the Kafka record's key should be written out to the FlowFile. The possible values for 'Key Format' are as follows:

    • 'Byte Array' supplies the Kafka record key as a byte array, exactly as it was received in the Kafka record.
    • 'String' converts the Kafka record key bytes into a string using the UTF-8 character encoding. (Failure to parse the key bytes as UTF-8 will result in the record being routed to the 'parse.failure' relationship.)
    • 'Record' converts the Kafka record key bytes into a deserialized NiFi record, using the associated 'Key Record Reader' controller service. When the Key Format property is set to 'Record', an additional processor configuration property named 'Key Record Reader' is made available. This property specifies the Record Reader to use in order to parse the Kafka record's key as a record.
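
    When 'Key Format' is set to 'String', the "key" field of the wrapper record is a simple string rather than the nested record shown in the example below. A hypothetical sketch, assuming the key bytes contain the UTF-8 text "AC1234":

    [
      {
        "key": "AC1234",
        "value": { ... },
        "headers": { ... },
        "metadata": { ... }
      }
    ]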

    Here is an example of FlowFile content that is emitted by JsonRecordSetWriter when the 'Use Wrapper' strategy is selected:

    [
      {
        "key": {
          "name": "Acme",
          "number": "AC1234"
        },
        "value": {
          "address": "1234 First Street",
          "zip": "12345",
          "account": {
            "name": "Acme",
            "number": "AC1234"
          }
        },
        "headers": {
          "attributeA": "valueA",
          "attributeB": "valueB"
        },
        "metadata": {
          "topic": "accounts",
          "partition": 0,
          "offset": 0,
          "timestamp": 0
        }
      }
    ]
    

    Here is an example of FlowFile content that is emitted by JsonRecordSetWriter when the 'Inject Metadata' strategy is selected:

    [
      {
        "address": "1234 First Street",
        "zip": "12345",
        "account": {
          "name": "Acme",
          "number": "AC1234"
        },
        "kafkaMetadata": {
          "topic": "accounts",
          "partition": 0,
          "offset": 0,
          "timestamp": 0,
          "headers": {
            "attributeA": "valueA",
            "attributeB": "valueB"
          },
          "key": {
            "name": "Acme",
            "number": "AC1234"
          }
        }
      }
    ]
    
Properties
Relationships
Name Description
success FlowFiles containing one or more serialized Kafka Records
parse.failure If a Kafka message cannot be parsed or written with the configured Record Reader or Record Writer, its contents are routed to this relationship as a separate FlowFile (see Description above)
Writes Attributes
Name Description
record.count The number of records received
mime.type The MIME Type that is provided by the configured Record Writer
kafka.count The number of records in the FlowFile for a batch of records
kafka.key The key of the message, if present and if the FlowFile contains a single message. How the key is encoded depends on the value of the 'Key Attribute Encoding' property.
kafka.offset The offset of the record in the partition or the minimum value of the offset in a batch of records
kafka.timestamp The timestamp of the message in the partition of the topic.
kafka.partition The partition of the topic for a record or batch of records
kafka.topic The topic for a record or batch of records
kafka.tombstone Set to true if the consumed message is a tombstone message
kafka.max.offset The maximum value of the Kafka offset in a batch of records
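For example (illustrative values only), a FlowFile containing a batch of three JSON records consumed from partition 0 of an 'accounts' topic might carry the following attributes:
record.count     3
mime.type        application/json
kafka.topic      accounts
kafka.partition  0
kafka.offset     0
kafka.max.offset 2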
See Also
PublishKafka