From 545a93fba9777977da455c20581bf4de4f16efcc Mon Sep 17 00:00:00 2001 From: Arshdeep Tinna Date: Thu, 20 Feb 2025 09:14:17 -0500 Subject: [PATCH] Create new kafka consumer client only if needed --- kafka_consumer/changelog.d/19658.fixed | 1 + .../datadog_checks/kafka_consumer/kafka_consumer.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) create mode 100644 kafka_consumer/changelog.d/19658.fixed diff --git a/kafka_consumer/changelog.d/19658.fixed b/kafka_consumer/changelog.d/19658.fixed new file mode 100644 index 0000000000000..28b53a971394d --- /dev/null +++ b/kafka_consumer/changelog.d/19658.fixed @@ -0,0 +1 @@ +Create new kafka consumer client only if needed when fetching highwater offsets diff --git a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py index 2b13670a8f88b..0b54abf1ab461 100644 --- a/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py +++ b/kafka_consumer/datadog_checks/kafka_consumer/kafka_consumer.py @@ -318,15 +318,22 @@ def get_highwater_offsets(self, consumer_offsets): topic_partition_checked = set() - for consumer_group, _topic, _partition in consumer_offsets: + sorted_consumer_offsets = dict(sorted(consumer_offsets.items(), key=lambda item: item[0][0])) + current_group = None + + for consumer_group, _topic, _partition in sorted_consumer_offsets: self.log.debug('CONSUMER GROUP: %s', consumer_group) if (_topic, _partition) in topic_partition_checked: self.log.debug('Highwater offset already collected for topic %s with partition %s', _topic, _partition) continue topic_partitions_for_highwater_offsets = set() + if current_group != consumer_group: + if current_group is not None: + self.client.close_consumer() + current_group = consumer_group + self.client.open_consumer(consumer_group) - self.client.open_consumer(consumer_group) cluster_id, topics = self.client.consumer_get_cluster_id_and_list_topics(consumer_group) for topic, partitions in topics: @@ -363,6 +370,7 @@ def get_highwater_offsets(self, consumer_offsets): else: self.log.debug('No new highwater offsets to query for consumer group %s', consumer_group) + if current_group is not None: self.client.close_consumer() self.log.debug('Got %s highwater offsets', len(highwater_offsets))