Skip to content

Commit

Permalink
remove topics member from SaramaEventsConsumer
Browse files Browse the repository at this point in the history
No need to keep it as a member, it's derived from the config that we're
keeping around anyway, and only used in a single function.
  • Loading branch information
ewollesen committed Mar 28, 2024
1 parent b79d8dd commit 6ad9e4a
Showing 1 changed file with 2 additions and 3 deletions.
5 changes: 2 additions & 3 deletions events/consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ var ErrConsumerStopped = errors.New("consumer has been stopped")
type SaramaEventConsumer struct {
config *CloudEventsConfig
consumer MessageConsumer
topic string

cancelFuncMu sync.Mutex
cancelFunc context.CancelFunc
Expand All @@ -34,7 +33,6 @@ func NewSaramaConsumerGroup(config *CloudEventsConfig, consumer MessageConsumer)
return &SaramaEventConsumer{
config: config,
consumer: consumer,
topic: config.GetPrefixedTopic(),
}, nil
}

Expand All @@ -60,11 +58,12 @@ func (s *SaramaEventConsumer) Start() error {
}

handler := &SaramaMessageConsumer{s.consumer}
topics := []string{s.config.GetPrefixedTopic()}
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := cg.Consume(ctx, []string{s.topic}, handler); err != nil {
if err := cg.Consume(ctx, topics, handler); err != nil {
log.Printf("Error from consumer: %v", err)
if err == context.Canceled {
return ErrConsumerStopped
Expand Down

0 comments on commit 6ad9e4a

Please sign in to comment.