Skip to content

Commit

Permalink
use: ZIO Fibers directly to reduce interop overhead (#757)
Browse files Browse the repository at this point in the history
  • Loading branch information
hearnadam authored Oct 16, 2024
1 parent 7d16a2e commit 072d110
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 10 deletions.
54 changes: 47 additions & 7 deletions kyo-zio/shared/src/main/scala/kyo/ZIOs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import zio.Exit
import zio.Runtime
import zio.Unsafe
import zio.ZIO
import zio.stacktracer.TracingImplicits.disableAutoTrace

object ZIOs:
import internal.*

/** Lifts a zio.ZIO into a Kyo effect.
*
Expand All @@ -19,13 +21,17 @@ object ZIOs:
*/
def get[E, A](v: => ZIO[Any, E, A])(using Frame, zio.Trace): A < (Abort[E] & Async) =
IO.Unsafe {
val p = Promise.Unsafe.init[E, A]()
val future = Unsafe.unsafely(Runtime.default.unsafe.runToFuture(v.either))
future.onComplete { t =>
p.complete(t.fold(ex => Result.panic(ex), Result.fromEither))
}(ExecutionContext.parasitic)
p.onInterrupt(_ => discard(future.cancel()))
p.safe.get
Unsafe.unsafely {
import zio.unsafeInterrupt
given ce: CanEqual[E, E] = CanEqual.derived
val p = Promise.Unsafe.init[E, A]()
val f = Runtime.default.unsafe.fork(v)
f.unsafe.addObserver { (exit: zio.Exit[E, A]) =>
p.completeDiscard(exit.toResult)
}
p.onInterrupt(_ => discard(f.unsafeInterrupt()))
p.safe.get
}
}
end get

Expand Down Expand Up @@ -67,4 +73,38 @@ object ZIOs:
}
end run

extension [E, A](exit: zio.Exit[E, A])
/** Converts a zio.Exit to a kyo.Result.
*
* @return
* A kyo.Result
*/
def toResult(using Frame, CanEqual[E, E]): Result[E, A] =
exit match
case Exit.Success(a) => Result.success(a)
case Exit.Failure(cause) => cause.toError

extension [E](cause: zio.Cause[E])
/** Converts a zio.Cause to a kyo.Result.Error.
*
* This method recursively traverses the zio.Cause structure and converts it to the appropriate kyo.Result.Error type.
*
* @return
* A kyo.Result.Error
*/
def toError(using frame: Frame, ce: CanEqual[E, E]): Result.Error[E] =
import zio.Cause.*
def loop(cause: zio.Cause[E]): Maybe[Result.Error[E]] =
cause match
case Fail(e, trace) => Maybe(Result.Fail(e))
case Die(e, trace) => Maybe(Result.Panic(e))
case Interrupt(fiberId, trace) => Maybe(Result.Panic(Fiber.Interrupted(frame)))
case Then(left, right) => loop(left).orElse(loop(right))
case Both(left, right) => loop(left).orElse(loop(right))
case Stackless(e, trace) => loop(e)
case _: Empty.type => Maybe.empty
loop(cause).getOrElse(Result.Panic(new Exception("Unexpected zio.Cause.Empty at " + frame.parse.position)))
end toError
end extension

end ZIOs
5 changes: 5 additions & 0 deletions kyo-zio/shared/src/main/scala/zio/FiberInterop.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package zio

extension [E, A](fiber: Fiber.Runtime[E, A])
def unsafeInterrupt()(using trace: Trace, u: Unsafe): Unit =
fiber.tellInterrupt(Cause.Interrupt(FiberId.None, StackTrace(FiberId.None, Chunk(trace))))
99 changes: 96 additions & 3 deletions kyo-zio/shared/src/test/scala/kyo/ZIOsTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package kyo
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import kyo.*
import kyo.ZIOs.toError
import kyo.ZIOs.toResult
import kyo.kernel.Platform
import org.scalatest.compatible.Assertion
import org.scalatest.concurrent.Eventually.*
import scala.concurrent.Future
import zio.Cause
import zio.Exit
import zio.Task
import zio.ZIO

