Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Cats-effect Async instance for A =>> Structured ?=> A #42

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,11 @@ lazy val catsScalaFXSettings = Seq(
autoAPIMappings := true,
libraryDependencies ++= Seq(
catsEffect,
scalacheck % Test
scalacheck % Test,
catsEffectLaws % Test,
munitDiscipline % Test,
catsEffectTestKit % Test
// scalaCheckShapeless % Test
)
)

Expand Down
110 changes: 110 additions & 0 deletions cats-scalafx/src/main/scala/instances.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package fx.instances

import _root_.cats.Monad
import _root_.cats.effect.Async
import _root_.cats.effect.Outcome
import _root_.cats.effect.kernel.{Cont, Deferred, Fiber, Poll, Ref, Sync}
import _root_.cats.instances.*
import fx.{ExitCase, Structured, cancel, fork, join, uncancellable as FxUncancellable, Fiber as FxFiber}

import java.util.concurrent.{CancellationException, CompletableFuture, ExecutionException, Executor, Executors, TimeUnit}
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal

type StructuredF[A] = Structured ?=> A

class FxAsync extends Async[StructuredF]:

override def raiseError[A](e: Throwable): StructuredF[A] =
throw e

override def pure[A](x: A): StructuredF[A] =
x

override def sleep(time: FiniteDuration): StructuredF[Unit] =
Thread.sleep(time.toMillis)

override def handleErrorWith[A](fa: StructuredF[A])(f: Throwable => StructuredF[A]): StructuredF[A] =
try fa
catch case NonFatal(ex) => f(ex)

override def executionContext: StructuredF[ExecutionContext] =
ExecutionContext.fromExecutor(
Executors.newThreadPerTaskExecutor(summon[Structured].threadFactory))

override def cont[K, R](body: Cont[StructuredF, K, R]): StructuredF[R] =
Async.defaultCont(body)(this)

override def suspend[A](hint: Sync.Type)(thunk: => A): StructuredF[A] =
thunk

override def ref[A](a: A): StructuredF[Ref[StructuredF, A]] =
Ref.unsafe(a)(this)

override def deferred[A]: StructuredF[Deferred[StructuredF, A]] =
Deferred.unsafe(this)

override def start[A](fa: StructuredF[A]): StructuredF[Fiber[StructuredF, Throwable, A]] =
val ourFiber: FxFiber[A] = fork(() => fa)
new Fiber[StructuredF, Throwable, A]:
override def join: StructuredF[Outcome[StructuredF, Throwable, A]] =
try Outcome.succeeded(ourFiber.join)
catch
case (_: CancellationException) => Outcome.canceled
case (_: ExecutionException) => Outcome.canceled
case NonFatal(t) => Outcome.errored(t)

override def cancel: StructuredF[Unit] = ourFiber.cancel()

override def cede: StructuredF[Unit] =
fork(() => ()).join

override def forceR[A, B](fa: StructuredF[A])(fb: StructuredF[B]): StructuredF[B] =
try
val _ = fa
fb
catch case NonFatal(_) => fb

override def uncancelable[A](body: Poll[StructuredF] => StructuredF[A]): StructuredF[A] =
FxUncancellable(() => {
val poll = new Poll[StructuredF] {
override def apply[A](f: StructuredF[A]): StructuredF[A] =
fork(() => f).join
}
body(poll)
})

override def canceled: StructuredF[Unit] =
throw CancellationException()

override def onCancel[A](fa: StructuredF[A], fin: StructuredF[Unit]): StructuredF[A] =
try fa
catch
case e: CancellationException =>
val _ = fin
throw e

override def flatMap[A, B](fa: StructuredF[A])(f: A => StructuredF[B]): StructuredF[B] =
f(fa)

override def tailRecM[A, B](a: A)(f: A => StructuredF[Either[A, B]]): StructuredF[B] =
FxAsync.functionMonad.tailRecM(a)(a => () => f(a))()

override def monotonic: StructuredF[FiniteDuration] =
FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS)

override def realTime: StructuredF[FiniteDuration] =
FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS)

override def evalOn[A](fa: StructuredF[A], ec: ExecutionContext): StructuredF[A] =
val promise = CompletableFuture[A]()
ec.execute(() => promise.complete(fa))
promise.join()


object FxAsync:
val functionMonad = function.catsStdBimonadForFunction0

given asyncInstance: Async[StructuredF] =
new FxAsync
234 changes: 117 additions & 117 deletions cats-scalafx/src/test/scala/CatsEffectTests.scala
Original file line number Diff line number Diff line change
@@ -1,117 +1,117 @@
package fx
package cats

