Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Adds Stream.callback #199

Merged
merged 34 commits into from
Jul 31, 2020
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1359f2f
Add implementation + tests
aballano Jun 26, 2020
e86280b
Added callbackStream javadocs
aballano Jun 26, 2020
5e82a12
Rename callbackStream[Cancellable] to async[Cancellable]
aballano Jun 26, 2020
2babd55
Fix long running test ending and count
aballano Jun 26, 2020
0db7bfc
Try to fix flaky test
aballano Jun 26, 2020
574a3fe
Merge branch 'master' into ab/callbackStream
nomisRev Jun 29, 2020
4952430
Configuration: download logs option
rachelcarmena Jun 29, 2020
2d86e1f
Revert "Configuration: download logs option"
rachelcarmena Jun 29, 2020
b02f381
Fixes from code review
aballano Jun 29, 2020
6e8ab19
Test in chunks
aballano Jun 29, 2020
7f2af2c
Emit on computation pool by default
aballano Jun 29, 2020
5506cd1
Tweak tests a bit
aballano Jun 29, 2020
44abbde
Improve tests
aballano Jun 29, 2020
a021f9d
Run async emission in parallel
aballano Jun 29, 2020
12e526c
Merge branch 'master' into ab/callbackStream
rachelcarmena Jun 30, 2020
f75f8ab
nit
aballano Jun 30, 2020
e8c0100
Simplify Stream.async into one single function
aballano Jun 30, 2020
bbfb358
Nits and cleanup
aballano Jun 30, 2020
fd531ee
Merge branch 'master' into ab/callbackStream
aballano Jun 30, 2020
a46c9a0
Merge branch 'master' into ab/callbackStream
nomisRev Jul 1, 2020
2f77e93
Merge branch 'master' into ab/callbackStream
nomisRev Jul 1, 2020
ac58a0b
Merge branch 'master' into ab/callbackStream
aballano Jul 6, 2020
d1f8af9
Merge branch 'master' into ab/callbackStream
aballano Jul 15, 2020
6520aef
Stream.async -> Stream.callback
aballano Jul 20, 2020
6723497
Merge branch 'master' into ab/callbackStream
aballano Jul 20, 2020
0dfb150
Revert "Simplify Stream.async into one single function"
aballano Jul 20, 2020
4391944
Fix docs
aballano Jul 21, 2020
17430b6
Update cancellable docs and align Stream.cancellable semantics
nomisRev Jul 29, 2020
35a1840
Apply implement END as Chunk review suggestion
nomisRev Jul 29, 2020
c0f9a19
Fix docs
nomisRev Jul 29, 2020
543816a
Add missing import
nomisRev Jul 29, 2020
6b815f6
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
f3d0121
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
40ba5a4
Merge branch 'master' into ab/callbackStream
aballano Jul 30, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package arrow.fx.coroutines

inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
internal inline infix fun <A, B, C> ((A) -> B).andThen(crossinline f: (B) -> C): (A) -> C =
{ a -> f(this(a)) }

internal inline infix fun <A, B, C> (suspend (A) -> B).andThen(crossinline f: suspend (B) -> C): suspend (A) -> C =
aballano marked this conversation as resolved.
Show resolved Hide resolved
{ a: A -> f(this(a)) }

infix fun <A> A.prependTo(fa: Iterable<A>): List<A> =
listOf(this) + fa

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import arrow.fx.coroutines.stream.concurrent.SignallingAtomic
import arrow.fx.coroutines.uncancellable

