
FromJavaConsumer: how to synchronize access to the java consumer #1276

Open
tnielens opened this issue Jul 12, 2024 · 4 comments

Comments

@tnielens
Contributor

#694 introduced a fromJavaConsumer constructor, but it isn't clear how to synchronize access to the consumer once it has been handed to zio-kafka. My use case is the same as the one reported in #694. I have some logic working on the java consumer to check consumer lag, health, and other metrics. These checks run periodically, but I need to coordinate their access with zio-kafka. As far as I understand, zio-kafka doesn't provide that ability, and I'll face concurrency issues if both my checks and zio-kafka run tasks against the consumer concurrently.

@svroonland
Collaborator

zio-kafka protects against concurrent access using a Semaphore. We could perhaps allow the user to pass their own Semaphore in fromJavaConsumer. It would need to have at most 1 permit, something we can't guarantee, but it seems reasonable to document this and treat a semaphore with more than 1 permit as undefined behavior.
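A minimal sketch of the idea in plain Java, for illustration only. zio-kafka's own guard is a ZIO `Semaphore`, not `java.util.concurrent.Semaphore`, and `SharedConsumerAccess` is a hypothetical name, not part of zio-kafka's API. It shows a single lock shared by the library side and user code, with the "at most 1 permit" rule checked once at construction.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper, not part of zio-kafka: a plain-Java analogy of the
// ZIO Semaphore that zio-kafka uses internally to serialize consumer calls.
final class SharedConsumerAccess {
    private final Semaphore semaphore;

    SharedConsumerAccess(Semaphore semaphore) {
        // Enforces the "at most 1 permit" rule only at construction time:
        // availablePermits() is a snapshot, and extra release() calls could
        // raise the count later. That gap is why the rule would have to be
        // documented rather than fully guaranteed.
        if (semaphore.availablePermits() > 1) {
            throw new IllegalArgumentException("semaphore must have at most 1 permit");
        }
        this.semaphore = semaphore;
    }

    // Runs the action while holding the permit and always releases it,
    // even if the action throws.
    <T> T withPermit(Supplier<T> action) {
        semaphore.acquireUninterruptibly();
        try {
            return action.get();
        } finally {
            semaphore.release();
        }
    }
}
```

Both sides would then route every consumer call through the same instance, e.g. `access.withPermit(() -> consumer.position(tp))` in a health check. A fair semaphore (`new Semaphore(1, true)`) keeps either side from being starved.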

@erikvanoosten
Collaborator

erikvanoosten commented Jul 13, 2024

@tnielens What is your use case? Would you be helped if all operations of the java-kafka consumer are available on the zio-kafka consumer?
@svroonland, another option would be to return an implementation of org.apache.kafka.clients.consumer.Consumer which wraps the given java consumer and uses the zio-kafka semaphore to synchronize access.
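One way to sketch that second option without hand-writing every method of the large `Consumer` interface is a `java.lang.reflect.Proxy` that takes one permit of a shared semaphore around each call. This is an assumption-laden illustration: `GuardedProxies.guard` is a hypothetical helper, the semaphore stands in for whatever lock zio-kafka would expose, and `wakeup()` is exempted because the `KafkaConsumer` documentation names it as the one method that is safe to call from another thread.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.Semaphore;

// Hypothetical helper (not a zio-kafka or kafka-clients API).
final class GuardedProxies {
    private GuardedProxies() {}

    // Returns an implementation of `iface` whose calls are forwarded to
    // `delegate` while holding one permit of the shared semaphore.
    // Methods named "wakeup" bypass the semaphore so they can interrupt
    // a blocking call made by another thread.
    static <T> T guard(Class<T> iface, T delegate, Semaphore semaphore) {
        // JDK interfaces report a null (bootstrap) loader; fall back to ours.
        ClassLoader loader = iface.getClassLoader() != null
                ? iface.getClassLoader()
                : GuardedProxies.class.getClassLoader();
        Object proxy = Proxy.newProxyInstance(loader, new Class<?>[] {iface},
                (self, method, args) -> {
                    if (method.getName().equals("wakeup")) {
                        return callDelegate(method, delegate, args);
                    }
                    semaphore.acquireUninterruptibly();
                    try {
                        return callDelegate(method, delegate, args);
                    } finally {
                        semaphore.release();
                    }
                });
        return iface.cast(proxy);
    }

    private static Object callDelegate(Method method, Object target, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            // Rethrow what the delegate threw, not the reflection wrapper.
            throw e.getCause();
        }
    }
}
```

For a real consumer this would hypothetically be called as `GuardedProxies.guard(Consumer.class, javaConsumer, semaphore)` (with an unchecked cast for the key/value type parameters), producing one `Consumer` that both zio-kafka and the health checks could share.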

@tnielens
Contributor Author

My use case is passing the Kafka consumer to a HealthCheckService of mine, which is maintained separately and works against the standard Consumer interface. It checks several things, among them metrics such as last-poll-seconds-ago, the current lag of the consumer, whether the lag is decreasing, etc.
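For reference, the lag part of such a health check can be sketched as plain arithmetic over two snapshots. In a real check, the inputs would come from `Consumer.endOffsets(...)` and `Consumer.position(...)`, both of which are consumer calls that need the synchronization discussed in this issue. The `String` partition keys (such as `"orders-0"`) stand in for `TopicPartition`, and defining lag as end offset minus current position is an assumption of this sketch, not necessarily how the author's HealthCheckService computes it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical lag arithmetic; partition keys are Strings standing in for TopicPartition.
final class LagCheck {
    private LagCheck() {}

    // Per-partition lag = end offset - current position, clamped at 0.
    // Partitions without a known end offset are skipped.
    static Map<String, Long> lagPerPartition(Map<String, Long> endOffsets, Map<String, Long> positions) {
        Map<String, Long> lag = new HashMap<>();
        for (Map.Entry<String, Long> e : positions.entrySet()) {
            Long end = endOffsets.get(e.getKey());
            if (end != null) {
                lag.put(e.getKey(), Math.max(0L, end - e.getValue()));
            }
        }
        return lag;
    }

    static long totalLag(Map<String, Long> lagPerPartition) {
        return lagPerPartition.values().stream().mapToLong(Long::longValue).sum();
    }

    // "Does the lag decrease?" between two consecutive samples.
    // A lag that stays at 0 is also treated as healthy.
    static boolean isCatchingUp(long previousTotalLag, long currentTotalLag) {
        return currentTotalLag < previousTotalLag || currentTotalLag == 0;
    }
}
```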

One workaround I'm trying now is introducing a class SynchronizedConsumer[K, V](delegate: Consumer[K, V]) extends Consumer[K, V], which wraps all delegate calls in synchronized { ... } except for the wakeup() call. This is not ideal, as zio-kafka's Consumer might not expect every call to the standard Consumer to potentially block.
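A trimmed-down sketch of that workaround, written in Java. `MiniConsumer` is a hypothetical three-method stand-in for `org.apache.kafka.clients.consumer.Consumer` (the real interface has many more methods, each of which would be delegated the same way). Every method is `synchronized` on the wrapper except `wakeup()`, which stays unsynchronized so it can still interrupt a blocking `poll()` that holds the monitor on another thread.

```java
// Hypothetical stand-in for a few methods of org.apache.kafka.clients.consumer.Consumer.
interface MiniConsumer {
    int poll(long timeoutMillis);
    long position(String partition);
    void wakeup();
}

// Delegating wrapper: calls are serialized on this wrapper's monitor.
final class SynchronizedConsumer implements MiniConsumer {
    private final MiniConsumer delegate;

    SynchronizedConsumer(MiniConsumer delegate) {
        this.delegate = delegate;
    }

    @Override
    public synchronized int poll(long timeoutMillis) {
        return delegate.poll(timeoutMillis);
    }

    @Override
    public synchronized long position(String partition) {
        return delegate.position(partition);
    }

    // Deliberately not synchronized: wakeup() exists to interrupt a poll()
    // running on another thread, so it must not wait for the monitor that
    // the blocked poll() is holding.
    @Override
    public void wakeup() {
        delegate.wakeup();
    }
}
```

The wrapper only helps if the same `SynchronizedConsumer` instance is handed both to `fromJavaConsumer` and to the health-check service; two separate wrappers would hold two separate monitors and synchronize nothing.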

@erikvanoosten
Collaborator

erikvanoosten commented Jul 15, 2024

> Not ideal as zio-kafka's Consumer might not expect all calls to the standard Consumer to potentially block.

I think that is okay! The zio-kafka consumer already runs on its own thread because almost every call to the standard consumer blocks! If it blocks a little bit longer, nobody will notice!

We could improve somewhat by moving this logic into zio-kafka, because then access would be shared over a single, fair semaphore. But to be honest, since you have a perfectly good workaround, I am not sure it's worth it to put this into the library.

3 participants