Skip to content

Commit

Permalink
bug: reader race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent 7d8b6c5 commit a6b0b97
Showing 1 changed file with 2 additions and 0 deletions.
2 changes: 2 additions & 0 deletions pkg/bus/bus_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
}
k.currentMessage = new(kafka.Message)

k.mu.Lock()
if k.Reader == nil {
k.Reader = kafka.NewReader(kafka.ReaderConfig{
Brokers: k.brokers.Addresses(),
Expand All @@ -56,6 +57,7 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
}
var err error
*k.currentMessage, err = k.Reader.FetchMessage(ctx)
k.mu.Unlock()
if k.isResetting {
log.Logger.WithContext(ctx).Info("Reset started, skipping message")
return k.FetchMessage(ctx)
Expand Down

0 comments on commit a6b0b97

Please sign in to comment.