- This repo implements a Change Data Capture (CDC) system using Debezium and Spark Structured Streaming.
- Written in Python; Docker Compose is used for the demonstration.
```
.
├── docs/               # attached documentation
├── env/                # service environments
├── main.py             # main Spark streaming service
├── pyspark.Dockerfile  # base image for running the Spark streaming service
├── schemas/            # Avro schema registry interfaces
└── svc/                # service utility package
```
- To launch the POC demo:
  ```
  $ docker-compose build && docker-compose up -d
  ```
- To observe the logs of a particular service:
  ```
  $ docker-compose logs -f <service>
  ```
- To tear down all services:
  ```
  $ docker-compose down
  ```
- Table CDC Streaming
  - Captures data changes on a specific table.
  - Usage:
    ```
    $ python main.py \
        --cls "svc.streams.CDCMySQLTable" \
        --opts "{\"app_name\": \"Customers Table CDC Stream\", \"table_name\": \"customers\"}"
    ```
  - Tables with the `_cdc` suffix capture the data changes of their respective source tables.
  - For instance: the `customers` table schema and the CDC schema of the `customers` table (schema figures omitted).
  - Notes: the `op` field captures the action performed on the source table: `u` -> update, `c` -> create, `d` -> delete.
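    As a concrete illustration (the field values are made up and the `source` block is truncated, but the shape follows Debezium's standard change-event envelope), a decoded update event on `customers` looks roughly like:
    ```python
    # Illustrative decoded Debezium change event for an UPDATE on customers;
    # before/after hold the row state and op encodes the action.
    change_event = {
        "before": {"id": 1004, "first_name": "Anne",
                   "last_name": "Kretchmar", "email": "annek@noanswer.org"},
        "after":  {"id": 1004, "first_name": "Anne Marie",
                   "last_name": "Kretchmar", "email": "annek@noanswer.org"},
        "source": {"connector": "mysql", "db": "inventory", "table": "customers"},
        "op": "u",               # u -> update, c -> create, d -> delete
        "ts_ms": 1624291200000,  # when the connector processed the event
    }
    ```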
- Click Through Rate Streaming (WIP)
  - Joins the `Clicks` and `Impressions` streams to capture, calculate, and stream the click-through rate.
  - Usage:
    ```
    $ python main.py \
        --cls "svc.streams.ClickThroughRateStreaming" \
        --opts "{\"app_name\": \"Click Through Rate Stream\", \"click_table\": \"clicks\", \"impression_table\": \"impressions\"}"
    ```
- [WIP] Populate dummy data
- To launch a concrete stream attached to the running services:
  ```
  $ docker-compose build   # build the cdc-ingestion image
  $ docker run -it --rm --name cdc-addresses-stream \
      --network your-compose-network \
      --volume ${PWD}:/app \
      --env-file env/kafka.env \
      --env-file env/mysql.env \
      --env-file env/connectors.env \
      --env-file env/streamsvc.env \
      cdc-ingestion \
      main.py \
      --cls "svc.streams.CDCMySQLTable" \
      --opts "{\"app_name\": \"arbitrary stream\", \"table_name\": \"addresses\"}"
  ```
- To run a Kafka consumer on a particular topic for debugging:
  ```
  $ docker run -it --rm --name avro-consumer \
      --network your-compose-network \
      debezium/connect:1.9 \
      /kafka/bin/kafka-console-consumer.sh \
      --bootstrap-server kafka:9092 \
      --property print.key=true \
      --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
      --property schema.registry.url=http://schema-registry:8081 \
      --topic dbserver1.inventory.customers
  ```