Skip to content

Commit

Permalink
pass partition
Browse files Browse the repository at this point in the history
  • Loading branch information
czerwonk committed Feb 23, 2024
1 parent a591909 commit 4518d76
Showing 1 changed file with 2 additions and 2 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type Client interface {
Topics(filter TopicFilter) ([]string, error)

// ConsumeTopic starts consuming messages from the topic
ConsumeTopic(topic string, offset int64) error
ConsumeTopic(topic string, partition int32, offset int64) error

// Close disconnects from the kafka cluster
Close()
Expand Down Expand Up @@ -93,7 +93,7 @@ func (cl *client) ConsumeTopic(topic string, partition int32, offset int64) erro
if err != nil {
if err == sarama.ErrOffsetOutOfRange && offset != sarama.OffsetNewest {
logger.Warnf("could not consume topic %s with offset %d (out of range). trying to get newest messages.", topic, offset)
return cl.ConsumeTopic(topic, sarama.OffsetNewest)
return cl.ConsumeTopic(topic, partition, sarama.OffsetNewest)
}

return fmt.Errorf("could not consume topic %s: %w", topic, err)
Expand Down

0 comments on commit 4518d76

Please sign in to comment.