Skip to content

Stream::merge does not end prematurely if one stream is delayed #437

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 commits merged into from
Nov 2, 2019

Conversation

matklad
Copy link
Member

@matklad matklad commented Nov 1, 2019

r? @yoshuawuyts

The bug was that if left returned Poll::Pending, the right could have returnrf Poll::Ready(None), and we would consider the stream as finished, although we haven't really polled the left part thoroughly enough.

I also don't understand why we need cx.waker().wake_by_ref(); here: we are returning Poll::Ready, and calling wake_by_ref is only required if you return Pending?

@matklad
Copy link
Member Author

matklad commented Nov 1, 2019

Question (unrelated to PR mostly): is it a contract of Stream trait that, after returning Poll::Ready(None) once, all subsequent calls to poll_next should return Poll::Ready(None) as well? I think both initial and this version of merge assume this behavior, but it is not documented. I think merge doesn't neccaary have to require this contract from substreams, if we change it's internal state to

enum MergeState {
    Both(L, R),
    Left(L),
    Right(R),
}

@matklad matklad force-pushed the fix-merge branch 2 times, most recently from 086ac1e to 98022e5 Compare November 1, 2019 18:44
tests/stream.rs Outdated
});

task::block_on(async move {
task::sleep(std::time::Duration::from_millis(5)).await;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this sleep have any significance? Perhaps it'd be safer to pick a larger duration, maybe 500ms?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without this sleep, the test can pass by accident, as we would not poll a Poll::Pending out of the channel. Basically, we need to ensure that the t tasks is polled before we do a send via channel. Bumped the dealay to 500ms: we can trivially dial it back if we find test-suite to be too long!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be more precise, the unfortunate timing here can't lead to false positive: if the code is correct, the test will always pass. However, timing can lead to a false-negative, where a buggy code would pass the test.

I've considered making this deterministic by writing a stupid Stream which answers with Pending the first time it is polled and with Ready(Some(92)) the second time. In the end, I've decided to go with channels + sleep to simulate a similar effect: writing a stream impl manually can easily introduce errors of it's own.

@matklad
Copy link
Member Author

matklad commented Nov 1, 2019

Hm, judging by the StreamExt::fuse method, we don't actually have this guarantee. so we should call .fuse ourselves int the Merge ctor

@yoshuawuyts
Copy link
Contributor

Hm, judging by the StreamExt::fuse method, we don't actually have this guarantee. so we should call .fuse ourselves int the Merge ctor

Took some time to think about this, and you're entirely right. Phew, you just caught a whole lot of bugs here. Thanks so much for this patch!

@matklad
Copy link
Member Author

matklad commented Nov 2, 2019

@yoshuawuyts i’ve even found one (debatable though) in std::iter::Chain: rust-lang/rust#66031 as a result of yesterday’s thinking about cancellation. Still, haven’t figured our cancellation itself :D

@matklad
Copy link
Member Author

matklad commented Nov 2, 2019

added fusing as well

Copy link
Contributor

@yoshuawuyts yoshuawuyts left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation looks good to me, thanks so much @matklad!

@stjepang it seems your concerns have been addressed too, can you verify this is the case?

Copy link

@ghost ghost left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, very happy with this :)

@ghost ghost merged commit fa91d7f into async-rs:master Nov 2, 2019
@matklad matklad deleted the fix-merge branch November 3, 2019 07:32
This pull request was closed.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants