diff --git a/verify_topic.go b/verify_topic.go index 19f20a7..ced47f6 100644 --- a/verify_topic.go +++ b/verify_topic.go @@ -3,6 +3,7 @@ package kafka import ( "context" "fmt" + "sync" "github.com/segmentio/kafka-go" ) @@ -16,24 +17,39 @@ type client struct { *kafka.Client } +var ( + kafkaClientInstance *client + kafkaClientOnce sync.Once +) + func newKafkaClient(cfg *ConsumerConfig) (kafkaClient, error) { - kc := client{ - Client: &kafka.Client{ - Addr: kafka.TCP(cfg.Reader.Brokers...), - }, - } + var err error + kafkaClientOnce.Do(func() { + kc := &client{ + Client: &kafka.Client{ + Addr: kafka.TCP(cfg.Reader.Brokers...), + }, + } - transport := &Transport{ - Transport: &kafka.Transport{ - MetadataTopics: cfg.getTopics(), - }, - } - if err := fillLayer(transport, cfg.SASL, cfg.TLS); err != nil { - return nil, fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err) + transport := &Transport{ + Transport: &kafka.Transport{ + MetadataTopics: cfg.getTopics(), + }, + } + if err = fillLayer(transport, cfg.SASL, cfg.TLS); err != nil { + err = fmt.Errorf("error when initializing kafka client for verify topic purpose %w", err) + return + } + + kc.Transport = transport + kafkaClientInstance = kc + }) + + if err != nil { + return nil, err } - kc.Transport = transport - return &kc, nil + return kafkaClientInstance, nil } func (k *client) GetClient() *kafka.Client {