Cloud-pipeline Subsystem

Build Status

About the pipeline

This repository includes the basic code and tools required to readily set up the cloud component of a big data pipeline, enabling developers and researchers to perform additional testing of the code.

About the data

The data used for ingestion is time-series temperature sensor data, serialized as a JSON array.

Sample data format:

[
   {
      "topic":"temprature/topic",
      "qos":0,
      "broker_publish_time":"2020-06-12 08:48:09",
      "broker_name":"edge_1",
      "temperature":0.8752983696943439,
      "node_name":"Node 2"
   },
   {
      "topic":"temprature/topic",
      "qos":0,
      "broker_publish_time":"2020-06-12 08:48:10",
      "broker_name":"edge_1",
      "temperature":9.135930601472666,
      "node_name":"Node 3"
   }
]

This data is generated automatically by a mini-batch publisher and multiple Node-RED flow nodes. (See an example of a mini-batch-publisher here and of a Node-RED flow here.)
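
For reference, a mini-batch in this format can be produced with a few lines of Python. This is only an illustrative sketch, not the actual mini-batch-publisher linked above; the broker and node names are placeholders taken from the sample above:

# Build a mini-batch of temperature readings matching the sample schema.
# Illustrative sketch only; broker/node names are placeholder assumptions.
import json
import random
from datetime import datetime

def build_mini_batch(batch_size=5, broker_name="edge_1"):
    return [
        {
            "topic": "temprature/topic",
            "qos": 0,
            "broker_publish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "broker_name": broker_name,
            "temperature": random.uniform(0, 10),
            "node_name": f"Node {random.randint(1, 3)}",
        }
        for _ in range(batch_size)
    ]

if __name__ == "__main__":
    # Serialize the whole batch as a JSON array, as the pipeline expects.
    print(json.dumps(build_mini_batch(), indent=3))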


Different components

The overall workflow/architecture can be seen in Figure 1.

The different components available are:

  • zookeeper: Service discovery container required by Apache Kafka
  • kafka: Message broker with scalability (see how to scale the service up/down below)
  • database: MongoDB database for stream data ingestion
  • stream-processor: Provides streaming analytics by consuming Kafka messages over a fixed window interval
  • database-processor: Ingests the stream into the MongoDB database
  • spark: The master/controller instance of the Apache Spark node
  • spark-worker: Worker nodes that connect to the spark service (see how to scale the service up/down below)
  • mongo-express: Web-based tool for connecting to the database service

In addition, Kafka message consumer code for debugging purposes is available in the /Util directory.

  • Figure 1: Workflow and architecture of cloud pipeline sub-systems

Create the secrets.env file

Sensitive parameters (such as passwords) are passed through the secrets.env file. Create a secrets.env file in the base directory. The secrets.env file for this project looks like:

# MongoDB init creds
MONGO_INITDB_ROOT_USERNAME=root
MONGO_INITDB_ROOT_PASSWORD=password

# Mongo express creds
ME_CONFIG_MONGODB_ADMINUSERNAME=root
ME_CONFIG_MONGODB_ADMINPASSWORD=password

# ingestor creds
MONGO_USERNAME=root
MONGO_PASSWORD=password

Running the pipeline

To run the cloud pipeline service, perform the following steps:

1. Start Kafka

To start Kafka, first run ZooKeeper:

$ docker-compose up -d zookeeper

Next, start the Kafka brokers:

$ docker-compose up --scale kafka=NUMBER_OF_BROKERS

Note the dynamic ports exposed by the scaled-up brokers; they are needed for the consumer configuration below.
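
As a quick sanity check (a sketch, not part of the repository), you can confirm the brokers are reachable with the kafka-python package, replacing the placeholder addresses with your host IP and the dynamically mapped ports:

# Broker connectivity check; the addresses below are placeholders.
from kafka import KafkaConsumer

BOOTSTRAP_SERVERS = ["192.168.1.12:32812", "192.168.1.12:32814"]

consumer = KafkaConsumer(bootstrap_servers=BOOTSTRAP_SERVERS)
print("Topics visible on the cluster:", consumer.topics())
consumer.close()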

2. Start the MongoDB database

To start MongoDB, just run the command:

$ docker-compose up -d database
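
Optionally, verify the database is up with a short pymongo check (a sketch, not part of the repository; host, port, and credentials are assumptions based on the secrets.env example above):

# Ping the MongoDB container using the root credentials from secrets.env.
from pymongo import MongoClient

client = MongoClient("mongodb://root:password@localhost:27017/")
print(client.admin.command("ping"))  # prints {'ok': 1.0} when MongoDB is reachable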

3. Start the database-processor

To start the Kafka consumer service, run the following command while Kafka is running:

$ docker-compose up --scale kafka=NUMBER_OF_BROKERS database-processor

Note: The Kafka consumer requires a comma-separated list of Kafka brokers. It has to be provided in the entrypoint config of the docker-compose.yml file. Example: entrypoint: ["python3", "MongoIngestor.py", "192.168.1.12:32812,192.168.1.12:32814", "kafka-database-consumer-group-2"]

Important: Note the IP address and the ports of the Kafka brokers that are provided in the entrypoint config.
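
Conceptually, the ingestion service behaves like the sketch below. This is not the repository's MongoIngestor.py; the topic, database, and collection names are assumptions:

# Join a Kafka consumer group, deserialize the JSON array in each message,
# and insert the records into MongoDB. Names below are assumptions.
import json
import sys

from kafka import KafkaConsumer
from pymongo import MongoClient

brokers = sys.argv[1].split(",")   # e.g. "192.168.1.12:32812,192.168.1.12:32814"
group_id = sys.argv[2]             # e.g. "kafka-database-consumer-group-2"

consumer = KafkaConsumer(
    "sensor-data",                 # assumed topic name
    bootstrap_servers=brokers,
    group_id=group_id,
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

collection = MongoClient("mongodb://root:password@database:27017/")["iot"]["temperature"]

for message in consumer:
    records = message.value        # a JSON array of sensor readings
    if records:
        collection.insert_many(records)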

4. Start Apache Spark

To start Spark, run the following docker-compose commands:

  • Start master/controller node
$ docker-compose up spark
  • Start multiple instances of worker nodes
$ docker-compose scale spark-worker=2

5. Start the stream-processor application

This computes the average of each node's temperature over a fixed window duration. You can update WINDOW_DURATION in spark_processor.py. To start the stream-processor application, use the following command:

$ docker-compose up stream-processor
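
For reference, a fixed-window average like this can be expressed with Spark Structured Streaming roughly as follows. This is only a sketch, not the repository's spark_processor.py: the topic name, bootstrap servers, watermark, and the 60-second WINDOW_DURATION are assumptions, and the Kafka source requires the spark-sql-kafka package on the Spark classpath.

# Windowed average of node temperatures from a Kafka stream (sketch).
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import (ArrayType, DoubleType, IntegerType,
                               StringType, StructField, StructType)

WINDOW_DURATION = "60 seconds"  # assumed default; configurable in spark_processor.py

record = StructType([
    StructField("topic", StringType()),
    StructField("qos", IntegerType()),
    StructField("broker_publish_time", StringType()),
    StructField("broker_name", StringType()),
    StructField("temperature", DoubleType()),
    StructField("node_name", StringType()),
])

spark = SparkSession.builder.appName("stream-processor-sketch").getOrCreate()

raw = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", "kafka:9092")   # assumed address
       .option("subscribe", "sensor-data")                # assumed topic
       .load())

# Each message value is a serialized JSON array, so parse it and explode the records.
readings = (raw.selectExpr("CAST(value AS STRING) AS json")
            .select(F.explode(F.from_json("json", ArrayType(record))).alias("r"))
            .select("r.*")
            .withColumn("event_time", F.to_timestamp("broker_publish_time")))

averages = (readings
            .withWatermark("event_time", "1 minute")
            .groupBy(F.window("event_time", WINDOW_DURATION), "node_name")
            .agg(F.avg("temperature").alias("avg_temperature")))

query = averages.writeStream.outputMode("update").format("console").start()
query.awaitTermination()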

6. Start the database-ingestion application

Start this using:

$ docker-compose up --scale kafka=2 database-processor

Note: The database-processor and stream-processor applications belong to separate Kafka consumer groups, so running both provides simultaneous stream ingestion and stream processing.

7. (optional) Kafka Message Consumer

The Python script to consume messages on any topic is present in the /Utils folder. Launch it as:

$ python3 client-report.py "kafka_broker" "topic_to_connect"

For example:

$ python3 client-report.py "192.168.1.12:32812,192.168.1.12:32814" report
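
Internally, such a debugging consumer amounts to little more than the sketch below (this is not the actual client-report.py, just an illustration of the idea):

# Print every message received on the given topic from the given brokers.
import sys

from kafka import KafkaConsumer

brokers = sys.argv[1].split(",")   # e.g. "192.168.1.12:32812,192.168.1.12:32814"
topic = sys.argv[2]                # e.g. "report"

consumer = KafkaConsumer(topic, bootstrap_servers=brokers,
                         auto_offset_reset="earliest")

for message in consumer:
    print(f"{message.topic}[{message.partition}]@{message.offset}: "
          f"{message.value.decode('utf-8', errors='replace')}")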

Monitoring the application

The recommended application for monitoring is netdata.

  • Figure 2: Sample application monitoring (notice the containers at the bottom right)

Some other interesting cases: