diff --git a/pkg/bus/bus_kafka_reader.go b/pkg/bus/bus_kafka_reader.go index 6dbcc3a..3fe403e 100644 --- a/pkg/bus/bus_kafka_reader.go +++ b/pkg/bus/bus_kafka_reader.go @@ -44,6 +44,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) { } k.currentMessage = new(kafka.Message) + k.mu.Lock() if k.Reader == nil { k.Reader = kafka.NewReader(kafka.ReaderConfig{ Brokers: k.brokers.Addresses(), @@ -56,6 +57,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) { } var err error *k.currentMessage, err = k.Reader.FetchMessage(ctx) + k.mu.Unlock() if k.isResetting { log.Logger.WithContext(ctx).Info("Reset started, skipping message") return k.FetchMessage(ctx)