
Auto offset commits (the right way) #782

Merged · 23 commits · Jan 2, 2024
Conversation

@zcox (Collaborator) commented Sep 8, 2023

The problem with using the underlying Java Kafka consumer's auto offset commits is that if processing a record fails (i.e. a runtime exception is thrown), the Resource we wrap the consumer in closes it, and on close that consumer commits the offsets of the record(s) that were not processed. This leads to at-most-once processing, which is the worst.

kafka4s already provides a readProcessCommit operation, but that commits offsets after every single successfully processed record, which can put a lot of load on Kafka brokers. It is still at-least-once processing (at least for the last record, the one that failed), since the consumer does not commit that offset on close.

This PR introduces another operation, that is like readProcessCommit, but will only commit offsets after either some number of records is processed, or some amount of time has passed since the last offset commit. We get less offset commit load on Kafka, and still get at-least-once processing (all your consumers are idempotent, right?).

This new operation will also commit offsets of successfully processed records on a failure. This is still at-least-once, but minimizes the reprocessing required after restart.

A future PR should add a batch version of this operation.
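The commit cadence described above (commit after either some number of records or some elapsed time, whichever comes first) can be sketched as a small piece of state. This is a hypothetical illustration of the idea, not kafka4s's actual internals; `CommitState` and its field names are made up for this sketch:

```scala
import scala.concurrent.duration._

// Illustrative sketch only: tracks how many records have been processed and
// when offsets were last committed, and decides when the next commit is due.
final case class CommitState(recordCount: Long, lastCommitNanos: Long) {

  // Called once per successfully processed record. Returns the next state
  // and whether offsets should be committed now.
  def record(
      nowNanos: Long,
      maxRecordCount: Long,
      maxElapsedTime: FiniteDuration
  ): (CommitState, Boolean) = {
    val count   = recordCount + 1
    val elapsed = (nowNanos - lastCommitNanos).nanos
    if (count >= maxRecordCount || elapsed >= maxElapsedTime)
      (CommitState(0L, nowNanos), true)   // commit now and reset the window
    else
      (copy(recordCount = count), false)  // keep accumulating
  }
}
```

Either trigger resets both the count and the clock, so a slow topic still gets periodic commits and a busy topic never accumulates more than `maxRecordCount` uncommitted records.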

@zcox (Collaborator, Author) commented Sep 10, 2023

Added an idea in 9309a2d to help minimize reprocessing after a failure. Could also add a boolean argument to enable/disable that behavior.

@zcox zcox marked this pull request as ready for review December 30, 2023 02:15
@zcox zcox requested a review from a team as a code owner December 30, 2023 02:15
*/
def processingAndCommitting[A](
pollTimeout: FiniteDuration,
maxRecordCount: Long = 1000L,
Collaborator commented:
If someone passes in a negative number, or a negative duration, would anything unexpected happen? I'm not sure how consumer.recordStream(-2.seconds) would deal with it, but in the other two cases it would mean we'd always commit for every record. The question then becomes whether that is desirable, and whether we think it is obvious enough at the call site.

Collaborator (Author) replied:

Good point. This probably needs more validation of inputs.
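One minimal way to validate the inputs, per the review comment above. This is an illustrative sketch, not the actual kafka4s change; `validateInputs` and its parameter names mirror the signature shown in the diff but are assumptions here:

```scala
import scala.concurrent.duration._

// Hypothetical guard: reject non-positive settings up front so that a
// negative maxRecordCount or maxElapsedTime can't silently degrade into
// committing after every record.
def validateInputs(
    pollTimeout: FiniteDuration,
    maxRecordCount: Long,
    maxElapsedTime: FiniteDuration
): Unit = {
  require(pollTimeout > Duration.Zero, s"pollTimeout must be positive: $pollTimeout")
  require(maxRecordCount > 0L, s"maxRecordCount must be positive: $maxRecordCount")
  require(maxElapsedTime > Duration.Zero, s"maxElapsedTime must be positive: $maxElapsedTime")
}
```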

* offsets for failed records when the consumer is closed. The consumer must
* be configured to disable offset auto-commits.
*/
def processingAndCommitting[A](
Contributor commented:

Does Stream.groupWithin help here? It emits chunks on the maxRecordCount-and-maxElapsedTime cadence.

Collaborator (Author) replied:

TIL Stream.groupWithin. Reading its docs, IIUC it buffers up elements of the input stream until it has accumulated a certain number of them or the timeout has expired. For a Kafka consumer, I don't think we'd want that behavior, since it would introduce delay.

We want elements to flow from the input stream as they're read from Kafka, and to take an action (i.e. commit offsets) after a number of records or a timeout. Stream.groupWithin is close to that, but if it delays records reaching the process function, we wouldn't want it.
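The distinction being debated can be sketched with fs2. This is a hedged illustration, assuming fs2 and cats-effect; `process` and `commit` are hypothetical stand-ins, and the chunk size and timeout are arbitrary:

```scala
import scala.concurrent.duration._
import cats.effect.IO
import cats.syntax.all._
import fs2.Stream

// Buffering variant: groupWithin BEFORE processing holds records back
// until a chunk fills or the timeout fires -- the delay objected to above.
def buffered(
    records: Stream[IO, String],
    process: String => IO[Unit],
    commit: IO[Unit]
): Stream[IO, Unit] =
  records
    .groupWithin(1000, 10.seconds)                   // records wait in the chunk
    .evalMap(chunk => chunk.traverse_(process) *> commit)

// Pass-through variant: records are processed immediately as they arrive;
// groupWithin AFTER processing only gates when offsets get committed.
def passThrough(
    records: Stream[IO, String],
    process: String => IO[Unit],
    commit: IO[Unit]
): Stream[IO, Unit] =
  records
    .evalTap(process)                                // no delay before processing
    .groupWithin(1000, 10.seconds)                   // batching gates only the commit
    .evalMap(_ => commit)
```

The pass-through shape is consistent with the contributor's follow-up below that groupWithin could work if it came after the processing step.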

Contributor replied:

Not if it came after L390, but I would need to think about the onError on line 385. Don't let this block it, but I may play type tetris later.

@zcox zcox merged commit 37b46b2 into Banno:main Jan 2, 2024
5 checks passed
@zcox zcox deleted the commit-less branch January 2, 2024 21:24
amohrland pushed a commit that referenced this pull request Jan 16, 2024
Auto offset commits (the right way)
4 participants