The CSVReader allows for interpreting input data as delimited Records. By default, a comma is used as the field separator, but this is configurable. It is common, for instance, to use a tab in order to read tab-separated values, or TSV.
The reader includes pre-defined CSV formats, such as EXCEL. Their settings are documented at: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html
The reader allows for customization of the CSV Format, such as which character should be used to separate CSV fields, which character should be used for quoting and when to quote fields, which character should denote a comment, etc. The names of the fields may be specified either by a "header line" as the first line in the CSV (in which case the Schema Access Strategy should be "Infer Schema" or "Use String Fields From Header") or by supplying a schema, using the Schema Text property or looking up the schema in a Schema Registry.
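
As a rough illustration of what changing the field separator means for parsing, the following sketch uses Python's standard csv module rather than the CSVReader itself. The sample data and the helper name parse_delimited are made up for this example and are not part of NiFi.

```python
import csv
import io


def parse_delimited(text, delimiter=",", quotechar='"'):
    """Parse delimited text into a list of rows (each row is a list of strings)."""
    # newline="" lets the csv module handle line endings itself.
    source = io.StringIO(text, newline="")
    reader = csv.reader(source, delimiter=delimiter, quotechar=quotechar)
    return [row for row in reader]


# Hypothetical sample data: the same record expressed as CSV and as TSV.
csv_text = 'id,name\n1,"Doe, John"\n'
tsv_text = "id\tname\n1\tDoe, John\n"

csv_rows = parse_delimited(csv_text)
tsv_rows = parse_delimited(tsv_text, delimiter="\t")
print(csv_rows)
print(tsv_rows)
```

In the CSV variant the name must be quoted because it contains the separator; in the TSV variant the comma is ordinary data, so no quoting is needed.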

Schemas and Type Coercion

When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
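
The lookup-then-coerce flow described above can be sketched roughly as follows. This is an illustration only, not NiFi's implementation: the schema is modeled as a plain dictionary of Python types, the function name coerce_record is hypothetical, and Python's own constructors stand in for the actual coercion rules.

```python
def coerce_record(raw_fields, schema):
    """Keep only fields named in the schema and coerce each value to the schema type."""
    record = {}
    for name, value in raw_fields.items():
        if name not in schema:
            # The field is not present in the schema, so it is omitted.
            continue
        target_type = schema[name]
        if isinstance(value, target_type):
            # Types already match: the value is used as-is.
            record[name] = value
        else:
            # Stand-in for coercion; raises ValueError if the value cannot be converted.
            record[name] = target_type(value)
    return record


# Hypothetical schema and parsed fields.
schema = {"id": int, "balance": float, "name": str}
raw = {"id": "1", "name": "John", "balance": "48.23", "unused": "x"}

record = coerce_record(raw, schema)
print(record)
```

Passing a value such as {"id": "abc"} makes the sketch raise a ValueError, which plays the role of the Exception mentioned above.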

The following rules apply when attempting to coerce a field value from one data type to another:

If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception will be thrown.

Schema Inference

While NiFi's Record API does require that each Record have a schema, it is often convenient to infer the schema based on the values in the data, rather than having to manually create a schema. This is accomplished by selecting a value of "Infer Schema" for the "Schema Access Strategy" property. When using this strategy, the Reader will determine the schema by first parsing all data in the FlowFile, keeping track of all fields that it has encountered and the type of each field. Once all data has been parsed, a schema is formed that encompasses all fields that have been encountered.

A common concern when inferring schemas is how to handle the condition of two values that have different types. For example, consider a FlowFile with the following two records:

name, age
John, 8
Jane, Ten

It is clear that the "name" field will be inferred as a STRING type. However, how should we handle the "age" field? Should the field be a CHOICE between INT and STRING? Should we prefer LONG over INT? Should we just use a STRING? Should the field be considered nullable?
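
To make the ambiguity concrete, the sketch below only records which coarse types are observed for each field across all rows; it deliberately does not decide how to merge them. It uses Python's csv module, and the type labels ("int", "float", "string") and function names are assumptions made for this illustration, not the Reader's actual type names or logic.

```python
import csv
import io


def classify(value):
    """Label a raw string with a coarse, hypothetical type name."""
    for type_name, parser in (("int", int), ("float", float)):
        try:
            parser(value)
            return type_name
        except ValueError:
            pass
    return "string"


def observed_types(text):
    """Return {field name: set of type labels seen across all rows}."""
    source = io.StringIO(text, newline="")
    reader = csv.DictReader(source, skipinitialspace=True)
    seen = {}
    for row in reader:
        for name, value in row.items():
            seen.setdefault(name, set()).add(classify(value))
    return seen


data = "name, age\nJohn, 8\nJane, Ten\n"
types = observed_types(data)
print(types)
```

Here "name" is only ever seen as a string, while "age" is seen as both an integer and a string, which is exactly the conflict the inference logic has to resolve.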

To help understand how this Record Reader infers schemas, we have the following list of rules that are followed in the inference logic:

Caching of Inferred Schemas

When a schema is to be inferred, this Record Reader requires that all records be read, in order to ensure that the inferred schema is applicable to all records in the FlowFile. However, this can become expensive, especially if the data undergoes many different transformations. To alleviate the cost of inferring schemas, the Record Reader can be configured with a "Schema Inference Cache" by populating the property with that name. This is a Controller Service that can be shared by Record Readers and Record Writers.

Whenever a Record Writer is used to write data, if it is configured with a "Schema Cache," it will also add the schema to the Schema Cache. This will result in an identifier for that schema being added as an attribute to the FlowFile.

Whenever a Record Reader is used to read data, if it is configured with a "Schema Inference Cache", it will first look for a "schema.cache.identifier" attribute on the FlowFile. If the attribute exists, it will use the value of that attribute to look up the schema in the schema cache. If it is able to find a schema in the cache with that identifier, then it will use that schema instead of reading, parsing, and analyzing the data to infer the schema. If the attribute is not available on the FlowFile, or if the attribute is available but the cache does not have a schema with that identifier, then the Record Reader will proceed to infer the schema as described above.

The end result is that users are able to chain together many different Processors to operate on Record-oriented data. Typically, only the first such Processor in the chain will incur the "penalty" of inferring the schema. For all other Processors in the chain, the Record Reader is able to simply look up the schema in the Schema Cache by identifier. This allows the Record Reader to infer a schema accurately, since it is inferred based on all data in the FlowFile, and still allows this to happen efficiently since the schema will typically only be inferred once, regardless of how many Processors handle the data.
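
The caching behavior described in this section can be sketched as follows. This is a simplified model under stated assumptions: the cache is a plain dictionary, FlowFile attributes are a dictionary, the identifier is a random UUID, and the function names are hypothetical. Only the attribute name "schema.cache.identifier" comes from the text above.

```python
import uuid

CACHE_ATTRIBUTE = "schema.cache.identifier"


def write_with_cache(schema, attributes, cache):
    """Writer side: store the schema and record its identifier as an attribute."""
    identifier = str(uuid.uuid4())  # hypothetical identifier scheme
    cache[identifier] = schema
    attributes[CACHE_ATTRIBUTE] = identifier


def resolve_schema(attributes, content, cache, infer):
    """Reader side: use the cached schema if possible, otherwise infer it."""
    identifier = attributes.get(CACHE_ATTRIBUTE)
    if identifier is not None and identifier in cache:
        return cache[identifier]
    # Attribute missing, or no schema cached under that identifier.
    return infer(content)


calls = []


def infer(content):
    """Stand-in for full schema inference; records each invocation."""
    calls.append(content)
    return {"name": "string"}


cache = {}
attributes = {}
content = "name\nJohn\n"

first = resolve_schema(attributes, content, cache, infer)   # no attribute yet: infers
write_with_cache(first, attributes, cache)                  # writer caches the schema
second = resolve_schema(attributes, content, cache, infer)  # attribute present: cache hit
print(len(calls))
```

In this model, inference runs once for the first read; the second read finds the identifier on the attributes and skips inference entirely.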

Examples

Example 1

As an example, consider a FlowFile whose contents consist of the following:

id, name, balance, join_date, notes
1, John, 48.23, 04/03/2007, "Our very
first customer!"
2, Jane, 1245.89, 08/22/2009,
3, Frank Franklin, "48481.29", 04/04/2016,

Additionally, let's consider that this Controller Service is configured with the Schema Registry pointing to an AvroSchemaRegistry and the schema is configured as the following:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "balance", "type": "double" },
    { "name": "join_date", "type": {
      "type": "int",
      "logicalType": "date"
    }},
    { "name": "notes", "type": "string" }
  ]
}

In the example above, we see that the 'join_date' column is a Date type. In order for the CSV Reader to be able to properly parse a value as a date, we need to provide the reader with the date format to use. In this example, we would configure the Date Format property to be MM/dd/yyyy to indicate that it is a two-digit month, followed by a two-digit day, followed by a four-digit year - each separated by a slash. In this case, the result will be that this FlowFile consists of 3 different records. The first record will contain the following values:

Field Name | Field Value
id         | 1
name       | John
balance    | 48.23
join_date  | 04/03/2007
notes      | Our very
           | first customer!

The second record will contain the following values:

Field Name | Field Value
id         | 2
name       | Jane
balance    | 1245.89
join_date  | 08/22/2009
notes      |

The third record will contain the following values:

Field Name | Field Value
id         | 3
name       | Frank Franklin
balance    | 48481.29
join_date  | 04/04/2016
notes      |
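
For readers who want to experiment with this example outside NiFi, the sketch below parses the same data with Python's csv module. It is an approximation, not the CSVReader: the sample is repeated here with a comma separating join_date and notes in the first row, "%m/%d/%Y" is used as the Python counterpart of the MM/dd/yyyy pattern, and the extra join_date_epoch_days key is added only to show how Avro's "date" logical type represents a date (days since 1970-01-01).

```python
import csv
import io
from datetime import date, datetime

DATE_FORMAT = "%m/%d/%Y"  # Python counterpart of the MM/dd/yyyy pattern

data = (
    "id, name, balance, join_date, notes\n"
    '1, John, 48.23, 04/03/2007, "Our very\n'
    'first customer!"\n'
    "2, Jane, 1245.89, 08/22/2009,\n"
    '3, Frank Franklin, "48481.29", 04/04/2016,\n'
)


def parse_row(row):
    """Convert one row of strings into typed values following the example schema."""
    joined = datetime.strptime(row["join_date"], DATE_FORMAT).date()
    return {
        "id": int(row["id"]),
        "name": row["name"],
        "balance": float(row["balance"]),
        "join_date": joined,
        # Avro's "date" logical type stores the number of days since 1970-01-01.
        "join_date_epoch_days": (joined - date(1970, 1, 1)).days,
        "notes": row["notes"],
    }


# skipinitialspace=True drops the space that follows each comma in the sample.
reader = csv.DictReader(io.StringIO(data, newline=""), skipinitialspace=True)
records = [parse_row(row) for row in reader]
for record in records:
    print(record)
```

The quoted notes value in the first row spans two lines but is still read as a single field, so the five lines of data yield three records.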

Example 2 - Schema with CSV Header Line

When CSV data consists of a header line that outlines the column names, the reader provides a couple of different properties for configuring how to handle these column names. The "Schema Access Strategy" property as well as the associated properties ("Schema Registry," "Schema Text," and "Schema Name" properties) can be used to specify how to obtain the schema. If the "Schema Access Strategy" is set to "Use String Fields From Header" then the header line of the CSV will be used to determine the schema. Otherwise, a schema will be referenced elsewhere. But what happens if a schema is obtained from a Schema Registry, for instance, and the CSV Header indicates a different set of column names?

For example, let's say that the following schema is obtained from the Schema Registry:

{
  "namespace": "nifi",
  "name": "balances",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "balance", "type": "double" },
    { "name": "memo", "type": "string" }
  ]
}

And the CSV contains the following data:

id, name, balance, notes
1, John Doe, 123.45, First Customer

Note here that our schema indicates that the final column is named "memo" whereas the CSV Header indicates that it is named "notes."

In this case, the reader will look at the "Ignore CSV Header Column Names" property. If this property is set to "true" then the column names provided in the CSV will simply be ignored and the last column will be called "memo." However, if the "Ignore CSV Header Column Names" property is set to "false" then the result will be that the last column will be named "notes" and each record will have a null value for the "memo" column.

With "Ignore CSV Header Column Names" property set to "true":

Field Name | Field Value
id         | 1
name       | John Doe
balance    | 123.45
memo       | First Customer

With "Ignore CSV Header Column Names" property set to "false":

Field Name | Field Value
id         | 1
name       | John Doe
balance    | 123.45
notes      | First Customer
memo       | null
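
The effect of the "Ignore CSV Header Column Names" property, as described above, can be modeled with a small sketch. The function build_record and its parameters are hypothetical, type coercion is left out, and the code only mirrors the naming behavior stated in this example rather than NiFi's implementation.

```python
def build_record(header, values, schema_fields, ignore_header):
    """Name CSV values by schema position or by CSV header, then fill missing schema fields."""
    # When the header is ignored, values are named positionally from the schema.
    names = schema_fields if ignore_header else header
    record = dict(zip(names, values))
    for field in schema_fields:
        # Schema fields that received no value end up null (None here).
        record.setdefault(field, None)
    return record


schema_fields = ["id", "name", "balance", "memo"]
header = ["id", "name", "balance", "notes"]
values = ["1", "John Doe", "123.45", "First Customer"]

ignored = build_record(header, values, schema_fields, ignore_header=True)
honored = build_record(header, values, schema_fields, ignore_header=False)
print(ignored)
print(honored)
```

With the header ignored, the last value lands in "memo"; with the header honored, it lands in "notes" and "memo" is left null.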