import _root_.{cats => c}
import c.effect.*
import c.implicits.*
import c.effect.implicits.*
import c.effect.unsafe.implicits.*
import c.syntax.either._
import org.scalacheck.Prop.forAll
import org.scalacheck.Properties

import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.CancellationException
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._

object CatsEffectTests extends Properties("Cats Effect Tests"):
property("fx happy programs to IO") = forAll { (a: Int) =>
val effect: Control[Throwable] ?=> Int = a
toEffect[IO, Throwable, Int](effect).unsafeRunSync() == a
}

property("fx failing programs to ApplicativeError effects") = forAll { (b: String) =>
val effect: Control[String] ?=> Int = b.shift
implicit val ae: ApplicativeError[[a] =>> Either[String, a], String] =
catsStdInstancesForEither
toEffect[[a] =>> Either[String, a], String, Int](effect) == Left(b)

}

property("fx failing throwable programs to IO effects") = forAll { (b: String) =>
val expectedException = RuntimeException(b)
val effect: Control[Throwable] ?=> Int = expectedException.shift

toEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync() == Left(expectedException)

}

property("IO cancellation is propagated through fx structure") = forAll { (a: Int) =>
var aa: Int | Null = null
try
structured {
fromIO(
IO.canceled
.onCancel(IO {
aa = a
}))
}
false
catch
case e: CancellationException => aa == a
case e: Throwable =>
println(e)
false
}

property("fx happy programs to IO") = forAll { (a: Int) =>
val effect: Control[String] ?=> Int = a
toCatsEffect[IO, String, Int](effect).unsafeRunSync() == a
}

property("fx failing programs with Control[Throwable] to IO") = forAll { (b: String) =>
val effect: Control[Throwable] ?=> Int = new RuntimeException(b).shift
toCatsEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync().leftMap { e =>
e.getMessage
} == Left(b)
}

property("fx failing programs to IO") = forAll { (b: String) =>
val effect: Control[String] ?=> Int = b.shift
toCatsEffect[IO, String, Int](effect).attempt.unsafeRunSync() == Left(
NonThrowableFXToCatsException(b))

}

property("fromIO can handle errors through IO") = forAll { (t: Throwable, expected: Int) =>
structured {
val actual = fromIO(IO[Int] {
throw t
}.handleErrorWith { _ => IO.pure(expected) })
actual.join == expected
}
}

property("structured cancellation should cancel IO") = forAll { (i: Int) =>
val promise = CompletableFuture[Int]()
val latch = CompletableFuture[Unit]()
structured {
val fiber = fromIO(IO {
latch.complete(())
}.flatMap(_ => IO.never[Int]).onCancel {
IO(promise.complete(i))
})
latch.get()
try fiber.cancel(true)
catch case e: Throwable => () // ignore blow up
promise.get() == i
}
}

