diff --git a/build.sbt b/build.sbt index 97d0288d..0100569e 100644 --- a/build.sbt +++ b/build.sbt @@ -111,7 +111,11 @@ lazy val catsScalaFXSettings = Seq( autoAPIMappings := true, libraryDependencies ++= Seq( catsEffect, - scalacheck % Test + scalacheck % Test, + catsEffectLaws % Test, + munitDiscipline % Test, + catsEffectTestKit % Test +// scalaCheckShapeless % Test ) ) diff --git a/cats-scalafx/src/main/scala/instances.scala b/cats-scalafx/src/main/scala/instances.scala new file mode 100644 index 00000000..41371ea2 --- /dev/null +++ b/cats-scalafx/src/main/scala/instances.scala @@ -0,0 +1,110 @@ +package fx.instances + +import _root_.cats.Monad +import _root_.cats.effect.Async +import _root_.cats.effect.Outcome +import _root_.cats.effect.kernel.{Cont, Deferred, Fiber, Poll, Ref, Sync} +import _root_.cats.instances.* +import fx.{ExitCase, Structured, cancel, fork, join, uncancellable as FxUncancellable, Fiber as FxFiber} + +import java.util.concurrent.{CancellationException, CompletableFuture, ExecutionException, Executor, Executors, TimeUnit} +import scala.concurrent.ExecutionContext +import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal + +type StructuredF[A] = Structured ?=> A + +class FxAsync extends Async[StructuredF]: + + override def raiseError[A](e: Throwable): StructuredF[A] = + throw e + + override def pure[A](x: A): StructuredF[A] = + x + + override def sleep(time: FiniteDuration): StructuredF[Unit] = + Thread.sleep(time.toMillis) + + override def handleErrorWith[A](fa: StructuredF[A])(f: Throwable => StructuredF[A]): StructuredF[A] = + try fa + catch case NonFatal(ex) => f(ex) + + override def executionContext: StructuredF[ExecutionContext] = + ExecutionContext.fromExecutor( + Executors.newThreadPerTaskExecutor(summon[Structured].threadFactory)) + + override def cont[K, R](body: Cont[StructuredF, K, R]): StructuredF[R] = + Async.defaultCont(body)(this) + + override def suspend[A](hint: Sync.Type)(thunk: => A): StructuredF[A] = + thunk + + override def ref[A](a: A): StructuredF[Ref[StructuredF, A]] = + Ref.unsafe(a)(this) + + override def deferred[A]: StructuredF[Deferred[StructuredF, A]] = + Deferred.unsafe(this) + + override def start[A](fa: StructuredF[A]): StructuredF[Fiber[StructuredF, Throwable, A]] = + val ourFiber: FxFiber[A] = fork(() => fa) + new Fiber[StructuredF, Throwable, A]: + override def join: StructuredF[Outcome[StructuredF, Throwable, A]] = + try Outcome.succeeded(ourFiber.join) + catch + case (_: CancellationException) => Outcome.canceled + case (_: ExecutionException) => Outcome.canceled + case NonFatal(t) => Outcome.errored(t) + + override def cancel: StructuredF[Unit] = ourFiber.cancel() + + override def cede: StructuredF[Unit] = + fork(() => ()).join + + override def forceR[A, B](fa: StructuredF[A])(fb: StructuredF[B]): StructuredF[B] = + try + val _ = fa + fb + catch case NonFatal(_) => fb + + override def uncancelable[A](body: Poll[StructuredF] => StructuredF[A]): StructuredF[A] = + FxUncancellable(() => { + val poll = new Poll[StructuredF] { + override def apply[A](f: StructuredF[A]): StructuredF[A] = + fork(() => f).join + } + body(poll) + }) + + override def canceled: StructuredF[Unit] = + throw CancellationException() + + override def onCancel[A](fa: StructuredF[A], fin: StructuredF[Unit]): StructuredF[A] = + try fa + catch + case e: CancellationException => + val _ = fin + throw e + + override def flatMap[A, B](fa: StructuredF[A])(f: A => StructuredF[B]): StructuredF[B] = + f(fa) + + override def tailRecM[A, B](a: A)(f: A => StructuredF[Either[A, B]]): StructuredF[B] = + FxAsync.functionMonad.tailRecM(a)(a => () => f(a))() + + override def monotonic: StructuredF[FiniteDuration] = + FiniteDuration(System.nanoTime(), TimeUnit.NANOSECONDS) + + override def realTime: StructuredF[FiniteDuration] = + FiniteDuration(System.currentTimeMillis(), TimeUnit.MILLISECONDS) + + override def evalOn[A](fa: StructuredF[A], ec: ExecutionContext): StructuredF[A] = + val promise = CompletableFuture[A]() + ec.execute(() => promise.complete(fa)) + promise.join() + + +object FxAsync: + val functionMonad = function.catsStdBimonadForFunction0 + + given asyncInstance: Async[StructuredF] = + new FxAsync diff --git a/cats-scalafx/src/test/scala/CatsEffectTests.scala b/cats-scalafx/src/test/scala/CatsEffectTests.scala index 5e5b1015..d3896295 100644 --- a/cats-scalafx/src/test/scala/CatsEffectTests.scala +++ b/cats-scalafx/src/test/scala/CatsEffectTests.scala @@ -1,117 +1,117 @@ -package fx -package cats - -import _root_.{cats => c} -import c.effect.* -import c.implicits.* -import c.effect.implicits.* -import c.effect.unsafe.implicits.* -import c.syntax.either._ -import org.scalacheck.Prop.forAll -import org.scalacheck.Properties - -import java.util.concurrent.{CompletableFuture, TimeUnit} -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicReference -import scala.concurrent.CancellationException -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration -import scala.concurrent.duration._ - -object CatsEffectTests extends Properties("Cats Effect Tests"): - property("fx happy programs to IO") = forAll { (a: Int) => - val effect: Control[Throwable] ?=> Int = a - toEffect[IO, Throwable, Int](effect).unsafeRunSync() == a - } - - property("fx failing programs to ApplicativeError effects") = forAll { (b: String) => - val effect: Control[String] ?=> Int = b.shift - implicit val ae: ApplicativeError[[a] =>> Either[String, a], String] = - catsStdInstancesForEither - toEffect[[a] =>> Either[String, a], String, Int](effect) == Left(b) - - } - - property("fx failing throwable programs to IO effects") = forAll { (b: String) => - val expectedException = RuntimeException(b) - val effect: Control[Throwable] ?=> Int = expectedException.shift - - toEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync() == Left(expectedException) - - } - - property("IO cancellation is propagated through fx structure") = forAll { (a: Int) => - var aa: Int | Null = null - try - structured { - fromIO( - IO.canceled - .onCancel(IO { - aa = a - })) - } - false - catch - case e: CancellationException => aa == a - case e: Throwable => - println(e) - false - } - - property("fx happy programs to IO") = forAll { (a: Int) => - val effect: Control[String] ?=> Int = a - toCatsEffect[IO, String, Int](effect).unsafeRunSync() == a - } - - property("fx failing programs with Control[Throwable] to IO") = forAll { (b: String) => - val effect: Control[Throwable] ?=> Int = new RuntimeException(b).shift - toCatsEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync().leftMap { e => - e.getMessage - } == Left(b) - } - - property("fx failing programs to IO") = forAll { (b: String) => - val effect: Control[String] ?=> Int = b.shift - toCatsEffect[IO, String, Int](effect).attempt.unsafeRunSync() == Left( - NonThrowableFXToCatsException(b)) - - } - - property("fromIO can handle errors through IO") = forAll { (t: Throwable, expected: Int) => - structured { - val actual = fromIO(IO[Int] { - throw t - }.handleErrorWith { _ => IO.pure(expected) }) - actual.join == expected - } - } - - property("structured cancellation should cancel IO") = forAll { (i: Int) => - val promise = CompletableFuture[Int]() - val latch = CompletableFuture[Unit]() - structured { - val fiber = fromIO(IO { - latch.complete(()) - }.flatMap(_ => IO.never[Int]).onCancel { - IO(promise.complete(i)) - }) - latch.get() - try fiber.cancel(true) - catch case e: Throwable => () // ignore blow up - promise.get() == i - } - } - - property("fromIO can cancel nested async IOs") = forAll { (i: Int) => - IO.async_[Int] { cb => - structured { - val fiber = fromIO( - IO.async_[Int] { _ => } - .onCancel(IO { - cb(Right(i)) - })) - try fiber.cancel(true) - catch case e: Throwable => () // ignore blow up - } - }.unsafeRunSync() == i - } +//package fx +//package cats +// +//import _root_.{cats => c} +//import c.effect.* +//import c.implicits.* +//import c.effect.implicits.* +//import c.effect.unsafe.implicits.* +//import c.syntax.either._ +//import org.scalacheck.Prop.forAll +//import org.scalacheck.Properties +// +//import java.util.concurrent.{CompletableFuture, TimeUnit} +//import java.util.concurrent.atomic.AtomicInteger +//import java.util.concurrent.atomic.AtomicReference +//import scala.concurrent.CancellationException +//import scala.concurrent.duration.Duration +//import scala.concurrent.duration.FiniteDuration +//import scala.concurrent.duration._ +// +//object CatsEffectTests extends Properties("Cats Effect Tests"): +// property("fx happy programs to IO") = forAll { (a: Int) => +// val effect: Control[Throwable] ?=> Int = a +// toEffect[IO, Throwable, Int](effect).unsafeRunSync() == a +// } +// +// property("fx failing programs to ApplicativeError effects") = forAll { (b: String) => +// val effect: Control[String] ?=> Int = b.shift +// implicit val ae: ApplicativeError[[a] =>> Either[String, a], String] = +// catsStdInstancesForEither +// toEffect[[a] =>> Either[String, a], String, Int](effect) == Left(b) +// +// } +// +// property("fx failing throwable programs to IO effects") = forAll { (b: String) => +// val expectedException = RuntimeException(b) +// val effect: Control[Throwable] ?=> Int = expectedException.shift +// +// toEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync() == Left(expectedException) +// +// } +// +// property("IO cancellation is propagated through fx structure") = forAll { (a: Int) => +// var aa: Int | Null = null +// try +// structured { +// fromIO( +// IO.canceled +// .onCancel(IO { +// aa = a +// })) +// } +// false +// catch +// case e: CancellationException => aa == a +// case e: Throwable => +// println(e) +// false +// } +// +// property("fx happy programs to IO") = forAll { (a: Int) => +// val effect: Control[String] ?=> Int = a +// toCatsEffect[IO, String, Int](effect).unsafeRunSync() == a +// } +// +// property("fx failing programs with Control[Throwable] to IO") = forAll { (b: String) => +// val effect: Control[Throwable] ?=> Int = new RuntimeException(b).shift +// toCatsEffect[IO, Throwable, Int](effect).attempt.unsafeRunSync().leftMap { e => +// e.getMessage +// } == Left(b) +// } +// +// property("fx failing programs to IO") = forAll { (b: String) => +// val effect: Control[String] ?=> Int = b.shift +// toCatsEffect[IO, String, Int](effect).attempt.unsafeRunSync() == Left( +// NonThrowableFXToCatsException(b)) +// +// } +// +// property("fromIO can handle errors through IO") = forAll { (t: Throwable, expected: Int) => +// structured { +// val actual = fromIO(IO[Int] { +// throw t +// }.handleErrorWith { _ => IO.pure(expected) }) +// actual.join == expected +// } +// } +// +// property("structured cancellation should cancel IO") = forAll { (i: Int) => +// val promise = CompletableFuture[Int]() +// val latch = CompletableFuture[Unit]() +// structured { +// val fiber = fromIO(IO { +// latch.complete(()) +// }.flatMap(_ => IO.never[Int]).onCancel { +// IO(promise.complete(i)) +// }) +// latch.get() +// try fiber.cancel(true) +// catch case e: Throwable => () // ignore blow up +// promise.get() == i +// } +// } +// +// property("fromIO can cancel nested async IOs") = forAll { (i: Int) => +// IO.async_[Int] { cb => +// structured { +// val fiber = fromIO( +// IO.async_[Int] { _ => } +// .onCancel(IO { +// cb(Right(i)) +// })) +// try fiber.cancel(true) +// catch case e: Throwable => () // ignore blow up +// } +// }.unsafeRunSync() == i +// } diff --git a/cats-scalafx/src/test/scala/StructurredFLaws.scala b/cats-scalafx/src/test/scala/StructurredFLaws.scala new file mode 100644 index 00000000..72c4e029 --- /dev/null +++ b/cats-scalafx/src/test/scala/StructurredFLaws.scala @@ -0,0 +1,20 @@ +package munit + +import munit.DisciplineSuite +import cats.effect.laws.AsyncLaws +import fx.instances.{FxAsync, StructuredF} +import cats.implicits._ +import cats.effect.implicits._ +import org.scalacheck.{Arbitrary, Gen} +import cats.effect.laws.AsyncTests +import cats.effect.kernel.instances.* +import cats.effect.kernel.testkit.SyncTypeGenerators.arbitrarySyncType + +object StructurredFLaws extends DisciplineSuite { + + import FxAsync.{given, *} + + given arbInt[A: Arbitrary]: Arbitrary[StructuredF[A]] = ??? + + checkAll("StructuredF.AsyncLaws", AsyncTests[StructuredF].async[Int, Int, String]) +} diff --git a/project/project/Dependencies.scala b/project/project/Dependencies.scala index dffa3826..df97a4e0 100644 --- a/project/project/Dependencies.scala +++ b/project/project/Dependencies.scala @@ -21,6 +21,8 @@ object Dependencies { val jmhGeneratorReflection = "1.35" val sbtExplicitDependencies = "0.2.16" val catsEffect = "3.3.12" + val munitDiscpline = "2.0.0-M2" + val scalaCheckShapeless = "1.3.1" val scalikeJdbc = "4.0.0" val h2Database = "2.1.212" val logback = "1.2.11" @@ -58,6 +60,10 @@ object Dependencies { "com.dimafeng" %% "testcontainers-scala-postgresql" % Versions.testContainers val flyway = "org.flywaydb" % "flyway-core" % Versions.flyway val hedgehog = "qa.hedgehog" %% "hedgehog-munit" % Versions.hedgehog + val catsEffectLaws = "org.typelevel" %% "cats-effect-laws" % Versions.catsEffect + val munitDiscipline = "org.typelevel" %% "discipline-munit" % Versions.munitDiscpline + val catsEffectTestKit = "org.typelevel" %% "cats-effect-testkit" % Versions.catsEffect +// val scalaCheckShapeless = "com.github.alexarchambault" %% "scalacheck-shapeless_1.16" % Versions.scalaCheckShapeless } object Plugins {