ProtobufReader 2.0.0
- Bundle
- org.apache.nifi | nifi-protobuf-services-nar
- Description
- Parses a Protocol Buffers message from binary format.
- Tags
- parser, protobuf, reader, record
- Input Requirement
- Supports Sensitive Dynamic Properties
- false
-
Additional Details for ProtobufReader 2.0.0
ProtobufReader
The ProtobufReader Controller Service reads and parses a Protocol Buffers Message from binary format and creates a Record object. The Controller Service must be configured with the same ‘.proto’ file that was used for the Message encoding, and the fully qualified name of the Message type including its package (e.g. mypackage.MyMessage). The Reader will always generate one record from the input data which represents the provided Protocol Buffers Message type. Further information about Protocol Buffers can be found here: protobuf.dev
Data Type Mapping
When a record is parsed from incoming data, the Controller Service maps the Proto Message field types to the corresponding NiFi data types. The mapping between the provided Message fields and the encoded input is always based on the field tag numbers. When a field is defined as ‘repeated’, its data type will be an array of its originally specified type. The following tables show which proto field type corresponds to which NiFi field type after the conversion.
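To illustrate why matching by field tag number works, here is a minimal sketch of how a field key is split into a tag number and a wire type, assuming the standard Protocol Buffers wire encoding (key = field number shifted left by 3 bits, OR-ed with the wire type). The function names `read_varint` and `read_tag` are hypothetical and are not part of NiFi; the wire type labels follow the naming used in the tables of this page.

```python
def read_varint(data: bytes, pos: int = 0) -> tuple[int, int]:
    """Decode one base-128 varint starting at pos; return (value, next_pos)."""
    result = 0
    shift = 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift  # low 7 bits carry the payload
        if not byte & 0x80:               # high bit clear: last byte of the varint
            return result, pos
        shift += 7


# Wire type ids from the protobuf encoding, labelled as in this page's tables.
WIRE_TYPE_NAMES = {0: "varint", 1: "fixed64", 2: "length delimited", 5: "fixed32"}


def read_tag(data: bytes, pos: int = 0) -> tuple[int, str, int]:
    """Split a field key into (field_number, wire_type_name, next_pos)."""
    key, pos = read_varint(data, pos)
    return key >> 3, WIRE_TYPE_NAMES.get(key & 0x07, "unknown"), pos


# 0x1A == (3 << 3) | 2, i.e. field number 3 with a length delimited value.
print(read_tag(bytes([0x1A])))
```

Because only the number is present in the encoded data, the field names in the ‘.proto’ file can differ between writer and reader as long as the tag numbers and types agree.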
Scalar Value Types
| Proto Type | Proto Wire Type | NiFi Data Type |
|---|---|---|
| double | fixed64 | double |
| float | fixed32 | float |
| int32 | varint | int |
| int64 | varint | long |
| uint32 | varint | long |
| uint64 | varint | bigint |
| sint32 | varint | long |
| sint64 | varint | long |
| fixed32 | fixed32 | long |
| fixed64 | fixed64 | bigint |
| sfixed32 | fixed32 | int |
| sfixed64 | fixed64 | long |
| bool | varint | boolean |
| string | length delimited | string |
| bytes | length delimited | array[byte] |

Composite Value Types
| Proto Type | Proto Wire Type | NiFi Data Type |
|---|---|---|
| message | length delimited | record |
| enum | varint | enum |
| map | length delimited | map |
| oneof | - | choice |

Schemas and Type Coercion
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in the schema, that field will be stored in the Record’s value list with its original type. If the field is found in the schema, the data type of the received data is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
The following rules apply when attempting to coerce a field value from one data type to another:
- Any data type can be coerced into a String type.
- Any numeric data type (Int, Long, Float, Double) can be coerced into any other numeric data type.
- Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of milliseconds since epoch (Midnight GMT, January 1, 1970).
- A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured “Date Format,” “Time Format,” or “Timestamp Format.”
- A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value 8 can be coerced into any numeric type. However, the String value 8.2 can be coerced into a Double or Float type but not an Integer.
- A String value of “true” or “false” (regardless of case) can be coerced into a Boolean value.
- A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used and the rest of the characters are ignored.
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception will be thrown.
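The coercion rules above can be sketched roughly as follows. This is an illustration only, not NiFi's implementation: the `coerce` function, the lowercase type names, and the Python `strptime` pattern used for the timestamp format are all assumptions made for the example (NiFi itself uses Java date patterns). The sketch covers only the coercion rules; the case where the types already match is assumed to be handled before calling it.

```python
from datetime import datetime, timezone

# Hypothetical type names mapped to Python constructors.
NUMERIC_TYPES = {"int": int, "long": int, "float": float, "double": float}


def coerce(value, target: str, timestamp_format: str = "%Y-%m-%d %H:%M:%S"):
    """Coerce value to the named target type, or raise ValueError."""
    if target == "string":
        return str(value)  # any data type can become a String
    is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
    if target in NUMERIC_TYPES:
        if is_number or isinstance(value, str):
            # int("8.2") raises ValueError, while float("8.2") succeeds
            return NUMERIC_TYPES[target](value)
    if target == "timestamp":
        if is_number:
            # numeric values are read as milliseconds since the epoch
            return datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        if isinstance(value, str):
            return datetime.strptime(value, timestamp_format)
    if target == "boolean" and isinstance(value, str) and value.lower() in ("true", "false"):
        return value.lower() == "true"
    if target == "char" and isinstance(value, str) and value:
        return value[0]  # only the first character is kept
    raise ValueError(f"cannot coerce {value!r} to {target}")


print(coerce("8", "int"), coerce("8.2", "double"), coerce("TRUE", "boolean"))
```

Calling `coerce("8.2", "int")` raises `ValueError`, which mirrors the rule that 8.2 can become a Double or Float but not an Integer.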
Schema Access Strategy
Besides the common Schema Access strategies, such as getting the schema from a property value or from a Schema Registry, the ProtobufReader Controller Service offers another access strategy option called “Generate from Proto file”. When using this strategy, the Reader will generate the Record Schema from the provided ‘.proto’ file and Message type. This is a recommended strategy when the user doesn’t want to manually create the schema or when no type coercion is needed.
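As a rough illustration of what generating a schema from a message definition involves, the sketch below applies the scalar mapping from the “Scalar Value Types” table to a list of field descriptions. The `generate_schema` function, the tuple layout, and the sample message are hypothetical; the real Reader parses the ‘.proto’ file itself and also handles composite types.

```python
# NiFi data types copied from the "Scalar Value Types" table.
PROTO_TO_NIFI = {
    "double": "double", "float": "float",
    "int32": "int", "int64": "long",
    "uint32": "long", "uint64": "bigint",
    "sint32": "long", "sint64": "long",
    "fixed32": "long", "fixed64": "bigint",
    "sfixed32": "int", "sfixed64": "long",
    "bool": "boolean", "string": "string",
    "bytes": "array[byte]",
}


def generate_schema(fields):
    """Build {field_name: nifi_type} from (name, proto_type, repeated) tuples."""
    schema = {}
    for name, proto_type, repeated in fields:
        nifi_type = PROTO_TO_NIFI[proto_type]
        # a repeated field becomes an array of its declared type
        schema[name] = f"array[{nifi_type}]" if repeated else nifi_type
    return schema


# Hypothetical message:
#   message Player { string name = 1; uint64 id = 2; repeated int32 scores = 3; }
print(generate_schema([
    ("name", "string", False),
    ("id", "uint64", False),
    ("scores", "int32", True),
]))
```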
Protobuf Any Field Type
Protocol Buffers offers additional Message types called Well-Known Types. These are provided messages that define complex structured types and wrappers for scalar types. The Any type is one of these Well-Known Types; it stores an arbitrary serialized Message along with a URL that describes the type of the serialized Message. Since the Message type and the embedded Message are available only once the Any Message is populated with data, the ProtobufReader must process this Message at data conversion time. The Reader is capable of generating a schema for the embedded Message in the Any field and replacing it in the resulting Record schema.
Example
There is a Message called ‘TestMessage’ which has only one field, an Any typed field. There is another Message called ‘NestedMessage’ that we would like to add as a serialized Message in the value of ‘anyField’.
    message Any {
        string type_url = 1;
        bytes value = 2;
    }

    message TestMessage {
        google.protobuf.Any anyField = 3;
    }

    message NestedMessage {
        string field_1 = 1;
        string field_2 = 2;
        string field_3 = 3;
    }

With normal data conversion our result would look like this:
    {
        anyField: {
            type_url: "type.googleapis.com/NestedMessage"
            value: [ 84, 101, 115, 116, 32, 98, 121, 116, 101, 115 ]
        }
    }

Result after the Protobuf Reader replaces the Any Message’s fields with the processed embedded Message:
    {
        anyField: {
            field_1: "value 1",
            field_2: "value 2",
            field_3: "value 3"
        }
    }
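The replacement step in this example can be sketched as follows, assuming the standard protobuf wire encoding and a message made only of string fields. Everything here is hypothetical illustration rather than NiFi code: the `KNOWN_MESSAGES` registry stands in for the definitions loaded from the ‘.proto’ files, and `encode_string_field` exists only to build a valid sample payload.

```python
def read_varint(data: bytes, pos: int) -> tuple[int, int]:
    """Decode one base-128 varint starting at pos; return (value, next_pos)."""
    result, shift = 0, 0
    while True:
        byte = data[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def decode_string_fields(payload: bytes, field_names: dict) -> dict:
    """Decode a message whose fields are all length delimited strings."""
    record, pos = {}, 0
    while pos < len(payload):
        key, pos = read_varint(payload, pos)
        field_number, wire_type = key >> 3, key & 0x07
        if wire_type != 2:
            raise ValueError(f"unsupported wire type {wire_type}")
        length, pos = read_varint(payload, pos)
        record[field_names[field_number]] = payload[pos:pos + length].decode("utf-8")
        pos += length
    return record


# Hypothetical registry: message name -> {field number: field name}.
KNOWN_MESSAGES = {"NestedMessage": {1: "field_1", 2: "field_2", 3: "field_3"}}


def unpack_any(any_field: dict) -> dict:
    """Replace an Any's type_url/value pair with the decoded embedded message."""
    message_name = any_field["type_url"].rsplit("/", 1)[-1]
    return decode_string_fields(any_field["value"], KNOWN_MESSAGES[message_name])


def encode_string_field(number: int, text: str) -> bytes:
    """Sample-data helper; valid only for field numbers < 16 and lengths < 128."""
    raw = text.encode("utf-8")
    return bytes([(number << 3) | 2, len(raw)]) + raw


payload = b"".join(encode_string_field(n, f"value {n}") for n in (1, 2, 3))
record = {"anyField": {"type_url": "type.googleapis.com/NestedMessage", "value": payload}}
record["anyField"] = unpack_any(record["anyField"])
print(record)
```

The message name is taken from the last path segment of `type_url`, which is why the embedded type can only be resolved once the Any field actually contains data.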
-
Message Type
Fully qualified name of the Protocol Buffers message type including its package (e.g. mypackage.MyMessage). The .proto files configured in 'Proto Directory' must contain the definition of this message type.
- Display Name
- Message Type
- Description
- Fully qualified name of the Protocol Buffers message type including its package (e.g. mypackage.MyMessage). The .proto files configured in 'Proto Directory' must contain the definition of this message type.
- API Name
- Message Type
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
-
Proto Directory
Directory containing Protocol Buffers message definition (.proto) file(s).
- Display Name
- Proto Directory
- Description
- Directory containing Protocol Buffers message definition (.proto) file(s).
- API Name
- Proto Directory
- Expression Language Scope
- Environment variables defined at JVM level and system properties
- Sensitive
- false
- Required
- true
-
Schema Access Strategy
Specifies how to obtain the schema that is to be used for interpreting the data.
- Display Name
- Schema Access Strategy
- Description
- Specifies how to obtain the schema that is to be used for interpreting the data.
- API Name
- schema-access-strategy
- Default Value
- generate-from-proto-file
- Allowable Values
-
- Use 'Schema Name' Property
- Use 'Schema Text' Property
- Schema Reference Reader
- Generate from Proto file
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
-
Schema Branch
Specifies the name of the branch to use when looking up the schema in the Schema Registry property. If the chosen Schema Registry does not support branching, this value will be ignored.
- Display Name
- Schema Branch
- Description
- Specifies the name of the branch to use when looking up the schema in the Schema Registry property. If the chosen Schema Registry does not support branching, this value will be ignored.
- API Name
- schema-branch
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Schema Access Strategy is set to any of [schema-name]
-
Schema Name
Specifies the name of the schema to look up in the Schema Registry property
- Display Name
- Schema Name
- Description
- Specifies the name of the schema to look up in the Schema Registry property
- API Name
- schema-name
- Default Value
- ${schema.name}
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Schema Access Strategy is set to any of [schema-name]
-
Schema Reference Reader
Service implementation responsible for reading FlowFile attributes or content to determine the Schema Reference Identifier
- Display Name
- Schema Reference Reader
- Description
- Service implementation responsible for reading FlowFile attributes or content to determine the Schema Reference Identifier
- API Name
- schema-reference-reader
- Service Interface
- org.apache.nifi.schemaregistry.services.SchemaReferenceReader
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- true
- Dependencies
-
- Schema Access Strategy is set to any of [schema-reference-reader]
-
Schema Registry
Specifies the Controller Service to use for the Schema Registry
- Display Name
- Schema Registry
- Description
- Specifies the Controller Service to use for the Schema Registry
- API Name
- schema-registry
- Service Interface
- org.apache.nifi.schemaregistry.services.SchemaRegistry
- Service Implementations
- Expression Language Scope
- Not Supported
- Sensitive
- false
- Required
- false
- Dependencies
-
- Schema Access Strategy is set to any of [schema-reference-reader, schema-name]
-
Schema Text
The text of an Avro-formatted Schema
- Display Name
- Schema Text
- Description
- The text of an Avro-formatted Schema
- API Name
- schema-text
- Default Value
- ${avro.schema}
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Schema Access Strategy is set to any of [schema-text-property]
-
Schema Version
Specifies the version of the schema to look up in the Schema Registry. If not specified then the latest version of the schema will be retrieved.
- Display Name
- Schema Version
- Description
- Specifies the version of the schema to look up in the Schema Registry. If not specified then the latest version of the schema will be retrieved.
- API Name
- schema-version
- Expression Language Scope
- Environment variables and FlowFile Attributes
- Sensitive
- false
- Required
- false
- Dependencies
-
- Schema Access Strategy is set to any of [schema-name]
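
The Dependencies entries above determine which schema properties apply for a given Schema Access Strategy. The sketch below collects them into a lookup table; the strategy identifiers and API names are taken from the property list on this page, while the `applicable_properties` helper is a hypothetical convenience and not part of NiFi. Message Type and Proto Directory have no dependencies and are always required, so they are not included.

```python
# API name of each dependent property -> strategies under which it applies.
PROPERTY_DEPENDENCIES = {
    "schema-branch": {"schema-name"},
    "schema-name": {"schema-name"},
    "schema-reference-reader": {"schema-reference-reader"},
    "schema-registry": {"schema-reference-reader", "schema-name"},
    "schema-text": {"schema-text-property"},
    "schema-version": {"schema-name"},
}


def applicable_properties(strategy: str) -> list[str]:
    """Return the dependent properties that apply for the given strategy."""
    return sorted(
        name
        for name, strategies in PROPERTY_DEPENDENCIES.items()
        if strategy in strategies
    )


for strategy in ("generate-from-proto-file", "schema-name", "schema-text-property"):
    print(strategy, "->", applicable_properties(strategy))
```

With the default strategy, generate-from-proto-file, none of the dependent schema properties apply, which matches the description of that strategy as the option requiring no manually created schema.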