
Losing S3 data due to overwrite from hanging consumer task #800

Open

gafeol opened this issue Nov 28, 2024 · 1 comment

gafeol commented Nov 28, 2024

Kafka brokers and Connect version: 3.0.2 with Scala 2.12.
We run Kafka Connect in distributed mode on 5 machines, with 50 tasks in our S3 Sink connector handling ~1k Kafka partitions.

Roughly once a month we hit an issue where the S3 sink connector overwrites a previously uploaded S3 data file with incomplete data. This seems to happen when a connector task hangs and later completes an S3 upload it had started before hanging, even though it is unable to commit the corresponding offsets to Kafka.

Our most recent incident unfolded as follows:

  • An S3 sink task (task-1) creates an S3OutputStream to upload data for a topic T starting from offset X, but hangs before completing the upload and is kicked out of the Kafka consumer group for exceeding the poll interval timeout. task-1 then appears idle and produces no logs for around 30 minutes.
  • task-1's partitions are rebalanced to another task, task-2, which uploads the data file for topic T starting at offset X and keeps making further uploads as expected.
  • About 30 minutes later, task-1 starts logging again: it uploads an incomplete data file to S3 starting from offset X (overwriting the complete file previously uploaded by task-2) and then fails to commit offsets to Kafka, since it is still out of the consumer group.
  • Once task-1 rejoins the consumer group, it picks up the last offset committed for topic T by task-2 and resumes S3 uploads normally from where task-2 stopped.

As a result, the data file for topic T starting at offset X was uploaded twice: once by task-2, then again ~30 minutes later by task-1 with incomplete data.
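
As far as I can tell from the object keys in the logs below, the committed file name is derived only from the topic, partition and starting offset, roughly:

kafka_hourly/<TOPIC>/dt=<date>/hour=<hour>/<TOPIC>+<partition>+<startOffset>.bin.gz

so any task that later commits a file for the same starting offset writes to exactly the same S3 key, which is why the stale upload from task-1 silently replaces the complete file from task-2.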

I'd appreciate any help in understanding why a task can hang long enough to exceed the poll timeout, and whether there is a way to avoid the overwrite that causes the data loss. Thanks!
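
For context, this is roughly the shape of the connector settings that seem relevant to the timing here. The values are illustrative, not our production config, and the consumer.override.* prefix assumes the worker's connector.client.config.override.policy allows consumer overrides:

# Illustrative values only, not our production config
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=50

# Rotation settings that decide when TopicPartitionWriter commits a file
# ("Committing files after waiting for rotateIntervalMs time but less than
#  flush.size records available" in the logs below)
flush.size=100000
rotate.interval.ms=600000

# Per-connector consumer override: raising this gives a slow task more headroom
# before it is kicked out of the group, but it does not by itself prevent the
# stale upload described above
consumer.override.max.poll.interval.ms=600000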


For reference, here are the full (non-summarized) logs from the same incident:
The data file kafka_hourly/TOPIC/dt=2024-11-21/hour=14/TOPIC+1+14549241101.bin.gz was first uploaded by task-2 (task-thread-s3-uploader-2) with ~280k rows, but was then re-uploaded by task-1 (task-thread-s3-uploader-1) with only ~262k rows when it came back to life at ~14:50, losing rows in the process.
Logs from the connector process:

# task-1 creates the s3 output stream from offset X=14549241101 but is unable to complete it before being kicked out of the consumer group
2024-11-21 14:22:27.021 [task-thread-s3-uploader-1] INFO Starting commit and rotation for topic partition TOPIC-1 with start offset {dt=2024-11-21/hour=14=14548961101} (io.confluent.connect.s3.TopicPartitionWriter:338)
2024-11-21 14:22:27.437 [task-thread-s3-uploader-1] INFO Files committed to S3. Target commit offset for TOPIC-1 is 14549241101 (io.confluent.connect.s3.TopicPartitionWriter:659)
2024-11-21 14:22:27.437 [task-thread-s3-uploader-1] INFO Creating S3 output stream. (io.confluent.connect.s3.storage.S3Storage:200)
2024-11-21 14:22:27.441 [task-thread-s3-uploader-1] INFO Create S3OutputStream for bucket 'bucket' key 'kafka_hourly/TOPIC/dt=2024-11-21/hour=14/TOPIC+1+14549241101.bin.gz' (io.confluent.connect.s3.storage.S3OutputStream:95)

# task-1 is kicked out of the consumer group
2024-11-21 14:30:55.208 [kafka-coordinator-heartbeat-thread | connect-s3-uploader] INFO [Consumer clientId=connector-consumer-s3-uploader-1, groupId=connect-s3-uploader] Member connector-consumer-s3-uploader-1-e92e1949-0afd-4894-8997-ba6febed0ac2 sending LeaveGroup request to coordinator kafka1 (id: 2147483612 rack: null) due to consumer poll timeout has expired. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1048)

# task-2 uploads the data file starting from offset X and continues uploads normally
2024-11-21 14:30:58.007 [task-thread-s3-uploader-2] INFO [Consumer clientId=connector-consumer-s3-uploader-2, groupId=connect-s3-uploader] Setting offset for partition TOPIC-1 to the committed offset FetchPosition{offset=14549241101, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka1 (id: 1 rack: 3)], epoch=56}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:844)
2024-11-21 14:30:58.360 [task-thread-s3-uploader-2] INFO Create S3OutputStream for bucket 'bucket' key 'kafka_hourly/TOPIC/dt=2024-11-21/hour=14/TOPIC+1+14549241101.bin.gz' (io.confluent.connect.s3.storage.S3OutputStream:95)
2024-11-21 14:31:09.286 [task-thread-s3-uploader-2] INFO Starting commit and rotation for topic partition TOPIC-1 with start offset {dt=2024-11-21/hour=14=14549241101} (io.confluent.connect.s3.TopicPartitionWriter:338)
2024-11-21 14:31:09.771 [task-thread-s3-uploader-2] INFO Files committed to S3. Target commit offset for TOPIC-1 is 14549521101 (io.confluent.connect.s3.TopicPartitionWriter:659)

# task-1 starts logging again and uploads the incomplete file covering offsets 14549241101 to 14549503739 (less data than the previous file had), but fails to commit to Kafka since it is still out of the consumer group
2024-11-21 14:50:46.018 [task-thread-s3-uploader-1] INFO Committing files after waiting for rotateIntervalMs time but less than flush.size records available. (io.confluent.connect.s3.TopicPartitionWriter:355)
2024-11-21 14:50:46.321 [task-thread-s3-uploader-1] INFO Files committed to S3. Target commit offset for TOPIC-1 is 14549503739 (io.confluent.connect.s3.TopicPartitionWriter:659)
2024-11-21 14:50:46.323 [task-thread-s3-uploader-1] ERROR WorkerSinkTask{id=s3-uploader-1} Commit of offsets threw an unexpected exception for sequence number 47776: {...,TOPIC-1=OffsetAndMetadata{offset=14549503739, leaderEpoch=null, metadata=''}} (org.apache.kafka.connect.runtime.WorkerSinkTask:269)
org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1163)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.doCommitOffsetsAsync(ConsumerCoordinator.java:981)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:948)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1562)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommitAsync(WorkerSinkTask.java:358)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.doCommit(WorkerSinkTask.java:369)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:460)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:374)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:218)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
        at java.base/java.lang.Thread.run(Thread.java:1583)
2024-11-21 14:50:46.324 [task-thread-s3-uploader-1] INFO [Consumer clientId=connector-consumer-s3-uploader-1, groupId=connect-s3-uploader] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:717)
2024-11-21 14:50:46.325 [task-thread-s3-uploader-1] INFO [Consumer clientId=connector-consumer-s3-uploader-1, groupId=connect-s3-uploader] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:535)
2024-11-21 14:50:46.326 [task-thread-s3-uploader-1] INFO [Consumer clientId=connector-consumer-s3-uploader-1, groupId=connect-s3-uploader] Group coordinator kafka1 (id: 2147483612 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:912)

# task-1 rejoins the consumer group and resumes consuming correctly from the last offset committed by task-2
2024-11-21 14:50:49.440 [task-thread-s3-uploader-1] INFO [Consumer clientId=connector-consumer-s3-uploader-1, groupId=connect-s3-uploader] Adding newly assigned partitions: ..., TOPIC-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:291)
@raphaelauv

Without explicit coordination this can happen. One way to limit it is to run at most 1 task per connector and to use tasks.max.enforce, available since Kafka 3.8.
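
Roughly, that would look like this in the connector config (illustrative; tasks.max.enforce requires Connect workers on Kafka 3.8+):

# limit the connector to a single task and have the runtime enforce it
tasks.max=1
tasks.max.enforce=true

The idea being that with a single task there is no second consumer in the group to take over the partitions and re-upload the same key while the original task is hung, at the cost of parallelism.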