/**
* Nondeterministically merges a stream of streams (`outer`) in to a single stream,
* Non-deterministically merges a stream of streams (`outer`) in to a single stream,
* opening at most `maxOpen` streams at any point in time.
*
* The outer stream is evaluated and each resulting inner stream is run concurrently,
Expand Down Expand Up @@ -67,7 +67,7 @@ suspend fun <O> stop(
outputQ.enqueue1(None)
}

suspend fun <O> decrementRunning(
internal suspend fun <O> decrementRunning(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>
Expand All @@ -77,7 +77,7 @@ suspend fun <O> decrementRunning(
Pair(now, (if (now == 0) suspend { stop(done, outputQ, None) } else suspend { Unit }))
}.invoke()

suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
internal suspend fun incrementRunning(running: SignallingAtomic<Int>): Unit =
running.update { it + 1 }

// runs inner stream, each stream is forked. terminates when killSignal is true if fails will enq in queue failure
Expand Down Expand Up @@ -121,7 +121,7 @@ internal suspend fun <O> runInner(
}

// runs the outer stream, interrupts when kill == true, and then decrements the `running`
suspend fun <O> Stream<Stream<O>>.runOuter(
internal suspend fun <O> Stream<Stream<O>>.runOuter(
done: SignallingAtomic<Option<Option<Throwable>>>,
outputQ: NoneTerminatedQueue<Chunk<O>>,
running: SignallingAtomic<Int>,
Expand All @@ -146,9 +146,17 @@ suspend fun <O> Stream<Stream<O>>.runOuter(

// awaits when all streams (outer + inner) finished,
// and then collects result of the stream (outer + inner) execution
suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
internal suspend fun signalResult(done: SignallingAtomic<Option<Option<Throwable>>>): Unit =
done.get().flatten().fold({ Unit }, { throw it })

/**
* Merges both Streams into an Stream of A and B represented by Either<A, B>.
* This operation is equivalent to a normal merge but for different types.
*/
fun <A, B> Stream<A>.either(other: Stream<B>): Stream<Either<A, B>> =
aballano marked this conversation as resolved.
Show resolved Hide resolved
Stream(this.map { Either.Left(it) }, other.map { Either.Right(it) })
.parJoin(2)

fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
require(maxOpen > 0) { "maxOpen must be > 0, was: $maxOpen" }
return Stream.effect {
Expand Down Expand Up @@ -180,6 +188,6 @@ fun <O> Stream<Stream<O>>.parJoin(maxOpen: Int): Stream<O> {
}.flatten()
}

/** Like [parJoin] but races all inner streams simultaneously. */
/** Like [parJoin] but races all inner streams simultaneously without limit. */
fun <O> Stream<Stream<O>>.parJoinUnbounded(): Stream<O> =
parJoin(Int.MAX_VALUE)
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the end of either branch is reached naturally.
* Deterministically zips elements, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down Expand Up @@ -1069,7 +1069,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
zipWith(that) { x, _ -> x }

/**
* Determinsitically zips elements using the specified function,
* Deterministically zips elements using the specified function,
* terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
Expand Down Expand Up @@ -1405,13 +1405,13 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
* Run the supplied effectful action at the end of this stream, regardless of how the stream terminates.
*/
fun onFinalize(f: suspend () -> Unit): Stream<O> =
bracket({ Unit }, { f.invoke() }).flatMap { this }
bracket({ Unit }, { f() }).flatMap { this }

/**
* Like [onFinalize] but provides the reason for finalization as an `ExitCase`.
*/
fun onFinalizeCase(f: suspend (ExitCase) -> Unit): Stream<O> =
Stream.bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }
bracketCase({ Unit }) { _, ec -> f(ec) }.flatMap { this }

/**
* Interrupts the stream, when `haltOnSignal` finishes its evaluation.
Expand All @@ -1433,9 +1433,9 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
/**
* Introduces an explicit scope.
*
* Scopes are normally introduced automatically, when using `bracket` or xsimilar
* Scopes are normally introduced automatically, when using `bracket` or similar
* operations that acquire resources and run finalizers. Manual scope introduction
* is useful when using [onFinalizeWeak]/[onFinalizeCaseWeak], where no scope
* is useful when using [bracketWeak]/[bracketCaseWeak], where no scope
* is introduced.
*/
fun scope(): Stream<O> =
Expand Down Expand Up @@ -1971,7 +1971,7 @@ inline fun <A> StreamOf<A>.fix(): Stream<A> =
}

/**
* Determinsitically zips elements, terminating when the ends of both branches
* Deterministically zips elements, terminating when the ends of both branches
* are reached naturally, padding the left branch with `pad1` and padding the right branch
* with `pad2` as necessary.
*
Expand All @@ -1991,7 +1991,7 @@ fun <O, B> Stream<O>.zipAll(that: Stream<B>, pad1: O, pad2: B): Stream<Pair<O, B
zipAllWith(that, pad1, pad2, ::Pair)

/**
* Determinsitically zips elements with the specified function, terminating
* Deterministically zips elements with the specified function, terminating
* when the ends of both branches are reached naturally, padding the left
* branch with `pad1` and padding the right branch with `pad2` as necessary.
*
Expand Down Expand Up @@ -2172,13 +2172,32 @@ fun <O : Any> Stream<O?>.filterNull(): Stream<O> =
* //sampleEnd
* ```
*/
fun <O : Any> Stream<O?>.terminateOnNull(): Stream<O> =
fun <O> Stream<O>.terminateOnNull(): Stream<O> =
terminateOn { it == null }

