Skip to content

Commit 0805eac

Browse files
taiki-ecramertj
authored andcommitted
Add TryStreamExt::try_flatten (#1731)
* Add TryStreamExt::try_flatten * Minor clean up StreamExt::flatten
1 parent a32ea8e commit 0805eac

File tree

4 files changed

+209
-23
lines changed

4 files changed

+209
-23
lines changed

futures-util/src/stream/flatten.rs

+32-22
Original file line numberDiff line numberDiff line change
@@ -9,26 +9,35 @@ use pin_utils::unsafe_pinned;
99
#[derive(Debug)]
1010
#[must_use = "streams do nothing unless polled"]
1111
pub struct Flatten<St>
12-
where St: Stream,
12+
where
13+
St: Stream,
1314
{
1415
stream: St,
1516
next: Option<St::Item>,
1617
}
1718

18-
impl<St: Stream> Unpin for Flatten<St>
19-
where St: Stream + Unpin,
20-
St::Item: Stream + Unpin,
21-
{}
19+
impl<St> Unpin for Flatten<St>
20+
where
21+
St: Stream + Unpin,
22+
St::Item: Unpin,
23+
{
24+
}
2225

23-
impl<St: Stream> Flatten<St>
24-
where St: Stream,
25-
St::Item: Stream,
26+
impl<St> Flatten<St>
27+
where
28+
St: Stream,
2629
{
2730
unsafe_pinned!(stream: St);
2831
unsafe_pinned!(next: Option<St::Item>);
32+
}
2933

30-
pub(super) fn new(stream: St) -> Flatten<St>{
31-
Flatten { stream, next: None, }
34+
impl<St> Flatten<St>
35+
where
36+
St: Stream,
37+
St::Item: Stream,
38+
{
39+
pub(super) fn new(stream: St) -> Self {
40+
Self { stream, next: None }
3241
}
3342

3443
/// Acquires a reference to the underlying stream that this combinator is
@@ -64,32 +73,33 @@ where St: Stream,
6473
}
6574
}
6675

67-
impl<St: Stream + FusedStream> FusedStream for Flatten<St> {
76+
impl<St> FusedStream for Flatten<St>
77+
where
78+
St: Stream + FusedStream,
79+
{
6880
fn is_terminated(&self) -> bool {
6981
self.next.is_none() && self.stream.is_terminated()
7082
}
7183
}
7284

7385
impl<St> Stream for Flatten<St>
74-
where St: Stream,
75-
St::Item: Stream,
86+
where
87+
St: Stream,
88+
St::Item: Stream,
7689
{
7790
type Item = <St::Item as Stream>::Item;
7891

79-
fn poll_next(
80-
mut self: Pin<&mut Self>,
81-
cx: &mut Context<'_>,
82-
) -> Poll<Option<Self::Item>> {
92+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
8393
loop {
8494
if self.next.is_none() {
8595
match ready!(self.as_mut().stream().poll_next(cx)) {
8696
Some(e) => self.as_mut().next().set(Some(e)),
8797
None => return Poll::Ready(None),
8898
}
8999
}
90-
let item = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx));
91-
if item.is_some() {
92-
return Poll::Ready(item);
100+
101+
if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) {
102+
return Poll::Ready(Some(item));
93103
} else {
94104
self.as_mut().next().set(None);
95105
}
@@ -100,8 +110,8 @@ impl<St> Stream for Flatten<St>
100110
// Forwarding impl of Sink from the underlying stream
101111
#[cfg(feature = "sink")]
102112
impl<S, Item> Sink<Item> for Flatten<S>
103-
where S: Stream + Sink<Item>,
104-
S::Item: Stream,
113+
where
114+
S: Stream + Sink<Item>,
105115
{
106116
type Error = S::Error;
107117

futures-util/src/try_stream/mod.rs

+49
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@ pub use self::try_filter::TryFilter;
4747
mod try_filter_map;
4848
pub use self::try_filter_map::TryFilterMap;
4949

50+
mod try_flatten;
51+
pub use self::try_flatten::TryFlatten;
52+
5053
mod try_collect;
5154
pub use self::try_collect::TryCollect;
5255

@@ -556,6 +559,52 @@ pub trait TryStreamExt: TryStream {
556559
TryFilterMap::new(self, f)
557560
}
558561

562+
/// Flattens a stream of streams into just one continuous stream.
563+
///
564+
/// If this stream's elements are themselves streams then this combinator
565+
/// will flatten out the entire stream to one long chain of elements. Any
566+
/// errors are passed through without looking at them, but otherwise each
567+
/// individual stream will get exhausted before moving on to the next.
568+
///
569+
/// # Examples
570+
///
571+
/// ```
572+
/// #![feature(async_await)]
573+
/// # futures::executor::block_on(async {
574+
/// use futures::channel::mpsc;
575+
/// use futures::stream::{StreamExt, TryStreamExt};
576+
/// use std::thread;
577+
///
578+
/// let (tx1, rx1) = mpsc::unbounded();
579+
/// let (tx2, rx2) = mpsc::unbounded();
580+
/// let (tx3, rx3) = mpsc::unbounded();
581+
///
582+
/// thread::spawn(move || {
583+
/// tx1.unbounded_send(Ok(1)).unwrap();
584+
/// });
585+
/// thread::spawn(move || {
586+
/// tx2.unbounded_send(Ok(2)).unwrap();
587+
/// tx2.unbounded_send(Err(3)).unwrap();
588+
/// });
589+
/// thread::spawn(move || {
590+
/// tx3.unbounded_send(Ok(rx1)).unwrap();
591+
/// tx3.unbounded_send(Ok(rx2)).unwrap();
592+
/// tx3.unbounded_send(Err(4)).unwrap();
593+
/// });
594+
///
595+
/// let mut stream = rx3.try_flatten();
596+
/// assert_eq!(stream.next().await, Some(Ok(1)));
597+
/// assert_eq!(stream.next().await, Some(Ok(2)));
598+
/// assert_eq!(stream.next().await, Some(Err(3)));
599+
/// # });
600+
/// ```
601+
fn try_flatten(self) -> TryFlatten<Self>
602+
where Self::Ok: TryStream,
603+
<Self::Ok as TryStream>::Error: From<Self::Error>,
604+
Self: Sized,
605+
{
606+
TryFlatten::new(self)
607+
}
559608

560609
/// Attempt to execute an accumulating asynchronous computation over a
561610
/// stream, collecting all the values into one final result.
+127
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
use core::pin::Pin;
2+
use futures_core::stream::{FusedStream, Stream, TryStream};
3+
use futures_core::task::{Context, Poll};
4+
#[cfg(feature = "sink")]
5+
use futures_sink::Sink;
6+
use pin_utils::unsafe_pinned;
7+
8+
/// Stream for the [`try_flatten`](super::TryStreamExt::try_flatten) method.
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct TryFlatten<St>
12+
where
13+
St: TryStream,
14+
{
15+
stream: St,
16+
next: Option<St::Ok>,
17+
}
18+
19+
impl<St> Unpin for TryFlatten<St>
20+
where
21+
St: TryStream + Unpin,
22+
St::Ok: Unpin,
23+
{
24+
}
25+
26+
impl<St> TryFlatten<St>
27+
where
28+
St: TryStream,
29+
{
30+
unsafe_pinned!(stream: St);
31+
unsafe_pinned!(next: Option<St::Ok>);
32+
}
33+
34+
impl<St> TryFlatten<St>
35+
where
36+
St: TryStream,
37+
St::Ok: TryStream,
38+
<St::Ok as TryStream>::Error: From<St::Error>,
39+
{
40+
pub(super) fn new(stream: St) -> Self {
41+
Self { stream, next: None }
42+
}
43+
44+
/// Acquires a reference to the underlying stream that this combinator is
45+
/// pulling from.
46+
pub fn get_ref(&self) -> &St {
47+
&self.stream
48+
}
49+
50+
/// Acquires a mutable reference to the underlying stream that this
51+
/// combinator is pulling from.
52+
///
53+
/// Note that care must be taken to avoid tampering with the state of the
54+
/// stream which may otherwise confuse this combinator.
55+
pub fn get_mut(&mut self) -> &mut St {
56+
&mut self.stream
57+
}
58+
59+
/// Acquires a pinned mutable reference to the underlying stream that this
60+
/// combinator is pulling from.
61+
///
62+
/// Note that care must be taken to avoid tampering with the state of the
63+
/// stream which may otherwise confuse this combinator.
64+
pub fn get_pin_mut<'a>(self: Pin<&'a mut Self>) -> Pin<&'a mut St> {
65+
self.stream()
66+
}
67+
68+
/// Consumes this combinator, returning the underlying stream.
69+
///
70+
/// Note that this may discard intermediate state of this combinator, so
71+
/// care should be taken to avoid losing resources when this is called.
72+
pub fn into_inner(self) -> St {
73+
self.stream
74+
}
75+
}
76+
77+
impl<St> FusedStream for TryFlatten<St>
78+
where
79+
St: TryStream + FusedStream,
80+
{
81+
fn is_terminated(&self) -> bool {
82+
self.next.is_none() && self.stream.is_terminated()
83+
}
84+
}
85+
86+
impl<St> Stream for TryFlatten<St>
87+
where
88+
St: TryStream,
89+
St::Ok: TryStream,
90+
<St::Ok as TryStream>::Error: From<St::Error>,
91+
{
92+
type Item = Result<<St::Ok as TryStream>::Ok, <St::Ok as TryStream>::Error>;
93+
94+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
95+
loop {
96+
if self.next.is_none() {
97+
match ready!(self.as_mut().stream().try_poll_next(cx)?) {
98+
Some(e) => self.as_mut().next().set(Some(e)),
99+
None => return Poll::Ready(None),
100+
}
101+
}
102+
103+
if let Some(item) = ready!(self
104+
.as_mut()
105+
.next()
106+
.as_pin_mut()
107+
.unwrap()
108+
.try_poll_next(cx)?)
109+
{
110+
return Poll::Ready(Some(Ok(item)));
111+
} else {
112+
self.as_mut().next().set(None);
113+
}
114+
}
115+
}
116+
}
117+
118+
// Forwarding impl of Sink from the underlying stream
119+
#[cfg(feature = "sink")]
120+
impl<S, Item> Sink<Item> for TryFlatten<S>
121+
where
122+
S: TryStream + Sink<Item>,
123+
{
124+
type Error = <S as Sink<Item>>::Error;
125+
126+
delegate_sink!(stream, Item);
127+
}

futures/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ pub mod stream {
449449
TryStreamExt,
450450
AndThen, ErrInto, MapOk, MapErr, OrElse,
451451
InspectOk, InspectErr,
452-
TryNext, TryForEach, TryFilterMap,
452+
TryNext, TryForEach, TryFilterMap, TryFlatten,
453453
TryCollect, TryFold, TrySkipWhile,
454454
IntoStream,
455455
};

0 commit comments

Comments
 (0)