Skip to content

Commit

Permalink
prelude/core: fix boundary inference with Abort (#772)
Browse files Browse the repository at this point in the history
As mentioned in
https://github.com/getkyo/kyo/pull/765/files#r1807072333, the inference
of `Boundary` in `Async` is failing when the compiler infers a widened
`Abort` from the return type of an operation. This issue is also
affecting [easyracer](https://github.com/jamesward/easyracer).
  • Loading branch information
fwbrasil authored Oct 22, 2024
1 parent 036db99 commit e9a3b04
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 34 deletions.
4 changes: 1 addition & 3 deletions kyo-bench/src/main/scala/kyo/bench/CollectParBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ class CollectParBench extends Bench.ForkOnly(Seq.fill(1000)(1)):
override def kyoBenchFiber() =
import kyo.*

// TODO inference issue
val x = Async.parallel(kyoTasks)
x
Async.parallel(kyoTasks)
end kyoBenchFiber

def catsBench() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ class HttpClientContentionBench
override def kyoBenchFiber() =
import kyo.*

// TODO inference issue
val x = Async.parallel(Seq.fill(concurrency)(Requests(_.get(kyoUrl))))
x
Async.parallel(Seq.fill(concurrency)(Requests(_.get(kyoUrl))))
end kyoBenchFiber

val zioUrl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ class HttpClientRaceContentionBench
override def kyoBenchFiber() =
import kyo.*

// TODO inference issue
val x = Async.race(Seq.fill(concurrency)(Requests.let(kyoClient)(Requests(_.get(kyoUrl)))))
x
Async.race(Seq.fill(concurrency)(Requests.let(kyoClient)(Requests(_.get(kyoUrl)))))
end kyoBenchFiber

val zioUrl =
Expand Down
16 changes: 8 additions & 8 deletions kyo-combinators/shared/src/main/scala/kyo/Combinators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
def fork(
using
flat: Flat[A],
boundary: Boundary[Ctx, IO],
boundary: Boundary[Ctx, IO & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[E, A] < (IO & Ctx) =
Expand All @@ -656,7 +656,7 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
def forkScoped(
using
flat: Flat[A],
boundary: Boundary[Ctx, IO],
boundary: Boundary[Ctx, IO & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): Fiber[E, A] < (IO & Ctx & Resource) =
Expand Down Expand Up @@ -697,8 +697,8 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO],
b1: Boundary[Ctx1, IO],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
Expand All @@ -722,8 +722,8 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO],
b1: Boundary[Ctx1, IO],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
Expand All @@ -747,8 +747,8 @@ extension [A, E, Ctx](effect: A < (Abort[E] & Async & Ctx))
using
f: Flat[A],
f1: Flat[A1],
b: Boundary[Ctx, IO],
b1: Boundary[Ctx1, IO],
b: Boundary[Ctx, IO & Abort[E]],
b1: Boundary[Ctx1, IO & Abort[E1]],
r: Reducible[Abort[E]],
r1: Reducible[Abort[E1]],
fr: Frame
Expand Down
4 changes: 2 additions & 2 deletions kyo-combinators/shared/src/main/scala/kyo/Constructors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ extension (kyoObject: Kyo.type)
def foreachPar[E, A, S, A1, Ctx](sequence: Seq[A])(useElement: A => A1 < (Abort[E] & Async & Ctx))(
using
flat: Flat[A1],
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): Seq[A1] < (reduce.SReduced & Async & Ctx) =
Expand All @@ -130,7 +130,7 @@ extension (kyoObject: Kyo.type)
def foreachParDiscard[E, A, S, A1, Ctx](sequence: Seq[A])(useElement: A => A1 < (Abort[E] & Async & Ctx))(
using
flat: Flat[A1],
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): Unit < (reduce.SReduced & Async & Ctx) =
Expand Down
24 changes: 12 additions & 12 deletions kyo-core/shared/src/main/scala/kyo/Async.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,7 @@ object Async:
*/
inline def run[E, A: Flat, Ctx](inline v: => A < (Abort[E] & Async & Ctx))(
using
boundary: Boundary[Ctx, IO],
reduce: Reducible[Abort[E]],
boundary: Boundary[Ctx, IO & Abort[E]],
frame: Frame
): Fiber[E, A] < (IO & Ctx) =
boundary((trace, context) => Fiber.fromTask(IOTask(v, trace, context)))
Expand All @@ -63,7 +62,7 @@ object Async:
*/
def runAndBlock[E, A: Flat, Ctx](timeout: Duration)(v: => A < (Abort[E] & Async & Ctx))(
using
boundary: Boundary[Ctx, IO & Abort[E]],
boundary: Boundary[Ctx, IO & Abort[E | Timeout]],
frame: Frame
): A < (Abort[E | Timeout] & IO & Ctx) =
run(v).map { fiber =>
Expand All @@ -83,9 +82,10 @@ object Async:
*/
def mask[E, A: Flat, Ctx](v: => A < (Abort[E] & Async & Ctx))(
using
boundary: Boundary[Ctx, IO],
reduce: Reducible[Abort[E]],
boundary: Boundary[Ctx, Async & Abort[E]],
frame: Frame
): A < (Abort[E] & Async & Ctx) =
): A < (reduce.SReduced & Async & Ctx) =
Async.run(v).map(_.mask.map(_.get))

/** Delays execution of a computation by a specified duration.
Expand Down Expand Up @@ -130,7 +130,7 @@ object Async:
*/
def timeout[E, A: Flat, Ctx](d: Duration)(v: => A < (Abort[E] & Async & Ctx))(
using
boundary: Boundary[Ctx, Async & Abort[E]],
boundary: Boundary[Ctx, Async & Abort[E | Timeout]],
frame: Frame
): A < (Abort[E | Timeout] & Async & Ctx) =
boundary { (trace, context) =>
Expand All @@ -151,7 +151,7 @@ object Async:
*/
def race[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): A < (reduce.SReduced & Async & Ctx) =
Expand All @@ -172,7 +172,7 @@ object Async:
rest: A < (Abort[E] & Async & Ctx)*
)(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): A < (reduce.SReduced & Async & Ctx) =
Expand All @@ -188,7 +188,7 @@ object Async:
*/
def parallel[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): Seq[A] < (reduce.SReduced & Async & Ctx) =
Expand All @@ -213,7 +213,7 @@ object Async:
v2: A2 < (Abort[E] & Async & Ctx)
)(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): (A1, A2) < (reduce.SReduced & Async & Ctx) =
Expand All @@ -238,7 +238,7 @@ object Async:
v3: A3 < (Abort[E] & Async & Ctx)
)(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): (A1, A2, A3) < (reduce.SReduced & Async & Ctx) =
Expand Down Expand Up @@ -266,7 +266,7 @@ object Async:
v4: A4 < (Abort[E] & Async & Ctx)
)(
using
boundary: Boundary[Ctx, Async],
boundary: Boundary[Ctx, Async & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame
): (A1, A2, A3, A4) < (reduce.SReduced & Async & Ctx) =
Expand Down
4 changes: 2 additions & 2 deletions kyo-core/shared/src/main/scala/kyo/Fiber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ object Fiber extends FiberPlatformSpecific:
*/
def race[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
using
boundary: Boundary[Ctx, IO],
boundary: Boundary[Ctx, IO & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame,
safepoint: Safepoint
Expand Down Expand Up @@ -305,7 +305,7 @@ object Fiber extends FiberPlatformSpecific:
*/
def parallel[E, A: Flat, Ctx](seq: Seq[A < (Abort[E] & Async & Ctx)])(
using
boundary: Boundary[Ctx, IO],
boundary: Boundary[Ctx, IO & Abort[E]],
reduce: Reducible[Abort[E]],
frame: Frame,
safepoint: Safepoint
Expand Down
31 changes: 31 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/AsyncTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -398,4 +398,35 @@ class AsyncTest extends Test:
yield assert(r1 == 0 && r2 == 42)
}

"boundary inference with Abort" - {
"same failures" in {
val v: Int < Abort[Int] = 1
val _: Fiber[Int, Int] < IO = Async.run(v)
val _: Int < (Abort[Int | Timeout] & IO) = Async.runAndBlock(1.second)(v)
val _: Int < (Abort[Int] & Async) = Async.mask(v)
val _: Int < (Abort[Int | Timeout] & Async) = Async.timeout(1.second)(v)
val _: Int < (Abort[Int] & Async) = Async.race(Seq(v))
val _: Int < (Abort[Int] & Async) = Async.race(v, v)
val _: Seq[Int] < (Abort[Int] & Async) = Async.parallel(Seq(v))
val _: (Int, Int) < (Abort[Int] & Async) = Async.parallel(v, v)
val _: (Int, Int, Int) < (Abort[Int] & Async) = Async.parallel(v, v, v)
val _: (Int, Int, Int, Int) < (Abort[Int] & Async) = Async.parallel(v, v, v, v)
succeed
}
"additional failure" in {
val v: Int < Abort[Int] = 1
val _: Fiber[Int | String, Int] < IO = Async.run(v)
val _: Int < (Abort[Int | Timeout | String] & IO) = Async.runAndBlock(1.second)(v)
val _: Int < (Abort[Int | String] & Async) = Async.mask(v)
val _: Int < (Abort[Int | Timeout | String] & Async) = Async.timeout(1.second)(v)
val _: Int < (Abort[Int | String] & Async) = Async.race(Seq(v))
val _: Int < (Abort[Int | String] & Async) = Async.race(v, v)
val _: Seq[Int] < (Abort[Int | String] & Async) = Async.parallel(Seq(v))
val _: (Int, Int) < (Abort[Int | String] & Async) = Async.parallel(v, v)
val _: (Int, Int, Int) < (Abort[Int | String] & Async) = Async.parallel(v, v, v)
val _: (Int, Int, Int, Int) < (Abort[Int | String] & Async) = Async.parallel(v, v, v, v)
succeed
}
}

end AsyncTest
15 changes: 15 additions & 0 deletions kyo-core/shared/src/test/scala/kyo/FiberTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -579,4 +579,19 @@ class FiberTest extends Test:
}
}

"boundary inference with Abort" - {
"same failures" in {
val v: Int < Abort[Int] = 1
val _: Fiber[Int, Int] < IO = Fiber.race(Seq(v))
val _: Fiber[Int, Seq[Int]] < IO = Fiber.parallel(Seq(v))
succeed
}
"additional failure" in {
val v: Int < Abort[Int] = 1
val _: Fiber[Int | String, Int] < IO = Fiber.race(Seq(v))
val _: Fiber[Int | String, Seq[Int]] < IO = Fiber.parallel(Seq(v))
succeed
}
}

end FiberTest
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[kyo] object Boundary:

val s = flatten(TypeRepr.of[S])

val r = flatten(TypeRepr.of[Ctx]).filter(tpe => !s.exists(_ <:< tpe))
val r = flatten(TypeRepr.of[Ctx]).filter(tpe => !s.exists(tpe <:< _))

val nok = r.filterNot(tpe => (tpe <:< TypeRepr.of[ContextEffect[?]]) || (tpe =:= TypeRepr.of[Any]))
if nok.nonEmpty then
Expand Down
61 changes: 61 additions & 0 deletions kyo-prelude/shared/src/test/scala/kyo/kernel/BoundaryTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,65 @@ class BoundaryTest extends Test:
assert(result.eval == 40)
}

