Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Adds Stream.callback #199

Merged
merged 34 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1359f2f
Add implementation + tests
aballano Jun 26, 2020
e86280b
Added callbackStream javadocs
aballano Jun 26, 2020
5e82a12
Rename callbackStream[Cancellable] to async[Cancellable]
aballano Jun 26, 2020
2babd55
Fix long running test ending and count
aballano Jun 26, 2020
0db7bfc
Try to fix flaky test
aballano Jun 26, 2020
574a3fe
Merge branch 'master' into ab/callbackStream
nomisRev Jun 29, 2020
4952430
Configuration: download logs option
rachelcarmena Jun 29, 2020
2d86e1f
Revert "Configuration: download logs option"
rachelcarmena Jun 29, 2020
b02f381
Fixes from code review
aballano Jun 29, 2020
6e8ab19
Test in chunks
aballano Jun 29, 2020
7f2af2c
Emit on computation pool by default
aballano Jun 29, 2020
5506cd1
Tweak tests a bit
aballano Jun 29, 2020
44abbde
Improve tests
aballano Jun 29, 2020
a021f9d
Run async emission in parallel
aballano Jun 29, 2020
12e526c
Merge branch 'master' into ab/callbackStream
rachelcarmena Jun 30, 2020
f75f8ab
nit
aballano Jun 30, 2020
e8c0100
Simplify Stream.async into one single function
aballano Jun 30, 2020
bbfb358
Nits and cleanup
aballano Jun 30, 2020
fd531ee
Merge branch 'master' into ab/callbackStream
aballano Jun 30, 2020
a46c9a0
Merge branch 'master' into ab/callbackStream
nomisRev Jul 1, 2020
2f77e93
Merge branch 'master' into ab/callbackStream
nomisRev Jul 1, 2020
ac58a0b
Merge branch 'master' into ab/callbackStream
aballano Jul 6, 2020
d1f8af9
Merge branch 'master' into ab/callbackStream
aballano Jul 15, 2020
6520aef
Stream.async -> Stream.callback
aballano Jul 20, 2020
6723497
Merge branch 'master' into ab/callbackStream
aballano Jul 20, 2020
0dfb150
Revert "Simplify Stream.async into one single function"
aballano Jul 20, 2020
4391944
Fix docs
aballano Jul 21, 2020
17430b6
Update cancellable docs and align Stream.cancellable semantics
nomisRev Jul 29, 2020
35a1840
Apply implement END as Chunk review suggestion
nomisRev Jul 29, 2020
c0f9a19
Fix docs
nomisRev Jul 29, 2020
543816a
Add missing import
nomisRev Jul 29, 2020
6b815f6
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
f3d0121
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
40ba5a4
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 112 additions & 2 deletions arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,62 @@ import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

/**
* Create a cancellable `suspend` function that executes an asynchronous process on evaluation.
* This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.*
* import java.lang.RuntimeException
* import java.util.concurrent.Executors
* import java.util.concurrent.ScheduledFuture
* import java.util.concurrent.TimeUnit
*
* typealias Callback = (List<String>?, Throwable?) -> Unit
*
* class GithubId
* object GithubService {
* private val listeners: MutableMap<GithubId, ScheduledFuture<*>> = mutableMapOf()
* fun getUsernames(callback: Callback): GithubId {
* val id = GithubId()
* val future = Executors.newScheduledThreadPool(1).run {
* schedule({
* callback(listOf("Arrow"), null)
* shutdown()
* }, 2, TimeUnit.SECONDS)
* }
* listeners[id] = future
* return id
* }
* fun unregisterCallback(id: GithubId): Unit {
* listeners[id]?.cancel(false)
* listeners.remove(id)
* }
* }
*
* suspend fun main(): Unit {
* //sampleStart
* suspend fun getUsernames(): List<String> =
* cancellable { cb: (Result<List<String>>) -> Unit ->
* val id = GithubService.getUsernames { names, throwable ->
* when {
* names != null -> cb(Result.success(names))
* throwable != null -> cb(Result.failure(throwable))
* else -> cb(Result.failure(RuntimeException("Null result and no exception")))
* }
* }
* CancelToken { GithubService.unregisterCallback(id) }
* }
* val result = getUsernames()
* //sampleEnd
* println(result)
* }
* ```
*
* @param cb an asynchronous computation that might fail.
* @see suspendCoroutine for wrapping impure APIs without cancellation
* @see cancellableF for wrapping impure APIs using a suspend with cancellation
*/
suspend fun <A> cancellable(cb: ((Result<A>) -> Unit) -> CancelToken): A =
suspendCoroutine { cont ->
val conn = cont.context.connection()
Expand All @@ -24,7 +80,61 @@ suspend fun <A> cancellable(cb: ((Result<A>) -> Unit) -> CancelToken): A =
} else cancellable.complete(CancelToken.unit)
}

