fix(kafka-source-connector): switch to confluent_kafka_python library
It was decided to switch from the kafka-python library to confluent_kafka_python
as the Kafka client in the sdcm.kafka.kafka_consumer module.
confluent_kafka_python is more feature-rich (it supports Kafka message schema
serialization/deserialization out of the box) and is actively maintained.
dimakr authored and fruch committed Oct 28, 2024
1 parent 56a1d5e commit 7cbe4d5
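For context, below is a minimal sketch of the confluent_kafka consumer pattern this change adopts. It is not code from this repository: the broker address, topic name, and group id are illustrative placeholders, and in SCT the real settings come from the configuration shown in the diff.

from confluent_kafka import Consumer

# Placeholder connection settings; in SCT these come from SCTConfiguration.
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': True,
    'auto.commit.interval.ms': 1000,
})
consumer.subscribe(['example-topic'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)  # one Message or None, unlike kafka-python's dict of record batches
        if msg is None:
            continue
        if msg.error():
            # broker/partition errors are delivered as Message objects carrying a KafkaError
            print(f"consumer error: {msg.error()}")
            continue
        print(msg.value())  # value() is a method returning raw bytes, not an attribute
finally:
    consumer.close()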
Showing 3 changed files with 25 additions and 26 deletions.
1 change: 0 additions & 1 deletion requirements.in
@@ -52,6 +52,5 @@ hdrhistogram==0.9.2
deepdiff==6.2.3
PyGithub==2.1.1
gimme-aws-creds==2.8.0
kafka-python==2.0.2
confluent-kafka==2.5.3
fastavro==1.9.7
4 changes: 0 additions & 4 deletions requirements.txt
@@ -935,10 +935,6 @@ jwcrypto==1.5.6 \
--hash=sha256:150d2b0ebbdb8f40b77f543fb44ffd2baeff48788be71f67f03566692fd55789 \
--hash=sha256:771a87762a0c081ae6166958a954f80848820b2ab066937dc8b8379d65b1b039
# via okta
kafka-python==2.0.2 \
--hash=sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3 \
--hash=sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e
# via -r requirements.in
keyring==25.4.1 \
--hash=sha256:5426f817cf7f6f007ba5ec722b1bcad95a75b27d780343772ad76b17cb47b0bf \
--hash=sha256:b07ebc55f3e8ed86ac81dd31ef14e81ace9dd9c3d4b5d77a6e9a2016d0d71a1b
46 changes: 25 additions & 21 deletions sdcm/kafka/kafka_consumer.py
@@ -14,10 +14,11 @@
import base64
import json
import logging
import time

from threading import Event, Thread

import kafka
from confluent_kafka import Consumer

from sdcm.sct_config import SCTConfiguration
from sdcm.kafka.kafka_config import SctKafkaConfiguration
@@ -46,18 +46,19 @@ def __init__(self, tester, params: SCTConfiguration, kafka_addresses: list | Non
self.read_number_of_key = int(kwargs.get('read_number_of_key', 0))

connector_config: SctKafkaConfiguration = params.get("kafka_connectors")[connector_index]
consumer_config = {
'bootstrap.servers': ','.join(self.kafka_addresses),
'group.id': self.group_id,
'auto.offset.reset': 'earliest',
'enable.auto.commit': True,
'auto.commit.interval.ms': 1000,
}
self.consumer = Consumer(consumer_config)

# TODO: handle setup of multiple tables
topic = f'{connector_config.config.scylla_name}.{connector_config.config.scylla_table_names}'
self.wait_for_topic(topic, timeout=60)
self.consumer = kafka.KafkaConsumer(
topic,
auto_offset_reset='earliest',
enable_auto_commit=True,
auto_commit_interval_ms=1000,
group_id=self.group_id,
bootstrap_servers=self.kafka_addresses,
)
self.consumer.subscribe([topic])

super().__init__(daemon=True)

@@ -70,8 +72,7 @@ def kafka_addresses(self):
return None

def get_topics(self):
admin_client = kafka.KafkaAdminClient(bootstrap_servers=self.kafka_addresses)
topics = admin_client.list_topics()
topics = list(self.consumer.list_topics(timeout=10).topics.keys())
LOGGER.debug(topics)
return topics

@@ -84,16 +85,19 @@ def check_topic_exists():

def run(self):
while not self.termination_event.is_set():
records = self.consumer.poll(timeout_ms=1000)
for _, consumer_records in records.items():
for msg in consumer_records:
data = json.loads(msg.value).get('payload', {}).get('after', {})
key = base64.b64decode(data.get('key')).decode()
self.keys.add(key)

if len(self.keys) >= self.read_number_of_key:
LOGGER.info("reach `read_number_of_key` stopping reader thread")
self.stop()
msgs = self.consumer.consume(num_messages=self.read_number_of_key, timeout=1.0)
if not msgs:
time.sleep(0.5)
continue
for msg in msgs:
data = json.loads(msg.value()).get('payload', {}).get('after', {})
key = base64.b64decode(data.get('key')).decode()
self.keys.add(key)

if len(self.keys) >= self.read_number_of_key:
LOGGER.info("reach `read_number_of_key` stopping reader thread")
self.stop()
break

def stop(self):
self.termination_event.set()
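Note on topic discovery in this migration: kafka-python's KafkaAdminClient is no longer needed, because confluent_kafka exposes cluster metadata directly on the Consumer via list_topics(). The repository's actual wait_for_topic/check_topic_exists implementation is collapsed in this view; the snippet below is only a rough sketch of how such a wait loop could look with confluent_kafka, with the helper name and timeout values chosen for illustration.

import time

from confluent_kafka import Consumer


def wait_for_topic(consumer: Consumer, topic: str, timeout: int = 60, poll_interval: float = 2.0) -> None:
    # Poll cluster metadata until the topic shows up or the timeout expires.
    deadline = time.time() + timeout
    while time.time() < deadline:
        metadata = consumer.list_topics(timeout=10)  # ClusterMetadata with a .topics dict
        if topic in metadata.topics:
            return
        time.sleep(poll_interval)
    raise TimeoutError(f"topic '{topic}' was not created within {timeout}s")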