This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

Fix wrong offset increment for Sarama message set #1992

Commits on Aug 3, 2023

  1. Fix wrong offset increment for Sarama message set

    ### Motivation
    
    When a Sarama producer sends messages to KoP as a message set (v0 or v1 records), e.g.
    
    ```golang
    conf := sarama.NewConfig()
    conf.Version = sarama.V0_10_2_2
    conf.Producer.Compression = sarama.CompressionLZ4
    producer, err := sarama.NewAsyncProducer([]string{"localhost:15003"}, conf)
    ```
    
    The `PartitionLog#analyzeAndValidateRecords` method can compute a wrong
    `LogAppendInfo#numMessages` value, because it relies on
    `RecordBatch#lastOffset` to get the latest offset.
    
    https://github.com/streamnative/kop/blob/3b22e79764ca22107228ec2f74590f0769bd9fd9/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java#L1028
    
    However, when the Sarama client handles a message set (v0 or v1 records),
    it only writes an offset of zero:
    
    https://github.com/IBM/sarama/blob/c10bd1e5709a7b47729b445bc98f2e41bc7cc0a8/produce_request.go#L107
    
    In this case, every batch is counted as 1 message, which can result
    in wrong offsets. Assuming there are 2 batches of 5 messages each, the
    offsets written in the entries could be:
    
    - Entry 0: index=0
    - Entry 1: index=1
    - LEO: 2
    
    The correct offsets should be:
    
    - Entry 0: index=0
    - Entry 1: index=5
    - LEO: 10
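
    The two outcomes above can be reproduced with a small, self-contained Go sketch. It is not KoP code: the `batch` type and the helper functions are hypothetical, and it assumes the broker derives the message count as `lastOffset + 1` (base offset zero), which matches the "counted as 1 message" behavior described above.

    ```go
    package main

    import "fmt"

    // batch is a hypothetical stand-in for one produced batch.
    type batch struct {
        lastOffset int64 // offset written by the client (0 for Sarama message sets)
        records    int   // number of records the batch really contains
    }

    // assignOffsets returns the index stored in each entry and the resulting
    // log end offset (LEO), given a function that counts a batch's messages.
    func assignOffsets(batches []batch, count func(batch) int64) ([]int64, int64) {
        indexes := make([]int64, 0, len(batches))
        var leo int64
        for _, b := range batches {
            indexes = append(indexes, leo)
            leo += count(b)
        }
        return indexes, leo
    }

    // countByLastOffset is the assumed (buggy) strategy: trust the offset field.
    func countByLastOffset(b batch) int64 { return b.lastOffset + 1 }

    // countByRecords counts the records that are actually in the batch.
    func countByRecords(b batch) int64 { return int64(b.records) }

    func main() {
        batches := []batch{{lastOffset: 0, records: 5}, {lastOffset: 0, records: 5}}

        idx, leo := assignOffsets(batches, countByLastOffset)
        fmt.Println(idx, leo) // [0 1] 2

        idx, leo = assignOffsets(batches, countByRecords)
        fmt.Println(idx, leo) // [0 5] 10
    }
    ```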
    
    The wrong LEO could make the offset check in
    `PartitionLog#checkOffsetOutOfRange` fail, after which consumers reset
    their offsets and consume duplicate messages.
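
    As a rough illustration of why a too-small LEO leads to duplicates, consider the Go sketch below. It is a simplification under stated assumptions: the range check rejects any fetch offset below the log start offset or above the LEO, and the consumer reacts by resetting to the earliest offset. `isOutOfRange` and `nextFetchOffset` are hypothetical helpers, not KoP methods.

    ```go
    package main

    import "fmt"

    // isOutOfRange is an assumed, simplified range check on a fetch offset.
    func isOutOfRange(fetchOffset, logStart, leo int64) bool {
        return fetchOffset < logStart || fetchOffset > leo
    }

    // nextFetchOffset returns the offset the consumer continues from. On an
    // out-of-range error it falls back to the log start (an "earliest" reset).
    func nextFetchOffset(fetchOffset, logStart, leo int64) int64 {
        if isOutOfRange(fetchOffset, logStart, leo) {
            return logStart
        }
        return fetchOffset
    }

    func main() {
        // The consumer has already read up to offset 5 and asks for offset 6.
        fmt.Println(nextFetchOffset(6, 0, 2))  // 0: wrong LEO, reset, messages are re-read
        fmt.Println(nextFetchOffset(6, 0, 10)) // 6: correct LEO, consumption continues
    }
    ```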
    
    ### Modifications
    
    When the `lastOffset` is 0, iterate over the records in the batch to
    compute the number of messages.
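
    The idea can be sketched in Go as follows. This is only an illustration of the fallback, not the Java change itself: `record`, `recordBatch`, and `numMessages` are hypothetical names, and the `lastOffset - baseOffset + 1` formula for the regular path is an assumption.

    ```go
    package main

    import "fmt"

    // record and recordBatch are hypothetical stand-ins for Kafka's types.
    type record struct{ value string }

    type recordBatch struct {
        baseOffset int64
        lastOffset int64
        records    []record
    }

    // numMessages trusts the offsets unless lastOffset is 0, in which case it
    // walks the records, because clients such as Sarama may leave the offset
    // field of a message set at zero.
    func numMessages(b recordBatch) int64 {
        if b.lastOffset == 0 {
            var n int64
            for range b.records {
                n++
            }
            return n
        }
        return b.lastOffset - b.baseOffset + 1
    }

    func main() {
        messageSet := recordBatch{baseOffset: 0, lastOffset: 0, records: make([]record, 5)}
        regular := recordBatch{baseOffset: 0, lastOffset: 4, records: make([]record, 5)}
        fmt.Println(numMessages(messageSet), numMessages(regular)) // 5 5
    }
    ```

    A batch that really holds a single record at offset 0 also takes the iteration path and still yields 1, so the fallback does not change the result for that case.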
    
    Ideally, we should add a Golang client test to guard against regressions.
    In this patch, we only add a `SaramaCompressedV1Records` class that
    simulates the behavior of Sarama, along with unit tests that use it.
    BewareMyPower committed Aug 3, 2023
    4e2ead3