diff --git a/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala b/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala index 7f62fdac..dd212170 100644 --- a/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala +++ b/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala @@ -1,5 +1,6 @@ package com.evolutiongaming.akkaeffect.util +import cats.data.Kleisli import cats.effect.{Sync, Async, Concurrent} import cats.syntax.all._ import java.util.concurrent.atomic.AtomicReference @@ -18,7 +19,7 @@ private[akkaeffect] object Serially { def apply[F[_]: Async, A](value: A): Serially[F, A] = { - type Task = A => F[A] + type Task = Kleisli[F, A, A] sealed abstract class S @@ -57,7 +58,7 @@ private[akkaeffect] object Serially { def apply(f: A => F[A]) = { for { d <- Concurrent[F].deferred[Either[Throwable, Unit]] - t = (a: A) => { + t = Kleisli { a: A => for { b <- f(a).attempt _ <- d.complete(b.void) @@ -67,17 +68,9 @@ private[akkaeffect] object Serially { } a <- Sync[F].delay { val s = ref.getAndUpdate { - case _: S.Idle => S.Active - case s: S.Active => - val task = (a: A) => - Async[F].defer { - for { - a <- s.task(a) - a <- t(a) - } yield a - } - S.Active(task) - case S.Active => S.Active(t) + case _: S.Idle => S.Active + case s: S.Active => S.Active(s.task.andThen(t)) + case S.Active => S.Active(t) } s match { case s: S.Idle => start(s.value, t)