Insert JSON data into MongoDB collections using NiFi

In this article, we are going to see how to extract JSON data from a file and insert it into a collection in MongoDB using NiFi processors.

Data

Let's reuse the world population data from our previous article, which explains the steps to insert JSON data into a MySQL table with a NiFi flow.

[{
"world_rank": "1",
"country": "China",
"population": "1388232694",
"World": "0.185"
}, {
"world_rank": "2",
"country": "India",
"population": "1342512706",
"World": "0.179"
},...]

Problem Statement

Our goal here is to parse the JSON data file, extract the field names and their values, and insert each record as a document into a MongoDB collection.
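To make "extract the field names and values" concrete, here is a minimal sketch in plain Python using only the standard `json` module. It is an illustration of the transformation on the first two sample records, not how NiFi implements it; the function name `to_documents` is hypothetical.

```python
import json

# The first two records of the sample world population data.
SAMPLE = """[{
"world_rank": "1",
"country": "China",
"population": "1388232694",
"World": "0.185"
}, {
"world_rank": "2",
"country": "India",
"population": "1342512706",
"World": "0.179"
}]"""


def to_documents(json_text):
    """Turn a JSON array of objects into a list of field-name -> value documents."""
    documents = []
    for record in json.loads(json_text):
        names = list(record)                 # field names, in file order
        values = [record[n] for n in names]  # values, in the same order
        documents.append(dict(zip(names, values)))
    return documents


docs = to_documents(SAMPLE)
for doc in docs:
    print(list(doc), list(doc.values()))
```

Each resulting dictionary corresponds to one document that ends up in the MongoDB collection.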

NiFi Processors

The flow can be built with the following processors:

  • GetFile processor
  • PutMongoRecord processor

GetFile

Reads the data file from the specified location on the local machine, converts it into a flow file and passes it to the downstream processor. The only required configuration is the directory to read from (the ‘Input Directory’ property). All other properties can be tuned as needed. For example, if the source generates a new data file every 5 seconds, you could set the ‘Polling Interval’ property to 5 seconds to avoid polling more often than necessary.
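The behaviour of a single GetFile polling pass can be sketched in Python as below. This is an assumption-laden illustration, not NiFi code: `poll_directory` is a hypothetical helper whose parameters loosely mirror GetFile's ‘File Filter’ (a regular expression; the default here skips hidden files) and ‘Keep Source File’ (by default GetFile deletes the files it picks up).

```python
import re
import tempfile
from pathlib import Path


def poll_directory(input_dir, file_filter=r"[^\.].*", keep_source=False):
    """One polling pass over input_dir, returning (file name, content) pairs."""
    picked = []
    for path in sorted(Path(input_dir).iterdir()):
        if path.is_file() and re.fullmatch(file_filter, path.name):
            picked.append((path.name, path.read_bytes()))
            if not keep_source:
                path.unlink()  # remove the file once it has been read
    return picked


# Demo: a temporary directory stands in for the real input directory.
input_dir = Path(tempfile.mkdtemp())
(input_dir / "population.json").write_text('[{"world_rank": "1"}]')
(input_dir / ".hidden").write_text("ignored")
flow_files = poll_directory(input_dir)
print(flow_files)
```

A ‘Polling Interval’ of 5 seconds roughly corresponds to calling `poll_directory` in a loop with `time.sleep(5)` between passes.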

PutMongoRecord

According to the Apache NiFi documentation, PutMongoRecord is a record-aware processor for inserting data into MongoDB. It uses a configured record reader and schema to read an incoming record set from the body of a flow file and then inserts batches of those records into a configured MongoDB collection.

In the NiFi version covered here, this processor does not support updates, deletes or upserts. The number of documents inserted at a time is controlled by the “Insert Batch Size” property, which should be set to a reasonable value so that MongoDB is not overloaded with too many inserts at once.
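The effect of “Insert Batch Size” on a record set can be pictured with a small Python sketch that splits records into groups of at most N. Each group would correspond to one bulk insert (for example a pymongo `insert_many` call); the `batches` helper and the seven records are hypothetical, and this is not the processor's actual code.

```python
from itertools import islice


def batches(records, batch_size):
    """Yield successive lists of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(records)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


records = [{"world_rank": str(i)} for i in range(1, 8)]  # 7 hypothetical records
sizes = [len(b) for b in batches(records, 3)]
print(sizes)  # [3, 3, 1]
```

A larger batch size means fewer round trips to MongoDB but bigger individual inserts.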

We use the ‘Client Service’ property to reference a MongoDBControllerService, which provides the database connection, so we do not set the ‘Mongo URI’ property on the processor itself. With this setup, ‘Client Service’ becomes a mandatory property.

Apart from ‘Client Service’, the following properties are mandatory: database name, collection name, write concern, record reader (the JsonTreeReader controller service) and insert batch size.

PutMongoRecord processor

Controller Services

The PutMongoRecord processor requires the following controller services: one to obtain pooled connections to the MongoDB database and one to read JSON data from the incoming flow file.

  • MongoDBControllerService
  • JsonTreeReader

MongoDBControllerService

Provides a database connection pooling service: connections are borrowed from the pool and returned after use. ‘Mongo URI’ is a mandatory property and is used to connect to the database.
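The borrow-and-return idea behind a connection pool can be sketched in a few lines of Python. This is not how MongoDBControllerService is implemented; the `ConnectionPool` class and the string "connections" produced by the factory are hypothetical stand-ins used only to show the pattern.

```python
import queue
from contextlib import contextmanager
from itertools import count


class ConnectionPool:
    """A tiny fixed-size pool: connections are borrowed and handed back after use."""

    def __init__(self, factory, size):
        self._idle = queue.Queue(maxsize=size)
        for _ in range(size):
            self._idle.put(factory())  # open all connections up front

    @contextmanager
    def connection(self, timeout=None):
        conn = self._idle.get(timeout=timeout)  # blocks until a connection is free
        try:
            yield conn
        finally:
            self._idle.put(conn)  # always return it, even if the caller raised

    def idle_count(self):
        return self._idle.qsize()


_ids = count(1)
pool = ConnectionPool(lambda: f"conn-{next(_ids)}", size=2)
with pool.connection() as c:
    in_use = c
    idle_during = pool.idle_count()
idle_after = pool.idle_count()
print(in_use, idle_during, idle_after)
```

Because the connection is returned in a `finally` block, a failed insert cannot leak a connection out of the pool.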

MongoDBControllerService

JsonTreeReader

Reads JSON data from the incoming flow file. The ‘Schema Text’ property holds an Avro schema that specifies the format of the incoming data (set ‘Schema Access Strategy’ to use the ‘Schema Text’ property):

{
"type": "record",
"namespace": "forkedblog.world.population",
"name" : "forkedblog_world",
"fields" : [
{ "name": "world_rank", "type":"string"},
{ "name": "country", "type":"string"},
{ "name": "population", "type":"string"},
{ "name": "World", "type":"string"}
]
}
JsonTreeReader controller service

NiFi Flow

The complete NiFi flow, with the success relationship of GetFile connected to PutMongoRecord, is shown below.

NiFi Flow

When the database/collection is missing

If the database or collection does not exist, MongoDB creates it on the first insert, so no error message is displayed in the NiFi bulletins.

When incoming data does not match the schema

Fields defined in the schema but missing from the incoming record are not present in the inserted document, and fields present in the record but not defined in the schema are not included in the inserted document.
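The behaviour described above amounts to projecting each record onto the schema's field names. Here is a minimal Python sketch of that idea, assuming a field list that mirrors the sample data; the `project` helper and the incoming record (with its extra "capital" field) are hypothetical, and this does not reproduce NiFi internals.

```python
SCHEMA_FIELDS = ["world_rank", "country", "population", "World"]


def project(record, schema_fields):
    """Keep only fields that appear both in the schema and in the record."""
    return {name: record[name] for name in schema_fields if name in record}


# Hypothetical record: "population" and "World" are missing, "capital" is extra.
incoming = {"world_rank": "3", "country": "Country X", "capital": "City Y"}
result = project(incoming, SCHEMA_FIELDS)
print(result)  # {'world_rank': '3', 'country': 'Country X'}
```

The lookup in this sketch is case-sensitive, so a schema field named "world" would not pick up a "World" field from the data.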

We hope this is useful and that you enjoyed reading it. Please let us know your thoughts, queries or feedback in the comments. Thank you.
