Skip to content

Commit 06d19b1

Browse files
committed
Stream::merge does not end prematurely if one stream is delayed
1 parent d5fd035 commit 06d19b1

File tree

2 files changed

+59
-7
lines changed

2 files changed

+59
-7
lines changed

src/stream/stream/merge.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ where
3838

3939
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4040
let this = self.project();
41-
if let Poll::Ready(Some(item)) = this.left.poll_next(cx) {
42-
// The first stream made progress. The Merge needs to be polled
43-
// again to check the progress of the second stream.
44-
cx.waker().wake_by_ref();
45-
Poll::Ready(Some(item))
46-
} else {
47-
this.right.poll_next(cx)
41+
match this.left.poll_next(cx) {
42+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
43+
Poll::Ready(None) => this.right.poll_next(cx),
44+
Poll::Pending => match this.right.poll_next(cx) {
45+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
46+
Poll::Ready(None) => Poll::Pending,
47+
Poll::Pending => Poll::Pending,
48+
}
4849
}
4950
}
5051
}

tests/stream.rs

+51
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use async_std::prelude::*;
2+
use async_std::stream;
3+
use async_std::sync::channel;
4+
use async_std::task;
5+
6+
#[test]
7+
/// Checks that streams are merged fully even if one of the components
8+
/// experiences delay.
9+
fn merging_delayed_streams_work() {
10+
let (sender, receiver) = channel::<i32>(10);
11+
12+
// Delay the left stream.
13+
let s1 = receiver.chain(stream::once(92));
14+
let s2 = stream::empty();
15+
let mut s = s1.merge(s2);
16+
let t1 = task::spawn(async move {
17+
let mut xs = Vec::new();
18+
while let Some(x) = s.next().await {
19+
xs.push(x);
20+
}
21+
xs
22+
});
23+
24+
task::block_on(async move {
25+
task::sleep(std::time::Duration::from_millis(5)).await;
26+
drop(sender);
27+
let xs = t1.await;
28+
assert_eq!(xs, vec![92])
29+
});
30+
31+
let (sender, receiver) = channel::<i32>(10);
32+
33+
let s1 = stream::empty();
34+
// Delay the right stream.
35+
let s2 = receiver.chain(stream::once(92));
36+
let mut s = s1.merge(s2);
37+
let t1 = task::spawn(async move {
38+
let mut xs = Vec::new();
39+
while let Some(x) = s.next().await {
40+
xs.push(x);
41+
}
42+
xs
43+
});
44+
45+
task::block_on(async move {
46+
task::sleep(std::time::Duration::from_millis(5)).await;
47+
drop(sender);
48+
let xs = t1.await;
49+
assert_eq!(xs, vec![92])
50+
});
51+
}

0 commit comments

Comments
 (0)