A microservice that uses Kafka to listens to an event involving order creation and then insert the product orders to a PostgreSQL database.
Table of Contents
Kafka Streamer is a microservice that uses Kafka to listens to an event involving order creation and then insert the product orders to a Postgres database. The database operations are carried out using a DB sink connector which reads product_orders topic and writes the data to a table called orders in Postgres DB.
The overall goal of the assignment is to build a system that accepts product orders, stores them in a PostgreSQL database, forwards them to city-specific topics in Kafka, and sends an email to customers for high-priority orders.
Following technologies and libraries are used for the development of this project.
- Python
- Flask
- Kafka
- PostgreSQL
- Docker
- Docker Compose
- Shell/CLI
To setup the project locally follow the steps below
- Docker
- Docker Compose
- Terminal
- Fork and clone the project to your local system
- cd into the project and run
docker-compose build --no-cache
docker-compose up
- Then we have to create a table in postgre by running the following commands:
docker ps -a
<!-- copy the container id of db container -->
docker exec -it <container_id> /bin/bash
psql -U pritishsamal -p 5432 -h localhost -d order
<!-- Now run the following query: -->
CREATE TABLE orders (
order_id SERIAL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
email VARCHAR(255) NOT NULL,
street VARCHAR(255) NOT NULL,
city VARCHAR(255) NOT NULL,
state VARCHAR(255) NOT NULL,
postal_code VARCHAR(10) NOT NULL,
product_name VARCHAR(255) NOT NULL,
quantity INTEGER NOT NULL,
order_date DATE NOT NULL,
priority VARCHAR(255) NOT NULL
);
Then if you run \dt
, you'll be able to see a table named orders
- Now, let's exec into kafka container & create a topic named
product_orders
. Run the following commands:
docker ps -a
<!-- copy the container id of kafka container -->
docker exec -it <container_id> /bin/bash
cd /opt/bitnami/kafka/bin
./kafka-topics.sh --create --topic product_orders --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
<!-- The following command will list the topics -->
./kafka-topics.sh --list --bootstrap-server localhost:9092
<!-- To run the consumer and see what events it is listening to in real-time, run -->
kafka-console-consumer.sh --topic "product_orders" --from-beginning --bootstrap-server localhost:9092
<!-- To run the producer for testing purposes and publish dummy orders, run -->
kafka-console-producer.sh --topic "product_orders" --bootstrap-server localhost:9092
- Now we have the server, broker as well as the database up and running.
- If visit 127.0.0.1:5000, you'll find the flask server running in this port. Now open postman and send a
POST
request to 127.0.0.1:5000/orders with the following payload:
{
"order_id": 90123,
"customer": {
"name": "James Clark",
"email": "[email protected]",
"address": {
"street": "Avenue Street",
"city": "Illinois",
"state": "Chicago",
"postal_code": "768987"
}
},
"product_name": "Protein Powder",
"quantity": 5,
"order_date": "2023-06-09",
"priority": "high"
}
- When we send a
POST
request to/orders
endpoint, the following set of events are carried out:
- the json is extracted from request body
- then, it will call
publish_to_kafka_topic
function to initialize a producer and send data to the kafka topic product_orders - Since the consumer is always running in a separate thread, it'll be able to listen whenever any new order is added to the product_orders topic via the
consume_and_send_emails
function - In this function, first the consumer is initialized and then we iterate through the
consumer
variable. - We then check the
priority
of the order. If it is high then we call thesend_email
function and then call thesave_order_to_postgres(order)
to write to db else directly write to db if priority is medium or low. - Once it is added to database via consumer, we then extract the
city
from theorder
object and invokecreate_topic_if_not_exists
function. This will create a new topic with the name of the city if not already present. - Once the respective city topic is created,
publish_to_kafka_topic
function will be called which will publish the order to it's respective city topic. - At last, the api will return a success message
- If we send a
GET
request to/orders
endpoint, it will simply establish connection with the db and then get all the orders and send it in the form of a json.