Skip to content

Commit

Permalink
core: Queue/Channel/Meter improvements and fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
fwbrasil committed Oct 18, 2024
1 parent 3e5682b commit 5addfb5
Show file tree
Hide file tree
Showing 31 changed files with 1,418 additions and 806 deletions.
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/Bench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object Bench:

abstract class Base[A](expectedResult: A) extends Bench[A](expectedResult):
def zioBench(): zio.UIO[A]
def kyoBenchFiber(): kyo.<[A, kyo.Async] = kyoBench()
def kyoBenchFiber(): kyo.<[A, kyo.Async & kyo.Abort[Throwable]] = kyoBench()
def kyoBench(): kyo.<[A, kyo.IO]
def catsBench(): cats.effect.IO[A]
end Base
Expand Down
4 changes: 3 additions & 1 deletion kyo-bench/src/main/scala/kyo/bench/CollectParBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ class CollectParBench extends Bench.ForkOnly(Seq.fill(1000)(1)):
override def kyoBenchFiber() =
import kyo.*

Async.parallel(kyoTasks)
// TODO inference issue
val x = Async.parallel(kyoTasks)
x
end kyoBenchFiber

def catsBench() =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class EnqueueDequeueBench extends Bench.ForkOnly(()):

import kyo.Access

def loop(c: Channel[Unit], i: Int): Unit < Async =
def loop(c: Channel[Unit], i: Int): Unit < (Async & Abort[Closed]) =
if i >= depth then
IO.unit
else
Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/HttpClientBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class HttpClientBench extends Bench.ForkOnly("pong"):
override def kyoBenchFiber() =
import kyo.*

Abort.run(Requests(_.get(kyoUrl))).map(_.getOrThrow)
Requests(_.get(kyoUrl))
end kyoBenchFiber

val zioUrl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ class HttpClientContentionBench
override def kyoBenchFiber() =
import kyo.*

Async.parallel(Seq.fill(concurrency)(Abort.run(Requests(_.get(kyoUrl))).map(_.getOrThrow)))
// TODO inference issue
val x = Async.parallel(Seq.fill(concurrency)(Requests(_.get(kyoUrl))))
x
end kyoBenchFiber

val zioUrl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ class HttpClientRaceContentionBench
override def kyoBenchFiber() =
import kyo.*

Async.race(Seq.fill(concurrency)(Requests.let(kyoClient)(Abort.run(Requests(_.get(kyoUrl)))).map(_.getOrThrow)))
// TODO inference issue
val x = Async.race(Seq.fill(concurrency)(Requests.let(kyoClient)(Requests(_.get(kyoUrl)))))
x
end kyoBenchFiber

val zioUrl =
Expand Down
12 changes: 5 additions & 7 deletions kyo-bench/src/main/scala/kyo/bench/MtlBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@ class MtlBench extends Bench(()):
_ <- Var.update((state: State) => state.copy(value = state.value + 1))
yield ()
)
Abort.run[Throwable](
Var.run(State(2))(
Emit.run(
Env.run(EnvValue("config"))(
testKyo.andThen(Var.get[State])
)
Var.run(State(2))(
Emit.run(
Env.run(EnvValue("config"))(
testKyo.andThen(Var.get[State])
)
)
).eval
)
end syncKyo

@Benchmark
Expand Down
6 changes: 3 additions & 3 deletions kyo-bench/src/main/scala/kyo/bench/PingPongBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class PingPongBench extends Bench.ForkOnly(()):
override def kyoBenchFiber() =
import kyo.*

def repeat[A](n: Int)(io: A < Async): A < Async =
def repeat[A](n: Int)(io: A < (Async & Abort[Closed])): A < (Async & Abort[Closed]) =
if n <= 1 then io
else io.flatMap(_ => repeat(n - 1)(io))

def iterate(promise: Promise[Nothing, Unit], n: Int): Unit < Async =
def iterate(promise: Promise[Nothing, Unit], n: Int): Unit < (Async & Abort[Closed]) =
for
ref <- AtomicInt.init(n)
chan <- Channel.init[Unit](1)
Expand All @@ -52,7 +52,7 @@ class PingPongBench extends Bench.ForkOnly(()):
n <- ref.decrementAndGet
_ <- if n == 0 then promise.complete(Result.unit).unit else IO.unit
yield ()
_ <- repeat(depth)(Async.run[Nothing, Unit, Any](effect))
_ <- repeat(depth)(Async.run[Closed, Unit, Any](effect))
yield ()

for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ProducerConsumerBench extends Bench.ForkOnly(()):

import kyo.Access

def repeat[A](n: Int)(io: A < Async): A < Async =
def repeat[A](n: Int)(io: A < (Async & Abort[Closed])): A < (Async & Abort[Closed]) =
if n <= 1 then io
else io.flatMap(_ => repeat(n - 1)(io))

Expand Down
2 changes: 1 addition & 1 deletion kyo-bench/src/main/scala/kyo/bench/SemaphoreBench.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class SemaphoreBench extends Bench.ForkOnly(()):
override def kyoBenchFiber() =
import kyo.*

def loop(s: Meter, i: Int): Unit < Async =
def loop(s: Meter, i: Int): Unit < (Async & Abort[Closed]) =
if i >= depth then
IO.unit
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SemaphoreContentionBench extends Bench.ForkOnly(()):
if n <= 1 then io
else io.flatMap(_ => repeat(n - 1)(io))

def loop(sem: Meter, cdl: Latch, i: Int = 0): Unit < Async =
def loop(sem: Meter, cdl: Latch, i: Int = 0): Unit < (Async & Abort[Closed]) =
if i >= depth then
cdl.release
else
Expand Down
18 changes: 15 additions & 3 deletions kyo-core/js/src/main/scala/kyo/queuesStubs.scala
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
package org.jctools.queues

import java.util.ArrayDeque
import scala.annotation.tailrec

class StubQueue[A](capacity: Int) extends ArrayDeque[A]:
def isFull = size() >= capacity
def drain(f: A => Unit): Unit =
given [B]: CanEqual[B, B] = CanEqual.derived
@tailrec def loop(): Unit =
super.poll() match
case null =>
case value =>
f(value)
loop()
end loop
loop()
end drain
override def offer(e: A): Boolean =
!isFull && super.offer(e)
end StubQueue
Expand All @@ -16,8 +28,8 @@ case class SpmcArrayQueue[A](capacity: Int) extends StubQueue[A](capacity)

case class SpscArrayQueue[A](capacity: Int) extends StubQueue[A](capacity)

case class MpmcUnboundedXaddArrayQueue[A](chunkSize: Int) extends ArrayDeque[A] {}
case class MpmcUnboundedXaddArrayQueue[A](chunkSize: Int) extends StubQueue[A](Int.MaxValue)

case class MpscUnboundedArrayQueue[A](chunkSize: Int) extends ArrayDeque[A] {}
case class MpscUnboundedArrayQueue[A](chunkSize: Int) extends StubQueue[A](Int.MaxValue)

case class SpscUnboundedArrayQueue[A](chunkSize: Int) extends ArrayDeque[A] {}
case class SpscUnboundedArrayQueue[A](chunkSize: Int) extends StubQueue[A](Int.MaxValue)
Loading

0 comments on commit 5addfb5

Please sign in to comment.