Skip to content
This repository has been archived by the owner on Feb 24, 2021. It is now read-only.

Adds Stream.callback #199

Merged
merged 34 commits into from
Jul 31, 2020
Merged

Adds Stream.callback #199

merged 34 commits into from
Jul 31, 2020

Conversation

aballano
Copy link
Member

Also fixes a couple of nits found in docs

aballano added 2 commits June 26, 2020 13:12
Also fixes a couple of nits found in docs
@JorgeCastilloPrz
Copy link
Member

Some Ktlint warnings

Copy link
Member

@nomisRev nomisRev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Emitting empty chunks is broken.

@aballano aballano changed the title Adds callbackStream Adds Stream.async Jun 26, 2020
Copy link
Member

@nomisRev nomisRev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still a bunch of open unanswered review remarks from before.

@aballano aballano changed the title Adds Stream.async Adds Stream.callback Jul 20, 2020
# Conflicts:
#	arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/StreamTest.kt
aballano added 2 commits July 21, 2020 11:57
This reverts commit e8c0100.

# Conflicts:
#	arrow-fx-coroutines/src/main/kotlin/arrow/fx/coroutines/stream/callbacks.kt
#	arrow-fx-coroutines/src/test/kotlin/arrow/fx/coroutines/stream/ops/CallbackTest.kt
}
}

"can cancel never" {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would need to refactor this test somehow, any idea? cc @nomisRev

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @aballano,
Sorry for the long wait :/ I aligned the semantics of Stream.cancellable to cancellable as discussed, and also tried to update the documentation and examples to clarify how these functions work.

@pablisco pablisco self-requested a review July 23, 2020 00:32
Copy link
Contributor

@pablisco pablisco left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great job 💯 Just a couple of thoughts/comments

import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

class CallbackTest : StreamSpec(iterations = 250, spec = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add a test for when we emit elements after calling end()?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another potential test case could be if we emit from a separate coroutine. And also if we emit from a separate coroutine after the stream has ended.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you emit after end it's offloaded in the Queue but never emitted downstream. Would you expect an exception here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, in the future we can have restrictions by the compiler. I started a chat about this on the KotlinLang Slack.

I guess throwing an exception at the calling site may make sense... As consumers have probably removed their subscription after the end signal. On other coroutine APIs they do emit and offer to differentiate if the consumer cares or not about the success of the push.

We could have a 3rd one that returns a result or either... Maybe we can have this as a separate ticket

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved here: #239

interface EmitterSyntax<A> {
fun emit(a: A): Unit
fun emit(chunk: Chunk<A>): Unit
fun emit(iterable: Iterable<A>): Unit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we include an option for emitting a Stream<A>?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could potentially emit with two strategies:

  • emit: waits for the end of the stream before continuing
  • emitPar: pushes the stream up in parallel

The end result would be similar to concatenate and merge operations respectively

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe one could achieve the same by just combining that possible Stream with the callback itself, no?

  • Stream.callback { ... }.merge(Stream.xxx)
  • Stream.callback { ... }.flatMap(Stream.xxx).parJoinUnbounded()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's ok if we have knowledge of the other stream outside. My thinking was if we require to do conditions inside the callback...
So, depending on the value we get from a given callback we could start one or another.

One use case I can think of for the secuencial emit could be, if we have a lifecycle callback that generates a new stream of user events each time, ending the previous one. This way we could generate a seamless continuous stream of user events.

As for the parallel emit we could have something that listens to multiple UI elements (like selectable items) and would mean we can combine them all into a single stream.

Not a very good example that last one as it could be done as a merge...

Let's create a separate ticket and I'll investigate more useful use cases or drop it if we find it not that worth it :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved here: #240

Copy link
Member

@nomisRev nomisRev left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unblock

val cancel = Promise<CancelToken>()

Stream.bracketCase({
ForkAndForget { emitterCallback(f, cancel, error, q) }
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nomisRev is there a reason on why not using ForkConnected here instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The acquire step of bracket runs in an uncancellable manner, so there is no cancellation to hook into.
So here instead we acquire the Fiber as a resource, and we cancel it when our interruptable stream finishes.

@aballano
Copy link
Member Author

I'll merge on green and solve any leftover questions/tickets separately

@aballano aballano merged commit 1eb44b5 into master Jul 31, 2020
@aballano aballano deleted the ab/callbackStream branch July 31, 2020 07:13
@nomisRev
Copy link
Member

nomisRev commented Aug 3, 2020

Thanks @pablisco for the tickets!!

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants