This test was done with the 3.0.0-M5 build of the fs2-kafka client. The Kafka topic has 8 partitions, and there are 2 consumer processes, each assigned 4 partitions.
After a successful startup, an error terminated the stream for one of the processes, resulting in a partition rebalance. The initial assignment after the rebalance was 2 partitions, as noted in the Kafka client log:
"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:2"
"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:5"
About 1 minute later, another assignment message from the Kafka client appears in the log with the remaining 2 partitions.
It looks like your code expects all partition streams in the map to terminate on a rebalance and be replaced with new streams in the new partitions map, whereas what actually happens with the cooperative sticky assignor is that only streams for revoked partitions are closed, and the new map contains only streams for newly assigned partitions. Therefore you need to handle partition maps from the outer stream in parallel rather than sequentially. I think the fix is to move the .parJoinUnbounded to the very end of your snippet so it's joining the outer stream rather than the inner one - could you try that and let me know if it fixes the problem?
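To make the suggestion concrete, here is a minimal sketch of the "join the outer stream" shape. The original snippet is not shown in this thread, so this is not a patch to it: `processAllPartitions` and `handle` are hypothetical names, and the only assumption is that the input has the same type that `partitionsMapStream` emits, a stream of maps from `TopicPartition` to a per-partition stream.

```scala
import cats.effect.Concurrent
import fs2.Stream
import org.apache.kafka.common.TopicPartition

// Hypothetical helper, not part of fs2-kafka.
// `partitionMaps` stands in for the output of `partitionsMapStream`;
// `handle` is a placeholder for whatever the application does per record.
def processAllPartitions[F[_]: Concurrent, A](
    partitionMaps: Stream[F, Map[TopicPartition, Stream[F, A]]]
)(handle: (TopicPartition, A) => F[Unit]): Stream[F, Unit] =
  partitionMaps
    // Flatten each map into its (partition, stream) pairs. Stream.emits is
    // finite, so the outer stream stays free to deliver the next map.
    .flatMap(partitionMap => Stream.emits(partitionMap.toVector))
    // Build one processing stream per partition, without running it yet.
    .map { case (partition, records) =>
      records.evalMap(record => handle(partition, record))
    }
    // Join at the very end: every partition stream runs concurrently, and
    // streams from a later assignment start while earlier ones are still open.
    .parJoinUnbounded
```

By contrast, calling `.parJoinUnbounded` inside a `flatMap` over the maps would wait for every stream in the first map to finish before the second map is even pulled. With the cooperative sticky assignor the retained partition streams stay open, so a later assignment would never be processed. Under the same assumptions, the helper would be applied to something like `KafkaConsumer.stream(settings).subscribeTo(topic).partitionsMapStream`.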
Logging within the stream confirms the assignment
The consumer application printed out no notifications
Code in the consumer application is