Skip to content

Commit 94b37de

Browse files
committed
util: add task builder API
1 parent c853991 commit 94b37de

File tree

3 files changed

+278
-0
lines changed

3 files changed

+278
-0
lines changed

tokio-util/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ time = ["tokio/time","slab"]
3030
io = []
3131
io-util = ["io", "tokio/rt", "tokio/io-util"]
3232
rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"]
33+
tracing = ["dep:tracing", "tokio/tracing"]
3334

3435
__docs_rs = ["futures-util"]
3536

tokio-util/src/task/join_map.rs

Lines changed: 270 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ pub struct JoinMap<K, V, S = RandomState> {
125125
tasks: JoinSet<V>,
126126
}
127127

128+
/// A variant of [`task::Builder`] that spawns tasks on a [`JoinMap`] rather than on the current
129+
/// default runtime.
130+
///
131+
/// [`task::Builder`]: tokio::task::Builder
132+
#[cfg(feature = "tracing")]
133+
#[cfg_attr(
134+
docsrs,
135+
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
136+
)]
137+
pub struct Builder<'a, K, V, S> {
138+
joinmap: &'a mut JoinMap<K, V, S>,
139+
name: Option<&'a str>,
140+
}
141+
128142
/// A [`JoinMap`] key.
129143
///
130144
/// This holds both a `K`-typed key (the actual key as seen by the user), _and_
@@ -273,6 +287,38 @@ where
273287
V: 'static,
274288
S: BuildHasher,
275289
{
290+
/// Returns a [`Builder`] that can be used to configure a task prior to spawning it on this
291+
/// [`JoinMap`].
292+
///
293+
/// # Examples
294+
///
295+
/// ```
296+
/// use tokio_util::task::JoinMap;
297+
///
298+
/// #[tokio::main]
299+
/// async fn main() -> std::io::Result<()> {
300+
/// let mut map = JoinMap::new();
301+
///
302+
/// // Use the builder to configure the task's name before spawning it.
303+
/// map.build_task()
304+
/// .name("my_task")
305+
/// .spawn(42, async { /* ... */ });
306+
///
307+
/// Ok(())
308+
/// }
309+
/// ```
310+
#[cfg(feature = "tracing")]
311+
#[cfg_attr(
312+
docsrs,
313+
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
314+
)]
315+
pub fn build_task(&mut self) -> Builder<'_, K, V, S> {
316+
Builder {
317+
joinmap: self,
318+
name: None,
319+
}
320+
}
321+
276322
/// Spawn the provided task and store it in this `JoinMap` with the provided
277323
/// key.
278324
///
@@ -853,6 +899,230 @@ impl<K, V> Default for JoinMap<K, V> {
853899
}
854900
}
855901

