diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt index c5dc4a72a..690c4396f 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt @@ -4,6 +4,62 @@ import kotlin.coroutines.suspendCoroutine import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED +/** + * Create a cancellable `suspend` function that executes an asynchronous process on evaluation. + * This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.* + * import java.lang.RuntimeException + * import java.util.concurrent.Executors + * import java.util.concurrent.ScheduledFuture + * import java.util.concurrent.TimeUnit + * + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap> = mutableMapOf() + * fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * val future = Executors.newScheduledThreadPool(1).run { + * schedule({ + * callback(listOf("Arrow"), null) + * shutdown() + * }, 2, TimeUnit.SECONDS) + * } + * listeners[id] = future + * return id + * } + * fun unregisterCallback(id: GithubId): Unit { + * listeners[id]?.cancel(false) + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * suspend fun getUsernames(): List = + * cancellable { cb: (Result>) -> Unit -> + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> cb(Result.success(names)) + * throwable != null -> cb(Result.failure(throwable)) + * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) + * } + * } + * CancelToken { GithubService.unregisterCallback(id) } + * } + * val result = getUsernames() + * //sampleEnd + * println(result) + * } + * ``` + * + * @param cb an asynchronous computation that might fail. + * @see suspendCoroutine for wrapping impure APIs without cancellation + * @see cancellableF for wrapping impure APIs using a suspend with cancellation + */ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = suspendCoroutine { cont -> val conn = cont.context.connection() @@ -24,7 +80,61 @@ suspend fun cancellable(cb: ((Result) -> Unit) -> CancelToken): A = } else cancellable.complete(CancelToken.unit) } -suspend fun cancellableF(k: suspend ((Result) -> Unit) -> CancelToken): A = +/** + * Create a cancellable `suspend` function that executes an asynchronous process on evaluation. + * This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code. + * + * The suspending [cb] runs in an uncancellable manner, acquiring [CancelToken] as a resource. + * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. + * + * ```kotlin:ank:playground + * import arrow.core.* + * import arrow.fx.coroutines.* + * import java.lang.RuntimeException + * + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap = mutableMapOf() + * suspend fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * listeners[id] = callback + * ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) } + * return id + * } + * + * fun unregisterCallback(id: GithubId): Unit { + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * suspend fun getUsernames(): List = + * cancellableF { cb: (Result>) -> Unit -> + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> cb(Result.success(names)) + * throwable != null -> cb(Result.failure(throwable)) + * else -> cb(Result.failure(RuntimeException("Null result and no exception"))) + * } + * } + * + * CancelToken { GithubService.unregisterCallback(id) } + * } + * + * val result = getUsernames() + * //sampleEnd + * println(result) + * } + * ``` + * + * @param cb an asynchronous computation that might fail. + * @see suspendCoroutine for wrapping impure APIs without cancellation + * @see cancellable for wrapping impure APIs with cancellation + */ +suspend fun cancellableF(cb: suspend ((Result) -> Unit) -> CancelToken): A = suspendCoroutine { cont -> val conn = cont.context.connection() @@ -51,7 +161,7 @@ suspend fun cancellableF(k: suspend ((Result) -> Unit) -> CancelToken): A // uninterruptedly, otherwise risking a leak, hence the bracket // TODO CREATE KotlinTracker issue using CancelToken here breaks something in compilation bracketCase Unit, Unit>( - acquire = { k(cb1).cancel }, + acquire = { cb(cb1).cancel }, use = { waitUntilCallbackInvoked(state) }, release = { token, ex -> when (ex) { diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt index c1408cba9..55eb40ea9 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/predef.kt @@ -1,8 +1,11 @@ package arrow.fx.coroutines -inline infix fun ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C = +internal inline infix fun ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C = { a -> f(this(a)) } +internal inline infix fun (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C = + { a: A -> f(this(a)) } + infix fun A.prependTo(fa: Iterable): List = listOf(this) + fa diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt index 27ddcf1a8..63bf10d46 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Chunk.kt @@ -512,7 +512,7 @@ abstract class Chunk { object Empty : Chunk() { override fun size(): Int = 0 - override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty.apply($i)") + override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty[$i]") override fun splitAtChunk_(n: Int): Pair, Chunk> = TODO("INTERNAL DEV ERROR NUB") override fun copyToArray_(xs: Array, start: Int) = Unit } diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt index 7a694688f..956dbefe5 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt @@ -16,7 +16,7 @@ import arrow.fx.coroutines.stream.concurrent.SignallingAtomic import arrow.fx.coroutines.uncancellable /** - * Nondeterministically merges a stream of streams (`outer`) in to a single stream, + * Non-deterministically merges a stream of streams (`outer`) in to a single stream, * opening at most `maxOpen` streams at any point in time. * * The outer stream is evaluated and each resulting inner stream is run concurrently, @@ -67,7 +67,7 @@ suspend fun stop( outputQ.enqueue1(None) } -suspend fun decrementRunning( +internal suspend fun decrementRunning( done: SignallingAtomic>>, outputQ: NoneTerminatedQueue>, running: SignallingAtomic @@ -77,7 +77,7 @@ suspend fun decrementRunning( Pair(now, (if (now == 0) suspend { stop(done, outputQ, None) } else suspend { Unit })) }.invoke() -suspend fun incrementRunning(running: SignallingAtomic): Unit = +internal suspend fun incrementRunning(running: SignallingAtomic): Unit = running.update { it + 1 } // runs inner stream, each stream is forked. terminates when killSignal is true if fails will enq in queue failure @@ -121,7 +121,7 @@ internal suspend fun runInner( } // runs the outer stream, interrupts when kill == true, and then decrements the `running` -suspend fun Stream>.runOuter( +internal suspend fun Stream>.runOuter( done: SignallingAtomic>>, outputQ: NoneTerminatedQueue>, running: SignallingAtomic, @@ -146,9 +146,17 @@ suspend fun Stream>.runOuter( // awaits when all streams (outer + inner) finished, // and then collects result of the stream (outer + inner) execution -suspend fun signalResult(done: SignallingAtomic>>): Unit = +internal suspend fun signalResult(done: SignallingAtomic>>): Unit = done.get().flatten().fold({ Unit }, { throw it }) +/** + * Merges both Streams into an Stream of A and B represented by Either. + * This operation is equivalent to a normal merge but for different types. + */ +fun Stream.either(other: Stream): Stream> = + Stream(this.map { Either.Left(it) }, other.map { Either.Right(it) }) + .parJoin(2) + fun Stream>.parJoin(maxOpen: Int): Stream { require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" } return Stream.effect { @@ -180,6 +188,6 @@ fun Stream>.parJoin(maxOpen: Int): Stream { }.flatten() } -/** Like [parJoin] but races all inner streams simultaneously. */ +/** Like [parJoin] but races all inner streams simultaneously without limit. */ fun Stream>.parJoinUnbounded(): Stream = parJoin(Int.MAX_VALUE) diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt index 0701dbd02..7561a0de9 100644 --- a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/Stream.kt @@ -1037,7 +1037,7 @@ inline fun StreamOf.fix(): Stream = } /** - * Determinsitically zips elements, terminating when the end of either branch is reached naturally. + * Deterministically zips elements, terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* @@ -1069,7 +1069,7 @@ inline fun StreamOf.fix(): Stream = zipWith(that) { x, _ -> x } /** - * Determinsitically zips elements using the specified function, + * Deterministically zips elements using the specified function, * terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground @@ -1405,13 +1405,13 @@ inline fun StreamOf.fix(): Stream = * Run the supplied effectful action at the end of this stream, regardless of how the stream terminates. */ fun onFinalize(f: suspend () -> Unit): Stream = - bracket({ Unit }, { f.invoke() }).flatMap { this } + bracket({ Unit }, { f() }).flatMap { this } /** * Like [onFinalize] but provides the reason for finalization as an `ExitCase`. */ fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream = - Stream.bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this } + bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this } /** * Interrupts the stream, when `haltOnSignal` finishes its evaluation. @@ -1433,9 +1433,9 @@ inline fun StreamOf.fix(): Stream = /** * Introduces an explicit scope. * - * Scopes are normally introduced automatically, when using `bracket` or xsimilar + * Scopes are normally introduced automatically, when using `bracket` or similar * operations that acquire resources and run finalizers. Manual scope introduction - * is useful when using [onFinalizeWeak]/[onFinalizeCaseWeak], where no scope + * is useful when using [bracketWeak]/[bracketCaseWeak], where no scope * is introduced. */ fun scope(): Stream = @@ -1971,7 +1971,7 @@ inline fun StreamOf.fix(): Stream = } /** - * Determinsitically zips elements, terminating when the ends of both branches + * Deterministically zips elements, terminating when the ends of both branches * are reached naturally, padding the left branch with `pad1` and padding the right branch * with `pad2` as necessary. * @@ -1991,7 +1991,7 @@ fun Stream.zipAll(that: Stream, pad1: O, pad2: B): Stream Stream.filterNull(): Stream = * //sampleEnd * ``` */ -fun Stream.terminateOnNull(): Stream = +fun Stream.terminateOnNull(): Stream = + terminateOn { it == null } + +/** + * Halts the input stream when the condition is true. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream(1, 2, 3, 4) + * .terminateOn { it == 3 } + * .compile() + * .toList() + * .let(::println) //[1, 2] + * //sampleEnd + * ``` + */ +fun Stream.terminateOn(terminator: (O) -> Boolean): Stream = asPull.repeat { pull -> pull.unconsOrNull().flatMap { uncons -> when (uncons) { null -> Pull.just(null) else -> { - when (val idx = uncons.head.indexOfFirst { it == null }) { + when (val idx = uncons.head.indexOfFirst(terminator)) { 0 -> Pull.just(null) null -> Pull.output(uncons.head.map { it!! }).map { uncons.tail } else -> Pull.output(uncons.head.take(idx).map { it!! }).map { null } @@ -2243,6 +2262,10 @@ fun Stream>.terminateOnNone(): Stream = } }.stream() +/** + * Wraps the inner values in Option as Some and adds an None at the end of the Stream to signal completion. + * Note that this doesn't actually limit an infinite stream. + */ fun Stream.noneTerminate(): Stream> = map { Some(it) }.append { Stream.just(None) } @@ -2258,7 +2281,7 @@ fun emptyStream(): Stream = Stream.empty() /** - * Determinsitically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. + * Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* @@ -2277,7 +2300,7 @@ fun Stream.interleave(that: Stream): Stream = zip(that).flatMap { (o1, o2) -> Stream(o1, o2) } /** - * Determinsitically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. + * Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally. * * ```kotlin:ank:playground * import arrow.fx.coroutines.stream.* diff --git a/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt new file mode 100644 index 000000000..ae3790748 --- /dev/null +++ b/arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt @@ -0,0 +1,194 @@ +package arrow.fx.coroutines.stream + +import arrow.core.Either +import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.ExitCase +import arrow.fx.coroutines.ForkAndForget +import arrow.fx.coroutines.Promise +import arrow.fx.coroutines.UnsafePromise +import arrow.fx.coroutines.andThen +import arrow.fx.coroutines.guaranteeCase +import arrow.fx.coroutines.stream.concurrent.Queue +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine +import kotlin.experimental.ExperimentalTypeInference + +interface EmitterSyntax { + fun emit(a: A): Unit + fun emit(chunk: Chunk): Unit + fun emit(iterable: Iterable): Unit + fun emit(vararg aas: A): Unit + fun end(): Unit +} + +/** + * Creates a Stream from the given suspended block callback, allowing to emit, set cancel effects and end the emission. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.stream.* + * + * //sampleStart + * suspend fun main(): Unit = + * Stream.callback { + * emit(1) + * emit(2, 3, 4) + * end() + * } + * .compile() + * .toList() + * .let(::println) //[1, 2, 3, 4] + * //sampleEnd + * ``` + * + * Note that if neither `end()` nor other limit operators such as `take(N)` are called, + * then the Stream will never end. + */ +@UseExperimental(ExperimentalTypeInference::class) +fun Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax.() -> Unit): Stream = + Stream.cancellable(f.andThen { CancelToken.unit }) + +/** + * Creates a cancellable Stream from the given suspended block that will evaluate the passed [CancelToken] if cancelled. + * + * The suspending [f] runs in an uncancellable manner, acquiring [CancelToken] as a resource. + * If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned. + * + * ```kotlin:ank:playground + * import arrow.fx.coroutines.CancelToken + * import arrow.fx.coroutines.stream.* + * import java.lang.RuntimeException + * import java.util.concurrent.Executors + * import java.util.concurrent.ScheduledFuture + * import java.util.concurrent.TimeUnit + * + * typealias Callback = (List?, Throwable?) -> Unit + * + * class GithubId + * object GithubService { + * private val listeners: MutableMap> = mutableMapOf() + * fun getUsernames(callback: Callback): GithubId { + * val id = GithubId() + * val future = Executors.newScheduledThreadPool(1).run { + * var count = 0 + * scheduleAtFixedRate({ + * callback(listOf("Arrow - ${count++}"), null) + * }, 0, 500, TimeUnit.MILLISECONDS) + * } + * listeners[id] = future + * return id + * } + * + * fun unregisterCallback(id: GithubId): Unit { + * listeners[id]?.cancel(false) + * listeners.remove(id) + * } + * } + * + * suspend fun main(): Unit { + * //sampleStart + * fun getUsernames(): Stream = + * Stream.cancellable { + * val id = GithubService.getUsernames { names, throwable -> + * when { + * names != null -> emit(names) + * throwable != null -> throw throwable + * else -> throw RuntimeException("Null result and no exception") + * } + * } + * + * CancelToken { GithubService.unregisterCallback(id) } + * } + * + * val result = getUsernames() + * .take(3) + * .compile() + * .toList() + * + * //sampleEnd + * println(result) + * } + * ``` + * + * If neither `end()` nor other limit operators such as `take(N)` are called, + * then the Stream will never end. + */ +@UseExperimental(ExperimentalTypeInference::class) +fun Stream.Companion.cancellable(@BuilderInference f: suspend EmitterSyntax.() -> CancelToken): Stream = + force { + val q = Queue.unbounded>() + val error = UnsafePromise() + val cancel = Promise() + + Stream.bracketCase({ + ForkAndForget { emitterCallback(f, cancel, error, q) } + }, { f, exit -> + when (exit) { + is ExitCase.Cancelled -> cancel.get().cancel.invoke() + else -> Unit + } + f.cancel() + }).flatMap { + q.dequeue() + .interruptWhen { Either.Left(error.join()) } + .terminateOn { it === END } + .flatMap(::chunk) + } + } + +private suspend fun emitterCallback( + f: suspend EmitterSyntax.() -> CancelToken, + cancel: Promise, + error: UnsafePromise, + q: Queue> +): Unit { + val cb = { ch: Chunk -> + suspend { + q.enqueue1(ch) + }.startCoroutine(Continuation(EmptyCoroutineContext) { r -> + r.fold({ Unit }, { e -> error.complete(Result.success(e)) }) + }) + } + + val emitter = object : EmitterSyntax { + override fun emit(a: A) { + emit(Chunk.just(a)) + } + + override fun emit(chunk: Chunk) { + cb(chunk) + } + + override fun emit(iterable: Iterable) { + cb(Chunk.iterable(iterable)) + } + + override fun emit(vararg aas: A) { + cb(Chunk(*aas)) + } + + override fun end() { + cb(END) + } + } + + guaranteeCase({ + val cancelT = emitter.f() + cancel.complete(cancelT) + }, { exit -> + when (exit) { + is ExitCase.Failure -> error.complete(Result.success(exit.failure)) + else -> Unit + } + }) +} + +private object END : Chunk() { + override fun size(): Int = 0 + override fun get(i: Int): Nothing = + throw throw RuntimeException("NULL_CHUNK[$i]") + + override fun copyToArray_(xs: Array, start: Int) {} + override fun splitAtChunk_(n: Int): Pair, Chunk> = + Pair(empty(), empty()) +} diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt index 46014b946..bdd5c959c 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/BracketTest.kt @@ -20,7 +20,6 @@ import io.kotest.property.arbitrary.int import io.kotest.property.arbitrary.list import io.kotest.property.arbitrary.long import io.kotest.property.arbitrary.string -import io.kotest.property.checkAll class BracketTest : StreamSpec(spec = { diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt new file mode 100644 index 000000000..ffd655755 --- /dev/null +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/CallbackTest.kt @@ -0,0 +1,232 @@ +package arrow.fx.coroutines.stream + +import arrow.fx.coroutines.Atomic +import arrow.fx.coroutines.CancelToken +import arrow.fx.coroutines.CancellableContinuation +import arrow.fx.coroutines.ForkAndForget +import arrow.fx.coroutines.ForkConnected +import arrow.fx.coroutines.Promise +import arrow.fx.coroutines.Schedule +import arrow.fx.coroutines.StreamSpec +import arrow.fx.coroutines.UnsafePromise +import arrow.fx.coroutines.assertThrowable +import arrow.fx.coroutines.cancelBoundary +import arrow.fx.coroutines.milliseconds +import arrow.fx.coroutines.parTupledN +import arrow.fx.coroutines.sleep +import arrow.fx.coroutines.startCoroutineCancellable +import arrow.fx.coroutines.suspend +import arrow.fx.coroutines.throwable +import io.kotest.matchers.shouldBe +import io.kotest.property.Arb +import io.kotest.property.arbitrary.int +import io.kotest.property.arbitrary.list +import io.kotest.property.arbitrary.map +import kotlin.coroutines.Continuation +import kotlin.coroutines.EmptyCoroutineContext +import kotlin.coroutines.startCoroutine + +class CallbackTest : StreamSpec(iterations = 250, spec = { + + "should be lazy" { + checkAll(Arb.int()) { + val effect = Atomic(false) + val s = Stream.callback { + effect.set(true) + end() + } + + effect.get() shouldBe false + s.compile().drain() + effect.get() shouldBe true + } + } + + "emits values" { + checkAll(Arb.list(Arb.int())) { list -> + Stream.callback { + list.forEach { emit(it) } + end() + } + .compile() + .toList() shouldBe list + } + } + + "emits varargs" { + checkAll(Arb.list(Arb.int()).map { it.toTypedArray() }) { list -> + Stream.callback { + emit(*list) + end() + } + .chunks() + .compile() + .toList() shouldBe listOf(Chunk(*list)) + } + } + + "emits iterable" { + checkAll(Arb.list(Arb.int())) { list -> + Stream.callback { + emit(list) + end() + } + .chunks() + .compile() + .toList() shouldBe listOf(Chunk.iterable(list)) + } + } + + "emits chunks" { + checkAll(Arb.chunk(Arb.int()), Arb.chunk(Arb.int())) { ch, ch2 -> + Stream.callback { + emit(ch) + emit(ch2) + end() + } + .chunks() + .compile() + .toList() shouldBe listOf(ch, ch2) + } + } + + "long running emission" { + Stream.callback { + ForkAndForget { + countToCallback(4, { it }, { emit(it) }) { end() } + } + } + .compile() + .toList() shouldBe listOf(1, 2, 3, 4, 5) + } + + "parallel emission/pulling" { + val ref = Atomic(false) + + Stream.callback { + emit(1) + sleep(500.milliseconds) + emit(2) + ref.set(true) + end() + } + .effectMap { + if (it == 1) ref.get() shouldBe false + else Unit + } + .compile() + .drain() + } + + "forwards errors" { + checkAll(Arb.throwable()) { e -> + val s = Stream.cancellable { + throw e + }.compile() + + assertThrowable { + s.drain() + } shouldBe e + } + } + + "forwards suspended errors" { + checkAll(Arb.throwable()) { e -> + val s = Stream.cancellable { + e.suspend() + }.compile() + + assertThrowable { + s.drain() + } shouldBe e + } + } + + "runs cancel token" { + checkAll(Arb.int()) { + val latch = Promise() + val effect = Atomic(false) + + val s = Stream.cancellable { + CancelToken { effect.set(true) } + } + + val f = ForkAndForget { + parTupledN( + { s.compile().drain() }, + { latch.complete(Unit) } + ) + } + + parTupledN({ latch.get() }, { sleep(20.milliseconds) }) + + f.cancel() + + effect.get() shouldBe true + } + } + + "cancelableF executes generated task uninterruptedly" { + checkAll(Arb.int()) { i -> + val latch = Promise() + val start = Promise() + val done = Promise() + + val task = suspend { + Stream.cancellable { + latch.complete(Unit) + start.get() + cancelBoundary() + emit(Unit) + done.complete(i) + CancelToken.unit + }.compile() + .lastOrError() + } + + val p = UnsafePromise() + + val cancel = task.startCoroutineCancellable(CancellableContinuation { r -> p.complete(r) }) + + latch.get() + + ForkConnected { cancel.invoke() } + + // Let cancel schedule + sleep(10.milliseconds) + + start.complete(Unit) // Continue cancellableF + + done.get() shouldBe i + p.tryGet() shouldBe null + } + } + + "doesn't run cancel token without cancellation" { + val effect = Atomic(false) + + Stream.cancellable { + end() + CancelToken { effect.set(true) } + } + .compile() + .drain() + + effect.get() shouldBe false + } +}) + +private fun countToCallback( + iterations: Int, + map: (Int) -> A, + cb: suspend (A) -> Unit, + onEnd: suspend () -> Unit = { } +): Unit = suspend { + var i = 0 + arrow.fx.coroutines.repeat(Schedule.recurs(iterations)) { + i += 1 + cb(map(i)) + sleep(500.milliseconds) + } + onEnd() +}.startCoroutine(Continuation(EmptyCoroutineContext) { }) diff --git a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt index 1956d08b0..cd355b04d 100644 --- a/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt +++ b/arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt @@ -1,5 +1,7 @@ package arrow.fx.coroutines.stream +import arrow.core.None +import arrow.core.Some import arrow.core.extensions.list.foldable.combineAll import arrow.core.extensions.list.foldable.foldMap import arrow.core.extensions.monoid @@ -527,4 +529,25 @@ class StreamTest : StreamSpec(spec = { .toList() shouldBe List(n) { i } } } + + "terminateOn" { + Stream(1, 2, 3, 4) + .terminateOn { it % 3 == 0 } + .compile() + .toList() shouldBe listOf(1, 2) + } + + "terminateOnNull" { + Stream(1, 2, null, 4) + .terminateOnNull() + .compile() + .toList() shouldBe listOf(1, 2) + } + + "terminateOnNone" { + Stream(Some(1), Some(2), None, Some(4)) + .terminateOnNone() + .compile() + .toList() shouldBe listOf(1, 2) + } })