ref(rust): Put consumer in an Arc so it can be shared #4954
Initial refactoring so that the consumer can be shared and referenced within the assignment callbacks. This is necessary so we can do things like call commit on the consumer during rebalance callbacks.
This PR is based on #4936, which had to be reverted because the consumer stopped committing entirely once it was merged. This PR is only the initial refactoring and does not actually commit during join yet.
At the moment, my best guess as to why the previous attempt did not work in prod is that the context was being mutated after the consumer was initially created. This version avoids that and keeps the original approach of deferring consumer creation until StreamProcessor.subscribe() is called.
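A minimal sketch of the sharing pattern described above, not the code from this PR. `MockConsumer`, `Callbacks`, and `demo_shared_commit` are hypothetical names, and the mock stands in for the real consumer type. It only illustrates how a callbacks object could hold a shared handle and commit during a rebalance.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a Kafka consumer (assumption: not the real type).
#[derive(Default)]
struct MockConsumer {
    committed: Vec<u64>,
}

impl MockConsumer {
    /// Records the offset as committed.
    fn commit(&mut self, offset: u64) {
        self.committed.push(offset);
    }
}

/// Hypothetical callbacks object holding a shared handle to the consumer.
struct Callbacks {
    consumer: Arc<Mutex<MockConsumer>>,
}

impl Callbacks {
    /// Invoked on rebalance; commits the last processed offset.
    fn on_revoke(&self, last_offset: u64) {
        self.consumer.lock().unwrap().commit(last_offset);
    }
}

/// Builds the shared consumer, fires one revoke callback, and returns
/// the offsets the consumer has recorded as committed.
fn demo_shared_commit() -> Vec<u64> {
    let consumer = Arc::new(Mutex::new(MockConsumer::default()));
    let callbacks = Callbacks {
        consumer: Arc::clone(&consumer),
    };
    callbacks.on_revoke(42);
    let committed = consumer.lock().unwrap().committed.clone();
    committed
}
```

Both the owner and the callbacks see the same consumer state because they hold clones of one `Arc`.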
Is there a way to test deferring consumer creation until StreamProcessor.subscribe() is called?
This has always been the case, nothing has changed here. I just wanted to highlight the difference with the other PR and why I think that failed.
How would the mutation of the context prevent the consumer from committing?
I am not sure I understand why we need a Mutex to wrap the Consumer.
Isn't the consumer always accessed from the main thread?
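One possible reason, sketched under the assumption that the consumer's methods take `&mut self` (the thread does not confirm this): an `Arc` on its own only hands out shared references, so mutation needs some form of interior mutability even on a single thread. `Counter` and both functions below are hypothetical.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical type whose method needs exclusive access.
struct Counter {
    polls: u32,
}

impl Counter {
    fn poll(&mut self) -> u32 {
        self.polls += 1;
        self.polls
    }
}

/// Once a second handle exists, `Arc::get_mut` refuses to give `&mut`.
fn arc_alone_blocks_mutation() -> bool {
    let mut shared = Arc::new(Counter { polls: 0 });
    let _other = Arc::clone(&shared);
    Arc::get_mut(&mut shared).is_none()
}

/// Wrapping the value in a `Mutex` lets every handle mutate it in turn.
fn arc_mutex_allows_mutation() -> u32 {
    let shared = Arc::new(Mutex::new(Counter { polls: 0 }));
    let other = Arc::clone(&shared);
    other.lock().unwrap().poll();
    let total = shared.lock().unwrap().poll();
    total
}
```

If the type is also required to be `Sync`, a `RefCell` would not satisfy the bound, which is one reason a `Mutex` may be chosen over cheaper single-threaded alternatives.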
@@ -41,7 +41,7 @@ pub enum ProducerError {

/// This is basically an observer pattern to receive the callbacks from
/// the consumer when partitions are assigned/revoked.
pub trait AssignmentCallbacks: Send + Sync {
Why was this Sync? The assignment callback seems like something that should be used on the main thread only, which makes me doubt we need Send either.
I'm actually not sure why we were able to drop Sync here (I copied this part from #4936).
https://docs.rs/rdkafka/latest/rdkafka/client/trait.ClientContext.html seems to suggest that both Sync + Send are required.
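A small sketch of what the `Send + Sync` supertrait bounds buy, assuming the callbacks may end up being invoked from a thread other than the one that created them. The trait below reuses the name from the diff but is a simplified, hypothetical version; `CountingCallbacks` and `call_from_other_thread` are illustrative only.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

/// Simplified, hypothetical version of the callbacks trait.
/// The supertrait bounds make `dyn AssignmentCallbacks` itself Send + Sync.
trait AssignmentCallbacks: Send + Sync {
    fn on_assign(&self, partitions: usize);
}

struct CountingCallbacks {
    assigned: AtomicUsize,
}

impl AssignmentCallbacks for CountingCallbacks {
    fn on_assign(&self, partitions: usize) {
        self.assigned.fetch_add(partitions, Ordering::SeqCst);
    }
}

/// Moves a trait-object handle into another thread and calls it there.
/// Without the Send + Sync bounds, `thread::spawn` would reject the closure.
fn call_from_other_thread() -> usize {
    let concrete = Arc::new(CountingCallbacks {
        assigned: AtomicUsize::new(0),
    });
    let callbacks: Arc<dyn AssignmentCallbacks> = concrete.clone();
    let handle = thread::spawn(move || callbacks.on_assign(3));
    handle.join().unwrap();
    concrete.assigned.load(Ordering::SeqCst)
}
```

Dropping either bound from the trait makes the `thread::spawn` call fail to compile, which is a quick way to check whether some caller actually relies on it.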
.lock()
.unwrap()
.poll(Some(Duration::ZERO))
.unwrap();
Isn't unwrap here going to panic on errors instead of surfacing the same error in the Result of the run_once method?
Will make a note to address it later. But not in this PR, this code hasn't changed here.
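For the later follow-up, a hedged sketch of what surfacing the error could look like. `FlakyConsumer`, `RunError`, and this `run_once` signature are hypothetical and do not reflect the real types in the codebase. The idea is to map both the lock failure and the poll failure into the function's `Result` with `?` instead of calling `unwrap()`.

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

/// Hypothetical error type for the processing loop.
#[derive(Debug, PartialEq)]
enum RunError {
    Poll(String),
    Poisoned,
}

/// Hypothetical consumer whose poll can be made to fail.
struct FlakyConsumer {
    fail: bool,
}

impl FlakyConsumer {
    fn poll(&mut self, _timeout: Option<Duration>) -> Result<Option<u64>, String> {
        if self.fail {
            Err("broker unavailable".to_string())
        } else {
            Ok(Some(1))
        }
    }
}

/// Polls once, returning errors to the caller instead of panicking.
fn run_once(consumer: &Arc<Mutex<FlakyConsumer>>) -> Result<Option<u64>, RunError> {
    // A poisoned lock becomes an error value rather than a panic.
    let mut guard = consumer.lock().map_err(|_| RunError::Poisoned)?;
    // A failed poll is wrapped and propagated with `?`.
    let message = guard.poll(Some(Duration::ZERO)).map_err(RunError::Poll)?;
    Ok(message)
}
```

The caller can then decide whether a poll failure should stop the processor or be retried.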
It really is just a guess, but there is not much else in that other PR that looks suspicious. Worst of all, it works locally and I've seen it actually work at least once in prod. The goal of this PR is really just to bisect #4936 by implementing the basics of putting the consumer in an Arc (which will be needed later) without any additional changes. Running this should help shed some light on which part of the other PR caused it to fail to commit.
I am not sure taking the refactoring a piece at a time is the most effective approach to troubleshoot the issue. Whether the issue occurs or not with this smaller refactoring you will not exactly know what caused it.
I do not have enough context on the actual symptom to advise on where to put logs but have you considered adding more observability (logs, run it at DEBUG level, collect metrics, add sentry tracing)?
Anyway I do not see anything problematic with the PR itself.