-
-
Notifications
You must be signed in to change notification settings - Fork 62
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Commit offsets from strategy join #4936
Conversation
The return value of Strategy.join() is currently ignored, because we have no access to the consumer from within the assignment callbacks. Shuffle a few things around so that consumer callbacks contain Arc to the consumer internally.
@@ -69,7 +76,11 @@ impl<TPayload: 'static + Clone> AssignmentCallbacks for Callbacks<TPayload> { | |||
s.close(); | |||
// TODO: We need to actually call consumer.commit() with the commit request. | |||
// Right now we are never committing during consumer shutdown. | |||
let _ = s.join(None); | |||
if let Some(commit_request) = s.join(None) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is the actual fix we're enabling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice!
This has stopped committing anything in prod, reverting |
revert failed (conflict? already reverted?) -- check the logs |
This reverts commit c7f4213. Consumer has stopped committing entirely, reverting to figure out why
Initial refactoring so that the consumer can be shared and referenced within the assignment callbacks. This is necessary so we can do things like call commit on the consumer during rebalance callbacks. This PR is based on #4936 which had to be reverted as the consumer fully stopped committing when that was merged. Though this is only the initial refactoring and does not actually commit during join yet. At the moment my best guess of why the previous attempt did not work in prod was the the context was being mutated after the consumer was initially created. This version avoids doing this and keeps the original approach of deferring consumer creation until StreamProcessor.subscribe() is called.
Initial refactoring so that the consumer can be shared and referenced within the assignment callbacks. This is necessary so we can do things like call commit on the consumer during rebalance callbacks. This PR is based on #4936 which had to be reverted as the consumer fully stopped committing when that was merged. Though this is only the initial refactoring and does not actually commit during join yet. At the moment my best guess of why the previous attempt did not work in prod was the the context was being mutated after the consumer was initially created. This version avoids doing this and keeps the original approach of deferring consumer creation until StreamProcessor.subscribe() is called.
Initial refactoring so that the consumer can be shared and referenced within the assignment callbacks. This is necessary so we can do things like call commit on the consumer during rebalance callbacks. This PR is based on getsentry/snuba#4936 which had to be reverted as the consumer fully stopped committing when that was merged. Though this is only the initial refactoring and does not actually commit during join yet. At the moment my best guess of why the previous attempt did not work in prod was the the context was being mutated after the consumer was initially created. This version avoids doing this and keeps the original approach of deferring consumer creation until StreamProcessor.subscribe() is called.
The return value of Strategy.join() is currently ignored, because we
have no access to the consumer from within the assignment callbacks.
Shuffle a few things around so that consumer callbacks contain Arc to
the consumer internally.
SNS-2473