From c435b02b5388f85cd5b91957d506be61658b25f4 Mon Sep 17 00:00:00 2001 From: Adam Hearn <22334119+hearnadam@users.noreply.github.com> Date: Sat, 19 Oct 2024 15:34:09 -1000 Subject: [PATCH 1/3] kyo-monix --- build.sbt | 16 +++++- .../shared/src/main/scala/kyo/Monixs.scala | 29 ++++++++++ .../src/test/scala/kyo/MonixsTest.scala | 57 +++++++++++++++++++ 3 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 kyo-monix/shared/src/main/scala/kyo/Monixs.scala create mode 100644 kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala diff --git a/build.sbt b/build.sbt index 910f8a255..f2bf9a63f 100644 --- a/build.sbt +++ b/build.sbt @@ -98,7 +98,8 @@ lazy val kyoJVM = project `kyo-zio`.jvm, `kyo-cats`.jvm, `kyo-combinators`.jvm, - `kyo-examples`.jvm + `kyo-examples`.jvm, + `kyo-monix`.jvm ) lazy val kyoJS = project @@ -368,6 +369,19 @@ lazy val `kyo-cats` = ) .jvmSettings(mimaCheck(false)) +lazy val `kyo-monix` = + crossProject(JVMPlatform) + .withoutSuffixFor(JVMPlatform) + .crossType(CrossType.Full) + .in(file("kyo-monix")) + .dependsOn(`kyo-core`) + .settings( + `kyo-settings`, + libraryDependencies += "io.monix" %% "monix" % "3.4.1", + libraryDependencies += "org.scalatest" %%% "scalatest" % scalaTestVersion % Test + ) + .jvmSettings(mimaCheck(false)) + lazy val `kyo-combinators` = crossProject(JSPlatform, JVMPlatform) .withoutSuffixFor(JVMPlatform) diff --git a/kyo-monix/shared/src/main/scala/kyo/Monixs.scala b/kyo-monix/shared/src/main/scala/kyo/Monixs.scala new file mode 100644 index 000000000..c4bb74215 --- /dev/null +++ b/kyo-monix/shared/src/main/scala/kyo/Monixs.scala @@ -0,0 +1,29 @@ +package kyo + +import kyo.* +import monix.eval.Task + +object Monixs: + + def get[A: Flat](task: Task[A])(using f: Frame, s: monix.execution.Scheduler): A < (Abort[Throwable] & Async) = + IO.Unsafe { + val p = Promise.Unsafe.init[Throwable, A]() + val cancelable = task.runAsync { (e: Either[Throwable, A]) => + p.completeDiscard(Result.fromEither(e)) + }(s) + p.onInterrupt(_ => discard(cancelable.cancel())) + p.safe.get + } + + def run[A: Flat](v: => A < (Abort[Throwable] & Async))(using frame: Frame): Task[A] = + Task.defer { + import AllowUnsafe.embrace.danger + Async.run(v).map { fiber => + Task.async[A] { cb => + fiber.unsafe.onComplete(r => cb(r.toEither)) + // fiber.unsafe.interrupt(Result.Panic(Fiber.Interrupted(frame))) + } + }.pipe(IO.Unsafe.run).eval + } + +end Monixs diff --git a/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala b/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala new file mode 100644 index 000000000..d37869102 --- /dev/null +++ b/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala @@ -0,0 +1,57 @@ +package kyo + +import kyo.* +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import org.scalatest.compatible.Assertion +import org.scalatest.freespec.AsyncFreeSpec +import scala.concurrent.Future + +class MonixsTest extends AsyncFreeSpec: + given CanEqual[Throwable, Throwable] = CanEqual.derived + + def runMonix[T](v: Task[T]): Future[T] = + v.runToFuture + + def runKyo(v: => Assertion < (Abort[Throwable] & Async)): Future[Assertion] = + Monixs.run(v).runToFuture + + "Monixs" - { + "get" - { + "should convert Task to Kyo effect" in runKyo { + val task = Task.pure(42) + val kyo = Monixs.get(task) + kyo.map(result => assert(result == 42)) + } + + "should handle Task failures" in runKyo { + val ex = new Exception("Test exception") + val task = Task.raiseError(ex) + val kyo = Monixs.get(task) + Abort.run[Throwable](kyo).map { + case Result.Fail(e) => assert(e == ex) + case _ => fail("Expected Fail result") + } + } + } + + "run" - { + "should convert Kyo effect to Task" in runMonix { + val kyo: Int < (Abort[Nothing] & Async) = Async.run(42).map(_.get) + val task = Monixs.run(kyo) + task.map(result => assert(result == 42)) + } + + "should handle Kyo failures" in runMonix { + val ex = new Exception("Test exception") + val kyo = Abort.fail[Throwable](ex) + val task = Monixs.run(kyo) + task.attempt.map { + case Left(e) => assert(e == ex) + case Right(_) => fail("Expected Left result") + } + } + } + } + +end MonixsTest From 33e4817a422c65fff4ae8c29097a3d2b60fb41d0 Mon Sep 17 00:00:00 2001 From: Adam Hearn <22334119+hearnadam@users.noreply.github.com> Date: Thu, 24 Oct 2024 13:38:28 -1000 Subject: [PATCH 2/3] interruption --- kyo-monix/shared/src/main/scala/kyo/Monixs.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kyo-monix/shared/src/main/scala/kyo/Monixs.scala b/kyo-monix/shared/src/main/scala/kyo/Monixs.scala index c4bb74215..990d88b26 100644 --- a/kyo-monix/shared/src/main/scala/kyo/Monixs.scala +++ b/kyo-monix/shared/src/main/scala/kyo/Monixs.scala @@ -19,9 +19,9 @@ object Monixs: Task.defer { import AllowUnsafe.embrace.danger Async.run(v).map { fiber => - Task.async[A] { cb => + Task.cancelable[A] { cb => fiber.unsafe.onComplete(r => cb(r.toEither)) - // fiber.unsafe.interrupt(Result.Panic(Fiber.Interrupted(frame))) + Task(discard(fiber.unsafe.interrupt(Result.Panic(Fiber.Interrupted(frame))))) } }.pipe(IO.Unsafe.run).eval } From ef9280b18f3736b49412440c3c0f22ce9734e497 Mon Sep 17 00:00:00 2001 From: Adam Hearn <22334119+hearnadam@users.noreply.github.com> Date: Thu, 24 Oct 2024 14:41:52 -1000 Subject: [PATCH 3/3] Monixs -> Monix + tests --- .../scala/kyo/{Monixs.scala => Monix.scala} | 8 +- .../shared/src/test/scala/kyo/MonixTest.scala | 226 ++++++++++++++++++ .../src/test/scala/kyo/MonixsTest.scala | 57 ----- 3 files changed, 230 insertions(+), 61 deletions(-) rename kyo-monix/shared/src/main/scala/kyo/{Monixs.scala => Monix.scala} (91%) create mode 100644 kyo-monix/shared/src/test/scala/kyo/MonixTest.scala delete mode 100644 kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala diff --git a/kyo-monix/shared/src/main/scala/kyo/Monixs.scala b/kyo-monix/shared/src/main/scala/kyo/Monix.scala similarity index 91% rename from kyo-monix/shared/src/main/scala/kyo/Monixs.scala rename to kyo-monix/shared/src/main/scala/kyo/Monix.scala index 990d88b26..869bffae6 100644 --- a/kyo-monix/shared/src/main/scala/kyo/Monixs.scala +++ b/kyo-monix/shared/src/main/scala/kyo/Monix.scala @@ -3,7 +3,7 @@ package kyo import kyo.* import monix.eval.Task -object Monixs: +object Monix: def get[A: Flat](task: Task[A])(using f: Frame, s: monix.execution.Scheduler): A < (Abort[Throwable] & Async) = IO.Unsafe { @@ -15,15 +15,15 @@ object Monixs: p.safe.get } - def run[A: Flat](v: => A < (Abort[Throwable] & Async))(using frame: Frame): Task[A] = + def run[A: Flat](v: => A < (Abort[Throwable] & Async))(using f: Frame): Task[A] = Task.defer { import AllowUnsafe.embrace.danger Async.run(v).map { fiber => Task.cancelable[A] { cb => fiber.unsafe.onComplete(r => cb(r.toEither)) - Task(discard(fiber.unsafe.interrupt(Result.Panic(Fiber.Interrupted(frame))))) + Task(discard(fiber.unsafe.interrupt(Result.Panic(Fiber.Interrupted(f))))) } }.pipe(IO.Unsafe.run).eval } -end Monixs +end Monix diff --git a/kyo-monix/shared/src/test/scala/kyo/MonixTest.scala b/kyo-monix/shared/src/test/scala/kyo/MonixTest.scala new file mode 100644 index 000000000..5b8b73725 --- /dev/null +++ b/kyo-monix/shared/src/test/scala/kyo/MonixTest.scala @@ -0,0 +1,226 @@ +package kyo + +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kyo.* +import kyo.kernel.Platform +import monix.eval.Task +import monix.execution.Scheduler.Implicits.global +import org.scalatest.compatible.Assertion +import org.scalatest.freespec.AsyncFreeSpec +import scala.concurrent.Future + +class MonixTest extends AsyncFreeSpec: + given CanEqual[Throwable, Throwable] = CanEqual.derived + + def runMonix[T](v: Task[T]): Future[T] = + v.runToFuture + + def runKyo(v: => Assertion < (Abort[Throwable] & Async)): Future[Assertion] = + Monix.run(v).runToFuture + + "Monix" - { + "get" - { + "should convert Task to Kyo effect" in runKyo { + val task = Task.pure(42) + val kyo = Monix.get(task) + kyo.map(result => assert(result == 42)) + } + + "should handle Task failures" in runKyo { + val ex = new Exception("Test exception") + val task = Task.raiseError(ex) + val kyo = Monix.get(task) + Abort.run[Throwable](kyo).map { + case Result.Fail(e) => assert(e == ex) + case _ => fail("Expected Fail result") + } + } + } + + "run" - { + "should convert Kyo effect to Task" in runMonix { + val kyo: Int < (Abort[Nothing] & Async) = Async.run(42).map(_.get) + val task = Monix.run(kyo) + task.map(result => assert(result == 42)) + } + + "should handle Kyo failures" in runMonix { + val ex = new Exception("Test exception") + val kyo = Abort.fail[Throwable](ex) + val task = Monix.run(kyo) + task.attempt.map { + case Left(e) => assert(e == ex) + case Right(_) => fail("Expected Left result") + } + } + } + } + + "interrupts" - { + + def kyoLoop(started: CountDownLatch, done: CountDownLatch): Unit < IO = + def loop(i: Int): Unit < IO = + IO { + if i == 0 then + IO(started.countDown()).andThen(loop(i + 1)) + else + loop(i + 1) + } + IO.ensure(IO(done.countDown()))(loop(0)) + end kyoLoop + + def monixLoop(started: CountDownLatch, done: CountDownLatch): Task[Unit] = + def loop(i: Int): Task[Unit] = + Task.defer { + if i == 0 then + Task(started.countDown()) + .flatMap(_ => loop(i + 1)) + else + loop(i + 1) + } + loop(0).guarantee(Task(done.countDown())) + end monixLoop + + if Platform.isJVM then + + "runMonix" - { + "monix to kyo" in runMonix { + val started = new CountDownLatch(1) + val done = new CountDownLatch(1) + for + f <- Monix.run(kyoLoop(started, done)).start + _ <- Task(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.cancel + _ <- Task(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(done.getCount == 0) + end for + } + + "both" in runMonix { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + val v = + for + _ <- Monix.get(monixLoop(started, done)) + _ <- Async.run(kyoLoop(started, done)) + yield () + for + f <- Monix.run(v).start + _ <- Task(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.cancel + _ <- Task(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(done.getCount == 0) + end for + } + + "parallel loops" in runMonix { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + def parallelEffect = + Monix.run { + val loop1 = Monix.get(monixLoop(started, done)) + val loop2 = kyoLoop(started, done) + Async.parallel[Throwable, Unit, Unit, Any](loop1, loop2) + } + for + f <- parallelEffect.start + _ <- Task(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.cancel + _ <- Task(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(done.getCount == 0) + end for + } + + "race loops" in runMonix { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + def raceEffect = + Monix.run { + val loop1 = Monix.get(monixLoop(started, done)) + val loop2 = kyoLoop(started, done) + Async.race[Throwable, Unit, Any](loop1, loop2) + } + for + f <- raceEffect.start + _ <- Task(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.cancel + _ <- Task(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(done.getCount == 0) + end for + } + } + + "runKyo" - { + "kyo to monix" in runKyo { + val started = new CountDownLatch(1) + val done = new CountDownLatch(1) + val panic = Result.Panic(new Exception) + for + f <- Async.run(Monix.get(monixLoop(started, done))) + _ <- IO(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.interrupt(panic) + r <- f.getResult + _ <- IO(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(r == panic) + end for + } + + "both" in runKyo { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + val v = + for + _ <- Monix.get(monixLoop(started, done)) + _ <- kyoLoop(started, done) + yield () + for + f <- Async.run(v) + _ <- IO(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.interrupt + r <- f.getResult + _ <- IO(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(r.isPanic) + end for + } + + "parallel loops" in runKyo { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + def parallelEffect = + val loop1 = Monix.get(monixLoop(started, done)) + val loop2 = kyoLoop(started, done) + Async.parallel[Throwable, Unit, Unit, Any](loop1, loop2) + end parallelEffect + for + f <- Async.run(parallelEffect) + _ <- IO(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.interrupt + r <- f.getResult + _ <- IO(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(r.isPanic) + end for + } + + "race loops" in runKyo { + val started = new CountDownLatch(2) + val done = new CountDownLatch(2) + def raceEffect = + val loop1 = Monix.get(monixLoop(started, done)) + val loop2 = kyoLoop(started, done) + Async.race(loop1, loop2) + end raceEffect + for + f <- Async.run(raceEffect) + _ <- IO(started.await(100, TimeUnit.MILLISECONDS)) + _ <- f.interrupt + r <- f.getResult + _ <- IO(done.await(100, TimeUnit.MILLISECONDS)) + yield assert(r.isPanic) + end for + } + } + end if + } + +end MonixTest diff --git a/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala b/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala deleted file mode 100644 index d37869102..000000000 --- a/kyo-monix/shared/src/test/scala/kyo/MonixsTest.scala +++ /dev/null @@ -1,57 +0,0 @@ -package kyo - -import kyo.* -import monix.eval.Task -import monix.execution.Scheduler.Implicits.global -import org.scalatest.compatible.Assertion -import org.scalatest.freespec.AsyncFreeSpec -import scala.concurrent.Future - -class MonixsTest extends AsyncFreeSpec: - given CanEqual[Throwable, Throwable] = CanEqual.derived - - def runMonix[T](v: Task[T]): Future[T] = - v.runToFuture - - def runKyo(v: => Assertion < (Abort[Throwable] & Async)): Future[Assertion] = - Monixs.run(v).runToFuture - - "Monixs" - { - "get" - { - "should convert Task to Kyo effect" in runKyo { - val task = Task.pure(42) - val kyo = Monixs.get(task) - kyo.map(result => assert(result == 42)) - } - - "should handle Task failures" in runKyo { - val ex = new Exception("Test exception") - val task = Task.raiseError(ex) - val kyo = Monixs.get(task) - Abort.run[Throwable](kyo).map { - case Result.Fail(e) => assert(e == ex) - case _ => fail("Expected Fail result") - } - } - } - - "run" - { - "should convert Kyo effect to Task" in runMonix { - val kyo: Int < (Abort[Nothing] & Async) = Async.run(42).map(_.get) - val task = Monixs.run(kyo) - task.map(result => assert(result == 42)) - } - - "should handle Kyo failures" in runMonix { - val ex = new Exception("Test exception") - val kyo = Abort.fail[Throwable](ex) - val task = Monixs.run(kyo) - task.attempt.map { - case Left(e) => assert(e == ex) - case Right(_) => fail("Expected Left result") - } - } - } - } - -end MonixsTest