Skip to content

Commit 49189ff

Browse files
authored
Merge pull request #73 from yoshuawuyts/zip-and-chain-traits
Init zip and chain traits
2 parents c13f81e + 9037628 commit 49189ff

File tree

12 files changed

+644
-6
lines changed

12 files changed

+644
-6
lines changed

src/lib.rs

+6
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,10 @@ pub mod prelude {
5757
pub use super::future::Race as _;
5858
pub use super::future::RaceOk as _;
5959
pub use super::future::TryJoin as _;
60+
pub use super::stream::Chain as _;
6061
pub use super::stream::IntoStream as _;
6162
pub use super::stream::Merge as _;
63+
pub use super::stream::Zip as _;
6264
}
6365

6466
pub mod future;
@@ -70,13 +72,17 @@ pub mod array {
7072
pub use crate::future::race::array::Race;
7173
pub use crate::future::race_ok::array::{AggregateError, RaceOk};
7274
pub use crate::future::try_join::array::TryJoin;
75+
pub use crate::stream::chain::array::Chain;
7376
pub use crate::stream::merge::array::Merge;
77+
pub use crate::stream::zip::array::Zip;
7478
}
7579
/// A contiguous growable array type with heap-allocated contents, written `Vec<T>`.
7680
pub mod vec {
7781
pub use crate::future::join::vec::Join;
7882
pub use crate::future::race::vec::Race;
7983
pub use crate::future::race_ok::vec::{AggregateError, RaceOk};
8084
pub use crate::future::try_join::vec::TryJoin;
85+
pub use crate::stream::chain::vec::Chain;
8186
pub use crate::stream::merge::vec::Merge;
87+
pub use crate::stream::zip::vec::Zip;
8288
}

src/stream/chain/array.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use core::task::{Context, Poll};
4+
5+
use futures_core::Stream;
6+
use pin_project::pin_project;
7+
8+
use crate::utils;
9+
10+
use super::Chain as ChainTrait;
11+
12+
/// A stream that chains multiple streams one after another.
13+
///
14+
/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its
15+
/// documentation for more.
16+
///
17+
/// [`chain`]: trait.Chain.html#method.merge
18+
/// [`Chain`]: trait.Chain.html
19+
#[pin_project]
20+
pub struct Chain<S, const N: usize> {
21+
#[pin]
22+
streams: [S; N],
23+
index: usize,
24+
len: usize,
25+
done: bool,
26+
}
27+
28+
impl<S: Stream, const N: usize> Stream for Chain<S, N> {
29+
type Item = S::Item;
30+
31+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32+
let mut this = self.project();
33+
34+
assert!(!*this.done, "Stream should not be polled after completion");
35+
36+
loop {
37+
if this.index == this.len {
38+
*this.done = true;
39+
return Poll::Ready(None);
40+
}
41+
let stream = utils::iter_pin_mut(this.streams.as_mut())
42+
.nth(*this.index)
43+
.unwrap();
44+
match stream.poll_next(cx) {
45+
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
46+
Poll::Ready(None) => {
47+
*this.index += 1;
48+
continue;
49+
}
50+
Poll::Pending => return Poll::Pending,
51+
}
52+
}
53+
}
54+
}
55+
56+
impl<S, const N: usize> fmt::Debug for Chain<S, N>
57+
where
58+
S: Stream + fmt::Debug,
59+
{
60+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61+
f.debug_list().entries(self.streams.iter()).finish()
62+
}
63+
}
64+
65+
impl<S: Stream, const N: usize> ChainTrait for [S; N] {
66+
type Item = S::Item;
67+
68+
type Stream = Chain<S, N>;
69+
70+
fn chain(self) -> Self::Stream {
71+
Chain {
72+
len: self.len(),
73+
streams: self,
74+
index: 0,
75+
done: false,
76+
}
77+
}
78+
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use super::*;
83+
use futures_lite::future::block_on;
84+
use futures_lite::prelude::*;
85+
use futures_lite::stream;
86+
87+
#[test]
88+
fn chain_3() {
89+
block_on(async {
90+
let a = stream::once(1);
91+
let b = stream::once(2);
92+
let c = stream::once(3);
93+
let mut s = [a, b, c].chain();
94+
95+
assert_eq!(s.next().await, Some(1));
96+
assert_eq!(s.next().await, Some(2));
97+
assert_eq!(s.next().await, Some(3));
98+
assert_eq!(s.next().await, None);
99+
})
100+
}
101+
}

src/stream/chain/mod.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use futures_core::Stream;
2+
3+
pub(crate) mod array;
4+
pub(crate) mod tuple;
5+
pub(crate) mod vec;
6+
7+
/// Takes multiple streams and creates a new stream over all in sequence.
8+
pub trait Chain {
9+
/// What's the return type of our stream?
10+
type Item;
11+
12+
/// What stream do we return?
13+
type Stream: Stream<Item = Self::Item>;
14+
15+
/// Combine multiple streams into a single stream.
16+
fn chain(self) -> Self::Stream;
17+
}

src/stream/chain/tuple.rs

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+

src/stream/chain/vec.rs

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use core::fmt;
2+
use core::pin::Pin;
3+
use core::task::{Context, Poll};
4+
5+
use futures_core::Stream;
6+
use pin_project::pin_project;
7+
8+
use crate::utils;
9+
10+
use super::Chain as ChainTrait;
11+
12+
/// A stream that chains multiple streams one after another.
13+
///
14+
/// This `struct` is created by the [`chain`] method on the [`Chain`] trait. See its
15+
/// documentation for more.
16+
///
17+
/// [`chain`]: trait.Chain.html#method.merge
18+
/// [`Chain`]: trait.Chain.html
19+
#[pin_project]
20+
pub struct Chain<S> {
21+
#[pin]
22+
streams: Vec<S>,
23+
index: usize,
24+
len: usize,
25+
done: bool,
26+
}
27+
28+
impl<S: Stream> Stream for Chain<S> {
29+
type Item = S::Item;
30+
31+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32+
let mut this = self.project();
33+
34+
assert!(!*this.done, "Stream should not be polled after completion");
35+
36+
loop {
37+
if this.index == this.len {
38+
*this.done = true;
39+
return Poll::Ready(None);
40+
}
41+
let stream = utils::iter_pin_mut_vec(this.streams.as_mut())
42+
.nth(*this.index)
43+
.unwrap();
44+
match stream.poll_next(cx) {
45+
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
46+
Poll::Ready(None) => {
47+
*this.index += 1;
48+
continue;
49+
}
50+
Poll::Pending => return Poll::Pending,
51+
}
52+
}
53+
}
54+
}
55+
56+
impl<S> fmt::Debug for Chain<S>
57+
where
58+
S: Stream + fmt::Debug,
59+
{
60+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
61+
f.debug_list().entries(self.streams.iter()).finish()
62+
}
63+
}
64+
65+
impl<S: Stream> ChainTrait for Vec<S> {
66+
type Item = S::Item;
67+
68+
type Stream = Chain<S>;
69+
70+
fn chain(self) -> Self::Stream {
71+
Chain {
72+
len: self.len(),
73+
streams: self,
74+
index: 0,
75+
done: false,
76+
}
77+
}
78+
}
79+
80+
#[cfg(test)]
81+
mod tests {
82+
use super::*;
83+
use futures_lite::future::block_on;
84+
use futures_lite::prelude::*;
85+
use futures_lite::stream;
86+
87+
#[test]
88+
fn chain_3() {
89+
block_on(async {
90+
let a = stream::once(1);
91+
let b = stream::once(2);
92+
let c = stream::once(3);
93+
let mut s = vec![a, b, c].chain();
94+
95+
assert_eq!(s.next().await, Some(1));
96+
assert_eq!(s.next().await, Some(2));
97+
assert_eq!(s.next().await, Some(3));
98+
assert_eq!(s.next().await, None);
99+
})
100+
}
101+
}

src/stream/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,12 @@
4747
//!
4848
//! See the [future concurrency][crate::future#concurrency] documentation for
4949
//! more on futures concurrency.
50+
pub use chain::Chain;
5051
pub use into_stream::IntoStream;
5152
pub use merge::Merge;
53+
pub use zip::Zip;
5254

55+
pub(crate) mod chain;
5356
mod into_stream;
5457
pub(crate) mod merge;
58+
pub(crate) mod zip;

0 commit comments

Comments
 (0)