902+
// === impl Builder ===
903+
904+
#[cfg(feature = "tracing")]
905+
#[cfg_attr(
906+
docsrs,
907+
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
908+
)]
909+
impl<'a, K, V, S> Builder<'a, K, V, S>
910+
where
911+
K: Hash + Eq,
912+
V: 'static,
913+
S: BuildHasher,
914+
{
915+
/// Assigns a name to the task which will be spawned.
916+
pub fn name(mut self, name: &'a str) -> Self {
917+
self.name = Some(name);
918+
self
919+
}
920+
921+
/// Spawn the provided task with this builder's settings and store it in this `JoinMap` with
922+
/// the provided key.
923+
///
924+
/// If a task previously existed in the `JoinMap` for this key, that task
925+
/// will be cancelled and replaced with the new one. The previous task will
926+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
927+
/// *not* return a cancelled [`JoinError`] for that task.
928+
///
929+
/// # Panics
930+
///
931+
/// This method panics if called outside of a Tokio runtime.
932+
///
933+
/// [`join_next`]: JoinMap::join_next
934+
#[track_caller]
935+
pub fn spawn<F>(self, key: K, task: F) -> std::io::Result<()>
936+
where
937+
F: Future<Output = V>,
938+
F: Send + 'static,
939+
V: Send,
940+
{
941+
let builder = self.joinmap.tasks.build_task();
942+
let builder = if let Some(name) = self.name {
943+
builder.name(name)
944+
} else {
945+
builder
946+
};
947+
let abort = builder.spawn(task)?;
948+
949+
Ok(self.joinmap.insert(key, abort))
950+
}
951+
952+
/// Spawn the provided task on the provided runtime and store it in this
953+
/// `JoinMap` with the provided key.
954+
///
955+
/// If a task previously existed in the `JoinMap` for this key, that task
956+
/// will be cancelled and replaced with the new one. The previous task will
957+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
958+
/// *not* return a cancelled [`JoinError`] for that task.
959+
///
960+
/// [`join_next`]: JoinMap::join_next
961+
#[track_caller]
962+
pub fn spawn_on<F>(&mut self, key: K, task: F, handle: &Handle) -> std::io::Result<()>
963+
where
964+
F: Future<Output = V>,
965+
F: Send + 'static,
966+
V: Send,
967+
{
968+
let builder = self.joinmap.tasks.build_task();
969+
let builder = if let Some(name) = self.name {
970+
builder.name(name)
971+
} else {
972+
builder
973+
};
974+
let abort = builder.spawn_on(task, handle)?;
975+
976+
Ok(self.joinmap.insert(key, abort))
977+
}
978+
979+
/// Spawn the blocking code on the blocking threadpool and store it in this `JoinMap` with the provided
980+
/// key.
981+
///
982+
/// If a task previously existed in the `JoinMap` for this key, that task
983+
/// will be cancelled and replaced with the new one. The previous task will
984+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
985+
/// *not* return a cancelled [`JoinError`] for that task.
986+
///
987+
/// Note that blocking tasks cannot be cancelled after execution starts.
988+
/// Replaced blocking tasks will still run to completion if the task has begun
989+
/// to execute when it is replaced. A blocking task which is replaced before
990+
/// it has been scheduled on a blocking worker thread will be cancelled.
991+
///
992+
/// # Panics
993+
///
994+
/// This method panics if called outside of a Tokio runtime.
995+
///
996+
/// [`join_next`]: JoinMap::join_next
997+
#[track_caller]
998+
pub fn spawn_blocking<F>(&mut self, key: K, f: F) -> std::io::Result<()>
999+
where
1000+
F: FnOnce() -> V,
1001+
F: Send + 'static,
1002+
V: Send,
1003+
{
1004+
let builder = self.joinmap.tasks.build_task();
1005+
let builder = if let Some(name) = self.name {
1006+
builder.name(name)
1007+
} else {
1008+
builder
1009+
};
1010+
let abort = builder.spawn_blocking(f)?;
1011+
1012+
Ok(self.joinmap.insert(key, abort))
1013+
}
1014+
1015+
/// Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this
1016+
/// `JoinMap` with the provided key.
1017+
///
1018+
/// If a task previously existed in the `JoinMap` for this key, that task
1019+
/// will be cancelled and replaced with the new one. The previous task will
1020+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
1021+
/// *not* return a cancelled [`JoinError`] for that task.
1022+
///
1023+
/// Note that blocking tasks cannot be cancelled after execution starts.
1024+
/// Replaced blocking tasks will still run to completion if the task has begun
1025+
/// to execute when it is replaced. A blocking task which is replaced before
1026+
/// it has been scheduled on a blocking worker thread will be cancelled.
1027+
///
1028+
/// [`join_next`]: JoinMap::join_next
1029+
#[track_caller]
1030+
pub fn spawn_blocking_on<F>(&mut self, key: K, f: F, handle: &Handle) -> std::io::Result<()>
1031+
where
1032+
F: FnOnce() -> V,
1033+
F: Send + 'static,
1034+
V: Send,
1035+
{
1036+
let builder = self.joinmap.tasks.build_task();
1037+
let builder = if let Some(name) = self.name {
1038+
builder.name(name)
1039+
} else {
1040+
builder
1041+
};
1042+
let abort = builder.spawn_blocking_on(f, handle)?;
1043+
1044+
Ok(self.joinmap.insert(key, abort))
1045+
}
1046+
1047+
/// Spawn the provided task on the current [`LocalSet`] and store it in this
1048+
/// `JoinMap` with the provided key.
1049+
///
1050+
/// If a task previously existed in the `JoinMap` for this key, that task
1051+
/// will be cancelled and replaced with the new one. The previous task will
1052+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
1053+
/// *not* return a cancelled [`JoinError`] for that task.
1054+
///
1055+
/// # Panics
1056+
///
1057+
/// This method panics if it is called outside of a `LocalSet`.
1058+
///
1059+
/// [`LocalSet`]: tokio::task::LocalSet
1060+
/// [`join_next`]: JoinMap::join_next
1061+
#[track_caller]
1062+
pub fn spawn_local<F>(&mut self, key: K, task: F) -> std::io::Result<()>
1063+
where
1064+
F: Future<Output = V>,
1065+
F: 'static,
1066+
{
1067+
let builder = self.joinmap.tasks.build_task();
1068+
let builder = if let Some(name) = self.name {
1069+
builder.name(name)
1070+
} else {
1071+
builder
1072+
};
1073+
let abort = builder.spawn_local(task)?;
1074+
1075+
Ok(self.joinmap.insert(key, abort))
1076+
}
1077+
1078+
/// Spawn the provided task on the provided [`LocalSet`] and store it in
1079+
/// this `JoinMap` with the provided key.
1080+
///
1081+
/// If a task previously existed in the `JoinMap` for this key, that task
1082+
/// will be cancelled and replaced with the new one. The previous task will
1083+
/// be removed from the `JoinMap`; a subsequent call to [`join_next`] will
1084+
/// *not* return a cancelled [`JoinError`] for that task.
1085+
///
1086+
/// [`LocalSet`]: tokio::task::LocalSet
1087+
/// [`join_next`]: JoinMap::join_next
1088+
#[track_caller]
1089+
pub fn spawn_local_on<F>(
1090+
&mut self,
1091+
key: K,
1092+
task: F,
1093+
local_set: &LocalSet,
1094+
) -> std::io::Result<()>
1095+
where
1096+
F: Future<Output = V>,
1097+
F: 'static,
1098+
{
1099+
let builder = self.joinmap.tasks.build_task();
1100+
let builder = if let Some(name) = self.name {
1101+
builder.name(name)
1102+
} else {
1103+
builder
1104+
};
1105+
let abort = builder.spawn_local_on(task, local_set)?;
1106+
1107+
Ok(self.joinmap.insert(key, abort))
1108+
}
1109+
}
1110+
1111+
// Manual `Debug` impl so that `Builder` is `Debug` regardless of whether `V` and `S` are `Debug`.
1112+
#[cfg(feature = "tracing")]
1113+
#[cfg_attr(
1114+
docsrs,
1115+
doc(cfg(all(feature = "rt", feature = "tracing", tokio_unstable)))
1116+
)]
1117+
impl<'a, K: fmt::Debug, V, S> fmt::Debug for Builder<'a, K, V, S> {
1118+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1119+
f.debug_struct("join_map::Builder")
1120+
.field("joinmap", &self.joinmap)
1121+
.field("name", &self.name)
1122+
.finish()
1123+
}
1124+
}
1125+
8561126
// === impl Key ===
8571127

8581128
impl<K: Hash> Hash for Key<K> {

tokio-util/src/task/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ pub use spawn_pinned::LocalPoolHandle;
99
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))]
1010
pub use join_map::{JoinMap, JoinMapKeys};
1111

12+
#[cfg(all(tokio_unstable, feature = "tracing"))]
13+
#[cfg_attr(
14+
docsrs,
15+
doc(cfg(all(tokio_unstable, feature = "rt", feature = "tracing")))
16+
)]
17+
pub use join_map::Builder as JoinMapBuilder;
18+
1219
pub mod task_tracker;
1320
pub use task_tracker::TaskTracker;
1421

0 commit comments

Comments
 (0)