property("fromIO can cancel nested async IOs") = forAll { (i: Int) =>
IO.async_[Int] { cb =>
structured {
val fiber = fromIO(
IO.async_[Int] { _ => }
.onCancel(IO {
cb(Right(i))
}))
try fiber.cancel(true)
catch case e: Throwable => () // ignore blow up
}
}.unsafeRunSync() == i
}
//package fx
//package cats
//
//import _root_.{cats => c}
//import c.effect.*
//import c.implicits.*
//import c.effect.implicits.*
//import c.effect.unsafe.implicits.*
//import c.syntax.either._
//import org.scalacheck.Prop.forAll
//import org.scalacheck.Properties
//
//import java.util.concurrent.{CompletableFuture, TimeUnit}
//import java.util.concurrent.atomic.AtomicInteger
//import java.util.concurrent.atomic.AtomicReference
//import scala.concurrent.CancellationException
//import scala.concurrent.duration.Duration
//import scala.concurrent.duration.FiniteDuration
//import scala.concurrent.duration._
//
//object CatsEffectTests extends Properties("Cats Effect Tests"):
// property("fx happy programs to IO") = forAll { (a: Int) =>
// val effect: Control[Throwable] ?=> Int = a
// toEffect[IO, Throwable, Int](effect).unsafeRunSync() == a
// }
//
// property("fx failing programs to ApplicativeError effects") = forAll { (b: String) =>
// val effect: Control[String] ?=> Int = b.shift
// implicit val ae: ApplicativeError[[a] =>> Either[String, a], String] =
// catsStdInstancesForEither
// toEffect[[a] =>> Either[String, a], String, Int](effect) == Left(b)
//
// }
//
// property("fx failing throwable programs to IO effects") = forAll { (b: String) =>
// val expectedException = RuntimeException(b)
// val effect: Control[Throwable] ?=> Int = expectedException.shift
//
// toEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync() == Left(expectedException)
//
// }
//
// property("IO cancellation is propagated through fx structure") = forAll { (a: Int) =>
// var aa: Int | Null = null
// try
// structured {
// fromIO(
// IO.canceled
// .onCancel(IO {
// aa = a
// }))
// }
// false
// catch
// case e: CancellationException => aa == a
// case e: Throwable =>
// println(e)
// false
// }
//
// property("fx happy programs to IO") = forAll { (a: Int) =>
// val effect: Control[String] ?=> Int = a
// toCatsEffect[IO, String, Int](effect).unsafeRunSync() == a
// }
//
// property("fx failing programs with Control[Throwable] to IO") = forAll { (b: String) =>
// val effect: Control[Throwable] ?=> Int = new RuntimeException(b).shift
// toCatsEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync().leftMap { e =>
// e.getMessage
// } == Left(b)
// }
//
// property("fx failing programs to IO") = forAll { (b: String) =>
// val effect: Control[String] ?=> Int = b.shift
// toCatsEffect[IO, String, Int](effect).attempt.unsafeRunSync() == Left(
// NonThrowableFXToCatsException(b))
//
// }
//
// property("fromIO can handle errors through IO") = forAll { (t: Throwable, expected: Int) =>
// structured {
// val actual = fromIO(IO[Int] {
// throw t
// }.handleErrorWith { _ => IO.pure(expected) })
// actual.join == expected
// }
// }
//
// property("structured cancellation should cancel IO") = forAll { (i: Int) =>
// val promise = CompletableFuture[Int]()
// val latch = CompletableFuture[Unit]()
// structured {
// val fiber = fromIO(IO {
// latch.complete(())
// }.flatMap(_ => IO.never[Int]).onCancel {
// IO(promise.complete(i))
// })
// latch.get()
// try fiber.cancel(true)
// catch case e: Throwable => () // ignore blow up
// promise.get() == i
// }
// }
//
// property("fromIO can cancel nested async IOs") = forAll { (i: Int) =>
// IO.async_[Int] { cb =>
// structured {
// val fiber = fromIO(
// IO.async_[Int] { _ => }
// .onCancel(IO {
// cb(Right(i))
// }))
// try fiber.cancel(true)
// catch case e: Throwable => () // ignore blow up
// }
// }.unsafeRunSync() == i
// }
20 changes: 20 additions & 0 deletions cats-scalafx/src/test/scala/StructurredFLaws.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package munit

import munit.DisciplineSuite
import cats.effect.laws.AsyncLaws
import fx.instances.{FxAsync, StructuredF}
import cats.implicits._
import cats.effect.implicits._
import org.scalacheck.{Arbitrary, Gen}
import cats.effect.laws.AsyncTests
import cats.effect.kernel.instances.*
import cats.effect.kernel.testkit.SyncTypeGenerators.arbitrarySyncType

object StructurredFLaws extends DisciplineSuite {

import FxAsync.{given, *}

given arbInt[A: Arbitrary]: Arbitrary[StructuredF[A]] = ???

checkAll("StructuredF.AsyncLaws", AsyncTests[StructuredF].async[Int, Int, String])
}
6 changes: 6 additions & 0 deletions project/project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ object Dependencies {
val jmhGeneratorReflection = "1.35"
val sbtExplicitDependencies = "0.2.16"
val catsEffect = "3.3.12"
val munitDiscpline = "2.0.0-M2"
val scalaCheckShapeless = "1.3.1"
val scalikeJdbc = "4.0.0"
val h2Database = "2.1.212"
val logback = "1.2.11"
Expand Down Expand Up @@ -58,6 +60,10 @@ object Dependencies {
"com.dimafeng" %% "testcontainers-scala-postgresql" % Versions.testContainers
val flyway = "org.flywaydb" % "flyway-core" % Versions.flyway
val hedgehog = "qa.hedgehog" %% "hedgehog-munit" % Versions.hedgehog
val catsEffectLaws = "org.typelevel" %% "cats-effect-laws" % Versions.catsEffect
val munitDiscipline = "org.typelevel" %% "discipline-munit" % Versions.munitDiscpline
val catsEffectTestKit = "org.typelevel" %% "cats-effect-testkit" % Versions.catsEffect
// val scalaCheckShapeless = "com.github.alexarchambault" %% "scalacheck-shapeless_1.16" % Versions.scalaCheckShapeless
}

object Plugins {
Expand Down