Skip to content

Commit

Permalink
Simplify PendingCommit interface
Browse files Browse the repository at this point in the history
  • Loading branch information
guizmaii committed Sep 9, 2023
1 parent b1ade41 commit 1ae11f0
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 17 deletions.
9 changes: 4 additions & 5 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Committer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import zio.kafka.utils.PendingCommit
import zio.{ Chunk, Promise, Ref, Schedule, Scope, Task, UIO, URIO, ZIO }

trait Committer {
def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]]
def commit(offsetBatch: OffsetBatch): UIO[PendingCommit]

final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit[Throwable, Unit]] =
final def commit(records: Chunk[ConsumerRecord[_, _]]): UIO[PendingCommit] =
commit(OffsetBatch.from(records))

final def commitAndAwait(records: Chunk[ConsumerRecord[_, _]]): Task[Unit] =
Expand All @@ -25,8 +25,7 @@ object Committer {

val unit: Committer =
new Committer {
override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] =
ZIO.succeed(PendingCommit.unit.asInstanceOf[PendingCommit[Throwable, Unit]])
override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] = ZIO.succeed(PendingCommit.unit)
}

private[zio] def fromSchedule[R](
Expand All @@ -49,7 +48,7 @@ object Committer {
.schedule(commitSchedule)
.forkIn(scope)
} yield new Committer {
override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit[Throwable, Unit]] =
override def commit(offsetBatch: OffsetBatch): UIO[PendingCommit] =
for {
p <- Promise.make[Throwable, Unit]
_ <- acc.update { case (offsets, promises) => (offsets merge offsetBatch, promises :+ p) }
Expand Down
24 changes: 12 additions & 12 deletions zio-kafka/src/main/scala/zio/kafka/utils/PendingCommit.scala
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
package zio.kafka.utils

import zio.{ IO, Promise, Trace, ZIO }
import zio.{ Promise, Task, Trace, ZIO }

/**
* Less powerful interface than Promise
*
* Avoid leaking to the users the Promise methods we don't want them to have access to.
*/
// noinspection ConvertExpressionToSAM
trait PendingCommit[E, A] { self =>
trait PendingCommit { self =>

def awaitCommit(implicit trace: Trace): IO[E, A]
def awaitCommit(implicit trace: Trace): Task[Unit]

final def combineDiscard(other: PendingCommit[E, _]): PendingCommit[E, Unit] =
new PendingCommit[E, Unit] {
override def awaitCommit(implicit trace: Trace): IO[E, Unit] =
final def combine(other: PendingCommit): PendingCommit =
new PendingCommit {
override def awaitCommit(implicit trace: Trace): Task[Unit] =
for {
_ <- self.awaitCommit
_ <- other.awaitCommit
Expand All @@ -24,13 +24,13 @@ trait PendingCommit[E, A] { self =>

//noinspection ConvertExpressionToSAM
object PendingCommit {
private[zio] def apply[E, A](p: Promise[E, A]): PendingCommit[E, A] =
new PendingCommit[E, A] {
override def awaitCommit(implicit trace: Trace): IO[E, A] = p.await
private[zio] def apply(p: Promise[Throwable, Unit]): PendingCommit =
new PendingCommit {
override def awaitCommit(implicit trace: Trace): Task[Unit] = p.await
}

val unit: PendingCommit[Nothing, Unit] =
new PendingCommit[Nothing, Unit] {
override def awaitCommit(implicit trace: Trace): IO[Nothing, Unit] = ZIO.unit
val unit: PendingCommit =
new PendingCommit {
override def awaitCommit(implicit trace: Trace): Task[Unit] = ZIO.unit
}
}

0 comments on commit 1ae11f0

Please sign in to comment.