From 22fd9fef82893540acf36c3599b106886c6ad461 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 19 Oct 2022 17:31:03 +0200 Subject: [PATCH 1/4] Add tests, unwrap exception uncancellable, and pinpoint hanging test --- scala-fx/src/main/scala/fx/Fiber.scala | 5 +- scala-fx/src/test/scala/fx/BracketTests.scala | 60 +++++++++++++++++++ .../test/scala/fx/UncancellableTests.scala | 50 ++++++++++++++++ 3 files changed, 114 insertions(+), 1 deletion(-) create mode 100644 scala-fx/src/test/scala/fx/BracketTests.scala create mode 100644 scala-fx/src/test/scala/fx/UncancellableTests.scala diff --git a/scala-fx/src/main/scala/fx/Fiber.scala b/scala-fx/src/main/scala/fx/Fiber.scala index 131ef521..3d5a0f54 100644 --- a/scala-fx/src/main/scala/fx/Fiber.scala +++ b/scala-fx/src/main/scala/fx/Fiber.scala @@ -2,6 +2,7 @@ package fx import java.util.concurrent.Future import java.util.concurrent.CompletableFuture +import java.util.concurrent.CompletionException opaque type Fiber[A] = Future[A] @@ -20,7 +21,9 @@ def uncancellable[A](fn: () => A): A = { case t: Throwable => promise.completeExceptionally(t) }) - promise.join + try + promise.join + catch case e : CompletionException => throw e.getCause } def fork[B](f: () => B)(using structured: Structured): Fiber[B] = diff --git a/scala-fx/src/test/scala/fx/BracketTests.scala b/scala-fx/src/test/scala/fx/BracketTests.scala new file mode 100644 index 00000000..2a617453 --- /dev/null +++ b/scala-fx/src/test/scala/fx/BracketTests.scala @@ -0,0 +1,60 @@ +package fx + +import fx.ResourcesTests.property +import org.scalacheck.Prop.forAll +import org.scalacheck.Properties + +import java.util.concurrent.{CompletableFuture, CompletionException} + +object BracketTests extends Properties("Bracket Tests"): + + property("bracketCase identity") = forAll { (n: Int) => + bracketCase(() => n, identity, (_, _) => ()) == n + } + + case class CustomEx(val token: String) extends RuntimeException + + property("bracketCase exception identity") = forAll { (msg: String) => + val res = + try + bracketCase(() => throw CustomEx(msg), _ => throw RuntimeException("Cannot come here"), (_, _) => ()) + catch case CustomEx(msg) => msg + + res == msg + } + + property("bracketCase must run release task on use error") = forAll { (msg: String) => + val promise = new CompletableFuture[ExitCase] + val res = + try + bracketCase(() => (), _ => throw CustomEx(msg), (_, ex) => promise.complete(ex)) + catch case CustomEx(msg) => msg + + res == msg && promise.join() == ExitCase.Failure(CustomEx(msg)) + } + + property("bracketCase must run release task on use success") = forAll { (msg: String) => + val promise = new CompletableFuture[ExitCase] + val res = bracketCase(() => (), _ => msg, (_, ex) => promise.complete(ex)) + + res == msg && promise.join() == ExitCase.Completed + } + + property("bracketCase cancellation in use") = forAll { (msg: String) => + throw RuntimeException("I am hanging.") + + val latch = new CompletableFuture[Unit] + val promise = new CompletableFuture[ExitCase] + structured { + val fiber = fork(() => bracketCase(() => (), _ => { + latch.complete(()) + Thread.sleep(100_000) + }, (_,ex) => promise.complete(ex))) + latch.join() + fiber.cancel() + promise.join().isInstanceOf[ExitCase.Cancelled] + } + } + +end BracketTests + diff --git a/scala-fx/src/test/scala/fx/UncancellableTests.scala b/scala-fx/src/test/scala/fx/UncancellableTests.scala new file mode 100644 index 00000000..4184110b --- /dev/null +++ b/scala-fx/src/test/scala/fx/UncancellableTests.scala @@ -0,0 +1,50 @@ +package fx + +import fx.ResourcesTests.property +import org.scalacheck.Prop.forAll +import org.scalacheck.Properties + +import java.util.concurrent.{CompletableFuture, CompletionException} + +object UncancellableTests extends Properties("Bracket Tests"): + + property("uncancellable identity") = forAll { (n: Int) => + uncancellable(() => n) == n + } + + case class CustomEx(val token: String) extends RuntimeException + + property("uncancellable exception identity") = forAll { (msg: String) => + val res = + try + uncancellable(() => throw CustomEx(msg)) + catch case CustomEx(msg) => msg + + res == msg + } + +// "Uncancellable back pressures withTimeoutOrNull" { +// runBlockingTest { +// checkAll(Arb.long(50, 100), Arb.long(300, 400)) { +// a +// , b -> +// val start = currentTime +// +// val n = withTimeoutOrNull(a.milliseconds) { +// uncancellable { +// delay(b.milliseconds) +// } +// } +// +// val duration = currentTime - start +// +// n shouldBe null // timed-out so should be null +// require((duration) >= b) { +// "Should've taken longer than $b milliseconds, but took $duration" +// } +// } +// } +// } + +end UncancellableTests + From 25ddc9e698d682d1cbc8686ea4ac363402714248 Mon Sep 17 00:00:00 2001 From: "Jack C. Viers" Date: Wed, 19 Oct 2022 21:52:29 -0500 Subject: [PATCH 2/4] Adds InterruptedException -> CancellationException in bracketCase --- scala-fx/src/main/scala/fx/Use.scala | 6 ++++-- scala-fx/src/test/scala/fx/BracketTests.scala | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/scala-fx/src/main/scala/fx/Use.scala b/scala-fx/src/main/scala/fx/Use.scala index 2a703f36..1868a3aa 100644 --- a/scala-fx/src/main/scala/fx/Use.scala +++ b/scala-fx/src/main/scala/fx/Use.scala @@ -77,9 +77,11 @@ inline def bracketCase[A, B]( val res = try use(acquired) catch - case (e: CancellationException) => + case e: InterruptedException => runReleaseAndRethrow(e, () => release(acquired, ExitCase.Cancelled(e))) - case (e: ExecutionException) => + case e: CancellationException => + runReleaseAndRethrow(e, () => release(acquired, ExitCase.Cancelled(e))) + case e: ExecutionException => runReleaseAndRethrow( e.getCause, () => release(acquired, ExitCase.Cancelled(e.getCause)) diff --git a/scala-fx/src/test/scala/fx/BracketTests.scala b/scala-fx/src/test/scala/fx/BracketTests.scala index 2a617453..d114ac1c 100644 --- a/scala-fx/src/test/scala/fx/BracketTests.scala +++ b/scala-fx/src/test/scala/fx/BracketTests.scala @@ -41,7 +41,7 @@ object BracketTests extends Properties("Bracket Tests"): } property("bracketCase cancellation in use") = forAll { (msg: String) => - throw RuntimeException("I am hanging.") + // throw RuntimeException("I am hanging.") val latch = new CompletableFuture[Unit] val promise = new CompletableFuture[ExitCase] From c0199716c857c987353e7349f808fd1f1f73f397 Mon Sep 17 00:00:00 2001 From: "Jack C. Viers" Date: Wed, 19 Oct 2022 22:21:07 -0500 Subject: [PATCH 3/4] Changes to unhang cases --- .../src/test/scala/fx/HttpServerFixtures.scala | 4 ++-- scala-fx/src/main/scala/fx/Resource.scala | 12 ++++++------ scala-fx/src/test/scala/fx/ResourcesTests.scala | 6 +++--- .../scala/sttp/fx/HttpExtensionsSuiteFixtures.scala | 4 ++-- .../scala/sttp/fx/ToHttpBodyMapperFixtures.scala | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/http-scala-fx/src/test/scala/fx/HttpServerFixtures.scala b/http-scala-fx/src/test/scala/fx/HttpServerFixtures.scala index e173ac71..a1e9be96 100644 --- a/http-scala-fx/src/test/scala/fx/HttpServerFixtures.scala +++ b/http-scala-fx/src/test/scala/fx/HttpServerFixtures.scala @@ -116,10 +116,10 @@ trait HttpServerFixtures: setup = _ => { for { server <- Resource( - HttpServer.create(InetSocketAddress(0), 0), + () => HttpServer.create(InetSocketAddress(0), 0), (server, _) => server.stop(0)) serverExecutor <- Resource( - Executors.newVirtualThreadPerTaskExecutor, + () => Executors.newVirtualThreadPerTaskExecutor, (executor, _) => executor.shutdown()) _ = server.setExecutor(serverExecutor) httpContext = server.createContext("/root", handler) diff --git a/scala-fx/src/main/scala/fx/Resource.scala b/scala-fx/src/main/scala/fx/Resource.scala index 913546b7..c8a34055 100644 --- a/scala-fx/src/main/scala/fx/Resource.scala +++ b/scala-fx/src/main/scala/fx/Resource.scala @@ -6,24 +6,24 @@ import Tuple.{InverseMap, IsMappedBy, Map} import scala.annotation.implicitNotFound class Resource[A]( - val acquire: Resources ?=> A, + val acquire: Resources ?=> () => A, val release: (A, ExitCase) => Unit ): - def this(value: Resources ?=> A) = this(value, (_, _) => ()) + def this(value: Resources ?=> () => A) = this(value, (_, _) => ()) - def use[B](f: A => B): B = bracketCase(() => acquire, f, release) + def use[B](f: A => B): B = bracketCase(acquire, f, release) def map[B](f: (A) => B): Resource[B] = - Resource(f(this.bind)) + Resource(() => f(this.bind)) def flatMap[B](f: (A) => Resource[B]): Resource[B] = - Resource(f(this.bind).bind) + Resource(() => f(this.bind).bind) def bind: A = bracketCase( () => { - val a = acquire + val a = acquire() val finalizer: (ExitCase) => Unit = (ex: ExitCase) => release(a, ex) summon[Resources].finalizers.updateAndGet(finalizer +: _) a diff --git a/scala-fx/src/test/scala/fx/ResourcesTests.scala b/scala-fx/src/test/scala/fx/ResourcesTests.scala index 407f162c..2a6d218a 100644 --- a/scala-fx/src/test/scala/fx/ResourcesTests.scala +++ b/scala-fx/src/test/scala/fx/ResourcesTests.scala @@ -10,13 +10,13 @@ import java.util.concurrent.ExecutionException object ResourcesTests extends Properties("Resources Tests"): property("Can consume resource") = forAll { (n: Int) => - val r = Resource(n, (_, _) => ()) + val r = Resource(() => n, (_, _) => ()) r.use(_ + 1) == n + 1 } property("value resource is released with Complete") = forAll { (n: Int) => val p = CompletableFuture[ExitCase]() - val r = Resource(n, (_, ex) => require(p.complete(ex))) + val r = Resource(() => n, (_, ex) => require(p.complete(ex))) r.use(_ => ()) p.join == ExitCase.Completed } @@ -25,7 +25,7 @@ object ResourcesTests extends Properties("Resources Tests"): property("error resource finishes with error") = forAll { (n: String) => val p = CompletableFuture[ExitCase]() - val r = Resource[Int](throw CustomEx(n), (_, ex) => require(p.complete(ex))) + val r = Resource[Int](() => throw CustomEx(n), (_, ex) => require(p.complete(ex))) val result = try r.use(_ + 1) diff --git a/sttp-scala-fx/src/test/scala/sttp/fx/HttpExtensionsSuiteFixtures.scala b/sttp-scala-fx/src/test/scala/sttp/fx/HttpExtensionsSuiteFixtures.scala index fa1be4f4..a0b585d1 100644 --- a/sttp-scala-fx/src/test/scala/sttp/fx/HttpExtensionsSuiteFixtures.scala +++ b/sttp-scala-fx/src/test/scala/sttp/fx/HttpExtensionsSuiteFixtures.scala @@ -21,10 +21,10 @@ trait HttpExtensionsSuiteFixtures { self: ScalaFXSuite => setup = _ => { for { server <- Resource( - HttpServer.create(InetSocketAddress(0), 0), + () => HttpServer.create(InetSocketAddress(0), 0), (server, _) => server.stop(0)) serverExecutor <- Resource( - Executors.newVirtualThreadPerTaskExecutor, + () => Executors.newVirtualThreadPerTaskExecutor, (executor, _) => executor.shutdown()) _ = server.setExecutor(serverExecutor) httpContext = server.createContext("/root", handler) diff --git a/sttp-scala-fx/src/test/scala/sttp/fx/ToHttpBodyMapperFixtures.scala b/sttp-scala-fx/src/test/scala/sttp/fx/ToHttpBodyMapperFixtures.scala index 981eb841..06b2333f 100644 --- a/sttp-scala-fx/src/test/scala/sttp/fx/ToHttpBodyMapperFixtures.scala +++ b/sttp-scala-fx/src/test/scala/sttp/fx/ToHttpBodyMapperFixtures.scala @@ -39,7 +39,7 @@ trait ToHttpBodyMapperFixtures { self: ScalaFXSuite => val fileBody = FunFixture( setup = _ => { Resource.apply( - { + () => { val file = Files.createTempFile("fileBodyTest", ".txt") Files.write(file, "test".getBytes()) FileBody(SttpFile.fromPath(file)) From 9b1cb86ee576287b8e82b75195cf4275ac71755a Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Thu, 20 Oct 2022 10:04:36 +0200 Subject: [PATCH 4/4] Fix ResourceTest.error --- scala-fx/src/main/scala/fx/Fiber.scala | 5 ++-- scala-fx/src/test/scala/fx/BracketTests.scala | 23 +++++++++++-------- .../src/test/scala/fx/ResourcesTests.scala | 4 ++-- .../test/scala/fx/UncancellableTests.scala | 8 ++----- 4 files changed, 19 insertions(+), 21 deletions(-) diff --git a/scala-fx/src/main/scala/fx/Fiber.scala b/scala-fx/src/main/scala/fx/Fiber.scala index 3d5a0f54..89b8532b 100644 --- a/scala-fx/src/main/scala/fx/Fiber.scala +++ b/scala-fx/src/main/scala/fx/Fiber.scala @@ -21,9 +21,8 @@ def uncancellable[A](fn: () => A): A = { case t: Throwable => promise.completeExceptionally(t) }) - try - promise.join - catch case e : CompletionException => throw e.getCause + try promise.join + catch case e: CompletionException => throw e.getCause } def fork[B](f: () => B)(using structured: Structured): Fiber[B] = diff --git a/scala-fx/src/test/scala/fx/BracketTests.scala b/scala-fx/src/test/scala/fx/BracketTests.scala index d114ac1c..debafee0 100644 --- a/scala-fx/src/test/scala/fx/BracketTests.scala +++ b/scala-fx/src/test/scala/fx/BracketTests.scala @@ -17,7 +17,10 @@ object BracketTests extends Properties("Bracket Tests"): property("bracketCase exception identity") = forAll { (msg: String) => val res = try - bracketCase(() => throw CustomEx(msg), _ => throw RuntimeException("Cannot come here"), (_, _) => ()) + bracketCase( + () => throw CustomEx(msg), + _ => throw RuntimeException("Cannot come here"), + (_, _) => ()) catch case CustomEx(msg) => msg res == msg @@ -26,8 +29,7 @@ object BracketTests extends Properties("Bracket Tests"): property("bracketCase must run release task on use error") = forAll { (msg: String) => val promise = new CompletableFuture[ExitCase] val res = - try - bracketCase(() => (), _ => throw CustomEx(msg), (_, ex) => promise.complete(ex)) + try bracketCase(() => (), _ => throw CustomEx(msg), (_, ex) => promise.complete(ex)) catch case CustomEx(msg) => msg res == msg && promise.join() == ExitCase.Failure(CustomEx(msg)) @@ -41,15 +43,17 @@ object BracketTests extends Properties("Bracket Tests"): } property("bracketCase cancellation in use") = forAll { (msg: String) => - // throw RuntimeException("I am hanging.") - val latch = new CompletableFuture[Unit] val promise = new CompletableFuture[ExitCase] structured { - val fiber = fork(() => bracketCase(() => (), _ => { - latch.complete(()) - Thread.sleep(100_000) - }, (_,ex) => promise.complete(ex))) + val fiber = fork(() => + bracketCase( + () => (), + _ => { + latch.complete(()) + Thread.sleep(100_000) + }, + (_, ex) => promise.complete(ex))) latch.join() fiber.cancel() promise.join().isInstanceOf[ExitCase.Cancelled] @@ -57,4 +61,3 @@ object BracketTests extends Properties("Bracket Tests"): } end BracketTests - diff --git a/scala-fx/src/test/scala/fx/ResourcesTests.scala b/scala-fx/src/test/scala/fx/ResourcesTests.scala index 2a6d218a..4f2cb3c8 100644 --- a/scala-fx/src/test/scala/fx/ResourcesTests.scala +++ b/scala-fx/src/test/scala/fx/ResourcesTests.scala @@ -30,8 +30,8 @@ object ResourcesTests extends Properties("Resources Tests"): try r.use(_ + 1) "unexpected" - catch case e: CompletionException => e.getCause.asInstanceOf[CustomEx].token - result == n + catch case CustomEx(msg) => msg + n == result } end ResourcesTests diff --git a/scala-fx/src/test/scala/fx/UncancellableTests.scala b/scala-fx/src/test/scala/fx/UncancellableTests.scala index 4184110b..5d80b119 100644 --- a/scala-fx/src/test/scala/fx/UncancellableTests.scala +++ b/scala-fx/src/test/scala/fx/UncancellableTests.scala @@ -8,16 +8,13 @@ import java.util.concurrent.{CompletableFuture, CompletionException} object UncancellableTests extends Properties("Bracket Tests"): - property("uncancellable identity") = forAll { (n: Int) => - uncancellable(() => n) == n - } + property("uncancellable identity") = forAll { (n: Int) => uncancellable(() => n) == n } case class CustomEx(val token: String) extends RuntimeException property("uncancellable exception identity") = forAll { (msg: String) => val res = - try - uncancellable(() => throw CustomEx(msg)) + try uncancellable(() => throw CustomEx(msg)) catch case CustomEx(msg) => msg res == msg @@ -47,4 +44,3 @@ object UncancellableTests extends Properties("Bracket Tests"): // } end UncancellableTests -