This repository has been archived by the owner on Jan 24, 2024. It is now read-only.
Fix wrong offset increment for Sarama message set #1992

Merged: BewareMyPower merged 1 commit into streamnative:master from BewareMyPower:bewaremypower/sarama_wrong_num_messages on Aug 4, 2023
Conversation
### Motivation

When Sarama producers send messages to KoP as a message set, e.g. with the following configuration:

```golang
conf := sarama.NewConfig()
conf.Version = sarama.V0_10_2_2
conf.Producer.Compression = sarama.CompressionLZ4
producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
```

the `PartitionLog#analyzeAndValidateRecords` method can compute a wrong `LogAppendInfo#numMessages` result, because it uses `RecordBatch#lastOffset` to get the latest offset:

https://github.com/streamnative/kop/blob/3b22e79764ca22107228ec2f74590f0769bd9fd9/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1028

However, when the Sarama client builds a message set (v0 or v1 records), it only writes an offset of zero for each record:

https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_request.go#L107

In this case, every batch is counted as one message, which can result in a wrong offset. Assuming there are 2 batches of 5 messages each, the offsets written in the entries would be:

- Entry 0: index=0
- Entry 1: index=1
- LEO: 2

The correct offsets should be:

- Entry 0: index=0
- Entry 1: index=5
- LEO: 10

The wrong LEO can make the offset check in `PartitionLog#checkOffsetOutOfRange` fail, after which consumers reset the offset and consume duplicate messages.

### Modifications

When the `lastOffset` is 0, iterate over the records in the batch to compute the number of messages.

Ideally we should add a Golang client test to guard against this regression; in this patch, we only add a `SaramaCompressedV1Records` class to simulate Sarama's behavior, and add unit tests on top of it.
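For illustration, here is a minimal sketch of the counting logic described above. It is not the code from this patch: it assumes Kafka's `org.apache.kafka.common.record` API (`MemoryRecords`, `RecordBatch`, `Record`), and the `NumMessagesSketch` class name is made up for this example.

```java
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;

// Hypothetical helper, not part of KoP: counts the records in a produce
// request even when a Sarama-style legacy batch reports lastOffset == 0.
final class NumMessagesSketch {

    static int numMessages(MemoryRecords records) {
        int count = 0;
        for (RecordBatch batch : records.batches()) {
            long lastOffset = batch.lastOffset();
            if (lastOffset > 0) {
                // The client assigned offsets (relative offsets within the
                // request), so the offset range gives the record count.
                count += (int) (lastOffset - batch.baseOffset()) + 1;
            } else {
                // Sarama writes offset 0 for every record of a compressed
                // v0/v1 message set, so lastOffset cannot be trusted; fall
                // back to iterating the records (this decompresses the batch).
                for (Record ignored : batch) {
                    count++;
                }
            }
        }
        return count;
    }
}
```

The idea mirrors the fix: trust the offset arithmetic only when the client actually assigned offsets, and otherwise count the records directly.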
I think for clients in other languages, we can add the tests to our own private test repo in the future.
Codecov Report

```
@@              Coverage Diff              @@
##              master    #1992      +/-  ##
============================================
  Coverage      17.28%   17.29%
- Complexity       726      728       +2
============================================
  Files            190      190
  Lines          14012    14041      +29
  Branches        1312     1320       +8
============================================
+ Hits            2422     2428       +6
- Misses         11414    11437      +23
  Partials         176      176
```
Demogorgon314 approved these changes on Aug 4, 2023:
Good catch!
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request on Aug 14, 2023:
(cherry picked from commit 0241e43)
Demogorgon314 pushed a commit to Demogorgon314/kop that referenced this pull request on Nov 28, 2023:
(cherry picked from commit 0241e43) (cherry picked from commit f3b2b30)