From c4370ad9f1d67529f623b041626c9a2deadfa2cc Mon Sep 17 00:00:00 2001 From: dsohaliya-ontic Date: Sun, 22 Dec 2024 16:44:52 +0530 Subject: [PATCH 1/2] Fix for issue 437 --- kafka_exporter.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/kafka_exporter.go b/kafka_exporter.go index 1122501b..577fdfe5 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -635,24 +635,27 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) - e.mu.Lock() - if offset, ok := offset[topic][partition]; ok { - // If the topic is consumed by that consumer group, but no offset associated with the partition - // forcing lag to -1 to be able to alert on that + currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest) + if currentPartitionOffsetError != nil { + klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError) + } else { var lag int64 if offsetFetchResponseBlock.Offset == -1 { lag = -1 } else { - lag = offset - offsetFetchResponseBlock.Offset + if offset, ok := offset[topic][partition]; ok { + if currentPartitionOffset == -1 { + currentPartitionOffset = offset + } + } + lag = currentPartitionOffset - offsetFetchResponseBlock.Offset lagSum += lag } + ch <- prometheus.MustNewConstMetric( consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) - } else { - klog.Errorf("No offset of topic %s partition %d, cannot get consumer group lag", topic, partition) - } - e.mu.Unlock() + } } ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic, From 4e4fc2512e410f0964e47df95fc4390e39291d82 Mon Sep 17 00:00:00 2001 From: dsohaliya-ontic Date: Sun, 22 Dec 2024 16:55:14 +0530 Subject: [PATCH 2/2] Added the lock while reading partition offset --- kafka_exporter.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kafka_exporter.go b/kafka_exporter.go index 577fdfe5..636188e8 100644 --- a/kafka_exporter.go +++ b/kafka_exporter.go @@ -635,6 +635,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffset, prometheus.GaugeValue, float64(currentOffset), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) + e.mu.Lock() currentPartitionOffset, currentPartitionOffsetError := e.client.GetOffset(topic, partition, sarama.OffsetNewest) if currentPartitionOffsetError != nil { klog.Errorf("Cannot get current offset of topic %s partition %d: %v", topic, partition, currentPartitionOffsetError) @@ -656,6 +657,7 @@ func (e *Exporter) collect(ch chan<- prometheus.Metric) { consumergroupLag, prometheus.GaugeValue, float64(lag), group.GroupId, topic, strconv.FormatInt(int64(partition), 10), ) } + e.mu.Unlock() } ch <- prometheus.MustNewConstMetric( consumergroupCurrentOffsetSum, prometheus.GaugeValue, float64(currentOffsetSum), group.GroupId, topic,