Skip to content

Commit

Permalink
improve SeriallyTest
Browse files Browse the repository at this point in the history
  • Loading branch information
t3hnar committed Oct 5, 2023
1 parent a4a52ea commit 3a2668e
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 74 deletions.
29 changes: 15 additions & 14 deletions actor/src/main/scala/com/evolutiongaming/akkaeffect/ActorVar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,23 @@ private[akkaeffect] object ActorVar {
val serially = Serially[F, Option[State]](none)

def update(f: Option[State] => F[Option[State]]): Unit = {
serially { state =>
val result = for {
a <- f(state)
_ <- a match {
case Some(_) => unit
case None => act { stop() }
}
} yield a
result.handleErrorWith { error =>
for {
_ <- state.foldMapM { _.release }
_ <- act[Any] { throw error }
a <- error.raiseError[F, Option[State]]
serially
.apply { state =>
val result = for {
a <- f(state)
_ <- a match {
case Some(_) => unit
case None => act { stop() }
}
} yield a
result.handleErrorWith { error =>
for {
_ <- state.foldMapM { _.release }
_ <- act[Any] { throw error }
a <- error.raiseError[F, Option[State]]
} yield a
}
}
}
.toFuture
()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,12 @@ private[akkaeffect] object Serially {
a <- ref.modify {
case s: S.Idle => (S.Active, start(s.value, t))
case s: S.Active =>
val task = (a: A) => Concurrent[F].defer(for {
a <- s.task(a)
a <- t(a)
} yield a)
val task = (a: A) => Concurrent[F].defer {
for {
a <- s.task(a)
a <- t(a)
} yield a
}
(S.Active(task), Concurrent[F].unit)
case S.Active => (S.Active(t), Concurrent[F].unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,51 +11,6 @@ import scala.util.control.NoStackTrace

class SeriallyTest extends AsyncFunSuite with Matchers {

test("serially") {
val threadId = IO { Thread.currentThread().getId }
val result = for {
serially <- IO { Serially[IO, Int](0) }
threadId0 <- threadId
_ <- serially { a => (a + 1).pure[IO] }
threadId1 <- threadId
_ <- IO { threadId0 shouldEqual threadId1 }
deferred <- Deferred[IO, Int]
_ <- IO.defer { serially { a => deferred.complete(a).as(a) } }
value <- deferred.get
_ <- IO { value shouldEqual 1 }
deferred <- Deferred[IO, Unit]
threadId0 <- Deferred[IO, Long]
threadId1 <- Deferred[IO, Long]
_ <- serially { a =>
for {
_ <- deferred.get
threadId <- threadId
_ <- threadId0.complete(threadId)
} yield {
a + 1
}
}
.start
_ <- serially { a =>
for {
threadId <- threadId
_ <- threadId1.complete(threadId)
} yield {
a + 1
}
}.start
_ <- deferred.complete(())
threadId0 <- threadId0.get
threadId1 <- threadId1.get
_ <- IO { threadId0 shouldEqual threadId1 }
deferred <- Deferred[IO, Int]
_ <- IO.defer { serially { a => deferred.complete(a).as(a) } }
value <- deferred.get
_ <- IO { value shouldEqual 3 }
} yield {}
result.run()
}

test("error") {
val result = for {
error <- IO { new RuntimeException with NoStackTrace }
Expand All @@ -70,19 +25,19 @@ class SeriallyTest extends AsyncFunSuite with Matchers {
result.run()
}

test("handles many concurrent tasks") {
test("handle concurrently added tasks serially") {
val n = 10000
var i = 0

val result = for {
serially <- IO {
Serially[IO, Int](0)
}
_ <- serially.apply(s => IO {
i = i + 1
}.as(s + 1)).parReplicateA(100000)
_ <- IO {
i shouldEqual 100000
}
serially <- IO { Serially[IO, Int](0) }
_ <- serially
.apply { a =>
IO
.apply { i = i + 1 }
.as(a + 1)
}
.parReplicateA(n)
_ <- IO { i shouldEqual n }
} yield ()
result.run()
}
Expand Down

0 comments on commit 3a2668e

Please sign in to comment.