From 131245614463a9f6c34ebfddd6e12a46527971d8 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sat, 4 Nov 2023 13:50:03 +0100
Subject: [PATCH] Await commit during a rebalance

Fixes #590 "Many records duplicately processed after rebalancing"

In this change we introduce a new experimental mode that holds up a rebalance until all messages that were provided to the stream of a revoked partition have been committed.

### Motivation

Here is a common (single partition) scenario around rebalances:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100)
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
1. the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
1. _at the same time,_ another consumer on another instance starts consuming from the last committed offset (which is 50) and will process the same messages with offsets 50 to 100

Messages with offsets 50 to 100 are being processed by both consumers simultaneously. Note that both consumers will try to commit these offsets. Until the first consumer is done, the stored offsets can go up and down and are therefore unreliable.

After merging this change, the scenario will unfold as follows:

1. a consumer polls some messages and puts them in the streams (let's say messages with offsets 0 to 100). Zio-kafka keeps track of the highest provided offset
1. asynchronously, the user processes these messages. Some of them are committed (let's say up to offset 50), the rest is still being processed when...
1. a rebalance happens, the partition is revoked and assigned to another consumer on another instance
   * the consumer continues to process the remaining messages with offsets 50 to 100, and tries to commit those offsets
   * inside the onRevoked callback, zio-kafka continues to process commit commands from the user
   * zio-kafka continues to do so until the commit with the highest provided offset (offset 100) completes
   * the onRevoked callback completes, signalling to Kafka that the next consumer may start consuming from the partition
1. another consumer on another instance starts consuming from the last committed offset (which is now 100, problem solved!)

### Commit queue

Because both the main runloop and the rebalance listener need to process (and thus receive) commit commands, the commit commands were moved to a separate queue. Because the main runloop may still need to be kickstarted when it is no longer polling, a new command `CommitAvailable` was introduced.

### Complications

1. The chosen solution is not suitable for all consumers.
   - There are use cases where not all messages are read from the stream. For example, some want to read exactly 100 messages from a topic and then stop consuming. In that case the user has no intention to commit all messages, and therefore we should not wait for that to happen. Since stream consumers can basically do whatever they want, the only way we can support such use cases is by letting the consumer tell zio-kafka that they are done with committing. This requires an API change. For example, we could let the user tell zio-kafka that a given commit is the last one.
   - Not all consumers commit offsets (to Kafka) in the first place. In a future change we could make it work for commits to other stores though. As a workaround, these users can commit to both places.
1. It requires Kafka client 3.6.0. In earlier versions there was no way to wait for async commits to complete.

### Same thread executor

The Kafka client requires that any nested invocations (that is, from the rebalance listener callback) to the java consumer happen on the same thread. This is very much at odds with how ZIO works. Attempts to convince the Kafka committers to relax this requirement failed; they could not be persuaded that this is a problem. This is circumvented by using a special same-thread-runtime which runs on the thread of the caller. However, some operations, such as `ZIO.timeout` and anything that uses a `Schedule`, will still shift work to another thread. We work around this by using blocking sleeps and wall-clock deadlines.

### Experimental

Because holding up the rebalance may have unforeseen consequences, this feature is marked as experimental. This allows us to collect experience before we recommend this mode to all users.

### Collateral

This change also:

- fixes the order of `private` and `final`
- removes some completely useless tests

### Related

The same issue is present in:

- fs2-kafka: https://github.com/fd4s/fs2-kafka/issues/1200
- alpakka-kafka: https://github.com/akka/alpakka-kafka/issues/1038

In fact, every program that polls and commits asynchronously is likely affected.

### Non-goals

This change does not attempt to address the following items. However, these can be addressed in future PRs.

- Awaiting commits after stopping the consumer, e.g. due to program shutdown (see #1087).
- Supporting consumers that want to commit only a portion of the given messages.
- Supporting transactional consumer/producer.
- Supporting external commits.

This branch is based on the work of abandoned PRs #788 and #830 and builds on preparatory work in PRs #744, #1068, #1073, #1086, #1089 and #1097.
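### Usage sketch

For reference, a minimal sketch of how an application would opt in to the new mode once this change lands. The bootstrap servers, group id, topic name and `processRecord` helper are placeholders; `withRebalanceSafeCommits` is the setting introduced by this PR, the rest follows the existing zio-kafka consumer API and its standard commit-batching pattern.

```scala
import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object RebalanceSafeCommitsExample extends ZIOAppDefault {

  // Placeholder connection details; only `withRebalanceSafeCommits(true)` is new in this PR.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withRebalanceSafeCommits(true)

  // Placeholder for the application's own record handling.
  def processRecord(record: CommittableRecord[String, String]): Task[Unit] =
    ZIO.logInfo(s"Processing ${record.key}: ${record.value}")

  override def run: ZIO[Scope, Throwable, Unit] =
    Consumer.make(settings).flatMap { consumer =>
      consumer
        .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
        .mapZIO(record => processRecord(record).as(record.offset))
        .aggregateAsyncWithin(Consumer.offsetBatches, Schedule.fixed(5.seconds))
        // During a rebalance, revoked partitions are handed over only after these commits complete.
        .mapZIO(_.commit)
        .runDrain
    }
}
```

With `rebalanceSafeCommits` left at its default (`false`) the behaviour is unchanged; enabling it only affects how long the rebalance listener waits for the commits issued by the stream above.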
--- .../zio/kafka/consumer/ConsumerSpec.scala | 163 +++++++++++---- .../internal/RunloopCommitOffsetsSpec.scala | 24 ++- .../zio/kafka/testkit/KafkaTestUtils.scala | 8 + .../zio/kafka/consumer/ConsumerSettings.scala | 37 +++- .../kafka/consumer/RebalanceListener.scala | 3 + .../consumer/internal/ConsumerAccess.scala | 11 +- .../zio/kafka/consumer/internal/Runloop.scala | 194 ++++++++++++++++-- .../consumer/internal/RunloopAccess.scala | 1 + .../consumer/internal/RunloopCommand.scala | 10 +- .../zio/kafka/consumer/internal/package.scala | 37 ++++ .../zio/kafka/producer/Transaction.scala | 2 +- 11 files changed, 415 insertions(+), 75 deletions(-) create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ee456d1ab..fc3eed9bf 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -445,46 +445,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) }, - test("handle rebalancing by completing topic-partition streams") { - val nrMessages = 50 - val nrPartitions = 6 // Must be even and strictly positive - - for { - // Produce messages on several partitions - topic <- randomTopic - group <- randomGroup - client1 <- randomClient - client2 <- randomClient - - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) - _ <- ZIO.foreachDiscard(1 to nrMessages) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - - // Consume messages - subscription = Subscription.topics(topic) - consumer1 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .flatMapPar(nrPartitions) { case (tp, partition) => - ZStream - .fromZIO(partition.runDrain) - .as(tp) - } - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client1, Some(group))) - .fork - _ <- Live.live(ZIO.sleep(5.seconds)) - consumer2 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client2, Some(group))) - .fork - _ <- consumer1.join - _ <- consumer2.join - } yield assertCompletes - }, test("produce diagnostic events when rebalancing") { val nrMessages = 50 val nrPartitions = 6 @@ -626,6 +586,129 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { consumedMessages <- messagesReceived.get } yield assert(consumedMessages)(contains(newMessage).negate) }, + suite("rebalanceSafeCommits prevents processing messages twice when rebalancing")({ + + /** + * Outline of this test: + * - A producer generates some messages on every partition of a topic (2 partitions), + * - A consumer starts reading from the topic. It is the only consumer so it handles all partitions. + * - After a few messages a second consumer is started. One partition will be re-assigned. + * + * Since the first consumer is slow, we expect it to not have committed the offsets yet when the rebalance + * happens. As a consequence, the second consumer would see some messages the first consumer already consumed. 
+ * + * '''However,''' since we enable `rebalanceSafeCommits` on the first consumer, no messages should be consumed + * by both consumers. + */ + def testForPartitionAssignmentStrategy[T <: ConsumerPartitionAssignor: ClassTag] = + test(implicitly[ClassTag[T]].runtimeClass.getName) { + val partitionCount = 2 + + def makeConsumer( + clientId: String, + groupId: String, + rebalanceSafeCommits: Boolean + ): ZIO[Scope with Kafka, Throwable, Consumer] = + for { + settings <- consumerSettings( + clientId = clientId, + groupId = Some(groupId), + `max.poll.records` = 1, + rebalanceSafeCommits = rebalanceSafeCommits + ) + consumer <- Consumer.make(settings) + } yield consumer + + for { + topic <- randomTopic + subscription = Subscription.topics(topic) + clientId1 <- randomClient + clientId2 <- randomClient + groupId <- randomGroup + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount)) + // Produce one message to all partitions, every 500 ms + pFib <- ZStream + .fromSchedule(Schedule.fixed(500.millis)) + .mapZIO { i => + ZIO.foreachDiscard(0 until partitionCount) { p => + produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i"))) + } + } + .runDrain + .fork + _ <- ZIO.logDebug("Starting consumer 1") + c1 <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true) + c1Sleep <- Ref.make[Int](3) + c1Started <- Promise.make[Nothing, Unit] + c1Keys <- Ref.make(Chunk.empty[String]) + fib1 <- + ZIO + .logAnnotate("consumer", "1") { + // When the stream ends, the topic subscription ends as well. Because of that the consumer + // shuts down and commits are no longer possible. Therefore, we signal the second consumer in + // such a way that it doesn't close the stream. + c1 + .plainStream(subscription, Serde.string, Serde.string) + .tap(record => + ZIO.logDebug( + s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}" + ) + ) + .tap { record => + // Signal consumer 2 can start when a record is seen for every partition. + for { + keys <- c1Keys.updateAndGet(_ :+ record.key) + _ <- c1Started.succeed(()).when(keys.map(_.split('-')(1)).toSet.size == partitionCount) + } yield () + } + // Buffer so that the above can run ahead of the below, this is important; + // we want consumer 2 to start before consumer 1 commits. 
+ .buffer(partitionCount) + .mapZIO { record => + for { + s <- c1Sleep.get + _ <- ZIO.sleep(s.seconds) + _ <- ZIO.logDebug( + s"Committing offset ${record.partition}:${record.offset.offset} for key ${record.key}" + ) + _ <- record.offset.commit + } yield record.key + } + .runCollect + .map(_.toSet) + } + .fork + _ <- c1Started.await + _ <- ZIO.logDebug("Starting consumer 2") + c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false) + fib2 <- ZIO + .logAnnotate("consumer", "2") { + c2 + .plainStream(subscription, Serde.string, Serde.string) + .tap(msg => ZIO.logDebug(s"Received ${msg.key}")) + .mapZIO(msg => msg.offset.commit.as(msg.key)) + .take(5) + .runCollect + .map(_.toSet) + } + .fork + _ <- ZIO.logDebug("Waiting for consumers to end") + c2Keys: Set[String] <- fib2.join + _ <- ZIO.logDebug("Consumer 2 ready") + _ <- c1.stopConsumption + _ <- c1Sleep.set(0) + c1Keys: Set[String] <- fib1.join + _ <- ZIO.logDebug("Consumer 1 ready") + _ <- pFib.interrupt + } yield assertTrue((c1Keys intersect c2Keys).isEmpty) + } + + // Test for both default partition assignment strategies + Seq( + testForPartitionAssignmentStrategy[RangeAssignor], + testForPartitionAssignmentStrategy[CooperativeStickyAssignor] + ) + }: _*), test("partitions for topic doesn't fail if doesn't exist") { for { topic <- randomTopic diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala index 083d78ec0..1ae1ad54e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala @@ -54,12 +54,32 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val s2 = s1.keepPartitions(Set(tp10)) assertTrue(s2.offsets == Map(tp10 -> 10L)) + }, + test("does not 'contain' offset when tp is not present") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val result = s1.contains(tp20, 10) + assertTrue(!result) + }, + test("does not 'contain' a higher offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp10, 11) + assertTrue(!result) + }, + test("does 'contain' equal offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp10, 10) + assertTrue(result) + }, + test("does 'contain' lower offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp20, 19) + assertTrue(result) } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): RunloopCommand.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - RunloopCommand.Commit(o, p) + Runloop.Commit(o, p) } } diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 972e14577..81312116e 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -116,6 +116,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), 
restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, maxPollInterval: Duration = 5.minutes, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, @@ -138,6 +139,7 @@ object KafkaTestUtils { ) .withOffsetRetrieval(offsetRetrieval) .withRestartStreamOnRebalancing(restartStreamOnRebalancing) + .withRebalanceSafeCommits(rebalanceSafeCommits) .withProperties(properties) val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId) @@ -154,6 +156,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty ): URIO[Kafka, ConsumerSettings] = consumerSettings( @@ -163,6 +166,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties ) .map( @@ -202,6 +206,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): ZLayer[Kafka, Throwable, Consumer] = @@ -213,6 +218,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties, commitTimeout = commitTimeout ) @@ -229,6 +235,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty, rebalanceListener: RebalanceListener = RebalanceListener.noop ): ZLayer[Kafka, Throwable, Consumer] = @@ -240,6 +247,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties ).map(_.withRebalanceListener(rebalanceListener)) ) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 741990f39..bcb5b7479 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -28,6 +28,7 @@ final case class ConsumerSettings( offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), rebalanceListener: RebalanceListener = RebalanceListener.noop, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { @@ -154,6 +155,40 @@ final case class ConsumerSettings( def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings = copy(restartStreamOnRebalancing = value) + /** + * WARNING: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. 
It is not recommended for production use yet. + * + * @param value + * Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is + * `false`, but the recommended value is `true` as it prevents duplicate messages. + * + * Use `false` when: + * - your streams do not commit, or + * - your streams require access to the consumer (the consumer is not available until the rebalance is done), or + * - when it is okay to process records twice (possibly concurrently), for example, because processing is + * idempotent. + * + * When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to continue. + * + * When a partition is revoked, consuming the messages will be taken over by another consumer. The other consumer will + * continue from the committed offset. It is therefore important that this consumer commits offsets of all consumed + * messages. Therefore, by holding up the rebalance until these commits are done, we ensure that the new consumer will + * start from the correct offset. + * + * During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new + * messages until the revoked streams are done committing. + * + * Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]); by default this + * calculates to 3 minutes. + * + * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any + * offset commits from these streams have a high chance of being delayed (commits are not possible during some phases + * of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will + * start from an earlier offset. The result is that some messages are processed twice and concurrently. + */ + def withRebalanceSafeCommits(value: Boolean): ConsumerSettings = + copy(rebalanceSafeCommits = value) + def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings = withProperties(credentialsStore.properties) @@ -200,6 +235,6 @@ final case class ConsumerSettings( object ConsumerSettings { val defaultCommitTimeout: Duration = 15.seconds - def apply(bootstrapServers: List[String]) = + def apply(bootstrapServers: List[String]): ConsumerSettings = new ConsumerSettings().withBootstrapServers(bootstrapServers) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala index d302fb076..3a00c0442 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala @@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._ /** * ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects. + * + * Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor + * when this is not desired.
*/ final case class RebalanceListener( onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d2f2e3dd..79f65a2c9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess( def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = access.withPermit(withConsumerNoPermit(f)) - private[consumer] def withConsumerNoPermit[R, A]( + private def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] ): RIO[R, A] = ZIO @@ -31,10 +31,17 @@ private[consumer] final class ConsumerAccess( .flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt)) /** - * Do not use this method outside of the Runloop + * Use this method only from Runloop. */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = access.withPermit(f(consumer)) + + /** + * Use this method ONLY from the rebalance listener. + */ + private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = + withConsumerNoPermit(f) + } private[consumer] object ConsumerAccess { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index be1041128..0bed2d714 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -22,12 +22,13 @@ import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection private[consumer] final class Runloop private ( - runtime: Runtime[Any], + sameThreadRuntime: Runtime[Any], hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, maxPollInterval: Duration, commitTimeout: Duration, + commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], @@ -35,6 +36,7 @@ private[consumer] final class Runloop private ( offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, currentStateRef: Ref[State], committedOffsetsRef: Ref[CommitOffsets], fetchStrategy: FetchStrategy @@ -73,6 +75,127 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { + // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This + // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the + // rebalance listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to + // another thread cannot be used. + + /** + * Maximum time spent in the rebalance callback. + * + * In this time zio-kafka awaits processing of records and the completion of commits. + * + * We use 3/5 of `maxPollInterval` which by default calculates to 3 minutes. 
+ */ + val maxEndingStreamsInterval = (maxPollInterval.toNanos / 5L) * 3L + + /** + * Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. + */ + val commitQueuePollInterval = 100.millis + + /** + * End streams from the rebalance listener. + * + * When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. + */ + def endStreams(state: State, streamsToEnd: Chunk[PartitionStreamControl]): Task[Unit] = + if (streamsToEnd.isEmpty) ZIO.unit + else { + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- if (rebalanceSafeCommits) consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) + else ZIO.unit + } yield () + } + + def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + state: State, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Unit] = { + val deadline = java.lang.System.nanoTime() + maxEndingStreamsInterval - commitTimeout.toNanos + + val endingTps = streamsToEnd.map(_.tp).toSet + + def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = + commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) + + lazy val previousPendingCommits: Chunk[Commit] = + commitsOfEndingStreams(state.pendingCommits) + + def commitAsync(commits: Chunk[Commit]): UIO[Unit] = + if (commits.nonEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> + ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) + } else { + ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).ignore + } + + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + for { + streamResults <- + ZIO.foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + } yield (isDone, endOffset) + } + committedOffsets <- committedOffsetsRef.get + } yield { + val allStreamsCompleted = streamResults.forall(_._1) + allStreamsCompleted && { + val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2) + val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits) + endOffsets.forall { endOffset => + val tp = endOffset.topicPartition + val offset = endOffset.offset + def endOffsetWasCommitted = committedOffsets.contains(tp, offset) + def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit => + pendingCommit.offsets.get(tp).exists { pendingOffset => + pendingOffset.offset() >= offset + } + } + endOffsetWasCommitted || endOffsetCommitIsPending + } + } + } + + def commitSync: Task[Unit] = + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + + // Outline: + // - Every `commitQueuePollInterval` until the deadline has been reached: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work, and + // - we have seen a completed or pending commit for all end-offsets. + // An end-offset of a stream is the offset of the last record given to that stream. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). 
+ // Because all commits created here (including those from non-ending streams) are now complete, we do not + // have to add them to the pending commits of the runloop state. + // + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *> + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) + .tap(commitAsync) + .forever + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .scan(Chunk.empty[Runloop.Commit])(_ ++ _) + .takeUntilZIO(endingStreamsCompletedAndCommitsExist) + .runDrain *> + commitSync *> + ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. // We do not know the order in which the call-back methods are invoked. // @@ -95,7 +218,7 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else Chunk.empty - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- endStreams(state, streamsToEnd) _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onAssigned done") } yield (), @@ -106,7 +229,7 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- endStreams(state, streamsToEnd) _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onRevoked done") } yield (), @@ -130,14 +253,15 @@ private[consumer] final class Runloop private ( offsets => for { p <- Promise.make[Throwable, Unit] - _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit + _ <- commitQueue.offer(Runloop.Commit(offsets, p)) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) } yield () /** Merge commits and prepare parameters for calling `consumer.commitAsync`. 
*/ private def asyncCommitParameters( - commits: Chunk[RunloopCommand.Commit] + commits: Chunk[Runloop.Commit] ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { val offsets = commits .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => @@ -162,7 +286,8 @@ private[consumer] final class Runloop private ( case _: RebalanceInProgressException => for { _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commandQueue.offerAll(commits) + _ <- commitQueue.offerAll(commits) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) } yield () case err: Throwable => cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) @@ -171,7 +296,7 @@ private[consumer] final class Runloop private ( new OffsetCommitCallback { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = Unsafe.unsafe { implicit u => - runtime.unsafe.run { + sameThreadRuntime.unsafe.run { if (exception eq null) onSuccess else onFailure(exception) } .getOrThrowFiberFailure() @@ -180,7 +305,7 @@ private[consumer] final class Runloop private ( (offsetsWithMetaData.asJava, callback, onFailure) } - private def handleCommits(state: State, commits: Chunk[RunloopCommand.Commit]): UIO[State] = + private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = if (commits.isEmpty) { ZIO.succeed(state) } else { @@ -332,7 +457,7 @@ private[consumer] final class Runloop private ( lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: - // no assignment changes, only new records. + // no assignment changes, no new commits, only new records. 
ZIO.succeed( PollResult( records = polledRecords, @@ -522,12 +647,12 @@ private[consumer] final class Runloop private ( case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop @@ -560,8 +685,11 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") - commitCommands = commands.collect { case cmd: RunloopCommand.Commit => cmd } + commitCommands <- commitQueue.takeAll + _ <- ZIO.logDebug( + s"Processing ${commitCommands.size} commits," + + s" ${commands.size} commands: ${commands.mkString(",")}" + ) stateAfterCommits <- handleCommits(state, commitCommands) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) @@ -621,14 +749,20 @@ object Runloop { lostTps: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl] ) { - def onAssigned(assigned: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onAssigned( + assigned: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ): RebalanceEvent = copy( wasInvoked = true, assignedTps = assignedTps ++ assigned, endedStreams = this.endedStreams ++ endedStreams ) - def onRevoked(revoked: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onRevoked( + revoked: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ): RebalanceEvent = copy( wasInvoked = true, revokedTps = revokedTps ++ revoked, @@ -640,7 +774,16 @@ object Runloop { } private object RebalanceEvent { - val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + } + + private[internal] final case class Commit( + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isDone: UIO[Boolean] = cont.isDone + @inline def isPending: UIO[Boolean] = isDone.negate } private[consumer] def make( @@ -653,24 +796,27 @@ object Runloop { offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, partitionsHub: Hub[Take[Throwable, PartitionAssignment]], fetchStrategy: FetchStrategy ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) lastRebalanceEvent <- 
Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) committedOffsetsRef <- Ref.make(CommitOffsets.empty) - runtime <- ZIO.runtime[Any] + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) runloop = new Runloop( - runtime = runtime, + sameThreadRuntime = sameThreadRuntime, hasGroupId = hasGroupId, consumer = consumer, pollTimeout = pollTimeout, maxPollInterval = maxPollInterval, commitTimeout = commitTimeout, + commitQueue = commitQueue, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, @@ -678,6 +824,7 @@ object Runloop { offsetRetrieval = offsetRetrieval, userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, currentStateRef = currentStateRef, committedOffsetsRef = committedOffsetsRef, fetchStrategy = fetchStrategy @@ -699,12 +846,12 @@ object Runloop { private final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[RunloopCommand.Commit], + pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[RunloopCommand.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) + def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) def shouldPoll: Boolean = subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) @@ -721,7 +868,7 @@ object Runloop { // package private for unit testing private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - def addCommits(c: Chunk[RunloopCommand.Commit]): CommitOffsets = { + def addCommits(c: Chunk[Runloop.Commit]): CommitOffsets = { val updatedOffsets = mutable.Map.empty[TopicPartition, Long] updatedOffsets.sizeHint(offsets.size) updatedOffsets ++= offsets @@ -736,6 +883,9 @@ object Runloop { def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ >= offset) } private[internal] object CommitOffsets { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 4cf7ed383..8abfbb724 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -96,6 +96,7 @@ private[consumer] object RunloopAccess { offsetRetrieval = settings.offsetRetrieval, userRebalanceListener = settings.rebalanceListener, restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, + rebalanceSafeCommits = settings.rebalanceSafeCommits, partitionsHub = partitionsHub, fetchStrategy = settings.fetchStrategy ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index a5259b2c2..be43f585c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -1,6 +1,5 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } @@ -17,15 +16,12 @@ object RunloopCommand { /** Used as a signal that another poll is needed. */ case object Poll extends Control + /** Used as a signal to the poll-loop that commits are available in the commit-queue. */ + case object CommitAvailable extends Control + case object StopRunloop extends Control case object StopAllStreams extends StreamCommand - final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit]) - extends RunloopCommand { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - /** Used by a stream to request more records. */ final case class Request(tp: TopicPartition) extends StreamCommand diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala new file mode 100644 index 000000000..b18fe95e1 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala @@ -0,0 +1,37 @@ +package zio.kafka.consumer + +import zio._ +import zio.internal.ExecutionMetrics + +package object internal { + + /** + * A runtime layer that can be used to run everything on the thread of the caller. + * + * Provided by Adam Fraser in Discord: + * https://discord.com/channels/629491597070827530/630498701860929559/1094279123880386590 but with cooperative + * yielding enabled. + * + * WARNING! Unfortunately some ZIO operations, like `ZIO.timeout`, inherently need to work multi-threaded and will + * therefore shift the fiber to another thread, even when this runtime is used. + */ + private[internal] val SameThreadRuntimeLayer: ZLayer[Any, Nothing, Unit] = { + val sameThreadExecutor = new Executor() { + override def metrics(implicit unsafe: Unsafe): Option[ExecutionMetrics] = None + + override def submit(runnable: Runnable)(implicit unsafe: Unsafe): Boolean = { + runnable.run() + true + } + } + + Runtime.setExecutor(sameThreadExecutor) ++ Runtime.setBlockingExecutor(sameThreadExecutor) + } + + /** + * A sleep that is safe to use from the same-thread-runtime. + */ + private[internal] def blockingSleep(sleepTime: Duration): Task[Unit] = + ZIO.attempt(Thread.sleep(sleepTime.toMillis)) + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala index 428575818..b08b3f36e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala @@ -40,7 +40,7 @@ trait Transaction { def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] } -final private[producer] class TransactionImpl( +private[producer] final class TransactionImpl( producer: Producer, private[producer] val offsetBatchRef: Ref[OffsetBatch], closed: Ref[Boolean]