"residual effects" - {
sealed trait ResidualEffect extends ArrowEffect[Const[Int], Const[Int]]
sealed trait SubResidualEffect extends ResidualEffect

"supports residual effects in S type parameter" in {
val boundary = Boundary.derive[TestEffect1, ResidualEffect]
val _: Boundary[TestEffect1, ResidualEffect] = boundary
succeed
}

"allows using residual effects within boundary" in {
val boundary = Boundary.derive[TestEffect1, ResidualEffect]
val effect: Int < (TestEffect1 & ResidualEffect) = boundary { (trace, context) =>
for
x <- ContextEffect.suspend[Int, TestEffect1](Tag[TestEffect1])
y <- ArrowEffect.suspend[Int](Tag[ResidualEffect], x)
yield y
}

val result = ContextEffect.handle(Tag[TestEffect1], 10, _ + 1) {
ArrowEffect.handle(Tag[ResidualEffect], effect) {
[C] => (input, cont) => cont(input * 2)
}
}

assert(result.eval == 20)
}

"preserves residual effects after boundary application" in {
val boundary = Boundary.derive[TestEffect1, ResidualEffect]
val effect: Int < (TestEffect1 & ResidualEffect) = boundary { (trace, context) =>
ContextEffect.suspend[Int, TestEffect1](Tag[TestEffect1])
}

val result = ContextEffect.handle(Tag[TestEffect1], 5, _ + 1)(effect)

val finalResult = ArrowEffect.handle(Tag[ResidualEffect], result) {
[C] => (input, cont) => cont(input * 2)
}
assert(finalResult.eval == 5)
}

"supports subclasses of residual effects" in {
val boundary = Boundary.derive[TestEffect1, ResidualEffect]
val effect: Int < (TestEffect1 & SubResidualEffect) = boundary { (trace, context) =>
for
x <- ContextEffect.suspend[Int, TestEffect1](Tag[TestEffect1])
y <- ArrowEffect.suspend[Int](Tag[SubResidualEffect], x)
yield y
}

val result = ContextEffect.handle(Tag[TestEffect1], 15, _ + 1) {
ArrowEffect.handle(Tag[SubResidualEffect], effect) {
[C] => (input, cont) => cont(input * 2)
}
}

assert(result.eval == 30)
}
}

end BoundaryTest

0 comments on commit e9a3b04

Please sign in to comment.