/**
* Halts the input stream when the condition is true.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
* suspend fun main(): Unit =
* Stream(1, 2, 3, 4)
* .terminateOn { it == 3 }
* .compile()
* .toList()
* .let(::println) //[1, 2]
* //sampleEnd
* ```
*/
fun <O> Stream<O>.terminateOn(terminator: (O) -> Boolean): Stream<O> =
asPull.repeat { pull ->
pull.unconsOrNull().flatMap { uncons ->
when (uncons) {
null -> Pull.just(null)
else -> {
when (val idx = uncons.head.indexOfFirst { it == null }) {
when (val idx = uncons.head.indexOfFirst(terminator)) {
0 -> Pull.just(null)
null -> Pull.output<O>(uncons.head.map { it!! }).map { uncons.tail }
else -> Pull.output(uncons.head.take(idx).map { it!! }).map { null }
Expand Down Expand Up @@ -2243,6 +2262,10 @@ fun <O> Stream<Option<O>>.terminateOnNone(): Stream<O> =
}
}.stream()

/**
* Wraps the inner values in Option as Some and adds an None at the end of the Stream to signal completion.
* Note that this doesn't actually limit an infinite stream.
*/
fun <O> Stream<O>.noneTerminate(): Stream<Option<O>> =
map { Some(it) }.append { Stream.just(None) }

Expand All @@ -2258,7 +2281,7 @@ fun <O> emptyStream(): Stream<O> =
Stream.empty()

/**
* Determinsitically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the end of either branch is reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand All @@ -2277,7 +2300,7 @@ fun <O> Stream<O>.interleave(that: Stream<O>): Stream<O> =
zip(that).flatMap { (o1, o2) -> Stream(o1, o2) }

/**
* Determinsitically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
* Deterministically interleaves elements, starting on the left, terminating when the ends of both branches are reached naturally.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package arrow.fx.coroutines.stream

import arrow.core.Either
import arrow.fx.coroutines.CancelToken
import arrow.fx.coroutines.ExitCase
import arrow.fx.coroutines.ForkConnected
import arrow.fx.coroutines.UnsafePromise
import arrow.fx.coroutines.guaranteeCase
import arrow.fx.coroutines.stream.concurrent.Queue
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.experimental.ExperimentalTypeInference

private object END
aballano marked this conversation as resolved.
Show resolved Hide resolved
pablisco marked this conversation as resolved.
Show resolved Hide resolved

interface EmitterSyntax<A> {
fun onCancel(cancelF: suspend () -> Unit): Unit
fun emit(a: A): Unit
fun emit(chunk: Chunk<A>): Unit
fun emit(iterable: Iterable<A>): Unit
pablisco marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include an option for emitting a Stream<A>?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could potentially emit with two strategies:

  • emit: waits for the end of the stream before continuing
  • emitPar: pushes the stream up in parallel

The end result would be similar to concatenate and merge operations respectively

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe one could achieve the same by just combining that possible Stream with the callback itself, no?

  • Stream.callback { ... }.merge(Stream.xxx)
  • Stream.callback { ... }.flatMap(Stream.xxx).parJoinUnbounded()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok if we have knowledge of the other stream outside. My thinking was if we require to do conditions inside the callback...
So, depending on the value we get from a given callback we could start one or another.

One use case I can think of for the secuencial emit could be, if we have a lifecycle callback that generates a new stream of user events each time, ending the previous one. This way we could generate a seamless continuous stream of user events.

As for the parallel emit we could have something that listens to multiple UI elements (like selectable items) and would mean we can combine them all into a single stream.

Not a very good example that last one as it could be done as a merge...

Let's create a separate ticket and I'll investigate more useful use cases or drop it if we find it not that worth it :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved here: #240

fun emit(vararg aas: A): Unit
fun end(): Unit
}

/**
* Creates a Stream from the given suspended block callback, allowing to emit, set cancel effects and end the emission.
*
* ```kotlin:ank:playground
* import arrow.fx.coroutines.stream.*
*
* //sampleStart
* suspend fun main(): Unit =
* Stream.callback {
* onCancel { /* cancel something */ }
* emit(1)
* emit(2, 3, 4)
* end()
* }
* .compile()
* .toList()
* .let(::println) //[1, 2, 3, 4]
* //sampleEnd
* ```
*
* Note that if neither `end()` nor other limit operators such as `take(N)` are called,
* then the Stream will never end.
*
* Cancellation or errors might happen any time during emission, so it's recommended to call `onCancel` as early as
* possible, otherwise it's not guaranteed to be called.
*/
// @OptIn(ExperimentalTypeInference::class) in 1.3.70
@UseExperimental(ExperimentalTypeInference::class)
fun <A> Stream.Companion.callback(@BuilderInference f: suspend EmitterSyntax<A>.() -> Unit): Stream<A> =
aballano marked this conversation as resolved.
Show resolved Hide resolved
force {
val q = Queue.unbounded<Any?>()
val error = UnsafePromise<Throwable>()
val cancel = UnsafePromise<CancelToken>()

ForkConnected { emitterCallback(f, cancel, error, q) }

(q.dequeue()
.interruptWhen { Either.Left(error.join()) }
.terminateOn { it === END } as Stream<Chunk<A>>)
.flatMap(::chunk)
.onFinalizeCase {
when (it) {
is ExitCase.Cancelled -> {
when (val r = cancel.tryGet()) {
null -> Unit // This means the user didn't set the onCancel or set it too late
else -> r.getOrNull()!!.invoke()
}
}
}
}
}

private suspend fun <A> emitterCallback(
f: suspend EmitterSyntax<A>.() -> Unit,
cancel: UnsafePromise<CancelToken>,
error: UnsafePromise<Throwable>,
q: Queue<Any?>
): Unit {
val cb = { ch: Any? ->
suspend {
q.enqueue1(ch)
}.startCoroutine(Continuation(EmptyCoroutineContext) { r ->
r.fold({ Unit }, { e -> error.complete(Result.success(e)) })
})
}

val emitter = object : EmitterSyntax<A> {
override fun emit(a: A) {
emit(Chunk.just(a))
}

override fun emit(chunk: Chunk<A>) {
cb(chunk)
}

override fun emit(iterable: Iterable<A>) {
cb(Chunk.iterable(iterable))
}

override fun emit(vararg aas: A) {
cb(Chunk(*aas))
}

override fun end() {
cb(END)
}

override fun onCancel(cancelF: suspend () -> Unit) {
cancel.complete(Result.success(CancelToken(cancelF)))
}
}

guaranteeCase({ emitter.f() }, { exit ->
when (exit) {
is ExitCase.Failure -> error.complete(Result.success(exit.failure))
}
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ interface Queue<A> : Enqueue<A>, Dequeue1<A>, Dequeue<A> {
suspend fun <A> bounded(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedFifo(maxSize))

/** Creates a FILO queue with the specified size bound. */
suspend fun <A> boundedLife(maxSize: Int): Queue<A> =
/** Creates a LIFO queue with the specified size bound. */
suspend fun <A> boundedLifo(maxSize: Int): Queue<A> =
fromStrategy(Strategy.boundedLifo(maxSize))

/** Creates a queue which stores the last `maxSize` enqueued elements and which never blocks on enqueue. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import io.kotest.property.arbitrary.int
import io.kotest.property.arbitrary.list
import io.kotest.property.arbitrary.long
import io.kotest.property.arbitrary.string
import io.kotest.property.checkAll

class BracketTest : StreamSpec(spec = {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package arrow.fx.coroutines.stream

import arrow.core.None
import arrow.core.Some
import arrow.core.extensions.list.foldable.combineAll
import arrow.core.extensions.list.foldable.foldMap
import arrow.core.extensions.monoid
Expand Down Expand Up @@ -527,4 +529,25 @@ class StreamTest : StreamSpec(spec = {
.toList() shouldBe List(n) { i }
}
}

"terminateOn" {
Stream(1, 2, 3, 4)
.terminateOn { it % 3 == 0 }
.compile()
.toList() shouldBe listOf(1, 2)
}

"terminateOnNull" {
Stream(1, 2, null, 4)
.terminateOnNull()
.compile()
.toList() shouldBe listOf(1, 2)
}

"terminateOnNone" {
Stream(Some(1), Some(2), None, Some(4))
.terminateOnNone()
.compile()
.toList() shouldBe listOf(1, 2)
}
})
Loading