Convert CSV to JSON format data using NiFi – Part 1

In this article, we are going to see the NiFi flow that converts CSV format data to JSON format data.

Data Sample

I am using the Biodiversity by County – Distribution of Animals, Plants and Natural Communities dataset, collected by the NYS Department of Environmental Conservation (DEC) and available on data.ny.gov.

county,category,taxonomic_group,taxonomic_subgroup,scientific_name,common_name,year_last_documented,ny_listing_status,federal_listing_status,state_conservation_rank,global_conservation_rank,distribution_status
Albany,Animal,Amphibians,Salamanders,Pseudotriton ruber,Red Salamander,1990-1999,Game with no open season,not listed,S3S4,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates palustris,Pickerel Frog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates clamitans,Green Frog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed
Albany,Animal,Amphibians,Frogs and Toads,Lithobates catesbeianus,Bullfrog,1990-1999,Game with open season,not listed,S5,G5,Recently Confirmed

Flow Description

Our flow requirement for this article is to read data from a CSV file and convert it into JSON format. This is a simple example; in a real-world scenario, we might be dealing with a continuous stream of CSV data.

Assumptions

The data files are present on the local machine and NiFi has access to them, i.e. NiFi can read the contents of the files.

NiFi Processors

The flow required to solve the problem can be constructed using the processors below.

  • GetFile
  • ConvertRecord
  • UpdateAttribute
  • PutFile

GetFile

This processor reads data files from the specified location on the local machine, converts each file into a flow file and passes the flow file to the downstream processor. It requires the source data directory path to be configured in the ‘Input Directory’ property.

All files in the given directory will be pulled, so a file name is not required. However, you can control which files are pulled by supplying a file name regular expression filter. The minimum permission required is ‘read’; NiFi ignores files it cannot read.
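To illustrate the selection logic described above, here is a minimal Python sketch. It is not NiFi's implementation; the function name `list_pickable_files` and its parameters are hypothetical and only mimic the idea of an input directory, a file name filter and a read-permission check.

```python
import os
import re
from pathlib import Path


def list_pickable_files(input_directory, file_filter=r".*"):
    """Return sorted names of regular files that match the filter and are readable.

    Hypothetical helper that mimics the idea behind an input directory
    plus a file name regular expression filter; it is not NiFi code.
    """
    pattern = re.compile(file_filter)
    picked = []
    for entry in Path(input_directory).iterdir():
        if not entry.is_file():
            continue  # ignore sub-directories
        if not pattern.fullmatch(entry.name):
            continue  # file name does not match the filter
        if not os.access(entry, os.R_OK):
            continue  # no read permission, so the file is skipped
        picked.append(entry.name)
    return sorted(picked)
```

For example, `list_pickable_files("/data/in", r".*\.csv")` would return only the readable `.csv` files in a hypothetical `/data/in` directory.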

ConvertRecord

This is a record-oriented processor that converts records from one data format to another using the configured Record Reader and Record Writer controller services.

The Reader and Writer must be configured with matching schemas, which means the schemas must use the same field names as the fields in the incoming data.

This processor reads incoming flow file data using the reader controller service and writes the converted data to the outgoing flow file using the configured writer controller service. The schema defines the structure of the incoming and outgoing (converted) flow file content.

For example, the schema for our input data above must be as follows:

{
     "type": "record",
     "namespace": "forkedblog.world.biodiversity",
     "name" : "forkedblog_world",
     "fields" : [
         { "name": "county", "type":"string"},
         { "name": "category", "type":"string"},
         { "name": "taxonomic_group", "type":"string"},
         { "name": "taxonomic_subgroup", "type":"string"},
         { "name": "scientific_name", "type":"string"},
         { "name": "common_name", "type":"string"},
         { "name": "year_last_documented", "type":"string"},
         { "name": "ny_listing_status", "type":"string"},
         { "name": "federal_listing_status", "type":"string"},
         { "name": "state_conservation_rank", "type":"string"},
         { "name": "global_conservation_rank", "type":"string"},
         { "name": "distribution_status", "type":"string"}
     ]
 }

Note: To see all schemas used in this flow, see the NiFi Controller Services section below.
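A quick way to catch a field-name mismatch before running the flow is to compare the CSV header with the schema outside NiFi. The Python sketch below is one possible check, not part of NiFi; the helper names are hypothetical and the schema is trimmed to three fields to keep the example short.

```python
import csv
import io
import json

# Schema trimmed to three fields for brevity (assumption for this sketch).
SCHEMA_JSON = """
{
  "type": "record",
  "namespace": "forkedblog.world.biodiversity",
  "name": "forkedblog_world",
  "fields": [
    {"name": "county", "type": "string"},
    {"name": "category", "type": "string"},
    {"name": "taxonomic_group", "type": "string"}
  ]
}
"""


def schema_field_names(schema_text):
    """Return the field names declared in a record schema, in order."""
    return [field["name"] for field in json.loads(schema_text)["fields"]]


def header_mismatch(csv_text, schema_text):
    """Return (fields missing from the CSV header, header columns not in the schema)."""
    header = next(csv.reader(io.StringIO(csv_text)))
    expected = schema_field_names(schema_text)
    missing = [name for name in expected if name not in header]
    unexpected = [name for name in header if name not in expected]
    return missing, unexpected
```

Two empty lists mean that the header and the schema use the same field names.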

[Figure: ConvertRecord processor, CSV to JSON data format]

The types of the fields do not have to be the same if a field value can be converted from one type to another. For example, a ‘double’ field in the input schema can be written as any of the following types in the output schema: string, double, or float.

Important behaviors of the processor to note are listed below.
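To make the idea of type conversion concrete, here is a small Python sketch. It is plain Python, not NiFi's converter; the `coerce` function and the type names it handles are assumptions for illustration, and Python itself does not distinguish between float and double.

```python
def coerce(value, target_type):
    """Convert a value to the named target type, or raise ValueError.

    Hypothetical helper: the supported type names are chosen for this sketch.
    """
    converters = {"string": str, "double": float, "float": float, "int": int}
    if target_type not in converters:
        raise ValueError(f"unsupported type: {target_type}")
    # The built-in constructors raise ValueError when the value cannot be converted.
    return converters[target_type](value)
```

A call such as `coerce(12.5, "string")` succeeds because any number has a string form, while `coerce("abc", "int")` raises an error because the value cannot be represented in the target type.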

  • If any field is present in the input that is not present in the output, the field will be left out of the output.
  • If any field is specified in the output schema but is not present in the input data/schema, then the field will not be present in the output or will have a null value, depending on the writer.
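The two behaviors above can be sketched in a few lines of Python. This is only an illustration of the rule, not NiFi code; `project_record` is a hypothetical name, and the sketch picks the "null value" variant for fields that are missing from the input.

```python
def project_record(record, output_fields):
    """Keep only the fields named in output_fields.

    Input fields that are not listed are dropped; listed fields that are
    absent from the input are set to None (written as null in JSON).
    """
    return {name: record.get(name) for name in output_fields}


record = {"county": "Albany", "category": "Animal", "common_name": "Bullfrog"}
projected = project_record(record, ["county", "common_name", "distribution_status"])
# 'category' is dropped and 'distribution_status' is added with a None value.
```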

NiFi Controller Services

The ConvertRecord processor requires two controller services: one to read the data from the incoming flow file and another to write the data to the outgoing flow file after conversion.

In this flow, we are using the controller services below to read CSV data and convert it into JSON data.

  • CSVReader
  • JSONRecordSetWriter

CSVReader

Reads CSV-formatted data and returns each row in the CSV file as a separate record. The first line of a FlowFile should specify the name of each column in the data, i.e. the header. The rest of the lines in the FlowFile should be valid CSV data matching the header in the first line. This is the typical CSV file format.

We can control the field separator (default: comma), quote character (default: double quote) and escape character (default: backslash) used to parse CSV data. The reader also supports CSV files with comments.
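The effect of these parsing options can be tried out with Python's standard `csv` module. The sketch below is an analogy, not the CSVReader itself; the function name, its parameters and the `#` comment marker are assumptions, and comment lines are filtered before parsing, which is a simplification.

```python
import csv
import io


def read_csv_records(text, separator=",", quote='"', escape="\\", comment="#"):
    """Parse CSV text with a header line into a list of dicts, one per row.

    Lines starting with the comment marker are skipped before parsing
    (a simplification: it does not handle markers inside multi-line fields).
    """
    lines = (line for line in io.StringIO(text) if not line.startswith(comment))
    reader = csv.DictReader(
        lines, delimiter=separator, quotechar=quote, escapechar=escape
    )
    return list(reader)


sample = 'county;common_name\n# a comment line\nAlbany;"Frog; Green"\n'
records = read_csv_records(sample, separator=";")
```

Here the quoted value keeps its embedded separator, and the comment line does not become a record.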

Schema:

{
     "type": "record",
     "namespace": "forkedblog.world.biodiversity",
     "name" : "forkedblog_world",
     "fields" : [
         { "name": "county", "type":"string"},
         { "name": "category", "type":"string"},
         { "name": "taxonomic_group", "type":"string"},
         { "name": "taxonomic_subgroup", "type":"string"},
         { "name": "scientific_name", "type":"string"},
         { "name": "common_name", "type":"string"},
         { "name": "year_last_documented", "type":"string"},
         { "name": "ny_listing_status", "type":"string"},
         { "name": "federal_listing_status", "type":"string"},
         { "name": "state_conservation_rank", "type":"string"},
         { "name": "global_conservation_rank", "type":"string"},
         { "name": "distribution_status", "type":"string"}
     ]
 }

Sample Screen:

[Figure: CSVReader controller service]

JSONRecordSetWriter

Writes the records of a RecordSet as a JSON array. It uses the specified schema to build the new JSON document.
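As a rough picture of what "a RecordSet as a JSON array" means, the Python sketch below serializes a list of records into one JSON array. It uses the standard `json` module and is not the writer's implementation; `write_json_array` and its `pretty` flag are hypothetical names.

```python
import json


def write_json_array(records, pretty=True):
    """Serialize an iterable of dict records as a single JSON array string."""
    return json.dumps(list(records), indent=2 if pretty else None)


rows = [
    {"county": "Albany", "common_name": "Red Salamander"},
    {"county": "Albany", "common_name": "Pickerel Frog"},
]
document = write_json_array(rows)
```

Each record becomes one JSON object, and all objects end up inside a single top-level array.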

Schema:

{
   "type": "record",
   "namespace": "forkedblog.world.biodiversity",
   "name": "forkedblog_world",
   "fields": [
     {
       "name": "county",
       "type": "string"
     },
     {
       "name": "category",
       "type": "string"
     },
     {
       "name": "taxonomic_group",
       "type": "string"
     },
     {
       "name": "taxonomic_subgroup",
       "type": "string"
     },
     {
       "name": "scientific_name",
       "type": "string"
     },
     {
       "name": "common_name",
       "type": "string"
     },
     {
       "name": "year_last_documented",
       "type": "string"
     },
     {
       "name": "ny_listing_status",
       "type": "string"
     },
     {
       "name": "federal_listing_status",
       "type": "string"
     },
     {
       "name": "state_conservation_rank",
       "type": "string"
     },
     {
       "name": "global_conservation_rank",
       "type": "string"
     },
     {
       "name": "distribution_status",
       "type": "string"
     }
   ]
 }

Sample Screen:

[Figure: JSONRecordSetWriter controller service]

NiFi Flow – CSV to JSON conversion

[Figure: NiFi flow to convert CSV data to JSON format]

Output Data in JSON Format

[
   {
     "county": "Albany",
     "category": "Animal",
     "taxonomic_group": "Amphibians",
     "taxonomic_subgroup": "Salamanders",
     "scientific_name": "Pseudotriton ruber",
     "common_name": "Red Salamander",
     "year_last_documented": "1990-1999",
     "ny_listing_status": "Game with no open season",
     "federal_listing_status": "not listed",
     "state_conservation_rank": "S3S4",
     "global_conservation_rank": "G5",
     "distribution_status": "Recently Confirmed"
   },
   {
     "county": "Albany",
     "category": "Animal",
     "taxonomic_group": "Amphibians",
     "taxonomic_subgroup": "Frogs and Toads",
     "scientific_name": "Lithobates palustris",
     "common_name": "Pickerel Frog",
     "year_last_documented": "1990-1999",
     "ny_listing_status": "Game with open season",
     "federal_listing_status": "not listed",
     "state_conservation_rank": "S5",
     "global_conservation_rank": "G5",
     "distribution_status": "Recently Confirmed"
   },
   {
     "county": "Albany",
     "category": "Animal",
     "taxonomic_group": "Amphibians",
     "taxonomic_subgroup": "Frogs and Toads",
     "scientific_name": "Lithobates clamitans",
     "common_name": "Green Frog",
     "year_last_documented": "1990-1999",
     "ny_listing_status": "Game with open season",
     "federal_listing_status": "not listed",
     "state_conservation_rank": "S5",
     "global_conservation_rank": "G5",
     "distribution_status": "Recently Confirmed"
   },
   {
     "county": "Albany",
     "category": "Animal",
     "taxonomic_group": "Amphibians",
     "taxonomic_subgroup": "Frogs and Toads",
     "scientific_name": "Lithobates catesbeianus",
     "common_name": "Bullfrog",
     "year_last_documented": "1990-1999",
     "ny_listing_status": "Game with open season",
     "federal_listing_status": "not listed",
     "state_conservation_rank": "S5",
     "global_conservation_rank": "G5",
     "distribution_status": "Recently Confirmed"
   }
 ]

This is the final NiFi flow created. I am using the UpdateAttribute processor to change the file extension to .json. For more NiFi flows, visit our page.
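The exact UpdateAttribute configuration is not shown in this article. One possible approach, stated here as an assumption, is to set the `filename` attribute with an Expression Language value such as `${filename:substringBeforeLast('.')}.json`. The same renaming logic, sketched in Python with a hypothetical helper name, looks like this:

```python
from pathlib import PurePath


def with_json_extension(filename):
    """Replace the last extension of a file name with .json (or append it)."""
    return str(PurePath(filename).with_suffix(".json"))
```

With this helper, `biodiversity.csv` becomes `biodiversity.json`, and a name without an extension simply gets `.json` appended.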

Hope this is useful and you enjoyed reading. Please let us know your thoughts/queries/feedback in comments. Thank you.
