-
Notifications
You must be signed in to change notification settings - Fork 15
Conversation
Also fixes a couple of nits found in docs
Some Ktlint warnings |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Emitting empty chunks is broken.
arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt
Outdated
Show resolved
Hide resolved
arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackStreamTest.kt
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still a bunch of open unanswered review remarks from before.
arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt
Outdated
Show resolved
Hide resolved
This reverts commit 4952430.
Move the cancellation to the syntax to avoid infinite emissions not able to call cancel
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
# Conflicts: # arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Show resolved
Hide resolved
This reverts commit e8c0100. # Conflicts: # arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt # arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt
} | ||
} | ||
|
||
"can cancel never" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We would need to refactor this test somehow, any idea? cc @nomisRev
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @aballano,
Sorry for the long wait :/ I aligned the semantics of Stream.cancellable
to cancellable
as discussed, and also tried to update the documentation and examples to clarify how these functions work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great job 💯 Just a couple of thoughts/comments
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/ParJoin.kt
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Show resolved
Hide resolved
arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
Outdated
Show resolved
Hide resolved
import kotlin.coroutines.EmptyCoroutineContext | ||
import kotlin.coroutines.startCoroutine | ||
|
||
class CallbackTest : StreamSpec(iterations = 250, spec = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add a test for when we emit elements after calling end()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another potential test case could be if we emit from a separate coroutine. And also if we emit from a separate coroutine after the stream has ended.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you emit after end
it's offloaded in the Queue
but never emitted downstream. Would you expect an exception here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ideally, in the future we can have restrictions by the compiler. I started a chat about this on the KotlinLang Slack.
I guess throwing an exception at the calling site may make sense... As consumers have probably removed their subscription after the end signal. On other coroutine APIs they do emit and offer to differentiate if the consumer cares or not about the success of the push.
We could have a 3rd one that returns a result or either... Maybe we can have this as a separate ticket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved here: #239
interface EmitterSyntax<A> { | ||
fun emit(a: A): Unit | ||
fun emit(chunk: Chunk<A>): Unit | ||
fun emit(iterable: Iterable<A>): Unit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we include an option for emitting a Stream<A>
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could potentially emit with two strategies:
- emit: waits for the end of the stream before continuing
- emitPar: pushes the stream up in parallel
The end result would be similar to concatenate and merge operations respectively
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe one could achieve the same by just combining that possible Stream with the callback itself, no?
Stream.callback { ... }.merge(Stream.xxx)
Stream.callback { ... }.flatMap(Stream.xxx).parJoinUnbounded()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's ok if we have knowledge of the other stream outside. My thinking was if we require to do conditions inside the callback...
So, depending on the value we get from a given callback we could start one or another.
One use case I can think of for the secuencial emit could be, if we have a lifecycle callback that generates a new stream of user events each time, ending the previous one. This way we could generate a seamless continuous stream of user events.
As for the parallel emit we could have something that listens to multiple UI elements (like selectable items) and would mean we can combine them all into a single stream.
Not a very good example that last one as it could be done as a merge...
Let's create a separate ticket and I'll investigate more useful use cases or drop it if we find it not that worth it :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Moved here: #240
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unblock
val cancel = Promise<CancelToken>() | ||
|
||
Stream.bracketCase({ | ||
ForkAndForget { emitterCallback(f, cancel, error, q) } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nomisRev is there a reason on why not using ForkConnected
here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The acquire step of bracket
runs in an uncancellable
manner, so there is no cancellation to hook into.
So here instead we acquire the Fiber
as a resource, and we cancel
it when our interruptable stream finishes.
I'll merge on green and solve any leftover questions/tickets separately |
Thanks @pablisco for the tickets!! |
Also fixes a couple of nits found in docs