Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Commit

Permalink
Adds Stream.callback (#199)
Browse files Browse the repository at this point in the history
* Add implementation + tests

Also fixes a couple of nits found in docs

* Added callbackStream javadocs

Move the cancellation to the syntax to avoid infinite emissions not able to call cancel

* Update cancellable docs and align Stream.cancellable semantics

* Apply implement END as Chunk review suggestion

Co-authored-by: Simon Vergauwen <[email protected]>
Co-authored-by: Rachel M. Carmena <[email protected]>
Co-authored-by: Simon Vergauwen <[email protected]>
  • Loading branch information
4 people authored Jul 31, 2020
1 parent 78f8e34 commit 1eb44b5
Show file tree
Hide file tree
Showing 9 changed files with 615 additions and 23 deletions.
114 changes: 112 additions & 2 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,62 @@ import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

/**
* Create a cancellable `suspend` function that executes an asynchronous process on evaluation.
* This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
* import java.lang.RuntimeException
* import java.util.concurrent.Executors
* import java.util.concurrent.ScheduledFuture
* import java.util.concurrent.TimeUnit
*
* typealias Callback = (List<String>?, Throwable?) -> Unit
*
* class GithubId
* object GithubService {
* private val listeners: MutableMap<GithubId, ScheduledFuture<*>> = mutableMapOf()
* fun getUsernames(callback: Callback): GithubId {
* val id = GithubId()
* val future = Executors.newScheduledThreadPool(1).run {
* schedule({
* callback(listOf("Arrow"), null)
* shutdown()
* }, 2, TimeUnit.SECONDS)
* }
* listeners[id] = future
* return id
* }
* fun unregisterCallback(id: GithubId): Unit {
* listeners[id]?.cancel(false)
* listeners.remove(id)
* }
* }
*
* suspend fun main(): Unit {
* //sampleStart
* suspend fun getUsernames(): List<String> =
* cancellable { cb: (Result<List<String>>) -> Unit ->
* val id = GithubService.getUsernames { names, throwable ->
* when {
* names != null -> cb(Result.success(names))
* throwable != null -> cb(Result.failure(throwable))
* else -> cb(Result.failure(RuntimeException("Null result and no exception")))
* }
* }
* CancelToken { GithubService.unregisterCallback(id) }
* }
* val result = getUsernames()
* //sampleEnd
* println(result)
* }
* ```
*
* @param cb an asynchronous computation that might fail.
* @see suspendCoroutine for wrapping impure APIs without cancellation
* @see cancellableF for wrapping impure APIs using a suspend with cancellation
*/
suspend fun <A> cancellable(cb: ((Result<A>) -> Unit) -> CancelToken): A =
suspendCoroutine { cont ->
val conn = cont.context.connection()
Expand All @@ -24,7 +80,61 @@ suspend fun <A> cancellable(cb: ((Result<A>) -> Unit) -> CancelToken): A =
} else cancellable.complete(CancelToken.unit)
}