suspend fun <A> cancellableF(k: suspend ((Result<A>) -> Unit) -> CancelToken): A =
/**
* Create a cancellable `suspend` function that executes an asynchronous process on evaluation.
* This combinator can be used to wrap callbacks or other similar impure code that requires cancellation code.
*
* The suspending [cb] runs in an uncancellable manner, acquiring [CancelToken] as a resource.
* If cancellation signal is received while [cb] is running, then the [CancelToken] will be triggered as soon as it's returned.
*
* ```kotlin:ank:playground
* import arrow.core.*
* import arrow.fx.coroutines.*
* import java.lang.RuntimeException
*
* typealias Callback = (List<String>?, Throwable?) -> Unit
*
* class GithubId
* object GithubService {
* private val listeners: MutableMap<GithubId, Callback> = mutableMapOf()
* suspend fun getUsernames(callback: Callback): GithubId {
* val id = GithubId()
* listeners[id] = callback
* ForkConnected { sleep(2.seconds); callback(listOf("Arrow"), null) }
* return id
* }
*
* fun unregisterCallback(id: GithubId): Unit {
* listeners.remove(id)
* }
* }
*
* suspend fun main(): Unit {
* //sampleStart
* suspend fun getUsernames(): List<String> =
* cancellableF { cb: (Result<List<String>>) -> Unit ->
* val id = GithubService.getUsernames { names, throwable ->
* when {
* names != null -> cb(Result.success(names))
* throwable != null -> cb(Result.failure(throwable))
* else -> cb(Result.failure(RuntimeException("Null result and no exception")))
* }
* }
*
* CancelToken { GithubService.unregisterCallback(id) }
* }
*
* val result = getUsernames()
* //sampleEnd
* println(result)
* }
* ```
*
* @param cb an asynchronous computation that might fail.
* @see suspendCoroutine for wrapping impure APIs without cancellation
* @see cancellable for wrapping impure APIs with cancellation
*/
suspend fun <A> cancellableF(cb: suspend ((Result<A>) -> Unit) -> CancelToken): A =
suspendCoroutine { cont ->
val conn = cont.context.connection()

Expand All @@ -51,7 +161,7 @@ suspend fun <A> cancellableF(k: suspend ((Result<A>) -> Unit) -> CancelToken): A
// uninterruptedly, otherwise risking a leak, hence the bracket
// TODO CREATE KotlinTracker issue using CancelToken here breaks something in compilation
bracketCase<suspend () -> Unit, Unit>(
acquire = { k(cb1).cancel },
acquire = { cb(cb1).cancel },
use = { waitUntilCallbackInvoked(state) },
release = { token, ex ->
when (ex) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package arrow.fx.coroutines

inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
internal inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
{ a -> f(this(a)) }

internal inline infix fun <A, B, C> (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C =
aballano marked this conversation as resolved.
Show resolved Hide resolved
{ a: A -> f(this(a)) }

infix fun <A> A.prependTo(fa: Iterable<A>): List<A> =
listOf(this) + fa

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ abstract class Chunk<out O> {

object Empty : Chunk<Nothing>() {
override fun size(): Int = 0
override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty.apply($i)")
override fun get(i: Int): Nothing = throw RuntimeException("Chunk.empty[$i]")
override fun splitAtChunk_(n: Int): Pair<Chunk<Nothing>, Chunk<Nothing>> = TODO("INTERNAL DEV ERROR NUB")
override fun copyToArray_(xs: Array<Any?>, start: Int) = Unit
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable

/**
* Nondeterministically merges a stream of streams (`outer`) in to a single stream,
* Non-deterministically merges a stream of streams (`outer`) in to a single stream,
* opening at most `maxOpen` streams at any point in time.
*
* The outer stream is evaluated and each resulting inner stream is run concurrently,
Expand Down Expand Up @@ -67,7 +67,7 @@ suspend fun <O> stop(
outputQ.enqueue1(None)
}

suspend fun <O> decrementRunning(
internal suspend fun <O> decrementRunning(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>
Expand All @@ -77,7 +77,7 @@ suspend fun <O> decrementRunning(
Pair(now, (if (now == 0) suspend { stop(done, outputQ, None) } else suspend { Unit }))
}.invoke()

suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
internal suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
running.update { it + 1 }

// runs inner stream, each stream is forked. terminates when killSignal is true if fails will enq in queue failure
Expand Down Expand Up @@ -121,7 +121,7 @@ internal suspend fun <O> runInner(
}

// runs the outer stream, interrupts when kill == true, and then decrements the `running`
suspend fun <O> Stream<Stream<O>>.runOuter(
internal suspend fun <O> Stream<Stream<O>>.runOuter(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>,
Expand All @@ -146,9 +146,17 @@ suspend fun <O> Stream<Stream<O>>.runOuter(

// awaits when all streams (outer + inner) finished,
// and then collects result of the stream (outer + inner) execution
suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
done.get().flatten().fold({ Unit }, { throw it })

/**
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <A, B> Stream<A>.either(other: Stream<B>): Stream<Either<A, B>> =
aballano marked this conversation as resolved.
Show resolved Hide resolved
Stream(this.map { Either.Left(it) }, other.map { Either.Right(it) })
.parJoin(2)

fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }
return Stream.effect {
Expand Down Expand Up @@ -180,6 +188,6 @@ fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
}.flatten()
}

/** Like [parJoin] but races all inner streams simultaneously. */
/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(): Stream<O> =
parJoin(Int.MAX_VALUE)
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the end of either branch is reached naturally.
* Deterministically zips elements, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down Expand Up @@ -1069,7 +1069,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
zipWith(that) { x, _ -> x }

/**
* Determinsitically zips elements using the specified function,
* Deterministically zips elements using the specified function,
* terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
Expand Down Expand Up @@ -1405,13 +1405,13 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Run the supplied effectful action at the end of this stream, regardless of how the stream terminates.
*/
fun onFinalize(f: suspend () -> Unit): Stream<O> =
bracket({ Unit }, { f.invoke() }).flatMap { this }
bracket({ Unit }, { f() }).flatMap { this }

/**
* Like [onFinalize] but provides the reason for finalization as an `ExitCase`.
*/
fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream<O> =
Stream.bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }
bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }

/**
* Interrupts the stream, when `haltOnSignal` finishes its evaluation.
Expand All @@ -1433,9 +1433,9 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Introduces an explicit scope.
*
* Scopes are normally introduced automatically, when using `bracket` or xsimilar
* Scopes are normally introduced automatically, when using `bracket` or similar
* operations that acquire resources and run finalizers. Manual scope introduction
* is useful when using [onFinalizeWeak]/[onFinalizeCaseWeak], where no scope
* is useful when using [bracketWeak]/[bracketCaseWeak], where no scope
* is introduced.
*/
fun scope(): Stream<O> =
Expand Down Expand Up @@ -1971,7 +1971,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the ends of both branches
* Deterministically zips elements, terminating when the ends of both branches
* are reached naturally, padding the left branch with `pad1` and padding the right branch
* with `pad2` as necessary.
*
Expand All @@ -1991,7 +1991,7 @@ fun <O, B> Stream<O>.zipAll(that: Stream<B>, pad1: O, pad2: B): Stream<Pair<O, B
zipAllWith(that, pad1, pad2, ::Pair)

/**
* Determinsitically zips elements with the specified function, terminating
* Deterministically zips elements with the specified function, terminating
* when the ends of both branches are reached naturally, padding the left
* branch with `pad1` and padding the right branch with `pad2` as necessary.
*
Expand Down Expand Up @@ -2172,13 +2172,32 @@ fun <O : Any> Stream<O?>.filterNull(): Stream<O> =
* //sampleEnd
* ```
*/
fun <O : Any> Stream<O?>.terminateOnNull(): Stream<O> =
fun <O> Stream<O>.terminateOnNull(): Stream<O> =
terminateOn { it == null }

/**
* Halts the input stream when the condition is true.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
* suspend fun main(): Unit =
* Stream(1, 2, 3, 4)
* .terminateOn { it == 3 }
* .compile()
* .toList()
* .let(::println) //[1, 2]
* //sampleEnd
* ```
*/
fun <O> Stream<O>.terminateOn(terminator: (O) -> Boolean): Stream<O> =
asPull.repeat { pull ->
pull.unconsOrNull().flatMap { uncons ->
when (uncons) {
null -> Pull.just(null)
else -> {
when (val idx = uncons.head.indexOfFirst { it == null }) {
when (val idx = uncons.head.indexOfFirst(terminator)) {
0 -> Pull.just(null)
null -> Pull.output<O>(uncons.head.map { it!! }).map { uncons.tail }
else -> Pull.output(uncons.head.take(idx).map { it!! }).map { null }
Expand Down Expand Up @@ -2243,6 +2262,10 @@ fun <O> Stream<Option<O>>.terminateOnNone(): Stream<O> =
}
}.stream()

/**
* Wraps the inner values in Option as Some and adds an None at the end of the Stream to signal completion.
* Note that this doesn't actually limit an infinite stream.
*/
fun <O> Stream<O>.noneTerminate(): Stream<Option<O>> =
map { Some(it) }.append { Stream.just(None) }

Expand All @@ -2258,7 +2281,7 @@ fun <O> emptyStream(): Stream<O> =
Stream.empty()

/**
* Determinsitically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand All @@ -2277,7 +2300,7 @@ fun <O> Stream<O>.interleave(that: Stream<O>): Stream<O> =
zip(that).flatMap { (o1, o2) -> Stream(o1, o2) }

/**
* Determinsitically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down
Loading