This repository demonstrates how to integrate Confluent Platform with IBM technologies (IBM MQ and DB2). Two source connectors are started: a Datagen source that mocks clickstream data, and an IBM MQ source. We then use ksqlDB to join the two sources. We also configure an IBM DB2 source connector to read data from DB2. The result of the ksqlDB join is sent to IBM MQ using a sink connector.
From your terminal, download the zip archive of the confluentinc/demo-scene GitHub repository:
wget http://github.com/confluentinc/demo-scene/archive/master.zip
Then unzip the file and change into the demo-scene-master/ibm-demo directory. On a Mac or a similar Unix-like system, the commands are:
unzip master.zip
cd demo-scene-master/ibm-demo
This step will spin up the Confluent Platform cluster and the IBM DB2 and IBM MQ servers.
make build
make cluster
# wait a minute for the cluster to spin up
This command creates the topics we need:
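Instead of waiting a fixed minute, a small retry helper can poll until a service answers. The following is a minimal sketch, not part of the demo's Makefile: `wait_for` is a hypothetical helper name, and the URL in the usage comment assumes the Kafka Connect REST API is published on localhost:8083, which may not match this demo's port mapping.

```shell
# Retry a command until it succeeds or the attempts run out.
# usage: wait_for <attempts> <delay_seconds> <command> [args...]
# (wait_for is a hypothetical helper, not something the demo ships.)
wait_for() {
  attempts=$1
  delay=$2
  shift 2
  i=1
  while :; do
    # Run the probe command quietly; stop as soon as it succeeds.
    if "$@" >/dev/null 2>&1; then
      return 0
    fi
    # Give up once the attempt budget is spent.
    if [ "$i" -ge "$attempts" ]; then
      return 1
    fi
    i=$((i + 1))
    sleep "$delay"
  done
}

# Example (assumption: Connect REST API reachable on localhost:8083):
# wait_for 30 5 curl -fsS http://localhost:8083/connectors
```

The helper returns a non-zero status when the service never comes up, so it can gate the next `make` target in a script.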
make topic
UserName=admin
Password=passw0rd
You need to send a message to IBM MQ before the schema will appear in the topic in C3.
- Select DEV.QUEUE.1 under "Queues on MQ1"
- Add a message
Notice that the messages are not consumed yet...
Open Confluent Control Center. Here you can see your local Confluent cluster and the topics created earlier.
Now we configure the connector so that we can read data from IBM MQ:
make connectsource
# wait a minute before moving on to the next step
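To confirm the connector is actually up rather than waiting blindly, the Kafka Connect REST API exposes `GET /connectors/<name>/status`, whose JSON reports a `state` for the connector and for each task. The sketch below checks that every reported state is RUNNING. It rests on assumptions: `all_running` is a hypothetical helper, the host and port (localhost:8083) are Connect's defaults rather than values confirmed by this demo, and the connector name in the usage comment is a placeholder.

```shell
# Succeed only if the Connect status JSON on stdin reports every
# "state" field (connector and tasks) as RUNNING.
# (all_running is a hypothetical helper name.)
all_running() {
  # Pull out every "state":"..." pair; tolerate no matches.
  states=$(grep -o '"state"[[:space:]]*:[[:space:]]*"[A-Z_]*"' || true)
  # No state at all means the input was not a status document.
  [ -n "$states" ] || return 1
  # Any line that is not RUNNING (FAILED, PAUSED, ...) fails the check.
  if printf '%s\n' "$states" | grep -qv 'RUNNING'; then
    return 1
  fi
  return 0
}

# Example (assumptions: Connect on localhost:8083; replace
# <connector-name> with the name shown in Control Center):
# curl -fsS http://localhost:8083/connectors/<connector-name>/status | all_running && echo ready
```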
- You can now see the schema assigned to the ibmmq topic
Run the ibmmq consumer to see messages coming in from DEV.QUEUE.1 (or check in C3):
make consumer
You can also see in IBM MQ that the messages are not there anymore.
In Confluent Control Center, select the cluster tile, click ksqlDB in the left menu, and select the ksqldb1 cluster.
Using the editor, run the queries below:
CREATE STREAM CLICKSTREAM
WITH (KAFKA_TOPIC='clickstream',
VALUE_FORMAT='AVRO');
Send another message to IBM MQ. Use the user name bobk_43 or akatz1022 as the message text to capture clickstreams for those users with a ksqlDB join.
CREATE STREAM ibmmq
WITH (KAFKA_TOPIC='ibmmq',
VALUE_FORMAT='AVRO');
Click on Add query properties and select auto.offset.reset = Earliest
SELECT * FROM ibmmq
EMIT CHANGES;
SELECT "TEXT" FROM ibmmq
EMIT CHANGES;
Paste the following ksqlDB statement into the ksqlDB editor to perform the join.
CREATE STREAM VIP_USERS AS
SELECT * FROM CLICKSTREAM
JOIN IBMMQ WITHIN 5 SECONDS
ON text = username EMIT CHANGES;
SELECT * FROM VIP_USERS
EMIT CHANGES;
This query returns rows only if you sent messages to IBM MQ whose text matches a username in the CLICKSTREAM stream (as instructed above).
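The match rule behind the join (same value on both sides, timestamps at most 5 seconds apart) can be tried offline on two small text files. This is only an illustrative sketch: `vip_matches` and the "<epoch_seconds> <value>" file layout are made up for the example, and ksqlDB's real windowing works on record timestamps with details this ignores.

```shell
# Emulate the VIP_USERS match rule on two plain-text files.
# Each input line is "<epoch_seconds> <value>" (a hypothetical layout).
# usage: vip_matches <clicks_file> <mq_file> [window_seconds]
vip_matches() {
  clicks_file=$1   # lines: "<ts> <username>"
  mq_file=$2       # lines: "<ts> <text>"
  window=${3:-5}
  awk -v w="$window" '
    # First file (MQ messages): remember timestamp and text.
    NR == FNR { mq_ts[FNR] = $1; mq_text[FNR] = $2; n = FNR; next }
    # Second file (clicks): print pairs with equal value within the window.
    {
      for (i = 1; i <= n; i++) {
        d = $1 - mq_ts[i]
        if (d < 0) d = -d
        if ($2 == mq_text[i] && d <= w) print $1, $2, mq_ts[i]
      }
    }
  ' "$mq_file" "$clicks_file"
}
```

Each output line holds the click timestamp, the matched user name, and the MQ message timestamp. A click for a user with no MQ message, or one outside the window, produces no output, which mirrors why the query above stays empty until a matching message is sent.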
docker exec -ti ibmdb2 bash -c "su - db2inst1"
db2 connect to sample user db2inst1 using passw0rd
db2 LIST TABLES
You can now exit db2
exit
Now you can create the connector to load the data from DB2:
make connectdb2source
You will see that the connector automatically loads the DB2 data into Confluent. Check in Confluent Control Center, under Topics.
You can also see the connectors created by clicking on the Connect link in the left menu.
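The same list is available from the command line: the Kafka Connect REST API's `GET /connectors` returns a JSON array of connector names. The sketch below turns that array into one name per line. `connector_names` is a hypothetical helper, localhost:8083 is Connect's default address rather than one confirmed by this demo, and the simple parsing assumes names contain no commas, quotes, or brackets.

```shell
# Print one connector name per line from the JSON array on stdin,
# e.g. ["a","b"] becomes two lines: a and b.
# (connector_names is a hypothetical helper; it assumes simple names.)
connector_names() {
  tr -d '[]" \n' | tr ',' '\n'
}

# Example (assumption: Connect REST API on localhost:8083):
# curl -fsS http://localhost:8083/connectors | connector_names
```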
Let's sink the new stream data into DEV.QUEUE.2 in IBM MQ:
make connectsink
You can see the data by logging in to IBM MQ:
UserName=admin
Password=passw0rd
When you are done with the demo, run:
make down
docker exec -ti ibmdb2 bash -c "su - db2inst1"
db2 get dbm cfg | grep "SVCENAME"
grep "db2c_db2inst1" /etc/services
db2level
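The SVCENAME and /etc/services checks above together answer one question: which TCP port the DB2 instance listens on. A minimal sketch of that lookup follows. `service_port` is a hypothetical helper, and it assumes the standard services file layout of "name port/protocol"; it would need to run inside the ibmdb2 container to read that container's /etc/services.

```shell
# Print the TCP port registered for a service name in a
# services-format file (default: /etc/services).
# (service_port is a hypothetical helper name.)
# usage: service_port <service_name> [file]
service_port() {
  name=$1
  file=${2:-/etc/services}
  # Match the exact service name with a tcp entry, strip "/tcp".
  awk -v n="$name" '
    $1 == n && $2 ~ /\/tcp$/ { sub(/\/tcp$/, "", $2); print $2; exit }
  ' "$file"
}

# Example, using the service name from the grep above:
# service_port db2c_db2inst1
```

An empty result means the service name has no tcp entry in the file, which is worth checking before looking at connector configuration.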