Auto offset commits (the right way) #782
Conversation
Added an idea in 9309a2d to help minimize reprocessing after a failure. Could also add a boolean argument to enable/disable that behavior.
 */
def processingAndCommitting[A](
    pollTimeout: FiniteDuration,
    maxRecordCount: Long = 1000L,
If someone passes in a negative number, or a negative time, would anything unexpected happen? I am not sure how consumer.recordStream(-2.seconds) would deal with it, but in the other two cases it would mean that we would always be committing for every record. The question then becomes whether or not this is desirable, and whether we think that is obvious enough at the call site.
Good point. This probably needs more validation of inputs.
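A minimal sketch of the kind of input validation being discussed, assuming a require-based guard. Only pollTimeout and maxRecordCount appear in the diff above; maxElapsedTime and the defaults beyond those shown are assumptions, not the actual kafka4s signature:

```scala
import scala.concurrent.duration._

// Hypothetical validation sketch, not the actual kafka4s code. maxElapsedTime
// is an assumed parameter name.
def processingAndCommitting[A](
    pollTimeout: FiniteDuration,
    maxRecordCount: Long = 1000L,
    maxElapsedTime: FiniteDuration = 60.seconds
): Unit = {
  // Fail fast on inputs that would otherwise poll with a negative timeout
  // or silently degrade to committing after every record.
  require(pollTimeout > Duration.Zero, s"pollTimeout must be positive: $pollTimeout")
  require(maxRecordCount > 0L, s"maxRecordCount must be positive: $maxRecordCount")
  require(maxElapsedTime > Duration.Zero, s"maxElapsedTime must be positive: $maxElapsedTime")
  ??? // stream logic elided
}
```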
 * offsets for failed records when the consumer is closed. The consumer must
 * be configured to disable offset auto-commits.
 */
def processingAndCommitting[A](
Does Stream.groupWithin help here? It emits chunks on the maxRecordCount-and-maxElapsedTime cadence.
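For reference, a minimal standalone illustration of fs2's groupWithin semantics (not kafka4s code): it emits a Chunk once either n elements accumulate or the duration elapses, whichever comes first.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.{Chunk, Stream}

// Each chunk is released once 3 elements have accumulated or 1 second has
// passed since the chunk started filling, whichever happens first.
val grouped: Stream[IO, Chunk[Int]] =
  Stream.emits(1 to 10).covary[IO].groupWithin(3, 1.second)
```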
TIL Stream.groupWithin. Reading its docs, IIUC it will buffer up elements of the input stream until it obtains a certain number of them or a timeout is exceeded. For a Kafka consumer, I don't think we'd want that behavior, since it would introduce delay. We want the elements to flow from the input stream as they're read from Kafka, and to take an action (i.e. commit offsets) after a number of records or a timeout. Stream.groupWithin is close to that, but if it delays records going to the process function, we wouldn't want it.
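A rough sketch of that pass-through shape, illustrative only and not this PR's implementation: records flow through evalTap unbuffered, and a caller-supplied commit action fires once a count or elapsed-time threshold is crossed. All names here are hypothetical.

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Ref}
import cats.syntax.all._
import fs2.Stream

// Illustrative sketch: pass records downstream immediately, committing as a
// side effect whenever maxRecordCount records have been processed or
// maxElapsedTime has elapsed since the last commit. `commit` stands in for
// the real offset-commit effect.
def commitOnThreshold[A](
    records: Stream[IO, A],
    maxRecordCount: Long,
    maxElapsedTime: FiniteDuration,
    commit: IO[Unit]
): Stream[IO, A] =
  Stream
    .eval(IO.monotonic.flatMap(now => Ref.of[IO, (Long, FiniteDuration)]((0L, now))))
    .flatMap { state =>
      records.evalTap { _ =>
        for {
          now <- IO.monotonic
          due <- state.modify { case (count, lastCommit) =>
            val hit = count + 1 >= maxRecordCount || (now - lastCommit) >= maxElapsedTime
            if (hit) ((0L, now), true) else ((count + 1, lastCommit), false)
          }
          _ <- commit.whenA(due) // unlike groupWithin, records are never delayed
        } yield ()
      }
    }
```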
Not if it came after L390, but I would need to think about the onError on line 385. Don't let this block it, but I may play type tetris later.
Auto offset commits (the right way)
The problem with using the underlying Java Kafka consumer's auto offset commits is that if processing a record fails (i.e. a runtime exception is thrown), the Resource we wrap it in closes the consumer, and that consumer will commit the offsets of the record(s) that were not processed. This leads to at-most-once processing, which is the worst.

kafka4s already provides a readProcessCommit operation, but that commits offsets after every single successfully processed record, which can put a lot of load on Kafka brokers, and is still at-least-once processing (at least for that last record that failed), since the consumer does not commit that offset on close.

This PR introduces another operation that is like readProcessCommit, but will only commit offsets after either some number of records has been processed, or some amount of time has passed since the last offset commit. We get less offset commit load on Kafka, and still get at-least-once processing (all your consumers are idempotent, right?).

This new operation will also commit offsets of successfully processed records on a failure. This is still at-least-once, but minimizes the reprocessing required after a restart.
A future PR should add a batch version of this operation.
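A hypothetical usage sketch based on this description. Only pollTimeout and maxRecordCount appear in the diff above; maxElapsedTime, the record type, and the overall call shape are assumptions about the final API.

```scala
import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream

// Stand-in for the kafka4s consumer API; real types and signature may differ.
trait ConsumerApi {
  def processingAndCommitting[A](
      pollTimeout: FiniteDuration,
      maxRecordCount: Long = 1000L,
      maxElapsedTime: FiniteDuration = 60.seconds
  )(process: String => IO[A]): Stream[IO, A]
}

// Process records, letting the library commit offsets every 500 records or
// every 30 seconds, whichever comes first.
def run(consumer: ConsumerApi): Stream[IO, Unit] =
  consumer.processingAndCommitting(
    pollTimeout = 1.second,
    maxRecordCount = 500L,
    maxElapsedTime = 30.seconds
  )(record => IO(println(s"processed $record")))
```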