Skip to content

Commit

Permalink
Replace funciton with Kleisli to make it simpler
Browse files Browse the repository at this point in the history
  • Loading branch information
rtar committed Oct 9, 2023
1 parent 252da05 commit 4414e1e
Showing 1 changed file with 6 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.evolutiongaming.akkaeffect.util

import cats.data.Kleisli
import cats.effect.{Sync, Async, Concurrent}
import cats.syntax.all._
import java.util.concurrent.atomic.AtomicReference
Expand All @@ -18,7 +19,7 @@ private[akkaeffect] object Serially {

def apply[F[_]: Async, A](value: A): Serially[F, A] = {

type Task = A => F[A]
type Task = Kleisli[F, A, A]

sealed abstract class S

Expand Down Expand Up @@ -57,7 +58,7 @@ private[akkaeffect] object Serially {
def apply(f: A => F[A]) = {
for {
d <- Concurrent[F].deferred[Either[Throwable, Unit]]
t = (a: A) => {
t = Kleisli { a: A =>
for {
b <- f(a).attempt
_ <- d.complete(b.void)
Expand All @@ -67,17 +68,9 @@ private[akkaeffect] object Serially {
}
a <- Sync[F].delay {
val s = ref.getAndUpdate {
case _: S.Idle => S.Active
case s: S.Active =>
val task = (a: A) =>
Async[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
S.Active(task)
case S.Active => S.Active(t)
case _: S.Idle => S.Active
case s: S.Active => S.Active(s.task.andThen(t))
case S.Active => S.Active(t)
}
s match {
case s: S.Idle => start(s.value, t)
Expand Down

0 comments on commit 4414e1e

Please sign in to comment.