Skip to content

Commit

Permalink
Replace Ref by AtomicReference
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Oct 6, 2023
1 parent 1621780 commit 252da05
Showing 1 changed file with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package com.evolutiongaming.akkaeffect.util

import cats.effect.{Async, Concurrent}
import cats.effect.kernel.Ref
import cats.effect.{Sync, Async, Concurrent}
import cats.syntax.all._
import java.util.concurrent.atomic.AtomicReference

/** Provides serial access to an internal state.
*
* The class differs from [[cats.effect.kernel.Ref]] by the ability to execute
* an effect and a guarantee that the operations will be executed in the same
* order these arrived given these were called from the same thread.
*/
private[akkaeffect] trait Serially[F[_], A] {
def apply(f: A => F[A]): F[Unit]
}
Expand All @@ -22,18 +28,25 @@ private[akkaeffect] object Serially {
final case object Active extends S
}

val ref = Ref.unsafe[F, S](S.Idle(value))
val ref = new AtomicReference[S](S.Idle(value))

val unit = ().asRight[(A, Task)]

def start(a: A, task: Task) = {
(a, task).tailRecM { case (a, task) =>
for {
a <- task(a)
a <- ref.modify {
case s: S.Active => (S.Active, (a, s.task).asLeft[Unit])
case S.Active => (S.Idle(a), unit)
case _: S.Idle => (S.Idle(a), unit)
a <- Sync[F].delay {
val s = ref.getAndUpdate {
case _: S.Active => S.Active
case S.Active => S.Idle(a)
case _: S.Idle => S.Idle(a)
}
s match {
case s: S.Active => (a, s.task).asLeft[Unit]
case S.Active => unit
case _: S.Idle => unit
}
}
} yield a
}
Expand All @@ -44,27 +57,33 @@ private[akkaeffect] object Serially {
def apply(f: A => F[A]) = {
for {
d <- Concurrent[F].deferred[Either[Throwable, Unit]]
t = (a: A) => {
t = (a: A) => {
for {
b <- f(a).attempt
_ <- d.complete(b.void)
} yield {
b.getOrElse(a)
}
}
a <- ref.modify {
case s: S.Idle =>
(S.Active, start(s.value, t))
case s: S.Active =>
val task = (a: A) => Async[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
(S.Active(task), Concurrent[F].unit)
case S.Active =>
(S.Active(t), Concurrent[F].unit)
a <- Sync[F].delay {
val s = ref.getAndUpdate {
case _: S.Idle => S.Active
case s: S.Active =>
val task = (a: A) =>
Async[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
S.Active(task)
case S.Active => S.Active(t)
}
s match {
case s: S.Idle => start(s.value, t)
case _: S.Active => Concurrent[F].unit
case S.Active => Concurrent[F].unit
}
}
_ <- a
a <- d.get
Expand Down

0 comments on commit 252da05

Please sign in to comment.