Streaming Versus Batch Processing

ListGCSBucket lists all of the GCS objects in the configured GCS bucket. There are two common, broadly defined use cases.

Streaming Use Case

By default, the Processor creates a separate FlowFile for each object in the bucket and adds attributes for filename, bucket, etc. A common use case is to connect ListGCSBucket to the FetchGCSObject processor. Used together, these two processors make it easy to monitor a bucket and efficiently fetch the contents of each new object as it lands in GCS, in a streaming fashion.

Batch Use Case

Another common use case is to process all newly arriving objects in a given bucket and then perform some action only when all of those objects have completed their processing. The streaming approach described above makes this difficult, because NiFi is inherently a streaming platform: there is no "job" that has a beginning and an end. Data is simply picked up as it becomes available.

To solve this, the ListGCSBucket Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single FlowFile will be created that will contain a Record for each object in the bucket, instead of a separate FlowFile per object. See the documentation for ListFile for an example of how to build a dataflow that allows for processing all of the objects before proceeding with any other step.
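The difference between the two output shapes can be modeled in a few lines of plain Python. This is only an illustrative sketch and does not use any NiFi API: the function names, the "record_count" key, and the sample object names are all hypothetical. It shows why a single listing that carries every record makes it possible to tell when the whole batch has been handled.

```python
# Illustrative model only; none of these names come from NiFi itself.

def streaming_listing(objects):
    """Default mode: one FlowFile-like attribute map per listed object."""
    return [{"filename": o["name"], "bucket": o["bucket"]} for o in objects]


def batch_listing(objects):
    """Record Writer mode: a single payload with one record per object."""
    return {"record_count": len(objects), "records": [dict(o) for o in objects]}


def all_processed(listing, processed_names):
    """True once every object named in the listing has been processed."""
    expected = {record["name"] for record in listing["records"]}
    return expected <= set(processed_names)


# Hypothetical objects found in one listing of a bucket.
objects = [
    {"bucket": "example-bucket", "name": "a.csv"},
    {"bucket": "example-bucket", "name": "b.csv"},
]

per_object = streaming_listing(objects)   # two independent items
listing = batch_listing(objects)          # one item describing both objects

done_early = all_processed(listing, ["a.csv"])
done_final = all_processed(listing, ["a.csv", "b.csv"])
```

In the streaming shape nothing records how many sibling items exist, so a downstream step cannot know when it has seen the last one. In the batch shape the listing itself defines the full set of expected objects.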

One important difference between ListFile and ListGCSBucket, though, is the structure of the Records that are emitted: the two processors use different schemas. ListGCSBucket emits Records with the following schema (in Avro format):

{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [{
    "name": "bucket",
    "type": "string"
  }, {
    "name": "name",
    "type": "string"
  }, {
    "name": "size",
    "type": ["null", "long"]
  }, {
    "name": "cacheControl",
    "type": ["null", "string"]
  }, {
    "name": "componentCount",
    "type": ["null", "int"]
  }, {
    "name": "contentDisposition",
    "type": ["null", "long"]
  }, {
    "name": "contentEncoding",
    "type": ["null", "string"]
  }, {
    "name": "contentLanguage",
    "type": ["null", "string"]
  }, {
    "name": "crc32c",
    "type": ["null", "string"]
  }, {
    "name": "createTime",
    "type": ["null", {
      "type": "long",
      "logicalType": "timestamp-millis"
    }]
  }, {
    "name": "updateTime",
    "type": ["null", {
      "type": "long",
      "logicalType": "timestamp-millis"
    }]
  }, {
    "name": "encryptionAlgorithm",
    "type": ["null", "string"]
  }, {
    "name": "encryptionKeySha256",
    "type": ["null", "string"]
  }, {
    "name": "etag",
    "type": ["null", "string"]
  }, {
    "name": "generatedId",
    "type": ["null", "string"]
  }, {
    "name": "generation",
    "type": ["null", "long"]
  }, {
    "name": "md5",
    "type": ["null", "string"]
  }, {
    "name": "mediaLink",
    "type": ["null", "string"]
  }, {
    "name": "metageneration",
    "type": ["null", "long"]
  }, {
    "name": "owner",
    "type": ["null", "string"]
  }, {
    "name": "ownerType",
    "type": ["null", "string"]
  }, {
    "name": "uri",
    "type": ["null", "string"]
  }]
}
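
As a rough illustration of how a downstream consumer might interpret this schema, the Python sketch below uses only the standard library. It works on an abbreviated copy of the schema (four fields), and the helper names and the sample record values are assumptions made for the example. It derives which fields are required (those whose type is not a union with "null") and converts timestamp-millis values to UTC datetimes.

```python
import json
from datetime import datetime, timezone

# Abbreviated copy of the schema above; only four fields are kept for brevity.
SCHEMA_JSON = """
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    {"name": "bucket", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "size", "type": ["null", "long"]},
    {"name": "updateTime",
     "type": ["null", {"type": "long", "logicalType": "timestamp-millis"}]}
  ]
}
"""


def is_nullable(field):
    """A field is nullable when its type is a union that includes "null"."""
    field_type = field["type"]
    return isinstance(field_type, list) and "null" in field_type


def required_fields(schema):
    """Names of the fields that must always carry a value."""
    return [f["name"] for f in schema["fields"] if not is_nullable(f)]


def timestamp_fields(schema):
    """Names of the fields declared with the timestamp-millis logical type."""
    names = []
    for field in schema["fields"]:
        field_type = field["type"]
        branches = field_type if isinstance(field_type, list) else [field_type]
        for branch in branches:
            if isinstance(branch, dict) and branch.get("logicalType") == "timestamp-millis":
                names.append(field["name"])
    return names


def normalize(record, schema):
    """Check required fields and convert epoch milliseconds to UTC datetimes."""
    missing = [n for n in required_fields(schema) if record.get(n) is None]
    if missing:
        raise ValueError("missing required fields: " + ", ".join(missing))
    result = dict(record)
    for name in timestamp_fields(schema):
        if result.get(name) is not None:
            result[name] = datetime.fromtimestamp(result[name] / 1000, tz=timezone.utc)
    return result


schema = json.loads(SCHEMA_JSON)

# Hypothetical record; the values are invented for illustration.
sample = {
    "bucket": "example-bucket",
    "name": "data/part-0001.csv",
    "size": 1024,
    "updateTime": 1700000000000,
}
row = normalize(sample, schema)
```

Only bucket and name are declared as plain strings, so they are the only fields a consumer can rely on being present. Every other field is a union with "null" and should be treated as optional.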