Avoid late records preemptively rotating/committing S3 output files #574

Open
wants to merge 2 commits into base: master

Commits on Oct 21, 2022

  1. Avoid late records preemptively rotating/committing S3 output files

    When late data is arriving on a Kafka partition (e.g. data for the previous
    hourly encodedPartition) the following check triggers an immediate rotation
    and commit of files:
    
    https://github.com/confluentinc/kafka-connect-storage-cloud/blob/918730d011dcd199e810ec3a68a03ab01c927f62/kafka-connect-s3/src/main/java/io/confluent/connect/s3/TopicPartitionWriter.java#L410
    
    The problem is exacerbated when late data is interleaved with up-to-date
    data: a quick succession of rotations causes a large number of small files
    to be committed to S3.
    This affects both the performance/throughput of Kafka Connect and
    downstream consumers, which need to deal with the many small file fragments.
    
    This PR adds a new `max.open.files.per.partition` setting to
    S3SinkConnectorConfig. It defaults to 1, which preserves the existing behavior.
    
    If set to a value > 1, the following behavior is enabled:
    
    - A separate commit file is kept open for each encodedPartition target,
       up to a maximum of `max.open.files.per.partition`.
    
    - Rotation occurs only when one of the encodedPartition targets hits its
       rotation condition (`flush.size`, `rotate.interval.ms`), and it commits
       all open files. All files are committed so that S3Sink's pre-commit hook
       commits a high-watermark offset to the Kafka consumer group. This avoids
       gaps of buffered data still being in flight when that happens.
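
The behavior described above can be illustrated with a toy model. This is a minimal sketch, not the connector's actual TopicPartitionWriter: the class `OpenFilesSketch`, its methods, and the `hour=07`/`hour=08` labels are hypothetical, only count-based rotation (`flush.size`) is modeled, and it assumes that a record for a new encodedPartition arriving when the limit is already reached forces a commit of all open files.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of one Kafka partition's writer (hypothetical, for illustration only). */
public class OpenFilesSketch {
    private final int maxOpenFiles; // stands in for max.open.files.per.partition
    private final int flushSize;    // stands in for flush.size
    // Buffered record count per encodedPartition, in the order files were opened.
    private final Map<String, Integer> openFiles = new LinkedHashMap<>();
    // Every committed file, recorded as "encodedPartition:recordCount".
    private final List<String> committed = new ArrayList<>();

    public OpenFilesSketch(int maxOpenFiles, int flushSize) {
        this.maxOpenFiles = maxOpenFiles;
        this.flushSize = flushSize;
    }

    /** Buffers one record destined for the given encodedPartition. */
    public void write(String encodedPartition) {
        // Assumption: opening a file beyond the limit first commits everything open.
        // With a limit of 1 this reproduces the rotate-on-partition-change behavior.
        if (!openFiles.containsKey(encodedPartition) && openFiles.size() >= maxOpenFiles) {
            commitAll();
        }
        int count = openFiles.merge(encodedPartition, 1, Integer::sum);
        // When any target hits the rotation condition, all open files are committed.
        if (count >= flushSize) {
            commitAll();
        }
    }

    /** Commits every open file so no buffered records are left behind. */
    public void commitAll() {
        for (Map.Entry<String, Integer> e : openFiles.entrySet()) {
            committed.add(e.getKey() + ":" + e.getValue());
        }
        openFiles.clear();
    }

    /** Feeds the records through a fresh writer and returns the committed files. */
    public static List<String> simulate(int maxOpenFiles, int flushSize, String... partitions) {
        OpenFilesSketch writer = new OpenFilesSketch(maxOpenFiles, flushSize);
        for (String p : partitions) {
            writer.write(p);
        }
        writer.commitAll(); // final flush so remaining buffers are counted
        return writer.committed;
    }

    public static void main(String[] args) {
        // Late hour-07 records interleaved with current hour-08 records.
        String[] stream = {"hour=08", "hour=07", "hour=08", "hour=07", "hour=08", "hour=07"};
        System.out.println(simulate(1, 3, stream)); // six single-record files
        System.out.println(simulate(2, 3, stream)); // [hour=08:3, hour=07:2, hour=07:1]
    }
}
```

In this model, a limit of 1 produces one file per record for the interleaved stream, while a limit of 2 produces three files, and hour-07 records are only ever written to hour-07 files.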
    
    It's worth noting that this issue/limitation was previously encountered
    and is well-described as part of:
    "CC-2313 Handle late arriving records in storage cloud sink connectors"
    confluentinc#187
    
    However, that feature was subsequently reverted:
    confluentinc@a2ce6fc
    confluentinc/kafka-connect-storage-common#87
    
    N.B. Unlike the solution proposed in CC-2313, we do not write late data
    to an incorrect encodedPartition, i.e. late data for hour 7 will not
    land in a path/file for hour 8.
    frankgrimes97 committed Oct 21, 2022
    0af783e

Commits on Nov 16, 2022

  1. Fix bug in rotateOnTime

    The default baseRecordTimestamp was incorrect, leading to
    unnecessary file committing and rotation.
    Update the unit test case accordingly.
    frankgrimes97 committed Nov 16, 2022
    db365c4