Skip to content

Commit 2c27422

Browse files
committed
feat(tokio/macros): add biased mode to join! and try_join!
1 parent 1ae9434 commit 2c27422

File tree

4 files changed

+195
-10
lines changed

4 files changed

+195
-10
lines changed

tokio/src/macros/join.rs

Lines changed: 63 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,25 @@ macro_rules! doc {
3232
///
3333
/// [`tokio::spawn`]: crate::spawn
3434
///
35+
/// # Fairness
36+
///
37+
/// By default, `join!`'s generated future rotates which contained
38+
/// future is polled first whenever it is woken.
39+
///
40+
/// This behavior can be overridden by adding `biased;` to the beginning of the
41+
/// macro usage. See the examples for details. This will cause `join` to poll
42+
/// the futures in the order they appear from top to bottom.
43+
///
44+
/// You may want this if your futures may interact in a way where known polling order is significant.
45+
///
46+
/// But there is an important caveat to this mode. It becomes your responsibility
47+
/// to ensure that the polling order of your futures is fair. If for example you
48+
/// are joining a stream and a shutdown future, and the stream has a
49+
/// huge volume of messages that takes a long time to finish processing per poll, you should
50+
/// place the shutdown future earlier in the `join!` list to ensure that it is
51+
/// always polled, and will not be delayed due to the stream futuretaking a long time to return
52+
/// `Poll::Pending`.
53+
///
3554
/// # Examples
3655
///
3756
/// Basic join with two branches
@@ -54,6 +73,30 @@ macro_rules! doc {
5473
/// // do something with the values
5574
/// }
5675
/// ```
76+
///
77+
/// Using the `biased;` mode to control polling order.
78+
///
79+
/// ```
80+
/// async fn do_stuff_async() {
81+
/// // async work
82+
/// }
83+
///
84+
/// async fn more_async_work() {
85+
/// // more here
86+
/// }
87+
///
88+
/// #[tokio::main]
89+
/// async fn main() {
90+
/// let (first, second) = tokio::join!(
91+
/// // do_stuff_async() will always be polled first when woken
92+
/// biased;
93+
/// do_stuff_async(),
94+
/// more_async_work());
95+
///
96+
/// // do something with the values
97+
/// }
98+
/// ```
99+
57100
#[macro_export]
58101
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
59102
$join
@@ -68,6 +111,10 @@ doc! {macro_rules! join {
68111
#[cfg(not(doc))]
69112
doc! {macro_rules! join {
70113
(@ {
114+
// Whether to rotate which future is polled first every poll,
115+
// by incrementing a skip counter
116+
rotate_poll_order=$rotate_poll_order:literal;
117+
71118
// One `_` for each branch in the `join!` macro. This is not used once
72119
// normalization is complete.
73120
( $($count:tt)* )
@@ -96,13 +143,18 @@ doc! {macro_rules! join {
96143
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
97144
let mut futures = &mut futures;
98145

99-
// Each time the future created by poll_fn is polled, a different future will be polled first
146+
// Each time the future created by poll_fn is polled,
147+
// if not running in biased mode,
148+
// a different future will be polled first
100149
// to ensure every future passed to join! gets a chance to make progress even if
101150
// one of the futures consumes the whole budget.
102151
//
103152
// This is number of futures that will be skipped in the first loop
104153
// iteration the next time.
154+
//
155+
// If running in biased mode, this value will always be 0.
105156
let mut skip_next_time: u32 = 0;
157+
let rotate_skip = $rotate_poll_order;
106158

107159
poll_fn(move |cx| {
108160
const COUNT: u32 = $($total)*;
@@ -114,7 +166,10 @@ doc! {macro_rules! join {
114166
// The number of futures that will be skipped in the first loop iteration.
115167
let mut skip = skip_next_time;
116168

117-
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
169+
// only change skip index if we are not in biased mode, aka rotate_poll_order == true
170+
if rotate_skip {
171+
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
172+
}
118173

119174
// This loop runs twice and the first `skip` futures
120175
// are not polled in the first iteration.
@@ -164,14 +219,17 @@ doc! {macro_rules! join {
164219

165220
// ===== Normalize =====
166221

167-
(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
168-
$crate::join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
222+
(@ { rotate_poll_order=$rotate_poll_order:literal; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
223+
$crate::join!(@{ rotate_poll_order=$rotate_poll_order; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
169224
};
170225

171226
// ===== Entry point =====
227+
( biased; $($e:expr),+ $(,)?) => {
228+
$crate::join!(@{ rotate_poll_order=false; () (0) } $($e,)*)
229+
};
172230

173231
( $($e:expr),+ $(,)?) => {
174-
$crate::join!(@{ () (0) } $($e,)*)
232+
$crate::join!(@{ rotate_poll_order=true; () (0) } $($e,)*)
175233
};
176234

177235
() => { async {}.await }

tokio/src/macros/try_join.rs

Lines changed: 70 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,25 @@ macro_rules! doc {
3030
///
3131
/// [`tokio::spawn`]: crate::spawn
3232
///
33+
/// # Fairness
34+
///
35+
/// By default, `try_join!`'s generated future rotates which
36+
/// contained future is polled first whenever it is woken.
37+
///
38+
/// This behavior can be overridden by adding `biased;` to the beginning of the
39+
/// macro usage. See the examples for details. This will cause `try_join` to poll
40+
/// the futures in the order they appear from top to bottom.
41+
///
42+
/// You may want this if your futures may interact in a way where known polling order is significant.
43+
///
44+
/// But there is an important caveat to this mode. It becomes your responsibility
45+
/// to ensure that the polling order of your futures is fair. If for example you
46+
/// are joining a stream and a shutdown future, and the stream has a
47+
/// huge volume of messages that takes a long time to finish processing per poll, you should
48+
/// place the shutdown future earlier in the `try_join!` list to ensure that it is
49+
/// always polled, and will not be delayed due to the stream futuretaking a long time to return
50+
/// `Poll::Pending`.
51+
///
3352
/// # Examples
3453
///
3554
/// Basic `try_join` with two branches.
@@ -100,6 +119,37 @@ macro_rules! doc {
100119
/// }
101120
/// }
102121
/// ```
122+
/// Using the `biased;` mode to control polling order.
123+
///
124+
/// ```
125+
/// async fn do_stuff_async() -> Result<(), &'static str> {
126+
/// // async work
127+
/// # Ok(())
128+
/// }
129+
///
130+
/// async fn more_async_work() -> Result<(), &'static str> {
131+
/// // more here
132+
/// # Ok(())
133+
/// }
134+
///
135+
/// #[tokio::main]
136+
/// async fn main() {
137+
/// let res = tokio::try_join!(
138+
/// //do_stuff_async() will always be polled first when woken
139+
/// biased;
140+
/// do_stuff_async(),
141+
/// more_async_work());
142+
///
143+
/// match res {
144+
/// Ok((first, second)) => {
145+
/// // do something with the values
146+
/// }
147+
/// Err(err) => {
148+
/// println!("processing failed; error = {}", err);
149+
/// }
150+
/// }
151+
/// }
152+
/// ```
103153
#[macro_export]
104154
#[cfg_attr(docsrs, doc(cfg(feature = "macros")))]
105155
$try_join
@@ -114,6 +164,10 @@ doc! {macro_rules! try_join {
114164
#[cfg(not(doc))]
115165
doc! {macro_rules! try_join {
116166
(@ {
167+
// Whether to rotate which future is polled first every poll,
168+
// by incrementing a skip counter
169+
rotate_poll_order=$rotate_poll_order:literal;
170+
117171
// One `_` for each branch in the `try_join!` macro. This is not used once
118172
// normalization is complete.
119173
( $($count:tt)* )
@@ -142,13 +196,18 @@ doc! {macro_rules! try_join {
142196
// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
143197
let mut futures = &mut futures;
144198

145-
// Each time the future created by poll_fn is polled, a different future will be polled first
199+
// Each time the future created by poll_fn is polled,
200+
// if not running in biased mode,
201+
// a different future will be polled first
146202
// to ensure every future passed to join! gets a chance to make progress even if
147203
// one of the futures consumes the whole budget.
148204
//
149205
// This is number of futures that will be skipped in the first loop
150206
// iteration the next time.
207+
//
208+
// If running in biased mode, this value will always be 0.
151209
let mut skip_next_time: u32 = 0;
210+
let rotate_skip = $rotate_poll_order;
152211

153212
poll_fn(move |cx| {
154213
const COUNT: u32 = $($total)*;
@@ -160,7 +219,10 @@ doc! {macro_rules! try_join {
160219
// The number of futures that will be skipped in the first loop iteration
161220
let mut skip = skip_next_time;
162221

163-
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
222+
// only change skip index if we are not in biased mode, aka rotate_poll_order == true
223+
if rotate_skip {
224+
skip_next_time = if skip + 1 == COUNT { 0 } else { skip + 1 };
225+
}
164226

165227
// This loop runs twice and the first `skip` futures
166228
// are not polled in the first iteration.
@@ -216,14 +278,17 @@ doc! {macro_rules! try_join {
216278

217279
// ===== Normalize =====
218280

219-
(@ { ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
220-
$crate::try_join!(@{ ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
281+
(@ { rotate_poll_order=$rotate_poll_order:literal; ( $($s:tt)* ) ( $($n:tt)* ) $($t:tt)* } $e:expr, $($r:tt)* ) => {
282+
$crate::try_join!(@{ rotate_poll_order=$rotate_poll_order; ($($s)* _) ($($n)* + 1) $($t)* ($($s)*) $e, } $($r)*)
221283
};
222284

223285
// ===== Entry point =====
286+
( biased; $($e:expr),+ $(,)?) => {
287+
$crate::try_join!(@{ rotate_poll_order=false; () (0) } $($e,)*)
288+
};
224289

225290
( $($e:expr),+ $(,)?) => {
226-
$crate::try_join!(@{ () (0) } $($e,)*)
291+
$crate::try_join!(@{ rotate_poll_order=true; () (0) } $($e,)*)
227292
};
228293

229294
() => { async { Ok(()) }.await }

tokio/tests/macros_join.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,37 @@ async fn a_different_future_is_polled_first_every_time_poll_fn_is_polled() {
154154
);
155155
}
156156

157+
#[tokio::test]
158+
async fn futures_are_polled_in_order_in_biased_mode() {
159+
let poll_order = Arc::new(std::sync::Mutex::new(vec![]));
160+
161+
let fut = |x, poll_order: Arc<std::sync::Mutex<Vec<i32>>>| async move {
162+
for _ in 0..4 {
163+
{
164+
let mut guard = poll_order.lock().unwrap();
165+
166+
guard.push(x);
167+
}
168+
169+
tokio::task::yield_now().await;
170+
}
171+
};
172+
173+
tokio::join!(
174+
biased;
175+
fut(1, Arc::clone(&poll_order)),
176+
fut(2, Arc::clone(&poll_order)),
177+
fut(3, Arc::clone(&poll_order)),
178+
);
179+
180+
// Each time the future created by join! is polled, it should start
181+
// by polling a different future first.
182+
assert_eq!(
183+
vec![1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3],
184+
*poll_order.lock().unwrap()
185+
);
186+
}
187+
157188
#[tokio::test]
158189
#[allow(clippy::unit_cmp)]
159190
async fn empty_join() {

tokio/tests/macros_try_join.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,37 @@ async fn a_different_future_is_polled_first_every_time_poll_fn_is_polled() {
179179
);
180180
}
181181

182+
#[tokio::test]
183+
async fn futures_are_polled_in_order_in_biased_mode() {
184+
let poll_order = Arc::new(std::sync::Mutex::new(vec![]));
185+
186+
let fut = |x, poll_order: Arc<std::sync::Mutex<Vec<i32>>>| async move {
187+
for _ in 0..4 {
188+
{
189+
let mut guard = poll_order.lock().unwrap();
190+
191+
guard.push(x);
192+
}
193+
194+
tokio::task::yield_now().await;
195+
}
196+
};
197+
198+
tokio::join!(
199+
biased;
200+
fut(1, Arc::clone(&poll_order)),
201+
fut(2, Arc::clone(&poll_order)),
202+
fut(3, Arc::clone(&poll_order)),
203+
);
204+
205+
// Each time the future created by join! is polled, it should start
206+
// by polling a different future first.
207+
assert_eq!(
208+
vec![1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3],
209+
*poll_order.lock().unwrap()
210+
);
211+
}
212+
182213
#[tokio::test]
183214
async fn empty_try_join() {
184215
assert_eq!(tokio::try_join!() as Result<_, ()>, Ok(()));

0 commit comments

Comments
 (0)