Skip to content

Commit

Permalink
Replace Ref by AtomicReference (#265)
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar authored and t3hnar committed Oct 12, 2023
1 parent 3a2668e commit 5130094
Showing 1 changed file with 42 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,17 @@
package com.evolutiongaming.akkaeffect.util

import cats.effect.concurrent.{Deferred, Ref}
import cats.effect.Concurrent
import cats.effect.concurrent.Deferred
import cats.effect.{Concurrent, Sync}
import cats.syntax.all._

import java.util.concurrent.atomic.AtomicReference

/** Provides serial access to an internal state.
*
* The class differs from [[cats.effect.concurrent.Ref]] by the ability to execute
* an effect and a guarantee that the operations will be executed in the same
* order these arrived given these were called from the same thread.
*/
private[akkaeffect] trait Serially[F[_], A] {
def apply(f: A => F[A]): F[Unit]
}
Expand All @@ -22,20 +30,26 @@ private[akkaeffect] object Serially {
final case object Active extends S
}

val ref = Ref.unsafe[F, S](S.Idle(value))
val ref = new AtomicReference[S](S.Idle(value))

val unit = ().asRight[(A, Task)]

def start(a: A, task: Task) = {
(a, task).tailRecM { case (a, task) =>
for {
a <- task(a)
a <- ref.modify {
case s: S.Active => (S.Active, (a, s.task).asLeft[Unit])
case S.Active => (S.Idle(a), unit)
case _: S.Idle => (S.Idle(a), unit)
s <- Sync[F].delay {
ref.getAndUpdate {
case _: S.Active => S.Active
case S.Active => S.Idle(a)
case _: S.Idle => S.Idle(a)
}
}
} yield a
} yield s match {
case s: S.Active => (a, s.task).asLeft[Unit]
case S.Active => unit
case _: S.Idle => unit
}
}
}

Expand All @@ -44,27 +58,34 @@ private[akkaeffect] object Serially {
def apply(f: A => F[A]) = {
for {
d <- Deferred[F, Either[Throwable, Unit]]
t = (a: A) => {
t = (a: A) => {
for {
b <- f(a).attempt
_ <- d.complete(b.void)
} yield {
b.getOrElse(a)
}
}
a <- ref.modify {
case s: S.Idle => (S.Active, start(s.value, t))
case s: S.Active =>
val task = (a: A) => Concurrent[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
(S.Active(task), Concurrent[F].unit)
case S.Active => (S.Active(t), Concurrent[F].unit)
s <- Sync[F].delay {
ref.getAndUpdate {
case _: S.Idle => S.Active
case s: S.Active =>
val task = (a: A) =>
Sync[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
S.Active(task)
case S.Active => S.Active(t)
}
}
_ <- s match {
case s: S.Idle => start(s.value, t)
case _: S.Active => Concurrent[F].unit
case S.Active => Concurrent[F].unit
}
_ <- a
a <- d.get
a <- a.liftTo[F]
} yield a
Expand Down

0 comments on commit 5130094

Please sign in to comment.