From 72deca30f9aa16b04860693a836c9797d36e4946 Mon Sep 17 00:00:00 2001 From: keremcankabadayi Date: Mon, 26 Feb 2024 10:11:54 +0300 Subject: [PATCH] feat: added topic exists check (#59) --- consumer.go | 26 ++++++++++++++++++++++++++ consumer_base.go | 4 ++++ 2 files changed, 30 insertions(+) diff --git a/consumer.go b/consumer.go index 66f5e6b..30c5853 100644 --- a/consumer.go +++ b/consumer.go @@ -1,6 +1,7 @@ package kafka import ( + "github.com/segmentio/kafka-go/topics" "time" "github.com/prometheus/client_golang/prometheus" @@ -55,6 +56,8 @@ func (c *consumer) GetMetricCollectors() []prometheus.Collector { func (c *consumer) Consume() { go c.subprocesses.Start() + c.topicExists() + c.wg.Add(1) go c.startConsume() @@ -157,3 +160,26 @@ func (c *consumer) process(message *Message) { c.metric.TotalProcessedMessagesCounter++ } } + +func (c *consumer) topicExists() { + list, err := topics.List(c.context, &kafka.Client{ + Addr: kafka.TCP(c.brokers...), + Timeout: 3 * time.Second, + }) + + if err != nil { + c.logger.Errorf("Topic Exists Function Err %s", err.Error()) + } + + var exist bool + for i := range list { + if list[i].Name == c.topic { + exist = true + break + } + } + + if !exist { + c.logger.Errorf("Topic doesn't exist") + } +} diff --git a/consumer_base.go b/consumer_base.go index d429ec5..2ad6cc3 100644 --- a/consumer_base.go +++ b/consumer_base.go @@ -67,6 +67,8 @@ type base struct { incomingMessageStream chan *IncomingMessage singleConsumingStream chan *Message batchConsumingStream chan []*Message + brokers []string + topic string retryTopic string subprocesses subprocesses wg sync.WaitGroup @@ -116,6 +118,8 @@ func newBase(cfg *ConsumerConfig, messageChSize int) (*base, error) { consumerState: stateRunning, skipMessageByHeaderFn: cfg.SkipMessageByHeaderFn, metricPrefix: cfg.MetricPrefix, + brokers: cfg.Reader.Brokers, + topic: cfg.Reader.Topic, } if cfg.DistributedTracingEnabled {