fix: race condition issue on batch consumer
Abdulsametileri committed Sep 5, 2023
1 parent efcead4 commit f9aebc1
Showing 5 changed files with 200 additions and 21 deletions.
59 changes: 47 additions & 12 deletions batch_consumer.go
@@ -43,25 +43,29 @@ func newBatchConsumer(cfg *ConsumerConfig) (Consumer, error) {
return &c, nil
}

func (b *batchConsumer) GetMetric() *ConsumerMetric {
return b.metric
}

func (b *batchConsumer) Consume() {
go b.base.subprocesses.Start()
b.wg.Add(1)
go b.startConsume()

for i := 0; i < b.concurrency; i++ {
b.wg.Add(1)
go b.startBatch()
}

go b.handleCommit()
}

func (b *batchConsumer) startBatch() {
defer b.wg.Done()

ticker := time.NewTicker(b.messageGroupDuration)
defer ticker.Stop()

messages := make([]Message, 0, b.messageGroupLimit)

for {
@@ -113,12 +117,43 @@ func (b *batchConsumer) process(messages []Message) {
segmentioMessages = append(segmentioMessages, kafka.Message(messages[i]))
}

b.commitReqCh <- segmentioMessages
}

func (b *batchConsumer) handleCommit() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

// it is used for tracking the latest committed offsets by topic => partition => offset
offsets := offsetStash{}

for {
select {
case <-ticker.C:
b.logger.Debug(offsets)
continue
case msgs, ok := <-b.commitReqCh:
if !ok {
return
}

// Extract the messages that still need to be committed
willBeCommitted := offsets.IgnoreAlreadyCommittedMessages(msgs)
if len(willBeCommitted) == 0 {
continue
}

commitErr := b.r.CommitMessages(context.Background(), willBeCommitted...)
if commitErr != nil {
b.metric.TotalUnprocessedBatchMessagesCounter++
b.logger.Error("Error Committing messages %s", commitErr.Error())
continue
}

// Update the latest offsets with recently committed messages
offsets.UpdateWithNewestCommittedOffsets(willBeCommitted)

b.metric.TotalProcessedBatchMessagesCounter++
}
}
}
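
The essence of the fix is visible above: process no longer commits from each worker goroutine; instead every batch is handed through commitReqCh to the single handleCommit goroutine, so commits are serialized and the offsets stash is only ever touched from one goroutine. A minimal standalone sketch of this single-committer pattern (the batch type and commit stub are illustrative, not part of the library):

package main

import (
	"fmt"
	"sync"
)

type batch []int // stand-in for []kafka.Message

// commit is a stub for reader.CommitMessages; only the committer
// goroutine ever calls it, so no extra locking is needed.
func commit(b batch) { fmt.Println("committed", b) }

func main() {
	commitReqCh := make(chan batch)
	var wg sync.WaitGroup

	// Several workers process batches concurrently, like startBatch...
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// ...but instead of committing themselves they hand the
			// batch to the lone committer goroutine.
			commitReqCh <- batch{id}
		}(w)
	}

	done := make(chan struct{})
	go func() { // the committer, analogous to handleCommit
		for b := range commitReqCh {
			commit(b)
		}
		close(done)
	}()

	wg.Wait()          // every sender has finished...
	close(commitReqCh) // ...so closing the channel is safe
	<-done             // committer drained the channel and exited
}
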
3 changes: 3 additions & 0 deletions consumer_base.go
@@ -25,6 +25,7 @@ type base struct {
context context.Context
messageCh chan Message
quit chan struct{}
commitReqCh chan []kafka.Message
cancelFn context.CancelFunc
r *kafka.Reader
retryTopic string
@@ -56,6 +57,7 @@ func newBase(cfg *ConsumerConfig) (*base, error) {
metric: &ConsumerMetric{},
messageCh: make(chan Message, cfg.Concurrency),
quit: make(chan struct{}),
commitReqCh: make(chan []kafka.Message),
concurrency: cfg.Concurrency,
retryEnabled: cfg.RetryEnabled,
logger: log,
@@ -122,6 +124,7 @@ func (c *base) Stop() error {
c.quit <- struct{}{}
close(c.messageCh)
c.wg.Wait()
close(c.commitReqCh)
err = c.r.Close()
})

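Note the ordering that Stop() establishes above: c.wg.Wait() returns only after every worker goroutine has exited, and only then is commitReqCh closed, so a send on a closed channel is impossible; the close in turn makes handleCommit's receive report !ok and return. A reduced sketch of that shutdown sequence (names are illustrative):

package main

import "sync"

func main() {
	ch := make(chan int)
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // a sender, analogous to a startBatch worker
		defer wg.Done()
		ch <- 42
	}()

	done := make(chan struct{})
	go func() { // the receiver, analogous to handleCommit
		for range ch {
			// process commit requests
		}
		close(done) // channel was closed and drained
	}()

	wg.Wait() // 1) wait until no sender can still be running
	close(ch) // 2) closing now cannot race with a send
	<-done    // 3) receiver observed the close and returned
}
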
46 changes: 46 additions & 0 deletions offset_manager.go
@@ -0,0 +1,46 @@
package kafka

import "github.com/segmentio/kafka-go"

// offsetStash holds the latest committed offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64

// UpdateWithNewestCommittedOffsets records, for each topic => partition pair
// seen in the given messages, the newest committed offset.
func (o offsetStash) UpdateWithNewestCommittedOffsets(messages []kafka.Message) {
for i := range messages {
offsetsByPartition, ok := o[messages[i].Topic]
if !ok {
offsetsByPartition = map[int]int64{}
o[messages[i].Topic] = offsetsByPartition
}

// The incoming messages are already committed, so the committed offset to store is one past each message's offset
messageOffset := messages[i].Offset + 1

if offset, ok := offsetsByPartition[messages[i].Partition]; !ok || messageOffset > offset {
offsetsByPartition[messages[i].Partition] = messageOffset
}
}
}

// IgnoreAlreadyCommittedMessages filters out messages whose offsets are already
// covered by a committed offset. When committing messages in consumer groups, the
// message with the highest offset for a given topic/partition determines the value
// of the committed offset for that partition. For example, if messages at offsets
// 1, 2, and 3 of a single partition exist, committing the message at offset 3 also
// commits the messages at offsets 1 and 2, so those messages can safely be ignored.
func (o offsetStash) IgnoreAlreadyCommittedMessages(messages []kafka.Message) []kafka.Message {
willBeCommitted := make([]kafka.Message, 0, len(messages))
for i := range messages {
offsetsByPartition := o[messages[i].Topic]
offset := offsetsByPartition[messages[i].Partition]

// This message is already covered by a committed offset, so it can safely be ignored
if messages[i].Offset <= offset {
continue
}

willBeCommitted = append(willBeCommitted, messages[i])
}

return willBeCommitted
}
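
To see how handleCommit drives these two helpers together, here is a short sketch written as if it lived alongside offset_manager.go in the same package (topic names and offsets are made up; the actual CommitMessages call is elided):

package kafka

import (
	"fmt"

	segmentio "github.com/segmentio/kafka-go"
)

// offsetStashUsage demonstrates the commit flow: filter first,
// commit (elided), then record what was committed.
func offsetStashUsage() {
	offsets := offsetStash{}

	// Nothing is committed yet, so both messages pass the filter.
	first := []segmentio.Message{
		{Topic: "orders", Partition: 0, Offset: 3},
		{Topic: "orders", Partition: 0, Offset: 5},
	}
	toCommit := offsets.IgnoreAlreadyCommittedMessages(first)
	fmt.Println(len(toCommit)) // 2

	// ...b.r.CommitMessages(ctx, toCommit...) would run here...
	offsets.UpdateWithNewestCommittedOffsets(toCommit)

	// Committing offset 5 implicitly covered offsets 3 and 4, so a late
	// batch carrying offset 4 is dropped instead of rewinding the group.
	late := []segmentio.Message{{Topic: "orders", Partition: 0, Offset: 4}}
	fmt.Println(len(offsets.IgnoreAlreadyCommittedMessages(late))) // 0
}
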
85 changes: 85 additions & 0 deletions offset_manager_test.go
@@ -0,0 +1,85 @@
package kafka

import (
segmentio "github.com/segmentio/kafka-go"
"reflect"
"testing"
)

func Test_offsetStash_UpdateWithNewestCommittedOffsets(t *testing.T) {
// Given
offsets := offsetStash{}
messages := []segmentio.Message{
{Topic: "a", Partition: 0, Offset: 10},
{Topic: "a", Partition: 0, Offset: 5},

{Topic: "a", Partition: 1, Offset: 11},
{Topic: "a", Partition: 1, Offset: 20},

{Topic: "a", Partition: 2, Offset: 12},

{Topic: "b", Partition: 0, Offset: 15},
{Topic: "b", Partition: 0, Offset: 20},

{Topic: "c", Partition: 2, Offset: 1},
{Topic: "c", Partition: 2, Offset: 2},
{Topic: "c", Partition: 2, Offset: 3},
}

// When
offsets.UpdateWithNewestCommittedOffsets(messages)

// Then
assertFunc := func(actual map[int]int64, expected map[int]int64) {
if !reflect.DeepEqual(actual, expected) {
t.Fatal(actual, "is not equal to", expected)
}
}

assertFunc(offsets["a"], map[int]int64{0: 11, 1: 21, 2: 13})
assertFunc(offsets["b"], map[int]int64{0: 21})
assertFunc(offsets["c"], map[int]int64{2: 4})
}

func Test_offsetStash_IgnoreAlreadyCommittedMessages(t *testing.T) {
// Given
offsets := offsetStash{
"a": map[int]int64{
0: 10,
},
"b": map[int]int64{
0: 5,
1: 6,
},
"c": map[int]int64{
10: 15,
},
}
messages := []segmentio.Message{
{Topic: "a", Partition: 0, Offset: 8}, // ignore
{Topic: "a", Partition: 0, Offset: 9}, // ignore
{Topic: "a", Partition: 0, Offset: 10}, // ignore

{Topic: "b", Partition: 0, Offset: 15}, // update
{Topic: "b", Partition: 1, Offset: 5}, // ignore

{Topic: "c", Partition: 1, Offset: 5}, // update
{Topic: "c", Partition: 10, Offset: 10}, // ignore
}

// When
actual := offsets.IgnoreAlreadyCommittedMessages(messages)

// Then
if len(actual) != 2 {
t.Fatal("Actual must be length of 2")
}

if actual[0].Topic != "b" || actual[0].Partition != 0 || actual[0].Offset != 15 {
t.Fatalf("Actual %v is not equal to expected", actual[0])
}

if actual[1].Topic != "c" || actual[1].Partition != 1 || actual[1].Offset != 5 {
t.Fatalf("Actual %v is not equal to expected", actual[1])
}
}
28 changes: 19 additions & 9 deletions test/integration/integration_test.go
@@ -82,7 +82,6 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {
messageCh <- message
return nil
},
Dial: &kafka.DialConfig{KeepAlive: 150 * time.Second, Timeout: 30 * time.Second},
}

consumer, _ := kafka.NewConsumer(consumerCfg)
@@ -92,9 +91,10 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {

// When
produceMessages(t, conn, segmentio.Message{
Topic: topic,
Partition: 0,
Key: []byte("1"),
Value: []byte(`foo`),
})

// Then
@@ -105,6 +105,11 @@ func Test_Should_Consume_Message_Successfully(t *testing.T) {
if string(actual.Key) != "1" {
t.Fatalf("Key does not equal %s", actual.Key)
}

o, _ := conn.ReadLastOffset()
if o != 1 {
t.Fatalf("offset %v must be equal to 1", o)
}
}

func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
@@ -137,11 +142,11 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {

// When
produceMessages(t, conn,
segmentio.Message{Topic: topic, Key: []byte("1"), Value: []byte(`foo1`)},
segmentio.Message{Topic: topic, Key: []byte("2"), Value: []byte(`foo2`)},
segmentio.Message{Topic: topic, Key: []byte("3"), Value: []byte(`foo3`)},
segmentio.Message{Topic: topic, Key: []byte("4"), Value: []byte(`foo4`)},
segmentio.Message{Topic: topic, Key: []byte("5"), Value: []byte(`foo5`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 1, Key: []byte("1"), Value: []byte(`foo1`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 2, Key: []byte("2"), Value: []byte(`foo2`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 3, Key: []byte("3"), Value: []byte(`foo3`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 4, Key: []byte("4"), Value: []byte(`foo4`)},
segmentio.Message{Topic: topic, Partition: 0, Offset: 5, Key: []byte("5"), Value: []byte(`foo5`)},
)

// Then
@@ -150,6 +155,11 @@ func Test_Should_Batch_Consume_Messages_Successfully(t *testing.T) {
if actual != 5 {
t.Fatalf("Message length does not equal %d", actual)
}

o, _ := conn.ReadLastOffset()
if o != 5 {
t.Fatalf("offset %v must be equal to 5", o)
}
}

func Test_Should_Integrate_With_Kafka_Cronsumer_Successfully(t *testing.T) {