Convert CSV to complex JSON data using NiFi – Part 2

In this article, we are going to build a NiFi flow that converts CSV data to complex JSON data. What is complex JSON? In our previous article, we converted CSV data to flat JSON, i.e. JSON without any nested documents. Here we create a final JSON document with nested fields/documents, and this is what we call complex JSON data.

Data Sample

I am using the Biodiversity by County – Distribution of Animals, Plants and Natural Communities dataset, collected by the NYS Department of Environmental Conservation (DEC) and available on data.ny.gov.

county,category,taxonomic_group,taxonomic_subgroup,scientific_name,common_name,year_last_documented,ny_listing_status,federal_listing_status,state_conservation_rank,global_conservation_rank,distribution_status
Albany,Animal,Amphibians,Salamanders,Pseudotriton ruber,Red Salamander,1990-1999,Game with no open season,not listed,S3S4,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates palustris,Pickerel Frog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates clamitans,Green Frog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates catesbeianus,Bullfrog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed

Expected Output Data

[
   {
     "county": "Albany",
     "category": "Animal",
     "year_last_documented": "1990-1999",
     "distribution_status": "Recently Confirmed",
     "group": {
       "taxonomic_group": "Amphibians",
       "taxonomic_subgroup": "Salamanders"
     },
     "species_name": {
       "scientific_name": "Pseudotriton ruber",
       "common_name": "Red Salamander"
     },
     "listing_status": {
       "ny_listing_status": "Game with no open season",
       "federal_listing_status": "not listed"
     },
     "conservation_rank": {
       "state_conservation_rank": "S3S4",
       "global_conservation_rank": "G5"
     }
   },
   {
     "county": "Albany",
     "category": "Animal",
     "year_last_documented": "1990-1999",
     "distribution_status": "Recently Confirmed",
     "group": {
       "taxonomic_group": "Amphibians",
       "taxonomic_subgroup": "Frogs and Toads"
     },
     "species_name": {
       "scientific_name": "Lithobates palustris",
       "common_name": "Pickerel Frog"
     },
     "listing_status": {
       "ny_listing_status": "Game with open season",
       "federal_listing_status": "not listed"
     },
     "conservation_rank": {
       "state_conservation_rank": "S5",
       "global_conservation_rank": "G5"
     }
   },
   {
     "county": "Albany",
     "category": "Animal",
     "year_last_documented": "1990-1999",
     "distribution_status": "Recently Confirmed",
     "group": {
       "taxonomic_group": "Amphibians",
       "taxonomic_subgroup": "Frogs and Toads"
     },
     "species_name": {
       "scientific_name": "Lithobates clamitans",
       "common_name": "Green Frog"
     },
     "listing_status": {
       "ny_listing_status": "Game with open season",
       "federal_listing_status": "not listed"
     },
     "conservation_rank": {
       "state_conservation_rank": "S5",
       "global_conservation_rank": "G5"
     }
   },
   {
     "county": "Albany",
     "category": "Animal",
     "year_last_documented": "1990-1999",
     "distribution_status": "Recently Confirmed",
     "group": {
       "taxonomic_group": "Amphibians",
       "taxonomic_subgroup": "Frogs and Toads"
     },
     "species_name": {
       "scientific_name": "Lithobates catesbeianus",
       "common_name": "Bullfrog"
     },
     "listing_status": {
       "ny_listing_status": "Game with open season",
       "federal_listing_status": "not listed"
     },
     "conservation_rank": {
       "state_conservation_rank": "S5",
       "global_conservation_rank": "G5"
     }
   }
 ]

CSV to Complex JSON Format – Flow Description

To produce the expected output above, the flow must read the data from the CSV file and convert it into the expected JSON format.

NiFi provides the JoltTransformJSON processor to create complex JSON data like this. The ConvertRecord processor is suitable when the expected output is flat JSON, but it cannot restructure flat fields into nested JSON objects.

The JoltTransformJSON processor works only on JSON content. So we first convert the CSV data to flat JSON and then use a Jolt transformation to turn the flat JSON into the expected complex format.
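
As a rough picture of the first step, the Python sketch below does outside NiFi what ConvertRecord does inside it: it turns CSV rows into flat JSON objects whose fields all sit at the same level. It is only an illustration, assuming the CSV has a header row and that the flat output is grouped as a single JSON array; csv_to_flat_json and SAMPLE_CSV are hypothetical names.

```python
import csv
import io
import json

# Two rows copied from the Data Sample section.
SAMPLE_CSV = (
    "county,category,taxonomic_group,taxonomic_subgroup,scientific_name,"
    "common_name,year_last_documented,ny_listing_status,federal_listing_status,"
    "state_conservation_rank,global_conservation_rank,distribution_status\n"
    "Albany,Animal,Amphibians,Salamanders,Pseudotriton ruber,Red Salamander,"
    "1990-1999,Game with no open season,not listed,S3S4,G5,Recently Confirmed\n"
    "Albany,Animal,Amphibians,Frogs and Toads,Lithobates palustris,Pickerel Frog,"
    "1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed\n"
)


def csv_to_flat_json(csv_text: str) -> str:
    """Convert CSV text with a header row into a JSON array of flat objects.

    Each row becomes one object whose keys are the header names; every value
    stays a string, so there is no nesting at this stage.
    """
    rows = list(csv.DictReader(io.StringIO(csv_text)))
    return json.dumps(rows, indent=2)


if __name__ == "__main__":
    print(csv_to_flat_json(SAMPLE_CSV))
```

The nesting is deliberately left for the Jolt step, which is exactly the split of responsibilities this flow uses.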

Assumptions

The data files are present on the local machine, and NiFi has permission to read them.

NiFi Processors

The flow can be built with the following processors, in the given order.

  • GetFile
  • ConvertRecord
  • SplitJson
  • JoltTransformJSON
  • MergeRecord
  • PutFile

The GetFile and ConvertRecord processors, along with their controller services, are reused as-is from the Part 1 flow. The schema used there is also unchanged.

SplitJson

SplitJson splits a JSON file into multiple, separate FlowFiles, one for each element of the array selected by a JsonPath expression. We use it to split the flat JSON produced by the ConvertRecord processor into individual JSON documents, one per record.

After the split, each document is sent to the JoltTransformJSON processor as its own flow file.
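
The split itself is simple. Assuming the ConvertRecord output is a single top-level JSON array (which a JsonPath expression such as $.* walks element by element), the hypothetical helper below returns one JSON document per array element, which is what each SplitJson output flow file would carry.

```python
import json
from typing import List


def split_json_array(content: str) -> List[str]:
    """Split a top-level JSON array into one JSON document per element."""
    parsed = json.loads(content)
    if not isinstance(parsed, list):
        # This sketch only handles a top-level array.
        raise ValueError("expected a top-level JSON array")
    return [json.dumps(element) for element in parsed]


if __name__ == "__main__":
    flat = (
        '[{"county": "Albany", "common_name": "Red Salamander"},'
        ' {"county": "Albany", "common_name": "Bullfrog"}]'
    )
    for document in split_json_array(flat):
        print(document)
```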

SplitJson processor

JoltTransformJSON

This processor applies a list of Jolt specifications to the JSON payload of the flow file. A new FlowFile is created with the transformed content and routed to the ‘success’ relationship. If the JSON transform fails, the original FlowFile is routed to the ‘failure’ relationship.

In our case, we use the Chain transformation (jolt-transform-chain) with the specification below, which contains a single shift operation.

For example, the specification:

  1. Keeps the county and category fields unchanged in the new flow file.
  2. Takes every field in the flat JSON payload whose name ends with _listing_status (matched by the wildcard *_listing_status), moves it into a nested object named listing_status, and adds that object to the new flow file.

Specification

[
   {
     "operation": "shift",
     "spec": {
       "county": "county",
       "category": "category",
       "taxonomic_*": "group.&(0,0)",
       "*_name": "species_name.&(0,0)",
       "year_last_documented": "year_last_documented",
       "*_listing_status": "listing_status.&(0,0)",
       "*_conservation_rank": "conservation_rank.&(0,0)",
       "distribution_status": "distribution_status"
     }
   }
 ]
JoltTransformJSON processor

Input JSON from SplitJson Processor

{
     "county": "Albany",
     "category": "Animal",
     "taxonomic_group": "Amphibians",
     "taxonomic_subgroup": "Salamanders",
     "scientific_name": "Pseudotriton ruber",
     "common_name": "Red Salamander",
     "year_last_documented": "1990-1999",
     "ny_listing_status": "Game with no open season",
     "federal_listing_status": "not listed",
     "state_conservation_rank": "S3S4",
     "global_conservation_rank": "G5",
     "distribution_status": "Recently Confirmed"
   }

Output JSON from JoltTransformJSON Processor

{
     "county": "Albany",
     "category": "Animal",
     "year_last_documented": "1990-1999",
     "distribution_status": "Recently Confirmed",
     "group": {
       "taxonomic_group": "Amphibians",
       "taxonomic_subgroup": "Salamanders"
     },
     "species_name": {
       "scientific_name": "Pseudotriton ruber",
       "common_name": "Red Salamander"
     },
     "listing_status": {
       "ny_listing_status": "Game with no open season",
       "federal_listing_status": "not listed"
     },
     "conservation_rank": {
       "state_conservation_rank": "S3S4",
       "global_conservation_rank": "G5"
     }
   }
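
To check the specification without running NiFi, the Python sketch below mimics the single shift operation on the input above. It is not Jolt: it supports only one level of literal keys and * wildcards, and only the &(0,0) reference (the full input key that matched), with literal keys taking precedence over wildcards. The wildcard keys are written as taxonomic_*, *_name, *_listing_status and *_conservation_rank; shift, find_target, SHIFT_SPEC and SAMPLE_INPUT are illustrative names.

```python
import fnmatch
import json
from typing import Any, Dict, Optional

# The shift spec from the Specification section, as a Python dict.
SHIFT_SPEC: Dict[str, str] = {
    "county": "county",
    "category": "category",
    "taxonomic_*": "group.&(0,0)",
    "*_name": "species_name.&(0,0)",
    "year_last_documented": "year_last_documented",
    "*_listing_status": "listing_status.&(0,0)",
    "*_conservation_rank": "conservation_rank.&(0,0)",
    "distribution_status": "distribution_status",
}

# The flat document produced by SplitJson (first sample row).
SAMPLE_INPUT: Dict[str, str] = {
    "county": "Albany",
    "category": "Animal",
    "taxonomic_group": "Amphibians",
    "taxonomic_subgroup": "Salamanders",
    "scientific_name": "Pseudotriton ruber",
    "common_name": "Red Salamander",
    "year_last_documented": "1990-1999",
    "ny_listing_status": "Game with no open season",
    "federal_listing_status": "not listed",
    "state_conservation_rank": "S3S4",
    "global_conservation_rank": "G5",
    "distribution_status": "Recently Confirmed",
}


def find_target(key: str, spec: Dict[str, str]) -> Optional[str]:
    """Return the output path for an input key; literal keys win over wildcards."""
    if key in spec:
        return spec[key]
    for pattern, target in spec.items():
        if "*" in pattern and fnmatch.fnmatchcase(key, pattern):
            return target
    return None


def shift(flat: Dict[str, Any], spec: Dict[str, str]) -> Dict[str, Any]:
    """Move each flat field to the dotted output path given by the spec."""
    out: Dict[str, Any] = {}
    for key, value in flat.items():
        target = find_target(key, spec)
        if target is None:
            continue  # fields not matched by the spec are dropped, as in a shift
        # &(0,0) stands for the whole input key that matched at this level.
        path = target.replace("&(0,0)", key).split(".")
        node = out
        for part in path[:-1]:
            node = node.setdefault(part, {})
        node[path[-1]] = value
    return out


if __name__ == "__main__":
    print(json.dumps(shift(SAMPLE_INPUT, SHIFT_SPEC), indent=2))
```

Because a shift drops any input field that no key matches, a spec key that lost its * silently matches nothing; comparing the result with the expected output is a quick way to catch that.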

MergeRecord

This processor merges multiple record-oriented FlowFiles (in our case, JSON flow files) into a single FlowFile that contains all of the records of the input FlowFiles.

Since SplitJson split the original flow file into multiple flow files with one record each, we now merge all the transformed JSON flow files back into a single flow file. Otherwise, PutFile (the processor that writes the converted output to disk) would write each flow file as a separate file.
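
Conceptually, the merge step collects the records from many small flow files into one record set. The Python sketch below illustrates that idea, assuming each input holds either one JSON object or an array of objects. The real MergeRecord decides when to emit a merged flow file based on its bin settings, such as Minimum Number of Records and Max Bin Age; this sketch simply merges everything it is given, and merge_records is a hypothetical name.

```python
import json
from typing import Any, Iterable, List


def merge_records(flow_file_contents: Iterable[str]) -> str:
    """Merge JSON flow file contents into one JSON array of records.

    Each input may hold a single JSON object or an array of objects; array
    elements are added individually so the result stays one flat record set.
    """
    merged: List[Any] = []
    for content in flow_file_contents:
        parsed = json.loads(content)
        if isinstance(parsed, list):
            merged.extend(parsed)
        else:
            merged.append(parsed)
    return json.dumps(merged, indent=2)


if __name__ == "__main__":
    parts = [
        '{"county": "Albany", "category": "Animal"}',
        '{"county": "Albany", "category": "Animal"}',
    ]
    print(merge_records(parts))
```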

MergeRecord uses controller services to read and write the data; these are explained in the sections below.

NiFi Controller Services

MergeRecord requires two controller services: a Record Reader to read the data from the incoming flow files and a Record Writer to write the merged data to the outgoing flow file.

In this flow, we use the following controller services to read the JSON data and merge it into one JSON file:

  • JsonTreeReader
  • JsonRecordSetWriter

Note: Both controller services use the same schema to read and write data.

JsonTreeReader – Parses JSON into individual Record objects. The root JSON element can be either a single element or an array of JSON elements, and each element in that array is treated as a separate record.

  • If the schema that is configured contains a field that is not present in the JSON, a null value will be used
  • If the JSON contains a field that is not present in the schema, that field will be skipped
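
The two rules above can be sketched in Python as a projection of a parsed JSON object onto an Avro-style record schema. This is a simplified model, not JsonTreeReader's code: it handles only nested record types and passes other values through without type coercion; apply_schema and EXAMPLE_SCHEMA are hypothetical names.

```python
from typing import Any, Dict

# A cut-down version of the schema from the "Schema to read and write" section.
EXAMPLE_SCHEMA: Dict[str, Any] = {
    "type": "record",
    "name": "example",
    "fields": [
        {"name": "county", "type": "string"},
        {
            "name": "group",
            "type": {
                "type": "record",
                "name": "groups_record",
                "fields": [
                    {"name": "taxonomic_group", "type": "string"},
                    {"name": "taxonomic_subgroup", "type": "string"},
                ],
            },
        },
    ],
}


def apply_schema(value: Any, schema: Any) -> Any:
    """Keep only schema fields: missing fields become None, extra fields are skipped."""
    if value is None:
        return None  # field absent from the JSON -> null
    if isinstance(schema, dict) and schema.get("type") == "record":
        if not isinstance(value, dict):
            raise TypeError("expected a JSON object for a record field")
        return {
            field["name"]: apply_schema(value.get(field["name"]), field["type"])
            for field in schema["fields"]
        }
    return value  # primitive types are passed through unchanged


if __name__ == "__main__":
    doc = {"county": "Albany", "extra_field": "ignored", "group": {"taxonomic_group": "Amphibians"}}
    print(apply_schema(doc, EXAMPLE_SCHEMA))
```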

JsonRecordSetWriter – Writes the results of a RecordSet as a JSON array. Note that even if the RecordSet consists of a single record, it is written as an array with a single element.

  • The Pretty Print JSON property controls whether the output JSON is indented for readability.
  • The Output Grouping property controls whether records are written as a single JSON array (Array, the default) or as one JSON object per line (One Line Per Object).
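
The array-always behavior described for JsonRecordSetWriter can be pictured with the sketch below. The one_line_per_object flag is a hypothetical stand-in for the writer's One Line Per Object output grouping, and write_record_set is likewise an illustrative name, not part of NiFi.

```python
import json
from typing import Any, Dict, List


def write_record_set(records: List[Dict[str, Any]], one_line_per_object: bool = False) -> str:
    """Serialize records the way a JSON record-set writer would.

    By default the records are wrapped in a JSON array, even when there is only
    one record; with one_line_per_object=True each record is written as its own
    JSON object on a separate line.
    """
    if one_line_per_object:
        return "\n".join(json.dumps(record) for record in records)
    return json.dumps(records)


if __name__ == "__main__":
    print(write_record_set([{"county": "Albany"}]))  # prints [{"county": "Albany"}]
```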

Schema to read and write

{
   "type": "record",
   "namespace": "forkedblog.world.biodiversity",
   "name": "forkedblog_world",
   "fields": [
     {
       "name": "county",
       "type": "string"
     },
     {
       "name": "category",
       "type": "string"
     },
     {
       "name": "group",
       "type": {
         "type": "record",
         "name": "groups_record",
         "fields": [
           {
             "name": "taxonomic_group",
             "type": "string"
           },
           {
             "name": "taxonomic_subgroup",
             "type": "string"
           }
         ]
       }
     },
     {
       "name": "species_name",
       "type": {
         "type": "record",
         "name": "name_group_record",
         "fields": [
           {
             "name": "scientific_name",
             "type": "string"
           },
           {
             "name": "common_name",
             "type": "string"
           }
         ]
       }
     },
     {
       "name": "year_last_documented",
       "type": "string"
     },
     {
       "name": "listing_status",
       "type": {
         "type": "record",
         "name": "listing_Status_record",
         "fields": [
           {
             "name": "ny_listing_status",
             "type": "string"
           },
           {
             "name": "federal_listing_status",
             "type": "string"
           }
         ]
       }
     },
     {
       "name": "conservation_rank",
       "type": {
         "type": "record",
         "name": "conservation_rank_record",
         "fields": [
           {
             "name": "state_conservation_rank",
             "type": "string"
           },
           {
             "name": "global_conservation_rank",
             "type": "string"
           }
         ]
       }
     },
     {
       "name": "distribution_status",
       "type": "string"
     }
   ]
 }
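
Because the reader fills missing schema fields with null and skips unknown ones, a field name that differs between the Jolt output and this schema fails quietly instead of raising an error. The hypothetical Python helpers below compare the nested field paths declared in a schema with the paths present in a transformed document, assuming only record and primitive field types (no unions, arrays or maps).

```python
from typing import Any, Dict, Set, Tuple


def schema_paths(schema: Dict[str, Any], prefix: str = "") -> Set[str]:
    """Collect dotted leaf paths (e.g. 'group.taxonomic_group') from a record schema."""
    paths: Set[str] = set()
    for field in schema["fields"]:
        name = prefix + field["name"]
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "record":
            paths |= schema_paths(field_type, name + ".")
        else:
            paths.add(name)
    return paths


def document_paths(doc: Dict[str, Any], prefix: str = "") -> Set[str]:
    """Collect dotted leaf paths from a parsed JSON object."""
    paths: Set[str] = set()
    for key, value in doc.items():
        name = prefix + key
        if isinstance(value, dict):
            paths |= document_paths(value, name + ".")
        else:
            paths.add(name)
    return paths


def compare(schema: Dict[str, Any], doc: Dict[str, Any]) -> Tuple[Set[str], Set[str]]:
    """Return (schema paths missing from the document, document paths not in the schema)."""
    expected = schema_paths(schema)
    actual = document_paths(doc)
    return expected - actual, actual - expected


if __name__ == "__main__":
    schema = {
        "type": "record",
        "name": "example",
        "fields": [
            {"name": "county", "type": "string"},
            {"name": "conservation_rank", "type": {
                "type": "record",
                "name": "conservation_rank_record",
                "fields": [{"name": "state_conservation_rank", "type": "string"}],
            }},
        ],
    }
    doc = {"county": "Albany", "conservation_ranks": {"state_conservation_rank": "S3S4"}}
    print(compare(schema, doc))
```

Running compare on the full schema above and one JoltTransformJSON output document should return two empty sets when the spec and schema agree.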

NiFi Flow – CSV to complex JSON conversion

CSV to Complex JSON data conversion flow

This is the final NiFi flow. To give the output file a .json extension, an UpdateAttribute processor can be placed before PutFile to update the filename attribute.
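
PutFile names the file it writes after the flow file's filename attribute, so changing the extension means updating that attribute. The Python sketch below shows only the intended renaming rule; in NiFi itself this would be an UpdateAttribute property using the Expression Language, and with_json_extension is a hypothetical name.

```python
import os


def with_json_extension(filename: str) -> str:
    """Replace the last extension of a file name with .json (or add one if absent)."""
    stem, _old_extension = os.path.splitext(filename)
    return stem + ".json"


if __name__ == "__main__":
    print(with_json_extension("biodiversity.csv"))  # prints biodiversity.json
```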

Transformed Result JSON

[
  {
    "county": "Albany",
    "category": "Animal",
    "year_last_documented": "1990-1999",
    "distribution_status": "Recently Confirmed",
    "group": {
      "taxonomic_group": "Amphibians",
      "taxonomic_subgroup": "Salamanders"
    },
    "species_name": {
      "scientific_name": "Pseudotriton ruber",
      "common_name": "Red Salamander"
    },
    "listing_status": {
      "ny_listing_status": "Game with no open season",
      "federal_listing_status": "not listed"
    },
    "conservation_rank": {
      "state_conservation_rank": "S3S4",
      "global_conservation_rank": "G5"
    }
  },
  {
    "county": "Albany",
    "category": "Animal",
    "year_last_documented": "1990-1999",
    "distribution_status": "Recently Confirmed",
    "group": {
      "taxonomic_group": "Amphibians",
      "taxonomic_subgroup": "Frogs and Toads"
    },
    "species_name": {
      "scientific_name": "Lithobates palustris",
      "common_name": "Pickerel Frog"
    },
    "listing_status": {
      "ny_listing_status": "Game with open season",
      "federal_listing_status": "not listed"
    },
    "conservation_rank": {
      "state_conservation_rank": "S5",
      "global_conservation_rank": "G5"
    }
  },
  {
    "county": "Albany",
    "category": "Animal",
    "year_last_documented": "1990-1999",
    "distribution_status": "Recently Confirmed",
    "group": {
      "taxonomic_group": "Amphibians",
      "taxonomic_subgroup": "Frogs and Toads"
    },
    "species_name": {
      "scientific_name": "Lithobates clamitans",
      "common_name": "Green Frog"
    },
    "listing_status": {
      "ny_listing_status": "Game with open season",
      "federal_listing_status": "not listed"
    },
    "conservation_rank": {
      "state_conservation_rank": "S5",
      "global_conservation_rank": "G5"
    }
  },
  {
    "county": "Albany",
    "category": "Animal",
    "year_last_documented": "1990-1999",
    "distribution_status": "Recently Confirmed",
    "group": {
      "taxonomic_group": "Amphibians",
      "taxonomic_subgroup": "Frogs and Toads"
    },
    "species_name": {
      "scientific_name": "Lithobates catesbeianus",
      "common_name": "Bullfrog"
    },
    "listing_status": {
      "ny_listing_status": "Game with open season",
      "federal_listing_status": "not listed"
    },
    "conservation_rank": {
      "state_conservation_rank": "S5",
      "global_conservation_rank": "G5"
    }
  }
]

For more NiFi flows, visit our page.

We hope this is useful and that you enjoyed reading. Please let us know your thoughts, questions or feedback in the comments. Thank you.
