Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not commit offsets for past generations if partition not owned #1330

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.HighWaterMark = batch.highWaterMark
msg.Time = makeTime(timestamp)
msg.Headers = headers
if batch.conn != nil {
msg.GenerationId = batch.conn.generationId
}

return msg, err
}
Expand Down
14 changes: 8 additions & 6 deletions commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,20 @@ package kafka
// A commit represents the instruction of publishing an update of the last
// offset read by a program for a topic and partition.
type commit struct {
topic string
partition int
offset int64
topic string
partition int
offset int64
generationId int32
}

// makeCommit builds a commit value from a message, the resulting commit takes
// its topic, partition, and offset from the message.
func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
generationId: msg.GenerationId,
}
}

Expand Down
6 changes: 6 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ var (
errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

const undefinedGenerationId int32 = -1

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
Expand Down Expand Up @@ -65,6 +67,8 @@ type Conn struct {
apiVersions atomic.Value // apiVersionMap

transactionalID *string

generationId int32
}

type apiVersionMap map[apiKey]ApiVersion
Expand Down Expand Up @@ -182,6 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
offset: FirstOffset,
requiredAcks: -1,
transactionalID: emptyToNullable(config.TransactionalID),
generationId: undefinedGenerationId,
}

c.wb.w = &c.wbuf
Expand Down Expand Up @@ -388,6 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

c.generationId = response.GenerationID
return response, nil
}

