ListFile 2.0.0

Bundle
org.apache.nifi | nifi-standard-nar
Description
Retrieves a listing of files from the input directory. For each file listed, creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. This Processor is designed to run on Primary Node only in a cluster when 'Input Directory Location' is set to 'Remote'. If the primary node changes, the new Primary Node will pick up where the previous node left off without duplicating all the data. When 'Input Directory Location' is 'Local', the 'Execution' mode can be anything, and synchronization won't happen. Unlike GetFile, this Processor does not delete any data from the local filesystem.
Tags
file, filesystem, get, ingest, list, source
Input Requirement
FORBIDDEN
Supports Sensitive Dynamic Properties
false
  • Additional Details for ListFile 2.0.0

    ListFile

    ListFile performs a listing of all files that it encounters in the configured directory. There are two common, broadly defined use cases.

    Streaming Use Case

    By default, the Processor will create a separate FlowFile for each file in the directory and add attributes for filename, path, etc. A common use case is to connect ListFile to the FetchFile processor. These two processors used in conjunction with one another provide the ability to easily monitor a directory and fetch the contents of any new file as it lands in an efficient streaming fashion.
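The attribute handoff between the two processors can be sketched in a few lines. This is a simplified Python model of what the listing produces, not NiFi code; the attribute names mirror the ones ListFile writes, which FetchFile's default configuration then references:

```python
import os

def list_files(input_directory):
    """Toy model of ListFile's streaming mode: one attribute map per file
    (in NiFi, each map would become a separate FlowFile)."""
    flowfiles = []
    for name in sorted(os.listdir(input_directory)):
        full_path = os.path.join(input_directory, name)
        if os.path.isfile(full_path):
            flowfiles.append({
                "filename": name,
                "absolute.path": input_directory.rstrip("/") + "/",
                "file.size": str(os.path.getsize(full_path)),
            })
    return flowfiles
```

A downstream FetchFile would then locate each file's content from the filename and absolute.path attributes on the FlowFile.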

    Batch Use Case

    Another common use case is the desire to process all newly arriving files in a given directory, and to then perform some action only when all files have completed their processing. The above approach of streaming the data makes this difficult, because NiFi is inherently a streaming platform in that there is no “job” that has a beginning and an end. Data is simply picked up as it becomes available.

    To solve this, the ListFile Processor can optionally be configured with a Record Writer. When a Record Writer is configured, a single FlowFile will be created that will contain a Record for each file in the directory, instead of a separate FlowFile per file. With this pattern, in order to fetch the contents of each file, the records must be split up into individual FlowFiles and then fetched. So how does this help us?

    We can still accomplish the desired use case of waiting until all files in the directory have been processed by splitting apart the FlowFile and processing all the data within a Process Group. Configuring the Process Group with a FlowFile Concurrency of “Single FlowFile per Node” means that only one FlowFile will be brought into the Process Group. Once that happens, the FlowFile can be split apart and each part processed. Configuring the Process Group with an Outbound Policy of “Batch Output” means that none of the FlowFiles will leave the Process Group until all have finished processing.
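The combined effect of these two settings can be modeled as a simple gate: admit one listing, split it, and release nothing until every part has finished. This is a toy Python sketch of the semantics, not NiFi's implementation:

```python
class BatchOutputGroup:
    """Toy model of a Process Group with FlowFile Concurrency 'Single FlowFile
    per Node' and Outbound Policy 'Batch Output'."""

    def __init__(self):
        self.busy = False
        self.finished = []
        self.expected = 0

    def admit(self, listing_records):
        if self.busy:
            raise RuntimeError("only one FlowFile may be inside at a time")
        self.busy = True
        self.expected = len(listing_records)
        return list(listing_records)  # split into one unit of work per record

    def complete(self, result):
        self.finished.append(result)
        if len(self.finished) == self.expected:
            self.busy = False
            batch, self.finished = self.finished, []
            return batch    # "Batch Output": everything released together
        return None         # still held inside the group
```

For example, after `admit(["a", "b"])`, calling `complete("A")` returns None (output is held), while the final `complete("B")` releases the whole batch at once.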

    In this flow, we perform a listing of a directory with ListFile. The processor is configured with a Record Writer (in this case a CSV Writer, but any Record Writer can be used) so that only a single FlowFile is generated for the entire listing. That listing is then sent to the “Process Listing” Process Group. Only after the contents of the entire directory have been processed will data leave the “Process Listing” Process Group. At that point, when all data in the Process Group is ready to leave, each of the processed files will be sent to the “Post-Processing” Process Group. At the same time, the original listing is sent to the “Processing Complete Notification” Process Group. In order to accomplish this, the Process Group must be configured with a FlowFile Concurrency of “Single FlowFile per Node” and an Outbound Policy of “Batch Output.”

    In the “Process Listing” Process Group, a listing is received via the “Listing” Input Port. This is sent directly to the “Listing of Processed Data” Output Port so that when all processing completes, the original listing is also sent out.

    Next, the listing is broken apart into an individual FlowFile per record. Because we want to use FetchFile to fetch the data, we need to get the file’s filename and path as FlowFile attributes. This can be done in a few different ways, but the easiest mechanism is to use the PartitionRecord processor. This Processor is configured with a Record Reader that is able to read the data written by ListFile (in this case, a CSV Reader). The Processor is also configured with two additional user-defined properties:

    • absolute.path: /path
    • filename: /filename

    As a result, each record that comes into the PartitionRecord processor will be split into an individual FlowFile (because the combination of the “path” and “filename” fields will be unique for each Record), and the “filename” and “path” record fields will become attributes on the FlowFile (using attribute names of “absolute.path” and “filename”). FetchFile uses its default configuration, which references these attributes.
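The effect of those two user-defined properties can be illustrated with a short sketch: records are grouped by the (/path, /filename) pair, and each group's partition values become FlowFile attributes. This is a simplified Python model of the grouping, not the processor itself:

```python
from collections import defaultdict

def partition_records(records):
    """Group listing records by their 'path' and 'filename' fields; the
    partition values become 'absolute.path' and 'filename' attributes."""
    groups = defaultdict(list)
    for rec in records:
        groups[(rec["path"], rec["filename"])].append(rec)
    return [
        {"attributes": {"absolute.path": path, "filename": filename},
         "records": recs}
        for (path, filename), recs in groups.items()
    ]
```

Because a directory listing contains each (path, filename) pair exactly once, every group holds a single record, which is why each record ends up in its own FlowFile.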

    Finally, we process the data (in this example, simply by compressing it with GZIP) and send the output to the “Processed Data” Output Port. The data will queue up here until all data is ready to leave the Process Group, at which point it will be released.

    Record Schema

    When the Processor is configured to write the listing using a Record Writer, the Records will be written using the following schema (in Avro format):

    {
      "type": "record",
      "name": "nifiRecord",
      "namespace": "org.apache.nifi",
      "fields": [
        {
          "name": "filename",
          "type": "string"
        },
        {
          "name": "path",
          "type": "string"
        },
        {
          "name": "directory",
          "type": "boolean"
        },
        {
          "name": "size",
          "type": "long"
        },
        {
          "name": "lastModified",
          "type": {
            "type": "long",
            "logicalType": "timestamp-millis"
          }
        },
        {
          "name": "permissions",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "name": "owner",
          "type": [
            "null",
            "string"
          ]
        },
        {
          "name": "group",
          "type": [
            "null",
            "string"
          ]
        }
      ]
    }
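A single listing record conforming to this schema might look like the following. The values are illustrative only, and the type check is a hand-rolled sketch rather than a real Avro validation:

```python
# Illustrative record matching the listing schema above (values are made up)
record = {
    "filename": "data.csv",
    "path": "/tmp/incoming/",
    "directory": False,
    "size": 1024,
    "lastModified": 1700000000000,   # long with logicalType timestamp-millis
    "permissions": "rw-r--r--",      # union ["null", "string"]: may be None
    "owner": "nifi",                 # nullable
    "group": "nifi",                 # nullable
}

# Minimal type check mirroring the schema's field types
required = {"filename": str, "path": str, "directory": bool,
            "size": int, "lastModified": int}
assert all(isinstance(record[k], t) for k, t in required.items())
assert all(record[k] is None or isinstance(record[k], str)
           for k in ("permissions", "owner", "group"))
```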
    
Properties
State Management
Scopes Description
LOCAL, CLUSTER After performing a listing of files, the timestamp of the newest file is stored. This allows the Processor to list only files that have been added or modified after this date the next time that the Processor is run. Whether the state is stored with a Local or Cluster scope depends on the value of the <Input Directory Location> property.
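The incremental behavior described here amounts to: remember the newest modification timestamp seen, and on the next run list only files modified after it. A simplified Python model of that state handling (not ListFile's actual tracking, which also handles edge cases such as ties and in-flight writes):

```python
import os

def incremental_listing(input_directory, state):
    """List only files newer than the stored timestamp, then advance state."""
    last_seen = state.get("lastModified", -1)
    newly_listed = []
    for name in sorted(os.listdir(input_directory)):
        full_path = os.path.join(input_directory, name)
        if os.path.isfile(full_path):
            mtime = int(os.path.getmtime(full_path) * 1000)  # millis
            if mtime > last_seen:
                newly_listed.append(name)
                state["lastModified"] = max(state.get("lastModified", -1), mtime)
    return newly_listed
```

Running this twice against an unchanged directory lists every file the first time and nothing the second, which is the behavior the stored state provides across runs (and, with Cluster scope, across Primary Node changes).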
Relationships
Name Description
success All FlowFiles that are received are routed to success
Writes Attributes
Name Description
filename The name of the file that was read from the filesystem.
path The path is set to the relative path of the file's directory on the filesystem compared to the Input Directory property. For example, if Input Directory is set to /tmp, then files picked up from /tmp will have the path attribute set to "/". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to "abc/1/2/3/".
absolute.path The absolute.path is set to the absolute path of the file's directory on the filesystem. For example, if the Input Directory property is set to /tmp, then files picked up from /tmp will have the absolute.path attribute set to "/tmp/". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the absolute.path attribute will be set to "/tmp/abc/1/2/3/".
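The relationship between the two attributes can be shown with a short sketch of plain path arithmetic (the directories are hypothetical; this is not NiFi code):

```python
import os

def path_attributes(input_directory, file_path):
    """Derive the 'path' (relative) and 'absolute.path' attributes
    for a file found under the Input Directory."""
    directory = os.path.dirname(file_path)
    rel = os.path.relpath(directory, input_directory)
    # Files directly under the Input Directory get "/"; deeper files get
    # the relative directory with a trailing slash.
    path = "/" if rel == "." else rel + "/"
    return {"path": path, "absolute.path": directory.rstrip("/") + "/"}
```

With Input Directory /tmp, a file at /tmp/data.csv yields path "/" and absolute.path "/tmp/", while /tmp/abc/1/2/3/data.csv yields path "abc/1/2/3/" and absolute.path "/tmp/abc/1/2/3/", matching the examples above.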
file.owner The user that owns the file on the filesystem
file.group The group that owns the file on the filesystem
file.size The number of bytes in the file on the filesystem
file.permissions The permissions for the file on the filesystem. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example, rw-rw-r--
file.lastModifiedTime The timestamp of when the file on the filesystem was last modified, as 'yyyy-MM-dd'T'HH:mm:ssZ'
file.lastAccessTime The timestamp of when the file on the filesystem was last accessed, as 'yyyy-MM-dd'T'HH:mm:ssZ'
file.creationTime The timestamp of when the file on the filesystem was created, as 'yyyy-MM-dd'T'HH:mm:ssZ'
See Also