
ref(rust): Put consumer in an Arc so it can be shared #4954

Merged · 7 commits · Nov 2, 2023
Conversation

@lynnagara (Member)

Initial refactoring so that the consumer can be shared and referenced within the assignment callbacks. This is necessary so we can do things like call commit on the consumer during rebalance callbacks.

This PR is based on #4936, which had to be reverted because the consumer fully stopped committing once it was merged. This is only the initial refactoring and does not actually commit during join yet.

At the moment my best guess as to why the previous attempt did not work in prod is that the context was being mutated after the consumer was initially created. This version avoids that and keeps the original approach of deferring consumer creation until StreamProcessor.subscribe() is called.
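The core idea of the refactor can be sketched in miniature with std-only types. This is not the actual arroyo code; `Consumer` and `RebalanceCallbacks` here are illustrative stand-ins showing how an `Arc<Mutex<_>>` lets the rebalance callback and the main poll loop share one consumer:

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the real rdkafka-backed consumer; the name and fields
// are illustrative only.
struct Consumer {
    committed: u32,
}

impl Consumer {
    fn commit(&mut self) {
        self.committed += 1;
    }
}

// The callback holds a clone of the same Arc, so it can commit on the
// consumer during revocation without needing a second owner.
struct RebalanceCallbacks {
    consumer: Arc<Mutex<Consumer>>,
}

impl RebalanceCallbacks {
    fn on_revoke(&self) {
        // Lock, commit, release: both the callback and the poll loop
        // reach the single consumer through the shared Arc.
        self.consumer.lock().unwrap().commit();
    }
}

fn main() {
    let consumer = Arc::new(Mutex::new(Consumer { committed: 0 }));
    let callbacks = RebalanceCallbacks {
        consumer: Arc::clone(&consumer),
    };
    callbacks.on_revoke();
    println!("{}", consumer.lock().unwrap().committed); // prints 1
}
```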

@lynnagara requested a review from a team as a code owner on October 31, 2023 18:21
@volokluev (Member)

Is there a way to test deferring consumer creation until StreamProcessor.subscribe() is called?

@lynnagara (Member, Author) commented Oct 31, 2023

> Is there a way to test deferring consumer creation until StreamProcessor.subscribe() is called?

This has always been the case, nothing has changed here. I just wanted to highlight the difference with the other PR and why I think that failed.

@lynnagara enabled auto-merge (squash) on October 31, 2023 18:33
@fpacifici (Contributor)

> At the moment my best guess of why the previous attempt did not work in prod was that the context was being mutated after the consumer was initially created.

How would the mutation of the context prevent the consumer from committing?

@fpacifici (Contributor) left a comment
I am not sure I see why we need a Mutex to wrap the Consumer. Isn't the consumer always accessed from the main thread?

@@ -41,7 +41,7 @@ pub enum ProducerError {

/// This is basically an observer pattern to receive the callbacks from
/// the consumer when partitions are assigned/revoked.
pub trait AssignmentCallbacks: Send + Sync {
@fpacifici (Contributor)
Why was this Sync? The assignment callback seems like something that should be used on the main thread only, which makes me doubt we need Send either.

@lynnagara (Member, Author)
I'm actually not sure why we were able to drop Sync here (I copied this part from #4936).

https://docs.rs/rdkafka/latest/rdkafka/client/trait.ClientContext.html seems to suggest that both Sync + Send are required.

rust_snuba/rust_arroyo/src/processing/mod.rs
.lock()
.unwrap()
.poll(Some(Duration::ZERO))
.unwrap();
@fpacifici (Contributor)
Isn't unwrap here going to panic in case of errors, instead of surfacing the same error in the Result of the run_once method?

@lynnagara (Member, Author)
Will make a note to address it later. But not in this PR, this code hasn't changed here.
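For reference, the unwrap chain could later be rewritten to propagate both failure modes through the method's Result. This is a hedged sketch with stand-in types (`Consumer`, `RunError`, and the poll signature are illustrative, not the real arroyo API):

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

#[derive(Debug)]
enum RunError {
    Poll(String),
    Poisoned,
}

struct Consumer;

impl Consumer {
    // Stand-in for the real poll; here it always succeeds with no message.
    fn poll(&mut self, _timeout: Option<Duration>) -> Result<Option<String>, String> {
        Ok(None)
    }
}

// Instead of `.lock().unwrap().poll(...).unwrap()`, map the lock
// poisoning and the poll error into the method's own error type and
// propagate with `?`, so run_once surfaces errors rather than panicking.
fn run_once(consumer: &Arc<Mutex<Consumer>>) -> Result<(), RunError> {
    let _message = consumer
        .lock()
        .map_err(|_| RunError::Poisoned)?
        .poll(Some(Duration::ZERO))
        .map_err(RunError::Poll)?;
    Ok(())
}

fn main() {
    let consumer = Arc::new(Mutex::new(Consumer));
    assert!(run_once(&consumer).is_ok());
    println!("ok");
}
```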

@lynnagara (Member, Author)

> How would the mutation of the context prevent the consumer from committing?

It really is just a guess, but there isn't much else in that other PR that seems suspicious at all. Worst of all, it works locally, and I've seen it actually work at least once in prod.

The goal of this PR is really just to bisect #4936 by implementing the basics of making the consumer an Arc (which will be needed later) without any additional changes. Running this should help shed some light on which part of the other PR caused it to fail to commit.

@fpacifici (Contributor) left a comment

I am not sure taking the refactoring a piece at a time is the most effective approach to troubleshooting the issue. Whether or not the issue occurs with this smaller refactoring, you will not know exactly what caused it.

I do not have enough context on the actual symptom to advise on where to put logs but have you considered adding more observability (logs, run it at DEBUG level, collect metrics, add sentry tracing)?

Anyway, I do not see anything problematic with the PR itself.

@lynnagara merged commit f1743d8 into master on Nov 2, 2023
@lynnagara deleted the consumer-arc branch on November 2, 2023 23:11
3 participants