Expand Down
30 changes: 18 additions & 12 deletions consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,12 @@ func (g *Generation) Start(fn func(ctx context.Context)) {
// consumer group coordinator. This can be used to reset the consumer to
// explicit offsets.
func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {
// Commit under this generation's own ID; CommitOffsetsForGenID carries the
// actual request logic and lets callers name a different generation.
return g.CommitOffsetsForGenID(g.ID, offsets)
}

// CommitOffsetsForGenID commits the provided topic+partition+offset combos to the
// consumer group coordinator specifying the given genID.
func (g *Generation) CommitOffsetsForGenID(genID int32, offsets map[string]map[int]int64) error {
if len(offsets) == 0 {
return nil
}
Expand All @@ -434,7 +440,7 @@ func (g *Generation) CommitOffsets(offsets map[string]map[int]int64) error {

request := offsetCommitRequestV2{
GroupID: g.GroupID,
GenerationID: g.ID,
GenerationID: genID,
MemberID: g.MemberID,
RetentionTime: g.retentionMillis,
Topics: topics,
Expand Down Expand Up @@ -925,12 +931,12 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) {
// the leader. Otherwise, GroupMemberAssignments will be nil.
//
// Possible kafka error codes returned:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
// * GroupLoadInProgress:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * InconsistentGroupProtocol:
// * InvalidSessionTimeout:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) {
request, err := cg.makeJoinGroupRequestV1(memberID)
if err != nil {
Expand Down Expand Up @@ -1073,11 +1079,11 @@ func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember
// Readers subscriptions topic => partitions
//
// Possible kafka error codes returned:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
// * GroupCoordinatorNotAvailable:
// * NotCoordinatorForGroup:
// * IllegalGeneration:
// * RebalanceInProgress:
// * GroupAuthorizationFailed:
func (cg *ConsumerGroup) syncGroup(conn coordinator, memberID string, generationID int32, memberAssignments GroupMemberAssignments) (map[string][]int32, error) {
request := cg.makeSyncGroupRequestV0(memberID, generationID, memberAssignments)
response, err := conn.syncGroup(request)
Expand Down
4 changes: 4 additions & 0 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Message struct {
Value []byte
Headers []Header

// GenerationId is the consumer group generation under which the message
// was read. Value is -1 if the Reader is not using consumer groups.
GenerationId int32

// This field is used to hold arbitrary data you wish to include, so it
// will be available when handle it on the Writer's `Completion` method,
// this support the application can do any post operation on each message.
Expand Down
94 changes: 75 additions & 19 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
Expand All @@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
}

r.mutex.Lock()
r.start(offsets)
r.start(generationId, offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
Expand All @@ -150,35 +150,73 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
backoffDelayMax = 5 * time.Second
)

for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
return
messagesToSendForGeneration := make(map[int32]map[string]map[int]int64)
for topic, partitionsInfo := range offsetStash {
for partition, commitInfo := range partitionsInfo {
if _, ok := messagesToSendForGeneration[commitInfo.generationID]; !ok {
messagesToSendForGeneration[commitInfo.generationID] = make(map[string]map[int]int64)
}
msgsForTopic := messagesToSendForGeneration[commitInfo.generationID]
if _, ok := msgsForTopic[topic]; !ok {
msgsForTopic[topic] = make(map[int]int64)
}
msgsForPartition := msgsForTopic[topic]
msgsForPartition[partition] = commitInfo.offset
}
}
var illegalGenerationErr bool
for generationID, messages := range messagesToSendForGeneration {
for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
continue
}
}

if err = gen.CommitOffsets(offsetStash); err == nil {
return
if err = gen.CommitOffsetsForGenID(generationID, messages); err == nil {
break
}

// IllegalGeneration error is not retriable, but we should attempt to
// perform the remaining commits
if errors.Is(err, IllegalGeneration) {
r.withErrorLogger(func(l Logger) { l.Printf("generation %d - %v", generationID, err) })
offsetStash.removeGenerationID(generationID)
illegalGenerationErr = true
err = nil
break
}
}
}

// if configured to ignore the error
if illegalGenerationErr && r.config.ErrorOnWrongGenerationCommit {
err = IllegalGeneration
}
return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64
// offsetEntry is the per-partition value kept in an offsetStash: the offset to
// commit together with the generation id of the commit it was taken from.
type offsetEntry struct {
// offset is the offset that will be sent in the commit request.
offset int64
// generationID is copied from commit.generationId when the entry is merged.
generationID int32
}
// offsetStash holds offsets by topic => partition => offsetEntry.
type offsetStash map[string]map[int]offsetEntry

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
for _, c := range commits {
offsetsByPartition, ok := o[c.topic]
if !ok {
offsetsByPartition = map[int]int64{}
offsetsByPartition = map[int]offsetEntry{}
o[c.topic] = offsetsByPartition
}

if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
offsetsByPartition[c.partition] = c.offset
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
offsetsByPartition[c.partition] = offsetEntry{
offset: c.offset,
generationID: c.generationId,
}
}
}
}
Expand All @@ -190,6 +228,19 @@ func (o offsetStash) reset() {
}
}

// removeGenerationID drops every stashed entry that was recorded under the
// consumer group generation genID, then prunes any topic that is left without
// partitions so the stash does not retain empty topic maps.
func (o offsetStash) removeGenerationID(genID int32) {
	for topic, offsetsForTopic := range o {
		for partition, entry := range offsetsForTopic {
			if entry.generationID == genID {
				// Deleting entries while ranging over a map is
				// well-defined in Go.
				delete(offsetsForTopic, partition)
			}
		}

		// Checked once per topic, after all of its partitions were visited,
		// rather than on every partition iteration. This also prunes topics
		// whose partition map was already empty.
		if len(offsetsForTopic) == 0 {
			delete(o, topic)
		}
	}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
offsets := offsetStash{}
Expand Down Expand Up @@ -329,7 +380,7 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.stats.rebalances.observe(1)

r.subscribe(gen.Assignments)
r.subscribe(gen.ID, gen.Assignments)

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
Expand Down Expand Up @@ -522,6 +573,10 @@ type ReaderConfig struct {
// This flag is being added to retain backwards-compatibility, so it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool

// ErrorOnWrongGenerationCommit indicates that we should return an error when
// attempting to commit a message to a generation different than the current one.
ErrorOnWrongGenerationCommit bool
}

// Validate method validates ReaderConfig properties.
Expand Down Expand Up @@ -819,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(r.getTopicPartitionOffset())
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
}

version := r.version
Expand Down Expand Up @@ -1040,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(r.getTopicPartitionOffset())
r.start(undefinedGenerationId, r.getTopicPartitionOffset())
}

r.activateReadLag()
Expand Down Expand Up @@ -1178,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
Expand Down Expand Up @@ -1216,7 +1271,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, offset)
}).run(ctx, generationId, offset)
}(ctx, key, offset, &r.join)
}
}
Expand Down Expand Up @@ -1253,7 +1308,7 @@ type readerMessage struct {
error error
}

func (r *reader) run(ctx context.Context, offset int64) {
func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
// This is the reader's main loop, it only ends if the context is canceled
// and will keep attempting to reader messages otherwise.
//
Expand Down Expand Up @@ -1306,6 +1361,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
}
continue
}
conn.generationId = generationId

// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
Expand Down
Loading