Skip to content

Commit ce78521

Browse files
committed
Added Confluent Kafka Interface Folder
1 parent b1e2c9e commit ce78521

File tree

5 files changed

+272
-0
lines changed

5 files changed

+272
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
-- -----------------------------------------------------------------------------------------------
-- TO CREATE THE SOURCE
-- -----------------------------------------------------------------------------------------------
-- NOTE: headers previously used "%%", which is not valid ksqlDB comment syntax
-- (ksqlDB accepts "--" and "/* */") and would fail if pasted into the ksql CLI.
-- Replace the YOUR_OWN_* placeholders with your Twitter API credentials before running.

CREATE SOURCE CONNECTOR SOURCE_TWITTER_01 WITH (
  'connector.class' = 'com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector',
  'twitter.oauth.accessToken' = 'YOUR_OWN_TWITTER_ACCESS_TOKEN',
  'twitter.oauth.consumerSecret' = 'YOUR_OWN_TWITTER_SECRET_CONSUMER_KEY',
  'twitter.oauth.consumerKey' = 'YOUR_OWN_TWITTER_CONSUMER_KEY',
  'twitter.oauth.accessTokenSecret' = 'YOUR_OWN_TWITTER_SECRET_ACCESS_TOKEN',
  'kafka.status.topic' = 'twitter_01',
  'process.deletes' = false,
  'filter.keywords' = '#Keywords,for,#the, #tweets, #to, be, #retrieved'
);

-- -----------------------------------------------------------------------------------------------
-- TO START THE TWEET STREAM
-- -----------------------------------------------------------------------------------------------
-- Registers a ksqlDB stream over the topic the connector writes to; values are
-- Avro, decoded via the Schema Registry configured on the ksqlDB server.

CREATE STREAM TWEETS WITH (KAFKA_TOPIC='twitter_01', VALUE_FORMAT='AVRO');
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
# Twitter API credentials, mounted into the Kafka Connect container at
# /data/credentials.properties and readable through the FileConfigProvider,
# e.g. ${file:/data/credentials.properties:TWITTER_CONSUMERKEY}
#
# NOTE: java.util.Properties does NOT strip quotes — KEY='value' yields a value
# that literally contains the quote characters. Values must therefore be
# written unquoted.
TWITTER_CONSUMERKEY=YOUR_OWN_TWITTER_CONSUMER_KEY
TWITTER_CONSUMERSECRET=YOUR_OWN_TWITTER_SECRET_CONSUMER_KEY
TWITTER_ACCESSTOKEN=YOUR_OWN_TWITTER_ACCESS_TOKEN
TWITTER_ACCESSTOKENSECRET=YOUR_OWN_TWITTER_SECRET_ACCESS_TOKEN
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
---
# Confluent stack for the Twitter -> Kafka pipeline: ZooKeeper, Kafka broker,
# Schema Registry, ksqlDB server, Kafka Connect worker, plus Postgres and
# Neo4j as downstream stores.
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      # Port mappings are quoted: Compose recommends quoting "H:C" strings so
      # YAML never misinterprets them.
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # PLAINTEXT listener (kafka:29092) serves containers on the Compose
      # network; HOST listener (localhost:9092) serves clients on the host.
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,HOST://localhost:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      # Single-broker cluster: replication factors and min ISR must be 1.
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.0
    ports:
      - "8081:8081"
    container_name: schema-registry
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
      SCHEMA_REGISTRY_CUB_KAFKA_TIMEOUT: 300

  ksqldb:
    image: confluentinc/ksqldb-server:0.9.0
    hostname: ksqldb
    container_name: ksqldb
    depends_on:
      - kafka
      - kafka-connect
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: kafka:29092
      KSQL_KSQL_CONNECT_URL: http://kafka-connect:8083
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      KSQL_KSQL_HIDDEN_TOPICS: '^_.*,default_ksql_processing_log'

  kafka-connect:
    # NOTE(review): 5.4.1 while the rest of the Confluent stack is 5.5.0 —
    # presumably harmless, but consider aligning to 5.5.0.
    image: confluentinc/cp-kafka-connect:5.4.1
    container_name: kafka-connect
    depends_on:
      - kafka
      - schema-registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_CUB_KAFKA_TIMEOUT: 300
      CONNECT_BOOTSTRAP_SERVERS: "kafka:29092"
      CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect'
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: kafka-connect-group-01
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-status
      # Keys as plain strings, values as Avro registered in the Schema Registry.
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_INTERNAL_VALUE_CONVERTER: 'org.apache.kafka.connect.json.JsonConverter'
      CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
      CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
      CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '1'
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars'
      # External secrets config
      # See https://docs.confluent.io/current/connect/security.html#externalizing-secrets
      CONNECT_CONFIG_PROVIDERS: 'file'
      CONNECT_CONFIG_PROVIDERS_FILE_CLASS: 'org.apache.kafka.common.config.provider.FileConfigProvider'
    volumes:
      # Was "CURRENT_WORKING_DIRECTORY/credentials.properties", which is not a
      # valid host path. Compose resolves relative paths against the compose
      # file's directory, so "./" is the correct way to say "current directory".
      - ./credentials.properties:/data/credentials.properties
    command:
      # In the command section, $ are replaced with $$ to avoid the error 'Invalid interpolation format for "command" option'
      - bash
      - -c
      - |
        echo "Installing connector plugins"
        confluent-hub install --no-prompt jcustenborder/kafka-connect-twitter:0.3.33
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.7
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        sleep infinity

  postgres:
    # *-----------------------------*
    # To connect to the DB:
    #   docker exec -it postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB'
    # *-----------------------------*
    image: postgres:11
    container_name: postgres
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
    volumes:
      # Any *.sql / *.sh scripts in ./data/postgres run on first container start.
      - ./data/postgres:/docker-entrypoint-initdb.d/

  neo4j:
    image: neo4j:4.0.4
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      # 'yes' is quoted so YAML keeps it a string rather than a 1.1 boolean.
      NEO4J_ACCEPT_LICENSE_AGREEMENT: 'yes'

Confluent_Kafka_Interface/ksql_retrieve.ipynb

+102
Large diffs are not rendered by default.

Confluent_Kafka_Interface/read.me

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
= ksqlDB Stream.Bit setup
2+
3+
1. Get Twitter API credentials from https://developer.twitter.com/
4+
2. Update the file `credentials.properties` with the Twitter credentials
5+
3. Install Docker Desktop
6+
4. Run `docker-compose up -d` in a terminal. This will install and start the Confluent environment.
7+
5. In a new terminal, follow the instructions in `TweetStreamInstructions.ksql`
8+
6. Run the Python notebook `ksql_retrieve.ipynb`; it will start fetching the tweet stream from the ksqlDB server in real time.

0 commit comments

Comments
 (0)