diff --git a/batch.go b/batch.go
index 19dcef8c..f0a26138 100644
--- a/batch.go
+++ b/batch.go
@@ -232,6 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
 	msg.HighWaterMark = batch.highWaterMark
 	msg.Time = makeTime(timestamp)
 	msg.Headers = headers
+	if batch.conn != nil {
+		msg.GenerationId = batch.conn.generationId
+	}
 
 	return msg, err
 }
diff --git a/commit.go b/commit.go
index e7740d58..fc5c9b27 100644
--- a/commit.go
+++ b/commit.go
@@ -3,18 +3,20 @@ package kafka
 // A commit represents the instruction of publishing an update of the last
 // offset read by a program for a topic and partition.
 type commit struct {
-	topic     string
-	partition int
-	offset    int64
+	topic        string
+	partition    int
+	offset       int64
+	generationId int32
 }
 
 // makeCommit builds a commit value from a message, the resulting commit takes
 // its topic, partition, and offset from the message.
 func makeCommit(msg Message) commit {
 	return commit{
-		topic:     msg.Topic,
-		partition: msg.Partition,
-		offset:    msg.Offset + 1,
+		topic:        msg.Topic,
+		partition:    msg.Partition,
+		offset:       msg.Offset + 1,
+		generationId: msg.GenerationId,
 	}
 }
diff --git a/conn.go b/conn.go
index 2b51afbd..9f66e960 100644
--- a/conn.go
+++ b/conn.go
@@ -19,6 +19,8 @@ var (
 	errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
 )
 
+const undefinedGenerationId int32 = -1
+
 // Conn represents a connection to a kafka broker.
 //
 // Instances of Conn are safe to use concurrently from multiple goroutines.
@@ -65,6 +67,8 @@ type Conn struct {
 	apiVersions atomic.Value // apiVersionMap
 
 	transactionalID *string
+
+	generationId int32
 }
 
 type apiVersionMap map[apiKey]ApiVersion
@@ -182,6 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
 		offset:          FirstOffset,
 		requiredAcks:    -1,
 		transactionalID: emptyToNullable(config.TransactionalID),
+		generationId:    undefinedGenerationId,
 	}
 
 	c.wb.w = &c.wbuf
@@ -388,6 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
 		return joinGroupResponseV1{}, Error(response.ErrorCode)
 	}
 
+	c.generationId = response.GenerationID
 	return response, nil
 }
diff --git a/consumergroup.go b/consumergroup.go
index f4bb382c..de59cb41 100644
--- a/consumergroup.go
+++ b/consumergroup.go
@@ -416,6 +416,12 @@ func (g *Generation) Start(fn func(ctx context.Context)) {
 // consumer group coordinator. This can be used to reset the consumer to
 // explicit offsets.
 func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
+	return g.CommitOffsetsForGenID(g.ID, offsets)
+}
+
+// CommitOffsetsForGenID commits the provided topic+partition+offset combos to
+// the consumer group coordinator, using the given generation ID.
+func (g *Generation) CommitOffsetsForGenID(genID int32, offsets map[string]map[int]int64) error {
 	if len(offsets) == 0 {
 		return nil
 	}
@@ -434,7 +440,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
 
 	request := offsetCommitRequestV2{
 		GroupID:       g.GroupID,
-		GenerationID:  g.ID,
+		GenerationID:  genID,
 		MemberID:      g.MemberID,
 		RetentionTime: g.retentionMillis,
 		Topics:        topics,
@@ -925,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
 // the leader. Otherwise, GroupMemberAssignments will be nil.
 //
 // Possible kafka error codes returned:
-//  * GroupLoadInProgress:
-//  * GroupCoordinatorNotAvailable:
-//  * NotCoordinatorForGroup:
-//  * InconsistentGroupProtocol:
-//  * InvalidSessionTimeout:
-//  * GroupAuthorizationFailed:
+//   * GroupLoadInProgress:
+//   * GroupCoordinatorNotAvailable:
+//   * NotCoordinatorForGroup:
+//   * InconsistentGroupProtocol:
+//   * InvalidSessionTimeout:
+//   * GroupAuthorizationFailed:
 func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
 	request, err := cg.makeJoinGroupRequestV1(memberID)
 	if err != nil {
@@ -1073,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
 // Readers subscriptions topic => partitions
 //
 // Possible kafka error codes returned:
-//  * GroupCoordinatorNotAvailable:
-//  * NotCoordinatorForGroup:
-//  * IllegalGeneration:
-//  * RebalanceInProgress:
-//  * GroupAuthorizationFailed:
+//   * GroupCoordinatorNotAvailable:
+//   * NotCoordinatorForGroup:
+//   * IllegalGeneration:
+//   * RebalanceInProgress:
+//   * GroupAuthorizationFailed:
 func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
 	request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
 	response, err := conn.syncGroup(request)
diff --git a/message.go b/message.go
index 0539e603..2be3db7f 100644
--- a/message.go
+++ b/message.go
@@ -20,6 +20,10 @@ type Message struct {
 	Value   []byte
 	Headers []Header
 
+	// If the message was read through a consumer group, this contains the
+	// group generation's ID. The value is -1 when not using consumer groups.
+	GenerationId int32
+
 	// This field is used to hold arbitrary data you wish to include, so it
 	// will be available when handle it on the Writer's `Completion` method,
 	// this support the application can do any post operation on each message.
diff --git a/reader.go b/reader.go
index 04d90f35..70855e60 100644
--- a/reader.go
+++ b/reader.go
@@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
 	// another consumer to avoid such a race.
 }
 
-func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
+func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
 	offsets := make(map[topicPartition]int64)
 	for topic, assignments := range allAssignments {
 		for _, assignment := range assignments {
@@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
 	}
 
 	r.mutex.Lock()
-	r.start(offsets)
+	r.start(generationId, offsets)
 	r.mutex.Unlock()
 
 	r.withLogger(func(l Logger) {
@@ -150,35 +150,73 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
 		backoffDelayMax = 5 * time.Second
 	)
 
-	for attempt := 0; attempt < retries; attempt++ {
-		if attempt != 0 {
-			if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
-				return
-			}
-		}
-
-		if err = gen.CommitOffsets(offsetStash); err == nil {
-			return
-		}
-	}
-
-	return // err will not be nil
+	messagesToSendForGeneration := make(map[int32]map[string]map[int]int64)
+	for topic, partitionsInfo := range offsetStash {
+		for partition, commitInfo := range partitionsInfo {
+			if _, ok := messagesToSendForGeneration[commitInfo.generationID]; !ok {
+				messagesToSendForGeneration[commitInfo.generationID] = make(map[string]map[int]int64)
+			}
+			msgsForTopic := messagesToSendForGeneration[commitInfo.generationID]
+			if _, ok := msgsForTopic[topic]; !ok {
+				msgsForTopic[topic] = make(map[int]int64)
+			}
+			msgsForPartition := msgsForTopic[topic]
+			msgsForPartition[partition] = commitInfo.offset
+		}
+	}
+
+	var illegalGenerationErr bool
+	for generationID, messages := range messagesToSendForGeneration {
+		for attempt := 0; attempt < retries; attempt++ {
+			if attempt != 0 {
+				if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
+					continue
+				}
+			}
+
+			if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
+				break
+			}
+
+			// IllegalGeneration is not retriable, but we should still attempt
+			// the commits for the remaining generations.
+			if errors.Is(err, IllegalGeneration) {
+				r.withErrorLogger(func(l Logger) { l.Printf("generation %d - %v", generationID, err) })
+				offsetStash.removeGenerationID(generationID)
+				illegalGenerationErr = true
+				err = nil
+				break
+			}
+		}
+	}
+
+	// surface the error when the reader is configured to fail on commits to a
+	// wrong generation
+	if illegalGenerationErr && r.config.ErrorOnWrongGenerationCommit {
+		err = IllegalGeneration
+	}
+
+	return
 }
 
-// offsetStash holds offsets by topic => partition => offset.
-type offsetStash map[string]map[int]int64
+// offsetEntry holds an offset to commit together with the generation in which
+// the message was read.
+type offsetEntry struct {
+	offset       int64
+	generationID int32
+}
+
+// offsetStash holds offsets by topic => partition => offsetEntry.
+type offsetStash map[string]map[int]offsetEntry
 
 // merge updates the offsetStash with the offsets from the provided messages.
 func (o offsetStash) merge(commits []commit) {
 	for _, c := range commits {
 		offsetsByPartition, ok := o[c.topic]
 		if !ok {
-			offsetsByPartition = map[int]int64{}
+			offsetsByPartition = map[int]offsetEntry{}
 			o[c.topic] = offsetsByPartition
 		}
 
-		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
-			offsetsByPartition[c.partition] = c.offset
+		if entry, ok := offsetsByPartition[c.partition]; !ok || c.offset > entry.offset {
+			offsetsByPartition[c.partition] = offsetEntry{
+				offset:       c.offset,
+				generationID: c.generationId,
+			}
 		}
 	}
 }
@@ -190,6 +228,19 @@ func (o offsetStash) reset() {
 	}
 }
 
+// removeGenerationID drops all pending commits that belong to the given
+// generation.
+func (o offsetStash) removeGenerationID(genID int32) {
+	for topic, offsetsForTopic := range o {
+		for partition, entry := range offsetsForTopic {
+			if entry.generationID == genID {
+				delete(offsetsForTopic, partition)
+			}
+		}
+		if len(offsetsForTopic) == 0 {
+			delete(o, topic)
+		}
+	}
+}
+
 // commitLoopImmediate handles each commit synchronously.
 func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
 	offsets := offsetStash{}
@@ -329,7 +380,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
 
 		r.stats.rebalances.observe(1)
 
-		r.subscribe(gen.Assignments)
+		r.subscribe(gen.ID, gen.Assignments)
 
 		gen.Start(func(ctx context.Context) {
 			r.commitLoop(ctx, gen)
@@ -522,6 +573,10 @@ type ReaderConfig struct {
 	// This flag is being added to retain backwards-compatibility, so it will be
 	// removed in a future version of kafka-go.
 	OffsetOutOfRangeError bool
+
+	// ErrorOnWrongGenerationCommit indicates that committing a message fetched
+	// in a generation other than the current one should return an error.
+	ErrorOnWrongGenerationCommit bool
 }
 
 // Validate method validates ReaderConfig properties.
@@ -819,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
 			r.mutex.Lock()
 
 			if !r.closed && r.version == 0 {
-				r.start(r.getTopicPartitionOffset())
+				r.start(undefinedGenerationId, r.getTopicPartitionOffset())
 			}
 
 			version := r.version
@@ -1040,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
 		r.offset = offset
 
 		if r.version != 0 {
-			r.start(r.getTopicPartitionOffset())
+			r.start(undefinedGenerationId, r.getTopicPartitionOffset())
 		}
 
 		r.activateReadLag()
@@ -1178,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
 	}
 }
 
-func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
+func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
 	if r.closed {
 		// don't start child reader if parent Reader is closed
 		return
@@ -1216,7 +1271,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
 
 			// backwards-compatibility flags
 			offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
-		}).run(ctx, offset)
+		}).run(ctx, generationId, offset)
 		}(ctx, key, offset, &r.join)
 	}
 }
@@ -1253,7 +1308,7 @@ type readerMessage struct {
 	error     error
 }
 
-func (r *reader) run(ctx context.Context, offset int64) {
+func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
 	// This is the reader's main loop, it only ends if the context is canceled
 	// and will keep attempting to reader messages otherwise.
 	//
@@ -1306,6 +1361,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
 			}
 			continue
 		}
+		conn.generationId = generationId
 
 		// Resetting the attempt counter ensures that if a failure occurs after
 		// a successful initialization we don't keep increasing the backoff
diff --git a/reader_test.go b/reader_test.go
index f413d742..2f0a8c65 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -15,6 +15,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -791,10 +792,11 @@ func TestExtractTopics(t *testing.T) {
 
 func TestReaderConsumerGroup(t *testing.T) {
 	tests := []struct {
-		scenario       string
-		partitions     int
-		commitInterval time.Duration
-		function       func(*testing.T, context.Context, *Reader)
+		scenario               string
+		partitions             int
+		commitInterval         time.Duration
+		errorOnWrongGeneration bool
+		function               func(*testing.T, context.Context, *Reader)
 	}{
 		{
 			scenario:   "basic handshake",
@@ -855,6 +857,14 @@ func TestReaderConsumerGroup(t *testing.T) {
 			partitions: 1,
 			function:   testConsumerGroupSimple,
 		},
+
+		{
+			scenario:               "do not commit messages from partitions that are no longer assigned after a rebalance",
+			partitions:             2,
+			function:               testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions,
+			commitInterval:         0,
+			errorOnWrongGeneration: true,
+		},
 	}
 
 	for _, test := range tests {
@@ -870,15 +880,16 @@ func TestReaderConsumerGroup(t *testing.T) {
 
 			groupID := makeGroupID()
 			r := NewReader(ReaderConfig{
-				Brokers:           []string{"localhost:9092"},
-				Topic:             topic,
-				GroupID:           groupID,
-				HeartbeatInterval: 2 * time.Second,
-				CommitInterval:    test.commitInterval,
-				RebalanceTimeout:  2 * time.Second,
-				RetentionTime:     time.Hour,
-				MinBytes:          1,
-				MaxBytes:          1e6,
+				Brokers:                      []string{"localhost:9092"},
+				Topic:                        topic,
+				GroupID:                      groupID,
+				HeartbeatInterval:            2 * time.Second,
+				CommitInterval:               test.commitInterval,
+				RebalanceTimeout:             2 * time.Second,
+				RetentionTime:                time.Hour,
+				MinBytes:                     1,
+				MaxBytes:                     1e6,
+				ErrorOnWrongGenerationCommit: test.errorOnWrongGeneration,
 			})
 			defer r.Close()
@@ -1174,6 +1185,126 @@ func testReaderConsumerGroupRebalanceAcrossManyPartitionsAndConsumers(t *testing
 	}
 }
 
+func testReaderConsumerGroupRebalanceDoesNotCommitNotOwnedPartitions(t *testing.T, ctx context.Context, firstReader *Reader) {
+	client, shutdown := newLocalClient()
+	defer shutdown()
+
+	// write messages across both partitions
+	writer := &Writer{
+		Addr:      TCP(firstReader.config.Brokers...),
+		Topic:     firstReader.config.Topic,
+		Balancer:  &RoundRobin{},
+		BatchSize: 1,
+		Transport: client.Transport,
+	}
+
+	// write 4 messages and ensure that they are spread across both partitions
+	messageCount := 4
+	if err := writer.WriteMessages(ctx, makeTestSequence(messageCount)...); err != nil {
+		t.Fatalf("bad write messages: %v", err)
+	}
+	if err := writer.Close(); err != nil {
+		t.Fatalf("bad write err: %v", err)
+	}
+
+	// read all messages with the first reader
+	msgsForFirstReader := make(map[int][]Message)
+	allMessages := make([]Message, 0, messageCount)
+	totalEvents := 0
+	for i := 0; i < messageCount; i++ {
+		if msg, err := firstReader.FetchMessage(ctx); err != nil {
+			t.Errorf("expected to read message %d: %v", i, err)
+		} else {
+			msgsForFirstReader[msg.Partition] = append(msgsForFirstReader[msg.Partition], msg)
+			allMessages = append(allMessages, msg)
+			totalEvents++
+		}
+	}
+	require.Equal(t, messageCount, totalEvents)
+
+	// create a second reader to trigger a rebalance
+	secondReader := NewReader(firstReader.config)
+	defer secondReader.Close()
+
+	// wait until the group has 2 members
+	require.Eventually(t, func() bool {
+		resp, err := client.DescribeGroups(
+			ctx,
+			&DescribeGroupsRequest{
+				GroupIDs: []string{firstReader.config.GroupID},
+			},
+		)
+		assert.NoError(t, err)
+		assert.NotNil(t, resp)
+		return len(resp.Groups[0].Members) == 2
+	}, 10*time.Second, 100*time.Millisecond, "group does not have 2 members")
+
+	// after the rebalance, the second reader owns one of the two partitions
+	var partitionAssignedToSecondConsumer int
+	msgsForSecondReader := make([]Message, 0, messageCount/2)
+	for i := 0; i < messageCount/2; i++ {
+		if msg, err := secondReader.FetchMessage(ctx); err != nil {
+			t.Errorf("expected to read message %d: %v", i, err)
+		} else {
+			msgsForSecondReader = append(msgsForSecondReader, msg)
+			partitionAssignedToSecondConsumer = msg.Partition
+		}
+	}
+	partitionAssignedToFirstConsumer := (partitionAssignedToSecondConsumer + 1) % 2
+
+	// commit all messages for the second reader (no need to wait until the
+	// commits reach the server because CommitInterval is set to 0)
+	require.NoError(t, secondReader.CommitMessages(ctx, msgsForSecondReader...))
+
+	// committing all messages the first reader fetched must fail: half of them
+	// were fetched in the previous generation, for a partition it no longer owns
+	require.ErrorIs(t, firstReader.CommitMessages(ctx, allMessages...), IllegalGeneration)
+
+	// verify that the offsets committed by the second reader have not been altered
+	require.Eventually(t, func() bool {
+		resp, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
+			GroupID: firstReader.config.GroupID,
+			Topics:  map[string][]int{firstReader.config.Topic: {partitionAssignedToSecondConsumer}},
+		})
+		require.NoError(t, err)
+		require.NotNil(t, resp)
+
+		for _, topicOffsets := range resp.Topics {
+			for _, offsetPartition := range topicOffsets {
+				if offsetPartition.Partition == partitionAssignedToSecondConsumer {
+					return msgsForSecondReader[len(msgsForSecondReader)-1].Offset+1 == offsetPartition.CommittedOffset
+				}
+			}
+		}
+		return false
+	}, 5*time.Second, 100*time.Millisecond, "offsets were altered")
+
+	// the first reader can fetch its partition's messages again because the
+	// generation change caused its uncommitted offsets to be lost
+	totalEvents = 0
+	for i := 0; i < messageCount/2; i++ {
+		if msg, err := firstReader.FetchMessage(ctx); err != nil {
+			t.Errorf("expected to read message %d: %v", i, err)
+		} else {
+			msgsForFirstReader[msg.Partition] = append(msgsForFirstReader[msg.Partition], msg)
+			totalEvents++
+		}
+	}
+	require.Equal(t, messageCount/2, totalEvents)
+
+	// commit the messages the first reader can actually commit and verify that
+	// it works (again, no need to wait because CommitInterval is 0)
+	ownedMsgs := msgsForFirstReader[partitionAssignedToFirstConsumer]
+	require.NoError(t, firstReader.CommitMessages(ctx, ownedMsgs[len(ownedMsgs)-1]))
+}
+
 func TestOffsetStash(t *testing.T) {
 	const topic = "topic"
@@ -1195,16 +1326,16 @@ func TestOffsetStash(t *testing.T) {
 			Given:    offsetStash{},
 			Messages: []Message{newMessage(0, 0)},
 			Expected: offsetStash{
-				topic: {0: 1},
+				topic: {0: {1, 0}},
 			},
 		},
 		"ignores earlier offsets": {
 			Given: offsetStash{
-				topic: {0: 2},
+				topic: {0: {2, 1}},
 			},
 			Messages: []Message{newMessage(0, 0)},
 			Expected: offsetStash{
-				topic: {0: 2},
+				topic: {0: {2, 1}},
 			},
 		},
 		"uses latest offset": {
@@ -1215,7 +1346,7 @@ func TestOffsetStash(t *testing.T) {
 				newMessage(0, 1),
 			},
 			Expected: offsetStash{
-				topic: {0: 4},
+				topic: {0: {4, 0}},
 			},
 		},
 		"uses latest offset, across multiple topics": {
@@ -1229,8 +1360,8 @@ func TestOffsetStash(t *testing.T) {
 			},
 			Expected: offsetStash{
 				topic: {
-					0: 4,
-					1: 7,
+					0: {4, 0},
+					1: {7, 0},
 				},
 			},
 		},
@@ -1282,10 +1413,11 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
 				return offsetCommitResponseV2{}, nil
 			},
 		},
-		done:     make(chan struct{}),
-		log:      func(func(Logger)) {},
-		logError: func(func(Logger)) {},
-		joined:   make(chan struct{}),
+		done:        make(chan struct{}),
+		log:         func(func(Logger)) {},
+		logError:    func(func(Logger)) {},
+		joined:      make(chan struct{}),
+		Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}},
 	}
 
 	// initialize commits so that the commitLoopImmediate select statement blocks
@@ -1319,52 +1451,111 @@ func TestCommitLoopImmediateFlushOnGenerationEnd(t *testing.T) {
 }
 
 func TestCommitOffsetsWithRetry(t *testing.T) {
-	offsets := offsetStash{"topic": {0: 0}}
+	offsets := func() offsetStash {
+		return offsetStash{"topic": {0: {0, 1}}}
+	}
 
 	tests := map[string]struct {
-		Fails       int
-		Invocations int
-		HasError    bool
+		Fails           int
+		Invocations     int
+		HasError        bool
+		Offsets         offsetStash
+		Config          ReaderConfig
+		ExpectedOffsets offsetStash
+		GenerationId    int32
 	}{
 		"happy path": {
-			Invocations: 1,
+			Invocations:     1,
+			Offsets:         offsets(),
+			ExpectedOffsets: offsets(),
+			GenerationId:    1,
 		},
 		"1 retry": {
-			Fails:       1,
-			Invocations: 2,
+			Fails:           1,
+			Invocations:     2,
+			Offsets:         offsets(),
+			ExpectedOffsets: offsets(),
+			GenerationId:    1,
 		},
 		"out of retries": {
-			Fails:       defaultCommitRetries + 1,
-			Invocations: defaultCommitRetries,
-			HasError:    true,
+			Fails:           defaultCommitRetries + 1,
+			Invocations:     defaultCommitRetries,
+			HasError:        true,
+			Offsets:         offsets(),
+			ExpectedOffsets: offsets(),
+			GenerationId:    1,
 		},
+		"illegal generation error, single generation": {
+			Fails:           1,
+			Invocations:     1,
+			Offsets:         offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
+			ExpectedOffsets: offsetStash{},
+			Config:          ReaderConfig{ErrorOnWrongGenerationCommit: false},
+			GenerationId:    2,
+		},
+		"illegal generation error, two generations": {
+			Fails:           1,
+			Invocations:     1,
+			Offsets:         offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
+			ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
+			Config:          ReaderConfig{ErrorOnWrongGenerationCommit: false},
+			GenerationId:    2,
+		},
+		"illegal generation error, single generation - error propagation": {
+			Fails:           1,
+			Invocations:     1,
+			Offsets:         offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}},
+			ExpectedOffsets: offsetStash{},
+			Config:          ReaderConfig{ErrorOnWrongGenerationCommit: true},
+			HasError:        true,
+			GenerationId:    2,
+		},
+		"illegal generation error, two generations - error propagation": {
+			Fails:           1,
+			Invocations:     1,
+			Offsets:         offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}},
+			ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}},
+			Config:          ReaderConfig{ErrorOnWrongGenerationCommit: true},
+			HasError:        true,
+			GenerationId:    2,
+		},
 	}
 
 	for label, test := range tests {
 		t.Run(label, func(t *testing.T) {
 			count := 0
 			gen := &Generation{
 				conn: mockCoordinator{
-					offsetCommitFunc: func(offsetCommitRequestV2) (offsetCommitResponseV2, error) {
+					offsetCommitFunc: func(req offsetCommitRequestV2) (offsetCommitResponseV2, error) {
 						count++
+						if req.GenerationID != test.GenerationId {
+							return offsetCommitResponseV2{}, IllegalGeneration
+						}
 						if count <= test.Fails {
 							return offsetCommitResponseV2{}, io.EOF
 						}
 						return offsetCommitResponseV2{}, nil
 					},
 				},
-				done:     make(chan struct{}),
-				log:      func(func(Logger)) {},
-				logError: func(func(Logger)) {},
+				done:        make(chan struct{}),
+				log:         func(func(Logger)) {},
+				logError:    func(func(Logger)) {},
+				Assignments: map[string][]PartitionAssignment{"topic": {{0, 1}}},
 			}
 
-			r := &Reader{stctx: context.Background()}
-			err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
+			r := &Reader{stctx: context.Background(), config: test.Config}
+			err := r.commitOffsetsWithRetry(gen, test.Offsets, defaultCommitRetries)
 			switch {
 			case test.HasError && err == nil:
 				t.Error("bad err: expected not nil; got nil")
 			case !test.HasError && err != nil:
 				t.Errorf("bad err: expected nil; got %v", err)
+			default:
+				if !reflect.DeepEqual(test.ExpectedOffsets, test.Offsets) {
+					t.Errorf("bad offsets: expected %+v; got %+v", test.ExpectedOffsets, test.Offsets)
+				}
 			}
 		})
 	}
@@ -1559,7 +1750,7 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
 	}
 }
 
-func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
+func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
 	defer cancel()
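
For reviewers, here is a minimal sketch of how the new behavior is meant to be consumed, assuming this patch is applied. The broker address, topic, and group ID are placeholders; `ErrorOnWrongGenerationCommit` and the `IllegalGeneration` sentinel come from this patch and the existing kafka-go API.

```go
package main

import (
	"context"
	"errors"
	"log"

	kafka "github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // placeholder broker
		GroupID: "example-group",            // placeholder group
		Topic:   "example-topic",            // placeholder topic
		// CommitInterval == 0 makes commits synchronous, so commit errors
		// (including IllegalGeneration) surface directly from CommitMessages.
		CommitInterval: 0,
		// With this patch, committing a message that was fetched in a previous
		// generation returns IllegalGeneration instead of silently committing
		// offsets for a partition this member may no longer own.
		ErrorOnWrongGenerationCommit: true,
	})
	defer r.Close()

	ctx := context.Background()
	for {
		msg, err := r.FetchMessage(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// ... process msg ...

		if err := r.CommitMessages(ctx, msg); err != nil {
			if errors.Is(err, kafka.IllegalGeneration) {
				// The group rebalanced between fetch and commit; the message
				// will be redelivered to the partition's new owner, so the
				// commit can safely be dropped.
				log.Printf("dropping commit from a previous generation: %v", err)
				continue
			}
			log.Fatal(err)
		}
	}
}
```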
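At the lower level, `Generation.CommitOffsets` is now a thin wrapper over the new `Generation.CommitOffsetsForGenID`. The sketch below shows it used directly with the existing `ConsumerGroup` API; the group/topic names and the offset value are placeholders, and the imports are the same as in the sketch above.

```go
// commitViaGeneration commits an explicit offset for the generation this
// member currently belongs to.
func commitViaGeneration() {
	group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{
		ID:      "example-group",            // placeholder
		Brokers: []string{"localhost:9092"}, // placeholder
		Topics:  []string{"example-topic"},  // placeholder
	})
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	// Next blocks until the member has joined a generation.
	gen, err := group.Next(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// gen.CommitOffsets(offsets) is now equivalent to
	// gen.CommitOffsetsForGenID(gen.ID, offsets); passing an older
	// generation's ID is what makes the broker reply with IllegalGeneration.
	offsets := map[string]map[int]int64{
		"example-topic": {0: 42}, // partition 0, next offset to consume (placeholder)
	}
	if err := gen.CommitOffsetsForGenID(gen.ID, offsets); err != nil {
		log.Fatal(err)
	}
}
```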