From f03fee0ee7c29ec64426a0a4bf3c5ca40321dc43 Mon Sep 17 00:00:00 2001 From: MenD <92942774+AmitMendl@users.noreply.github.com> Date: Mon, 1 Jan 2024 08:38:41 +0200 Subject: [PATCH] fix: added Errors() read from kafkaSensor's producer (#2959) Signed-off-by: AmitMendl --- eventbus/kafka/sensor/kafka_sensor.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/eventbus/kafka/sensor/kafka_sensor.go b/eventbus/kafka/sensor/kafka_sensor.go index 593419cf8d..1f39403701 100644 --- a/eventbus/kafka/sensor/kafka_sensor.go +++ b/eventbus/kafka/sensor/kafka_sensor.go @@ -132,6 +132,13 @@ func (s *KafkaSensor) Initialize() error { return err } + // producer is at risk of deadlocking if Errors channel isn't read. + go func() { + for err := range producer.Errors() { + s.Logger.Errorf("Kafka producer error", zap.Error(err)) + } + }() + s.client = client s.consumer = consumer s.kafkaHandler = &KafkaHandler{