diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ee456d1ab..fc3eed9bf 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -445,46 +445,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka](consumer(client, Some(group))) } yield assert(offsets.values.headOption.flatten.map(_.metadata))(isSome(equalTo(metadata))) }, - test("handle rebalancing by completing topic-partition streams") { - val nrMessages = 50 - val nrPartitions = 6 // Must be even and strictly positive - - for { - // Produce messages on several partitions - topic <- randomTopic - group <- randomGroup - client1 <- randomClient - client2 <- randomClient - - _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = nrPartitions)) - _ <- ZIO.foreachDiscard(1 to nrMessages) { i => - produceMany(topic, partition = i % nrPartitions, kvs = List(s"key$i" -> s"msg$i")) - } - - // Consume messages - subscription = Subscription.topics(topic) - consumer1 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .flatMapPar(nrPartitions) { case (tp, partition) => - ZStream - .fromZIO(partition.runDrain) - .as(tp) - } - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client1, Some(group))) - .fork - _ <- Live.live(ZIO.sleep(5.seconds)) - consumer2 <- Consumer - .partitionedStream(subscription, Serde.string, Serde.string) - .take(nrPartitions.toLong / 2) - .runDrain - .provideSomeLayer[Kafka](consumer(client2, Some(group))) - .fork - _ <- consumer1.join - _ <- consumer2.join - } yield assertCompletes - }, test("produce diagnostic events when rebalancing") { val nrMessages = 50 val nrPartitions = 6 @@ -626,6 +586,129 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { consumedMessages <- messagesReceived.get } yield assert(consumedMessages)(contains(newMessage).negate) }, + suite("rebalanceSafeCommits prevents processing messages twice when rebalancing")({ + + /** + * Outline of this test: + * - A producer generates some messages on every partition of a topic (2 partitions), + * - A consumer starts reading from the topic. It is the only consumer so it handles all partitions. + * - After a few messages a second consumer is started. One partition will be re-assigned. + * + * Since the first consumer is slow, we expect it to not have committed the offsets yet when the rebalance + * happens. As a consequence, the second consumer would see some messages the first consumer already consumed. + * + * '''However,''' since we enable `rebalanceSafeCommits` on the first consumer, no messages should be consumed + * by both consumers. 
+ */ + def testForPartitionAssignmentStrategy[T <: ConsumerPartitionAssignor: ClassTag] = + test(implicitly[ClassTag[T]].runtimeClass.getName) { + val partitionCount = 2 + + def makeConsumer( + clientId: String, + groupId: String, + rebalanceSafeCommits: Boolean + ): ZIO[Scope with Kafka, Throwable, Consumer] = + for { + settings <- consumerSettings( + clientId = clientId, + groupId = Some(groupId), + `max.poll.records` = 1, + rebalanceSafeCommits = rebalanceSafeCommits + ) + consumer <- Consumer.make(settings) + } yield consumer + + for { + topic <- randomTopic + subscription = Subscription.topics(topic) + clientId1 <- randomClient + clientId2 <- randomClient + groupId <- randomGroup + _ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount)) + // Produce one message to all partitions, every 500 ms + pFib <- ZStream + .fromSchedule(Schedule.fixed(500.millis)) + .mapZIO { i => + ZIO.foreachDiscard(0 until partitionCount) { p => + produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i"))) + } + } + .runDrain + .fork + _ <- ZIO.logDebug("Starting consumer 1") + c1 <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true) + c1Sleep <- Ref.make[Int](3) + c1Started <- Promise.make[Nothing, Unit] + c1Keys <- Ref.make(Chunk.empty[String]) + fib1 <- + ZIO + .logAnnotate("consumer", "1") { + // When the stream ends, the topic subscription ends as well. Because of that the consumer + // shuts down and commits are no longer possible. Therefore, we signal the second consumer in + // such a way that it doesn't close the stream. + c1 + .plainStream(subscription, Serde.string, Serde.string) + .tap(record => + ZIO.logDebug( + s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}" + ) + ) + .tap { record => + // Signal consumer 2 can start when a record is seen for every partition. + for { + keys <- c1Keys.updateAndGet(_ :+ record.key) + _ <- c1Started.succeed(()).when(keys.map(_.split('-')(1)).toSet.size == partitionCount) + } yield () + } + // Buffer so that the above can run ahead of the below, this is important; + // we want consumer 2 to start before consumer 1 commits. 
+ .buffer(partitionCount) + .mapZIO { record => + for { + s <- c1Sleep.get + _ <- ZIO.sleep(s.seconds) + _ <- ZIO.logDebug( + s"Committing offset ${record.partition}:${record.offset.offset} for key ${record.key}" + ) + _ <- record.offset.commit + } yield record.key + } + .runCollect + .map(_.toSet) + } + .fork + _ <- c1Started.await + _ <- ZIO.logDebug("Starting consumer 2") + c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false) + fib2 <- ZIO + .logAnnotate("consumer", "2") { + c2 + .plainStream(subscription, Serde.string, Serde.string) + .tap(msg => ZIO.logDebug(s"Received ${msg.key}")) + .mapZIO(msg => msg.offset.commit.as(msg.key)) + .take(5) + .runCollect + .map(_.toSet) + } + .fork + _ <- ZIO.logDebug("Waiting for consumers to end") + c2Keys: Set[String] <- fib2.join + _ <- ZIO.logDebug("Consumer 2 ready") + _ <- c1.stopConsumption + _ <- c1Sleep.set(0) + c1Keys: Set[String] <- fib1.join + _ <- ZIO.logDebug("Consumer 1 ready") + _ <- pFib.interrupt + } yield assertTrue((c1Keys intersect c2Keys).isEmpty) + } + + // Test for both default partition assignment strategies + Seq( + testForPartitionAssignmentStrategy[RangeAssignor], + testForPartitionAssignmentStrategy[CooperativeStickyAssignor] + ) + }: _*), test("partitions for topic doesn't fail if doesn't exist") { for { topic <- randomTopic diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala index 083d78ec0..1ae1ad54e 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/internal/RunloopCommitOffsetsSpec.scala @@ -54,12 +54,32 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault { val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) val s2 = s1.keepPartitions(Set(tp10)) assertTrue(s2.offsets == Map(tp10 -> 10L)) + }, + test("does not 'contain' offset when tp is not present") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L)) + val result = s1.contains(tp20, 10) + assertTrue(!result) + }, + test("does not 'contain' a higher offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp10, 11) + assertTrue(!result) + }, + test("does 'contain' equal offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp10, 10) + assertTrue(result) + }, + test("does 'contain' lower offset") { + val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L)) + val result = s1.contains(tp20, 19) + assertTrue(result) } ) - private def makeCommit(offsets: Map[TopicPartition, Long]): RunloopCommand.Commit = { + private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = { val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) } val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None)) - RunloopCommand.Commit(o, p) + Runloop.Commit(o, p) } } diff --git a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala index 972e14577..81312116e 100644 --- a/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala +++ b/zio-kafka-testkit/src/main/scala/zio/kafka/testkit/KafkaTestUtils.scala @@ -116,6 +116,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), 
restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, maxPollInterval: Duration = 5.minutes, `max.poll.records`: Int = 100, // settings this higher can cause concurrency bugs to go unnoticed commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, @@ -138,6 +139,7 @@ object KafkaTestUtils { ) .withOffsetRetrieval(offsetRetrieval) .withRestartStreamOnRebalancing(restartStreamOnRebalancing) + .withRebalanceSafeCommits(rebalanceSafeCommits) .withProperties(properties) val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId) @@ -154,6 +156,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty ): URIO[Kafka, ConsumerSettings] = consumerSettings( @@ -163,6 +166,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties ) .map( @@ -202,6 +206,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout, properties: Map[String, String] = Map.empty ): ZLayer[Kafka, Throwable, Consumer] = @@ -213,6 +218,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties, commitTimeout = commitTimeout ) @@ -229,6 +235,7 @@ object KafkaTestUtils { allowAutoCreateTopics: Boolean = true, diagnostics: Diagnostics = Diagnostics.NoOp, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, properties: Map[String, String] = Map.empty, rebalanceListener: RebalanceListener = RebalanceListener.noop ): ZLayer[Kafka, Throwable, Consumer] = @@ -240,6 +247,7 @@ object KafkaTestUtils { allowAutoCreateTopics = allowAutoCreateTopics, offsetRetrieval = offsetRetrieval, restartStreamOnRebalancing = restartStreamOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, properties = properties ).map(_.withRebalanceListener(rebalanceListener)) ) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala index 741990f39..bcb5b7479 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala @@ -28,6 +28,7 @@ final case class ConsumerSettings( offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(), rebalanceListener: RebalanceListener = RebalanceListener.noop, restartStreamOnRebalancing: Boolean = false, + rebalanceSafeCommits: Boolean = false, fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy() ) { private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match { @@ -154,6 +155,40 @@ final case class ConsumerSettings( def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings = copy(restartStreamOnRebalancing = value) + /** + * WARNING: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. 
It is not recommended for production use yet.
+   *
+   * @param value
+   *   Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is
+   *   `false`, but the recommended value is `true` as it prevents messages from being processed twice.
+   *
+   * Use `false` when:
+   *   - your streams do not commit, or
+   *   - your streams require access to the consumer (the consumer is not available until the rebalance is done), or
+   *   - it is okay to process records twice (possibly concurrently), for example because processing is idempotent.
+   *
+   * When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to
+   * continue.
+   *
+   * When a partition is revoked, consumption of its messages is taken over by another consumer, which continues from
+   * the committed offset. It is therefore important that this consumer commits the offsets of all consumed messages.
+   * By holding up the rebalance until these commits are done, we ensure that the new consumer starts from the correct
+   * offset.
+   *
+   * During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new
+   * messages until the revoked streams are done committing.
+   *
+   * Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), which by default comes
+   * down to 3 minutes.
+   *
+   * When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
+   * offset commits from these streams have a high chance of being delayed (commits are not possible during some
+   * phases of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and
+   * will start from an earlier offset. The result is that some messages are processed twice and concurrently.
+   */
+  def withRebalanceSafeCommits(value: Boolean): ConsumerSettings =
+    copy(rebalanceSafeCommits = value)
+
   def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
     withProperties(credentialsStore.properties)
 
@@ -200,6 +235,6 @@ final case class ConsumerSettings(
 object ConsumerSettings {
   val defaultCommitTimeout: Duration = 15.seconds
 
-  def apply(bootstrapServers: List[String]) =
+  def apply(bootstrapServers: List[String]): ConsumerSettings =
     new ConsumerSettings().withBootstrapServers(bootstrapServers)
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
index d302fb076..3a00c0442 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/RebalanceListener.scala
@@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._
 /**
  * ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
+ *
+ * Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
+ * when this is not desired.
*/ final case class RebalanceListener( onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala index 3d2f2e3dd..79f65a2c9 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/ConsumerAccess.scala @@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess( def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = access.withPermit(withConsumerNoPermit(f)) - private[consumer] def withConsumerNoPermit[R, A]( + private def withConsumerNoPermit[R, A]( f: ByteArrayKafkaConsumer => RIO[R, A] ): RIO[R, A] = ZIO @@ -31,10 +31,17 @@ private[consumer] final class ConsumerAccess( .flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt)) /** - * Do not use this method outside of the Runloop + * Use this method only from Runloop. */ private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] = access.withPermit(f(consumer)) + + /** + * Use this method ONLY from the rebalance listener. + */ + private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] = + withConsumerNoPermit(f) + } private[consumer] object ConsumerAccess { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index be1041128..0bed2d714 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -22,12 +22,13 @@ import scala.jdk.CollectionConverters._ //noinspection SimplifyWhenInspection,SimplifyUnlessInspection private[consumer] final class Runloop private ( - runtime: Runtime[Any], + sameThreadRuntime: Runtime[Any], hasGroupId: Boolean, consumer: ConsumerAccess, pollTimeout: Duration, maxPollInterval: Duration, commitTimeout: Duration, + commitQueue: Queue[Commit], commandQueue: Queue[RunloopCommand], lastRebalanceEvent: Ref.Synchronized[Runloop.RebalanceEvent], partitionsHub: Hub[Take[Throwable, PartitionAssignment]], @@ -35,6 +36,7 @@ private[consumer] final class Runloop private ( offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, currentStateRef: Ref[State], committedOffsetsRef: Ref[CommitOffsets], fetchStrategy: FetchStrategy @@ -73,6 +75,127 @@ private[consumer] final class Runloop private ( commandQueue.offer(RunloopCommand.RemoveSubscription(subscription)).unit private val rebalanceListener: RebalanceListener = { + // All code in this block is called from the rebalance listener and therefore runs on the same-thread-runtime. This + // is because the Java kafka client requires us to invoke the consumer from the same thread that invoked the + // rebalance listener. + // Unfortunately the same-thread-runtime does not work for all ZIO operations. For example, `ZIO.timeout`, + // `ZStream.repeat`, `Promise.await` on non-completed promises, and any other ZIO operation that shifts the work to + // another thread cannot be used. + + /** + * Maximum time spent in the rebalance callback. + * + * In this time zio-kafka awaits processing of records and the completion of commits. + * + * We use 3/5 of `maxPollInterval` which by default calculates to 3 minutes. 
+ */ + val maxEndingStreamsInterval = (maxPollInterval.toNanos / 5L) * 3L + + /** + * Time between polling the commit queue from the rebalance listener when `rebalanceSafeCommits` is enabled. + */ + val commitQueuePollInterval = 100.millis + + /** + * End streams from the rebalance listener. + * + * When `rebalanceSafeCommits` is enabled, wait for consumed offsets to be committed. + */ + def endStreams(state: State, streamsToEnd: Chunk[PartitionStreamControl]): Task[Unit] = + if (streamsToEnd.isEmpty) ZIO.unit + else { + for { + _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- if (rebalanceSafeCommits) consumer.rebalanceListenerAccess(doAwaitStreamCommits(_, state, streamsToEnd)) + else ZIO.unit + } yield () + } + + def doAwaitStreamCommits( + consumer: ByteArrayKafkaConsumer, + state: State, + streamsToEnd: Chunk[PartitionStreamControl] + ): Task[Unit] = { + val deadline = java.lang.System.nanoTime() + maxEndingStreamsInterval - commitTimeout.toNanos + + val endingTps = streamsToEnd.map(_.tp).toSet + + def commitsOfEndingStreams(commits: Chunk[Runloop.Commit]): Chunk[Runloop.Commit] = + commits.filter(commit => (commit.offsets.keySet intersect endingTps).nonEmpty) + + lazy val previousPendingCommits: Chunk[Commit] = + commitsOfEndingStreams(state.pendingCommits) + + def commitAsync(commits: Chunk[Commit]): UIO[Unit] = + if (commits.nonEmpty) { + val (offsets, callback, onFailure) = asyncCommitParameters(commits) + ZIO.logDebug(s"Async commit of ${offsets.size} offsets for ${commits.size} commits") *> + ZIO.attempt(consumer.commitAsync(offsets, callback)).catchAll(onFailure) + } else { + ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).ignore + } + + def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] = + for { + streamResults <- + ZIO.foreach(streamsToEnd) { stream => + for { + isDone <- stream.completedPromise.isDone + endOffset <- if (isDone) stream.completedPromise.await else ZIO.none + } yield (isDone, endOffset) + } + committedOffsets <- committedOffsetsRef.get + } yield { + val allStreamsCompleted = streamResults.forall(_._1) + allStreamsCompleted && { + val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2) + val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits) + endOffsets.forall { endOffset => + val tp = endOffset.topicPartition + val offset = endOffset.offset + def endOffsetWasCommitted = committedOffsets.contains(tp, offset) + def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit => + pendingCommit.offsets.get(tp).exists { pendingOffset => + pendingOffset.offset() >= offset + } + } + endOffsetWasCommitted || endOffsetCommitIsPending + } + } + } + + def commitSync: Task[Unit] = + ZIO.attempt(consumer.commitSync(java.util.Collections.emptyMap(), commitTimeout)) + + // Outline: + // - Every `commitQueuePollInterval` until the deadline has been reached: + // - Get all commits from the commit queue. + // - Start an async commit for these commits. + // - Collect all these new (pending) commits. + // - repeat the above until: + // - All streams that were ended have completed their work, and + // - we have seen a completed or pending commit for all end-offsets. + // An end-offset of a stream is the offset of the last record given to that stream. + // - Do a single sync commit without any offsets, this has the side-effect of blocking until all + // preceding async commits are complete (this requires kafka-client 3.6.0 or later). 
+ // Because all commits created here (including those from non-ending streams) are now complete, we do not + // have to add them to the pending commits of the runloop state. + // + // Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty. + // Instead, we poll the queue in a loop. + ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *> + ZStream + .fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll) + .tap(commitAsync) + .forever + .takeWhile(_ => java.lang.System.nanoTime() <= deadline) + .scan(Chunk.empty[Runloop.Commit])(_ ++ _) + .takeUntilZIO(endingStreamsCompletedAndCommitsExist) + .runDrain *> + commitSync *> + ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end") + } + // During a poll, the java kafka client might call each method of the rebalance listener 0 or 1 times. // We do not know the order in which the call-back methods are invoked. // @@ -95,7 +218,7 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else Chunk.empty - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- endStreams(state, streamsToEnd) _ <- lastRebalanceEvent.set(rebalanceEvent.onAssigned(assignedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onAssigned done") } yield (), @@ -106,7 +229,7 @@ private[consumer] final class Runloop private ( state <- currentStateRef.get streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams else state.assignedStreams.filter(control => revokedTps.contains(control.tp)) - _ <- ZIO.foreachDiscard(streamsToEnd)(_.end) + _ <- endStreams(state, streamsToEnd) _ <- lastRebalanceEvent.set(rebalanceEvent.onRevoked(revokedTps, endedStreams = streamsToEnd)) _ <- ZIO.logTrace("onRevoked done") } yield (), @@ -130,14 +253,15 @@ private[consumer] final class Runloop private ( offsets => for { p <- Promise.make[Throwable, Unit] - _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit + _ <- commitQueue.offer(Runloop.Commit(offsets, p)) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) } yield () /** Merge commits and prepare parameters for calling `consumer.commitAsync`. 
*/ private def asyncCommitParameters( - commits: Chunk[RunloopCommand.Commit] + commits: Chunk[Runloop.Commit] ): (JavaMap[TopicPartition, OffsetAndMetadata], OffsetCommitCallback, Throwable => UIO[Unit]) = { val offsets = commits .foldLeft(mutable.Map.empty[TopicPartition, OffsetAndMetadata]) { case (acc, commit) => @@ -162,7 +286,8 @@ private[consumer] final class Runloop private ( case _: RebalanceInProgressException => for { _ <- ZIO.logDebug(s"Rebalance in progress, commit for offsets $offsets will be retried") - _ <- commandQueue.offerAll(commits) + _ <- commitQueue.offerAll(commits) + _ <- commandQueue.offer(RunloopCommand.CommitAvailable) } yield () case err: Throwable => cont(Exit.fail(err)) <* diagnostics.emit(DiagnosticEvent.Commit.Failure(offsetsWithMetaData, err)) @@ -171,7 +296,7 @@ private[consumer] final class Runloop private ( new OffsetCommitCallback { override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = Unsafe.unsafe { implicit u => - runtime.unsafe.run { + sameThreadRuntime.unsafe.run { if (exception eq null) onSuccess else onFailure(exception) } .getOrThrowFiberFailure() @@ -180,7 +305,7 @@ private[consumer] final class Runloop private ( (offsetsWithMetaData.asJava, callback, onFailure) } - private def handleCommits(state: State, commits: Chunk[RunloopCommand.Commit]): UIO[State] = + private def handleCommits(state: State, commits: Chunk[Runloop.Commit]): UIO[State] = if (commits.isEmpty) { ZIO.succeed(state) } else { @@ -332,7 +457,7 @@ private[consumer] final class Runloop private ( lastRebalanceEvent.getAndSet(RebalanceEvent.None).flatMap { case RebalanceEvent(false, _, _, _, _) => // The fast track, rebalance listener was not invoked: - // no assignment changes, only new records. + // no assignment changes, no new commits, only new records. 
ZIO.succeed( PollResult( records = polledRecords, @@ -522,12 +647,12 @@ private[consumer] final class Runloop private ( case SubscriptionState.Subscribed(_, Subscription.Pattern(pattern)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(pattern.pattern, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Topics(topics)) => val rc = RebalanceConsumer.Live(c) ZIO - .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(runtime, rc))) + .attempt(c.subscribe(topics.asJava, rebalanceListener.toKafka(sameThreadRuntime, rc))) .as(Chunk.empty) case SubscriptionState.Subscribed(_, Subscription.Manual(topicPartitions)) => // For manual subscriptions we have to do some manual work before starting the run loop @@ -560,8 +685,11 @@ private[consumer] final class Runloop private ( .takeWhile(_ != RunloopCommand.StopRunloop) .runFoldChunksDiscardZIO(initialState) { (state, commands) => for { - _ <- ZIO.logDebug(s"Processing ${commands.size} commands: ${commands.mkString(",")}") - commitCommands = commands.collect { case cmd: RunloopCommand.Commit => cmd } + commitCommands <- commitQueue.takeAll + _ <- ZIO.logDebug( + s"Processing ${commitCommands.size} commits," + + s" ${commands.size} commands: ${commands.mkString(",")}" + ) stateAfterCommits <- handleCommits(state, commitCommands) streamCommands = commands.collect { case cmd: RunloopCommand.StreamCommand => cmd } stateAfterCommands <- ZIO.foldLeft(streamCommands)(stateAfterCommits)(handleCommand) @@ -621,14 +749,20 @@ object Runloop { lostTps: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl] ) { - def onAssigned(assigned: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onAssigned( + assigned: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ): RebalanceEvent = copy( wasInvoked = true, assignedTps = assignedTps ++ assigned, endedStreams = this.endedStreams ++ endedStreams ) - def onRevoked(revoked: Set[TopicPartition], endedStreams: Chunk[PartitionStreamControl]): RebalanceEvent = + def onRevoked( + revoked: Set[TopicPartition], + endedStreams: Chunk[PartitionStreamControl] + ): RebalanceEvent = copy( wasInvoked = true, revokedTps = revokedTps ++ revoked, @@ -640,7 +774,16 @@ object Runloop { } private object RebalanceEvent { - val None: RebalanceEvent = RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + val None: RebalanceEvent = + RebalanceEvent(wasInvoked = false, Set.empty, Set.empty, Set.empty, Chunk.empty) + } + + private[internal] final case class Commit( + offsets: Map[TopicPartition, OffsetAndMetadata], + cont: Promise[Throwable, Unit] + ) { + @inline def isDone: UIO[Boolean] = cont.isDone + @inline def isPending: UIO[Boolean] = isDone.negate } private[consumer] def make( @@ -653,24 +796,27 @@ object Runloop { offsetRetrieval: OffsetRetrieval, userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, + rebalanceSafeCommits: Boolean, partitionsHub: Hub[Take[Throwable, PartitionAssignment]], fetchStrategy: FetchStrategy ): URIO[Scope, Runloop] = for { _ <- ZIO.addFinalizer(diagnostics.emit(Finalization.RunloopFinalized)) + commitQueue <- ZIO.acquireRelease(Queue.unbounded[Runloop.Commit])(_.shutdown) commandQueue <- ZIO.acquireRelease(Queue.unbounded[RunloopCommand])(_.shutdown) lastRebalanceEvent <- 
Ref.Synchronized.make[Runloop.RebalanceEvent](Runloop.RebalanceEvent.None) initialState = State.initial currentStateRef <- Ref.make(initialState) committedOffsetsRef <- Ref.make(CommitOffsets.empty) - runtime <- ZIO.runtime[Any] + sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer) runloop = new Runloop( - runtime = runtime, + sameThreadRuntime = sameThreadRuntime, hasGroupId = hasGroupId, consumer = consumer, pollTimeout = pollTimeout, maxPollInterval = maxPollInterval, commitTimeout = commitTimeout, + commitQueue = commitQueue, commandQueue = commandQueue, lastRebalanceEvent = lastRebalanceEvent, partitionsHub = partitionsHub, @@ -678,6 +824,7 @@ object Runloop { offsetRetrieval = offsetRetrieval, userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, + rebalanceSafeCommits = rebalanceSafeCommits, currentStateRef = currentStateRef, committedOffsetsRef = committedOffsetsRef, fetchStrategy = fetchStrategy @@ -699,12 +846,12 @@ object Runloop { private final case class State( pendingRequests: Chunk[RunloopCommand.Request], - pendingCommits: Chunk[RunloopCommand.Commit], + pendingCommits: Chunk[Runloop.Commit], assignedStreams: Chunk[PartitionStreamControl], subscriptionState: SubscriptionState ) { - def addPendingCommits(c: Chunk[RunloopCommand.Commit]): State = copy(pendingCommits = pendingCommits ++ c) - def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) + def addPendingCommits(c: Chunk[Runloop.Commit]): State = copy(pendingCommits = pendingCommits ++ c) + def addRequest(r: RunloopCommand.Request): State = copy(pendingRequests = pendingRequests :+ r) def shouldPoll: Boolean = subscriptionState.isSubscribed && (pendingRequests.nonEmpty || pendingCommits.nonEmpty || assignedStreams.isEmpty) @@ -721,7 +868,7 @@ object Runloop { // package private for unit testing private[internal] final case class CommitOffsets(offsets: Map[TopicPartition, Long]) { - def addCommits(c: Chunk[RunloopCommand.Commit]): CommitOffsets = { + def addCommits(c: Chunk[Runloop.Commit]): CommitOffsets = { val updatedOffsets = mutable.Map.empty[TopicPartition, Long] updatedOffsets.sizeHint(offsets.size) updatedOffsets ++= offsets @@ -736,6 +883,9 @@ object Runloop { def keepPartitions(tps: Set[TopicPartition]): CommitOffsets = CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) }) + + def contains(tp: TopicPartition, offset: Long): Boolean = + offsets.get(tp).exists(_ >= offset) } private[internal] object CommitOffsets { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 4cf7ed383..8abfbb724 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -96,6 +96,7 @@ private[consumer] object RunloopAccess { offsetRetrieval = settings.offsetRetrieval, userRebalanceListener = settings.rebalanceListener, restartStreamsOnRebalancing = settings.restartStreamOnRebalancing, + rebalanceSafeCommits = settings.rebalanceSafeCommits, partitionsHub = partitionsHub, fetchStrategy = settings.fetchStrategy ) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala index a5259b2c2..be43f585c 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala +++ 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopCommand.scala @@ -1,6 +1,5 @@ package zio.kafka.consumer.internal -import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.common.TopicPartition import zio._ import zio.kafka.consumer.{ InvalidSubscriptionUnion, Subscription } @@ -17,15 +16,12 @@ object RunloopCommand { /** Used as a signal that another poll is needed. */ case object Poll extends Control + /** Used as a signal to the poll-loop that commits are available in the commit-queue. */ + case object CommitAvailable extends Control + case object StopRunloop extends Control case object StopAllStreams extends StreamCommand - final case class Commit(offsets: Map[TopicPartition, OffsetAndMetadata], cont: Promise[Throwable, Unit]) - extends RunloopCommand { - @inline def isDone: UIO[Boolean] = cont.isDone - @inline def isPending: UIO[Boolean] = isDone.negate - } - /** Used by a stream to request more records. */ final case class Request(tp: TopicPartition) extends StreamCommand diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala new file mode 100644 index 000000000..b18fe95e1 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/package.scala @@ -0,0 +1,37 @@ +package zio.kafka.consumer + +import zio._ +import zio.internal.ExecutionMetrics + +package object internal { + + /** + * A runtime layer that can be used to run everything on the thread of the caller. + * + * Provided by Adam Fraser in Discord: + * https://discord.com/channels/629491597070827530/630498701860929559/1094279123880386590 but with cooperative + * yielding enabled. + * + * WARNING! Unfortunately some ZIO operations, like `ZIO.timeout`, inherently need to work multi-threaded and will + * therefore shift the fiber to another thread, even when this runtime is used. + */ + private[internal] val SameThreadRuntimeLayer: ZLayer[Any, Nothing, Unit] = { + val sameThreadExecutor = new Executor() { + override def metrics(implicit unsafe: Unsafe): Option[ExecutionMetrics] = None + + override def submit(runnable: Runnable)(implicit unsafe: Unsafe): Boolean = { + runnable.run() + true + } + } + + Runtime.setExecutor(sameThreadExecutor) ++ Runtime.setBlockingExecutor(sameThreadExecutor) + } + + /** + * A sleep that is safe to use from the same-thread-runtime. + */ + private[internal] def blockingSleep(sleepTime: Duration): Task[Unit] = + ZIO.attempt(Thread.sleep(sleepTime.toMillis)) + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala index 428575818..b08b3f36e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala @@ -40,7 +40,7 @@ trait Transaction { def abort: IO[TransactionalProducer.UserInitiatedAbort.type, Nothing] } -final private[producer] class TransactionImpl( +private[producer] final class TransactionImpl( producer: Producer, private[producer] val offsetBatchRef: Ref[OffsetBatch], closed: Ref[Boolean]
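Usage note: the changes above only touch the library, its test kit, and its tests. As a quick orientation, the sketch below shows how an application could opt in to the new `rebalanceSafeCommits` setting. It is an illustration only, not taken from this diff: the broker address, topic, group id and the processRecord helper are made up, and error handling is omitted.

import zio._
import zio.kafka.consumer._
import zio.kafka.serde.Serde

object RebalanceSafeCommitsExample extends ZIOAppDefault {

  // Hold up rebalances until offsets of records that were already handed to the
  // streams have been committed, so the next assignee of a revoked partition
  // does not see those records again.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withRebalanceSafeCommits(true)

  // Hypothetical application-specific processing.
  private def processRecord(record: CommittableRecord[String, String]): Task[Unit] =
    ZIO.logInfo(s"Processing ${record.key} -> ${record.value}")

  val run =
    Consumer
      .plainStream(Subscription.topics("example-topic"), Serde.string, Serde.string)
      .mapZIO(record => processRecord(record).as(record.offset))
      .aggregateAsync(Consumer.offsetBatches)
      .mapZIO(_.commit)
      .runDrain
      .provideLayer(ZLayer.scoped(Consumer.make(settings)))
}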
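The note added to RebalanceListener above says that user-supplied callbacks run directly on the Kafka poll thread. The sketch below shows one way to "fork and shift" heavier work off that thread. It assumes the two-argument RebalanceListener constructor (onAssigned, onRevoked) of the existing API; cleanUpLocalState is a hypothetical placeholder.

import org.apache.kafka.common.TopicPartition
import zio._
import zio.kafka.consumer._

object RebalanceListenerExample {

  // Hypothetical application-specific clean-up; replace with real logic.
  private def cleanUpLocalState(partitions: Set[TopicPartition]): Task[Unit] =
    ZIO.logInfo(s"Cleaning up local state for ${partitions.size} partitions")

  val listener: RebalanceListener =
    RebalanceListener(
      // Assignment handling is cheap, so it can stay on the poll thread.
      (assigned, _) => ZIO.logInfo(s"Assigned: ${assigned.mkString(", ")}"),
      // Revocation clean-up may be slow: fork it so the poll thread is not held up.
      (revoked, _) => cleanUpLocalState(revoked).forkDaemon.unit
    )

  // Attach it via the consumer settings, for example:
  //   ConsumerSettings(List("localhost:9092")).withRebalanceListener(listener)
}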