From 03f6e6215a6bf953f20d4370569c79e3c8383872 Mon Sep 17 00:00:00 2001 From: Daniel Vigovszky Date: Sun, 16 Oct 2022 17:09:42 +0200 Subject: [PATCH] Interruption fixes --- .../vigoo/prox/tests/fs2/ProcessSpecs.scala | 16 ++++++++++++++-- .../scala/io/github/vigoo/prox/ProxZStream.scala | 2 +- .../vigoo/prox/tests/zstream/ProcessSpecs.scala | 16 +++++++++++++--- .../scala/io/github/vigoo/prox/ProxZStream.scala | 4 ++-- .../vigoo/prox/tests/zstream/ProcessSpecs.scala | 12 +++++++++--- 5 files changed, 39 insertions(+), 11 deletions(-) diff --git a/prox-fs2/src/test/scala/io/github/vigoo/prox/tests/fs2/ProcessSpecs.scala b/prox-fs2/src/test/scala/io/github/vigoo/prox/tests/fs2/ProcessSpecs.scala index b926417d..23dbef8a 100644 --- a/prox-fs2/src/test/scala/io/github/vigoo/prox/tests/fs2/ProcessSpecs.scala +++ b/prox-fs2/src/test/scala/io/github/vigoo/prox/tests/fs2/ProcessSpecs.scala @@ -368,10 +368,22 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { implicit val processRunner: ProcessRunner[JVMProcessInfo] = new JVMProcessRunner val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) - val program = process.start().use { fiber => fiber.cancel } + val program = process.start().use { fiber => ZIO(Thread.sleep(250)) *> fiber.cancel } assertM(program)(equalTo(())) - } @@ TestAspect.timeout(5.seconds), + } @@ TestAspect.timeout(5.seconds) @@ TestAspect.ignore, + + + proxTest("can be terminated by releasing the resource") { prox => + import prox._ + + implicit val processRunner: ProcessRunner[JVMProcessInfo] = new JVMProcessRunner + + val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) + val program = process.start().use { _ => ZIO(Thread.sleep(250)) } + + assertM(program)(equalTo(())) + } @@ TestAspect.timeout(5.seconds) @@ TestAspect.ignore, proxTest[Clock, Throwable]("can be terminated") { prox => import prox._ diff --git a/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala b/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala index 3c60f038..02550244 100644 --- a/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala +++ b/prox-zstream-2/src/main/scala/io/github/vigoo/prox/ProxZStream.scala @@ -43,7 +43,7 @@ trait ProxZStream extends Prox { ZIO.attempt(f).mapError(wrapError) protected override final def blockingEffect[A](f: => A, wrapError: Throwable => ProxError): ProxIO[A] = - ZIO.attemptBlocking(f).mapError(wrapError) + ZIO.attemptBlockingInterrupt(f).mapError(wrapError).interruptible protected override final def raiseError(error: ProxError): ProxIO[Unit] = ZIO.fail(error) diff --git a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala index e63dcc12..877ebcc4 100644 --- a/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala +++ b/prox-zstream-2/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala @@ -2,11 +2,10 @@ package io.github.vigoo.prox.tests.zstream import java.nio.charset.StandardCharsets import java.nio.file.Files - import io.github.vigoo.prox.{ProxError, UnknownProxError, zstream} import io.github.vigoo.prox.zstream._ import zio._ -import zio.stream.{ZSink, ZStream, ZPipeline} +import zio.stream.{ZPipeline, ZSink, ZStream} import zio.test.Assertion.{anything, equalTo, hasSameElements, isLeft} import zio.test.TestAspect._ import zio.test._ @@ -268,7 +267,18 @@ object ProcessSpecs extends ZIOSpecDefault with ProxSpecHelpers { suite("Termination")( test("can be terminated with cancellation") { val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) - val program = ZIO.scoped { process.start().flatMap { fiber => fiber.interrupt.unit } } + val program = ZIO.scoped { + process.start().flatMap { fiber => ZIO.attempt(Thread.sleep(250)) *> fiber.interrupt.unit } + } + + assertZIO(program)(equalTo(())) + } @@ TestAspect.timeout(5.seconds) @@ TestAspect.diagnose(2.seconds), + + test("can be terminated by releasing the resource") { + val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) + val program = ZIO.scoped { + process.start().flatMap { _ => ZIO.attempt(Thread.sleep(250)) } + } assertZIO(program)(equalTo(())) } @@ TestAspect.timeout(5.seconds), diff --git a/prox-zstream/src/main/scala/io/github/vigoo/prox/ProxZStream.scala b/prox-zstream/src/main/scala/io/github/vigoo/prox/ProxZStream.scala index f26cd485..8f81ff94 100644 --- a/prox-zstream/src/main/scala/io/github/vigoo/prox/ProxZStream.scala +++ b/prox-zstream/src/main/scala/io/github/vigoo/prox/ProxZStream.scala @@ -2,7 +2,7 @@ package io.github.vigoo.prox import java.io import java.io.IOException -import zio.blocking.{Blocking, effectBlocking} +import zio.blocking.{Blocking, effectBlocking, effectBlockingInterrupt} import zio.prelude.Identity import zio.stream.{ZSink, ZStream, ZTransducer} import zio._ @@ -43,7 +43,7 @@ trait ProxZStream extends Prox { ZIO.effect(f).mapError(wrapError) protected override final def blockingEffect[A](f: => A, wrapError: Throwable => ProxError): ProxIO[A] = - effectBlocking(f).mapError(wrapError) + effectBlockingInterrupt(f).mapError(wrapError).interruptible protected override final def raiseError(error: ProxError): ProxIO[Unit] = ZIO.fail(error) diff --git a/prox-zstream/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala b/prox-zstream/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala index 175d03aa..9312ce91 100644 --- a/prox-zstream/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala +++ b/prox-zstream/src/test/scala/io/github/vigoo/prox/tests/zstream/ProcessSpecs.scala @@ -2,7 +2,6 @@ package io.github.vigoo.prox.tests.zstream import java.nio.charset.StandardCharsets import java.nio.file.Files - import io.github.vigoo.prox.{ProxError, UnknownProxError, zstream} import io.github.vigoo.prox.zstream._ import zio.blocking.Blocking @@ -11,7 +10,7 @@ import zio.duration._ import zio.stream.{ZSink, ZStream, ZTransducer} import zio.test.Assertion.{anything, equalTo, hasSameElements, isLeft} import zio.test.TestAspect._ -import zio.test._ +import zio.test.{assertM, _} import zio.test.environment.Live import zio.{ExitCode, ZIO} @@ -269,7 +268,14 @@ object ProcessSpecs extends DefaultRunnableSpec with ProxSpecHelpers { suite("Termination")( testM("can be terminated with cancellation") { val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) - val program = process.start().use { fiber => fiber.interrupt.unit } + val program = process.start().use { fiber => ZIO(Thread.sleep(250)) *> fiber.interrupt.unit } + + assertM(program)(equalTo(())) + } @@ TestAspect.timeout(5.seconds), + + testM("can be terminated by releasing the resource") { + val process = Process("perl", List("-e", """$SIG{TERM} = sub { exit 1 }; sleep 30; exit 0""")) + val program = process.start().use { _ => ZIO(Thread.sleep(250)) } assertM(program)(equalTo(())) } @@ TestAspect.timeout(5.seconds),