Rebalance of partitions with cooperative sticky assignor results in dead/unread partitions #914

Open
rney-lookout opened this issue Mar 22, 2022 · 3 comments
Labels: docs (New or improved documentation)

@rney-lookout

This test was done with the 3.0.0-M5 build of the fs2-kafka client. The Kafka topic has 8 partitions and 2 consumer processes, each of which is assigned 4 partitions.

After successful startup, an error terminated the stream for one of the processes, resulting in a partition rebalance. The initial offering was 2 partitions, as noted in the Kafka client log:

Updating assignment with
Assigned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Current owned partitions: []
Added partitions (assigned - owned): [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Revoked partitions (owned - assigned): []
logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-80787","level":"INFO"}

Logging within the stream confirms the assignment:

"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:2
"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:5"

About a minute later, another assignment message arrives in the log from the Kafka client with the remaining 2 partitions:

Updating assignment with
Assigned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-6, andromeda.notifications.aoa.v3-2, andromeda.notifications.aoa.v3-3]
Current owned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Added partitions (assigned - owned): [andromeda.notifications.aoa.v3-6, andromeda.notifications.aoa.v3-3]
Revoked partitions (owned - assigned): []
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-80787","level":"INFO"}

The consumer application printed no "New partition assigned" notifications for the newly added partitions (3 and 6).

The code in the consumer application is:

        KafkaConsumer
          .stream(kafkaConsumerSettings.consumerSettings(topicConfig.consumerGroupId))
          .subscribeTo(topicConfig.topic)
          .flatMap(_.partitionsMapStream)
          .filter(_.nonEmpty)
          .flatMap { partitionsMap =>
            fs2.Stream
              .emits(partitionsMap.toVector.map { case (streamPartition, partitionedStream) =>
                val batchInterval = FiniteDuration(topicConfig.batchInterval, TimeUnit.SECONDS)
                // Log the newly assigned partition, then process its records.
                fs2.Stream.eval(
                  AppLogger.log(
                    s"New partition assigned to consumer for Topic: ${streamPartition.topic()}:${streamPartition.partition()}",
                    TraceContext(
                      partition = streamPartition.partition(),
                      work = "Partition Rebalance - New Partition Assigned",
                      status = TraceTags.statusSuccess,
                      source = streamPartition.topic()
                    )
                  )
                ) ++
                  partitionedStream
                    .groupWithin(streamConfig.maxConcurrent, batchInterval)
                    .evalMap { records =>
                      // Split each batch into its records and their committable offsets.
                      val (messages, offsets: Chunk[CommittableOffset[F]]) = records.foldMap { ev =>
                        ev.record.pure[Chunk] -> ev.offset.pure[Chunk]
                      }
                      val commit: F[Unit] = CommittableOffsetBatch.fromFoldable(offsets).commit
                      val batchContext =
                        BatchContext(
                          streamPartition.topic(),
                          streamPartition.partition(),
                          offsets.head.map(_.offsets.head._2.offset()).getOrElse(-1)
                        )

                      processStreams(messages, batchContext) >> commit
                    }
              })
              .parJoinUnbounded // joins only the streams from this one partitions map
          }
@bplommer
Member

It looks like your code expects all partition streams in the map to terminate on a rebalance and be replaced with new streams in the new partitions map. What actually happens with the cooperative sticky assignor is that only the streams for revoked partitions are closed, and the new map contains streams only for the newly assigned partitions. You therefore need to handle partition maps from the outer stream in parallel rather than sequentially. I think the fix is to move the .parJoinUnbounded to the very end of your snippet so that it joins the outer stream rather than the inner one - could you try that and let me know if it fixes the problem?
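
Roughly something like this (a sketch only: perPartitionPipeline here is a hypothetical stand-in for your logging/batching pipeline above; the only structural change is where .parJoinUnbounded ends up):

        KafkaConsumer
          .stream(kafkaConsumerSettings.consumerSettings(topicConfig.consumerGroupId))
          .subscribeTo(topicConfig.topic)
          .flatMap(_.partitionsMapStream)
          .filter(_.nonEmpty)
          .flatMap { partitionsMap =>
            // Emit the per-partition streams into the outer stream instead of
            // joining them here: under cooperative rebalancing, later maps
            // contain only the newly added partitions, so streams from earlier
            // maps must keep running alongside them.
            fs2.Stream.emits(partitionsMap.toVector.map { case (streamPartition, partitionedStream) =>
              // Hypothetical stand-in for the logging + groupWithin + commit
              // pipeline shown in the snippet above.
              perPartitionPipeline(streamPartition, partitionedStream)
            })
          }
          .parJoinUnbounded // joins across successive partition maps, not just within one

The per-partition pipeline itself is unchanged; only the join point moves.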

@rney-lookout
Author

I can confirm that moving the .parJoinUnbounded to the outer stream resolved the problem.

@bplommer
Member

Reopening as we should document this properly.

@bplommer added the docs (New or improved documentation) label on Mar 25, 2022