This guide provides a step-by-step walkthrough to set up a streaming pipeline from an OLTP database to Kafka and finally to ClickHouse using Debezium, Kafka Connect, and the ClickHouse Sink Connector in a Kubernetes environment.
- Step 1: Deploy Kafka Connect with Debezium
- Step 2: Deploy Debezium Source Connector for PostgreSQL
- Step 3: Deploy ClickHouse
- Step 4: Deploy ClickHouse Sink Connector
- Step 5: Verify the Setup
- Step 6: ClickHouse Integration in Metabase (Kubernetes)
- Documentation Reference
To use the io.debezium.connector.postgresql.PostgresConnector
and com.clickhouse.kafka.connect.ClickHouseSinkConnector
in your Kafka Connect setup, you need to create a custom Docker image with these plugins.
The Kafka Connect image build is now automated using a Cloud Build CI/CD pipeline. This includes:
- Debezium PostgreSQL Connector:
io.debezium.connector.postgresql.PostgresConnector
- ClickHouse Sink Connector:
com.clickhouse.kafka.connect.ClickHouseSinkConnector
- Custom SMT Transformation Plugins: For data transformation during streaming
The custom Kafka Connect image is automatically built via Cloud Build CI/CD when changes are pushed to the repository. The build process:
- Triggers: Automatically triggered on code changes
- Build Process: Uses devops-kafka-connect/cloudbuild.yaml
- Plugin Installation: Automated via install-plugins.sh scripts
- Registry Push: Automatically pushes to `asia-docker.pkg.dev/sbx-ci-cd/public/devops-kafka-connect:latest`
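For reference, a minimal Dockerfile sketch for such a custom image is shown below. The base image, tag, and plugin path are assumptions; the actual image is defined by `devops-kafka-connect/cloudbuild.yaml` and `install-plugins.sh`:

```dockerfile
# Hypothetical sketch; the real build is driven by cloudbuild.yaml and install-plugins.sh.
FROM confluentinc/cp-kafka-connect:7.6.0

# Directory scanned by Kafka Connect for plugins (plugin.path)
ENV CONNECT_PLUGIN_PATH=/usr/share/confluent-hub-components

# install-plugins.sh is assumed to download the Debezium PostgreSQL connector,
# the ClickHouse sink connector, and the custom SMT jars into the plugin path.
COPY install-plugins.sh /tmp/install-plugins.sh
RUN /tmp/install-plugins.sh "${CONNECT_PLUGIN_PATH}"
```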
Kafka Connect is required to run both Debezium (for Change Data Capture) and the ClickHouse Sink Connector (for storing data in ClickHouse).
- Create a `kafka-connect.yaml` file with the necessary configurations for Kafka Connect (a hedged sketch follows this list).
- Apply the manifest to your Kubernetes cluster:
  `kubectl apply -f kafka/kafka-connect.yaml`
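A minimal sketch of `kafka-connect.yaml`, assuming a plain Kubernetes Deployment that runs the custom image in distributed mode with Confluent-style `CONNECT_*` environment variables; your actual manifest (for example a Strimzi `KafkaConnect` resource) may look quite different:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
        - name: kafka-connect
          image: asia-docker.pkg.dev/sbx-ci-cd/public/devops-kafka-connect:latest
          ports:
            - containerPort: 8083   # Kafka Connect REST API
          env:
            - name: CONNECT_BOOTSTRAP_SERVERS
              value: "<kafka-url>:9092"
            - name: CONNECT_GROUP_ID
              value: "kafka-connect"
            - name: CONNECT_CONFIG_STORAGE_TOPIC
              value: "connect-configs"
            - name: CONNECT_OFFSET_STORAGE_TOPIC
              value: "connect-offsets"
            - name: CONNECT_STATUS_STORAGE_TOPIC
              value: "connect-status"
            - name: CONNECT_KEY_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_VALUE_CONVERTER
              value: "org.apache.kafka.connect.json.JsonConverter"
            - name: CONNECT_REST_ADVERTISED_HOST_NAME
              value: "kafka-connect"
```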
Verify the Plugins: Once the Kafka Connect pod is running, verify that the plugins are available:
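For example, assuming the Kafka Connect REST API is reachable on port 8083 (e.g. via `kubectl port-forward`):

```bash
# List the connector plugins installed in this Kafka Connect worker
curl -s http://localhost:8083/connector-plugins
```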
You should see both io.debezium.connector.postgresql.PostgresConnector
and com.clickhouse.kafka.connect.ClickHouseSinkConnector
in the list of available plugins.
Debezium monitors PostgreSQL and publishes changes to Kafka.
Run the following SQL commands in your PostgreSQL database to enable logical replication:
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;
SELECT pg_reload_conf();
Note: `wal_level`, `max_replication_slots`, and `max_wal_senders` only take effect after a PostgreSQL restart; `pg_reload_conf()` alone does not apply them.
Create a replication user:
CREATE USER debezium WITH REPLICATION ENCRYPTED PASSWORD '<your_password>';
GRANT USAGE ON SCHEMA public TO debezium;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO debezium;
GRANT SELECT ON ALL SEQUENCES IN SCHEMA public TO debezium;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON SEQUENCES TO debezium;
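To confirm the role exists and has the replication attribute, you can query `pg_roles`:

```sql
-- Verify that the debezium role was created with REPLICATION
SELECT rolname, rolreplication FROM pg_roles WHERE rolname = 'debezium';
```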
Create a publication for the tables you want to monitor:
-- Sample SQL to create a publication on selected tables
CREATE PUBLICATION dbz_publication
FOR TABLE
public.table_name1,
public.table_name2,
public.table_name3;
Adding Tables to an Existing PostgreSQL Publication
To add more tables to your existing dbz_publication, you can use the ALTER PUBLICATION command:
ALTER PUBLICATION dbz_publication
ADD TABLE public.new_table_name1, public.new_table_name2;
Replace new_table_name1
and new_table_name2
with the actual names of the tables you want to add.
How to Remove Tables from a Publication
ALTER PUBLICATION dbz_publication DROP TABLE public.table_name;
Replace table_name with the actual name of the table you want to drop from the publication.
Verify the replication slots and publications:
SELECT * FROM pg_replication_slots;
SELECT * FROM pg_publication;
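You can also list the tables currently included in the publication:

```sql
-- Show which tables are part of dbz_publication
SELECT * FROM pg_publication_tables WHERE pubname = 'dbz_publication';
```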
To clean up, use:
SELECT pg_drop_replication_slot('dbz');
DROP PUBLICATION dbz_publication;
- Create a `debezium-postgres.json` file with the connector configuration (a hedged sample is shown after this list).
- To capture all tables in the database, use:
  `"table.include.list": "public.*"`
  Or remove this line entirely to capture all tables by default.
- Deploy the connector using the sample config:
  `./insights-infra/Aiven/connectors/backbone_debezium_postgres.sh`

To delete the connector (example command):
`curl -X DELETE http://localhost:8083/connectors/<connector>`
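For reference, a minimal `debezium-postgres.json` might look like the sketch below. The connector name, database host, port, credentials, database name, and table list are placeholders for your environment; `slot.name`, `publication.name`, and the `postgres` topic prefix follow the examples used elsewhere in this guide:

```json
{
  "name": "debezium-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "<postgres-host>",
    "database.port": "5432",
    "database.user": "debezium",
    "database.password": "<your_password>",
    "database.dbname": "<your_database>",
    "topic.prefix": "postgres",
    "plugin.name": "pgoutput",
    "slot.name": "dbz",
    "publication.name": "dbz_publication",
    "publication.autocreate.mode": "disabled",
    "table.include.list": "public.table_name1,public.table_name2"
  }
}
```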
ClickHouse will store the data streamed from Kafka.
- Create a `clickhouse.yaml` file with the necessary configurations.
- Use Helm to deploy ClickHouse:

```bash
helm show values oci://registry-1.docker.io/bitnamicharts/clickhouse --version 8.0.10 > clickhouse/helm/values.yaml
helm install clickhouse oci://registry-1.docker.io/bitnamicharts/clickhouse -f clickhouse/helm/values.yaml --version 8.0.10 -n kafka
helm upgrade clickhouse oci://registry-1.docker.io/bitnamicharts/clickhouse -f clickhouse/helm/values.yaml --version 8.0.10 -n kafka
```
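Once the chart is installed, you can verify that the ClickHouse pods are up (the label and pod name below follow Bitnami chart defaults and may differ in your setup):

```bash
# Check the ClickHouse pods created by the Bitnami chart
kubectl get pods -n kafka -l app.kubernetes.io/name=clickhouse

# Run a test query from inside a pod; credentials from your values.yaml may be required
kubectl exec -it clickhouse-shard0-0 -n kafka -- clickhouse-client --query "SELECT version()"
```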
The ClickHouse Sink Connector pulls data from Kafka and inserts it into ClickHouse.
Run the following SQL in ClickHouse to create a table:
-- Sample query to create a table in ClickHouse
CREATE DATABASE sbx;
CREATE TABLE sbx.wms (
    id Int64,
    name String,
    created_at DateTime,
    updatedAt DateTime
) ENGINE = ReplacingMergeTree(updatedAt)
ORDER BY (name, id);
Create all required tables using the SQL queries in the `sql-queries` folder.
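Because the sample table uses the ReplacingMergeTree engine keyed on `updatedAt`, duplicate rows for the same `ORDER BY` key are only collapsed when background merges run. To force deduplicated results at query time you can use the `FINAL` modifier:

```sql
-- Deduplicate at read time (slower than a plain SELECT)
SELECT * FROM sbx.wms FINAL;
```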
- Create a `clickhouse-sink.json` file with the connector configuration (a hedged sample is shown after the commands below).
- Ensure all `topics` and `topic2TableMap` entries are present.
- Deploy the connector:
  `curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @clickhouse/samples/clickhouse-sink.json`
To update the connector:
curl -X PUT http://localhost:8083/connectors/clickhouse-connect/config -H "Content-Type: application/json" -d @clickhouse/clickhouse-sink-update.json
To delete the connector:
curl -X DELETE http://localhost:8083/connectors/clickhouse-connect
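For reference, a minimal `clickhouse-sink.json` might look like the sketch below. The connector name matches the `clickhouse-connect` name used in the commands above; the hostname, credentials, topics, and table mappings are placeholders for your environment:

```json
{
  "name": "clickhouse-connect",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "topics": "postgres.public.table_name1,postgres.public.table_name2",
    "hostname": "clickhouse.kafka.svc.cluster.local",
    "port": "8123",
    "database": "sbx",
    "username": "default",
    "password": "<your_password>",
    "ssl": "false",
    "topic2TableMap": "postgres.public.table_name1=table_name1,postgres.public.table_name2=table_name2"
  }
}
```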
List all Kafka topics:
kubectl exec -it <kafka-pod-name> -n dev -- kafka-topics.sh --list --bootstrap-server <kafka-url>:9092
You should see a topic like `postgres.public.your_table`.
Consume messages from a Kafka topic:
kubectl exec -it <kafka-pod-name> -n dev -- kafka-console-consumer.sh --topic postgres.public.ccs_cedge --from-beginning --bootstrap-server <kafka-url>:9092
Query the data in ClickHouse:
clickhouse-client --host clickhouse --query "SELECT * FROM your_db.your_table;"
Run the following query to check the number of threads:
SELECT
name,
value
FROM system.settings
WHERE name = 'max_threads';
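If you need a different value for the current session, you can override it (the value below is just an example; tune it to your hardware):

```sql
-- Override max_threads for the current session only
SET max_threads = 8;
```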
Metabase is an easy-to-use, open-source UI tool for asking questions about your data. It connects to ClickHouse using a JDBC driver. Follow these steps to deploy Metabase in a Kubernetes environment and integrate it with ClickHouse.
- Download the latest ClickHouse plugin JAR using an initContainer and mount the plugin via a shared volume:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metabase
  labels:
    app: metabase
spec:
  replicas: 1
  selector:
    matchLabels:
      app: metabase
  template:
    metadata:
      labels:
        app: metabase
    spec:
      containers:
        - name: metabase
          image: metabase/metabase:latest
          ports:
            - containerPort: 3000
          volumeMounts:
            - name: plugins
              mountPath: /plugins
          env:
            - name: MB_PLUGINS_DIR
              value: "/plugins"
      initContainers:
        - name: download-clickhouse-plugin
          image: curlimages/curl:latest
          command:
            [
              "sh",
              "-c",
              "curl -L https://github.com/clickhouse/metabase-clickhouse-driver/releases/latest/download/clickhouse.metabase-driver.jar -o /plugins/clickhouse.metabase-driver.jar"
            ]
          volumeMounts:
            - name: plugins
              mountPath: /plugins
      volumes:
        - name: plugins
          emptyDir: {}
```
- Apply the deployment and wait for the Metabase pod to load the new driver.
- Access Metabase and use the driver.
- Open the Metabase UI in your browser (`https://<EXTERNAL-URL>`).
- Go to Settings > Admin > Databases.
- Click Add Database.
- Fill in the following details:
  - Database Type: Select `ClickHouse`.
  - Host: Enter the hostname or IP address of your ClickHouse service.
  - Port: Enter the ClickHouse port (default is `8123`).
  - Database Name: Enter the name of your ClickHouse database (e.g., `sbx`).
  - Username: Enter your ClickHouse username.
  - Password: Enter your ClickHouse password.
- Click Save to add the database.
Once the database is added, you can start creating dashboards and visualizations and asking questions about your data in ClickHouse using Metabase.
- Ensure all Kubernetes resources are properly configured and deployed.
- Use secure credentials and secrets for sensitive configurations.
- Monitor the pipeline for errors and performance bottlenecks.
- Kafka Client Authentication for Google Cloud
- ClickHouse Kafka Connect Sink
- PostgreSQL Interface
- Avoid Nullable Columns
- Optimizing Types
- Denormalizing Data
- Connecting Metabase to ClickHouse
- I've operated petabyte-scale ClickHouse® clusters for 5 years
- Using Materialized Views in ClickHouse
- Materialized Views and JOINs
- Conditional Functions
- Refreshable Materialized View
- Handling DELETE operations with ClickHouse Kafka Connect Sink
- is_deleted