-
Processors
- AttributeRollingWindow
- AttributesToCSV
- AttributesToJSON
- CalculateRecordStats
- CaptureChangeMySQL
- CompressContent
- ConnectWebSocket
- ConsumeAMQP
- ConsumeAzureEventHub
- ConsumeElasticsearch
- ConsumeGCPubSub
- ConsumeIMAP
- ConsumeJMS
- ConsumeKafka
- ConsumeKinesisStream
- ConsumeMQTT
- ConsumePOP3
- ConsumeSlack
- ConsumeTwitter
- ConsumeWindowsEventLog
- ControlRate
- ConvertCharacterSet
- ConvertRecord
- CopyAzureBlobStorage_v12
- CopyS3Object
- CountText
- CryptographicHashContent
- DebugFlow
- DecryptContentAge
- DecryptContentPGP
- DeduplicateRecord
- DeleteAzureBlobStorage_v12
- DeleteAzureDataLakeStorage
- DeleteByQueryElasticsearch
- DeleteDynamoDB
- DeleteFile
- DeleteGCSObject
- DeleteGridFS
- DeleteMongo
- DeleteS3Object
- DeleteSFTP
- DeleteSQS
- DetectDuplicate
- DistributeLoad
- DuplicateFlowFile
- EncodeContent
- EncryptContentAge
- EncryptContentPGP
- EnforceOrder
- EvaluateJsonPath
- EvaluateXPath
- EvaluateXQuery
- ExecuteGroovyScript
- ExecuteProcess
- ExecuteScript
- ExecuteSQL
- ExecuteSQLRecord
- ExecuteStreamCommand
- ExtractAvroMetadata
- ExtractEmailAttachments
- ExtractEmailHeaders
- ExtractGrok
- ExtractHL7Attributes
- ExtractRecordSchema
- ExtractText
- FetchAzureBlobStorage_v12
- FetchAzureDataLakeStorage
- FetchBoxFile
- FetchDistributedMapCache
- FetchDropbox
- FetchFile
- FetchFTP
- FetchGCSObject
- FetchGoogleDrive
- FetchGridFS
- FetchS3Object
- FetchSFTP
- FetchSmb
- FilterAttribute
- FlattenJson
- ForkEnrichment
- ForkRecord
- GenerateFlowFile
- GenerateRecord
- GenerateTableFetch
- GeoEnrichIP
- GeoEnrichIPRecord
- GeohashRecord
- GetAsanaObject
- GetAwsPollyJobStatus
- GetAwsTextractJobStatus
- GetAwsTranscribeJobStatus
- GetAwsTranslateJobStatus
- GetAzureEventHub
- GetAzureQueueStorage_v12
- GetDynamoDB
- GetElasticsearch
- GetFile
- GetFTP
- GetGcpVisionAnnotateFilesOperationStatus
- GetGcpVisionAnnotateImagesOperationStatus
- GetHubSpot
- GetMongo
- GetMongoRecord
- GetS3ObjectMetadata
- GetSFTP
- GetShopify
- GetSmbFile
- GetSNMP
- GetSplunk
- GetSQS
- GetWorkdayReport
- GetZendesk
- HandleHttpRequest
- HandleHttpResponse
- IdentifyMimeType
- InvokeHTTP
- InvokeScriptedProcessor
- ISPEnrichIP
- JoinEnrichment
- JoltTransformJSON
- JoltTransformRecord
- JSLTTransformJSON
- JsonQueryElasticsearch
- ListAzureBlobStorage_v12
- ListAzureDataLakeStorage
- ListBoxFile
- ListDatabaseTables
- ListDropbox
- ListenFTP
- ListenHTTP
- ListenOTLP
- ListenSlack
- ListenSyslog
- ListenTCP
- ListenTrapSNMP
- ListenUDP
- ListenUDPRecord
- ListenWebSocket
- ListFile
- ListFTP
- ListGCSBucket
- ListGoogleDrive
- ListS3
- ListSFTP
- ListSmb
- LogAttribute
- LogMessage
- LookupAttribute
- LookupRecord
- MergeContent
- MergeRecord
- ModifyBytes
- ModifyCompression
- MonitorActivity
- MoveAzureDataLakeStorage
- Notify
- PackageFlowFile
- PaginatedJsonQueryElasticsearch
- ParseEvtx
- ParseNetflowv5
- ParseSyslog
- ParseSyslog5424
- PartitionRecord
- PublishAMQP
- PublishGCPubSub
- PublishJMS
- PublishKafka
- PublishMQTT
- PublishSlack
- PutAzureBlobStorage_v12
- PutAzureCosmosDBRecord
- PutAzureDataExplorer
- PutAzureDataLakeStorage
- PutAzureEventHub
- PutAzureQueueStorage_v12
- PutBigQuery
- PutBoxFile
- PutCloudWatchMetric
- PutDatabaseRecord
- PutDistributedMapCache
- PutDropbox
- PutDynamoDB
- PutDynamoDBRecord
- PutElasticsearchJson
- PutElasticsearchRecord
- PutEmail
- PutFile
- PutFTP
- PutGCSObject
- PutGoogleDrive
- PutGridFS
- PutKinesisFirehose
- PutKinesisStream
- PutLambda
- PutMongo
- PutMongoBulkOperations
- PutMongoRecord
- PutRecord
- PutRedisHashRecord
- PutS3Object
- PutSalesforceObject
- PutSFTP
- PutSmbFile
- PutSNS
- PutSplunk
- PutSplunkHTTP
- PutSQL
- PutSQS
- PutSyslog
- PutTCP
- PutUDP
- PutWebSocket
- PutZendeskTicket
- QueryAirtableTable
- QueryAzureDataExplorer
- QueryDatabaseTable
- QueryDatabaseTableRecord
- QueryRecord
- QuerySalesforceObject
- QuerySplunkIndexingStatus
- RemoveRecordField
- RenameRecordField
- ReplaceText
- ReplaceTextWithMapping
- RetryFlowFile
- RouteHL7
- RouteOnAttribute
- RouteOnContent
- RouteText
- RunMongoAggregation
- SampleRecord
- ScanAttribute
- ScanContent
- ScriptedFilterRecord
- ScriptedPartitionRecord
- ScriptedTransformRecord
- ScriptedValidateRecord
- SearchElasticsearch
- SegmentContent
- SendTrapSNMP
- SetSNMP
- SignContentPGP
- SplitAvro
- SplitContent
- SplitExcel
- SplitJson
- SplitPCAP
- SplitRecord
- SplitText
- SplitXml
- StartAwsPollyJob
- StartAwsTextractJob
- StartAwsTranscribeJob
- StartAwsTranslateJob
- StartGcpVisionAnnotateFilesOperation
- StartGcpVisionAnnotateImagesOperation
- TagS3Object
- TailFile
- TransformXml
- UnpackContent
- UpdateAttribute
- UpdateByQueryElasticsearch
- UpdateCounter
- UpdateDatabaseTable
- UpdateRecord
- ValidateCsv
- ValidateJson
- ValidateRecord
- ValidateXml
- VerifyContentMAC
- VerifyContentPGP
- Wait
-
Controller Services
- ADLSCredentialsControllerService
- ADLSCredentialsControllerServiceLookup
- AmazonGlueSchemaRegistry
- ApicurioSchemaRegistry
- AvroReader
- AvroRecordSetWriter
- AvroSchemaRegistry
- AWSCredentialsProviderControllerService
- AzureBlobStorageFileResourceService
- AzureCosmosDBClientService
- AzureDataLakeStorageFileResourceService
- AzureEventHubRecordSink
- AzureStorageCredentialsControllerService_v12
- AzureStorageCredentialsControllerServiceLookup_v12
- CEFReader
- ConfluentEncodedSchemaReferenceReader
- ConfluentEncodedSchemaReferenceWriter
- ConfluentSchemaRegistry
- CSVReader
- CSVRecordLookupService
- CSVRecordSetWriter
- DatabaseRecordLookupService
- DatabaseRecordSink
- DatabaseTableSchemaRegistry
- DBCPConnectionPool
- DBCPConnectionPoolLookup
- DistributedMapCacheLookupService
- ElasticSearchClientServiceImpl
- ElasticSearchLookupService
- ElasticSearchStringLookupService
- EmailRecordSink
- EmbeddedHazelcastCacheManager
- ExcelReader
- ExternalHazelcastCacheManager
- FreeFormTextRecordSetWriter
- GCPCredentialsControllerService
- GCSFileResourceService
- GrokReader
- HazelcastMapCacheClient
- HikariCPConnectionPool
- HttpRecordSink
- IPLookupService
- JettyWebSocketClient
- JettyWebSocketServer
- JMSConnectionFactoryProvider
- JndiJmsConnectionFactoryProvider
- JsonConfigBasedBoxClientService
- JsonPathReader
- JsonRecordSetWriter
- JsonTreeReader
- Kafka3ConnectionService
- KerberosKeytabUserService
- KerberosPasswordUserService
- KerberosTicketCacheUserService
- LoggingRecordSink
- MapCacheClientService
- MapCacheServer
- MongoDBControllerService
- MongoDBLookupService
- PropertiesFileLookupService
- ProtobufReader
- ReaderLookup
- RecordSetWriterLookup
- RecordSinkServiceLookup
- RedisConnectionPoolService
- RedisDistributedMapCacheClientService
- RestLookupService
- S3FileResourceService
- ScriptedLookupService
- ScriptedReader
- ScriptedRecordSetWriter
- ScriptedRecordSink
- SetCacheClientService
- SetCacheServer
- SimpleCsvFileLookupService
- SimpleDatabaseLookupService
- SimpleKeyValueLookupService
- SimpleRedisDistributedMapCacheClientService
- SimpleScriptedLookupService
- SiteToSiteReportingRecordSink
- SlackRecordSink
- SmbjClientProviderService
- StandardAsanaClientProviderService
- StandardAzureCredentialsControllerService
- StandardDropboxCredentialService
- StandardFileResourceService
- StandardHashiCorpVaultClientService
- StandardHttpContextMap
- StandardJsonSchemaRegistry
- StandardKustoIngestService
- StandardKustoQueryService
- StandardOauth2AccessTokenProvider
- StandardPGPPrivateKeyService
- StandardPGPPublicKeyService
- StandardPrivateKeyService
- StandardProxyConfigurationService
- StandardRestrictedSSLContextService
- StandardS3EncryptionService
- StandardSSLContextService
- StandardWebClientServiceProvider
- Syslog5424Reader
- SyslogReader
- UDPEventRecordSink
- VolatileSchemaCache
- WindowsEventLogReader
- XMLFileLookupService
- XMLReader
- XMLRecordSetWriter
- YamlTreeReader
- ZendeskRecordSink
PutElasticsearchJson 2.0.0
- Bundle
- org.apache.nifi | nifi-elasticsearch-restapi-nar
- Description
- An Elasticsearch put processor that uses the official Elastic REST client libraries. Each FlowFile is treated as a document to be sent to the Elasticsearch _bulk API. Multiple FlowFiles can be batched together into each Request sent to Elasticsearch.
- Tags
- elasticsearch, elasticsearch5, elasticsearch6, elasticsearch7, elasticsearch8, index, json, put
- Input Requirement
- REQUIRED
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for PutElasticsearchJson 2.0.0
PutElasticsearchJson
This processor is for accessing the Elasticsearch Bulk API. It provides the ability to configure bulk operations on a per-FlowFile basis, which is what separates it from PutElasticsearchRecord.
As part of the Elasticsearch REST API bundle, it uses a controller service to manage connection information and that controller service is built on top of the official Elasticsearch client APIs. That provides features such as automatic master detection against the cluster which is missing in the other bundles.
This processor builds one Elasticsearch Bulk API body per (batch of) FlowFiles. Care should be taken to batch FlowFiles into appropriately-sized chunks so that NiFi does not run out of memory and the requests sent to Elasticsearch are not too large for it to handle. When failures do occur, this processor is capable of attempting to route the FlowFiles that failed to an errors queue so that only failed FlowFiles can be processed downstream or replayed.
The index, operation and (optional) type fields are configured with default values. The ID (optional unless the operation is “index”) can be set as an attribute on the FlowFile(s).
Dynamic Templates
Index and Create operations can use Dynamic Templates. The Dynamic Templates property must be parsable as a JSON object.
Example - Index with Dynamic Templates
{ "message": "Hello, world" }
The Dynamic Templates property below would be parsable:
{ "message": "keyword_lower" }
Would create Elasticsearch action:
{ "index": { "_id": "1", "_index": "test", "dynamic_templates": { "message": "keyword_lower" } } }
{ "doc": { "message": "Hello, world" } }
Update/Upsert Scripts
Update and Upsert operations can use a script. Scripts must contain all the elements required by Elasticsearch, e.g. source and lang. The Script property must be parsable as a JSON object.
If a script is defined for an upset, the Flowfile content will be used as the upsert fields in the Elasticsearch action. If no script is defined, the FlowFile content will be used as the update doc (or doc_as_upsert for upsert operations).
Example - Update without Script
{ "message": "Hello, world", "from": "john.smith" }
Would create Elasticsearch action:
{ "update": { "_id": "1", "_index": "test" } }
{ "doc": { "message": "Hello, world", "from": "john.smith" } }
Example - Upsert with Script
{ "counter": 1 }
The script property below would be parsable:
{ "source": "ctx._source.counter += params.param1", "lang": "painless", "params": { "param1": 1 } }
Would create Elasticsearch action:
{ "update": { "_id": "1", "_index": "test" } }
{ "script": { "source": "ctx._source.counter += params.param1", "lang": "painless", "params": { "param1": 1 } }, "upsert": { "counter": 1 } }
Bulk Action Header Fields
Dynamic Properties can be defined on the processor with BULK: prefixes. Users must ensure that only known Bulk action fields are sent to Elasticsearch for the relevant index operation defined for the FlowFile, Elasticsearch will reject invalid combinations of index operation and Bulk action fields.
Example - Update with Retry on Conflict
{ "message": "Hello, world", "from": "john.smith" }
With the Dynamic Property below:
- BULK:retry_on_conflict = 3
Would create Elasticsearch action:
{ "update": { "_id": "1", "_index": "test", "retry_on_conflict": "3" } }
{ "doc": { "message": "Hello, world", "from": "john.smith" } }
Index Operations
Valid values for “operation” are:
- create
- delete
- index
- update
- upsert
-
Client Service
An Elasticsearch client service to use for running queries.
- Display Name
- Client Service
- Description
- An Elasticsearch client service to use for running queries.
- API Name
- el-rest-client-service
- Service Interface
- org.apache.nifi.elasticsearch.ElasticSearchClientService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Index
The name of the index to use.
- Display Name
- Index
- Description
- The name of the index to use.
- API Name
- el-rest-fetch-index
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
Type
The type of this document (used by Elasticsearch for indexing and searching).
- Display Name
- Type
- Description
- The type of this document (used by Elasticsearch for indexing and searching).
- API Name
- el-rest-type
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
-
Max JSON Field String Length
The maximum allowed length of a string value when parsing a JSON document or attribute.
- Display Name
- Max JSON Field String Length
- Description
- The maximum allowed length of a string value when parsing a JSON document or attribute.
- API Name
- Max JSON Field String Length
- Default Value
- 20 MB
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Character Set
Specifies the character set of the document data.
- Display Name
- Character Set
- Description
- Specifies the character set of the document data.
- API Name
- put-es-json-charset
- Default Value
- UTF-8
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
Dynamic Templates
The dynamic_templates for the document. Must be parsable as a JSON Object. Requires Elasticsearch 7+
- Display Name
- Dynamic Templates
- Description
- The dynamic_templates for the document. Must be parsable as a JSON Object. Requires Elasticsearch 7+
- API Name
- put-es-json-dynamic_templates
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
-
Identifier Attribute
The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is "index", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.
- Display Name
- Identifier Attribute
- Description
- The name of the FlowFile attribute containing the identifier for the document. If the Index Operation is "index", this property may be left empty or evaluate to an empty value, in which case the document's identifier will be auto-generated by Elasticsearch. For all other Index Operations, the attribute must evaluate to a non-empty value.
- API Name
- put-es-json-id-attr
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Script
The script for the document update/upsert. Only applies to Update/Upsert operations. Must be parsable as JSON Object. If left blank, the FlowFile content will be used for document update/upsert
- Display Name
- Script
- Description
- The script for the document update/upsert. Only applies to Update/Upsert operations. Must be parsable as JSON Object. If left blank, the FlowFile content will be used for document update/upsert
- API Name
- put-es-json-script
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
-
Scripted Upsert
Whether to add the scripted_upsert flag to the Upsert Operation. Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the Client Service controller service's Suppress Null/Empty Values to Never Suppress or no "upsert" doc will be, included in the request to Elasticsearch and the operation will not create a new document for the script to execute against, resulting in a "not_found" error
- Display Name
- Scripted Upsert
- Description
- Whether to add the scripted_upsert flag to the Upsert Operation. Forces Elasticsearch to execute the Script whether or not the document exists, defaults to false. If the Upsert Document provided (from FlowFile content) will be empty, but sure to set the Client Service controller service's Suppress Null/Empty Values to Never Suppress or no "upsert" doc will be, included in the request to Elasticsearch and the operation will not create a new document for the script to execute against, resulting in a "not_found" error
- API Name
- put-es-json-scripted-upsert
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
-
Treat "Not Found" as Success
If true, "not_found" Elasticsearch Document associated Records will be routed to the "successful" relationship, otherwise to the "errors" relationship. If Output Error Responses is "true" then "not_found" responses from Elasticsearch will be sent to the error_responses relationship.
- Display Name
- Treat "Not Found" as Success
- Description
- If true, "not_found" Elasticsearch Document associated Records will be routed to the "successful" relationship, otherwise to the "errors" relationship. If Output Error Responses is "true" then "not_found" responses from Elasticsearch will be sent to the error_responses relationship.
- API Name
- put-es-not_found-is-error
- Default Value
- true
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Output Error Responses
If this is enabled, response messages from Elasticsearch marked as "error" will be output to the "error_responses" relationship.This does not impact the output of flowfiles to the "successful" or "errors" relationships
- Display Name
- Output Error Responses
- Description
- If this is enabled, response messages from Elasticsearch marked as "error" will be output to the "error_responses" relationship.This does not impact the output of flowfiles to the "successful" or "errors" relationships
- API Name
- put-es-output-error-responses
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Batch Size
The preferred number of FlowFiles to send over in a single batch
- Display Name
- Batch Size
- Description
- The preferred number of FlowFiles to send over in a single batch
- API Name
- put-es-record-batch-size
- Default Value
- 100
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
-
Index Operation
The type of the operation used to index (create, delete, index, update, upsert)
- Display Name
- Index Operation
- Description
- The type of the operation used to index (create, delete, index, update, upsert)
- API Name
- put-es-record-index-op
- Default Value
- index
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- true
-
Log Error Responses
If this is enabled, errors will be logged to the NiFi logs at the error log level. Otherwise, they will only be logged if debug logging is enabled on NiFi as a whole. The purpose of this option is to give the user the ability to debug failed operations without having to turn on debug logging.
- Display Name
- Log Error Responses
- Description
- If this is enabled, errors will be logged to the NiFi logs at the error log level. Otherwise, they will only be logged if debug logging is enabled on NiFi as a whole. The purpose of this option is to give the user the ability to debug failed operations without having to turn on debug logging.
- API Name
- put-es-record-log-error-responses
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
The name of the Bulk request header
Prefix: BULK: - adds the specified property name/value as a Bulk request header in the Elasticsearch Bulk API body used for processing. If the value is null or blank, the Bulk header will be omitted for the document operation. These parameters will override any matching parameters in the _bulk request body.
- Name
- The name of the Bulk request header
- Description
- Prefix: BULK: - adds the specified property name/value as a Bulk request header in the Elasticsearch Bulk API body used for processing. If the value is null or blank, the Bulk header will be omitted for the document operation. These parameters will override any matching parameters in the _bulk request body.
- Value
- The value of the Bulk request header
- Expression Language Scope
- FLOWFILE_ATTRIBUTES
-
The name of a URL query parameter to add
Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the _bulk request body. If FlowFiles are batched, only the first FlowFile in the batch is used to evaluate property values.
- Name
- The name of a URL query parameter to add
- Description
- Adds the specified property name/value as a query parameter in the Elasticsearch URL used for processing. These parameters will override any matching parameters in the _bulk request body. If FlowFiles are batched, only the first FlowFile in the batch is used to evaluate property values.
- Value
- The value of the URL query parameter
- Expression Language Scope
- FLOWFILE_ATTRIBUTES
Resource | Description |
---|---|
MEMORY | The Batch of FlowFiles will be stored in memory until the bulk operation is performed. |
Name | Description |
---|---|
failure | All flowfiles that fail for reasons unrelated to server availability go to this relationship. |
original | All flowfiles that are sent to Elasticsearch without request failures go to this relationship. |
successful | Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that did not result in an "error" (within Elasticsearch) will be routed here. |
errors | Record(s)/Flowfile(s) corresponding to Elasticsearch document(s) that resulted in an "error" (within Elasticsearch) will be routed here. |
retry | All flowfiles that fail due to server/cluster availability go to this relationship. |
Name | Description |
---|---|
elasticsearch.put.error | The error message if there is an issue parsing the FlowFile, sending the parsed document to Elasticsearch or parsing the Elasticsearch response |
elasticsearch.bulk.error | The _bulk response if there was an error during processing the document within Elasticsearch. |