Expand Down Expand Up @@ -294,18 +298,25 @@ class ZIOsTest extends Test:
}
}
end if

"synthetic" in runKyo {
val interrupted = ZIOs.get(ZIO.never.fork.flatMap(_.interrupt).flatten)
Abort.run(interrupted).map { result =>
assert(result.isPanic)
}
}
}

"ZIO failure translation" - {
"regular failure to Abort[E]" in runKyo {
"failure translation" - {
"fail to Abort[E]" in runKyo {
val zioFailure: ZIO[Any, String, Int] = ZIO.fail("ZIO failed")
val kyoEffect: Int < (Abort[String] & Async) = ZIOs.get(zioFailure)
Abort.run(kyoEffect).map { result =>
assert(result == Result.fail("ZIO failed"))
}
}

"ZIO defect to Abort[Nothing] (panic)" in runKyo {
"die to Abort[Nothing] (panic)" in runKyo {
val zioDefect: ZIO[Any, Nothing, Int] = ZIO.die(new RuntimeException("ZIO defect"))
val kyoEffect: Int < (Abort[Nothing] & Async) = ZIOs.get(zioDefect)
Abort.run(kyoEffect).map { result =>
Expand All @@ -331,5 +342,87 @@ class ZIOsTest extends Test:
assert(result == Result.fail(CustomError("Custom ZIO error")))
}
}

"ZIO failure Both" in runKyo {
val zioFailure: ZIO[Any, String, Int] = ZIO.failCause(Cause.Both(Cause.empty, Cause.fail("ZIO failure")))
val kyoEffect: Int < (Abort[String] & Async) = ZIOs.get(zioFailure)
Abort.run(kyoEffect).map { result =>
assert(result == Result.fail("ZIO failure"))
}
}

"ZIO failure Then" in runKyo {
val zioFailure: ZIO[Any, String, Int] = ZIO.failCause(Cause.Then(Cause.fail("Left"), Cause.fail("Right")))
val kyoEffect: Int < (Abort[String] & Async) = ZIOs.get(zioFailure)
Abort.run(kyoEffect).map { result =>
assert(result == Result.fail("Left"))
}
}

"ZIO failure Empty" in runKyo {
val zioFailure: ZIO[Any, Nothing, Int] = ZIO.failCause(Cause.empty)
val kyoEffect: Int < (Abort[Nothing] & Async) = ZIOs.get(zioFailure)
Abort.run(kyoEffect).map { result =>
assert(result.isPanic)
}
}
}

"Exit toResult" - {
"Success" in {
assert(Exit.succeed(42).toResult == Result.success(42))
}

"Failure" in {
assert(Exit.fail("error").toResult == Result.fail("error"))
}
}

"Cause toError" - {
"Fail" in runKyo {
val cause = Cause.fail("error")
assert(cause.toError == Result.fail("error"))
}

"Die" in runKyo {
val exception = new RuntimeException("die")
val cause = Cause.die(exception)
cause.toError match
case Result.Panic(e) => assert(e == exception)
case _ => fail("Expected Result.Panic")
}

"Interrupt" in runKyo {
Cause.interrupt(zio.FiberId.None).toError match
case Result.Panic(e: Fiber.Interrupted) => succeed
case _ => fail("Expected Result.Panic with Fiber.Interrupted")
end match
}

"Then" in runKyo {
val cause = Cause.Then(Cause.fail("first"), Cause.fail("second"))
assert(cause.toError == Result.fail("first"))
}

"Both" in runKyo {
val cause = Cause.Both(Cause.fail("left"), Cause.fail("right"))
assert(cause.toError == Result.fail("left"))
}

"Stackless" in runKyo {
val innerCause = Cause.fail("error")
val cause = Cause.stackless(innerCause)
assert(cause.toError == Result.fail("error"))
}

"Empty" in runKyo {
val cause = Cause.empty
cause.toError match
case Result.Panic(e) =>
assert(e.getMessage.startsWith("Unexpected zio.Cause.Empty at"))
case _ => fail("Expected Result.Panic")
end match
}
}

end ZIOsTest

0 comments on commit 072d110

Please sign in to comment.