diff --git a/src/future/futures_ext.rs b/src/future/futures_ext.rs new file mode 100644 index 0000000..75fc6ae --- /dev/null +++ b/src/future/futures_ext.rs @@ -0,0 +1,43 @@ +use crate::future::Join; +use crate::future::Race; +use futures_core::Future; +use std::future::IntoFuture; + +use super::join::tuple::Join2; +use super::race::tuple::Race2; + +/// An extension trait for the `Future` trait. +pub trait FutureExt: Future { + /// Wait for both futures to complete. + fn join(self, other: S2) -> Join2 + where + Self: Future + Sized, + S2: IntoFuture; + + /// Wait for the first future to complete. + fn race(self, other: S2) -> Race2 + where + Self: Future + Sized, + S2: IntoFuture; +} + +impl FutureExt for F1 +where + F1: Future, +{ + fn join(self, other: F2) -> Join2 + where + Self: Future + Sized, + F2: IntoFuture, + { + Join::join((self, other)) + } + + fn race(self, other: S2) -> Race2 + where + Self: Future + Sized, + S2: IntoFuture, + { + Race::race((self, other)) + } +} diff --git a/src/future/mod.rs b/src/future/mod.rs index f590be8..b41c17f 100644 --- a/src/future/mod.rs +++ b/src/future/mod.rs @@ -68,11 +68,13 @@ //! - `future::RaceOk`: wait for the first _successful_ future in the set to //! complete, or return an `Err` if *no* futures complete successfully. //! +pub use futures_ext::FutureExt; pub use join::Join; pub use race::Race; pub use race_ok::RaceOk; pub use try_join::TryJoin; +mod futures_ext; pub(crate) mod join; pub(crate) mod race; pub(crate) mod race_ok; diff --git a/src/lib.rs b/src/lib.rs index 89c1860..6f1d5ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,9 @@ mod utils; /// The futures concurrency prelude. pub mod prelude { + pub use super::future::FutureExt as _; + pub use super::stream::StreamExt as _; + pub use super::future::Join as _; pub use super::future::Race as _; pub use super::future::RaceOk as _; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 74140a6..ef6a374 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -50,9 +50,11 @@ pub use chain::Chain; pub use into_stream::IntoStream; pub use merge::Merge; +pub use stream_ext::StreamExt; pub use zip::Zip; pub(crate) mod chain; mod into_stream; pub(crate) mod merge; +mod stream_ext; pub(crate) mod zip; diff --git a/src/stream/stream_ext.rs b/src/stream/stream_ext.rs new file mode 100644 index 0000000..3321735 --- /dev/null +++ b/src/stream/stream_ext.rs @@ -0,0 +1,56 @@ +use crate::stream::{IntoStream, Merge}; +use futures_core::Stream; + +use super::{chain::tuple::Chain2, merge::tuple::Merge2, zip::tuple::Zip2, Chain, Zip}; + +/// An extension trait for the `Stream` trait. +pub trait StreamExt: Stream { + /// Combines two streams into a single stream of all their outputs. + fn merge(self, other: S2) -> Merge2 + where + Self: Stream + Sized, + S2: IntoStream; + + /// Takes two streams and creates a new stream over all in sequence + fn chain(self, other: S2) -> Chain2 + where + Self: Stream + Sized, + S2: IntoStream; + + /// ‘Zips up’ multiple streams into a single stream of pairs. + fn zip(self, other: S2) -> Zip2 + where + Self: Stream + Sized, + S2: IntoStream; +} + +impl StreamExt for S1 +where + S1: Stream, +{ + fn merge(self, other: S2) -> Merge2 + where + S1: Stream, + S2: IntoStream, + { + Merge::merge((self, other)) + } + + fn chain(self, other: S2) -> Chain2 + where + Self: Stream + Sized, + S2: IntoStream, + { + // TODO(yosh): fix the bounds on the tuple impl + Chain::chain((self, other.into_stream())) + } + + fn zip(self, other: S2) -> Zip2 + where + Self: Stream + Sized, + S2: IntoStream, + { + // TODO(yosh): fix the bounds on the tuple impl + Zip::zip((self, other.into_stream())) + } +}