The JsonTreeReader Controller Service reads a JSON Object and creates a Record object either for the entire JSON Object tree or a subpart (see "Starting Field Strategies" section). The Controller Service must be configured with a Schema that describes the structure of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped. If the schema contains a field for which no JSON field exists, a null value will be used in the Record (or the default value defined in the schema, if applicable).
If the root element of the JSON is a JSON Array, each JSON Object within that array will be treated as its own separate Record. If the root element is a JSON Object, the entire JSON will be treated as a single Record.
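As a rough illustration of the root-element rule, the following Python sketch splits a JSON document into top-level records. It is not NiFi's implementation; the function name `split_records` is hypothetical, and the choice to ignore non-object array elements is an assumption made for this sketch.

```python
import json


def split_records(json_text):
    """Return the top-level records of a JSON document as a list of dicts."""
    root = json.loads(json_text)
    if isinstance(root, list):
        # Root is a JSON Array: each JSON Object in it is its own record.
        # Assumption: elements that are not objects are ignored here.
        return [item for item in root if isinstance(item, dict)]
    if isinstance(root, dict):
        # Root is a JSON Object: the whole document is a single record.
        return [root]
    raise ValueError("root element must be a JSON Object or a JSON Array")
```

Calling `split_records('[{"a": 1}, {"a": 2}]')` gives two records, while `split_records('{"a": 1}')` gives one.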
When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
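The field lookup described above can be sketched in Python as follows. This is a simplified illustration only: the helper `apply_schema` and the representation of a schema as a mapping from field name to default value are assumptions of the sketch, and type coercion is left out.

```python
def apply_schema(json_object, schema_defaults):
    """Build a record that contains exactly the fields named in the schema.

    schema_defaults maps each schema field name to its default value
    (None when the schema defines no default). This layout is hypothetical.
    """
    record = {}
    for name, default in schema_defaults.items():
        # Fields missing from the JSON fall back to the default (or None).
        record[name] = json_object.get(name, default)
    # JSON fields that are not in the schema are never copied, so they are skipped.
    return record
```

For example, applying a schema with the fields `id`, `name`, and `gender` to an object that has `id`, `name`, and `child` yields a record in which `gender` is `None` and `child` is absent.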
The following rules apply when attempting to coerce a field value from one data type to another:
For example, the String value 8 can be coerced into any numeric type. However, the String value 8.2 can be coerced into a Double or Float type but not an Integer.
If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception will be thrown.
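The String-to-number case can be illustrated with a small Python sketch. The function `coerce_string` and its lowercase type names are hypothetical; the sketch uses Python's own `int` and `float` parsing as a stand-in and is not a description of how NiFi implements coercion.

```python
def coerce_string(value, target_type):
    """Coerce a string into the requested numeric type or raise ValueError."""
    if target_type == "int":
        # int("8") succeeds; int("8.2") raises ValueError.
        return int(value)
    if target_type in ("float", "double"):
        # float() accepts both "8" and "8.2".
        return float(value)
    # No rule applies: the coercion fails.
    raise ValueError(f"cannot coerce {value!r} to {target_type}")
```

With this sketch, `coerce_string("8", "int")` and `coerce_string("8.2", "double")` both succeed, while `coerce_string("8.2", "int")` raises an error.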
While NiFi's Record API does require that each Record have a schema, it is often convenient to infer the schema based on the values in the data, rather than having to manually create a schema. This is accomplished by selecting a value of "Infer Schema" for the "Schema Access Strategy" property. When using this strategy, the Reader will determine the schema by first parsing all data in the FlowFile, keeping track of all fields that it has encountered and the type of each field. Once all data has been parsed, a schema is formed that encompasses all fields that have been encountered.
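The bookkeeping step of inference, tracking every field and the types seen for it, might look like the Python sketch below. It only records JSON-level type names and deliberately does not decide how conflicting types are merged into a final schema. The names `json_type` and `observe_field_types` are hypothetical.

```python
from collections import defaultdict


def json_type(value):
    """Name the JSON type of a parsed value."""
    if value is None:
        return "null"
    if isinstance(value, bool):  # checked before numbers: bool is a subclass of int
        return "boolean"
    if isinstance(value, (int, float)):
        return "number"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        return "array"
    if isinstance(value, dict):
        return "object"
    raise TypeError(f"not a JSON value: {value!r}")


def observe_field_types(records):
    """Map each field name to the set of JSON types seen across all records."""
    observed = defaultdict(set)
    for record in records:
        for name, value in record.items():
            observed[name].add(json_type(value))
    return dict(observed)
```

A field that appears as a number in one record and as a string in another ends up with both type names in its set, which is exactly the situation the inference rules then have to resolve.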
A common concern when inferring schemas is how to handle the condition of two values that have different types. For example, consider a FlowFile with the following two records:
[{
  "name": "John",
  "age": 8,
  "values": "N/A"
}, {
  "name": "Jane",
  "age": "Ten",
  "values": [ 8, "Ten" ]
}]
It is clear that the "name" field will be inferred as a STRING type. However, how should we handle the "age" field? Should the field be a CHOICE between INT and STRING? Should we prefer LONG over INT? Should we just use a STRING? Should the field be considered nullable?
To help understand how this Record Reader infers schemas, we have the following list of rules that are followed in the inference logic:
When a schema is to be inferred, this Record Reader requires that all records be read in order to ensure that the inferred schema is applicable to all records in the FlowFile. However, this can become expensive, especially if the data undergoes many different transformations. To alleviate the cost of inferring schemas, the Record Reader can be configured with a "Schema Inference Cache" by populating the property with that name. This is a Controller Service that can be shared by Record Readers and Record Writers.
Whenever a Record Writer is used to write data, if it is configured with a "Schema Cache," it will also add the schema to the Schema Cache. This will result in an identifier for that schema being added as an attribute to the FlowFile.
Whenever a Record Reader is used to read data, if it is configured with a "Schema Inference Cache", it will first look for a "schema.cache.identifier" attribute on the FlowFile. If the attribute exists, it will use the value of that attribute to look up the schema in the schema cache. If it is able to find a schema in the cache with that identifier, then it will use that schema instead of reading, parsing, and analyzing the data to infer the schema. If the attribute is not available on the FlowFile, or if the attribute is available but the cache does not have a schema with that identifier, then the Record Reader will proceed to infer the schema as described above.
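The lookup order described above can be sketched in Python. Only the attribute name "schema.cache.identifier" comes from the text; the function `resolve_schema`, the dictionary standing in for the cache, and the `infer_schema` callback are assumptions of this sketch.

```python
def resolve_schema(attributes, schema_cache, infer_schema):
    """Return a cached schema when possible, otherwise infer one.

    attributes:   FlowFile attributes as a dict (stand-in for the real object)
    schema_cache: dict mapping identifier -> schema (stand-in for the cache service)
    infer_schema: zero-argument callable that performs full inference
    """
    identifier = attributes.get("schema.cache.identifier")
    if identifier is not None:
        cached = schema_cache.get(identifier)
        if cached is not None:
            # Cache hit: skip reading, parsing, and analyzing the data.
            return cached
    # Attribute missing, or identifier unknown to the cache: infer as usual.
    return infer_schema()
```

The inference callback runs only when the attribute is missing or the cache has no entry for the identifier.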
The end result is that users are able to chain together many different Processors to operate on Record-oriented data. Typically, only the first such Processor in the chain will incur the "penalty" of inferring the schema. For all other Processors in the chain, the Record Reader is able to simply look up the schema in the Schema Cache by identifier. This allows the Record Reader to infer a schema accurately, since it is inferred based on all data in the FlowFile, and still allows this to happen efficiently since the schema will typically only be inferred once, regardless of how many Processors handle the data.
When using JsonTreeReader, two different starting field strategies can be selected. With the default Root Node strategy, the JsonTreeReader begins processing from the root element of the JSON and creates a Record object for the entire JSON Object tree, while the Nested Field strategy defines a nested field from which to begin processing.
Using the Nested Field strategy, a schema corresponding to the nested JSON part should be specified. In case of schema inference, the JsonTreeReader will automatically infer a schema from nested records.
Consider that the following JSON is read with the default Root Node strategy:
[
  {
    "id": 17,
    "name": "John",
    "child": {
      "id": "1"
    },
    "dob": "10-29-1982",
    "siblings": [
      {
        "name": "Jeremy",
        "id": 4
      },
      {
        "name": "Julia",
        "id": 8
      }
    ]
  },
  {
    "id": 98,
    "name": "Jane",
    "child": {
      "id": 2
    },
    "dob": "08-30-1984",
    "gender": "F",
    "siblingIds": [],
    "siblings": []
  }
]
Also, consider that the schema that is configured for this JSON is as follows (assuming that the AvroSchemaRegistry Controller Service is chosen to denote the Schema):
{
  "namespace": "nifi",
  "name": "person",
  "type": "record",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "gender", "type": "string" },
    { "name": "dob", "type": {
      "type": "int",
      "logicalType": "date"
    }},
    { "name": "siblings", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "fields": [
          { "name": "name", "type": "string" }
        ]
      }
    }}
  ]
}
Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this matches the date format used for our JSON data. Because the root element is a JSON array with two elements, the JSON will produce two separate Records.
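To make the date handling concrete, the sketch below parses the example's "dob" strings in Python. It assumes that the pattern "MM-dd-yyyy" corresponds to the `strptime` format `%m-%d-%Y` (month, day, four-digit year); the helper name `parse_date` is hypothetical, and NiFi itself does not use this code.

```python
from datetime import date, datetime


def parse_date(text, fmt="%m-%d-%Y"):
    """Parse a date string such as "10-29-1982" into a date object."""
    # %m-%d-%Y is assumed here to be the equivalent of "MM-dd-yyyy".
    return datetime.strptime(text, fmt).date()
```

Under that assumption, `parse_date("10-29-1982")` is October 29, 1982, and a string in another layout, such as "1982-10-29", raises a `ValueError`.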
The first Record will consist of the following values:
Field Name | Field Value |
---|---|
id | 17 |
name | John |
gender | null |
dob | 10-29-1982 |
siblings | array with two elements, each of which is itself a Record: (name: Jeremy) and (name: Julia) |
The second Record will consist of the following values:
Field Name | Field Value |
---|---|
id | 98 |
name | Jane |
gender | F |
dob | 08-30-1984 |
siblings | empty array |
Using the Nested Field strategy, consider the same JSON where the specified Starting Field Name is "siblings". The schema that is configured for this JSON is as follows:
{
  "namespace": "nifi",
  "name": "siblings",
  "type": "record",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "id", "type": "int" }
  ]
}
The first Record will consist of the following values:
Field Name | Field Value |
---|---|
name | Jeremy |
id | 4 |
The second Record will consist of the following values:
Field Name | Field Value |
---|---|
name | Julia |
id | 8 |
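The Nested Field example above can be reproduced with a small Python traversal. This is a sketch under stated assumptions rather than the reader's actual algorithm: the function `records_from_field` is hypothetical, and it assumes that every occurrence of the starting field is visited and that non-object values under it are ignored.

```python
def records_from_field(node, field_name):
    """Yield each JSON object stored under field_name anywhere in the tree."""
    if isinstance(node, dict):
        if field_name in node:
            value = node[field_name]
            # An array under the field yields one record per object element;
            # a single object yields one record.
            items = value if isinstance(value, list) else [value]
            for item in items:
                if isinstance(item, dict):
                    yield item
        else:
            # Keep searching deeper for the starting field.
            for child in node.values():
                yield from records_from_field(child, field_name)
    elif isinstance(node, list):
        for element in node:
            yield from records_from_field(element, field_name)
```

Applied to the example JSON with the field name "siblings", this yields the two objects for Jeremy and Julia. The second person's empty "siblings" array contributes nothing.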
When JsonTreeReader is used with the Nested Field strategy and the "Schema Access Strategy" is not "Infer Schema", the schema can be configured to describe either the entire original JSON ("Whole document" strategy) or only the nested field section ("Selected part" strategy).