Initial commit
dursunkoc committed Aug 18, 2022
0 parents commit 12adeb0
Showing 8 changed files with 445 additions and 0 deletions.
147 changes: 147 additions & 0 deletions README.md
@@ -0,0 +1,147 @@
# Streaming ETL Using Kafka Connect & KSQL DB

This is a demo project demonstrating data extraction from multiple tables/databases using Debezium, joining the change streams on the fly with ksqlDB, and storing the result in a different table/database.

![Architecture](./assets/architecture.png)

## Starting up the architecture

To run the infrastructure, build the images and create the containers defined in the docker-compose file:

```bash
> docker-compose -f docker-compose.yml up --build --no-start
```

Then start every container described in `docker-compose.yml`:

```bash
> docker-compose -f docker-compose.yml start
```
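
Before moving on, you can optionally verify that everything is up. A quick sanity check (assuming the default ports published in `docker-compose.yml`; `/info` is the ksqlDB server's standard status endpoint):

```bash
> docker-compose ps
> curl -s http://localhost:8088/info
```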

## Building the ETL Pipeline

First, we create a Debezium source connector that listens for changes to the objects in the `inventory` schema. Before anything else, open a terminal connected to the ksqlDB server by running the following command; all subsequent statements are executed in this ksqldb-cli session:

```bash
> docker-compose exec ksqldb-cli ksql http://primary-ksqldb-server:8088
```

### Create Source Connector

The script for the source connector is available at [mysql_source_connector](./mysql_source_connector.sql):

```bash
ksql> CREATE SOURCE CONNECTOR `mysql-connector` WITH(
> "connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
> "tasks.max"= '1',
> "database.hostname"= 'mysql',
> "database.port"= '3306',
> "database.user"= 'root',
> "database.password"= 'debezium',
> "database.server.id"= '184054',
> "database.server.name"= 'dbserver1',
> "database.whitelist"= 'inventory',
> "table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
> "database.history.kafka.bootstrap.servers"= 'kafka:9092',
> "database.history.kafka.topic"= 'schema-changes.inventory',
> "transforms"= 'unwrap',
> "transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
> "key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
> "key.converter.schemas.enable"= 'false',
> "value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
> "value.converter.schemas.enable"= 'false');
```
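
You can confirm that the connector was created and is running; `SHOW CONNECTORS` and `DESCRIBE CONNECTOR` are standard ksqlDB statements:

```bash
ksql> SHOW CONNECTORS;
ksql> DESCRIBE CONNECTOR "mysql-connector";
```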

After that you should be able to see the topics for the tables residing in the `inventory` schema in MySQL:

```bash
ksql> show topics;
```
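
To inspect the raw change events flowing into one of these topics, you can use ksqlDB's `PRINT` statement (also included in [all_commands](./all_commands.sql)):

```bash
ksql> PRINT "dbserver1.inventory.customers" FROM BEGINNING;
```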

> **_NOTE:_** To read each topic from the beginning during the demo, run the following command first!
>
>```bash
>ksql> SET 'auto.offset.reset' = 'earliest';
>```

### Create Transformations with Streams and Tables

Run the following script, available in [transformation_scripts](./transformation_scripts.sql), which creates the streams and tables for the transformation:

```bash
ksql> CREATE STREAM S_CUSTOMER (ID INT,
> FIRST_NAME string,
> LAST_NAME string,
> EMAIL string)
> WITH (KAFKA_TOPIC='dbserver1.inventory.customers',
> VALUE_FORMAT='json');
>
>CREATE TABLE T_CUSTOMER
>AS
> SELECT id,
>        latest_by_offset(first_name) as first_name,
> latest_by_offset(last_name) as last_name,
> latest_by_offset(email) as email
> FROM s_customer
> GROUP BY id
> EMIT CHANGES;
>
>CREATE STREAM S_PRODUCT (ID INT,
> NAME string,
> description string,
> weight DOUBLE)
> WITH (KAFKA_TOPIC='dbserver1.inventory.products',
> VALUE_FORMAT='json');
>
>CREATE TABLE T_PRODUCT
>AS
> SELECT id,
> latest_by_offset(name) as name,
> latest_by_offset(description) as description,
> latest_by_offset(weight) as weight
> FROM s_product
> GROUP BY id
> EMIT CHANGES;
>
>CREATE STREAM s_order (
> order_number integer,
> order_date timestamp,
> purchaser integer,
> quantity integer,
> product_id integer)
> WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json');
>
>CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
> select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id
> from s_order as o
>left join t_product as p on o.product_id = p.id
>left join t_customer as c on o.purchaser = c.id
>partition by o.order_number
>emit changes;
```
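
At this point you can already query the enriched stream interactively. A push query such as the following shows joined rows as they arrive:

```bash
ksql> SELECT * FROM SA_ENRICHED_ORDER EMIT CHANGES LIMIT 5;
```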

### Create Sink Connector

To load the final enriched order stream into the PostgreSQL database, run the script in [postgresql_sink_connector](./postgresql_sink_connector.sql):

```bash
ksql> CREATE SINK CONNECTOR `postgres-sink` WITH(
> "connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',
> "tasks.max"= '1',
> "dialect.name"= 'PostgreSqlDatabaseDialect',
> "table.name.format"= 'ENRICHED_ORDER',
> "topics"= 'SA_ENRICHED_ORDER',
> "connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
> "auto.create"= 'true',
> "insert.mode"= 'upsert',
> "pk.fields"= 'ORDER_NUMBER',
> "pk.mode"= 'record_key',
> "key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',
> "key.converter.schemas.enable" = 'false',
> "value.converter"= 'io.confluent.connect.avro.AvroConverter',
> "value.converter.schemas.enable" = 'true',
> "value.converter.schema.registry.url"= 'http://schema-registry:8081'
>);
```
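
Once the sink connector is running, you can inspect the target table directly. For example (credentials and database name are taken from `docker-compose.yml`; the table name is quoted because the JDBC sink creates it in upper case):

```bash
> docker-compose exec postgres psql -U postgresuser -d inventory \
>     -c 'SELECT * FROM "ENRICHED_ORDER" LIMIT 5;'
```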
Finally, you can modify the data in MySQL's `orders`, `customers`, and `products` tables and watch the results appear in the `ENRICHED_ORDER` table in the PostgreSQL database.
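
For example, the following statement updates a customer row and should propagate through the pipeline within a few seconds (the id `1001` assumes Debezium's example `inventory` data; adjust it to match your rows):

```bash
> docker-compose exec mysql mysql -uroot -pdebezium inventory \
>     -e "UPDATE customers SET email = 'sally.updated@example.com' WHERE id = 1001;"
```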
101 changes: 101 additions & 0 deletions all_commands.sql
@@ -0,0 +1,101 @@
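-- 1. Register the Debezium MySQL source connector (same as in the README).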
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
"connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max"= '1',
"database.hostname"= 'mysql',
"database.port"= '3306',
"database.user"= 'root',
"database.password"= 'debezium',
"database.server.id"= '184054',
"database.server.name"= 'dbserver1',
"database.whitelist"= 'inventory',
"table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
"database.history.kafka.bootstrap.servers"= 'kafka:9092',
"database.history.kafka.topic"= 'schema-changes.inventory',
"transforms"= 'unwrap',
"transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
"key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"= 'false',
"value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable"= 'false');

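-- Sanity checks: list topics, read from the earliest offset, and peek at raw change events.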
show topics;

SET 'auto.offset.reset' = 'earliest';

PRINT "dbserver1.inventory.customers" FROM BEGINNING;

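-- 2. Create streams over the raw change topics, and tables materializing the latest state per key.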
CREATE STREAM S_CUSTOMER (ID INT,
FIRST_NAME string,
LAST_NAME string,
EMAIL string)
WITH (KAFKA_TOPIC='dbserver1.inventory.customers',
VALUE_FORMAT='json');

CREATE TABLE T_CUSTOMER
AS
SELECT id,
       latest_by_offset(first_name) as first_name,
latest_by_offset(last_name) as last_name,
latest_by_offset(email) as email
FROM s_customer
GROUP BY id
EMIT CHANGES;

CREATE STREAM S_PRODUCT (ID INT,
NAME string,
description string,
weight DOUBLE)
WITH (KAFKA_TOPIC='dbserver1.inventory.products',
VALUE_FORMAT='json');

CREATE TABLE T_PRODUCT
AS
SELECT id,
latest_by_offset(name) as name,
latest_by_offset(description) as description,
latest_by_offset(weight) as weight
FROM s_product
GROUP BY id
EMIT CHANGES;

CREATE STREAM s_order (
order_number integer,
order_date timestamp,
purchaser integer,
quantity integer,
product_id integer)
WITH (KAFKA_TOPIC='dbserver1.inventory.orders',VALUE_FORMAT='json');

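-- Exploratory push queries: validate the joins interactively before materializing them.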
select o.order_number, o.quantity, p.name as product from s_order as o left join t_product as p on p.id = o.product_id emit changes;

select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id
from s_order as o
left join t_product as p on o.product_id = p.id
left join t_customer as c on o.purchaser = c.id
emit changes;

CREATE STREAM SA_ENRICHED_ORDER WITH (VALUE_FORMAT='AVRO') AS
select o.order_number, o.quantity, p.name as product, c.email as customer, p.id as product_id, c.id as customer_id
from s_order as o
left join t_product as p on o.product_id = p.id
left join t_customer as c on o.purchaser = c.id
partition by o.order_number
emit changes;

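-- 3. Load the enriched orders into PostgreSQL via the JDBC sink connector.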
CREATE SINK CONNECTOR `postgres-sink` WITH(
"connector.class"= 'io.confluent.connect.jdbc.JdbcSinkConnector',
"tasks.max"= '1',
"dialect.name"= 'PostgreSqlDatabaseDialect',
"table.name.format"= 'ENRICHED_ORDER',
"topics"= 'SA_ENRICHED_ORDER',
"connection.url"= 'jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw',
"auto.create"= 'true',
"insert.mode"= 'upsert',
"pk.fields"= 'ORDER_NUMBER',
"pk.mode"= 'record_key',
"key.converter"= 'org.apache.kafka.connect.converters.IntegerConverter',
"key.converter.schemas.enable" = 'false',
"value.converter"= 'io.confluent.connect.avro.AvroConverter',
"value.converter.schemas.enable" = 'true',
"value.converter.schema.registry.url"= 'http://schema-registry:8081'
);
Binary file added assets/architecture.png
17 changes: 17 additions & 0 deletions debezium-jdbc/Dockerfile
@@ -0,0 +1,17 @@
ARG DEBEZIUM_VERSION=1.9

FROM quay.io/debezium/connect:$DEBEZIUM_VERSION

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.1

# Fetch and deploy PostgreSQL JDBC driver
RUN cd /kafka/libs && \
curl -sO https://repo1.maven.org/maven2/org/postgresql/postgresql/$POSTGRES_VERSION/postgresql-$POSTGRES_VERSION.jar

# Fetch and deploy Kafka Connect JDBC
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
RUN mkdir $KAFKA_CONNECT_JDBC_DIR

RUN cd $KAFKA_CONNECT_JDBC_DIR && \
curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar
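
# Note: the `connect` service in docker-compose.yml builds this image automatically.
# To build it manually (the image tag is arbitrary): docker build -t debezium-jdbc ./debezium-jdbc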
95 changes: 95 additions & 0 deletions docker-compose.yml
@@ -0,0 +1,95 @@
version: '2'

services:
zookeeper:
image: confluentinc/cp-zookeeper:5.4.9
environment:
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-enterprise-kafka:5.4.9
ports:
- "29092:29092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:32181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

connect:
build:
context: debezium-jdbc
ports:
- 8083:8083
links:
- kafka
- mysql
- postgres
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses

schema-registry:
image: confluentinc/cp-schema-registry:5.4.9
depends_on:
- zookeeper
- kafka
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:9092

primary-ksqldb-server:
image: confluentinc/ksqldb-server:0.27.2
hostname: primary-ksqldb-server
container_name: primary-ksqldb-server
depends_on:
- kafka
- schema-registry
ports:
- "8088:8088"
environment:
KSQL_LISTENERS: http://0.0.0.0:8088
KSQL_BOOTSTRAP_SERVERS: kafka:9092
KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KSQL_KSQL_CONNECT_URL: http://connect:8083
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"

mysql:
image: debezium/example-mysql:1.9.5.Final
ports:
- 3306:3306
environment:
- MYSQL_ROOT_PASSWORD=debezium
- MYSQL_USER=mysqluser
- MYSQL_PASSWORD=mysqlpw

postgres:
image: debezium/postgres:14-alpine
ports:
- "5432:5432"
environment:
- POSTGRES_USER=postgresuser
- POSTGRES_PASSWORD=postgrespw
- POSTGRES_DB=inventory

# Access the cli by running:
# > docker-compose exec ksqldb-cli ksql http://primary-ksqldb-server:8088
ksqldb-cli:
image: confluentinc/ksqldb-cli:0.27.2
container_name: ksqldb-cli
depends_on:
- primary-ksqldb-server
entrypoint: /bin/sh
tty: true
19 changes: 19 additions & 0 deletions mysql_source_connector.sql
@@ -0,0 +1,19 @@
CREATE SOURCE CONNECTOR `mysql-connector` WITH(
"connector.class"= 'io.debezium.connector.mysql.MySqlConnector',
"tasks.max"= '1',
"database.hostname"= 'mysql',
"database.port"= '3306',
"database.user"= 'root',
"database.password"= 'debezium',
"database.server.id"= '184054',
"database.server.name"= 'dbserver1',
"database.whitelist"= 'inventory',
"table.whitelist"= 'inventory.customers,inventory.products,inventory.orders',
"database.history.kafka.bootstrap.servers"= 'kafka:9092',
"database.history.kafka.topic"= 'schema-changes.inventory',
"transforms"= 'unwrap',
"transforms.unwrap.type"= 'io.debezium.transforms.ExtractNewRecordState',
"key.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"key.converter.schemas.enable"= 'false',
"value.converter"= 'org.apache.kafka.connect.json.JsonConverter',
"value.converter.schemas.enable"= 'false');