-
Processors
- AttributeRollingWindow
- AttributesToCSV
- AttributesToJSON
- CalculateRecordStats
- CaptureChangeMySQL
- CompressContent
- ConnectWebSocket
- ConsumeAMQP
- ConsumeAzureEventHub
- ConsumeElasticsearch
- ConsumeGCPubSub
- ConsumeIMAP
- ConsumeJMS
- ConsumeKafka
- ConsumeKinesisStream
- ConsumeMQTT
- ConsumePOP3
- ConsumeSlack
- ConsumeTwitter
- ConsumeWindowsEventLog
- ControlRate
- ConvertCharacterSet
- ConvertRecord
- CopyAzureBlobStorage_v12
- CopyS3Object
- CountText
- CryptographicHashContent
- DebugFlow
- DecryptContentAge
- DecryptContentPGP
- DeduplicateRecord
- DeleteAzureBlobStorage_v12
- DeleteAzureDataLakeStorage
- DeleteByQueryElasticsearch
- DeleteDynamoDB
- DeleteFile
- DeleteGCSObject
- DeleteGridFS
- DeleteMongo
- DeleteS3Object
- DeleteSFTP
- DeleteSQS
- DetectDuplicate
- DistributeLoad
- DuplicateFlowFile
- EncodeContent
- EncryptContentAge
- EncryptContentPGP
- EnforceOrder
- EvaluateJsonPath
- EvaluateXPath
- EvaluateXQuery
- ExecuteGroovyScript
- ExecuteProcess
- ExecuteScript
- ExecuteSQL
- ExecuteSQLRecord
- ExecuteStreamCommand
- ExtractAvroMetadata
- ExtractEmailAttachments
- ExtractEmailHeaders
- ExtractGrok
- ExtractHL7Attributes
- ExtractRecordSchema
- ExtractText
- FetchAzureBlobStorage_v12
- FetchAzureDataLakeStorage
- FetchBoxFile
- FetchDistributedMapCache
- FetchDropbox
- FetchFile
- FetchFTP
- FetchGCSObject
- FetchGoogleDrive
- FetchGridFS
- FetchS3Object
- FetchSFTP
- FetchSmb
- FilterAttribute
- FlattenJson
- ForkEnrichment
- ForkRecord
- GenerateFlowFile
- GenerateRecord
- GenerateTableFetch
- GeoEnrichIP
- GeoEnrichIPRecord
- GeohashRecord
- GetAsanaObject
- GetAwsPollyJobStatus
- GetAwsTextractJobStatus
- GetAwsTranscribeJobStatus
- GetAwsTranslateJobStatus
- GetAzureEventHub
- GetAzureQueueStorage_v12
- GetDynamoDB
- GetElasticsearch
- GetFile
- GetFTP
- GetGcpVisionAnnotateFilesOperationStatus
- GetGcpVisionAnnotateImagesOperationStatus
- GetHubSpot
- GetMongo
- GetMongoRecord
- GetS3ObjectMetadata
- GetSFTP
- GetShopify
- GetSmbFile
- GetSNMP
- GetSplunk
- GetSQS
- GetWorkdayReport
- GetZendesk
- HandleHttpRequest
- HandleHttpResponse
- IdentifyMimeType
- InvokeHTTP
- InvokeScriptedProcessor
- ISPEnrichIP
- JoinEnrichment
- JoltTransformJSON
- JoltTransformRecord
- JSLTTransformJSON
- JsonQueryElasticsearch
- ListAzureBlobStorage_v12
- ListAzureDataLakeStorage
- ListBoxFile
- ListDatabaseTables
- ListDropbox
- ListenFTP
- ListenHTTP
- ListenOTLP
- ListenSlack
- ListenSyslog
- ListenTCP
- ListenTrapSNMP
- ListenUDP
- ListenUDPRecord
- ListenWebSocket
- ListFile
- ListFTP
- ListGCSBucket
- ListGoogleDrive
- ListS3
- ListSFTP
- ListSmb
- LogAttribute
- LogMessage
- LookupAttribute
- LookupRecord
- MergeContent
- MergeRecord
- ModifyBytes
- ModifyCompression
- MonitorActivity
- MoveAzureDataLakeStorage
- Notify
- PackageFlowFile
- PaginatedJsonQueryElasticsearch
- ParseEvtx
- ParseNetflowv5
- ParseSyslog
- ParseSyslog5424
- PartitionRecord
- PublishAMQP
- PublishGCPubSub
- PublishJMS
- PublishKafka
- PublishMQTT
- PublishSlack
- PutAzureBlobStorage_v12
- PutAzureCosmosDBRecord
- PutAzureDataExplorer
- PutAzureDataLakeStorage
- PutAzureEventHub
- PutAzureQueueStorage_v12
- PutBigQuery
- PutBoxFile
- PutCloudWatchMetric
- PutDatabaseRecord
- PutDistributedMapCache
- PutDropbox
- PutDynamoDB
- PutDynamoDBRecord
- PutElasticsearchJson
- PutElasticsearchRecord
- PutEmail
- PutFile
- PutFTP
- PutGCSObject
- PutGoogleDrive
- PutGridFS
- PutKinesisFirehose
- PutKinesisStream
- PutLambda
- PutMongo
- PutMongoBulkOperations
- PutMongoRecord
- PutRecord
- PutRedisHashRecord
- PutS3Object
- PutSalesforceObject
- PutSFTP
- PutSmbFile
- PutSNS
- PutSplunk
- PutSplunkHTTP
- PutSQL
- PutSQS
- PutSyslog
- PutTCP
- PutUDP
- PutWebSocket
- PutZendeskTicket
- QueryAirtableTable
- QueryAzureDataExplorer
- QueryDatabaseTable
- QueryDatabaseTableRecord
- QueryRecord
- QuerySalesforceObject
- QuerySplunkIndexingStatus
- RemoveRecordField
- RenameRecordField
- ReplaceText
- ReplaceTextWithMapping
- RetryFlowFile
- RouteHL7
- RouteOnAttribute
- RouteOnContent
- RouteText
- RunMongoAggregation
- SampleRecord
- ScanAttribute
- ScanContent
- ScriptedFilterRecord
- ScriptedPartitionRecord
- ScriptedTransformRecord
- ScriptedValidateRecord
- SearchElasticsearch
- SegmentContent
- SendTrapSNMP
- SetSNMP
- SignContentPGP
- SplitAvro
- SplitContent
- SplitExcel
- SplitJson
- SplitPCAP
- SplitRecord
- SplitText
- SplitXml
- StartAwsPollyJob
- StartAwsTextractJob
- StartAwsTranscribeJob
- StartAwsTranslateJob
- StartGcpVisionAnnotateFilesOperation
- StartGcpVisionAnnotateImagesOperation
- TagS3Object
- TailFile
- TransformXml
- UnpackContent
- UpdateAttribute
- UpdateByQueryElasticsearch
- UpdateCounter
- UpdateDatabaseTable
- UpdateRecord
- ValidateCsv
- ValidateJson
- ValidateRecord
- ValidateXml
- VerifyContentMAC
- VerifyContentPGP
- Wait
-
Controller Services
- ADLSCredentialsControllerService
- ADLSCredentialsControllerServiceLookup
- AmazonGlueSchemaRegistry
- ApicurioSchemaRegistry
- AvroReader
- AvroRecordSetWriter
- AvroSchemaRegistry
- AWSCredentialsProviderControllerService
- AzureBlobStorageFileResourceService
- AzureCosmosDBClientService
- AzureDataLakeStorageFileResourceService
- AzureEventHubRecordSink
- AzureStorageCredentialsControllerService_v12
- AzureStorageCredentialsControllerServiceLookup_v12
- CEFReader
- ConfluentEncodedSchemaReferenceReader
- ConfluentEncodedSchemaReferenceWriter
- ConfluentSchemaRegistry
- CSVReader
- CSVRecordLookupService
- CSVRecordSetWriter
- DatabaseRecordLookupService
- DatabaseRecordSink
- DatabaseTableSchemaRegistry
- DBCPConnectionPool
- DBCPConnectionPoolLookup
- DistributedMapCacheLookupService
- ElasticSearchClientServiceImpl
- ElasticSearchLookupService
- ElasticSearchStringLookupService
- EmailRecordSink
- EmbeddedHazelcastCacheManager
- ExcelReader
- ExternalHazelcastCacheManager
- FreeFormTextRecordSetWriter
- GCPCredentialsControllerService
- GCSFileResourceService
- GrokReader
- HazelcastMapCacheClient
- HikariCPConnectionPool
- HttpRecordSink
- IPLookupService
- JettyWebSocketClient
- JettyWebSocketServer
- JMSConnectionFactoryProvider
- JndiJmsConnectionFactoryProvider
- JsonConfigBasedBoxClientService
- JsonPathReader
- JsonRecordSetWriter
- JsonTreeReader
- Kafka3ConnectionService
- KerberosKeytabUserService
- KerberosPasswordUserService
- KerberosTicketCacheUserService
- LoggingRecordSink
- MapCacheClientService
- MapCacheServer
- MongoDBControllerService
- MongoDBLookupService
- PropertiesFileLookupService
- ProtobufReader
- ReaderLookup
- RecordSetWriterLookup
- RecordSinkServiceLookup
- RedisConnectionPoolService
- RedisDistributedMapCacheClientService
- RestLookupService
- S3FileResourceService
- ScriptedLookupService
- ScriptedReader
- ScriptedRecordSetWriter
- ScriptedRecordSink
- SetCacheClientService
- SetCacheServer
- SimpleCsvFileLookupService
- SimpleDatabaseLookupService
- SimpleKeyValueLookupService
- SimpleRedisDistributedMapCacheClientService
- SimpleScriptedLookupService
- SiteToSiteReportingRecordSink
- SlackRecordSink
- SmbjClientProviderService
- StandardAsanaClientProviderService
- StandardAzureCredentialsControllerService
- StandardDropboxCredentialService
- StandardFileResourceService
- StandardHashiCorpVaultClientService
- StandardHttpContextMap
- StandardJsonSchemaRegistry
- StandardKustoIngestService
- StandardKustoQueryService
- StandardOauth2AccessTokenProvider
- StandardPGPPrivateKeyService
- StandardPGPPublicKeyService
- StandardPrivateKeyService
- StandardProxyConfigurationService
- StandardRestrictedSSLContextService
- StandardS3EncryptionService
- StandardSSLContextService
- StandardWebClientServiceProvider
- Syslog5424Reader
- SyslogReader
- UDPEventRecordSink
- VolatileSchemaCache
- WindowsEventLogReader
- XMLFileLookupService
- XMLReader
- XMLRecordSetWriter
- YamlTreeReader
- ZendeskRecordSink
ConsumeKinesisStream 2.0.0
- Bundle
- org.apache.nifi | nifi-aws-nar
- Description
- Reads data from the specified AWS Kinesis stream and outputs a FlowFile for every processed Record (raw) or a FlowFile for a batch of processed records if a Record Reader and Record Writer are configured. At-least-once delivery of all Kinesis Records within the Stream while the processor is running. AWS Kinesis Client Library can take several seconds to initialise before starting to fetch data. Uses DynamoDB for check pointing and CloudWatch (optional) for metrics. Ensure that the credentials provided have access to DynamoDB and CloudWatch (optional) along with Kinesis.
- Tags
- amazon, aws, consume, kinesis, stream
- Input Requirement
- FORBIDDEN
- Supports Sensitive Dynamic Properties
- false
Properties
-
Application Name
The Kinesis stream reader application name.
- Display Name
- Application Name
- Description
- The Kinesis stream reader application name.
- API Name
- amazon-kinesis-stream-application-name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Checkpoint Interval
Interval between Kinesis checkpoints
- Display Name
- Checkpoint Interval
- Description
- Interval between Kinesis checkpoints
- API Name
- amazon-kinesis-stream-checkpoint-interval
- Default Value
- 3 secs
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Report Metrics to CloudWatch
Whether to report Kinesis usage metrics to CloudWatch.
- Display Name
- Report Metrics to CloudWatch
- Description
- Whether to report Kinesis usage metrics to CloudWatch.
- API Name
- amazon-kinesis-stream-cloudwatch-flag
- Default Value
- false
- Allowable Values
-
- true
- false
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
DynamoDB Override
DynamoDB override to use non-AWS deployments
- Display Name
- DynamoDB Override
- Description
- DynamoDB override to use non-AWS deployments
- API Name
- amazon-kinesis-stream-dynamodb-override
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Failover Timeout
Kinesis Client Library failover timeout
- Display Name
- Failover Timeout
- Description
- Kinesis Client Library failover timeout
- API Name
- amazon-kinesis-stream-failover-timeout
- Default Value
- 30 secs
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Graceful Shutdown Timeout
Kinesis Client Library graceful shutdown timeout
- Display Name
- Graceful Shutdown Timeout
- Description
- Kinesis Client Library graceful shutdown timeout
- API Name
- amazon-kinesis-stream-graceful-shutdown-timeout
- Default Value
- 20 secs
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Initial Stream Position
Initial position to read Kinesis streams.
- Display Name
- Initial Stream Position
- Description
- Initial position to read Kinesis streams.
- API Name
- amazon-kinesis-stream-initial-position
- Default Value
- LATEST
- Allowable Values
-
- LATEST
- TRIM_HORIZON
- AT_TIMESTAMP
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Stream Position Timestamp
Timestamp position in stream from which to start reading Kinesis Records. Required if Initial position to read Kinesis streams. is AT_TIMESTAMP. Uses the Timestamp Format to parse value into a Date.
- Display Name
- Stream Position Timestamp
- Description
- Timestamp position in stream from which to start reading Kinesis Records. Required if Initial position to read Kinesis streams. is AT_TIMESTAMP. Uses the Timestamp Format to parse value into a Date.
- API Name
- amazon-kinesis-stream-position-timestamp
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Initial Stream Position is set to any of [AT_TIMESTAMP]
-
Record Reader
The Record Reader to use for reading received messages. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
- Display Name
- Record Reader
- Description
- The Record Reader to use for reading received messages. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
- API Name
- amazon-kinesis-stream-record-reader
- Service Interface
- org.apache.nifi.serialization.RecordReaderFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Record Writer
The Record Writer to use for serializing Records to an output FlowFile. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
- Display Name
- Record Writer
- Description
- The Record Writer to use for serializing Records to an output FlowFile. The Kinesis Stream name can be referred to by Expression Language '${kinesis.name}' to access a schema. If Record Reader/Writer are not specified, each Kinesis Record will create a FlowFile.
- API Name
- amazon-kinesis-stream-record-writer
- Service Interface
- org.apache.nifi.serialization.RecordSetWriterFactory
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Retry Count
Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)
- Display Name
- Retry Count
- Description
- Number of times to retry a Kinesis operation (process record, checkpoint, shutdown)
- API Name
- amazon-kinesis-stream-retry-count
- Default Value
- 10
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Retry Wait
Interval between Kinesis operation retries (process record, checkpoint, shutdown)
- Display Name
- Retry Wait
- Description
- Interval between Kinesis operation retries (process record, checkpoint, shutdown)
- API Name
- amazon-kinesis-stream-retry-wait
- Default Value
- 1 sec
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Timestamp Format
Format to use for parsing the Stream Position Timestamp into a Date and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.
- Display Name
- Timestamp Format
- Description
- Format to use for parsing the Stream Position Timestamp into a Date and converting the Kinesis Record's Approximate Arrival Timestamp into a FlowFile attribute.
- API Name
- amazon-kinesis-stream-timestamp-format
- Default Value
- yyyy-MM-dd HH:mm:ss
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
-
AWS Credentials Provider Service
The Controller Service that is used to obtain AWS credentials provider
- Display Name
- AWS Credentials Provider Service
- Description
- The Controller Service that is used to obtain AWS credentials provider
- API Name
- AWS Credentials Provider service
- Service Interface
- org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Communications Timeout
- Display Name
- Communications Timeout
- Description
- API Name
- Communications Timeout
- Default Value
- 30 secs
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Endpoint Override URL
Endpoint URL to use instead of the AWS default including scheme, host, port, and path. The AWS libraries select an endpoint URL based on the AWS region, but this property overrides the selected endpoint URL, allowing use with other S3-compatible endpoints.
- Display Name
- Endpoint Override URL
- Description
- Endpoint URL to use instead of the AWS default including scheme, host, port, and path. The AWS libraries select an endpoint URL based on the AWS region, but this property overrides the selected endpoint URL, allowing use with other S3-compatible endpoints.
- API Name
- Endpoint Override URL
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- false
-
Amazon Kinesis Stream Name
The name of Kinesis Stream
- Display Name
- Amazon Kinesis Stream Name
- Description
- The name of Kinesis Stream
- API Name
- kinesis-stream-name
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Proxy Configuration Service
Specifies the Proxy Configuration Controller Service to proxy network requests. Supported proxies: HTTP + AuthN
- Display Name
- Proxy Configuration Service
- Description
- Specifies the Proxy Configuration Controller Service to proxy network requests. Supported proxies: HTTP + AuthN
- API Name
- proxy-configuration-service
- Service Interface
- org.apache.nifi.proxy.ProxyConfigurationService
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
-
Region
- Display Name
- Region
- Description
- API Name
- Region
- Default Value
- us-west-2
- Allowable Values
-
- AWS GovCloud (US-East)
- AWS GovCloud (US-West)
- Africa (Cape Town)
- Asia Pacific (Hong Kong)
- Asia Pacific (Hyderabad)
- Asia Pacific (Jakarta)
- Asia Pacific (Malaysia)
- Asia Pacific (Melbourne)
- Asia Pacific (Mumbai)
- Asia Pacific (Osaka)
- Asia Pacific (Seoul)
- Asia Pacific (Singapore)
- Asia Pacific (Sydney)
- Asia Pacific (Tokyo)
- Canada (Central)
- Canada West (Calgary)
- China (Beijing)
- China (Ningxia)
- EU ISOE West
- Europe (Frankfurt)
- Europe (Ireland)
- Europe (London)
- Europe (Milan)
- Europe (Paris)
- Europe (Spain)
- Europe (Stockholm)
- Europe (Zurich)
- Israel (Tel Aviv)
- Middle East (Bahrain)
- Middle East (UAE)
- South America (Sao Paulo)
- US East (N. Virginia)
- US East (Ohio)
- US ISO East
- US ISO WEST
- US ISOB East (Ohio)
- US West (N. California)
- US West (Oregon)
- aws-cn-global
- aws-global
- aws-iso-b-global
- aws-iso-global
- aws-us-gov-global
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
Dynamic Properties
-
Kinesis Client Library (KCL) Configuration property name
Override default KCL Configuration ConfigsBuilder properties with required values. Supports setting of values directly on the ConfigsBuilder, such as 'namespace', as well as properties on nested builders. For example, to set configsBuilder.retrievalConfig().maxListShardsRetryAttempts(value), name the property as 'retrievalConfig.maxListShardsRetryAttempts'. Only supports setting of simple property values, e.g. String, int, long and boolean. Does not allow override of KCL Configuration settings handled by non-dynamic processor properties.
- Name
- Kinesis Client Library (KCL) Configuration property name
- Description
- Override default KCL Configuration ConfigsBuilder properties with required values. Supports setting of values directly on the ConfigsBuilder, such as 'namespace', as well as properties on nested builders. For example, to set configsBuilder.retrievalConfig().maxListShardsRetryAttempts(value), name the property as 'retrievalConfig.maxListShardsRetryAttempts'. Only supports setting of simple property values, e.g. String, int, long and boolean. Does not allow override of KCL Configuration settings handled by non-dynamic processor properties.
- Value
- Value to set in the KCL Configuration property
- Expression Language Scope
- NONE
System Resource Considerations
Resource | Description |
---|---|
CPU | Kinesis Client Library is used to create a Worker thread for consumption of Kinesis Records. The Worker is initialised and started when this Processor has been triggered. It runs continually, spawning Kinesis Record Processors as required to fetch Kinesis Records. The Worker Thread (and any child Record Processor threads) are not controlled by the normal NiFi scheduler as part of the Concurrent Thread pool and are not released until this processor is stopped. |
NETWORK | Kinesis Client Library will continually poll for new Records, requesting up to a maximum number of Records/bytes per call. This can result in sustained network usage. |
Relationships
Name | Description |
---|---|
success | FlowFiles are routed to success relationship |
Writes Attributes
Name | Description |
---|---|
aws.kinesis.partition.key | Partition key of the (last) Kinesis Record read from the Shard |
aws.kinesis.shard.id | Shard ID from which the Kinesis Record was read |
aws.kinesis.sequence.number | The unique identifier of the (last) Kinesis Record within its Shard |
aws.kinesis.approximate.arrival.timestamp | Approximate arrival timestamp of the (last) Kinesis Record read from the stream |
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer (if configured) |
record.count | Number of records written to the FlowFiles by the Record Writer (if configured) |
record.error.message | This attribute provides on failure the error message encountered by the Record Reader or Record Writer (if configured) |
See Also