This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix wrong offset increment for Sarama message set #1992

Conversation

BewareMyPower
Collaborator

Motivation

When Sarama producers send messages to KoP with a message set, e.g.

```golang
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_2_2
conf.Producer.Compression = sarama.CompressionLZ4
producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
```

The `PartitionLog#analyzeAndValidateRecords` method could compute a wrong `LogAppendInfo#numMessages` result because it uses `RecordBatch#lastOffset` to get the last offset of each batch:

```java
numMessages += (batch.lastOffset() - batch.baseOffset() + 1);
```

https://github.com/streamnative/kop/blob/3b22e79764ca22107228ec2f74590f0769bd9fd9/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1028

However, when the Sarama client handles the message set (v0 or v1 records), it only writes an offset of zero:

https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_request.go#L107

In this case, every batch is counted as 1 message, which might result in a wrong offset: since both `baseOffset` and `lastOffset` are 0, each batch contributes `0 - 0 + 1 = 1` to `numMessages`, no matter how many records it actually contains. Assuming there are 2 batches of 5 messages each, the offsets written in the entries could be:

  • Entry 0: index=0
  • Entry 1: index=1
  • LEO: 2

The correct offsets should be:

  • Entry 0: index=0
  • Entry 1: index=5
  • LEO: 10

The wrong LEO could make the offset check in `PartitionLog#checkOffsetOutOfRange` fail, so consumers will reset their offsets and consume duplicated messages.
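
To make the consequence concrete, here is a minimal, hypothetical illustration of such a range check (the variable names and the exact comparison are assumptions for illustration, not KoP's actual `checkOffsetOutOfRange` implementation):

```java
public class OffsetRangeIllustration {
    public static void main(String[] args) {
        // Hypothetical numbers: the log really contains offsets [0, 10),
        // but each 5-record batch was counted as a single message, so LEO = 2.
        long logStartOffset = 0L;
        long logEndOffset = 2L;   // the wrong LEO; the correct value would be 10
        long fetchOffset = 7L;    // a consumer resumes from its committed offset 7

        boolean outOfRange = fetchOffset < logStartOffset || fetchOffset > logEndOffset;
        System.out.println("out of range: " + outOfRange); // true -> the consumer resets
        // After resetting (e.g. to earliest), the consumer re-reads offsets 0..6,
        // i.e. it consumes duplicated messages.
    }
}
```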

Modifications

When the `lastOffset` is 0, iterate over the records in the batch to compute the number of messages.
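
A minimal sketch of that counting logic, assuming only Kafka's `org.apache.kafka.common.record.RecordBatch` API (an illustration of the approach, not the exact KoP patch):

```java
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

public final class BatchMessageCounter {

    // For Sarama's compressed v0/v1 message sets, baseOffset and lastOffset are
    // both 0, so the offset delta cannot be trusted; fall back to iterating the
    // records of the batch to count them.
    static int countMessages(RecordBatch batch) {
        long offsetDelta = batch.lastOffset() - batch.baseOffset();
        if (offsetDelta > 0) {
            return (int) (offsetDelta + 1);
        }
        int count = 0;
        for (Record ignored : batch) { // RecordBatch is Iterable<Record>
            count++;
        }
        return count;
    }
}
```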

Ideally we should add a Golang client test to prevent this regression. In this patch, we only add a `SaramaCompressedV1Records` class to simulate the behavior of Sarama, and then add the unit tests.

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@BewareMyPower
Collaborator Author

I think for clients in other languages, we can add the tests to our own private test repo in the future.

https://github.com/streamnative/kop-tests

@Demogorgon314 @gaoran10 @codelipenghui

@BewareMyPower
Collaborator Author

Here is the reproduction code for Sarama. (Just run a unit test in KoP and sleep for a long time.)

```golang
package main

import (
	"fmt"
	"log"
	"os"

	"github.com/IBM/sarama"
)

func main() {
	sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
	conf := sarama.NewConfig()
	conf.Version = sarama.V0_10_2_2
	conf.Producer.Compression = sarama.CompressionLZ4
	// producer.Successes() will return the msg metadata
	conf.Producer.Return.Successes = true
	producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
	if err != nil {
		panic("Couldn't create a Kafka producer")
	}
	defer producer.AsyncClose()
	for i := 0; i < 2; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic: "my-topic",
			Key:   nil,
			Value: sarama.StringEncoder(fmt.Sprintf("my-value-%d", i)),
		}
	}
	for i := 0; i < 2; i++ {
		select {
		case err := <-producer.Errors():
			sarama.Logger.Println(i, "failed", err)
		case metadata := <-producer.Successes():
			sarama.Logger.Println(i, "send to ", metadata.Offset)
		}
	}
}
```

@codecov

codecov bot commented Aug 3, 2023

Codecov Report

Merging #1992 (4e2ead3) into master (1fd3bdb) will increase coverage by 0.00%.
Report is 1 commit behind head on master.
The diff coverage is 71.42%.


```diff
@@            Coverage Diff            @@
##             master    #1992   +/-   ##
=========================================
  Coverage     17.28%   17.29%
- Complexity      726      728    +2
=========================================
  Files           190      190
  Lines         14012    14041   +29
  Branches       1312     1320    +8
=========================================
+ Hits           2422     2428    +6
- Misses        11414    11437   +23
  Partials        176      176
```

| Files Changed | Coverage Δ |
| --- | --- |
| ...tive/pulsar/handlers/kop/storage/PartitionLog.java | 8.61% <71.42%> (+0.71%) ⬆️ |

... and 4 files with indirect coverage changes

Member

@Demogorgon314 Demogorgon314 left a comment


Good catch!

@BewareMyPower BewareMyPower merged commit 0241e43 into streamnative:master Aug 4, 2023
24 checks passed
@BewareMyPower BewareMyPower deleted the bewaremypower/sarama_wrong_num_messages branch August 4, 2023 02:31
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Aug 14, 2023
(cherry picked from commit 0241e43)
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request Nov 28, 2023
(cherry picked from commit 0241e43)
(cherry picked from commit f3b2b30)