java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: zio-default-blocking-4, id: 164) otherThread(id: 59)
at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2484)
at org.apache.kafka.clients.consumer.KafkaConsumer.close(KafkaConsumer.java:2343)
at zio.kafka.consumer.internal.ConsumerAccess$.$anonfun$make$5(ConsumerAccess.scala:62)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
at zio.ZIOCompanionVersionSpecific.$anonfun$attempt$1(ZIOCompanionVersionSpecific.scala:100)
at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:62)
at zio.kafka.consumer.internal.ConsumerAccess.make(ConsumerAccess.scala:61)
It seems that the KafkaConsumer must be closed from the same thread that has been using it, which is not guaranteed with ZIO.acquireRelease, AFAIK.
Hi @TobiasPfeifer. Thanks for the bug report.
How often do you see this? Which zio version are you using? Which zio-kafka version?
The problem is that the consumer is still in use when the release effect (closing the underlying java consumer) is executed. My guess is that during shutdown, the task that is using the consumer is not interrupted.
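To make the failure mode concrete, here is a minimal sketch in plain Java of a single-thread access guard of the kind the stack trace points at (`KafkaConsumer.acquire`). `GuardedClient` and `GuardDemo` are hypothetical names invented for illustration; this is not the Kafka or zio-kafka source. Assuming the guard works roughly like this, `close()` is rejected only while another thread is inside a call, and a different thread can close once nobody holds the guard.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a client that allows only one thread inside it at a time.
class GuardedClient {
    private static final long NO_OWNER = -1L;
    private final AtomicLong owner = new AtomicLong(NO_OWNER);
    private final AtomicInteger depth = new AtomicInteger(0);

    void acquire() {
        long me = Thread.currentThread().getId();
        // Fails if some other thread currently owns the guard.
        if (me != owner.get() && !owner.compareAndSet(NO_OWNER, me)) {
            throw new ConcurrentModificationException("client is in use by thread " + owner.get());
        }
        depth.incrementAndGet();
    }

    void release() {
        if (depth.decrementAndGet() == 0) owner.set(NO_OWNER);
    }

    // Simulates a long-running call such as a poll: holds the guard until told to leave.
    void blockingCall(CountDownLatch entered, CountDownLatch mayLeave) throws InterruptedException {
        acquire();
        try {
            entered.countDown();
            mayLeave.await();
        } finally {
            release();
        }
    }

    void close() {
        acquire();
        try { /* resources would be freed here */ } finally { release(); }
    }
}

class GuardDemo {
    // Returns true if close() was rejected while another thread was mid-call.
    static boolean closeRejectedWhileInUse() {
        GuardedClient client = new GuardedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try { client.blockingCall(entered, mayLeave); } catch (InterruptedException ignored) { }
        });
        boolean rejected = false;
        try {
            worker.start();
            entered.await();               // the worker is now inside the client
            try {
                client.close();            // different thread, guard is held
            } catch (ConcurrentModificationException e) {
                rejected = true;
            }
            mayLeave.countDown();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        client.close();                    // succeeds: nobody is inside any more
        return rejected;
    }

    public static void main(String[] args) {
        System.out.println("close rejected while in use: " + closeRejectedWhileInUse());
    }
}
```

Running `GuardDemo` prints `close rejected while in use: true`. The final `close()` from the main thread succeeds, which fits the explanation that the consumer was still in use at release time rather than simply being closed from the wrong thread.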
There is some funky code in ConsumerAccess.withConsumerNoPermit that should manage the interruption. @svroonland do you remember how this works? It reminds me of the non-deterministic shutdown we were looking at recently.
It's not that funky, it just forks it so it can interrupt it by calling wakeup() ;)
I agree with your idea that something is still using the consumer at the time when close() is called. I would expect everything to be using ZIO scopes where necessary, so all resources should be released in a proper order. Makes me wonder if some forked fiber is not interrupted, perhaps in the user's code.
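One way to picture the release ordering described above is to let every use of the resource, including the close, take the same single permit. The sketch below is plain Java with hypothetical names (`SerializedAccess`, `CloseOrderingDemo`); it is an assumption-laden illustration, not how zio-kafka implements `ConsumerAccess`. It also ignores the need to wake up a long blocking call, which is what `wakeup()` is for in the real consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical wrapper: every use of the resource, including close, takes the single permit.
class SerializedAccess {
    private final Semaphore permit = new Semaphore(1);
    private boolean closed = false; // only read or written while holding the permit

    <A> A use(Supplier<A> body) {
        permit.acquireUninterruptibly();
        try {
            if (closed) throw new IllegalStateException("resource already closed");
            return body.get();
        } finally {
            permit.release();
        }
    }

    void close() {
        permit.acquireUninterruptibly(); // waits for any in-flight use to finish
        try { closed = true; } finally { permit.release(); }
    }
}

class CloseOrderingDemo {
    // Returns the order in which the in-flight use and the close were observed.
    static List<String> run() {
        SerializedAccess access = new SerializedAccess();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch entered = new CountDownLatch(1);
        Thread worker = new Thread(() -> access.<Void>use(() -> {
            events.add("use-start");
            entered.countDown();
            try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            events.add("use-end");
            return null;
        }));
        worker.start();
        try {
            entered.await();       // the worker holds the permit now
            access.close();        // blocks until the worker releases the permit
            events.add("closed");
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Running `CloseOrderingDemo` prints `[use-start, use-end, closed]`: the close cannot overlap with the in-flight use. If a forked fiber or thread bypasses such a permit, or is never interrupted, the close can still race with it.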
@TobiasPfeifer Could you share some code that generates this error?
I'm providing the layers like this: