From 1359f2f1c29a2a107d2b038b8f25f180302dd5ce Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Fri, 26 Jun 2020 13:12:07 +0200 Subject: [PATCH 01/23] Add implementation + tests Also fixes a couple of nits found in docs --- .../main/kotlin/arrow/fx/coroutines/predef.kt | 5 +- .../arrow/fx/coroutines/stream/ParJoin.kt | 20 ++- .../arrow/fx/coroutines/stream/Stream.kt | 45 +++-- .../arrow/fx/coroutines/stream/callbacks.kt | 76 ++++++++ .../fx/coroutines/stream/concurrent/Queue.kt | 4 +- .../arrow/fx/coroutines/stream/BracketTest.kt | 1 - .../arrow/fx/coroutines/stream/StreamTest.kt | 21 +++ .../stream/ops/CallbackStreamTest.kt | 162 ++++++++++++++++++ 8 files changed, 313 insertions(+), 21 deletions(-) create mode 100644 arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt create mode 100644 arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt index c1408cba9..55eb40ea9 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt @@ -1,8 +1,11 @@ package arrow.fx.coroutines -inline infix fun ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C = +internal inline infix fun ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C = { a -> f(this(a)) } +internal inline infix fun (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C = + { a: A -> f(this(a)) } + infix fun A.prependTo(fa: Iterable): List = listOf(this) + fa diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt index 7a694688f..956dbefe5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt @@ -16,7 +16,7 @@ import arrow.fx.coroutines.stream.concurrent.SignallingAtomic import arrow.fx.coroutines.uncancellable /** - * Nondeterministically merges a stream of streams (`outer`) in to a single stream, + * Non-deterministically merges a stream of streams (`outer`) in to a single stream, * opening at most `maxOpen` streams at any point in time. * * The outer stream is evaluated and each resulting inner stream is run concurrently, @@ -67,7 +67,7 @@ suspend fun stop( outputQ.enqueue1(None) } -suspend fun decrementRunning( +internal suspend fun decrementRunning( done: SignallingAtomic>>, outputQ: NoneTerminatedQueue>, running: SignallingAtomic @@ -77,7 +77,7 @@ suspend fun decrementRunning( Pair(now, (if (now == 0) suspend { stop(done, outputQ, None) } else suspend { Unit })) }.invoke() -suspend fun incrementRunning(running: SignallingAtomic): Unit = +internal suspend fun incrementRunning(running: SignallingAtomic): Unit = running.update { it + 1 } // runs inner stream, each stream is forked. terminates when killSignal is true if fails will enq in queue failure @@ -121,7 +121,7 @@ internal suspend fun runInner( } // runs the outer stream, interrupts when kill == true, and then decrements the `running` -suspend fun Stream>.runOuter( +internal suspend fun Stream>.runOuter( done: SignallingAtomic>>, outputQ: NoneTerminatedQueue>, running: SignallingAtomic, @@ -146,9 +146,17 @@ suspend fun Stream>.runOuter( // awaits when all streams (outer + inner) finished, // and then collects result of the stream (outer + inner) execution -suspend fun signalResult(done: SignallingAtomic>>): Unit = +internal suspend fun signalResult(done: SignallingAtomic>>): Unit = done.get().flatten().fold({ Unit }, { throw it }) +/** + * Merges both Streams into an Stream of A and B represented by Either. + * This operation is equivalent to a normal merge but for different types. + */ +fun Stream.either(other: Stream): Stream> = + Stream(this.map { Either.Left(it) }, other.map { Either.Right(it) }) + .parJoin(2) + fun Stream>.parJoin(maxOpen: Int): Stream { require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" } return Stream.effect { @@ -180,6 +188,6 @@ fun Stream>.parJoin(maxOpen: Int): Stream { }.flatten() } -/** Like [parJoin] but races all inner streams simultaneously. */ +/** Like [parJoin] but races all inner streams simultaneously without limit. */ fun Stream>.parJoinUnbounded(): Stream = parJoin(Int.MAX_VALUE) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt index c52a7e422..3a40f858c 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt @@ -1027,7 +1027,7 @@ import kotlin.random.Random } /** - * Determinsitically zips elements, terminating when the end of either branch is reached naturally. + * Deterministically zips elements, terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* @@ -1059,7 +1059,7 @@ import kotlin.random.Random zipWith(that) { x, _ -> x } /** - * Determinsitically zips elements using the specified function, + * Deterministically zips elements using the specified function, * terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground @@ -1332,13 +1332,13 @@ import kotlin.random.Random * Run the supplied effectful action at the end of this stream, regardless of how the stream terminates. */ fun onFinalize(f: suspend () -> Unit): Stream = - bracket({ Unit }, { f.invoke() }).flatMap { this } + bracket({ Unit }, { f() }).flatMap { this } /** * Like [onFinalize] but provides the reason for finalization as an `ExitCase`. */ fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream = - Stream.bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this } + bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this } /** * Interrupts the stream, when `haltOnSignal` finishes its evaluation. @@ -1360,9 +1360,9 @@ import kotlin.random.Random /** * Introduces an explicit scope. * - * Scopes are normally introduced automatically, when using `bracket` or xsimilar + * Scopes are normally introduced automatically, when using `bracket` or similar * operations that acquire resources and run finalizers. Manual scope introduction - * is useful when using [onFinalizeWeak]/[onFinalizeCaseWeak], where no scope + * is useful when using [bracketWeak]/[bracketCaseWeak], where no scope * is introduced. */ fun scope(): Stream = @@ -1898,7 +1898,7 @@ import kotlin.random.Random } /** - * Determinsitically zips elements, terminating when the ends of both branches + * Deterministically zips elements, terminating when the ends of both branches * are reached naturally, padding the left branch with `pad1` and padding the right branch * with `pad2` as necessary. * @@ -1918,7 +1918,7 @@ fun Stream.zipAll(that: Stream, pad1: O, pad2: B): Stream Stream.filterNull(): Stream = * ``` */ fun Stream.terminateOnNull(): Stream = + terminateOn { it == null } + +/** + * Halts the input stream when the condition is true. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream(1, 2, 3, 4) + * .terminateOn { it == 3 } + * .compile() + * .toList() + * .let(::println) //[1, 2] + * //sampleEnd + * ``` + */ +fun Stream.terminateOn(terminator: (O) -> Boolean): Stream = asPull.repeat { pull -> pull.unconsOrNull().flatMap { uncons -> when (uncons) { null -> Pull.just(null) else -> { - when (val idx = uncons.head.indexOfFirst { it == null }) { + when (val idx = uncons.head.indexOfFirst(terminator)) { 0 -> Pull.just(null) null -> Pull.output(uncons.head.map { it!! }).map { uncons.tail } else -> Pull.output(uncons.head.take(idx).map { it!! }).map { null } @@ -2170,6 +2189,10 @@ fun Stream>.terminateOnNone(): Stream = } }.stream() +/** + * Wraps the inner values in Option as Some and adds an None at the end of the Stream to signal completion. + * Note that this doesn't actually limit an infinite stream. + */ fun Stream.noneTerminate(): Stream> = map { Some(it) }.append { Stream.just(None) } @@ -2185,7 +2208,7 @@ fun emptyStream(): Stream = Stream.empty() /** - * Determinsitically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. + * Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* @@ -2204,7 +2227,7 @@ fun Stream.interleave(that: Stream): Stream = zip(that).flatMap { (o1, o2) -> Stream(o1, o2) } /** - * Determinsitically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. + * Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt new file mode 100644 index 000000000..cc5dab035 --- /dev/null +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -0,0 +1,76 @@ +package arrow.fx.coroutines.stream + +import arrow.core.Either +import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.UnsafePromise +import arrow.fx.coroutines.andThen +import arrow.fx.coroutines.stream.concurrent.Queue +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine +import kotlin.experimental.ExperimentalTypeInference + +interface EmitterSyntax { + fun emit(a: A): Unit + fun emit(chunk: Chunk): Unit + fun emit(iterable: Iterable): Unit + fun emit(vararg aas: A): Unit + fun end(): Unit +} + +//@OptIn(ExperimentalTypeInference::class) in 1.3.70 +@UseExperimental(ExperimentalTypeInference::class) +fun Stream.Companion.callbackStream(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = + Stream.cancellableCallbackStream(f.andThen { CancelToken.unit }) + +//@OptIn(ExperimentalTypeInference::class) in 1.3.70 +@UseExperimental(ExperimentalTypeInference::class) +fun Stream.Companion.cancellableCallbackStream(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = + effect { + val q = Queue.unbounded>() + val error = UnsafePromise() + + val cancel = emitterCallback(f) { value -> + suspend { + q.enqueue1(value) + //TODO shall we consider emitting from different contexts? Might serve as an observeOn in RxJava + }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) + } + + q.dequeue() + .interruptWhen { Either.Left(error.join()) } + .terminateOn { it == Chunk.empty() } + .flatMap(::chunk) + .onFinalizeCase { + when (it) { + is ExitCase.Cancelled -> cancel.cancel.invoke() + } + } + }.flatten() + +private suspend fun emitterCallback( + f: suspend EmitterSyntax.() -> CancelToken, + cb: (Chunk) -> Unit +): CancelToken = + object : EmitterSyntax { + override fun emit(a: A) { + emit(Chunk.just(a)) + } + + override fun emit(chunk: Chunk) { + cb(chunk) + } + + override fun emit(iterable: Iterable) { + cb(Chunk.iterable(iterable)) + } + + override fun emit(vararg aas: A) { + cb(Chunk(*aas)) + } + + override fun end() { + cb(Chunk.empty()) + } + }.f() diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt index 4bff1d205..c0c133171 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/concurrent/Queue.kt @@ -134,8 +134,8 @@ interface Queue : Enqueue, Dequeue1, Dequeue { suspend fun bounded(maxSize: Int): Queue = fromStrategy(Strategy.boundedFifo(maxSize)) - /** Creates a FILO queue with the specified size bound. */ - suspend fun boundedLife(maxSize: Int): Queue = + /** Creates a LIFO queue with the specified size bound. */ + suspend fun boundedLifo(maxSize: Int): Queue = fromStrategy(Strategy.boundedLifo(maxSize)) /** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */ diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt index 40583f044..7d3eec20a 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt @@ -19,7 +19,6 @@ import io.kotest.property.arbitrary.int import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.long import io.kotest.property.arbitrary.string -import io.kotest.property.checkAll class BracketTest : StreamSpec(spec = { diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt index 1f19683b8..8609bad29 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt @@ -1033,6 +1033,27 @@ class StreamTest : StreamSpec(spec = { } } } + + "terminateOn" { + Stream(1, 2, 3, 4) + .terminateOn { it % 3 == 0 } + .compile() + .toList() shouldBe listOf(1, 2) + } + + "terminateOnNull" { + Stream(1, 2, null, 4) + .terminateOnNull() + .compile() + .toList() shouldBe listOf(1, 2) + } + + "terminateOnNone" { + Stream(Some(1), Some(2), None, Some(4)) + .terminateOnNone() + .compile() + .toList() shouldBe listOf(1, 2) + } }) fun Arb.Companion.`null`(): Arb = diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt new file mode 100644 index 000000000..71a0c49a7 --- /dev/null +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt @@ -0,0 +1,162 @@ +package arrow.fx.coroutines.stream.ops + +import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ForkAndForget +import arrow.fx.coroutines.Promise +import arrow.fx.coroutines.Schedule +import arrow.fx.coroutines.StreamSpec +import arrow.fx.coroutines.assertThrowable +import arrow.fx.coroutines.milliseconds +import arrow.fx.coroutines.parTupledN +import arrow.fx.coroutines.sleep +import arrow.fx.coroutines.stream.Stream +import arrow.fx.coroutines.stream.callbackStream +import arrow.fx.coroutines.stream.cancellableCallbackStream +import arrow.fx.coroutines.stream.chunk +import arrow.fx.coroutines.stream.compile +import arrow.fx.coroutines.throwable +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.arbitrary.filter +import io.kotest.property.arbitrary.int +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine + +class CallbackStreamTest : StreamSpec(iterations = 250, spec = { + + "callbackStream should be lazy" { + var effect = 0 + Stream.callbackStream { + effect += 1 + } + + effect shouldBe 0 + } + + "emits values" { + Stream.callbackStream { + emit(1) + emit(2) + emit(3) + } + .take(3) + .compile() + .toList() shouldBe listOf(1, 2, 3) + } + + "emits varargs" { + Stream.callbackStream { + emit(1, 2, 3) + } + .take(3) + .compile() + .toList() shouldBe listOf(1, 2, 3) + } + + "emits iterable" { + Stream.callbackStream { + emit(listOf(1, 2, 3)) + } + .take(3) + .compile() + .toList() shouldBe listOf(1, 2, 3) + } + + "emits chunks" { + checkAll(Arb.chunk(Arb.int()).filter { it.isNotEmpty() }, Arb.chunk(Arb.int()).filter { it.isNotEmpty() }) { ch, ch2 -> + Stream.callbackStream { + emit(ch) + emit(ch2) + end() + } + .chunks().compile() + .toList() shouldBe listOf(ch, ch2) + } + } + + "long running emission" { + Stream.callbackStream { + ForkAndForget { + countToCallback(5, { it }) { emit(it) } + } + } + .take(5) + .compile() + .toList() shouldBe listOf(1, 2, 3, 4, 5) + } + + "emits and completes" { + Stream.callbackStream { + emit(1) + emit(2) + emit(3) + end() + } + .compile() + .toList() shouldBe listOf(1, 2, 3) + } + + "forwards errors" { + checkAll(Arb.throwable()) { e -> + val s = Stream.callbackStream { + throw e + } + .compile() + + assertThrowable { + s.toList() + } shouldBe e + } + } + + "runs cancel token" { + checkAll(Arb.int()) { + val latch = Promise() + var effect = 0 + + val s = Stream.cancellableCallbackStream { + CancelToken { effect += 1 } + } + + val f = ForkAndForget { + parTupledN( + { s.compile().drain() }, + { latch.complete(Unit) } + ) + } + + parTupledN({ latch.get() }, { sleep(50.milliseconds) }) + + f.cancel() + + effect shouldBe 1 + } + } + + "doesn't run cancel token without cancellation" { + var effect = 0 + + Stream.cancellableCallbackStream { + end() + CancelToken { effect += 1 } + } + .compile() + .drain() + + effect shouldBe 0 + } + +}) + +private fun countToCallback( + iterations: Int, + map: (Int) -> A, + cb: suspend (A) -> Unit +): Unit = suspend { + var i = 0 + arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) { + i += 1 + cb(map(i)) + } +}.startCoroutine(Continuation(EmptyCoroutineContext) { }) \ No newline at end of file From e86280b1d9288627c519bb17cfdbe35f13df56d5 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Fri, 26 Jun 2020 13:33:07 +0200 Subject: [PATCH 02/23] Added callbackStream javadocs Nits --- .../arrow/fx/coroutines/stream/Stream.kt | 2 +- .../arrow/fx/coroutines/stream/callbacks.kt | 47 +++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt index 3a40f858c..e4c713c10 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt @@ -2099,7 +2099,7 @@ fun Stream.filterNull(): Stream = * //sampleEnd * ``` */ -fun Stream.terminateOnNull(): Stream = +fun Stream.terminateOnNull(): Stream = terminateOn { it == null } /** diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index cc5dab035..5ede4fecd 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -19,11 +19,58 @@ interface EmitterSyntax { fun end(): Unit } +/** + * Creates a Stream from the given suspended block. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream.callbackStream { + * emit(1) + * emit(2, 3, 4) + * end() + * } + * .compile() + * .toList() + * .let(::println) //[1, 2, 3, 4] + * //sampleEnd + * ``` + * + * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, + * then the Stream will never end. + */ //@OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.callbackStream(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = Stream.cancellableCallbackStream(f.andThen { CancelToken.unit }) + + +/** + * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream.callbackStream { + * emit(1) + * emit(2, 3, 4) + * end() + * CancelToken { /* cancel subscription to callback */ } + * } + * .compile() + * .toList() + * .let(::println) //[1, 2, 3, 4] + * //sampleEnd + * ``` + * + * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, + * then the Stream will never end. + */ //@OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.cancellableCallbackStream(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = From 5e82a1281c17ba99233641ea99dff248a647c825 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Fri, 26 Jun 2020 17:32:01 +0200 Subject: [PATCH 03/23] Rename callbackStream[Cancellable] to async[Cancellable] Nits --- .../arrow/fx/coroutines/stream/callbacks.kt | 33 ++++---- .../stream/ops/CallbackStreamTest.kt | 79 ++++++++++--------- 2 files changed, 60 insertions(+), 52 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 5ede4fecd..7a414a662 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -11,6 +11,8 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine import kotlin.experimental.ExperimentalTypeInference +private object END + interface EmitterSyntax { fun emit(a: A): Unit fun emit(chunk: Chunk): Unit @@ -27,7 +29,7 @@ interface EmitterSyntax { * * //sampleStart * suspend fun main(): Unit = - * Stream.callbackStream { + * Stream.async { * emit(1) * emit(2, 3, 4) * end() @@ -41,22 +43,21 @@ interface EmitterSyntax { * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, * then the Stream will never end. */ -//@OptIn(ExperimentalTypeInference::class) in 1.3.70 +// @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.callbackStream(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = - Stream.cancellableCallbackStream(f.andThen { CancelToken.unit }) - - +fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = + Stream.asyncCancellable(f.andThen { CancelToken.unit }) /** * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* + * import arrow.fx.coroutines.CancelToken * * //sampleStart * suspend fun main(): Unit = - * Stream.callbackStream { + * Stream.asyncCancellable { * emit(1) * emit(2, 3, 4) * end() @@ -71,23 +72,23 @@ fun Stream.Companion.callbackStream(@BuilderInference f: suspend EmitterSynt * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, * then the Stream will never end. */ -//@OptIn(ExperimentalTypeInference::class) in 1.3.70 +// @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.cancellableCallbackStream(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = +fun Stream.Companion.asyncCancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = effect { - val q = Queue.unbounded>() + val q = Queue.unbounded() val error = UnsafePromise() val cancel = emitterCallback(f) { value -> suspend { q.enqueue1(value) - //TODO shall we consider emitting from different contexts? Might serve as an observeOn in RxJava - }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) + // TODO shall we consider emitting from different contexts? Might serve as an observeOn in RxJava + }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) } - q.dequeue() + (q.dequeue() .interruptWhen { Either.Left(error.join()) } - .terminateOn { it == Chunk.empty() } + .terminateOn { it === END } as Stream>) .flatMap(::chunk) .onFinalizeCase { when (it) { @@ -98,7 +99,7 @@ fun Stream.Companion.cancellableCallbackStream(@BuilderInference f: suspend private suspend fun emitterCallback( f: suspend EmitterSyntax.() -> CancelToken, - cb: (Chunk) -> Unit + cb: (Any?) -> Unit ): CancelToken = object : EmitterSyntax { override fun emit(a: A) { @@ -118,6 +119,6 @@ private suspend fun emitterCallback( } override fun end() { - cb(Chunk.empty()) + cb(END) } }.f() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt index 71a0c49a7..45ef4809b 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt @@ -10,15 +10,15 @@ import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.parTupledN import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Stream -import arrow.fx.coroutines.stream.callbackStream -import arrow.fx.coroutines.stream.cancellableCallbackStream +import arrow.fx.coroutines.stream.async +import arrow.fx.coroutines.stream.asyncCancellable import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile import arrow.fx.coroutines.throwable import io.kotest.matchers.shouldBe import io.kotest.property.Arb -import io.kotest.property.arbitrary.filter import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list import kotlin.coroutines.Continuation import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine @@ -26,68 +26,75 @@ import kotlin.coroutines.startCoroutine class CallbackStreamTest : StreamSpec(iterations = 250, spec = { "callbackStream should be lazy" { - var effect = 0 - Stream.callbackStream { - effect += 1 - } + checkAll(Arb.int()) { + var effect = 0 + Stream.async { + effect += 1 + } - effect shouldBe 0 + effect shouldBe 0 + } } "emits values" { - Stream.callbackStream { - emit(1) - emit(2) - emit(3) + checkAll(Arb.list(Arb.int())) { list -> + Stream.async { + list.forEach { emit(it) } + end() + } + .compile() + .toList() shouldBe list } - .take(3) - .compile() - .toList() shouldBe listOf(1, 2, 3) } "emits varargs" { - Stream.callbackStream { - emit(1, 2, 3) + checkAll(Arb.list(Arb.int())) { list -> + Stream.async { + emit(*list.toTypedArray()) + end() + } + .compile() + .toList() shouldBe list } - .take(3) - .compile() - .toList() shouldBe listOf(1, 2, 3) } "emits iterable" { - Stream.callbackStream { - emit(listOf(1, 2, 3)) + checkAll(Arb.list(Arb.int())) { list -> + Stream.async { + emit(list) + end() + } + .compile() + .toList() shouldBe list } - .take(3) - .compile() - .toList() shouldBe listOf(1, 2, 3) } "emits chunks" { - checkAll(Arb.chunk(Arb.int()).filter { it.isNotEmpty() }, Arb.chunk(Arb.int()).filter { it.isNotEmpty() }) { ch, ch2 -> - Stream.callbackStream { + checkAll(Arb.chunk(Arb.int()), Arb.chunk(Arb.int())) { ch, ch2 -> + Stream.async { emit(ch) emit(ch2) end() } - .chunks().compile() + .chunks() + .compile() .toList() shouldBe listOf(ch, ch2) } } "long running emission" { - Stream.callbackStream { + Stream.async { ForkAndForget { countToCallback(5, { it }) { emit(it) } + end() } } - .take(5) .compile() .toList() shouldBe listOf(1, 2, 3, 4, 5) } "emits and completes" { - Stream.callbackStream { + Stream.async { emit(1) emit(2) emit(3) @@ -99,7 +106,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { "forwards errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.callbackStream { + val s = Stream.async { throw e } .compile() @@ -115,7 +122,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { val latch = Promise() var effect = 0 - val s = Stream.cancellableCallbackStream { + val s = Stream.asyncCancellable { CancelToken { effect += 1 } } @@ -137,7 +144,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { "doesn't run cancel token without cancellation" { var effect = 0 - Stream.cancellableCallbackStream { + Stream.asyncCancellable { end() CancelToken { effect += 1 } } @@ -146,7 +153,6 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { effect shouldBe 0 } - }) private fun countToCallback( @@ -158,5 +164,6 @@ private fun countToCallback( arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) { i += 1 cb(map(i)) + sleep(500.milliseconds) } -}.startCoroutine(Continuation(EmptyCoroutineContext) { }) \ No newline at end of file +}.startCoroutine(Continuation(EmptyCoroutineContext) { }) From 2babd55b55768324695024d259cb1bd31bfca2ef Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Fri, 26 Jun 2020 17:37:28 +0200 Subject: [PATCH 04/23] Fix long running test ending and count --- .../arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt index 45ef4809b..4481c0453 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt @@ -85,8 +85,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { "long running emission" { Stream.async { ForkAndForget { - countToCallback(5, { it }) { emit(it) } - end() + countToCallback(4, { it }, { emit(it) }) { end() } } } .compile() @@ -158,7 +157,8 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { private fun countToCallback( iterations: Int, map: (Int) -> A, - cb: suspend (A) -> Unit + cb: suspend (A) -> Unit, + onEnd: suspend () -> Unit = { } ): Unit = suspend { var i = 0 arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) { @@ -166,4 +166,5 @@ private fun countToCallback( cb(map(i)) sleep(500.milliseconds) } + onEnd() }.startCoroutine(Continuation(EmptyCoroutineContext) { }) From 0db7bfc54388cb6b6658f068100dab517075bd90 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Fri, 26 Jun 2020 18:06:43 +0200 Subject: [PATCH 05/23] Try to fix flaky test --- .../src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt index 8609bad29..171aea422 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt @@ -768,7 +768,7 @@ class StreamTest : StreamSpec(spec = { when (uncons1) { null -> Pull.output1(acc) else -> Pull.output1(uncons1.head) - .flatMap { loop(acc + uncons1.head, uncons1.tail) } + .append { loop(acc + uncons1.head, uncons1.tail) } } } @@ -776,7 +776,7 @@ class StreamTest : StreamSpec(spec = { } Stream.iterate(0, Int::inc) - .flatMap { Stream(it).delayBy(20.milliseconds) } + .flatMap { Stream(it).delayBy(10.milliseconds) } .interruptWhen { Right(sleep(150.milliseconds)) } .through(p) .compile() From 4952430aa02f32f73c21475475da306bd619dd91 Mon Sep 17 00:00:00 2001 From: "Rachel M. Carmena" Date: Mon, 29 Jun 2020 11:54:52 +0200 Subject: [PATCH 06/23] Configuration: download logs option --- .github/workflows/build_arrow-fx.yml | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/.github/workflows/build_arrow-fx.yml b/.github/workflows/build_arrow-fx.yml index 5474bce6c..c61df2325 100644 --- a/.github/workflows/build_arrow-fx.yml +++ b/.github/workflows/build_arrow-fx.yml @@ -36,3 +36,17 @@ jobs: # run: | # export PULL_REQUEST_NUMBER=$(echo $GITHUB_REF | cut -d/ -f3) # ./gradlew :arrow-benchmarks-fx:compareBenchmarksCI + - name: Prepare test logs + if: ${{ always() }} + run: | + mkdir test-reports + for report in `ls -d ./**/build/reports/tests/test`; do + arrow_module=$(echo $report | cut -d/ -f2) + cp -r $report test-reports/$arrow_module + done + - name: Make logs available to download + if: ${{ always() }} + uses: actions/upload-artifact@v1 + with: + name: test-reports + path: ./test-reports/* From 2d86e1f6e7ce0eaf528e98d8df1e909a63a4c43b Mon Sep 17 00:00:00 2001 From: "Rachel M. Carmena" Date: Mon, 29 Jun 2020 11:59:05 +0200 Subject: [PATCH 07/23] Revert "Configuration: download logs option" This reverts commit 4952430aa02f32f73c21475475da306bd619dd91. --- .github/workflows/build_arrow-fx.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/.github/workflows/build_arrow-fx.yml b/.github/workflows/build_arrow-fx.yml index c61df2325..5474bce6c 100644 --- a/.github/workflows/build_arrow-fx.yml +++ b/.github/workflows/build_arrow-fx.yml @@ -36,17 +36,3 @@ jobs: # run: | # export PULL_REQUEST_NUMBER=$(echo $GITHUB_REF | cut -d/ -f3) # ./gradlew :arrow-benchmarks-fx:compareBenchmarksCI - - name: Prepare test logs - if: ${{ always() }} - run: | - mkdir test-reports - for report in `ls -d ./**/build/reports/tests/test`; do - arrow_module=$(echo $report | cut -d/ -f2) - cp -r $report test-reports/$arrow_module - done - - name: Make logs available to download - if: ${{ always() }} - uses: actions/upload-artifact@v1 - with: - name: test-reports - path: ./test-reports/* From b02f381bcd650eb5ad6f6c52474979745862a74c Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 12:31:49 +0200 Subject: [PATCH 08/23] Fixes from code review --- .../kotlin/arrow/fx/coroutines/stream/callbacks.kt | 6 +++--- .../kotlin/arrow/fx/coroutines/stream/StreamTest.kt | 4 ++-- .../ops/{CallbackStreamTest.kt => AsyncCancellable.kt} | 10 +++++----- 3 files changed, 10 insertions(+), 10 deletions(-) rename arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/{CallbackStreamTest.kt => AsyncCancellable.kt} (93%) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 7a414a662..86e32ca5d 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -46,7 +46,7 @@ interface EmitterSyntax { // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = - Stream.asyncCancellable(f.andThen { CancelToken.unit }) + Stream.cancellable(f.andThen { CancelToken.unit }) /** * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. @@ -57,7 +57,7 @@ fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() * * //sampleStart * suspend fun main(): Unit = - * Stream.asyncCancellable { + * Stream.cancellable { * emit(1) * emit(2, 3, 4) * end() @@ -74,7 +74,7 @@ fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.asyncCancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = +fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = effect { val q = Queue.unbounded() val error = UnsafePromise() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt index 171aea422..8609bad29 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt @@ -768,7 +768,7 @@ class StreamTest : StreamSpec(spec = { when (uncons1) { null -> Pull.output1(acc) else -> Pull.output1(uncons1.head) - .append { loop(acc + uncons1.head, uncons1.tail) } + .flatMap { loop(acc + uncons1.head, uncons1.tail) } } } @@ -776,7 +776,7 @@ class StreamTest : StreamSpec(spec = { } Stream.iterate(0, Int::inc) - .flatMap { Stream(it).delayBy(10.milliseconds) } + .flatMap { Stream(it).delayBy(20.milliseconds) } .interruptWhen { Right(sleep(150.milliseconds)) } .through(p) .compile() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt similarity index 93% rename from arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt rename to arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt index 4481c0453..9469905da 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt @@ -11,7 +11,7 @@ import arrow.fx.coroutines.parTupledN import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Stream import arrow.fx.coroutines.stream.async -import arrow.fx.coroutines.stream.asyncCancellable +import arrow.fx.coroutines.stream.cancellable import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile import arrow.fx.coroutines.throwable @@ -23,9 +23,9 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine -class CallbackStreamTest : StreamSpec(iterations = 250, spec = { +class AsyncCancellable : StreamSpec(iterations = 250, spec = { - "callbackStream should be lazy" { + "should be lazy" { checkAll(Arb.int()) { var effect = 0 Stream.async { @@ -121,7 +121,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { val latch = Promise() var effect = 0 - val s = Stream.asyncCancellable { + val s = Stream.cancellable { CancelToken { effect += 1 } } @@ -143,7 +143,7 @@ class CallbackStreamTest : StreamSpec(iterations = 250, spec = { "doesn't run cancel token without cancellation" { var effect = 0 - Stream.asyncCancellable { + Stream.cancellable { end() CancelToken { effect += 1 } } From 6e8ab19a1ac5894afa2a5dcf2e0e323bc9d9c16f Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 15:38:59 +0200 Subject: [PATCH 09/23] Test in chunks --- ...AsyncCancellable.kt => AsyncCancellableTest.kt} | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) rename arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/{AsyncCancellable.kt => AsyncCancellableTest.kt} (89%) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt similarity index 89% rename from arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt rename to arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt index 9469905da..8649860d2 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellable.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt @@ -9,6 +9,7 @@ import arrow.fx.coroutines.assertThrowable import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.parTupledN import arrow.fx.coroutines.sleep +import arrow.fx.coroutines.stream.Chunk import arrow.fx.coroutines.stream.Stream import arrow.fx.coroutines.stream.async import arrow.fx.coroutines.stream.cancellable @@ -19,11 +20,12 @@ import io.kotest.matchers.shouldBe import io.kotest.property.Arb import io.kotest.property.arbitrary.int import io.kotest.property.arbitrary.list +import io.kotest.property.arbitrary.map import kotlin.coroutines.Continuation import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine -class AsyncCancellable : StreamSpec(iterations = 250, spec = { +class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "should be lazy" { checkAll(Arb.int()) { @@ -48,13 +50,14 @@ class AsyncCancellable : StreamSpec(iterations = 250, spec = { } "emits varargs" { - checkAll(Arb.list(Arb.int())) { list -> + checkAll(Arb.list(Arb.int()).map { it.toTypedArray() }) { list -> Stream.async { - emit(*list.toTypedArray()) + emit(*list) end() } + .chunks() .compile() - .toList() shouldBe list + .toList() shouldBe listOf(Chunk(*list)) } } @@ -64,8 +67,9 @@ class AsyncCancellable : StreamSpec(iterations = 250, spec = { emit(list) end() } + .chunks() .compile() - .toList() shouldBe list + .toList() shouldBe listOf(Chunk.iterable(list)) } } From 7f2af2c18c05897ca960253ff5c84bb73683e79b Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 15:41:22 +0200 Subject: [PATCH 10/23] Emit on computation pool by default --- .../src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 86e32ca5d..3693f19f8 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -2,12 +2,12 @@ package arrow.fx.coroutines.stream import arrow.core.Either import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ComputationPool import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.UnsafePromise import arrow.fx.coroutines.andThen import arrow.fx.coroutines.stream.concurrent.Queue import kotlin.coroutines.Continuation -import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine import kotlin.experimental.ExperimentalTypeInference @@ -82,8 +82,7 @@ fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax< val cancel = emitterCallback(f) { value -> suspend { q.enqueue1(value) - // TODO shall we consider emitting from different contexts? Might serve as an observeOn in RxJava - }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) + }.startCoroutine(Continuation(ComputationPool) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) } (q.dequeue() From 5506cd16a5bf26b7a79fbda1730d5847fa26dc7b Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 16:51:35 +0200 Subject: [PATCH 11/23] Tweak tests a bit --- .../stream/ops/AsyncCancellableTest.kt | 42 +++++++++++++++++-- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt index 8649860d2..64e1b5e33 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt @@ -1,5 +1,6 @@ package arrow.fx.coroutines.stream.ops +import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.CancelToken import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.Promise @@ -8,6 +9,7 @@ import arrow.fx.coroutines.StreamSpec import arrow.fx.coroutines.assertThrowable import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.parTupledN +import arrow.fx.coroutines.seconds import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Chunk import arrow.fx.coroutines.stream.Stream @@ -30,11 +32,14 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "should be lazy" { checkAll(Arb.int()) { var effect = 0 - Stream.async { + val s = Stream.async { effect += 1 + end() } effect shouldBe 0 + s.compile().drain() + effect shouldBe 1 } } @@ -96,6 +101,24 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .toList() shouldBe listOf(1, 2, 3, 4, 5) } + "parallel emission/pulling" { + val ref = Atomic(false) + + Stream.async { + emit(1) + sleep(1.seconds) + emit(2) + ref.set(true) + end() + } + .effectMap { + if (it == 1) ref.get() shouldBe false + else Unit + } + .compile() + .drain() + } + "emits and completes" { Stream.async { emit(1) @@ -115,7 +138,20 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .compile() assertThrowable { - s.toList() + s.drain() + } shouldBe e + } + } + + "forwards suspended errors" { + checkAll(Arb.throwable()) { e -> + val s = Stream.async { + suspend { throw e } + } + .compile() + + assertThrowable { + s.drain() } shouldBe e } } @@ -136,7 +172,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { ) } - parTupledN({ latch.get() }, { sleep(50.milliseconds) }) + parTupledN({ latch.get() }, { sleep(20.milliseconds) }) f.cancel() From 44abbde86aa04a0f6e9c3a5d9be26f2d5704e403 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 17:34:32 +0200 Subject: [PATCH 12/23] Improve tests --- .../stream/ops/AsyncCancellableTest.kt | 47 ++++++++++++++----- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt index 64e1b5e33..ffd00e0dc 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt @@ -2,12 +2,15 @@ package arrow.fx.coroutines.stream.ops import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.Promise import arrow.fx.coroutines.Schedule import arrow.fx.coroutines.StreamSpec import arrow.fx.coroutines.assertThrowable +import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.milliseconds +import arrow.fx.coroutines.never import arrow.fx.coroutines.parTupledN import arrow.fx.coroutines.seconds import arrow.fx.coroutines.sleep @@ -17,6 +20,7 @@ import arrow.fx.coroutines.stream.async import arrow.fx.coroutines.stream.cancellable import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile +import arrow.fx.coroutines.suspend import arrow.fx.coroutines.throwable import io.kotest.matchers.shouldBe import io.kotest.property.Arb @@ -31,15 +35,15 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "should be lazy" { checkAll(Arb.int()) { - var effect = 0 + val effect = Atomic(false) val s = Stream.async { - effect += 1 + effect.set(true) end() } - effect shouldBe 0 + effect.get() shouldBe false s.compile().drain() - effect shouldBe 1 + effect.get() shouldBe true } } @@ -146,7 +150,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "forwards suspended errors" { checkAll(Arb.throwable()) { e -> val s = Stream.async { - suspend { throw e } + e.suspend() } .compile() @@ -159,10 +163,10 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "runs cancel token" { checkAll(Arb.int()) { val latch = Promise() - var effect = 0 + val effect = Atomic(false) val s = Stream.cancellable { - CancelToken { effect += 1 } + CancelToken { effect.set(true) } } val f = ForkAndForget { @@ -176,21 +180,42 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { f.cancel() - effect shouldBe 1 + effect.get() shouldBe true + } + } + + "can cancel never" { + checkAll(Arb.int()) { + val latch = Promise() + val exit = Promise() + val f = ForkAndForget { + guaranteeCase({ + Stream.cancellable { + latch.complete(Unit) + never() + CancelToken.unit + } + .compile() + .toList() + }) { ex -> exit.complete(ex) } + } + latch.get() + f.cancel() + exit.get() shouldBe ExitCase.Cancelled } } "doesn't run cancel token without cancellation" { - var effect = 0 + val effect = Atomic(false) Stream.cancellable { end() - CancelToken { effect += 1 } + CancelToken { effect.set(true) } } .compile() .drain() - effect shouldBe 0 + effect.get() shouldBe false } }) From a021f9da8e91454f94f20de9b05626943de23008 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 29 Jun 2020 17:34:50 +0200 Subject: [PATCH 13/23] Run async emission in parallel --- .../arrow/fx/coroutines/stream/callbacks.kt | 53 +++++++++++++------ 1 file changed, 38 insertions(+), 15 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 3693f19f8..9cc68734e 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -2,12 +2,16 @@ package arrow.fx.coroutines.stream import arrow.core.Either import arrow.fx.coroutines.CancelToken -import arrow.fx.coroutines.ComputationPool import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.ForkAndForget +import arrow.fx.coroutines.ForkConnected +import arrow.fx.coroutines.Promise import arrow.fx.coroutines.UnsafePromise import arrow.fx.coroutines.andThen +import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.stream.concurrent.Queue import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine import kotlin.experimental.ExperimentalTypeInference @@ -40,7 +44,7 @@ interface EmitterSyntax { * //sampleEnd * ``` * - * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, + * Note that if neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @@ -69,21 +73,20 @@ fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() * //sampleEnd * ``` * - * Note that if neither `end()`, `emit(Chunk.empty())` nor other limit operators such as `take(N)` are called, + * Like [async], if neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. + * + * Note that if any of the emissions */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = - effect { + force { val q = Queue.unbounded() val error = UnsafePromise() + val cancel = Promise() - val cancel = emitterCallback(f) { value -> - suspend { - q.enqueue1(value) - }.startCoroutine(Continuation(ComputationPool) { r -> r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) }) - } + ForkConnected { emitterCallback(f, cancel, error, q) } (q.dequeue() .interruptWhen { Either.Left(error.join()) } @@ -91,16 +94,26 @@ fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax< .flatMap(::chunk) .onFinalizeCase { when (it) { - is ExitCase.Cancelled -> cancel.cancel.invoke() + is ExitCase.Cancelled -> cancel.get().cancel.invoke() } } - }.flatten() + } private suspend fun emitterCallback( f: suspend EmitterSyntax.() -> CancelToken, - cb: (Any?) -> Unit -): CancelToken = - object : EmitterSyntax { + cancel: Promise, + error: UnsafePromise, + q: Queue +): Unit { + val cb = { ch: Any? -> + suspend { + q.enqueue1(ch) + }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> + r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) + }) + } + + val emitter = object : EmitterSyntax { override fun emit(a: A) { emit(Chunk.just(a)) } @@ -120,4 +133,14 @@ private suspend fun emitterCallback( override fun end() { cb(END) } - }.f() + } + + guaranteeCase({ + val cancelT = emitter.f() + cancel.complete(cancelT) + }, { exit -> + when (exit) { + is ExitCase.Failure -> error.complete(Result.success(exit.failure)) + } + }) +} From f75f8ab9cb7b86272fa4906a0efb716bdff0ae73 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Tue, 30 Jun 2020 12:30:41 +0200 Subject: [PATCH 14/23] nit --- .../src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 9cc68734e..74129bcaa 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -3,7 +3,6 @@ package arrow.fx.coroutines.stream import arrow.core.Either import arrow.fx.coroutines.CancelToken import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.ForkConnected import arrow.fx.coroutines.Promise import arrow.fx.coroutines.UnsafePromise From e8c010096bb978ad008cc0207c4bdda705c39179 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Tue, 30 Jun 2020 14:53:57 +0200 Subject: [PATCH 15/23] Simplify Stream.async into one single function Move the cancellation to the syntax to avoid infinite emissions not able to call cancel --- .../arrow/fx/coroutines/stream/callbacks.kt | 63 ++++++------------ .../stream/ops/AsyncCancellableTest.kt | 65 +++++++++---------- 2 files changed, 53 insertions(+), 75 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 74129bcaa..aeebecb75 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -4,9 +4,7 @@ import arrow.core.Either import arrow.fx.coroutines.CancelToken import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.ForkConnected -import arrow.fx.coroutines.Promise import arrow.fx.coroutines.UnsafePromise -import arrow.fx.coroutines.andThen import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.stream.concurrent.Queue import kotlin.coroutines.Continuation @@ -17,6 +15,7 @@ import kotlin.experimental.ExperimentalTypeInference private object END interface EmitterSyntax { + fun onCancel(cancelF: suspend () -> Unit): Unit fun emit(a: A): Unit fun emit(chunk: Chunk): Unit fun emit(iterable: Iterable): Unit @@ -25,34 +24,7 @@ interface EmitterSyntax { } /** - * Creates a Stream from the given suspended block. - * - * ```kotlin:ank:playground - * import arrow.fx.coroutines.stream.* - * - * //sampleStart - * suspend fun main(): Unit = - * Stream.async { - * emit(1) - * emit(2, 3, 4) - * end() - * } - * .compile() - * .toList() - * .let(::println) //[1, 2, 3, 4] - * //sampleEnd - * ``` - * - * Note that if neither `end()` nor other limit operators such as `take(N)` are called, - * then the Stream will never end. - */ -// @OptIn(ExperimentalTypeInference::class) in 1.3.70 -@UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = - Stream.cancellable(f.andThen { CancelToken.unit }) - -/** - * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. + * Creates a Stream from the given suspended block, allowing to emit, set cancel effects and end the emission. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* @@ -61,10 +33,10 @@ fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() * //sampleStart * suspend fun main(): Unit = * Stream.cancellable { + * onCancel { /* cancel something */ } * emit(1) * emit(2, 3, 4) * end() - * CancelToken { /* cancel subscription to callback */ } * } * .compile() * .toList() @@ -72,18 +44,19 @@ fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() * //sampleEnd * ``` * - * Like [async], if neither `end()` nor other limit operators such as `take(N)` are called, + * Note that if neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. * - * Note that if any of the emissions + * Cancellation or errors might happen any time during emission, so it's recommended to call `onCancel` as early as + * possible, otherwise it's not guaranteed to be called. */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = +fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = force { val q = Queue.unbounded() val error = UnsafePromise() - val cancel = Promise() + val cancel = UnsafePromise() ForkConnected { emitterCallback(f, cancel, error, q) } @@ -93,14 +66,19 @@ fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax< .flatMap(::chunk) .onFinalizeCase { when (it) { - is ExitCase.Cancelled -> cancel.get().cancel.invoke() + is ExitCase.Cancelled -> { + when (val r = cancel.tryGet()) { + null -> Unit // This means the user didn't set the onCancel or set it too late + else -> r.getOrNull()!!.invoke() + } + } } } } private suspend fun emitterCallback( - f: suspend EmitterSyntax.() -> CancelToken, - cancel: Promise, + f: suspend EmitterSyntax.() -> Unit, + cancel: UnsafePromise, error: UnsafePromise, q: Queue ): Unit { @@ -132,12 +110,13 @@ private suspend fun emitterCallback( override fun end() { cb(END) } + + override fun onCancel(cancelF: suspend () -> Unit) { + cancel.complete(Result.success(CancelToken(cancelF))) + } } - guaranteeCase({ - val cancelT = emitter.f() - cancel.complete(cancelT) - }, { exit -> + guaranteeCase({ emitter.f() }, { exit -> when (exit) { is ExitCase.Failure -> error.complete(Result.success(exit.failure)) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt index ffd00e0dc..5f2f8e822 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt @@ -1,14 +1,11 @@ package arrow.fx.coroutines.stream.ops import arrow.fx.coroutines.Atomic -import arrow.fx.coroutines.CancelToken -import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.Promise import arrow.fx.coroutines.Schedule import arrow.fx.coroutines.StreamSpec import arrow.fx.coroutines.assertThrowable -import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.never import arrow.fx.coroutines.parTupledN @@ -17,7 +14,6 @@ import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Chunk import arrow.fx.coroutines.stream.Stream import arrow.fx.coroutines.stream.async -import arrow.fx.coroutines.stream.cancellable import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile import arrow.fx.coroutines.suspend @@ -33,7 +29,7 @@ import kotlin.coroutines.startCoroutine class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { - "should be lazy" { + "should be lazy".config(enabled = true) { checkAll(Arb.int()) { val effect = Atomic(false) val s = Stream.async { @@ -47,7 +43,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits values" { + "emits values".config(enabled = true) { checkAll(Arb.list(Arb.int())) { list -> Stream.async { list.forEach { emit(it) } @@ -58,7 +54,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits varargs" { + "emits varargs".config(enabled = true) { checkAll(Arb.list(Arb.int()).map { it.toTypedArray() }) { list -> Stream.async { emit(*list) @@ -70,7 +66,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits iterable" { + "emits iterable".config(enabled = true) { checkAll(Arb.list(Arb.int())) { list -> Stream.async { emit(list) @@ -82,7 +78,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits chunks" { + "emits chunks".config(enabled = true) { checkAll(Arb.chunk(Arb.int()), Arb.chunk(Arb.int())) { ch, ch2 -> Stream.async { emit(ch) @@ -95,7 +91,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "long running emission" { + "long running emission".config(enabled = true) { Stream.async { ForkAndForget { countToCallback(4, { it }, { emit(it) }) { end() } @@ -105,7 +101,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .toList() shouldBe listOf(1, 2, 3, 4, 5) } - "parallel emission/pulling" { + "parallel emission/pulling".config(enabled = true) { val ref = Atomic(false) Stream.async { @@ -123,7 +119,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .drain() } - "emits and completes" { + "emits and completes".config(enabled = true) { Stream.async { emit(1) emit(2) @@ -134,7 +130,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .toList() shouldBe listOf(1, 2, 3) } - "forwards errors" { + "forwards errors".config(enabled = true) { checkAll(Arb.throwable()) { e -> val s = Stream.async { throw e @@ -147,7 +143,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "forwards suspended errors" { + "forwards suspended errors".config(enabled = true) { checkAll(Arb.throwable()) { e -> val s = Stream.async { e.suspend() @@ -160,13 +156,13 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "runs cancel token" { + "runs cancel token".config(enabled = true) { checkAll(Arb.int()) { val latch = Promise() val effect = Atomic(false) - val s = Stream.cancellable { - CancelToken { effect.set(true) } + val s = Stream.async { + onCancel { effect.set(true) } } val f = ForkAndForget { @@ -184,33 +180,36 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "can cancel never" { + "can cancel never and run token".config(enabled = true) { checkAll(Arb.int()) { val latch = Promise() - val exit = Promise() + val effect = Atomic(false) + + val s = Stream.async { + onCancel { effect.set(true) } + never() + } + val f = ForkAndForget { - guaranteeCase({ - Stream.cancellable { - latch.complete(Unit) - never() - CancelToken.unit - } - .compile() - .toList() - }) { ex -> exit.complete(ex) } + parTupledN( + { s.compile().drain() }, + { latch.complete(Unit) } + ) } - latch.get() + + parTupledN({ latch.get() }, { sleep(20.milliseconds) }) + f.cancel() - exit.get() shouldBe ExitCase.Cancelled + effect.get() shouldBe true } } - "doesn't run cancel token without cancellation" { + "doesn't run cancel token without cancellation".config(enabled = true) { val effect = Atomic(false) - Stream.cancellable { + Stream.async { + onCancel { effect.set(true) } end() - CancelToken { effect.set(true) } } .compile() .drain() From bbfb358bbee1abdf0057cfb955433e78df6df325 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Tue, 30 Jun 2020 16:35:02 +0200 Subject: [PATCH 16/23] Nits and cleanup --- .../arrow/fx/coroutines/stream/callbacks.kt | 3 +-- .../stream/ops/AsyncCancellableTest.kt | 26 +++++++++---------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index aeebecb75..f7be31f71 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -28,11 +28,10 @@ interface EmitterSyntax { * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* - * import arrow.fx.coroutines.CancelToken * * //sampleStart * suspend fun main(): Unit = - * Stream.cancellable { + * Stream.async { * onCancel { /* cancel something */ } * emit(1) * emit(2, 3, 4) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt index 5f2f8e822..1c4dabd42 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt @@ -29,7 +29,7 @@ import kotlin.coroutines.startCoroutine class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { - "should be lazy".config(enabled = true) { + "should be lazy" { checkAll(Arb.int()) { val effect = Atomic(false) val s = Stream.async { @@ -43,7 +43,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits values".config(enabled = true) { + "emits values" { checkAll(Arb.list(Arb.int())) { list -> Stream.async { list.forEach { emit(it) } @@ -54,7 +54,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits varargs".config(enabled = true) { + "emits varargs" { checkAll(Arb.list(Arb.int()).map { it.toTypedArray() }) { list -> Stream.async { emit(*list) @@ -66,7 +66,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits iterable".config(enabled = true) { + "emits iterable" { checkAll(Arb.list(Arb.int())) { list -> Stream.async { emit(list) @@ -78,7 +78,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "emits chunks".config(enabled = true) { + "emits chunks" { checkAll(Arb.chunk(Arb.int()), Arb.chunk(Arb.int())) { ch, ch2 -> Stream.async { emit(ch) @@ -91,7 +91,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "long running emission".config(enabled = true) { + "long running emission" { Stream.async { ForkAndForget { countToCallback(4, { it }, { emit(it) }) { end() } @@ -101,7 +101,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .toList() shouldBe listOf(1, 2, 3, 4, 5) } - "parallel emission/pulling".config(enabled = true) { + "parallel emission/pulling" { val ref = Atomic(false) Stream.async { @@ -119,7 +119,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .drain() } - "emits and completes".config(enabled = true) { + "emits and completes" { Stream.async { emit(1) emit(2) @@ -130,7 +130,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { .toList() shouldBe listOf(1, 2, 3) } - "forwards errors".config(enabled = true) { + "forwards errors" { checkAll(Arb.throwable()) { e -> val s = Stream.async { throw e @@ -143,7 +143,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "forwards suspended errors".config(enabled = true) { + "forwards suspended errors" { checkAll(Arb.throwable()) { e -> val s = Stream.async { e.suspend() @@ -156,7 +156,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "runs cancel token".config(enabled = true) { + "runs cancel token" { checkAll(Arb.int()) { val latch = Promise() val effect = Atomic(false) @@ -180,7 +180,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "can cancel never and run token".config(enabled = true) { + "can cancel never and run token" { checkAll(Arb.int()) { val latch = Promise() val effect = Atomic(false) @@ -204,7 +204,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } } - "doesn't run cancel token without cancellation".config(enabled = true) { + "doesn't run cancel token without cancellation" { val effect = Atomic(false) Stream.async { From 6520aef6d71d0d7fe926c8e05b24a76a84998b91 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 20 Jul 2020 13:18:05 +0200 Subject: [PATCH 17/23] Stream.async -> Stream.callback --- .../arrow/fx/coroutines/stream/callbacks.kt | 6 ++-- ...syncCancellableTest.kt => CallbackTest.kt} | 30 +++++++++---------- 2 files changed, 18 insertions(+), 18 deletions(-) rename arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/{AsyncCancellableTest.kt => CallbackTest.kt} (90%) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index f7be31f71..cb0f027a8 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -24,14 +24,14 @@ interface EmitterSyntax { } /** - * Creates a Stream from the given suspended block, allowing to emit, set cancel effects and end the emission. + * Creates a Stream from the given suspended block callback, allowing to emit, set cancel effects and end the emission. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* * * //sampleStart * suspend fun main(): Unit = - * Stream.async { + * Stream.callback { * onCancel { /* cancel something */ } * emit(1) * emit(2, 3, 4) @@ -51,7 +51,7 @@ interface EmitterSyntax { */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.async(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = +fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = force { val q = Queue.unbounded() val error = UnsafePromise() diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt similarity index 90% rename from arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt rename to arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt index 1c4dabd42..bce52eca5 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/AsyncCancellableTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt @@ -13,7 +13,7 @@ import arrow.fx.coroutines.seconds import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Chunk import arrow.fx.coroutines.stream.Stream -import arrow.fx.coroutines.stream.async +import arrow.fx.coroutines.stream.callback import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile import arrow.fx.coroutines.suspend @@ -27,12 +27,12 @@ import kotlin.coroutines.Continuation import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine -class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { +class CallbackTest : StreamSpec(iterations = 250, spec = { "should be lazy" { checkAll(Arb.int()) { val effect = Atomic(false) - val s = Stream.async { + val s = Stream.callback { effect.set(true) end() } @@ -45,7 +45,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "emits values" { checkAll(Arb.list(Arb.int())) { list -> - Stream.async { + Stream.callback { list.forEach { emit(it) } end() } @@ -56,7 +56,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "emits varargs" { checkAll(Arb.list(Arb.int()).map { it.toTypedArray() }) { list -> - Stream.async { + Stream.callback { emit(*list) end() } @@ -68,7 +68,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "emits iterable" { checkAll(Arb.list(Arb.int())) { list -> - Stream.async { + Stream.callback { emit(list) end() } @@ -80,7 +80,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "emits chunks" { checkAll(Arb.chunk(Arb.int()), Arb.chunk(Arb.int())) { ch, ch2 -> - Stream.async { + Stream.callback { emit(ch) emit(ch2) end() @@ -92,7 +92,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } "long running emission" { - Stream.async { + Stream.callback { ForkAndForget { countToCallback(4, { it }, { emit(it) }) { end() } } @@ -104,7 +104,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "parallel emission/pulling" { val ref = Atomic(false) - Stream.async { + Stream.callback { emit(1) sleep(1.seconds) emit(2) @@ -120,7 +120,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { } "emits and completes" { - Stream.async { + Stream.callback { emit(1) emit(2) emit(3) @@ -132,7 +132,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "forwards errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.async { + val s = Stream.callback { throw e } .compile() @@ -145,7 +145,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "forwards suspended errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.async { + val s = Stream.callback { e.suspend() } .compile() @@ -161,7 +161,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { val latch = Promise() val effect = Atomic(false) - val s = Stream.async { + val s = Stream.callback { onCancel { effect.set(true) } } @@ -185,7 +185,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { val latch = Promise() val effect = Atomic(false) - val s = Stream.async { + val s = Stream.callback { onCancel { effect.set(true) } never() } @@ -207,7 +207,7 @@ class AsyncCancellableTest : StreamSpec(iterations = 250, spec = { "doesn't run cancel token without cancellation" { val effect = Atomic(false) - Stream.async { + Stream.callback { onCancel { effect.set(true) } end() } From 0dfb150cb6949213f3a70de91eb7164ae0751136 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Mon, 20 Jul 2020 16:25:33 +0200 Subject: [PATCH 18/23] Revert "Simplify Stream.async into one single function" This reverts commit e8c010096bb978ad008cc0207c4bdda705c39179. # Conflicts: # arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt # arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt --- .../arrow/fx/coroutines/stream/callbacks.kt | 57 ++++++++++++------- .../fx/coroutines/stream/ops/CallbackTest.kt | 45 ++++++++------- 2 files changed, 61 insertions(+), 41 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index cb0f027a8..07d3983c3 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -4,7 +4,9 @@ import arrow.core.Either import arrow.fx.coroutines.CancelToken import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.ForkConnected +import arrow.fx.coroutines.Promise import arrow.fx.coroutines.UnsafePromise +import arrow.fx.coroutines.andThen import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.stream.concurrent.Queue import kotlin.coroutines.Continuation @@ -15,7 +17,6 @@ import kotlin.experimental.ExperimentalTypeInference private object END interface EmitterSyntax { - fun onCancel(cancelF: suspend () -> Unit): Unit fun emit(a: A): Unit fun emit(chunk: Chunk): Unit fun emit(iterable: Iterable): Unit @@ -32,7 +33,6 @@ interface EmitterSyntax { * //sampleStart * suspend fun main(): Unit = * Stream.callback { - * onCancel { /* cancel something */ } * emit(1) * emit(2, 3, 4) * end() @@ -45,17 +45,42 @@ interface EmitterSyntax { * * Note that if neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. - * - * Cancellation or errors might happen any time during emission, so it's recommended to call `onCancel` as early as - * possible, otherwise it's not guaranteed to be called. */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = + Stream.cancellableCallback(f.andThen { CancelToken.unit }) + +/** + * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream.callback { + * emit(1) + * emit(2, 3, 4) + * end() + * CancelToken { /* cancel subscription to callback */ } + * } + * .compile() + * .toList() + * .let(::println) //[1, 2, 3, 4] + * //sampleEnd + * ``` + * + * If neither `end()` nor other limit operators such as `take(N)` are called, + * then the Stream will never end. + */ +// @OptIn(ExperimentalTypeInference::class) in 1.3.70 +@UseExperimental(ExperimentalTypeInference::class) +fun Stream.Companion.cancellableCallback(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = force { val q = Queue.unbounded() val error = UnsafePromise() - val cancel = UnsafePromise() + val cancel = Promise() ForkConnected { emitterCallback(f, cancel, error, q) } @@ -65,19 +90,14 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. .flatMap(::chunk) .onFinalizeCase { when (it) { - is ExitCase.Cancelled -> { - when (val r = cancel.tryGet()) { - null -> Unit // This means the user didn't set the onCancel or set it too late - else -> r.getOrNull()!!.invoke() - } - } + is ExitCase.Cancelled -> cancel.get().cancel.invoke() } } } private suspend fun emitterCallback( - f: suspend EmitterSyntax.() -> Unit, - cancel: UnsafePromise, + f: suspend EmitterSyntax.() -> CancelToken, + cancel: Promise, error: UnsafePromise, q: Queue ): Unit { @@ -109,13 +129,12 @@ private suspend fun emitterCallback( override fun end() { cb(END) } - - override fun onCancel(cancelF: suspend () -> Unit) { - cancel.complete(Result.success(CancelToken(cancelF))) - } } - guaranteeCase({ emitter.f() }, { exit -> + guaranteeCase({ + val cancelT = emitter.f() + cancel.complete(cancelT) + }, { exit -> when (exit) { is ExitCase.Failure -> error.complete(Result.success(exit.failure)) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt index bce52eca5..c647cb655 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt @@ -1,11 +1,14 @@ package arrow.fx.coroutines.stream.ops import arrow.fx.coroutines.Atomic +import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ExitCase import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.Promise import arrow.fx.coroutines.Schedule import arrow.fx.coroutines.StreamSpec import arrow.fx.coroutines.assertThrowable +import arrow.fx.coroutines.guaranteeCase import arrow.fx.coroutines.milliseconds import arrow.fx.coroutines.never import arrow.fx.coroutines.parTupledN @@ -13,6 +16,7 @@ import arrow.fx.coroutines.seconds import arrow.fx.coroutines.sleep import arrow.fx.coroutines.stream.Chunk import arrow.fx.coroutines.stream.Stream +import arrow.fx.coroutines.stream.cancellableCallback import arrow.fx.coroutines.stream.callback import arrow.fx.coroutines.stream.chunk import arrow.fx.coroutines.stream.compile @@ -132,7 +136,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { "forwards errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.callback { + val s = Stream.cancellableCallback { throw e } .compile() @@ -145,7 +149,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { "forwards suspended errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.callback { + val s = Stream.cancellableCallback { e.suspend() } .compile() @@ -161,8 +165,8 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { val latch = Promise() val effect = Atomic(false) - val s = Stream.callback { - onCancel { effect.set(true) } + val s = Stream.cancellableCallback { + CancelToken { effect.set(true) } } val f = ForkAndForget { @@ -180,36 +184,33 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { } } - "can cancel never and run token" { + "can cancel never" { checkAll(Arb.int()) { val latch = Promise() - val effect = Atomic(false) - - val s = Stream.callback { - onCancel { effect.set(true) } - never() - } - + val exit = Promise() val f = ForkAndForget { - parTupledN( - { s.compile().drain() }, - { latch.complete(Unit) } - ) + guaranteeCase({ + Stream.cancellableCallback { + latch.complete(Unit) + never() + CancelToken.unit + } + .compile() + .toList() + }) { ex -> exit.complete(ex) } } - - parTupledN({ latch.get() }, { sleep(20.milliseconds) }) - + latch.get() f.cancel() - effect.get() shouldBe true + exit.get() shouldBe ExitCase.Cancelled } } "doesn't run cancel token without cancellation" { val effect = Atomic(false) - Stream.callback { - onCancel { effect.set(true) } + Stream.cancellableCallback { end() + CancelToken { effect.set(true) } } .compile() .drain() From 43919445b4511bc2c6510f01a77b17d95c48b443 Mon Sep 17 00:00:00 2001 From: Alberto Ballano Date: Tue, 21 Jul 2020 13:14:04 +0200 Subject: [PATCH 19/23] Fix docs --- .../src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 07d3983c3..0b3d9c7ce 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -55,6 +55,7 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. * * ```kotlin:ank:playground + * import arrow.fx.coroutines.CancelToken * import arrow.fx.coroutines.stream.* * * //sampleStart From 17430b68925aa84b8087e2d3240c30881d6a164b Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 20:16:23 +0200 Subject: [PATCH 20/23] Update cancellable docs and align Stream.cancellable semantics --- .../kotlin/arrow/fx/coroutines/builders.kt | 114 +++++++++++++++++- .../arrow/fx/coroutines/stream/callbacks.kt | 95 +++++++++++---- .../stream/{ops => }/CallbackTest.kt | 89 +++++++------- 3 files changed, 224 insertions(+), 74 deletions(-) rename arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/{ops => }/CallbackTest.kt (75%) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt index c5dc4a72a..b1522d507 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt @@ -4,6 +4,62 @@ import kotlin.coroutines.suspendCoroutine import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED +/** + * Create a cancellable `suspend` function that executes an asynchronous process on evaluation. + * This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * import java.lang.RuntimeException + * import java.util.concurrent.Executors + * import java.util.concurrent.ScheduledFuture + * import java.util.concurrent.TimeUnit + * + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap> = mutableMapOf() + * fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * val future = Executors.newScheduledThreadPool(1).run { + * schedule({ + * callback(listOf("Arrow"), null) + * shutdown() + * }, 2, TimeUnit.SECONDS) + * } + * listeners[id] = future + * return id + * } + * fun unregisterCallback(id: GithubId): Unit { + * listeners[id]?.cancel(false) + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * suspend fun getUsernames(): List = + * cancellable { cb: (Result>) -> Unit -> + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> cb(Result.success(names)) + * throwable != null -> cb(Result.failure(throwable)) + * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) + * } + * } + * CancelToken { GithubService.unregisterCallback(id) } + * } + * val result = getUsernames() + * //sampleEnd + * println(result) + * } + * ``` + * + * @param cb an asynchronous computation that might fail. + * @see suspendCoroutine for wrapping impure APIs without cancellation + * @see cancellableF for wrapping impure APIs using a suspend with cancellation + */ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = suspendCoroutine { cont -> val conn = cont.context.connection() @@ -24,7 +80,61 @@ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = } else cancellable.complete(CancelToken.unit) } -suspend fun cancellableF(k: suspend ((Result) -> Unit) -> CancelToken): A = +/** + * Create a cancellable `suspend` function that executes an asynchronous process on evaluation. + * This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code. + * + * The suspending [cb] runs in an uncancellable manner, acquiring [CancelToken] as a resource. + * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. + * + * ```kotlin:ank:playground + * import arrow.core.* + * import arrow.fx.coroutines.* + * import java.lang.RuntimeException + * + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap = mutableMapOf() + * suspend fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * listeners[id] = callback + * ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) } + * return id + * } + * + * fun unregisterCallback(id: GithubId): Unit { + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * suspend fun getUsernames(): List = + * cancellableF { cb: (Result>) -> Unit -> + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> cb(Result.succes(names)) + * throwable != null -> cb(Result.failure(throwable)) + * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) + * } + * } + * + * CancelToken { GithubService.unregisterCallback(id) } + * } + * + * val result = getUsernames() + * //sampleEnd + * println(result) + * } + * ``` + * + * @param cb an asynchronous computation that might fail. + * @see suspendCoroutine for wrapping impure APIs without cancellation + * @see cancellable for wrapping impure APIs with cancellation + */ +suspend fun cancellableF(cb: suspend ((Result) -> Unit) -> CancelToken): A = suspendCoroutine { cont -> val conn = cont.context.connection() @@ -51,7 +161,7 @@ suspend fun cancellableF(k: suspend ((Result) -> Unit) -> CancelToken): A // uninterruptedly, otherwise risking a leak, hence the bracket // TODO CREATE KotlinTracker issue using CancelToken here breaks something in compilation bracketCase Unit, Unit>( - acquire = { k(cb1).cancel }, + acquire = { cb(cb1).cancel }, use = { waitUntilCallbackInvoked(state) }, release = { token, ex -> when (ex) { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index 0b3d9c7ce..f52fe60f0 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -3,7 +3,7 @@ package arrow.fx.coroutines.stream import arrow.core.Either import arrow.fx.coroutines.CancelToken import arrow.fx.coroutines.ExitCase -import arrow.fx.coroutines.ForkConnected +import arrow.fx.coroutines.ForkAndForget import arrow.fx.coroutines.Promise import arrow.fx.coroutines.UnsafePromise import arrow.fx.coroutines.andThen @@ -46,30 +46,69 @@ interface EmitterSyntax { * Note that if neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. */ -// @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = - Stream.cancellableCallback(f.andThen { CancelToken.unit }) + Stream.cancellable(f.andThen { CancelToken.unit }) /** - * Creates a Stream from the given suspended block that will evaluate the passed CancelToken if cancelled. + * Creates a cancellable Stream from the given suspended block that will evaluate the passed [CancelToken] if cancelled. + * + * The suspending [f] runs in an uncancellable manner, acquiring [CancelToken] as a resource. + * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. * * ```kotlin:ank:playground - * import arrow.fx.coroutines.CancelToken - * import arrow.fx.coroutines.stream.* + * import arrow.fx.coroutines.* + * import java.lang.RuntimeException + * import java.util.concurrent.Executors + * import java.util.concurrent.ScheduledFuture + * import java.util.concurrent.TimeUnit * - * //sampleStart - * suspend fun main(): Unit = - * Stream.callback { - * emit(1) - * emit(2, 3, 4) - * end() - * CancelToken { /* cancel subscription to callback */ } + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap> = mutableMapOf() + * fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * val future = Executors.newScheduledThreadPool(1).run { + * var count = 0 + * scheduleAtFixedRate({ + * callback(listOf("Arrow - ${count++}"), null) + * }, 0, 500, TimeUnit.MILLISECONDS) * } + * listeners[id] = future + * return id + * } + * + * fun unregisterCallback(id: GithubId): Unit { + * listeners[id]?.cancel(false) + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * fun getUsernames(): Stream = + * Stream.cancellable { + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> emit(names) + * throwable != null -> throw throwable + * else -> throw RuntimeException("Null result and no exception") + * } + * } + * + * CancelToken { GithubService.unregisterCallback(id) } + * } + * + * val result = getUsernames() + * .take(3) * .compile() * .toList() - * .let(::println) //[1, 2, 3, 4] - * //sampleEnd + * + * //sampleEnd + * println(result) + * } * ``` * * If neither `end()` nor other limit operators such as `take(N)` are called, @@ -77,23 +116,26 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. */ // @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) -fun Stream.Companion.cancellableCallback(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = +fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = force { val q = Queue.unbounded() val error = UnsafePromise() val cancel = Promise() - ForkConnected { emitterCallback(f, cancel, error, q) } - - (q.dequeue() - .interruptWhen { Either.Left(error.join()) } - .terminateOn { it === END } as Stream>) - .flatMap(::chunk) - .onFinalizeCase { - when (it) { - is ExitCase.Cancelled -> cancel.get().cancel.invoke() - } + Stream.bracketCase({ + ForkAndForget { emitterCallback(f, cancel, error, q) } + }, { f, exit -> + when (exit) { + is ExitCase.Cancelled -> cancel.get().cancel.invoke() + else -> Unit } + f.cancel() + }).flatMap { + (q.dequeue() + .interruptWhen { Either.Left(error.join()) } + .terminateOn { it === END } as Stream>) + .flatMap(::chunk) + } } private suspend fun emitterCallback( @@ -138,6 +180,7 @@ private suspend fun emitterCallback( }, { exit -> when (exit) { is ExitCase.Failure -> error.complete(Result.success(exit.failure)) + else -> Unit } }) } diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt similarity index 75% rename from arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt rename to arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt index c647cb655..ffd655755 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt @@ -1,25 +1,20 @@ -package arrow.fx.coroutines.stream.ops +package arrow.fx.coroutines.stream import arrow.fx.coroutines.Atomic import arrow.fx.coroutines.CancelToken -import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.CancellableContinuation import arrow.fx.coroutines.ForkAndForget +import arrow.fx.coroutines.ForkConnected import arrow.fx.coroutines.Promise import arrow.fx.coroutines.Schedule import arrow.fx.coroutines.StreamSpec +import arrow.fx.coroutines.UnsafePromise import arrow.fx.coroutines.assertThrowable -import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.cancelBoundary import arrow.fx.coroutines.milliseconds -import arrow.fx.coroutines.never import arrow.fx.coroutines.parTupledN -import arrow.fx.coroutines.seconds import arrow.fx.coroutines.sleep -import arrow.fx.coroutines.stream.Chunk -import arrow.fx.coroutines.stream.Stream -import arrow.fx.coroutines.stream.cancellableCallback -import arrow.fx.coroutines.stream.callback -import arrow.fx.coroutines.stream.chunk -import arrow.fx.coroutines.stream.compile +import arrow.fx.coroutines.startCoroutineCancellable import arrow.fx.coroutines.suspend import arrow.fx.coroutines.throwable import io.kotest.matchers.shouldBe @@ -110,7 +105,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { Stream.callback { emit(1) - sleep(1.seconds) + sleep(500.milliseconds) emit(2) ref.set(true) end() @@ -123,23 +118,11 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { .drain() } - "emits and completes" { - Stream.callback { - emit(1) - emit(2) - emit(3) - end() - } - .compile() - .toList() shouldBe listOf(1, 2, 3) - } - "forwards errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.cancellableCallback { + val s = Stream.cancellable { throw e - } - .compile() + }.compile() assertThrowable { s.drain() @@ -149,10 +132,9 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { "forwards suspended errors" { checkAll(Arb.throwable()) { e -> - val s = Stream.cancellableCallback { + val s = Stream.cancellable { e.suspend() - } - .compile() + }.compile() assertThrowable { s.drain() @@ -165,7 +147,7 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { val latch = Promise() val effect = Atomic(false) - val s = Stream.cancellableCallback { + val s = Stream.cancellable { CancelToken { effect.set(true) } } @@ -184,31 +166,46 @@ class CallbackTest : StreamSpec(iterations = 250, spec = { } } - "can cancel never" { - checkAll(Arb.int()) { + "cancelableF executes generated task uninterruptedly" { + checkAll(Arb.int()) { i -> val latch = Promise() - val exit = Promise() - val f = ForkAndForget { - guaranteeCase({ - Stream.cancellableCallback { - latch.complete(Unit) - never() - CancelToken.unit - } - .compile() - .toList() - }) { ex -> exit.complete(ex) } + val start = Promise() + val done = Promise() + + val task = suspend { + Stream.cancellable { + latch.complete(Unit) + start.get() + cancelBoundary() + emit(Unit) + done.complete(i) + CancelToken.unit + }.compile() + .lastOrError() } + + val p = UnsafePromise() + + val cancel = task.startCoroutineCancellable(CancellableContinuation { r -> p.complete(r) }) + latch.get() - f.cancel() - exit.get() shouldBe ExitCase.Cancelled + + ForkConnected { cancel.invoke() } + + // Let cancel schedule + sleep(10.milliseconds) + + start.complete(Unit) // Continue cancellableF + + done.get() shouldBe i + p.tryGet() shouldBe null } } "doesn't run cancel token without cancellation" { val effect = Atomic(false) - Stream.cancellableCallback { + Stream.cancellable { end() CancelToken { effect.set(true) } } From 35a1840601a2c29bf8ba5950e066f20c8753de6e Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 20:20:26 +0200 Subject: [PATCH 21/23] Apply implement END as Chunk review suggestion --- .../arrow/fx/coroutines/stream/Chunk.kt | 2 +- .../arrow/fx/coroutines/stream/callbacks.kt | 23 ++++++++++++------- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt index 27ddcf1a8..63bf10d46 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt @@ -512,7 +512,7 @@ abstract class Chunk { object Empty : Chunk() { override fun size(): Int = 0 - override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty.apply($i)") + override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty[$i]") override fun splitAtChunk_(n: Int): Pair, Chunk> = TODO("INTERNAL DEV ERROR NUB") override fun copyToArray_(xs: Array, start: Int) = Unit } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index f52fe60f0..dfda49730 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -14,8 +14,6 @@ import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.startCoroutine import kotlin.experimental.ExperimentalTypeInference -private object END - interface EmitterSyntax { fun emit(a: A): Unit fun emit(chunk: Chunk): Unit @@ -114,11 +112,10 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * If neither `end()` nor other limit operators such as `take(N)` are called, * then the Stream will never end. */ -// @OptIn(ExperimentalTypeInference::class) in 1.3.70 @UseExperimental(ExperimentalTypeInference::class) fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = force { - val q = Queue.unbounded() + val q = Queue.unbounded>() val error = UnsafePromise() val cancel = Promise() @@ -131,9 +128,9 @@ fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax< } f.cancel() }).flatMap { - (q.dequeue() + q.dequeue() .interruptWhen { Either.Left(error.join()) } - .terminateOn { it === END } as Stream>) + .terminateOn { it === END } .flatMap(::chunk) } } @@ -142,9 +139,9 @@ private suspend fun emitterCallback( f: suspend EmitterSyntax.() -> CancelToken, cancel: Promise, error: UnsafePromise, - q: Queue + q: Queue> ): Unit { - val cb = { ch: Any? -> + val cb = { ch: Chunk -> suspend { q.enqueue1(ch) }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> @@ -184,3 +181,13 @@ private suspend fun emitterCallback( } }) } + +private object END : Chunk() { + override fun size(): Int = 0 + override fun get(i: Int): Nothing = + throw throw RuntimeException("NULL_CHUNK[$i]") + + override fun copyToArray_(xs: Array, start: Int) {} + override fun splitAtChunk_(n: Int): Pair, Chunk> = + Pair(empty(), empty()) +} From c0f9a1974bb36eeb619e36b135db23d5ddc0c281 Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 22:41:50 +0200 Subject: [PATCH 22/23] Fix docs --- .../src/main/kotlin/arrow/fx/coroutines/builders.kt | 2 +- .../src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt index b1522d507..690c4396f 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt @@ -115,7 +115,7 @@ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = * cancellableF { cb: (Result>) -> Unit -> * val id = GithubService.getUsernames { names, throwable -> * when { - * names != null -> cb(Result.succes(names)) + * names != null -> cb(Result.success(names)) * throwable != null -> cb(Result.failure(throwable)) * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) * } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index dfda49730..cdd25ab85 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -55,7 +55,7 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. * * ```kotlin:ank:playground - * import arrow.fx.coroutines.* + * import arrow.fx.coroutines.stream.* * import java.lang.RuntimeException * import java.util.concurrent.Executors * import java.util.concurrent.ScheduledFuture From 543816a88790a288e270b021bf9ed03529b5d24b Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Wed, 29 Jul 2020 23:15:37 +0200 Subject: [PATCH 23/23] Add missing import --- .../src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt index cdd25ab85..ae3790748 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -55,6 +55,7 @@ fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax. * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. * * ```kotlin:ank:playground + * import arrow.fx.coroutines.CancelToken * import arrow.fx.coroutines.stream.* * import java.lang.RuntimeException * import java.util.concurrent.Executors