-
Notifications
You must be signed in to change notification settings - Fork 341
Stream::merge does not end prematurely if one stream is delayed #437
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Question (unrelated to PR mostly): is it a contract of enum MergeState {
Both(L, R),
Left(L),
Right(R),
} |
086ac1e
to
98022e5
Compare
tests/stream.rs
Outdated
}); | ||
|
||
task::block_on(async move { | ||
task::sleep(std::time::Duration::from_millis(5)).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this sleep have any significance? Perhaps it'd be safer to pick a larger duration, maybe 500ms?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
without this sleep, the test can pass by accident, as we would not poll a Poll::Pending
out of the channel. Basically, we need to ensure that the t
tasks is polled before we do a send via channel. Bumped the dealay to 500ms: we can trivially dial it back if we find test-suite to be too long!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to be more precise, the unfortunate timing here can't lead to false positive: if the code is correct, the test will always pass. However, timing can lead to a false-negative, where a buggy code would pass the test.
I've considered making this deterministic by writing a stupid Stream
which answers with Pending
the first time it is polled and with Ready(Some(92))
the second time. In the end, I've decided to go with channels + sleep to simulate a similar effect: writing a stream impl manually can easily introduce errors of it's own.
Hm, judging by the |
Took some time to think about this, and you're entirely right. Phew, you just caught a whole lot of bugs here. Thanks so much for this patch! |
@yoshuawuyts i’ve even found one (debatable though) in std::iter::Chain: rust-lang/rust#66031 as a result of yesterday’s thinking about cancellation. Still, haven’t figured our cancellation itself :D |
added fusing as well |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, very happy with this :)
r? @yoshuawuyts
The bug was that if
left
returnedPoll::Pending
, the right could have returnrfPoll::Ready(None)
, and we would consider the stream as finished, although we haven't really polled the left part thoroughly enough.I also don't understand why we need
cx.waker().wake_by_ref();
here: we are returningPoll::Ready
, and callingwake_by_ref
is only required if you returnPending
?