Skip to content

Commit 3c3ab18

Browse files
committed
Stream::merge does not end prematurely if one stream is delayed
1 parent d5fd035 commit 3c3ab18

File tree

2 files changed

+55
-7
lines changed

2 files changed

+55
-7
lines changed

src/stream/stream/merge.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,14 @@ where
3838

3939
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4040
let this = self.project();
41-
if let Poll::Ready(Some(item)) = this.left.poll_next(cx) {
42-
// The first stream made progress. The Merge needs to be polled
43-
// again to check the progress of the second stream.
44-
cx.waker().wake_by_ref();
45-
Poll::Ready(Some(item))
46-
} else {
47-
this.right.poll_next(cx)
41+
match this.left.poll_next(cx) {
42+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
43+
Poll::Ready(None) => this.right.poll_next(cx),
44+
Poll::Pending => match this.right.poll_next(cx) {
45+
Poll::Ready(Some(item)) => Poll::Ready(Some(item)),
46+
Poll::Ready(None) => Poll::Pending,
47+
Poll::Pending => Poll::Pending,
48+
}
4849
}
4950
}
5051
}

tests/stream.rs

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
use async_std::prelude::*;
2+
use async_std::stream;
3+
use async_std::sync::channel;
4+
use async_std::task;
5+
6+
#[test]
7+
/// Checks that streams are merged fully even if one of the components
8+
/// experiences delay.
9+
fn merging_delayed_streams_work() {
10+
let (sender, receiver) = channel::<i32>(10);
11+
12+
let mut s = receiver.merge(stream::empty());
13+
let t = task::spawn(async move {
14+
let mut xs = Vec::new();
15+
while let Some(x) = s.next().await {
16+
xs.push(x);
17+
}
18+
xs
19+
});
20+
21+
task::block_on(async move {
22+
task::sleep(std::time::Duration::from_millis(500)).await;
23+
sender.send(92).await;
24+
drop(sender);
25+
let xs = t.await;
26+
assert_eq!(xs, vec![92])
27+
});
28+
29+
let (sender, receiver) = channel::<i32>(10);
30+
31+
let mut s = stream::empty().merge(receiver);
32+
let t = task::spawn(async move {
33+
let mut xs = Vec::new();
34+
while let Some(x) = s.next().await {
35+
xs.push(x);
36+
}
37+
xs
38+
});
39+
40+
task::block_on(async move {
41+
task::sleep(std::time::Duration::from_millis(500)).await;
42+
sender.send(92).await;
43+
drop(sender);
44+
let xs = t.await;
45+
assert_eq!(xs, vec![92])
46+
});
47+
}

0 commit comments

Comments
 (0)