PutElasticsearchJson 2.0.0

Bundle
org.apache.nifi | nifi-elasticsearch-restapi-nar
Description
An Elasticsearch put processor that uses the official Elastic REST client libraries. Each FlowFile is treated as a document to be sent to the Elasticsearch _bulk API. Multiple FlowFiles can be batched together into each request sent to Elasticsearch.
Tags
elasticsearch, elasticsearch5, elasticsearch6, elasticsearch7, elasticsearch8, index, json, put
Input Requirement
REQUIRED
Supports Sensitive Dynamic Properties
false
  • Additional Details for PutElasticsearchJson 2.0.0

    PutElasticsearchJson

    This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on a per-FlowFile basis, which is what separates it from PutElasticsearchRecord.

    As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information, and that controller service is built on top of the official Elasticsearch client APIs. This provides features, such as automatic master detection against the cluster, that are missing from the other bundles.

    This processor builds one Elasticsearch Bulk API body per (batch of) FlowFiles. Care should be taken to batch FlowFiles into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are not too large for it to handle. When failures do occur, this processor is capable of attempting to route the FlowFiles that failed to an errors queue so that only failed FlowFiles can be processed downstream or replayed.
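
As a rough illustration of what "one Bulk API body per batch" means, the Python sketch below groups documents into fixed-size batches and serialises each batch as newline-delimited JSON. The names `batches` and `bulk_body`, the batch size of 2, and the hard-coded update operation are assumptions made for this example; it is not the processor's implementation.

```python
import json
from typing import Iterator, List, Tuple


def batches(items: List, size: int) -> Iterator[List]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def bulk_body(docs: List[Tuple[str, dict]], index: str) -> str:
    """Serialise (id, document) pairs as an NDJSON body of update actions."""
    lines = []
    for doc_id, doc in docs:
        lines.append(json.dumps({"update": {"_id": doc_id, "_index": index}}))
        lines.append(json.dumps({"doc": doc}))
    # The _bulk API expects every line, including the last, to end with a newline.
    return "\n".join(lines) + "\n"


documents = [(str(n), {"message": f"event {n}"}) for n in range(5)]
bodies = [bulk_body(batch, "test") for batch in batches(documents, 2)]
print(len(bodies))  # three request bodies: 2 + 2 + 1 documents
```

A smaller batch size lowers the memory held per request at the cost of more round trips to Elasticsearch.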

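    The index, operation and (optional) type fields are configured with default values. The ID can be set as an attribute on the FlowFile(s). It is optional when the operation is “index”, in which case Elasticsearch generates one; operations such as “update” and “delete” need it to identify the target document.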

    Dynamic Templates

    Index and Create operations can use Dynamic Templates. The Dynamic Templates property must be parsable as a JSON object.

    Example - Index with Dynamic Templates

    {
      "message": "Hello, world"
    }
    

    The Dynamic Templates property below would be parsable:

    {
      "message": "keyword_lower"
    }
    

    Would create Elasticsearch action:

    {
      "index": {
        "_id": "1",
        "_index": "test",
        "dynamic_templates": {
          "message": "keyword_lower"
        }
      }
    }
    
    {
      "message": "Hello, world"
    }
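
The rule that the Dynamic Templates property must parse as a JSON object can be sketched in Python as follows. The function name `index_action` and its parameters are hypothetical and only mirror the example above; the processor's own validation may differ.

```python
import json


def index_action(doc_id: str, index: str, dynamic_templates: str) -> dict:
    """Build an index action header from a Dynamic Templates property value."""
    # json.loads raises json.JSONDecodeError (a ValueError) on invalid JSON.
    templates = json.loads(dynamic_templates)
    if not isinstance(templates, dict):
        raise ValueError("Dynamic Templates must be a JSON object")
    return {
        "index": {
            "_id": doc_id,
            "_index": index,
            "dynamic_templates": templates,
        }
    }


action = index_action("1", "test", '{"message": "keyword_lower"}')
print(json.dumps(action))
```

A value such as `["keyword_lower"]` is valid JSON but not an object, so this sketch rejects it.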
    

    Update/Upsert Scripts

    Update and Upsert operations can use a script. Scripts must contain all the elements required by Elasticsearch, e.g. source and lang. The Script property must be parsable as a JSON object.

    If a script is defined for an upsert, the FlowFile content will be used as the upsert fields in the Elasticsearch action. If no script is defined, the FlowFile content will be used as the update doc (or doc_as_upsert for upsert operations).

    Example - Update without Script

    {
      "message": "Hello, world",
      "from": "john.smith"
    }
    

    Would create Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test"
      }
    }
    
    {
      "doc": {
        "message": "Hello, world",
        "from": "john.smith"
      }
    }
    

    Example - Upsert with Script

    {
      "counter": 1
    }
    

    The Script property below would be parsable:

    {
      "source": "ctx._source.counter += params.param1",
      "lang": "painless",
      "params": {
        "param1": 1
      }
    }
    

    Would create Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test"
      }
    }
    
    {
      "script": {
        "source": "ctx._source.counter += params.param1",
        "lang": "painless",
        "params": {
          "param1": 1
        }
      },
      "upsert": {
        "counter": 1
      }
    }
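
The choice between `doc`, `doc_as_upsert`, `script` and `upsert` described in this section can be sketched in Python as below. The function name `update_payload` is hypothetical, and the handling of a plain update with a script (the FlowFile content is left out) is an assumption of this sketch that the text above does not spell out.

```python
import json
from typing import Optional


def update_payload(operation: str, content: dict, script: Optional[str] = None) -> dict:
    """Build the payload line that follows an update action header."""
    if operation not in ("update", "upsert"):
        raise ValueError(f"not an update-style operation: {operation}")
    if script is None:
        # No script: the FlowFile content becomes the update doc.
        payload = {"doc": content}
        if operation == "upsert":
            payload["doc_as_upsert"] = True
        return payload
    parsed = json.loads(script)
    if not isinstance(parsed, dict):
        raise ValueError("Script must be a JSON object")
    payload = {"script": parsed}
    if operation == "upsert":
        # Script plus upsert: the FlowFile content becomes the upsert fields.
        payload["upsert"] = content
    # Assumption: for a plain update with a script, the content is not sent.
    return payload


script = json.dumps({
    "source": "ctx._source.counter += params.param1",
    "lang": "painless",
    "params": {"param1": 1},
})
with_script = update_payload("upsert", {"counter": 1}, script)
without_script = update_payload("update", {"message": "Hello, world", "from": "john.smith"})
```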
    

    Bulk Action Header Fields

    Dynamic Properties can be defined on the processor with the BULK: prefix. Users must ensure that only known Bulk action fields are sent to Elasticsearch for the index operation defined for the FlowFile, because Elasticsearch will reject invalid combinations of index operation and Bulk action fields.

    Example - Update with Retry on Conflict

    {
      "message": "Hello, world",
      "from": "john.smith"
    }
    

    With the Dynamic Property below:

    • BULK:retry_on_conflict = 3

    Would create Elasticsearch action:

    {
      "update": {
        "_id": "1",
        "_index": "test",
        "retry_on_conflict": "3"
      }
    }
    
    {
      "doc": {
        "message": "Hello, world",
        "from": "john.smith"
      }
    }
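
A minimal Python sketch of how BULK:-prefixed Dynamic Properties could end up in the action header is shown below. The function name `action_header` is hypothetical, and ignoring properties without the prefix is an assumption of this sketch. Values are kept as strings, matching the example above.

```python
BULK_PREFIX = "BULK:"


def action_header(operation: str, doc_id: str, index: str, dynamic_properties: dict) -> dict:
    """Merge BULK:-prefixed properties into the bulk action header."""
    header = {"_id": doc_id, "_index": index}
    for name, value in dynamic_properties.items():
        if name.startswith(BULK_PREFIX):
            # Strip the prefix; the remainder is the Bulk action field name.
            header[name[len(BULK_PREFIX):]] = value
    return {operation: header}


header = action_header(
    "update", "1", "test",
    {"BULK:retry_on_conflict": "3", "unrelated": "ignored"},
)
```

Nothing in this sketch checks that a field is valid for the chosen operation; as noted above, that responsibility stays with the user.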
    

    Index Operations

    Valid values for “operation” are:

    • create
    • delete
    • index
    • update
    • upsert
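
The list above can be checked with a few lines of Python. The sketch below also maps “upsert” to the `update` Bulk action, as the upsert example earlier on this page does; the function name `bulk_action_name` and the exact-match comparison are assumptions of this sketch.

```python
VALID_OPERATIONS = ("create", "delete", "index", "update", "upsert")


def bulk_action_name(operation: str) -> str:
    """Validate an operation and return the Bulk API action it is sent as."""
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unsupported operation: {operation!r}")
    # An upsert is expressed as an update action with upsert fields.
    return "update" if operation == "upsert" else operation


actions = [bulk_action_name(op) for op in VALID_OPERATIONS]
```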
Properties
Dynamic Properties
System Resource Considerations
Resource Description
MEMORY The batch of FlowFiles will be stored in memory until the bulk operation is performed.
Relationships
Name Description
failure All FlowFiles that fail for reasons unrelated to server availability go to this relationship.
original All FlowFiles that are sent to Elasticsearch without request failures go to this relationship.
successful Record(s)/FlowFile(s) corresponding to Elasticsearch document(s) that did not result in an "error" (within Elasticsearch) will be routed here.
errors Record(s)/FlowFile(s) corresponding to Elasticsearch document(s) that resulted in an "error" (within Elasticsearch) will be routed here.
retry All FlowFiles that fail due to server/cluster availability go to this relationship.
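
The split between the successful and errors relationships can be pictured with the Python sketch below, which walks the `items` array of a _bulk response and separates entries that carry an `error` object. It assumes one response item per FlowFile in request order. The function name `partition_by_result` and the sample response are hand-written for illustration, not captured from a real cluster.

```python
from typing import List, Tuple


def partition_by_result(flowfiles: List[str], bulk_response: dict) -> Tuple[List[str], List[str]]:
    """Split FlowFiles into (successful, errors) using the _bulk response items."""
    successful, errors = [], []
    for flowfile, item in zip(flowfiles, bulk_response["items"]):
        # Each item has a single key naming the action, e.g. "index" or "update".
        result = next(iter(item.values()))
        (errors if "error" in result else successful).append(flowfile)
    return successful, errors


sample_response = {
    "took": 5,
    "errors": True,
    "items": [
        {"index": {"_id": "1", "status": 201}},
        {"update": {"_id": "2", "status": 409,
                    "error": {"type": "version_conflict_engine_exception"}}},
    ],
}
ok, failed = partition_by_result(["flowfile-1", "flowfile-2"], sample_response)
```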
Writes Attributes
Name Description
elasticsearch.put.error The error message if there is an issue parsing the FlowFile, sending the parsed document to Elasticsearch, or parsing the Elasticsearch response.
elasticsearch.bulk.error The _bulk response if there was an error while processing the document within Elasticsearch.
See Also