suspend fun <A> cancellableF(k: suspend ((Result<A>) -> Unit) -> CancelToken): A =
/**
* Create a cancellable `suspend` function that executes an asynchronous process on evaluation.
* This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code.
*
* The suspending [cb] runs in an uncancellable manner, acquiring [CancelToken] as a resource.
* If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned.
*
* ```kotlin:ank:playground
* import arrow.core.*
* import arrow.fx.coroutines.*
* import java.lang.RuntimeException
*
* typealias Callback = (List<String>?, Throwable?) -> Unit
*
* class GithubId
* object GithubService {
* private val listeners: MutableMap<GithubId, Callback> = mutableMapOf()
* suspend fun getUsernames(callback: Callback): GithubId {
* val id = GithubId()
* listeners[id] = callback
* ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) }
* return id
* }
*
* fun unregisterCallback(id: GithubId): Unit {
* listeners.remove(id)
* }
* }
*
* suspend fun main(): Unit {
* //sampleStart
* suspend fun getUsernames(): List<String> =
* cancellableF { cb: (Result<List<String>>) -> Unit ->
* val id = GithubService.getUsernames { names, throwable ->
* when {
* names != null -> cb(Result.success(names))
* throwable != null -> cb(Result.failure(throwable))
* else -> cb(Result.failure(RuntimeException("Null result and no exception")))
* }
* }
*
* CancelToken { GithubService.unregisterCallback(id) }
* }
*
* val result = getUsernames()
* //sampleEnd
* println(result)
* }
* ```
*
* @param cb an asynchronous computation that might fail.
* @see suspendCoroutine for wrapping impure APIs without cancellation
* @see cancellable for wrapping impure APIs with cancellation
*/
suspend fun <A> cancellableF(cb: suspend ((Result<A>) -> Unit) -> CancelToken): A =
suspendCoroutine { cont ->
val conn = cont.context.connection()

Expand All @@ -51,7 +161,7 @@ suspend fun <A> cancellableF(k: suspend ((Result<A>) -> Unit) -> CancelToken): A
// uninterruptedly, otherwise risking a leak, hence the bracket
// TODO CREATE KotlinTracker issue using CancelToken here breaks something in compilation
bracketCase<suspend () -> Unit, Unit>(
acquire = { k(cb1).cancel },
acquire = { cb(cb1).cancel },
use = { waitUntilCallbackInvoked(state) },
release = { token, ex ->
when (ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package arrow.fx.coroutines

inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
internal inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
{ a -> f(this(a)) }

internal inline infix fun <A, B, C> (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C =
{ a: A -> f(this(a)) }

infix fun <A> A.prependTo(fa: Iterable<A>): List<A> =
listOf(this) + fa

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ abstract class Chunk<out O> {

object Empty : Chunk<Nothing>() {
override fun size(): Int = 0
override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty.apply($i)")
override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty[$i]")
override fun splitAtChunk_(n: Int): Pair<Chunk<Nothing>, Chunk<Nothing>> = TODO("INTERNAL DEV ERROR NUB")
override fun copyToArray_(xs: Array<Any?>, start: Int) = Unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable

/**
* Nondeterministically merges a stream of streams (`outer`) in to a single stream,
* Non-deterministically merges a stream of streams (`outer`) in to a single stream,
* opening at most `maxOpen` streams at any point in time.
*
* The outer stream is evaluated and each resulting inner stream is run concurrently,
Expand Down Expand Up @@ -67,7 +67,7 @@ suspend fun <O> stop(
outputQ.enqueue1(None)
}

suspend fun <O> decrementRunning(
internal suspend fun <O> decrementRunning(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>
Expand All @@ -77,7 +77,7 @@ suspend fun <O> decrementRunning(
Pair(now, (if (now == 0) suspend { stop(done, outputQ, None) } else suspend { Unit }))
}.invoke()

suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
internal suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
running.update { it + 1 }

// runs inner stream, each stream is forked. terminates when killSignal is true if fails will enq in queue failure
Expand Down Expand Up @@ -121,7 +121,7 @@ internal suspend fun <O> runInner(
}

// runs the outer stream, interrupts when kill == true, and then decrements the `running`
suspend fun <O> Stream<Stream<O>>.runOuter(
internal suspend fun <O> Stream<Stream<O>>.runOuter(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>,
Expand All @@ -146,9 +146,17 @@ suspend fun <O> Stream<Stream<O>>.runOuter(

// awaits when all streams (outer + inner) finished,
// and then collects result of the stream (outer + inner) execution
suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
done.get().flatten().fold({ Unit }, { throw it })

/**
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <A, B> Stream<A>.either(other: Stream<B>): Stream<Either<A, B>> =
Stream(this.map { Either.Left(it) }, other.map { Either.Right(it) })
.parJoin(2)

fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }
return Stream.effect {
Expand Down Expand Up @@ -180,6 +188,6 @@ fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
}.flatten()
}

/** Like [parJoin] but races all inner streams simultaneously. */
/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(): Stream<O> =
parJoin(Int.MAX_VALUE)
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the end of either branch is reached naturally.
* Deterministically zips elements, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down Expand Up @@ -1069,7 +1069,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
zipWith(that) { x, _ -> x }

/**
* Determinsitically zips elements using the specified function,
* Deterministically zips elements using the specified function,
* terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
Expand Down Expand Up @@ -1405,13 +1405,13 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Run the supplied effectful action at the end of this stream, regardless of how the stream terminates.
*/
fun onFinalize(f: suspend () -> Unit): Stream<O> =
bracket({ Unit }, { f.invoke() }).flatMap { this }
bracket({ Unit }, { f() }).flatMap { this }

/**
* Like [onFinalize] but provides the reason for finalization as an `ExitCase`.
*/
fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream<O> =
Stream.bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }
bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }

/**
* Interrupts the stream, when `haltOnSignal` finishes its evaluation.
Expand All @@ -1433,9 +1433,9 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Introduces an explicit scope.
*
* Scopes are normally introduced automatically, when using `bracket` or xsimilar
* Scopes are normally introduced automatically, when using `bracket` or similar
* operations that acquire resources and run finalizers. Manual scope introduction
* is useful when using [onFinalizeWeak]/[onFinalizeCaseWeak], where no scope
* is useful when using [bracketWeak]/[bracketCaseWeak], where no scope
* is introduced.
*/
fun scope(): Stream<O> =
Expand Down Expand Up @@ -1971,7 +1971,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the ends of both branches
* Deterministically zips elements, terminating when the ends of both branches
* are reached naturally, padding the left branch with `pad1` and padding the right branch
* with `pad2` as necessary.
*
Expand All @@ -1991,7 +1991,7 @@ fun <O, B> Stream<O>.zipAll(that: Stream<B>, pad1: O, pad2: B): Stream<Pair<O, B
zipAllWith(that, pad1, pad2, ::Pair)

/**
* Determinsitically zips elements with the specified function, terminating
* Deterministically zips elements with the specified function, terminating
* when the ends of both branches are reached naturally, padding the left
* branch with `pad1` and padding the right branch with `pad2` as necessary.
*
Expand Down Expand Up @@ -2172,13 +2172,32 @@ fun <O : Any> Stream<O?>.filterNull(): Stream<O> =
* //sampleEnd
* ```
*/
fun <O : Any> Stream<O?>.terminateOnNull(): Stream<O> =
fun <O> Stream<O>.terminateOnNull(): Stream<O> =
terminateOn { it == null }

/**
* Halts the input stream when the condition is true.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
* suspend fun main(): Unit =
* Stream(1, 2, 3, 4)
* .terminateOn { it == 3 }
* .compile()
* .toList()
* .let(::println) //[1, 2]
* //sampleEnd
* ```
*/
fun <O> Stream<O>.terminateOn(terminator: (O) -> Boolean): Stream<O> =
asPull.repeat { pull ->
pull.unconsOrNull().flatMap { uncons ->
when (uncons) {
null -> Pull.just(null)
else -> {
when (val idx = uncons.head.indexOfFirst { it == null }) {
when (val idx = uncons.head.indexOfFirst(terminator)) {
0 -> Pull.just(null)
null -> Pull.output<O>(uncons.head.map { it!! }).map { uncons.tail }
else -> Pull.output(uncons.head.take(idx).map { it!! }).map { null }
Expand Down Expand Up @@ -2243,6 +2262,10 @@ fun <O> Stream<Option<O>>.terminateOnNone(): Stream<O> =
}
}.stream()

/**
* Wraps the inner values in Option as Some and adds an None at the end of the Stream to signal completion.
* Note that this doesn't actually limit an infinite stream.
*/
fun <O> Stream<O>.noneTerminate(): Stream<Option<O>> =
map { Some(it) }.append { Stream.just(None) }

Expand All @@ -2258,7 +2281,7 @@ fun <O> emptyStream(): Stream<O> =
Stream.empty()

/**
* Determinsitically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand All @@ -2277,7 +2300,7 @@ fun <O> Stream<O>.interleave(that: Stream<O>): Stream<O> =
zip(that).flatMap { (o1, o2) -> Stream(o1, o2) }

/**
* Determinsitically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down
Loading

0 comments on commit 1eb44b5

Please sign in to comment.