Skip to content

Commit 00b449c

Browse files
committed
Add biased variants for newly fair select functions
1 parent b1152d1 commit 00b449c

File tree

7 files changed

+215
-14
lines changed

7 files changed

+215
-14
lines changed

futures-util/src/future/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ mod join_all;
8686
pub use self::join_all::{join_all, JoinAll};
8787

8888
mod select;
89-
pub use self::select::{select, Select};
89+
pub use self::select::{select, select_biased, Select};
9090

9191
#[cfg(feature = "alloc")]
9292
mod select_all;
@@ -102,12 +102,12 @@ mod try_join_all;
102102
pub use self::try_join_all::{try_join_all, TryJoinAll};
103103

104104
mod try_select;
105-
pub use self::try_select::{try_select, TrySelect};
105+
pub use self::try_select::{try_select, try_select_biased, TrySelect};
106106

107107
#[cfg(feature = "alloc")]
108108
mod select_ok;
109109
#[cfg(feature = "alloc")]
110-
pub use self::select_ok::{select_ok, SelectOk};
110+
pub use self::select_ok::{select_ok, select_ok_biased, SelectOk};
111111

112112
mod either;
113113
pub use self::either::Either;

futures-util/src/future/select.rs

+87-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use futures_core::task::{Context, Poll};
99
#[derive(Debug)]
1010
pub struct Select<A, B> {
1111
inner: Option<(A, B)>,
12+
_biased: bool,
1213
}
1314

1415
impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
@@ -23,7 +24,8 @@ impl<A: Unpin, B: Unpin> Unpin for Select<A, B> {}
2324
/// wrapped version of them.
2425
///
2526
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
26-
/// selected.
27+
/// selected, unless the std feature is not enabled. If std is enabled, the first
28+
/// argument will always win.
2729
///
2830
/// Also note that if both this and the second future have the same
2931
/// output type you can use the `Either::factor_first` method to
@@ -91,6 +93,88 @@ where
9193
{
9294
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
9395
inner: Some((future1, future2)),
96+
_biased: false,
97+
})
98+
}
99+
100+
/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one.
101+
///
102+
/// This function will return a new future which awaits for either one of both
103+
/// futures to complete. The returned future will finish with both the value
104+
/// resolved and a future representing the completion of the other work.
105+
///
106+
/// Note that this function consumes the receiving futures and returns a
107+
/// wrapped version of them.
108+
///
109+
/// If both futures are ready when this is polled, the winner will always be the first argument.
110+
///
111+
/// Also note that if both this and the second future have the same
112+
/// output type you can use the `Either::factor_first` method to
113+
/// conveniently extract out the value at the end.
114+
///
115+
/// # Examples
116+
///
117+
/// A simple example
118+
///
119+
/// ```
120+
/// # futures::executor::block_on(async {
121+
/// use futures::{
122+
/// pin_mut,
123+
/// future::Either,
124+
/// future::self,
125+
/// };
126+
///
127+
/// // These two futures have different types even though their outputs have the same type.
128+
/// let future1 = async {
129+
/// future::pending::<()>().await; // will never finish
130+
/// 1
131+
/// };
132+
/// let future2 = async {
133+
/// future::ready(2).await
134+
/// };
135+
///
136+
/// // 'select_biased' requires Future + Unpin bounds
137+
/// pin_mut!(future1);
138+
/// pin_mut!(future2);
139+
///
140+
/// let value = match future::select_biased(future1, future2).await {
141+
/// Either::Left((value1, _)) => value1, // `value1` is resolved from `future1`
142+
/// // `_` represents `future2`
143+
/// Either::Right((value2, _)) => value2, // `value2` is resolved from `future2`
144+
/// // `_` represents `future1`
145+
/// };
146+
///
147+
/// assert!(value == 2);
148+
/// # });
149+
/// ```
150+
///
151+
/// A more complex example
152+
///
153+
/// ```
154+
/// use futures::future::{self, Either, Future, FutureExt};
155+
///
156+
/// // A poor-man's join implemented on top of select
157+
///
158+
/// fn join<A, B>(a: A, b: B) -> impl Future<Output=(A::Output, B::Output)>
159+
/// where A: Future + Unpin,
160+
/// B: Future + Unpin,
161+
/// {
162+
/// future::select_biased(a, b).then(|either| {
163+
/// match either {
164+
/// Either::Left((x, b)) => b.map(move |y| (x, y)).left_future(),
165+
/// Either::Right((y, a)) => a.map(move |x| (x, y)).right_future(),
166+
/// }
167+
/// })
168+
/// }
169+
/// ```
170+
pub fn select_biased<A, B>(future1: A, future2: B) -> Select<A, B>
171+
where
172+
A: Future + Unpin,
173+
B: Future + Unpin,
174+
{
175+
assert_future::<Either<(A::Output, B), (B::Output, A)>, _>(Select {
176+
inner: Some((future1, future2)),
177+
_biased: true,
94178
})
95179
}
96180

@@ -111,6 +195,7 @@ where
111195
Some(value) => value,
112196
}
113197
}
198+
let _biased = self._biased;
114199

