Skip to content

Commit

Permalink
bug: reader un-interuptable condition
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Simpson committed Dec 11, 2024
1 parent a6b0b97 commit 51cc6f8
Showing 1 changed file with 13 additions and 3 deletions.
16 changes: 13 additions & 3 deletions pkg/bus/bus_kafka_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,17 @@ func (k *kafkaBusReader[T]) FetchMessage(ctx context.Context) (*T, error) {
Logger: kafka.LoggerFunc(log.Logger.Tracef),
})
}

wg := sync.WaitGroup{}
wg.Add(1)
var err error
*k.currentMessage, err = k.Reader.FetchMessage(ctx)
go func() {
*k.currentMessage, err = k.Reader.FetchMessage(ctx)
wg.Done()
}()
k.mu.Unlock()
wg.Wait()

if k.isResetting {
log.Logger.WithContext(ctx).Info("Reset started, skipping message")
return k.FetchMessage(ctx)
Expand Down Expand Up @@ -88,8 +96,10 @@ func (k *kafkaBusReader[T]) Reset(ctx context.Context) error {
// Close the reader if it is open and cancel the current message
if k.Reader != nil {
k.Reader.Close()
k.Reader = nil
k.currentMessage = nil
defer func() {
k.Reader = nil
k.currentMessage = nil
}()
}

// Connect to the kafka cluster
Expand Down

0 comments on commit 51cc6f8

Please sign in to comment.