Insert JSON data into a MySQL table using NiFi

In this article, we will see how to extract JSON data and insert it into a table in a MySQL database using NiFi processors.

Data

We have a world population data file that holds its data in the following JSON format.

[{
"world_rank": "1",
"country": "China",
"population": "1388232694",
"World": "0.185"
}, {
"world_rank": "2",
"country": "India",
"population": "1342512706",
"World": "0.179"
},...]

The file has data for all 195 countries. We need to read the file and insert the data into a MySQL table. Note: the data could come from any source; how it is obtained is out of scope for this article.

Problem Statement

The goal is to parse the JSON data file, extract the field names and values separately, and insert each record as a row in a MySQL table.
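To make the goal concrete, the parsing step can be pictured outside NiFi with a few lines of Python. This is only an illustrative sketch and not part of the flow: SAMPLE is the two-record excerpt shown earlier, and the helper name split_fields is made up for this example.

```python
import json

# Two records copied from the sample data shown earlier (the real file has 195).
SAMPLE = """
[{"world_rank": "1", "country": "China", "population": "1388232694", "World": "0.185"},
 {"world_rank": "2", "country": "India", "population": "1342512706", "World": "0.179"}]
"""


def split_fields(record):
    """Return the field names and the values of one flat JSON object."""
    names = list(record.keys())
    values = [record[name] for name in names]
    return names, values


records = json.loads(SAMPLE)
for record in records:
    names, values = split_fields(record)
    print(names, values)
```

Each record yields one list of column names and one list of values, which is the shape needed to build one row per country.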

Assumptions

The data files are on the local machine and NiFi has access to them, i.e. NiFi can read their contents.

NiFi Processors

The flow required to solve the problem can be built from the following processors.

  • GetFile processor
  • ConvertJSONToSQL processor
  • PutSQL processor

GetFile

Reads the data file from the specified location on the local machine, converts it into a FlowFile, and passes it to the downstream processor.
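As a rough mental model of this pick-up step, here is a small Python sketch. It is not how NiFi is implemented; the function name get_files and the keep_source_file parameter are illustrative, the latter mirroring GetFile's "Keep Source File" property, which defaults to false, so the picked-up file is removed from the input directory.

```python
import tempfile
from pathlib import Path


def get_files(input_dir, keep_source_file=False):
    """Yield (filename, content) for every regular file in input_dir."""
    for path in sorted(Path(input_dir).iterdir()):
        if not path.is_file():
            continue
        content = path.read_bytes()
        if not keep_source_file:
            # Mimic the default behaviour: the source file is removed
            # once its content has been picked up.
            path.unlink()
        yield path.name, content


# Demo in a throwaway directory so nothing on the real disk is touched.
with tempfile.TemporaryDirectory() as tmp:
    Path(tmp, "test.json").write_text('[{"country": "China"}]')
    picked = list(get_files(tmp))
    remaining = list(Path(tmp).iterdir())

print(picked)     # the file name and its raw bytes
print(remaining)  # empty, because the source file was removed
```

When experimenting with the flow, it is therefore worth keeping a copy of the data file outside the input directory.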

ConvertJSONToSQL

The incoming FlowFile is expected to be a flat JSON message, meaning that it consists of a single JSON element and each field maps to a simple type. If the input is an array of JSON elements, as in our data file, each element is converted separately and emitted as its own FlowFile on the 'sql' relationship. If the incoming FlowFile meets this expectation, the processor

  • converts the JSON-formatted FlowFile into an UPDATE, INSERT, or DELETE SQL statement
  • does not itself insert any data into the database
  • upon successful conversion, routes the original FlowFile to the 'original' relationship and the SQL statement to the 'sql' relationship
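The conversion can be sketched in Python as follows. This is an approximation under stated assumptions, not the processor's code: the function name to_insert is hypothetical, and the real processor reads the column list and types from the table metadata rather than from the JSON. What it does share with the processor is the output shape: a parameterized statement with ? placeholders as content, with the values carried in sql.args.N.type and sql.args.N.value attributes.

```python
import json

VARCHAR = 12  # numeric value of java.sql.Types.VARCHAR


def to_insert(record, table):
    """Build a parameterized INSERT plus sql.args.* attributes for one record."""
    columns = list(record.keys())
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"

    attributes = {}
    for position, column in enumerate(columns, start=1):
        # Every column in world_population is a varchar, so the sketch
        # hard-codes the type instead of looking it up in the database.
        attributes[f"sql.args.{position}.type"] = str(VARCHAR)
        attributes[f"sql.args.{position}.value"] = record[column]
    return sql, attributes


record = json.loads(
    '{"world_rank": "1", "country": "China", '
    '"population": "1388232694", "World": "0.185"}'
)
sql, attributes = to_insert(record, "world_population")
print(sql)
# INSERT INTO world_population (world_rank, country, population, World) VALUES (?, ?, ?, ?)
print(attributes["sql.args.2.value"])
# China
```

Keeping the values out of the statement text means the same statement can be prepared once and reused for every row.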

PutSQL

This processor executes the SQL UPDATE or INSERT command, so it is the step that actually writes the data to the database. The content of an incoming FlowFile is expected to be the SQL command to execute.
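A minimal sketch of this execution step is shown below. To stay runnable without a MySQL server it uses Python's built-in sqlite3 module as a stand-in database, which is an assumption of the example and not something the flow uses. The statement and attributes mimic what ConvertJSONToSQL hands over: SQL text with ? placeholders, and the values in sql.args.N.value attributes. The function name execute_statement is hypothetical.

```python
import sqlite3

# sqlite3 (in-memory) stands in for MySQL so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute(
    "create table world_population (world_rank varchar(4), "
    "country varchar(50) PRIMARY KEY, population varchar(30), World varchar(5))"
)

# What arrives at the processor: statement as content, values as attributes.
statement = (
    "INSERT INTO world_population (world_rank, country, population, World) "
    "VALUES (?, ?, ?, ?)"
)
attributes = {
    "sql.args.1.value": "1",
    "sql.args.2.value": "China",
    "sql.args.3.value": "1388232694",
    "sql.args.4.value": "0.185",
}


def execute_statement(connection, sql, attrs):
    """Bind sql.args.N.value attributes to the placeholders and execute."""
    # Naive placeholder count; good enough for this statement shape.
    count = sql.count("?")
    params = [attrs[f"sql.args.{i}.value"] for i in range(1, count + 1)]
    with connection:  # commits on success, rolls back on error
        connection.execute(sql, params)


execute_statement(conn, statement, attributes)
rows = conn.execute("SELECT country, population FROM world_population").fetchall()
print(rows)
```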

NiFi Controller Services

Both the ConvertJSONToSQL and PutSQL processors require the following controller service to connect to the database.

  • DBCPConnectionPool

DBCPConnectionPool

Provides a database connection pooling service. Connections are requested from the pool and returned to it after use. It requires the following mandatory parameters:

  • Database connection URL including the database name (Example: jdbc:mysql://localhost:3306/world)
  • Database driver class name (Example: com.mysql.jdbc.Driver; MySQL Connector/J 8.x uses com.mysql.cj.jdbc.Driver)
  • Database driver location (the location of the mysql-connector-java.jar file in the case of MySQL)
  • Database username
  • Database password (stored as a secret value)
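The borrow-and-return idea behind a connection pool can be sketched in a few lines of Python. This is a toy illustration, not the DBCP implementation: the class ConnectionPool and its method names are made up here, and sqlite3 is used as a stand-in so the example runs without a MySQL server.

```python
import queue
import sqlite3
from contextlib import contextmanager


class ConnectionPool:
    """Toy pool: a fixed set of connections handed out and taken back."""

    def __init__(self, connect, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(connect())

    @contextmanager
    def connection(self, timeout=5.0):
        # Blocks (up to timeout seconds) when every connection is in use.
        conn = self._idle.get(timeout=timeout)
        try:
            yield conn
        finally:
            # Always hand the connection back, even if the caller failed.
            self._idle.put(conn)


pool = ConnectionPool(lambda: sqlite3.connect(":memory:"), size=2)

with pool.connection() as conn:
    answer = conn.execute("SELECT 1").fetchone()[0]

print(answer)
```

Because both processors point at the same controller service, they share one pool rather than each opening its own connections.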

Database Pre-requisites

  1. The MySQL database is up and running
  2. The table has been created with the desired structure
  3. The user has permission to add data to the table

Table DDL statement

Table Name: world_population

create table world_population (world_rank varchar(4), country varchar(50) PRIMARY KEY, population varchar(30), World varchar(5));
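One consequence of this DDL is worth seeing in action: country is the primary key, so inserting the same country twice is rejected. The sketch below demonstrates this with Python's built-in sqlite3 module as a stand-in for MySQL (an assumption made so the example runs without a server; MySQL reports a duplicate-key error of its own in the same situation).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "create table world_population (world_rank varchar(4), "
    "country varchar(50) PRIMARY KEY, population varchar(30), World varchar(5))"
)

insert = (
    "INSERT INTO world_population (world_rank, country, population, World) "
    "VALUES (?, ?, ?, ?)"
)
row = ("1", "China", "1388232694", "0.185")

conn.execute(insert, row)
try:
    # Same primary key value again, as would happen if the file is loaded twice.
    conn.execute(insert, row)
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

row_count = conn.execute("SELECT COUNT(*) FROM world_population").fetchone()[0]
print(duplicate_rejected, row_count)
```

Loading the same data file a second time into an already populated table would therefore fail at the insert step.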

NiFi Flow – Insert

[Figure: Complete NiFi flow to read and insert JSON data into MySQL table]
[Figure: DBCPConnectionPool controller service]
[Figure: ConvertJSONToSQL processor]
[Figure: PutSQL processor]

Once the controller service and processors are configured as described above, you can start the process group or the individual processors.

Possible Error Cases

A couple of scenarios can result in an error:

  1. The table is not available in the database
  2. The JSON field names do not match the table columns

Both scenarios result in a ProcessException such as the following:

2019-05-04 07:53:54,491 ERROR [Timer-Driven Process Thread-2] o.a.n.p.standard.ConvertJSONToSQL ConvertJSONToSQL[id=72f15ab5-016a-1000-13e0-ac91be48c3b4] Failed to convert StandardFlowFileRecord[uuid=e93d9723-3381-4f95-8945-2eb615e824fa,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1556936170584-1, container=default, section=1], offset=560, length=114],offset=0,name=test.json,size=114] to a SQL INSERT statement due to org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined by the world_population table; routing to failure: org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined by the world_population table
org.apache.nifi.processor.exception.ProcessException: None of the fields in the JSON map to the columns defined by the world_population table
at org.apache.nifi.processors.standard.ConvertJSONToSQL.generateInsert(ConvertJSONToSQL.java:519)
at org.apache.nifi.processors.standard.ConvertJSONToSQL.onTrigger(ConvertJSONToSQL.java:385)
at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1165)
at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:203)
at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
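The check behind this message can be approximated as follows. This is a simplified sketch, not the processor's code: ProcessException here is a local stand-in class, map_fields is a hypothetical name, and the comparison is an exact name match, whereas the real processor applies its own name-matching rules that this sketch does not reproduce.

```python
class ProcessException(Exception):
    """Local stand-in for org.apache.nifi.processor.exception.ProcessException."""


# Columns of the world_population table from the DDL.
COLUMNS = ["world_rank", "country", "population", "World"]


def map_fields(record, table, table_columns):
    """Keep only the JSON fields that correspond to a table column."""
    matched = {
        name: value for name, value in record.items() if name in table_columns
    }
    if not matched:
        raise ProcessException(
            f"None of the fields in the JSON map to the columns "
            f"defined by the {table} table"
        )
    return matched


good = map_fields(
    {"country": "China", "population": "1388232694"}, "world_population", COLUMNS
)

try:
    # Field names that do not exist in the table at all.
    map_fields(
        {"nation": "China", "people": "1388232694"}, "world_population", COLUMNS
    )
    error_message = None
except ProcessException as exc:
    error_message = str(exc)

print(good)
print(error_message)
```

When this error appears, comparing the JSON field names against the table definition, and confirming that the table exists in the database named in the connection URL, is a reasonable first check.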

We hope this was useful and that you enjoyed reading it. Please share your thoughts, questions, or feedback in the comments. Thank you.
