From d36596109803f557e0df47035a1ffc5554ee8383 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 12 Aug 2023 18:56:45 +0400 Subject: [PATCH 01/14] Add `Consumer.commit` method --- .../zio/kafka/consumer/CommittableRecord.scala | 2 ++ .../scala/zio/kafka/consumer/Consumer.scala | 5 +++++ .../zio/kafka/consumer/internal/Runloop.scala | 3 +++ .../kafka/consumer/internal/RunloopAccess.scala | 17 ++++++++++------- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index f9583f5a7..6d3a5e494 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -38,6 +38,8 @@ final case class CommittableRecord[K, V]( def partition: Int = record.partition() def timestamp: Long = record.timestamp() + lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition()) + def offset: Offset = OffsetImpl( topic = record.topic(), diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 3ab626cb9..51c4f1279 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -37,6 +37,8 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Long]] + def commit(record: CommittableRecord[_, _]): Task[Unit] + /** * Retrieve the last committed offset for the given topic-partitions */ @@ -441,6 +443,9 @@ private[consumer] final class ConsumerLive private[consumer] ( offs.asScala.map { case (k, v) => k -> v.longValue() }.toMap } + override def commit(record: CommittableRecord[_, _]): Task[Unit] = + runloopAccess.commit(record) + override def committed( partitions: Set[TopicPartition], timeout: Duration = Duration.Infinity diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 6d6c8a92d..34ebaf6e6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -115,6 +115,9 @@ private[consumer] final class Runloop private ( } } + private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] = + commit.apply(Map(record.topicPartition -> record.record.offset())) + private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 938029890..31e6587fc 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -5,9 +5,9 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings, InvalidSubscriptionUnion, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } -import zio.{ Hub, IO, Ref, Scope, UIO, ZIO, ZLayer } +import zio._ 
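A minimal sketch of how the per-record commit introduced by this patch could be used; the topic name, serdes, and the way the `Consumer` is provided are illustrative only, not part of the change:

    import zio._
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Consume a topic and commit every record through the new Consumer.commit(record).
    val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
      ZIO.serviceWithZIO[Consumer] { consumer =>
        consumer
          .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
          .mapZIO(record => consumer.commit(record))
          .runDrain
      }

Committing record by record like this trades throughput for simplicity; the later patches in this series add batched alternatives.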
private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -31,10 +31,10 @@ private[consumer] final class RunloopAccess private ( ) { private def withRunloopZIO[E]( - requireRunning: Boolean + shouldStartIfNot: Boolean )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = runloopStateRef.updateSomeAndGetZIO { - case RunloopState.NotStarted if requireRunning => makeRunloop.map(RunloopState.Started.apply) + case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { case RunloopState.NotStarted => ZIO.unit case RunloopState.Started(runloop) => whenRunning(runloop) @@ -44,7 +44,7 @@ private[consumer] final class RunloopAccess private ( /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. */ - def stopConsumption: UIO[Unit] = withRunloopZIO(requireRunning = false)(_.stopConsumption) + def stopConsumption: UIO[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.stopConsumption) /** * We're doing all of these things in this method so that the interface of this class is as simple as possible and @@ -58,13 +58,16 @@ private[consumer] final class RunloopAccess private ( for { stream <- ZStream.fromHubScoped(partitionHub) // starts the Runloop if not already started - _ <- withRunloopZIO(requireRunning = true)(_.addSubscription(subscription)) + _ <- withRunloopZIO(shouldStartIfNot = true)(_.addSubscription(subscription)) _ <- ZIO.addFinalizer { - withRunloopZIO(requireRunning = false)(_.removeSubscription(subscription)) <* + withRunloopZIO(shouldStartIfNot = false)(_.removeSubscription(subscription)) <* diagnostics.emit(Finalization.SubscriptionFinalized) } } yield stream + def commit(record: CommittableRecord[_, _]): Task[Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commit(record)) + } private[consumer] object RunloopAccess { From f065200e97fc83100e72073856f98087c8dd171d Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 12 Aug 2023 19:12:12 +0400 Subject: [PATCH 02/14] Add `Consumer.commit` and `Consumer.commitOrRetry` methods --- .../src/main/scala/zio/kafka/consumer/Consumer.scala | 5 +++++ .../scala/zio/kafka/consumer/internal/Runloop.scala | 10 ++++++++++ .../zio/kafka/consumer/internal/RunloopAccess.scala | 7 +++++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 51c4f1279..01123113e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -47,6 +47,8 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] /** @@ -454,6 +456,9 @@ private[consumer] final class ConsumerLive private[consumer] ( _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap ) + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(record) + override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git 
a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 34ebaf6e6..1366e60bf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -118,6 +118,16 @@ private[consumer] final class Runloop private ( private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] = commit.apply(Map(record.topicPartition -> record.record.offset())) + private[internal] def commitOrRetry[R]( + policy: Schedule[R, Throwable, Any] + )(record: CommittableRecord[_, _]): RIO[R, Unit] = + commit(record).retry( + Schedule.recurWhile[Throwable] { + case _: RetriableCommitFailedException => true + case _ => false + } && policy + ) + private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 31e6587fc..a80c9b6e7 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -30,9 +30,9 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO[E]( + private def withRunloopZIO[R, E]( shouldStartIfNot: Boolean - )(whenRunning: Runloop => IO[E, Unit]): IO[E, Unit] = + )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] = runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { @@ -68,6 +68,9 @@ private[consumer] final class RunloopAccess private ( def commit(record: CommittableRecord[_, _]): Task[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commit(record)) + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record)) + } private[consumer] object RunloopAccess { From 3dedcafb3d0eb3ac4c083ed8c2255415a5999908 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 19:01:46 +0400 Subject: [PATCH 03/14] Raw implementation of https://github.com/zio/zio-kafka/pull/1022#issuecomment-1712502924 --- .../scala/zio/kafka/consumer/Consumer.scala | 9 +++++ .../zio/kafka/consumer/internal/Runloop.scala | 36 +++++++++++++++++-- .../consumer/internal/RunloopAccess.scala | 20 ++++++++--- 3 files changed, 59 insertions(+), 6 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 01123113e..41a14e141 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -49,6 +49,10 @@ trait Consumer { def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def commitAccumBatch[R]( + commitschedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] + def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] /** @@ -459,6 +463,11 @@ private[consumer] final class ConsumerLive private[consumer] ( override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(record) + override def commitAccumBatch[R]( + 
commitSchedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + runloopAccess.commitAccumBatch(commitSchedule) + override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 1366e60bf..078b75971 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -32,7 +32,8 @@ private[consumer] final class Runloop private ( userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, currentStateRef: Ref[State], - fetchStrategy: FetchStrategy + fetchStrategy: FetchStrategy, + runloopScope: Scope ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = @@ -128,6 +129,35 @@ private[consumer] final class Runloop private ( } && policy ) + // noinspection YieldingZIOEffectInspection + private[internal] def commitAccumBatch[R]( + commitSchedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + for { + acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]]) + _ <- acc.updateZIO { case data @ (offsets, promises) => + if (offsets.isEmpty) ZIO.succeed(data) + else + commit(offsets) + .foldZIO( + e => ZIO.foreachDiscard(promises)(_.fail(e)), + _ => ZIO.foreachDiscard(promises)(_.succeed(())) + ) + .as((Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]])) + } + .schedule(commitSchedule) + .forkIn(runloopScope) + } yield { (records: Chunk[CommittableRecord[_, _]]) => + for { + p <- Promise.make[Throwable, Unit] + _ <- acc.update { case (offsets, promises) => + val newOffsets = offsets ++ records.map(record => record.topicPartition -> record.record.offset()) + val newPromises = promises :+ p + (newOffsets, newPromises) + } + } yield p.await + } + private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => for { @@ -587,6 +617,7 @@ private[consumer] object Runloop { initialState = State.initial currentStateRef <- Ref.make(initialState) runtime <- ZIO.runtime[Any] + scope <- ZIO.scope runloop = new Runloop( runtime = runtime, hasGroupId = hasGroupId, @@ -602,7 +633,8 @@ private[consumer] object Runloop { userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, currentStateRef = currentStateRef, - fetchStrategy = fetchStrategy + fetchStrategy = fetchStrategy, + runloopScope = scope ) _ <- ZIO.logDebug("Starting Runloop") diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index a80c9b6e7..fc0272329 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -30,17 +30,22 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO[R, E]( + private def withRunloopZIO__[R, E, A]( shouldStartIfNot: Boolean - )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] = + )(whenRunning: Runloop => ZIO[R, E, A])(orElse: ZIO[R, E, A]): ZIO[R, E, A] = runloopStateRef.updateSomeAndGetZIO { case 
RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { - case RunloopState.NotStarted => ZIO.unit + case RunloopState.NotStarted => orElse case RunloopState.Started(runloop) => whenRunning(runloop) - case RunloopState.Finalized => ZIO.unit + case RunloopState.Finalized => orElse } + private def withRunloopZIO[R, E](shouldStartIfNot: Boolean)( + whenRunning: Runloop => ZIO[R, E, Unit] + ): ZIO[R, E, Unit] = + withRunloopZIO__(shouldStartIfNot)(whenRunning)(ZIO.unit) + /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. */ @@ -71,6 +76,13 @@ private[consumer] final class RunloopAccess private ( def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record)) + def commitAccumBatch[R]( + commitschedule: Schedule[R, Any, Any] + ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))( + ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit)) + ) + } private[consumer] object RunloopAccess { From fc7632cbe11d8659110a608d590c66e4d743a749 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 19:45:40 +0400 Subject: [PATCH 04/14] Refined implementation of https://github.com/zio/zio-kafka/pull/1022#issuecomment-1712502924 --- .../scala/zio/kafka/consumer/Committer.scala | 49 +++++++++++++++++++ .../scala/zio/kafka/consumer/Consumer.scala | 8 +-- .../zio/kafka/consumer/internal/Runloop.scala | 29 +---------- .../consumer/internal/RunloopAccess.scala | 10 ++-- .../scala/zio/kafka/utils/PendingCommit.scala | 36 ++++++++++++++ 5 files changed, 92 insertions(+), 40 deletions(-) create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala new file mode 100644 index 000000000..e2f8ba936 --- /dev/null +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala @@ -0,0 +1,49 @@ +package zio.kafka.consumer + +import org.apache.kafka.common.TopicPartition +import zio.kafka.utils.PendingCommit +import zio.{Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO} + +trait Committer { + def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] +} + +//noinspection ConvertExpressionToSAM +object Committer { + private val emptyState: (Map[TopicPartition, Long], List[Promise[Throwable, Unit]]) = + (Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]]) + + val unit: Committer = + new Committer { + override def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + ZIO.succeed(PendingCommit.unit.asInstanceOf[PendingCommit[Throwable, Unit]]) + } + + private[zio] def fromSchedule[R]( + commitSchedule: Schedule[R, Any, Any], + commit: Map[TopicPartition, Long] => Task[Unit], + scope: Scope + ): URIO[R, Committer] = + for { + acc <- Ref.Synchronized.make(emptyState) + _ <- acc.updateZIO { case data @ (offsets, promises) => + if (offsets.isEmpty) ZIO.succeed(data) + else + commit(offsets) + .foldZIO( + e => ZIO.foreachDiscard(promises)(_.fail(e)), + _ => ZIO.foreachDiscard(promises)(_.succeed(())) + ) + .as(emptyState) + } + .schedule(commitSchedule) + .forkIn(scope) + } 
yield new Committer { + override def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + for { + p <- Promise.make[Throwable, Unit] + newOffsets = records.map(record => record.topicPartition -> record.record.offset()) + _ <- acc.update { case (offsets, promises) => (offsets ++ newOffsets, promises :+ p) } + } yield PendingCommit(p) + } +} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 41a14e141..5ec6e2a2b 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -49,9 +49,7 @@ trait Consumer { def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] - def commitAccumBatch[R]( - commitschedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] + def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] @@ -463,9 +461,7 @@ private[consumer] final class ConsumerLive private[consumer] ( override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(record) - override def commitAccumBatch[R]( - commitSchedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = + override def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = runloopAccess.commitAccumBatch(commitSchedule) override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 078b75971..10e747619 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -130,33 +130,8 @@ private[consumer] final class Runloop private ( ) // noinspection YieldingZIOEffectInspection - private[internal] def commitAccumBatch[R]( - commitSchedule: Schedule[R, Any, Any] - ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] = - for { - acc <- Ref.Synchronized.make(Map.empty[TopicPartition, Long] -> List.empty[Promise[Throwable, Unit]]) - _ <- acc.updateZIO { case data @ (offsets, promises) => - if (offsets.isEmpty) ZIO.succeed(data) - else - commit(offsets) - .foldZIO( - e => ZIO.foreachDiscard(promises)(_.fail(e)), - _ => ZIO.foreachDiscard(promises)(_.succeed(())) - ) - .as((Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]])) - } - .schedule(commitSchedule) - .forkIn(runloopScope) - } yield { (records: Chunk[CommittableRecord[_, _]]) => - for { - p <- Promise.make[Throwable, Unit] - _ <- acc.update { case (offsets, promises) => - val newOffsets = offsets ++ records.map(record => record.topicPartition -> record.record.offset()) - val newPromises = promises :+ p - (newOffsets, newPromises) - } - } yield p.await - } + private[internal] def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = + Committer.fromSchedule(commitSchedule, commit, runloopScope) private val commit: Map[TopicPartition, Long] => Task[Unit] = offsets => diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
index fc0272329..2afa2f2c9 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala
@@ -5,7 +5,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
 import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord
 import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment
-import zio.kafka.consumer.{ CommittableRecord, ConsumerSettings, InvalidSubscriptionUnion, Subscription }
+import zio.kafka.consumer.{ CommittableRecord, Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription }
 import zio.stream.{ Stream, Take, UStream, ZStream }
 import zio._
 
@@ -76,12 +76,8 @@ private[consumer] final class RunloopAccess private (
   def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] =
     withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record))
 
-  def commitAccumBatch[R](
-    commitschedule: Schedule[R, Any, Any]
-  ): URIO[R, Chunk[CommittableRecord[_, _]] => UIO[Task[Unit]]] =
-    withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(
-      ZIO.succeed((_: Chunk[CommittableRecord[_, _]]) => ZIO.succeed(ZIO.unit))
-    )
+  def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] =
+    withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(ZIO.succeed(Committer.unit))
 
 }
 
 private[consumer] object RunloopAccess {
diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala b/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala
new file mode 100644
index 000000000..b08984c71
--- /dev/null
+++ b/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala
@@ -0,0 +1,36 @@
+package zio.kafka.utils
+
+import zio.{ IO, Promise, Trace, ZIO }
+
+/**
+ * A less powerful interface than Promise.
+ *
+ * Avoids leaking to users the Promise methods we don't want them to have access to.
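+ *
+ * Illustrative sketch (assuming a `committer` obtained from `Consumer#commitAccumBatch` and a
+ * chunk of consumed `records`); awaiting the pending commit suspends until the batch
+ * containing these offsets has actually been committed:
+ * {{{
+ *   for {
+ *     pending <- committer.commit(records)
+ *     _       <- pending.awaitCommit
+ *   } yield ()
+ * }}}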
+ */ +// noinspection ConvertExpressionToSAM +trait PendingCommit[E, A] { self => + + def awaitCommit(implicit trace: Trace): IO[E, A] + + final def combineDiscard(other: PendingCommit[E, _]): PendingCommit[E, Unit] = + new PendingCommit[E, Unit] { + override def awaitCommit(implicit trace: Trace): IO[E, Unit] = + for { + _ <- self.awaitCommit + _ <- other.awaitCommit + } yield () + } +} + +//noinspection ConvertExpressionToSAM +object PendingCommit { + private[zio] def apply[E, A](p: Promise[E, A]): PendingCommit[E, A] = + new PendingCommit[E, A] { + override def awaitCommit(implicit trace: Trace): IO[E, A] = p.await + } + + val unit: PendingCommit[Nothing, Unit] = + new PendingCommit[Nothing, Unit] { + override def awaitCommit(implicit trace: Trace): IO[Nothing, Unit] = ZIO.unit + } +} From 30cdd0982b5a4aab243ee1b2bc010bca23f59176 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 20:38:05 +0400 Subject: [PATCH 05/14] Refined implementation of https://github.com/zio/zio-kafka/pull/1022#issuecomment-1712502924 --- .../kafka/consumer/CommittableRecord.scala | 14 +--- .../scala/zio/kafka/consumer/Committer.scala | 8 ++- .../scala/zio/kafka/consumer/Consumer.scala | 30 ++++----- .../scala/zio/kafka/consumer/Offset.scala | 47 -------------- .../zio/kafka/consumer/OffsetBatch.scala | 65 ------------------- 5 files changed, 21 insertions(+), 143 deletions(-) delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala index 6d3a5e494..aabaf426e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala @@ -2,12 +2,11 @@ package zio.kafka.consumer import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord } import org.apache.kafka.common.TopicPartition +import zio.RIO import zio.kafka.serde.Deserializer -import zio.{ RIO, Task } final case class CommittableRecord[K, V]( record: ConsumerRecord[K, V], - private val commitHandle: Map[TopicPartition, Long] => Task[Unit], private val consumerGroupMetadata: Option[ConsumerGroupMetadata] ) { def deserializeWith[R, K1, V1]( @@ -39,26 +38,15 @@ final case class CommittableRecord[K, V]( def timestamp: Long = record.timestamp() lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition()) - - def offset: Offset = - OffsetImpl( - topic = record.topic(), - partition = record.partition(), - offset = record.offset(), - commitHandle = commitHandle, - consumerGroupMetadata = consumerGroupMetadata - ) } object CommittableRecord { def apply[K, V]( record: ConsumerRecord[K, V], - commitHandle: Map[TopicPartition, Long] => Task[Unit], consumerGroupMetadata: Option[ConsumerGroupMetadata] ): CommittableRecord[K, V] = new CommittableRecord( record = record, - commitHandle = commitHandle, consumerGroupMetadata = consumerGroupMetadata ) } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala index e2f8ba936..a47c51e5d 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala @@ -2,10 +2,16 @@ package zio.kafka.consumer import org.apache.kafka.common.TopicPartition import zio.kafka.utils.PendingCommit 
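The `commitAndAwait` and `commitAndForget` helpers added below wrap the `PendingCommit` handshake. A sketch of their intended use, assuming the API shape at this point in the series (the topic name, serdes, and the one-second flush schedule are illustrative):

    import zio._
    import zio.kafka.consumer.{ Consumer, Subscription }
    import zio.kafka.serde.Serde

    // Accumulate offsets and flush them once per second; commitAndAwait holds each
    // chunk back until the batch containing its offsets has been committed.
    def runWithBatchedCommits(consumer: Consumer): Task[Unit] =
      for {
        committer <- consumer.commitAccumBatch(Schedule.fixed(1.second))
        _ <- consumer
               .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
               .mapChunksZIO(records => committer.commitAndAwait(records).as(records))
               .runDrain
      } yield ()

Using `commitAndForget` instead would let the stream run ahead of the broker acknowledgement, trading delivery guarantees for throughput.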
-import zio.{Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO} +import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO } trait Committer { def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] + + final def commitAndAwait(records: Chunk[CommittableRecord[_, _]]): Task[Unit] = + commit(records).flatMap(_.awaitCommit) + + final def commitAndForget(records: Chunk[CommittableRecord[_, _]]): UIO[Unit] = + commit(records).unit } //noinspection ConvertExpressionToSAM diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 5ec6e2a2b..bcecab5f6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -1,17 +1,12 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ - Consumer => JConsumer, - ConsumerRecord, - OffsetAndMetadata, - OffsetAndTimestamp -} +import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, Consumer => JConsumer} import org.apache.kafka.common._ import zio._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess } -import zio.kafka.serde.{ Deserializer, Serde } +import zio.kafka.consumer.internal.{ConsumerAccess, RunloopAccess} +import zio.kafka.serde.{Deserializer, Serde} import zio.kafka.utils.SslHelper import zio.stream._ @@ -169,9 +164,6 @@ object Consumer { case object RunloopTimeout extends RuntimeException("Timeout in Runloop") with NoStackTrace case object CommitTimeout extends RuntimeException("Commit timeout") with NoStackTrace - val offsetBatches: ZSink[Any, Nothing, Offset, Nothing, OffsetBatch] = - ZSink.foldLeft[Offset, OffsetBatch](OffsetBatch.empty)(_ add _) - def live: RLayer[ConsumerSettings & Diagnostics, Consumer] = ZLayer.scoped { for { @@ -232,6 +224,9 @@ object Consumer { ): RIO[Consumer, Map[TopicPartition, Option[OffsetAndMetadata]]] = ZIO.serviceWithZIO(_.committed(partitions, timeout)) + def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R & Consumer, Committer] = + ZIO.serviceWithZIO[Consumer](_.commitAccumBatch(commitschedule)) + /** * Accessor method */ @@ -423,7 +418,6 @@ private[consumer] final class ConsumerLive private[consumer] ( consumer: ConsumerAccess, runloopAccess: RunloopAccess ) extends Consumer { - import Consumer._ override def assignment: Task[Set[TopicPartition]] = consumer.withConsumer(_.assignment().asScala.toSet) @@ -525,15 +519,17 @@ private[consumer] final class ConsumerLive private[consumer] ( f: ConsumerRecord[K, V] => URIO[R1, Unit] ): ZIO[R & R1, Throwable, Unit] = for { - r <- ZIO.environment[R & R1] + r <- ZIO.environment[R & R1] + committer <- commitAccumBatch(commitRetryPolicy) _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - partitionStream.mapChunksZIO(_.mapZIO((c: CommittableRecord[K, V]) => f(c.record).as(c.offset))) + partitionStream.mapChunksZIO(records => + records.mapZIO((r: CommittableRecord[K, V]) => f(r.record)).as(records) + ) } - .provideEnvironment(r) - .aggregateAsync(offsetBatches) - .mapZIO(_.commitOrRetry(commitRetryPolicy)) + .mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty)) .runDrain + .provideEnvironment(r) } yield () override def offsetsForTimes( diff --git 
a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala deleted file mode 100644 index 69f8b9842..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala +++ /dev/null @@ -1,47 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, RetriableCommitFailedException } -import org.apache.kafka.common.TopicPartition -import zio.{ RIO, Schedule, Task } - -sealed trait Offset { - def topic: String - def partition: Int - def offset: Long - def commit: Task[Unit] - def batch: OffsetBatch - def consumerGroupMetadata: Option[ConsumerGroupMetadata] - - /** - * Attempts to commit and retries according to the given policy when the commit fails with a - * RetriableCommitFailedException - */ - final def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] = - Offset.commitOrRetry(commit, policy) - - final lazy val topicPartition: TopicPartition = new TopicPartition(topic, partition) -} - -object Offset { - private[consumer] def commitOrRetry[R, B]( - commit: Task[Unit], - policy: Schedule[R, Throwable, B] - ): RIO[R, Unit] = - commit.retry( - Schedule.recurWhile[Throwable] { - case _: RetriableCommitFailedException => true - case _ => false - } && policy - ) -} - -private final case class OffsetImpl( - topic: String, - partition: Int, - offset: Long, - commitHandle: Map[TopicPartition, Long] => Task[Unit], - consumerGroupMetadata: Option[ConsumerGroupMetadata] -) extends Offset { - def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset)) - def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata) -} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala deleted file mode 100644 index 3c0c0f6cc..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/OffsetBatch.scala +++ /dev/null @@ -1,65 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.ConsumerGroupMetadata -import org.apache.kafka.common.TopicPartition -import zio.{ RIO, Schedule, Task, ZIO } - -sealed trait OffsetBatch { - def offsets: Map[TopicPartition, Long] - def commit: Task[Unit] - def add(offset: Offset): OffsetBatch - @deprecated("Use add(Offset) instead", "2.1.4") - def merge(offset: Offset): OffsetBatch - def merge(offsets: OffsetBatch): OffsetBatch - def consumerGroupMetadata: Option[ConsumerGroupMetadata] - - /** - * Attempts to commit and retries according to the given policy when the commit fails with a - * RetriableCommitFailedException - */ - def commitOrRetry[R](policy: Schedule[R, Throwable, Any]): RIO[R, Unit] = - Offset.commitOrRetry(commit, policy) -} - -object OffsetBatch { - val empty: OffsetBatch = EmptyOffsetBatch - - def apply(offsets: Iterable[Offset]): OffsetBatch = offsets.foldLeft(empty)(_ add _) -} - -private final case class OffsetBatchImpl( - offsets: Map[TopicPartition, Long], - commitHandle: Map[TopicPartition, Long] => Task[Unit], - consumerGroupMetadata: Option[ConsumerGroupMetadata] -) extends OffsetBatch { - override def commit: Task[Unit] = commitHandle(offsets) - - override def add(offset: Offset): OffsetBatch = - copy( - offsets = offsets + (offset.topicPartition -> (offsets - .getOrElse(offset.topicPartition, -1L) max offset.offset)) - ) - - override def merge(offset: Offset): OffsetBatch = add(offset) - - override def merge(otherOffsets: OffsetBatch): OffsetBatch = { - val newOffsets = 
Map.newBuilder[TopicPartition, Long] - newOffsets ++= offsets - otherOffsets.offsets.foreach { case (tp, offset) => - val existing = offsets.getOrElse(tp, -1L) - if (existing < offset) - newOffsets += tp -> offset - } - - copy(offsets = newOffsets.result()) - } -} - -case object EmptyOffsetBatch extends OffsetBatch { - override val offsets: Map[TopicPartition, Long] = Map.empty - override val commit: Task[Unit] = ZIO.unit - override def add(offset: Offset): OffsetBatch = offset.batch - override def merge(offset: Offset): OffsetBatch = add(offset) - override def merge(offsets: OffsetBatch): OffsetBatch = offsets - override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None -} From a2f446e696a20d9aa565a92e8c77f6402b811603 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 22:02:28 +0400 Subject: [PATCH 06/14] Cleaned implementation of the new commit interface --- build.sbt | 2 + .../zio/kafka/bench/ConsumerBenchmark.scala | 29 ++++--- .../src/test/scala/zio/kafka/AdminSpec.scala | 17 ++-- .../test/scala/zio/kafka/ProducerSpec.scala | 22 +++--- .../zio/kafka/consumer/ConsumerSpec.scala | 72 ++++++----------- .../kafka/consumer/SubscriptionsSpec.scala | 31 +++----- .../kafka/consumer/CommittableRecord.scala | 52 ------------ .../scala/zio/kafka/consumer/Committer.scala | 25 +++--- .../scala/zio/kafka/consumer/Consumer.scala | 79 +++++++++++++------ .../internal/PartitionStreamControl.scala | 12 +-- .../zio/kafka/consumer/internal/Runloop.scala | 42 ++++------ .../consumer/internal/RunloopAccess.scala | 15 ++-- .../main/scala/zio/kafka/consumer/types.scala | 58 ++++++++++++++ .../zio/kafka/producer/Transaction.scala | 2 +- .../producer/TransactionalProducer.scala | 22 ++++-- 15 files changed, 244 insertions(+), 236 deletions(-) delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala create mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/types.scala diff --git a/build.sbt b/build.sbt index 049d5433e..04cfbca5d 100644 --- a/build.sbt +++ b/build.sbt @@ -7,6 +7,7 @@ lazy val kafkaClients = "org.apache.kafka" % "kafka-clients" lazy val scalaCollectionCompat = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" lazy val jacksonDatabind = "com.fasterxml.jackson.core" % "jackson-databind" % "2.15.2" lazy val logback = "ch.qos.logback" % "logback-classic" % "1.3.11" +lazy val zioPrelude = "dev.zio" %% "zio-prelude" % "1.0.0-RC20" enablePlugins(ZioSbtEcosystemPlugin, ZioSbtCiPlugin) @@ -102,6 +103,7 @@ lazy val zioKafka = .settings(enableZIO(enableStreaming = true)) .settings( libraryDependencies ++= Seq( + zioPrelude, kafkaClients, jacksonDatabind, scalaCollectionCompat diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala index b85b413fd..56bba3a57 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala @@ -5,13 +5,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig import org.openjdk.jmh.annotations._ import zio.kafka.bench.ZioBenchmark.randomThing import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.{ Consumer, Offset, OffsetBatch, Subscription } +import zio.kafka.consumer.{ Consumer, Subscription } import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.Kafka import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer } -import 
zio.stream.ZSink
-import zio.{ durationInt, Ref, Schedule, ZIO, ZLayer }
+import zio.{ Chunk, Ref, ZIO, ZLayer }
 
 import java.util.concurrent.TimeUnit
 
@@ -60,18 +59,18 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] {
   def throughputWithCommits(): Any = runZIO {
     for {
       counter <- Ref.make(0)
-      _ <- ZIO.logAnnotate("consumer", "1") {
-             Consumer
-               .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
-               .map(_.offset)
-               .aggregateAsyncWithin(ZSink.collectAll[Offset], Schedule.fixed(100.millis))
-               .tap(batch => counter.update(_ + batch.size))
-               .map(OffsetBatch.apply)
-               .mapZIO(_.commit)
-               .takeUntilZIO(_ => counter.get.map(_ >= nrMessages))
-               .runDrain
-               .provideSome[Kafka](env)
-           }
+      _ <- ZIO
+             .logAnnotate("consumer", "1") {
+               Consumer
+                 .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray)
+                 .tap { _ =>
+                   counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages))
+                 }
+                 .mapChunksZIO(records => Consumer.commit(records).as(Chunk.empty))
+                 .takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages))
+                 .runDrain
+             }
+             .provideSome[Kafka](env)
     } yield ()
   }
 }
diff --git a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
index 33416e7d0..9a103acf7 100644
--- a/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
+++ b/zio-kafka-test/src/test/scala/zio/kafka/AdminSpec.scala
@@ -23,7 +23,7 @@ import zio.kafka.admin.AdminClient.{
 }
 import zio.kafka.admin.acl._
 import zio.kafka.admin.resource.{ PatternType, ResourcePattern, ResourcePatternFilter, ResourceType }
-import zio.kafka.consumer.{ CommittableRecord, Consumer, OffsetBatch, Subscription }
+import zio.kafka.consumer.{ Consumer, Subscription }
 import zio.kafka.serde.Serde
 import zio.kafka.testkit.KafkaTestUtils._
 import zio.kafka.testkit._
@@ -228,13 +228,8 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
         .partitionedStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
         .flatMapPar(partitionCount)(_._2)
         .take(count)
-        .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20))
-        .mapConcatZIO { committableRecords =>
-          val records     = committableRecords.map(_.record)
-          val offsetBatch = OffsetBatch(committableRecords.map(_.offset))
-
-          offsetBatch.commit.as(records)
-        }
+        .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20))
+        .mapConcatZIO(records => Consumer.commit(records).as(records))
         .runCollect
         .provideSomeLayer[Kafka](consumer("adminspec-topic10", Some(consumerGroupID)))
 
@@ -301,7 +296,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
         KafkaTestUtils.withAdmin { client =>
@@ -344,7 +339,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
           Consumer
             .plainStream[Kafka, String, String](Subscription.Topics(Set(topic)), Serde.string, Serde.string)
             .take(count)
-            .foreach(_.offset.commit)
+            .foreach(Consumer.commit)
             .provideSomeLayer[Kafka](consumer(topic, Some(groupId)))
 
         KafkaTestUtils.withAdmin { client =>
@@ -645,7 +640,7 @@ object AdminSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
     groupInstanceId: Option[String] = None
   ): ZIO[Kafka, Throwable, Unit] = Consumer
.plainStream(Subscription.topics(topicName), Serde.string, Serde.string) - .foreach(_.offset.commit) + .foreach(Consumer.commit) .provideSomeLayer(consumer(clientId, Some(groupId), groupInstanceId)) private def getStableConsumerGroupDescription( diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala index dc870f573..457a4580f 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala @@ -1,11 +1,13 @@ package zio.kafka +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import zio._ import zio.kafka.admin.AdminClient.NewTopic import zio.kafka.consumer._ +import zio.kafka.consumer.types.{ Offset, OffsetBatch } import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort } import zio.kafka.producer.{ ByteRecord, Producer, Transaction, TransactionalProducer } import zio.kafka.serde.Serde @@ -26,7 +28,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { def withConsumerInt( subscription: Subscription, settings: ConsumerSettings - ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, CommittableRecord[String, Int]]]] = + ): ZIO[Any with Scope, Throwable, Dequeue[Take[Throwable, ConsumerRecord[String, Int]]]] = Consumer.make(settings).flatMap { c => c.plainStream(subscription, Serde.string, Serde.int).toQueue() } @@ -193,7 +195,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { for { messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException)) record = messages - .filter(rec => rec.record.key == key1 && rec.record.value == value1) + .filter(rec => rec.key == key1 && rec.value == value1) } yield record } } @@ -201,7 +203,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { withConsumer(Topics(Set(topic2)), settings).flatMap { consumer => for { messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == key2 && rec.record.value == value2) + record = messages.filter(rec => rec.key == key2 && rec.value == value2) } yield record } } @@ -289,7 +291,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") + record = messages.filter(_.key == "bob") } yield record } } @@ -329,7 +331,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "bob") + record = messages.filter(_.key == "bob") } yield record } } @@ -417,7 +419,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = messages.filter(_.key == "no one") } yield record } @@ -458,7 +460,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = 
messages.filter(_.key == "no one") } yield record } } @@ -504,7 +506,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { messages <- consumer.take .flatMap(_.done) .mapError(_.getOrElse(new NoSuchElementException)) - record = messages.filter(rec => rec.record.key == "no one") + record = messages.filter(_.key == "no one") } yield record } } @@ -544,7 +546,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { aliceAccountFeesPaid, Serde.string, Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) + Some(Offset.from(aliceHadMoneyCommittableMessage)) ) } } @@ -591,7 +593,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { aliceAccountFeesPaid, Serde.string, Serde.int, - Some(aliceHadMoneyCommittableMessage.offset) + Some(Offset.from(aliceHadMoneyCommittableMessage)) ) *> t.abort } diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index d68b93a8a..02b6cc28c 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -4,6 +4,7 @@ import io.github.embeddedkafka.EmbeddedKafka import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerPartitionAssignor, + ConsumerRecord, CooperativeStickyAssignor, RangeAssignor } @@ -19,6 +20,7 @@ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.{ SubscriptionFinalized } import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } +import zio.kafka.consumer.types.OffsetBatch import zio.kafka.producer.{ Producer, TransactionalProducer } import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ @@ -58,7 +60,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("chunk sizes") { @@ -96,7 +98,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(clientId = client)) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("Consuming+provideCustomLayer") { @@ -113,7 +115,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(100) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("plainStream emits messages for a pattern subscription") { @@ -128,7 +130,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(5) .runCollect .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = records.map(r => (r.record.key, r.record.value)).toList + kvOut = records.map(r => (r.key, r.value)).toList } yield assert(kvOut)(equalTo(kvs)) }, test("receive only messages from the subscribed topic-partition when creating a manual subscription") { @@ -148,7 +150,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .take(1) .runHead .provideSomeLayer[Kafka](consumer(client, Some(group))) - kvOut = record.map(r => (r.record.key, r.record.value)) + kvOut = record.map(r => (r.key, r.value)) } yield 
assert(kvOut)(isSome(equalTo("key2" -> "msg2"))) }, test("receive from the right offset when creating a manual subscription with manual seeking") { @@ -173,7 +175,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .provideSomeLayer[Kafka]( consumer(client, Some(group), offsetRetrieval = offsetRetrieval) ) - kvOut = record.map(r => (r.record.key, r.record.value)) + kvOut = record.map(r => (r.key, r.value)) } yield assert(kvOut)(isSome(equalTo("key2-3" -> "msg2-3"))) }, test("restart from the committed position") { @@ -191,13 +193,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .filter(_._1 == new TopicPartition(topic, 0)) .flatMap(_._2) .take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5)) + .mapConcatZIO(records => Consumer.commit(records).as(records)) .runCollect .provideSomeLayer[Kafka]( consumer(first, Some(group)) @@ -209,13 +206,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .partitionedStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) .flatMap(_._2) .take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](20)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](20)) + .mapConcatZIO(records => Consumer.commit(records).as(records)) .runCollect .provideSomeLayer[Kafka]( consumer(second, Some(group)) @@ -287,7 +279,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .zipWithIndex .tap { case (record, idx) => (Consumer.stopConsumption <* ZIO.logDebug("Stopped consumption")).when(idx == 3) *> - record.offset.commit <* ZIO.logDebug(s"Committed $idx") + Consumer.commit(record) <* ZIO.logDebug(s"Committed $idx") } .tap { case (_, idx) => ZIO.logDebug(s"Consumed $idx") } .runDrain @@ -312,10 +304,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { for { nr <- messagesReceived.updateAndGet(_ + 1) _ <- Consumer.stopConsumption.when(nr == 10) - } yield if (nr < 10) Seq(record.offset) else Seq.empty + } yield if (nr < 10) Seq(record) else Seq.empty } - .transduce(Consumer.offsetBatches) - .mapZIO(_.commit) + .mapChunksZIO(records => Consumer.commit(records).as(records)) .runDrain *> Consumer.committed(Set(new TopicPartition(topic, 0))).map(_.values.head)) .provideSomeLayer[Kafka](consumer(client, Some(group))) @@ -339,11 +330,10 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { subscription = Subscription.topics(topic) offsets <- (Consumer .partitionedStream(subscription, Serde.string, Serde.string) - .flatMapPar(nrPartitions)(_._2.map(_.offset)) + .flatMapPar(nrPartitions)(_._2) .take(nrMessages.toLong) - .transduce(Consumer.offsetBatches) + .mapChunksZIO(records => Consumer.commit(records).as(records)) .take(1) - .mapZIO(_.commit) .runDrain *> Consumer.committed((0 until nrPartitions).map(new TopicPartition(topic, _)).toSet)) .provideSomeLayer[Kafka](consumer(client, Some(group))) @@ -460,13 +450,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) 
.take(5) - .transduce(ZSink.collectAllN[CommittableRecord[String, String]](5)) - .mapConcatZIO { committableRecords => - val records = committableRecords.map(_.record) - val offsetBatch = OffsetBatch(committableRecords.map(_.offset)) - - offsetBatch.commit.as(records) - } + .transduce(ZSink.collectAllN[ConsumerRecord[String, String]](5)) + .mapConcatZIO(records => Consumer.commit(records).as(records)) .runCollect .provideSomeLayer[Kafka](consumer(client1, Some(group))) // Start a new consumer with manual offset before the committed offset @@ -474,7 +459,6 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { secondResults <- Consumer .plainStream(Subscription.topics(topic), Serde.string, Serde.string) .take(nrRecords.toLong - manualOffsetSeek) - .map(_.record) .runCollect .provideSomeLayer[Kafka]( consumer(client2, Some(group), offsetRetrieval = offsetRetrieval) @@ -522,7 +506,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { newMessage <- Consumer .plainStream(subscription, Serde.string, Serde.string) .take(1) - .map(r => (r.record.key(), r.record.value())) + .map(r => (r.key(), r.value())) .run(ZSink.collectAll[(String, String)]) .map(_.head) .orDie @@ -675,7 +659,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => ZStream.finalizer(ZIO.logDebug(s"TP ${tp.toString} finalizer")) *> partitionStream.mapChunksZIO { records => - OffsetBatch(records.map(_.offset)).commit *> messagesReceived(tp.partition) + Consumer.commit(records) *> messagesReceived(tp.partition) .update(_ + records.size) .as(records) } @@ -812,9 +796,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .fromIterable(partitions.map(_._2)) .flatMapPar(Int.MaxValue)(s => s) .mapZIO(record => messagesReceivedConsumer1.update(_ + 1).as(record)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(offsetBatch => offsetBatch.commit) + .mapChunksZIO(Consumer.commit(_).as(Chunk.empty)) .runDrain } .mapZIO(_ => drainCount.updateAndGet(_ + 1)) @@ -837,9 +819,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { Consumer .plainStream(subscription, Serde.string, Serde.string) .mapZIO(record => messagesReceivedConsumer2.update(_ + 1).as(record)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(offsetBatch => offsetBatch.commit) + .mapChunksZIO(Consumer.commit(_).as(Chunk.empty)) .runDrain .provideSomeLayer[Kafka]( customConsumer("consumer2", Some(group)) @@ -926,7 +906,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { records.map(r => new ProducerRecord(toTopic, r.key, r.value)), Serde.string, Serde.string, - OffsetBatch(records.map(_.offset)) + OffsetBatch.from(records) ) _ <- consumedMessagesCounter.update(_ + records.size) } yield Chunk.empty @@ -1205,9 +1185,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { result <- Consumer .plainStream(Subscription.Topics(Set(topic)), Serde.string, Serde.string) .take(11) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) // Hangs without timeout + .mapChunksZIO(Consumer.commit(_).as(Chunk.empty)) .runDrain .exit .provideSomeLayer[Kafka](consumer(client, Some(group), commitTimeout = 2.seconds)) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index d33bdab52..bc4f3bede 100644 --- 
a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -1,14 +1,14 @@ package zio.kafka.consumer import io.github.embeddedkafka.EmbeddedKafka -import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.clients.consumer.{ ConsumerConfig, ConsumerRecord } import zio._ import zio.kafka.ZIOSpecDefaultSlf4j import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.KafkaTestUtils._ import zio.kafka.testkit.{ Kafka, KafkaRandom } -import zio.stream.{ ZSink, ZStream } +import zio.stream.ZStream import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ @@ -42,12 +42,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runCollect) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) (records1, records2) = records - kvOut1 = records1.map(r => (r.record.key, r.record.value)).toList - kvOut2 = records2.map(r => (r.record.key, r.record.value)).toList + kvOut1 = records1.map(r => (r.key, r.value)).toList + kvOut2 = records2.map(r => (r.key, r.value)).toList } yield assertTrue(kvOut1 == kvs) && assertTrue(kvOut2 == kvs) && - assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) && - assertTrue(records2.map(_.record.topic()).forall(_ == topic2)) + assertTrue(records1.map(_.topic()).forall(_ == topic1)) && + assertTrue(records2.map(_.topic()).forall(_ == topic2)) }, test("consumes from two pattern subscriptions") { val kvs = (1 to 5).toList.map(i => (s"key$i", s"msg$i")) @@ -71,12 +71,12 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { .runCollect) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) (records1, records2) = records - kvOut1 = records1.map(r => (r.record.key, r.record.value)).toList - kvOut2 = records2.map(r => (r.record.key, r.record.value)).toList + kvOut1 = records1.map(r => (r.key, r.value)).toList + kvOut2 = records2.map(r => (r.key, r.value)).toList } yield assertTrue(kvOut1 == kvs) && assertTrue(kvOut2 == kvs) && - assertTrue(records1.map(_.record.topic()).forall(_ == topic1)) && - assertTrue(records2.map(_.record.topic()).forall(_ == topic2)) + assertTrue(records1.map(_.topic()).forall(_ == topic1)) && + assertTrue(records2.map(_.topic()).forall(_ == topic2)) }, test( "gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription" @@ -261,23 +261,18 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- produceMany(topic1, kvs) - recordsConsumed <- Ref.make(Chunk.empty[CommittableRecord[String, String]]) + recordsConsumed <- Ref.make(Chunk.empty[ConsumerRecord[String, String]]) _ <- Consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) .take(40) - .transduce( - Consumer.offsetBatches.contramap[CommittableRecord[String, String]](_.offset) <&> ZSink - .collectAll[CommittableRecord[String, String]] - ) - .mapZIO { case (offsetBatch, records) => offsetBatch.commit.as(records) } - .flattenChunks + .mapChunksZIO(records => Consumer.commit(records).as(records)) .runCollect .tap(records => recordsConsumed.update(_ ++ records)) .repeatN(24) .provideSomeLayer[Kafka with Scope](consumer(client, Some(group))) consumed <- recordsConsumed.get - } yield assert(consumed.map(r => r.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2)))) + } yield assert(consumed.map(_.value))(hasSameElements(Chunk.fromIterable(kvs.map(_._2)))) } @@ 
TestAspect.nonFlaky(3) ) .provideSome[Scope & Kafka](producer) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala deleted file mode 100644 index aabaf426e..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/CommittableRecord.scala +++ /dev/null @@ -1,52 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, ConsumerRecord } -import org.apache.kafka.common.TopicPartition -import zio.RIO -import zio.kafka.serde.Deserializer - -final case class CommittableRecord[K, V]( - record: ConsumerRecord[K, V], - private val consumerGroupMetadata: Option[ConsumerGroupMetadata] -) { - def deserializeWith[R, K1, V1]( - keyDeserializer: Deserializer[R, K1], - valueDeserializer: Deserializer[R, V1] - )(implicit ev1: K <:< Array[Byte], ev2: V <:< Array[Byte]): RIO[R, CommittableRecord[K1, V1]] = - for { - key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) - value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) - } yield copy( - record = new ConsumerRecord[K1, V1]( - record.topic(), - record.partition(), - record.offset(), - record.timestamp(), - record.timestampType(), - record.serializedKeySize(), - record.serializedValueSize(), - key, - value, - record.headers(), - record.leaderEpoch() - ) - ) - - def key: K = record.key - def value: V = record.value() - def partition: Int = record.partition() - def timestamp: Long = record.timestamp() - - lazy val topicPartition: TopicPartition = new TopicPartition(record.topic(), record.partition()) -} - -object CommittableRecord { - def apply[K, V]( - record: ConsumerRecord[K, V], - consumerGroupMetadata: Option[ConsumerGroupMetadata] - ): CommittableRecord[K, V] = - new CommittableRecord( - record = record, - consumerGroupMetadata = consumerGroupMetadata - ) -} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala index a47c51e5d..255573f24 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala @@ -1,33 +1,37 @@ package zio.kafka.consumer -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.consumer.ConsumerRecord +import zio.kafka.consumer.types.OffsetBatch import zio.kafka.utils.PendingCommit import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO } trait Committer { - def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] + def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] - final def commitAndAwait(records: Chunk[CommittableRecord[_, _]]): Task[Unit] = + final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + commit(OffsetBatch.from(records)) + + final def commitAndAwait(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = commit(records).flatMap(_.awaitCommit) - final def commitAndForget(records: Chunk[CommittableRecord[_, _]]): UIO[Unit] = + final def commitAndForget(records: Chunk[ConsumerRecord[_, _]]): UIO[Unit] = commit(records).unit } //noinspection ConvertExpressionToSAM object Committer { - private val emptyState: (Map[TopicPartition, Long], List[Promise[Throwable, Unit]]) = - (Map.empty[TopicPartition, Long], List.empty[Promise[Throwable, Unit]]) + private val emptyState: (OffsetBatch, List[Promise[Throwable, Unit]]) = + 
(OffsetBatch.empty, List.empty[Promise[Throwable, Unit]]) val unit: Committer = new Committer { - override def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = ZIO.succeed(PendingCommit.unit.asInstanceOf[PendingCommit[Throwable, Unit]]) } private[zio] def fromSchedule[R]( commitSchedule: Schedule[R, Any, Any], - commit: Map[TopicPartition, Long] => Task[Unit], + commit: OffsetBatch => Task[Unit], scope: Scope ): URIO[R, Committer] = for { @@ -45,11 +49,10 @@ object Committer { .schedule(commitSchedule) .forkIn(scope) } yield new Committer { - override def commit(records: Chunk[CommittableRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = for { p <- Promise.make[Throwable, Unit] - newOffsets = records.map(record => record.topicPartition -> record.record.offset()) - _ <- acc.update { case (offsets, promises) => (offsets ++ newOffsets, promises :+ p) } + _ <- acc.update { case (offsets, promises) => (offsets merge offsetBatch, promises :+ p) } } yield PendingCommit(p) } } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index bcecab5f6..b810a60a5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -1,12 +1,18 @@ package zio.kafka.consumer -import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetAndMetadata, OffsetAndTimestamp, Consumer => JConsumer} +import org.apache.kafka.clients.consumer.{ + Consumer => JConsumer, + ConsumerRecord, + OffsetAndMetadata, + OffsetAndTimestamp +} import org.apache.kafka.common._ import zio._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.{ConsumerAccess, RunloopAccess} -import zio.kafka.serde.{Deserializer, Serde} +import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess } +import zio.kafka.consumer.types.{ deserializeWith, OffsetBatch } +import zio.kafka.serde.{ Deserializer, Serde } import zio.kafka.utils.SslHelper import zio.stream._ @@ -32,7 +38,9 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Long]] - def commit(record: CommittableRecord[_, _]): Task[Unit] + def commit(record: ConsumerRecord[_, _]): Task[Unit] + def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] + def commit(offsetBatch: OffsetBatch): Task[Unit] /** * Retrieve the last committed offset for the given topic-partitions @@ -42,7 +50,9 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] @@ -68,7 +78,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, 
Throwable, CommittableRecord[K, V]])]] + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] /** * Create a stream with messages on the subscribed topic-partitions by topic-partition @@ -91,7 +101,7 @@ trait Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] + ): Stream[Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])] /** * Create a stream with all messages on the subscribed topic-partitions @@ -116,7 +126,7 @@ trait Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R, Throwable, CommittableRecord[K, V]] + ): ZStream[R, Throwable, ConsumerRecord[K, V]] /** * Stops consumption of data, drains buffered records, and ends the attached streams while still serving commit @@ -215,6 +225,15 @@ object Consumer { ): RIO[Consumer, Map[TopicPartition, Long]] = ZIO.serviceWithZIO(_.beginningOffsets(partitions, timeout)) + def commit(record: ConsumerRecord[_, _]): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(record)) + + def commit(records: Chunk[ConsumerRecord[_, _]]): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(records)) + + def commit(offsetBatch: OffsetBatch): RIO[Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commit(offsetBatch)) + /** * Accessor method */ @@ -251,7 +270,7 @@ object Consumer { subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = + ): ZStream[Consumer, Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] = ZStream.serviceWithStream[Consumer](_.partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer)) /** @@ -264,7 +283,7 @@ object Consumer { ): ZStream[ Consumer, Throwable, - (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]]) + (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]]) ] = ZStream.serviceWithStream[Consumer](_.partitionedStream(subscription, keyDeserializer, valueDeserializer)) @@ -276,7 +295,7 @@ object Consumer { keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int = 4 - ): ZStream[R & Consumer, Throwable, CommittableRecord[K, V]] = + ): ZStream[R & Consumer, Throwable, ConsumerRecord[K, V]] = ZStream.serviceWithStream[Consumer]( _.plainStream(subscription, keyDeserializer, valueDeserializer, bufferSize) ) @@ -441,8 +460,14 @@ private[consumer] final class ConsumerLive private[consumer] ( offs.asScala.map { case (k, v) => k -> v.longValue() }.toMap } - override def commit(record: CommittableRecord[_, _]): Task[Unit] = - runloopAccess.commit(record) + override def commit(record: ConsumerRecord[_, _]): Task[Unit] = + runloopAccess.commit(OffsetBatch.from(record)) + + override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = + runloopAccess.commit(OffsetBatch.from(records)) + + override def commit(offsetBatch: OffsetBatch): Task[Unit] = + runloopAccess.commit(offsetBatch) override def committed( partitions: Set[TopicPartition], @@ -452,8 +477,16 @@ private[consumer] final class ConsumerLive private[consumer] ( _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap ) - override def commitOrRetry[R](policy: Schedule[R, Throwable, 
Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(record) + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record)) + + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])( + records: Chunk[ConsumerRecord[_, _]] + ): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records)) + + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + runloopAccess.commitOrRetry(policy)(offsetBatch) override def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = runloopAccess.commitAccumBatch(commitSchedule) @@ -465,7 +498,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])]] = { + ): Stream[Throwable, Chunk[(TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])]] = { val onlyByteArraySerdes: Boolean = (keyDeserializer eq Serde.byteArray) && (valueDeserializer eq Serde.byteArray) ZStream.unwrapScoped { @@ -477,10 +510,10 @@ private[consumer] final class ConsumerLive private[consumer] ( .map { _.collect { case (tp, partitionStream) if Subscription.subscriptionMatches(subscription, tp) => - val stream: ZStream[R, Throwable, CommittableRecord[K, V]] = + val stream: ZStream[R, Throwable, ConsumerRecord[K, V]] = if (onlyByteArraySerdes) - partitionStream.asInstanceOf[ZStream[R, Throwable, CommittableRecord[K, V]]] - else partitionStream.mapChunksZIO(_.mapZIO(_.deserializeWith(keyDeserializer, valueDeserializer))) + partitionStream.asInstanceOf[ZStream[R, Throwable, ConsumerRecord[K, V]]] + else partitionStream.mapChunksZIO(_.mapZIO(deserializeWith(keyDeserializer, valueDeserializer))) tp -> stream } @@ -492,7 +525,7 @@ private[consumer] final class ConsumerLive private[consumer] ( subscription: Subscription, keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V] - ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, CommittableRecord[K, V]])] = + ): ZStream[Any, Throwable, (TopicPartition, ZStream[R, Throwable, ConsumerRecord[K, V]])] = partitionedAssignmentStream(subscription, keyDeserializer, valueDeserializer).flattenChunks override def plainStream[R, K, V]( @@ -500,7 +533,7 @@ private[consumer] final class ConsumerLive private[consumer] ( keyDeserializer: Deserializer[R, K], valueDeserializer: Deserializer[R, V], bufferSize: Int - ): ZStream[R, Throwable, CommittableRecord[K, V]] = + ): ZStream[R, Throwable, ConsumerRecord[K, V]] = partitionedStream(subscription, keyDeserializer, valueDeserializer).flatMapPar( n = Int.MaxValue, bufferSize = bufferSize @@ -523,9 +556,7 @@ private[consumer] final class ConsumerLive private[consumer] ( committer <- commitAccumBatch(commitRetryPolicy) _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) .flatMapPar(Int.MaxValue) { case (_, partitionStream) => - partitionStream.mapChunksZIO(records => - records.mapZIO((r: CommittableRecord[K, V]) => f(r.record)).as(records) - ) + partitionStream.mapChunksZIO(records => records.mapZIO(f).as(records)) } .mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty)) .runDrain diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala 
b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala index 4928f7d68..1319423c2 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/PartitionStreamControl.scala @@ -2,14 +2,14 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } -import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord import zio.stream.{ Take, ZStream } import zio.{ Chunk, LogAnnotation, Promise, Queue, Ref, UIO, ZIO } final class PartitionStreamControl private ( val tp: TopicPartition, - stream: ZStream[Any, Throwable, ByteArrayCommittableRecord], - dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]], + stream: ZStream[Any, Throwable, ByteArrayConsumerRecord], + dataQueue: Queue[Take[Throwable, ByteArrayConsumerRecord]], interruptionPromise: Promise[Throwable, Unit], completedPromise: Promise[Nothing, Unit], queueSizeRef: Ref[Int] @@ -21,7 +21,7 @@ final class PartitionStreamControl private ( ) /** Offer new data for the stream to process. */ - private[internal] def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] = + private[internal] def offerRecords(data: Chunk[ByteArrayConsumerRecord]): ZIO[Any, Nothing, Unit] = queueSizeRef.update(_ + data.size) *> dataQueue.offer(Take.chunk(data)).unit def queueSize: UIO[Int] = queueSizeRef.get @@ -45,7 +45,7 @@ final class PartitionStreamControl private ( private[internal] def isRunning: ZIO[Any, Nothing, Boolean] = isCompleted.negate - private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord]) = + private[internal] val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayConsumerRecord]) = (tp, stream) } @@ -60,7 +60,7 @@ object PartitionStreamControl { _ <- ZIO.logDebug(s"Creating partition stream ${tp.toString}") interruptionPromise <- Promise.make[Throwable, Unit] completedPromise <- Promise.make[Nothing, Unit] - dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]] + dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayConsumerRecord]] queueSize <- Ref.make(0) requestAndAwaitData = for { diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 10e747619..bb36132c8 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -11,6 +11,7 @@ import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics } import zio.kafka.consumer.fetch.FetchStrategy import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment +import zio.kafka.consumer.types.OffsetBatch import zio.stream._ import java.util @@ -116,13 +117,8 @@ private[consumer] final class Runloop private ( } } - private[internal] def commit(record: CommittableRecord[_, _]): Task[Unit] = - commit.apply(Map(record.topicPartition -> record.record.offset())) - - private[internal] def commitOrRetry[R]( - policy: Schedule[R, Throwable, Any] - )(record: CommittableRecord[_, _]): RIO[R, Unit] = - commit(record).retry( + private[internal] def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + 
commit(offsetBatch).retry( Schedule.recurWhile[Throwable] { case _: RetriableCommitFailedException => true case _ => false @@ -133,14 +129,13 @@ private[consumer] final class Runloop private ( private[internal] def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = Committer.fromSchedule(commitSchedule, commit, runloopScope) - private val commit: Map[TopicPartition, Long] => Task[Unit] = - offsets => - for { - p <- Promise.make[Throwable, Unit] - _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit - _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) - _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) - } yield () + private[internal] def commit(offsets: OffsetBatch): Task[Unit] = + for { + p <- Promise.make[Throwable, Unit] + _ <- commandQueue.offer(RunloopCommand.Commit(offsets, p)).unit + _ <- diagnostics.emit(DiagnosticEvent.Commit.Started(offsets)) + _ <- p.await.timeoutFail(CommitTimeout)(commitTimeout) + } yield () private def doCommit(cmd: RunloopCommand.Commit): UIO[Unit] = { val offsets = cmd.offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset + 1) } @@ -206,7 +201,7 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps: Set[TopicPartition], polledRecords: ConsumerRecords[Array[Byte], Array[Byte]] ): UIO[Runloop.FulfillResult] = { - type Record = CommittableRecord[Array[Byte], Array[Byte]] + type Record = ConsumerRecord[Array[Byte], Array[Byte]] // The most efficient way to get the records from [[ConsumerRecords]] per // topic-partition, is by first getting the set of topic-partitions, and @@ -219,7 +214,7 @@ private[consumer] final class Runloop private ( if (streams.isEmpty) ZIO.succeed(fulfillResult) else { for { - consumerGroupMetadata <- getConsumerGroupMetadataIfAny + /*consumerGroupMetadata*/ _ <- getConsumerGroupMetadataIfAny // TODO Jules: Use: how? 
_ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) @@ -227,15 +222,8 @@ private[consumer] final class Runloop private ( else { val builder = ChunkBuilder.make[Record](records.size()) val iterator = records.iterator() - while (iterator.hasNext) { - val consumerRecord = iterator.next() - builder += - CommittableRecord[Array[Byte], Array[Byte]]( - record = consumerRecord, - commitHandle = commit, - consumerGroupMetadata = consumerGroupMetadata - ) - } + while (iterator.hasNext) + builder += iterator.next() streamControl.offerRecords(builder.result()) } } @@ -545,7 +533,7 @@ private[consumer] object Runloop { } } - type ByteArrayCommittableRecord = CommittableRecord[Array[Byte], Array[Byte]] + type ByteArrayConsumerRecord = ConsumerRecord[Array[Byte], Array[Byte]] private final case class PollResult( startingTps: Set[TopicPartition], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 2afa2f2c9..6acf838e4 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -3,11 +3,12 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics -import zio.kafka.consumer.internal.Runloop.ByteArrayCommittableRecord +import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ CommittableRecord, Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.kafka.consumer.{ Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription } import zio.stream.{ Stream, Take, UStream, ZStream } import zio._ +import zio.kafka.consumer.types.OffsetBatch private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -70,11 +71,11 @@ private[consumer] final class RunloopAccess private ( } } yield stream - def commit(record: CommittableRecord[_, _]): Task[Unit] = - withRunloopZIO(shouldStartIfNot = false)(_.commit(record)) + def commit(offsetBatch: OffsetBatch): Task[Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch)) - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: CommittableRecord[_, _]): RIO[R, Unit] = - withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(record)) + def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch)) def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] = withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(ZIO.succeed(Committer.unit)) @@ -82,7 +83,7 @@ private[consumer] final class RunloopAccess private ( } private[consumer] object RunloopAccess { - type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayCommittableRecord]) + type PartitionAssignment = (TopicPartition, Stream[Throwable, ByteArrayConsumerRecord]) def make( settings: ConsumerSettings, diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala new file mode 100644 index 000000000..5eeb37050 --- /dev/null +++ 
b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala @@ -0,0 +1,58 @@ +package zio.kafka.consumer + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import zio.kafka.serde.Deserializer +import zio.prelude.Subtype +import zio.{ Chunk, RIO } + +object types { + + @inline + private[zio] def topicPartition(record: ConsumerRecord[_, _]): TopicPartition = + new TopicPartition(record.topic(), record.partition()) + + private[zio] def deserializeWith[R, K, V]( + keyDeserializer: Deserializer[R, K], + valueDeserializer: Deserializer[R, V] + )(record: ConsumerRecord[Array[Byte], Array[Byte]]): RIO[R, ConsumerRecord[K, V]] = + for { + key <- keyDeserializer.deserialize(record.topic(), record.headers(), record.key()) + value <- valueDeserializer.deserialize(record.topic(), record.headers(), record.value()) + } yield new ConsumerRecord[K, V]( + record.topic(), + record.partition(), + record.offset(), + record.timestamp(), + record.timestampType(), + record.serializedKeySize(), + record.serializedValueSize(), + key, + value, + record.headers(), + record.leaderEpoch() + ) + + type Offset = Offset.Type + object Offset extends Subtype[(TopicPartition, Long)] { + def from(record: ConsumerRecord[_, _]): Offset = + Offset.wrap((topicPartition(record), record.offset())) + } + + type OffsetBatch = OffsetBatch.Type + object OffsetBatch extends Subtype[Map[TopicPartition, Long]] { + def empty: OffsetBatch = OffsetBatch(Map.empty) + + def from(record: ConsumerRecord[_, _]): OffsetBatch = + OffsetBatch(Map(topicPartition(record) -> record.offset())) + + def from(records: Chunk[ConsumerRecord[_, _]]): OffsetBatch = + OffsetBatch.wrap(records.map(record => topicPartition(record) -> record.offset()).toMap) + + implicit final class OffsetBatchOps(private val self: OffsetBatch) extends AnyVal { + def merge(other: OffsetBatch): OffsetBatch = ??? // TODO Jules + def add(offset: Offset): OffsetBatch = ??? 
// TODO Jules + } + } + +} diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala index 428575818..22cc09584 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/Transaction.scala @@ -1,7 +1,7 @@ package zio.kafka.producer import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata } -import zio.kafka.consumer.{ Offset, OffsetBatch } +import zio.kafka.consumer.types.{ Offset, OffsetBatch } import zio.kafka.producer.TransactionalProducer.{ TransactionLeaked, UserInitiatedAbort } import zio.kafka.serde.Serializer import zio.{ Chunk, IO, RIO, Ref, UIO, ZIO } diff --git a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala index 848acc542..cde923210 100644 --- a/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/producer/TransactionalProducer.scala @@ -1,13 +1,13 @@ package zio.kafka.producer -import org.apache.kafka.clients.consumer.OffsetAndMetadata +import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata, OffsetAndMetadata } import org.apache.kafka.clients.producer.{ KafkaProducer, RecordMetadata } import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.InvalidGroupIdException import org.apache.kafka.common.serialization.ByteArraySerializer import zio.Cause.Fail import zio._ -import zio.kafka.consumer.OffsetBatch +import zio.kafka.consumer.types.OffsetBatch import java.util import scala.jdk.CollectionConverters._ @@ -26,7 +26,10 @@ object TransactionalProducer { ) extends TransactionalProducer { private val abortTransaction: Task[Unit] = ZIO.attemptBlocking(live.p.abortTransaction()) - private def commitTransactionWithOffsets(offsetBatch: OffsetBatch): Task[Unit] = { + private def commitTransactionWithOffsets( + offsetBatch: OffsetBatch, + consumerGroupMetadata: Option[ConsumerGroupMetadata] + ): Task[Unit] = { val sendOffsetsToTransaction: Task[Unit] = ZIO.suspendSucceed { @inline def invalidGroupIdException: IO[InvalidGroupIdException, Nothing] = @@ -36,11 +39,11 @@ object TransactionalProducer { ) ) - offsetBatch.consumerGroupMetadata match { + consumerGroupMetadata match { case None => invalidGroupIdException case Some(consumerGroupMetadata) => val offsets: util.Map[TopicPartition, OffsetAndMetadata] = - offsetBatch.offsets.map { case (topicPartition, offset) => + offsetBatch.map { case (topicPartition, offset) => topicPartition -> new OffsetAndMetadata(offset + 1) }.asJava @@ -48,14 +51,19 @@ object TransactionalProducer { } } - sendOffsetsToTransaction.when(offsetBatch.offsets.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) + sendOffsetsToTransaction.when(offsetBatch.nonEmpty) *> ZIO.attemptBlocking(live.p.commitTransaction()) } private def commitOrAbort(transaction: TransactionImpl, exit: Exit[Any, Any]): UIO[Unit] = exit match { case Exit.Success(_) => transaction.offsetBatchRef.get - .flatMap(offsetBatch => commitTransactionWithOffsets(offsetBatch).retryN(5).orDie) + .flatMap(offsetBatch => + commitTransactionWithOffsets( + offsetBatch, + consumerGroupMetadata = None // TODO Jules + ).retryN(5).orDie + ) case Exit.Failure(Fail(UserInitiatedAbort, _)) => abortTransaction.retryN(5).orDie case Exit.Failure(_) => abortTransaction.retryN(5).orDie } From 48b4184b61f65eb71b8dca96b1d2a20461a38672 Mon Sep 17 00:00:00 2001 
From: jules Ivanic Date: Sat, 9 Sep 2023 22:20:47 +0400 Subject: [PATCH 07/14] Cleaned implementation of the new commit interface --- .../main/scala/zio/kafka/consumer/types.scala | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala index 5eeb37050..7e126dc22 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala @@ -5,6 +5,7 @@ import org.apache.kafka.common.TopicPartition import zio.kafka.serde.Deserializer import zio.prelude.Subtype import zio.{ Chunk, RIO } +import scala.collection.immutable.Map object types { @@ -36,7 +37,7 @@ object types { type Offset = Offset.Type object Offset extends Subtype[(TopicPartition, Long)] { def from(record: ConsumerRecord[_, _]): Offset = - Offset.wrap((topicPartition(record), record.offset())) + Offset((topicPartition(record), record.offset())) } type OffsetBatch = OffsetBatch.Type @@ -50,8 +51,23 @@ object types { OffsetBatch.wrap(records.map(record => topicPartition(record) -> record.offset()).toMap) implicit final class OffsetBatchOps(private val self: OffsetBatch) extends AnyVal { - def merge(other: OffsetBatch): OffsetBatch = ??? // TODO Jules - def add(offset: Offset): OffsetBatch = ??? // TODO Jules + def merge(other: OffsetBatch): OffsetBatch = { + val newOffsets = Map.newBuilder[TopicPartition, Long] + newOffsets ++= self + other.foreach { case (tp, offset) => + val existing = self.getOrElse(tp, -1L) + if (existing < offset) { + newOffsets += tp -> offset + } + } + OffsetBatch(newOffsets.result()) + } + + def add(offset: Offset): OffsetBatch = { + val (tp, offsetValue) = offset + val newOffset = self.getOrElse(tp, -1L) max offsetValue + OffsetBatch(self + (tp -> newOffset)) + } } } From b1ade413f6944590d7ce6c82a68db1419a03b542 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 22:28:03 +0400 Subject: [PATCH 08/14] Cleaned implementation of the new commit interface --- .../src/test/scala/zio/kafka/ProducerSpec.scala | 3 +-- .../src/main/scala/zio/kafka/consumer/types.scala | 13 ++++++++----- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala index 457a4580f..07ae088e5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/ProducerSpec.scala @@ -194,8 +194,7 @@ object ProducerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { withConsumer(Topics(Set(topic1)), settings).flatMap { consumer => for { messages <- consumer.take.flatMap(_.done).mapError(_.getOrElse(new NoSuchElementException)) - record = messages - .filter(rec => rec.key == key1 && rec.value == value1) + record = messages.filter(rec => rec.key == key1 && rec.value == value1) } yield record } } diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala index 7e126dc22..b7770cf33 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/types.scala @@ -5,7 +5,6 @@ import org.apache.kafka.common.TopicPartition import zio.kafka.serde.Deserializer import zio.prelude.Subtype import zio.{ Chunk, RIO } -import scala.collection.immutable.Map object types { @@ -36,8 +35,7 @@ object types { type Offset = Offset.Type object Offset extends 
Subtype[(TopicPartition, Long)] { - def from(record: ConsumerRecord[_, _]): Offset = - Offset((topicPartition(record), record.offset())) + def from(record: ConsumerRecord[_, _]): Offset = Offset((topicPartition(record), record.offset())) } type OffsetBatch = OffsetBatch.Type @@ -47,8 +45,13 @@ object types { def from(record: ConsumerRecord[_, _]): OffsetBatch = OffsetBatch(Map(topicPartition(record) -> record.offset())) - def from(records: Chunk[ConsumerRecord[_, _]]): OffsetBatch = - OffsetBatch.wrap(records.map(record => topicPartition(record) -> record.offset()).toMap) + def from(records: Chunk[ConsumerRecord[_, _]]): OffsetBatch = { + val newOffsets = Map.newBuilder[TopicPartition, Long] + records.foreach { record => + newOffsets += topicPartition(record) -> record.offset() + } + OffsetBatch(newOffsets.result()) + } implicit final class OffsetBatchOps(private val self: OffsetBatch) extends AnyVal { def merge(other: OffsetBatch): OffsetBatch = { From ad7fb69c59913cc5bbd2d6833021b0073a6ecac4 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 23:02:33 +0400 Subject: [PATCH 09/14] Simplify `PendingCommit` interface --- .../scala/zio/kafka/consumer/Committer.scala | 9 +++--- .../scala/zio/kafka/utils/PendingCommit.scala | 29 ++++++++++--------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala index 255573f24..9d0bf7570 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala @@ -6,9 +6,9 @@ import zio.kafka.utils.PendingCommit import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO } trait Committer { - def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] + def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] - final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] = + final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit] = commit(OffsetBatch.from(records)) final def commitAndAwait(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = @@ -25,8 +25,7 @@ object Committer { val unit: Committer = new Committer { - override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = - ZIO.succeed(PendingCommit.unit.asInstanceOf[PendingCommit[Throwable, Unit]]) + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] = ZIO.succeed(PendingCommit.unit) } private[zio] def fromSchedule[R]( @@ -49,7 +48,7 @@ object Committer { .schedule(commitSchedule) .forkIn(scope) } yield new Committer { - override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] = + override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] = for { p <- Promise.make[Throwable, Unit] _ <- acc.update { case (offsets, promises) => (offsets merge offsetBatch, promises :+ p) } diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala b/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala index b08984c71..05ef738cf 100644 --- a/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala +++ b/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala @@ -1,20 +1,21 @@ package zio.kafka.utils -import zio.{ IO, Promise, Trace, ZIO } +import zio.{ Promise, Task, Trace, ZIO } /** - * Less powerful interface than Promise + * Represent a commit that is pending to be committed. It can be awaited to be completed. 
* - * Avoid leaking to the users the Promise methods we don't want them to have access to. + * Less powerful interface than Promise. Avoid leaking to the users the Promise methods we don't want them to have + * access to. */ // noinspection ConvertExpressionToSAM -trait PendingCommit[E, A] { self => +trait PendingCommit { self => - def awaitCommit(implicit trace: Trace): IO[E, A] + def awaitCommit(implicit trace: Trace): Task[Unit] - final def combineDiscard(other: PendingCommit[E, _]): PendingCommit[E, Unit] = - new PendingCommit[E, Unit] { - override def awaitCommit(implicit trace: Trace): IO[E, Unit] = + final def combine(other: PendingCommit): PendingCommit = + new PendingCommit { + override def awaitCommit(implicit trace: Trace): Task[Unit] = for { _ <- self.awaitCommit _ <- other.awaitCommit @@ -24,13 +25,13 @@ trait PendingCommit[E, A] { self => //noinspection ConvertExpressionToSAM object PendingCommit { - private[zio] def apply[E, A](p: Promise[E, A]): PendingCommit[E, A] = - new PendingCommit[E, A] { - override def awaitCommit(implicit trace: Trace): IO[E, A] = p.await + private[zio] def apply(p: Promise[Throwable, Unit]): PendingCommit = + new PendingCommit { + override def awaitCommit(implicit trace: Trace): Task[Unit] = p.await } - val unit: PendingCommit[Nothing, Unit] = - new PendingCommit[Nothing, Unit] { - override def awaitCommit(implicit trace: Trace): IO[Nothing, Unit] = ZIO.unit + val unit: PendingCommit = + new PendingCommit { + override def awaitCommit(implicit trace: Trace): Task[Unit] = ZIO.unit } } From fd2a82fd995378691c50a218b916222e9a9d678b Mon Sep 17 00:00:00 2001 From: Jules Ivanic Date: Sat, 9 Sep 2023 23:19:52 +0400 Subject: [PATCH 10/14] [POC - 4] New commit interface (#1043) --- .../scala/zio/kafka/consumer/Committer.scala | 57 ------------------- .../scala/zio/kafka/consumer/Consumer.scala | 54 ++++++++++-------- .../zio/kafka/consumer/internal/Runloop.scala | 11 +--- .../consumer/internal/RunloopAccess.scala | 22 +++---- .../scala/zio/kafka/utils/PendingCommit.scala | 37 ------------ 5 files changed, 38 insertions(+), 143 deletions(-) delete mode 100644 zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala delete mode 100644 zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala deleted file mode 100644 index 9d0bf7570..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala +++ /dev/null @@ -1,57 +0,0 @@ -package zio.kafka.consumer - -import org.apache.kafka.clients.consumer.ConsumerRecord -import zio.kafka.consumer.types.OffsetBatch -import zio.kafka.utils.PendingCommit -import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO } - -trait Committer { - def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] - - final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit] = - commit(OffsetBatch.from(records)) - - final def commitAndAwait(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = - commit(records).flatMap(_.awaitCommit) - - final def commitAndForget(records: Chunk[ConsumerRecord[_, _]]): UIO[Unit] = - commit(records).unit -} - -//noinspection ConvertExpressionToSAM -object Committer { - private val emptyState: (OffsetBatch, List[Promise[Throwable, Unit]]) = - (OffsetBatch.empty, List.empty[Promise[Throwable, Unit]]) - - val unit: Committer = - new Committer { - override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] = 
ZIO.succeed(PendingCommit.unit) - } - - private[zio] def fromSchedule[R]( - commitSchedule: Schedule[R, Any, Any], - commit: OffsetBatch => Task[Unit], - scope: Scope - ): URIO[R, Committer] = - for { - acc <- Ref.Synchronized.make(emptyState) - _ <- acc.updateZIO { case data @ (offsets, promises) => - if (offsets.isEmpty) ZIO.succeed(data) - else - commit(offsets) - .foldZIO( - e => ZIO.foreachDiscard(promises)(_.fail(e)), - _ => ZIO.foreachDiscard(promises)(_.succeed(())) - ) - .as(emptyState) - } - .schedule(commitSchedule) - .forkIn(scope) - } yield new Committer { - override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] = - for { - p <- Promise.make[Throwable, Unit] - _ <- acc.update { case (offsets, promises) => (offsets merge offsetBatch, promises :+ p) } - } yield PendingCommit(p) - } -} diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index b810a60a5..bab6d13f6 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -42,6 +42,10 @@ trait Consumer { def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] def commit(offsetBatch: OffsetBatch): Task[Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] + def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] + /** * Retrieve the last committed offset for the given topic-partitions */ @@ -50,12 +54,6 @@ trait Consumer { timeout: Duration = Duration.Infinity ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] - - def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] - def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] /** @@ -234,6 +232,18 @@ object Consumer { def commit(offsetBatch: OffsetBatch): RIO[Consumer, Unit] = ZIO.serviceWithZIO[Consumer](_.commit(offsetBatch)) + def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R & Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, record)) + + def commitOrRetry[R]( + policy: Schedule[R, Throwable, Any], + records: Chunk[ConsumerRecord[_, _]] + ): RIO[R & Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, records)) + + def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R & Consumer, Unit] = + ZIO.serviceWithZIO[Consumer](_.commitOrRetry(policy, offsetBatch)) + /** * Accessor method */ @@ -243,9 +253,6 @@ object Consumer { ): RIO[Consumer, Map[TopicPartition, Option[OffsetAndMetadata]]] = ZIO.serviceWithZIO(_.committed(partitions, timeout)) - def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R & Consumer, Committer] = - ZIO.serviceWithZIO[Consumer](_.commitAccumBatch(commitschedule)) - /** * Accessor method */ @@ -469,27 +476,25 @@ private[consumer] final class ConsumerLive private[consumer] ( override def commit(offsetBatch: OffsetBatch): Task[Unit] = 
runloopAccess.commit(offsetBatch) - override def committed( - partitions: Set[TopicPartition], - timeout: Duration = Duration.Infinity - ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] = - consumer.withConsumer( - _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap - ) - - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(record: ConsumerRecord[_, _]): RIO[R, Unit] = + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record)) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])( + override def commitOrRetry[R]( + policy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]] ): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records)) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = runloopAccess.commitOrRetry(policy)(offsetBatch) - override def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = - runloopAccess.commitAccumBatch(commitSchedule) + override def committed( + partitions: Set[TopicPartition], + timeout: Duration = Duration.Infinity + ): Task[Map[TopicPartition, Option[OffsetAndMetadata]]] = + consumer.withConsumer( + _.committed(partitions.asJava, timeout.asJava).asScala.map { case (k, v) => k -> Option(v) }.toMap + ) override def listTopics(timeout: Duration = Duration.Infinity): Task[Map[String, List[PartitionInfo]]] = consumer.withConsumer(_.listTopics(timeout.asJava).asScala.map { case (k, v) => k -> v.asScala.toList }.toMap) @@ -552,13 +557,12 @@ private[consumer] final class ConsumerLive private[consumer] ( f: ConsumerRecord[K, V] => URIO[R1, Unit] ): ZIO[R & R1, Throwable, Unit] = for { - r <- ZIO.environment[R & R1] - committer <- commitAccumBatch(commitRetryPolicy) + r <- ZIO.environment[R & R1] _ <- partitionedStream(subscription, keyDeserializer, valueDeserializer) .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream.mapChunksZIO(records => records.mapZIO(f).as(records)) } - .mapChunksZIO(committer.commitAndAwait(_).as(Chunk.empty)) + .mapChunksZIO(commitOrRetry(commitRetryPolicy, _).as(Chunk.empty)) .runDrain .provideEnvironment(r) } yield () diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index bb36132c8..86d3d1494 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -33,8 +33,7 @@ private[consumer] final class Runloop private ( userRebalanceListener: RebalanceListener, restartStreamsOnRebalancing: Boolean, currentStateRef: Ref[State], - fetchStrategy: FetchStrategy, - runloopScope: Scope + fetchStrategy: FetchStrategy ) { private def newPartitionStream(tp: TopicPartition): UIO[PartitionStreamControl] = @@ -125,10 +124,6 @@ private[consumer] final class Runloop private ( } && policy ) - // noinspection YieldingZIOEffectInspection - private[internal] def commitAccumBatch[R](commitSchedule: Schedule[R, Any, Any]): URIO[R, Committer] = - Committer.fromSchedule(commitSchedule, commit, runloopScope) - private[internal] def commit(offsets: OffsetBatch): Task[Unit] = for { p <- Promise.make[Throwable, Unit] @@ -580,7 
+575,6 @@ private[consumer] object Runloop { initialState = State.initial currentStateRef <- Ref.make(initialState) runtime <- ZIO.runtime[Any] - scope <- ZIO.scope runloop = new Runloop( runtime = runtime, hasGroupId = hasGroupId, @@ -596,8 +590,7 @@ private[consumer] object Runloop { userRebalanceListener = userRebalanceListener, restartStreamsOnRebalancing = restartStreamsOnRebalancing, currentStateRef = currentStateRef, - fetchStrategy = fetchStrategy, - runloopScope = scope + fetchStrategy = fetchStrategy ) _ <- ZIO.logDebug("Starting Runloop") diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index 6acf838e4..ea4c9fc28 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -1,14 +1,14 @@ package zio.kafka.consumer.internal import org.apache.kafka.common.TopicPartition +import zio._ import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization import zio.kafka.consumer.diagnostics.Diagnostics import zio.kafka.consumer.internal.Runloop.ByteArrayConsumerRecord import zio.kafka.consumer.internal.RunloopAccess.PartitionAssignment -import zio.kafka.consumer.{ Committer, ConsumerSettings, InvalidSubscriptionUnion, Subscription } -import zio.stream.{ Stream, Take, UStream, ZStream } -import zio._ import zio.kafka.consumer.types.OffsetBatch +import zio.kafka.consumer.{ ConsumerSettings, InvalidSubscriptionUnion, Subscription } +import zio.stream.{ Stream, Take, UStream, ZStream } private[internal] sealed trait RunloopState private[internal] object RunloopState { @@ -31,22 +31,17 @@ private[consumer] final class RunloopAccess private ( diagnostics: Diagnostics ) { - private def withRunloopZIO__[R, E, A]( + private def withRunloopZIO[R, E, A]( shouldStartIfNot: Boolean - )(whenRunning: Runloop => ZIO[R, E, A])(orElse: ZIO[R, E, A]): ZIO[R, E, A] = + )(whenRunning: Runloop => ZIO[R, E, Unit]): ZIO[R, E, Unit] = runloopStateRef.updateSomeAndGetZIO { case RunloopState.NotStarted if shouldStartIfNot => makeRunloop.map(RunloopState.Started.apply) }.flatMap { - case RunloopState.NotStarted => orElse + case RunloopState.NotStarted => ZIO.unit case RunloopState.Started(runloop) => whenRunning(runloop) - case RunloopState.Finalized => orElse + case RunloopState.Finalized => ZIO.unit } - private def withRunloopZIO[R, E](shouldStartIfNot: Boolean)( - whenRunning: Runloop => ZIO[R, E, Unit] - ): ZIO[R, E, Unit] = - withRunloopZIO__(shouldStartIfNot)(whenRunning)(ZIO.unit) - /** * No need to call `Runloop::stopConsumption` if the Runloop has not been started or has been stopped. 
*/ @@ -77,9 +72,6 @@ private[consumer] final class RunloopAccess private ( def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch)) - def commitAccumBatch[R](commitschedule: Schedule[R, Any, Any]): URIO[R, Committer] = - withRunloopZIO__(shouldStartIfNot = false)(_.commitAccumBatch(commitschedule))(ZIO.succeed(Committer.unit)) - } private[consumer] object RunloopAccess { diff --git a/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala b/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala deleted file mode 100644 index 05ef738cf..000000000 --- a/zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala +++ /dev/null @@ -1,37 +0,0 @@ -package zio.kafka.utils - -import zio.{ Promise, Task, Trace, ZIO } - -/** - * Represent a commit that is pending to be committed. It can be awaited to be completed. - * - * Less powerful interface than Promise. Avoid leaking to the users the Promise methods we don't want them to have - * access to. - */ -// noinspection ConvertExpressionToSAM -trait PendingCommit { self => - - def awaitCommit(implicit trace: Trace): Task[Unit] - - final def combine(other: PendingCommit): PendingCommit = - new PendingCommit { - override def awaitCommit(implicit trace: Trace): Task[Unit] = - for { - _ <- self.awaitCommit - _ <- other.awaitCommit - } yield () - } -} - -//noinspection ConvertExpressionToSAM -object PendingCommit { - private[zio] def apply(p: Promise[Throwable, Unit]): PendingCommit = - new PendingCommit { - override def awaitCommit(implicit trace: Trace): Task[Unit] = p.await - } - - val unit: PendingCommit = - new PendingCommit { - override def awaitCommit(implicit trace: Trace): Task[Unit] = ZIO.unit - } -} From a60db8e1da1b122327a24a0d0a0a41a2aa767835 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 9 Sep 2023 23:21:46 +0400 Subject: [PATCH 11/14] Fix benchmark code --- .../scala/zio/kafka/bench/ConsumerBenchmark.scala | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala index 56bba3a57..f75357349 100644 --- a/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala +++ b/zio-kafka-bench/src/main/scala/zio/kafka/bench/ConsumerBenchmark.scala @@ -10,7 +10,7 @@ import zio.kafka.producer.Producer import zio.kafka.serde.Serde import zio.kafka.testkit.Kafka import zio.kafka.testkit.KafkaTestUtils.{ consumerSettings, produceMany, producer } -import zio.{ Chunk, Ref, ZIO, ZLayer } +import zio.{ Ref, ZIO, ZLayer } import java.util.concurrent.TimeUnit @@ -63,14 +63,11 @@ class ConsumerBenchmark extends ZioBenchmark[Kafka with Producer] { .logAnnotate("consumer", "1") { Consumer .plainStream(Subscription.topics(topic1), Serde.byteArray, Serde.byteArray) - .tap { _ => - counter.updateAndGet(_ + 1).flatMap(count => Consumer.stopConsumption.when(count == nrMessages)) - } - .mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(Chunk.empty)) - .takeUntilZIO((_: Chunk[_]) => counter.get.map(_ >= nrMessages)) + .mapChunksZIO(records => counter.update(_ + records.size) *> Consumer.commit(records).as(records)) + .takeUntilZIO(_ => counter.get.map(_ >= nrMessages)) .runDrain + .provideSome[Kafka](env) } - .provideSome[Kafka](env) } yield () } } From a73a3459d4c24680a273e425f4cbf063b5429c6c Mon Sep 17 00:00:00 2001 From: 
jules Ivanic Date: Sat, 9 Sep 2023 23:33:04 +0400 Subject: [PATCH 12/14] clean code --- .../scala/zio/kafka/consumer/Consumer.scala | 22 +++++++++---------- .../zio/kafka/consumer/internal/Runloop.scala | 7 ++++-- .../consumer/internal/RunloopAccess.scala | 4 ++-- 3 files changed, 18 insertions(+), 15 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index bab6d13f6..90643afdd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -42,9 +42,9 @@ trait Consumer { def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] def commit(offsetBatch: OffsetBatch): Task[Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] - def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]]): RIO[R, Unit] + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] /** * Retrieve the last committed offset for the given topic-partitions @@ -468,25 +468,25 @@ private[consumer] final class ConsumerLive private[consumer] ( } override def commit(record: ConsumerRecord[_, _]): Task[Unit] = - runloopAccess.commit(OffsetBatch.from(record)) + commit(OffsetBatch.from(record)) override def commit(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] = - runloopAccess.commit(OffsetBatch.from(records)) + commit(OffsetBatch.from(records)) override def commit(offsetBatch: OffsetBatch): Task[Unit] = runloopAccess.commit(offsetBatch) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(OffsetBatch.from(record)) + override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], record: ConsumerRecord[_, _]): RIO[R, Unit] = + commitOrRetry(retryPolicy, OffsetBatch.from(record)) override def commitOrRetry[R]( - policy: Schedule[R, Throwable, Any], + retryPolicy: Schedule[R, Throwable, Any], records: Chunk[ConsumerRecord[_, _]] ): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(OffsetBatch.from(records)) + commitOrRetry(retryPolicy, OffsetBatch.from(records)) - override def commitOrRetry[R](policy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = - runloopAccess.commitOrRetry(policy)(offsetBatch) + override def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = + runloopAccess.commitOrRetry(retryPolicy, offsetBatch) override def committed( partitions: Set[TopicPartition], diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 86d3d1494..f074fc1fd 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -116,12 +116,15 @@ private[consumer] final class Runloop private ( } } - private[internal] def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = + private[internal] def 
commitOrRetry[R]( + retryPolicy: Schedule[R, Throwable, Any], + offsetBatch: OffsetBatch + ): RIO[R, Unit] = commit(offsetBatch).retry( Schedule.recurWhile[Throwable] { case _: RetriableCommitFailedException => true case _ => false - } && policy + } && retryPolicy ) private[internal] def commit(offsets: OffsetBatch): Task[Unit] = diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala index ea4c9fc28..db76039f5 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/RunloopAccess.scala @@ -69,8 +69,8 @@ private[consumer] final class RunloopAccess private ( def commit(offsetBatch: OffsetBatch): Task[Unit] = withRunloopZIO(shouldStartIfNot = false)(_.commit(offsetBatch)) - def commitOrRetry[R](policy: Schedule[R, Throwable, Any])(offsetBatch: OffsetBatch): RIO[R, Unit] = - withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(policy)(offsetBatch)) + def commitOrRetry[R](retryPolicy: Schedule[R, Throwable, Any], offsetBatch: OffsetBatch): RIO[R, Unit] = + withRunloopZIO(shouldStartIfNot = false)(_.commitOrRetry(retryPolicy, offsetBatch)) } From c34de7d84a31b9e1ff23071ba89cd8b4861f0e34 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sun, 10 Sep 2023 16:18:02 +0400 Subject: [PATCH 13/14] fix compilation --- .github/workflows/ci.yml | 16 ++++++++-------- build.sbt | 4 ++-- project/plugins.sbt | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 60c4dd1de..f2daad29f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: continue-on-error: true steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Install libuv @@ -48,7 +48,7 @@ jobs: continue-on-error: false steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Install libuv @@ -88,7 +88,7 @@ jobs: - name: Cache Dependencies uses: coursier/cache-action@v6 - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Test @@ -100,7 +100,7 @@ jobs: if: ${{ github.event_name == 'push' }} steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Install libuv @@ -129,7 +129,7 @@ jobs: app_private_key: ${{ secrets.APP_PRIVATE_KEY }} - name: Create Pull Request id: cpr - uses: peter-evans/create-pull-request@v5.0.0 + uses: peter-evans/create-pull-request@v5.0.2 with: body: |- Autogenerated changes after running the `sbt docs/generateReadme` command of the [zio-sbt-website](https://zio.dev/zio-sbt) plugin. 
@@ -174,7 +174,7 @@ jobs: if: ${{ github.event_name != 'pull_request' }} steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Install libuv @@ -203,7 +203,7 @@ jobs: if: ${{ ((github.event_name == 'release') && (github.event.action == 'published')) || (github.event_name == 'workflow_dispatch') }} steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: Install libuv @@ -234,7 +234,7 @@ jobs: if: ${{ (github.event_name == 'release') && (github.event.action == 'published') }} steps: - name: Git Checkout - uses: actions/checkout@v3.6.0 + uses: actions/checkout@v4.0.0 with: fetch-depth: '0' - name: notify the main repo about the new release of docs package diff --git a/build.sbt b/build.sbt index 04cfbca5d..e717a33d1 100644 --- a/build.sbt +++ b/build.sbt @@ -78,8 +78,8 @@ lazy val root = project def stdSettings(prjName: String) = Seq( name := s"$prjName", scalafmtOnCompile := !insideCI.value, - Compile / compile / scalacOptions ++= - optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value, + scalacOptions ++= optionsOn("2.13")("-Wconf:cat=unused-nowarn:s").value, + scalacOptions ++= optionsOn("2.12")("-Xfuture", "-Xsource:2.13").value, scalacOptions -= "-Xlint:infer-any", // workaround for bad constant pool issue (Compile / doc) := Def.taskDyn { diff --git a/project/plugins.sbt b/project/plugins.sbt index a6c9152fb..f984f070d 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,4 +1,4 @@ -val zioSbtVersion = "0.4.0-alpha.17" +val zioSbtVersion = "0.4.0-alpha.18" addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) From 32882ee810a930dca9e684c25eb9f38f3e28e1b6 Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sun, 10 Sep 2023 16:25:20 +0400 Subject: [PATCH 14/14] Clean code --- .../scala/zio/kafka/consumer/internal/Runloop.scala | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index f074fc1fd..e97f4d929 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -199,8 +199,6 @@ private[consumer] final class Runloop private ( ignoreRecordsForTps: Set[TopicPartition], polledRecords: ConsumerRecords[Array[Byte], Array[Byte]] ): UIO[Runloop.FulfillResult] = { - type Record = ConsumerRecord[Array[Byte], Array[Byte]] - // The most efficient way to get the records from [[ConsumerRecords]] per // topic-partition, is by first getting the set of topic-partitions, and // then requesting the records per topic-partition. @@ -216,14 +214,9 @@ private[consumer] final class Runloop private ( _ <- ZIO.foreachParDiscard(streams) { streamControl => val tp = streamControl.tp val records = polledRecords.records(tp) + if (records.isEmpty) ZIO.unit - else { - val builder = ChunkBuilder.make[Record](records.size()) - val iterator = records.iterator() - while (iterator.hasNext) - builder += iterator.next() - streamControl.offerRecords(builder.result()) - } + else streamControl.offerRecords(Chunk.fromJavaIterable(records)) } } yield fulfillResult }
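
Taken together, the series replaces `CommittableRecord`-based commits with commits on plain `ConsumerRecord`s and `OffsetBatch`es. A minimal usage sketch of the resulting API, modeled on the updated specs above (the topic name, serdes, and `Consumer` layer wiring are illustrative, not part of the patches):

  import zio._
  import zio.kafka.consumer.{ Consumer, Subscription }
  import zio.kafka.serde.Serde

  // Consume records and commit each chunk after it has been processed.
  // Consumer.commit(records) folds the chunk into an OffsetBatch
  // (last offset seen per topic-partition) before committing.
  val consumeAndCommit: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
      .mapChunksZIO(records => Consumer.commit(records).as(records))
      .runDrain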
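
The `types.OffsetBatch` subtype is just a `Map[TopicPartition, Long]`; per the implementations in PATCH 07, `merge` and `add` keep the highest offset seen per partition. A small worked sketch of those semantics (partitions and offset values are illustrative):

  import org.apache.kafka.common.TopicPartition
  import zio.kafka.consumer.types.{ Offset, OffsetBatch }

  val tp0 = new TopicPartition("my-topic", 0)
  val tp1 = new TopicPartition("my-topic", 1)

  val a = OffsetBatch(Map(tp0 -> 41L, tp1 -> 10L))
  val b = OffsetBatch(Map(tp0 -> 40L, tp1 -> 12L))

  // merge keeps the highest offset per partition:
  // Map(tp0 -> 41L, tp1 -> 12L)
  val merged = a merge b

  // add behaves the same way for a single offset; 7 < 41, so tp0 is unchanged
  val added = merged.add(Offset((tp0, 7L)))

An explicit batch built this way can also be committed directly with `Consumer.commit(offsetBatch)`.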
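
Note that `commitOrRetry` applies the caller's schedule only to commits failing with a `RetriableCommitFailedException` (the Runloop intersects the user policy with that filter), and a commit fails with `CommitTimeout` once the configured commit timeout elapses. A sketch with an illustrative backoff policy:

  import zio._
  import zio.kafka.consumer.{ Consumer, Subscription }
  import zio.kafka.serde.Serde

  // Illustrative policy: exponential backoff, at most 5 retries.
  val retryPolicy: Schedule[Any, Throwable, Any] =
    Schedule.exponential(100.millis) && Schedule.recurs(5)

  val consumeWithRetriedCommits: ZIO[Consumer, Throwable, Unit] =
    Consumer
      .plainStream(Subscription.topics("my-topic"), Serde.string, Serde.string)
      .mapChunksZIO(records => Consumer.commitOrRetry(retryPolicy, records).as(Chunk.empty))
      .runDrain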