diff --git a/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala b/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala index e6ca6b8c..d2582450 100644 --- a/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala +++ b/actor/src/main/scala/com/evolutiongaming/akkaeffect/util/Serially.scala @@ -1,9 +1,17 @@ package com.evolutiongaming.akkaeffect.util -import cats.effect.concurrent.{Deferred, Ref} -import cats.effect.Concurrent +import cats.effect.concurrent.Deferred +import cats.effect.{Concurrent, Sync} import cats.syntax.all._ +import java.util.concurrent.atomic.AtomicReference + +/** Provides serial access to an internal state. + * + * The class differs from [[cats.effect.concurrent.Ref]] by the ability to execute + * an effect and a guarantee that the operations will be executed in the same + * order these arrived given these were called from the same thread. + */ private[akkaeffect] trait Serially[F[_], A] { def apply(f: A => F[A]): F[Unit] } @@ -22,7 +30,7 @@ private[akkaeffect] object Serially { final case object Active extends S } - val ref = Ref.unsafe[F, S](S.Idle(value)) + val ref = new AtomicReference[S](S.Idle(value)) val unit = ().asRight[(A, Task)] @@ -30,12 +38,18 @@ private[akkaeffect] object Serially { (a, task).tailRecM { case (a, task) => for { a <- task(a) - a <- ref.modify { - case s: S.Active => (S.Active, (a, s.task).asLeft[Unit]) - case S.Active => (S.Idle(a), unit) - case _: S.Idle => (S.Idle(a), unit) + s <- Sync[F].delay { + ref.getAndUpdate { + case _: S.Active => S.Active + case S.Active => S.Idle(a) + case _: S.Idle => S.Idle(a) + } } - } yield a + } yield s match { + case s: S.Active => (a, s.task).asLeft[Unit] + case S.Active => unit + case _: S.Idle => unit + } } } @@ -44,7 +58,7 @@ private[akkaeffect] object Serially { def apply(f: A => F[A]) = { for { d <- Deferred[F, Either[Throwable, Unit]] - t = (a: A) => { + t = (a: A) => { for { b <- f(a).attempt _ <- d.complete(b.void) @@ -52,19 +66,26 @@ private[akkaeffect] object Serially { b.getOrElse(a) } } - a <- ref.modify { - case s: S.Idle => (S.Active, start(s.value, t)) - case s: S.Active => - val task = (a: A) => Concurrent[F].defer { - for { - a <- s.task(a) - a <- t(a) - } yield a - } - (S.Active(task), Concurrent[F].unit) - case S.Active => (S.Active(t), Concurrent[F].unit) + s <- Sync[F].delay { + ref.getAndUpdate { + case _: S.Idle => S.Active + case s: S.Active => + val task = (a: A) => + Sync[F].defer { + for { + a <- s.task(a) + a <- t(a) + } yield a + } + S.Active(task) + case S.Active => S.Active(t) + } + } + _ <- s match { + case s: S.Idle => start(s.value, t) + case _: S.Active => Concurrent[F].unit + case S.Active => Concurrent[F].unit } - _ <- a a <- d.get a <- a.liftTo[F] } yield a