115200
let (a, b) = self.inner.as_mut().expect("cannot poll Select twice");
116201

@@ -123,7 +208,7 @@ where
123208
}
124209

125210
#[cfg(feature = "std")]
126-
if crate::gen_index(2) == 0 {
211+
if _biased || crate::gen_index(2) == 0 {
127212
poll_wrap!(a, unwrap_option(self.inner.take()).1, Either::Left);
128213
poll_wrap!(b, unwrap_option(self.inner.take()).0, Either::Right);
129214
} else {

futures-util/src/future/select_ok.rs

+36-4
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use futures_core::task::{Context, Poll};
1212
#[must_use = "futures do nothing unless you `.await` or poll them"]
1313
pub struct SelectOk<Fut> {
1414
inner: Vec<Fut>,
15+
_biased: bool,
1516
}
1617

1718
impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
@@ -35,7 +36,7 @@ impl<Fut: Unpin> Unpin for SelectOk<Fut> {}
3536
/// Some futures that would have been polled and had errors get dropped, may now instead
3637
/// remain in the collection without being polled.
3738
///
38-
/// If you were relying on this biased behavior, consider switching to the [`select_biased!`](crate::select_biased) macro.
39+
/// If you were relying on this biased behavior, consider switching to the [`select_ok_biased`] function.
3940
///
4041
/// # Panics
4142
///
@@ -45,7 +46,36 @@ where
4546
I: IntoIterator,
4647
I::Item: TryFuture + Unpin,
4748
{
48-
let ret = SelectOk { inner: iter.into_iter().collect() };
49+
let ret = SelectOk { inner: iter.into_iter().collect(), _biased: false };
50+
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
51+
assert_future::<
52+
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
53+
_,
54+
>(ret)
55+
}
56+
57+
/// Creates a new future which will select the first successful future over a list of futures.
58+
///
59+
/// The returned future will wait for any future within `iter` to be ready and Ok. Unlike
60+
/// `select_all`, this will only return the first successful completion, or the last
61+
/// failure. This is useful in contexts where any success is desired and failures
62+
/// are ignored, unless all the futures fail.
63+
///
64+
/// This function is only available when the `std` or `alloc` feature of this
65+
/// library is activated, and it is activated by default.
66+
///
67+
/// If multiple futures are ready at the same time this function is biased towards
68+
/// entries that are earlier in the list.
69+
///
70+
/// # Panics
71+
///
72+
/// This function will panic if the iterator specified contains no items.
73+
pub fn select_ok_biased<I>(iter: I) -> SelectOk<I::Item>
74+
where
75+
I: IntoIterator,
76+
I::Item: TryFuture + Unpin,
77+
{
78+
let ret = SelectOk { inner: iter.into_iter().collect(), _biased: true };
4979
assert!(!ret.inner.is_empty(), "iterator provided to select_ok was empty");
5080
assert_future::<
5181
Result<(<I::Item as TryFuture>::Ok, Vec<I::Item>), <I::Item as TryFuture>::Error>,
@@ -57,10 +87,12 @@ impl<Fut: TryFuture + Unpin> Future for SelectOk<Fut> {
5787
type Output = Result<(Fut::Ok, Vec<Fut>), Fut::Error>;
5888

5989
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
60-
let Self { inner } = &mut *self;
90+
let Self { inner, _biased } = &mut *self;
6191
#[cfg(feature = "std")]
6292
{
63-
crate::shuffle(inner);
93+
if !*_biased {
94+
crate::shuffle(inner);
95+
}
6496
}
6597
// loop until we've either exhausted all errors, a success was hit, or nothing is ready
6698
loop {

futures-util/src/future/try_select.rs

+53-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use futures_core::task::{Context, Poll};
88
#[derive(Debug)]
99
pub struct TrySelect<A, B> {
1010
inner: Option<(A, B)>,
11+
_biased: bool,
1112
}
1213

1314
impl<A: Unpin, B: Unpin> Unpin for TrySelect<A, B> {}
@@ -25,7 +26,8 @@ type EitherErr<A, B> = Either<(<A as TryFuture>::Error, B), (<B as TryFuture>::E
2526
/// wrapped version of them.
2627
///
2728
/// If both futures are ready when this is polled, the winner will be pseudo-randomly
28-
/// selected.
29+
/// selected, unless the `std` feature is disabled. If the std feature is disabled,
30+
/// the first argument will always win.
2931
///
3032
/// Also note that if both this and the second future have the same
3133
/// success/error type you can use the `Either::factor_first` method to
@@ -60,6 +62,55 @@ where
6062
{
6163
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
6264
inner: Some((future1, future2)),
65+
_biased: false,
66+
})
67+
}
68+
69+
/// Waits for either one of two differently-typed futures to complete, giving preferential treatment to the first one.
70+
///
71+
/// This function will return a new future which awaits for either one of both
72+
/// futures to complete. The returned future will finish with both the value
73+
/// resolved and a future representing the completion of the other work.
74+
///
75+
/// Note that this function consumes the receiving futures and returns a
76+
/// wrapped version of them.
77+
///
78+
/// If both futures are ready when this is polled, the winner will always be the first one.
79+
///
80+
/// Also note that if both this and the second future have the same
81+
/// success/error type you can use the `Either::factor_first` method to
82+
/// conveniently extract out the value at the end.
83+
///
84+
/// # Examples
85+
///
86+
/// ```
87+
/// use futures::future::{self, Either, Future, FutureExt, TryFuture, TryFutureExt};
88+
///
89+
/// // A poor-man's try_join implemented on top of select
90+
///
91+
/// fn try_join<A, B, E>(a: A, b: B) -> impl TryFuture<Ok=(A::Ok, B::Ok), Error=E>
92+
/// where A: TryFuture<Error = E> + Unpin + 'static,
93+
/// B: TryFuture<Error = E> + Unpin + 'static,
94+
/// E: 'static,
95+
/// {
96+
/// future::try_select_biased(a, b).then(|res| -> Box<dyn Future<Output = Result<_, _>> + Unpin> {
97+
/// match res {
98+
/// Ok(Either::Left((x, b))) => Box::new(b.map_ok(move |y| (x, y))),
99+
/// Ok(Either::Right((y, a))) => Box::new(a.map_ok(move |x| (x, y))),
100+
/// Err(Either::Left((e, _))) => Box::new(future::err(e)),
101+
/// Err(Either::Right((e, _))) => Box::new(future::err(e)),
102+
/// }
103+
/// })
104+
/// }
105+
/// ```
106+
pub fn try_select_biased<A, B>(future1: A, future2: B) -> TrySelect<A, B>
107+
where
108+
A: TryFuture + Unpin,
109+
B: TryFuture + Unpin,
110+
{
111+
super::assert_future::<Result<EitherOk<A, B>, EitherErr<A, B>>, _>(TrySelect {
112+
inner: Some((future1, future2)),
113+
_biased: true,
63114
})
64115
}
65116

@@ -91,7 +142,7 @@ where
91142

92143
#[cfg(feature = "std")]
93144
{
94-
if crate::gen_index(2) == 0 {
145+
if self._biased || crate::gen_index(2) == 0 {
95146
poll_wrap!(a, b, Either::Left, Either::Right)
96147
} else {
97148
poll_wrap!(b, a, Either::Right, Either::Left)

futures/tests/future_select.rs

+11-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use std::future::ready;
22

3-
use futures::future::select;
3+
use futures::future::{select, select_biased};
44
use futures_executor::block_on;
55

66
#[test]
@@ -14,3 +14,13 @@ fn is_fair() {
1414
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
1515
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
1616
}
17+
18+
#[test]
19+
fn is_biased() {
20+
let mut results = Vec::with_capacity(100);
21+
for _ in 0..100 {
22+
let (i, _) = block_on(select_biased(ready(0), ready(1))).factor_first();
23+
results.push(i);
24+
}
25+
assert!(results.iter().all(|i| *i == 0));
26+
}

futures/tests/future_select_ok.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::fmt::Debug;
22
use std::time::Duration;
33

44
use futures::executor::block_on;
5-
use futures::future::{err, ok, select_ok, Future};
5+
use futures::future::{err, ok, select_ok, select_ok_biased, Future};
66
use futures_channel::oneshot;
77
use std::thread;
88

@@ -65,3 +65,15 @@ fn is_fair() {
6565
assert_eq!(results.iter().filter(|i| **i == 3).take(THRESHOLD).count(), THRESHOLD);
6666
assert_eq!(results.iter().filter(|i| **i == 4).take(THRESHOLD).count(), THRESHOLD);
6767
}
68+
69+
#[test]
70+
fn is_biased() {
71+
let mut results = Vec::with_capacity(100);
72+
for _ in 0..100 {
73+
let v = vec![err(1), err(2), ok(3), ok(4)];
74+
75+
let (i, _v) = block_on(select_ok_biased(v)).ok().unwrap();
76+
results.push(i);
77+
}
78+
assert!(results.iter().all(|i| *i == 3));
79+
}

futures/tests/future_try_select.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures::future::{ok, try_select};
1+
use futures::future::{ok, try_select, try_select_biased};
22
use futures_executor::block_on;
33

44
#[test]
@@ -12,3 +12,14 @@ fn is_fair() {
1212
assert_eq!(results.iter().filter(|i| **i == 0).take(THRESHOLD).count(), THRESHOLD);
1313
assert_eq!(results.iter().filter(|i| **i == 1).take(THRESHOLD).count(), THRESHOLD)
1414
}
15+
16+
#[test]
17+
fn is_biased() {
18+
let mut results = Vec::with_capacity(100);
19+
for _ in 0..100 {
20+
let (i, _) =
21+
block_on(try_select_biased(ok::<_, ()>(0), ok::<_, ()>(1))).unwrap().factor_first();
22+
results.push(i);
23+
}
24+
assert!(results.iter().all(|i| *i == 0));
25+
}

0 commit comments

Comments
 (0)