PutElasticsearchRecord 2.0.0

Bundle
org.apache.nifi | nifi-elasticsearch-restapi-nar
Description
A record-aware Elasticsearch put processor that uses the official Elastic REST client libraries. Each Record within the FlowFile is converted into a document to be sent to the Elasticsearch _bulk API. Multiple documents can be batched into each request sent to Elasticsearch. Each document's bulk operation can be configured using Record Path expressions.
Tags
elasticsearch, elasticsearch5, elasticsearch6, elasticsearch7, elasticsearch8, index, json, put, record
Input Requirement
REQUIRED
Supports Sensitive Dynamic Properties
false
  • Additional Details for PutElasticsearchRecord 2.0.0

    PutElasticsearchRecord

    This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on a per-record basis, which is what separates it from PutElasticsearchJson. For example, it is possible to define multiple commands to index documents, followed by delete, create and update operations against the same index or other indices as desired.

    As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information, and that controller service is built on top of the official Elasticsearch client APIs. This provides features such as automatic master detection against the cluster, which are missing from the other bundles.

    This processor builds one Elasticsearch Bulk API body per record set. Care should be taken to split record sets into appropriately sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are not too large for it to handle. When failures do occur, this processor can attempt to write the failed records to an output record writer so that only those records are processed downstream or replayed.
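
    To illustrate why chunk size matters, the following is a minimal Python sketch of how a record set could be split into batches and serialized as newline-delimited Bulk API bodies. It is not the processor's implementation; the function names (chunked, bulk_body), the batch size and the index name are assumptions made for the example.

```python
import json
from itertools import islice


def chunked(records, size):
    """Yield lists of at most `size` records (hypothetical helper)."""
    it = iter(records)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def bulk_body(batch, index):
    """Serialize a batch as newline-delimited JSON.

    Each record contributes one action line followed by one source line,
    and the body ends with a trailing newline.
    """
    lines = []
    for record in batch:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(record))
    return "\n".join(lines) + "\n"


# Five records in batches of two give three bulk bodies.
records = [{"message": f"m{i}"} for i in range(5)]
bodies = [bulk_body(batch, "test") for batch in chunked(records, 2)]
```

    Each body is held in memory in full before it is sent, so smaller batches trade more requests for a lower memory footprint.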

    Per-Record Actions

    The index, operation and (optional) type fields are configured with default values that can be overridden using record path operations that find an index or type value in the record set. The ID and operation type (create, index, update, upsert or delete) can also be extracted in a similar fashion from the record set. A “@timestamp” field can be added to the data either using a default or by extracting it from the record set. This is useful if the documents are being indexed into an Elasticsearch Data Stream.

    Example - per-record actions

    The following is an example of a document exercising all of these features:

    {
      "metadata": {
        "id": "12345",
        "index": "test",
        "type": "message",
        "operation": "index"
      },
      "message": "Hello, world",
      "from": "john.smith",
      "ts": "2021-12-03T14:00:00.000Z"
    }
    
    {
      "metadata": {
        "id": "12345",
        "index": "test",
        "type": "message",
        "operation": "delete"
      }
    }
    

    The record path operations below would extract the relevant data:

    • /metadata/id
    • /metadata/index
    • /metadata/type
    • /metadata/operation
    • /ts
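
    The lookup-with-default behaviour described above can be sketched in Python as follows. This is only an illustration: resolve handles plain child paths such as /metadata/id and none of the wider RecordPath language, and the DEFAULTS values and function names are hypothetical.

```python
def resolve(record, path):
    """Follow a simple '/a/b' path through nested dicts.

    Returns None if any step is missing. Only plain child paths are handled.
    """
    value = record
    for key in path.strip("/").split("/"):
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# Hypothetical processor-level defaults, used when a path finds nothing.
DEFAULTS = {"index": "default-index", "operation": "index"}


def action_for(record):
    """Build the bulk action line for one record."""
    operation = resolve(record, "/metadata/operation") or DEFAULTS["operation"]
    header = {"_index": resolve(record, "/metadata/index") or DEFAULTS["index"]}
    doc_id = resolve(record, "/metadata/id")
    if doc_id is not None:
        header["_id"] = doc_id
    # An upsert is sent to Elasticsearch as an "update" action.
    action = "update" if operation == "upsert" else operation
    return {action: header}


delete_record = {
    "metadata": {"id": "12345", "index": "test", "type": "message", "operation": "delete"}
}
delete_action = action_for(delete_record)
default_action = action_for({"message": "Hello, world"})
```

    Here delete_action targets the "test" index with the record's own id, while default_action falls back to the configured defaults because the record carries no metadata.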

    Dynamic Templates

    Index and Create operations can use Dynamic Templates from the Record. Record path operations can be configured to find the Dynamic Templates in the record set. Dynamic Templates fields in Records must be a Map, a child Record or a string that can be parsed as a JSON object.

    Example - Index with Dynamic Templates

    {
      "message": "Hello, world",
      "dynamic_templates": "{\"message\": \"keyword_lower\"}"
    }
    

    The record path operation below would extract the relevant Dynamic Templates:

    • /dynamic_templates

    This would create the following Elasticsearch action:

    {
      "index": {
        "_id": "1",
        "_index": "test",
        "dynamic_templates": {
          "message": "keyword_lower"
        }
      }
    }
    
    {
      "doc": {
        "message": "Hello, world"
      }
    }
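
    As a rough Python sketch of the rule that the Dynamic Templates field may be a map or a JSON string, the helper below normalizes either form and moves it from the document into the action header. The function names and the default field name are assumptions for illustration, not the processor's code.

```python
import json


def as_json_object(value):
    """Accept a dict, or a string that parses to a JSON object."""
    if isinstance(value, dict):
        return value
    if isinstance(value, str):
        parsed = json.loads(value)
        if isinstance(parsed, dict):
            return parsed
    raise ValueError("expected a map or a string containing a JSON object")


def index_action(record, index, doc_id, templates_field="dynamic_templates"):
    """Return (action line, remaining document fields) for an index operation."""
    fields = dict(record)  # copy so the caller's record is not modified
    header = {"_id": doc_id, "_index": index}
    if templates_field in fields:
        # The templates move into the action header and leave the document.
        header["dynamic_templates"] = as_json_object(fields.pop(templates_field))
    return {"index": header}, fields


record = {
    "message": "Hello, world",
    "dynamic_templates": '{"message": "keyword_lower"}',
}
action, fields = index_action(record, "test", "1")
```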
    

    Update/Upsert Scripts

    Update and Upsert operations can use a script from the Record. Record path operations can be configured to find the script in the record set. Scripts must contain all the elements required by Elasticsearch, e.g. source and lang. Script fields in Records must be a Map, a child Record or a string that can be parsed as a JSON object.

    If a script is defined for an upsert, any fields remaining in the Record will be used as the upsert fields in the Elasticsearch action. If no script is defined, all Record fields will be used as the update doc (or doc_as_upsert for upsert operations).

    Example - Update without Script

    {
      "message": "Hello, world",
      "from": "john.smith"
    }
    

    This would create the following Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test"
      }
    }
    
    {
      "doc": {
        "message": "Hello, world",
        "from": "john.smith"
      }
    }
    

    Example - Upsert with Script

    {
      "counter": 1,
      "script": {
        "source": "ctx._source.counter += params.param1",
        "lang": "painless",
        "params": {
          "param1": 1
        }
      }
    }
    

    The record path operation below would extract the relevant script:

    • /script

    This would create the following Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test"
      }
    }
    
    {
      "script": {
        "source": "ctx._source.counter += params.param1",
        "lang": "painless",
        "params": {
          "param1": 1
        }
      },
      "upsert": {
        "counter": 1
      }
    }
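
    The script and no-script cases above can be summarized in a short Python sketch. It assumes the script has already been resolved to a map, and the function name update_body is hypothetical. What happens to the remaining fields for a plain update that has a script is not stated here, so the sketch simply leaves them out.

```python
def update_body(record, operation, script_field="script"):
    """Build the body line that follows an "update" action.

    `operation` is "update" or "upsert".
    """
    fields = dict(record)  # copy so the caller's record is not modified
    script = fields.pop(script_field, None)
    if script is not None:
        body = {"script": script}
        if operation == "upsert":
            # Remaining record fields become the upsert document.
            body["upsert"] = fields
        return body
    # No script: all record fields form the update doc.
    body = {"doc": fields}
    if operation == "upsert":
        body["doc_as_upsert"] = True
    return body


script = {
    "source": "ctx._source.counter += params.param1",
    "lang": "painless",
    "params": {"param1": 1},
}
plain_update = update_body({"message": "Hello, world", "from": "john.smith"}, "update")
scripted_upsert = update_body({"counter": 1, "script": script}, "upsert")
plain_upsert = update_body({"counter": 1}, "upsert")
```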
    

    Bulk Action Header Fields

    Dynamic Properties can be defined on the processor with a BULK: prefix. The value of the Dynamic Property is a record path operation that finds the field value in the record set. Users must ensure that only known Bulk action fields are sent to Elasticsearch for the index operation defined for the Record, because Elasticsearch will reject invalid combinations of index operation and Bulk action fields.

    Example - Update with Retry on Conflict

    {
      "message": "Hello, world",
      "from": "john.smith",
      "retry": 3
    }
    

    The Dynamic Property and record path operation below would extract the relevant field:

    • BULK:retry_on_conflict = /retry

    This would create the following Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test",
        "retry_on_conflict": 3
      }
    }
    
    {
      "doc": {
        "message": "Hello, world",
        "from": "john.smith"
      }
    }
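
    A minimal Python sketch of how BULK: properties might be turned into action header fields follows. It handles only top-level paths such as /retry, and the function name header_fields is an assumption for illustration; the resolved field is removed from the document, as in the example above.

```python
BULK_PREFIX = "BULK:"


def header_fields(record, dynamic_properties):
    """Return (extra header fields, remaining document fields).

    `dynamic_properties` maps property names to simple top-level paths.
    """
    fields = dict(record)  # copy so the caller's record is not modified
    extra = {}
    for name, path in dynamic_properties.items():
        if not name.startswith(BULK_PREFIX):
            continue  # not a bulk header property
        key = path.strip("/")
        if key in fields:
            # Strip the prefix to get the Bulk action field name.
            extra[name[len(BULK_PREFIX):]] = fields.pop(key)
    return extra, fields


record = {"message": "Hello, world", "from": "john.smith", "retry": 3}
extra, doc = header_fields(record, {"BULK:retry_on_conflict": "/retry"})
header = {"update": {"_id": "1", "_index": "test", **extra}}
```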
    

    Index Operations

    Valid values for “operation” are:

    • create
    • delete
    • index
    • update
    • upsert
Properties
Dynamic Properties
System Resource Considerations
Resource Description
MEMORY The Batch of Records will be stored in memory until the bulk operation is performed.
Relationships
Name Description
failure All FlowFiles that fail for reasons unrelated to server availability go to this relationship.
original All FlowFiles that are sent to Elasticsearch without request failures go to this relationship.
errors Record(s)/FlowFile(s) corresponding to Elasticsearch document(s) that resulted in an "error" (within Elasticsearch) will be routed here.
successful Record(s)/FlowFile(s) corresponding to Elasticsearch document(s) that did not result in an "error" (within Elasticsearch) will be routed here.
retry All FlowFiles that fail due to server/cluster availability go to this relationship.
Writes Attributes
Name Description
elasticsearch.put.error The error message if there is an issue parsing the FlowFile records, sending the parsed documents to Elasticsearch or parsing the Elasticsearch response.
elasticsearch.put.error.count The number of records that generated errors in the Elasticsearch _bulk API.
elasticsearch.put.success.count The number of records that were successfully processed by the Elasticsearch _bulk API.
elasticsearch.bulk.error The _bulk response if there was an error during processing the record within Elasticsearch.
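
As an illustration of how the two count attributes relate to a _bulk response, the Python sketch below tallies the items that carry an "error" object against those that do not. The response shape (an "items" list with one action-keyed result per document) follows the Elasticsearch Bulk API; how the processor itself derives the attributes is an assumption, and the function name count_results is hypothetical.

```python
def count_results(bulk_response):
    """Count failed and successful items in a parsed _bulk response."""
    errors = 0
    successes = 0
    for item in bulk_response.get("items", []):
        # Each item has a single key naming the action (index, update, ...).
        result = next(iter(item.values()))
        if "error" in result:
            errors += 1
        else:
            successes += 1
    return {
        "elasticsearch.put.error.count": errors,
        "elasticsearch.put.success.count": successes,
    }


response = {
    "took": 5,
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201}},
        {"update": {"_id": "2", "status": 400,
                    "error": {"type": "mapper_parsing_exception"}}},
    ],
}
counts = count_results(response)
```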
See Also