Skip to content

Commit 8963b5d

Browse files
committed
rt: overhaul task hooks
This change overhauls the entire task hooks system so that users can propagate arbitrary information between task hook invocations and pass context data between the hook "harnesses" for parent and child tasks at time of spawn. This is intended to be significantly more extensible and long-term maintainable than the current task hooks system, and should ultimately be much easier to stabilize.
1 parent a258bff commit 8963b5d

35 files changed

+1260
-719
lines changed

tokio/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,10 +351,7 @@
351351
//! - [`task::Builder`]
352352
//! - Some methods on [`task::JoinSet`]
353353
//! - [`runtime::RuntimeMetrics`]
354-
//! - [`runtime::Builder::on_task_spawn`]
355-
//! - [`runtime::Builder::on_task_terminate`]
356354
//! - [`runtime::Builder::unhandled_panic`]
357-
//! - [`runtime::TaskMeta`]
358355
//!
359356
//! This flag enables **unstable** features. The public API of these features
360357
//! may break in 1.x releases. To enable these features, the `--cfg

tokio/src/runtime/blocking/pool.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,10 +375,15 @@ impl Spawner {
375375
F: FnOnce() -> R + Send + 'static,
376376
R: Send + 'static,
377377
{
378+
// let parent = with_c
378379
let id = task::Id::next();
379380
let fut =
380381
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());
381382

383+
#[cfg(tokio_unstable)]
384+
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id, None);
385+
386+
#[cfg(not(tokio_unstable))]
382387
let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);
383388

384389
let spawned = self.spawn_task(Task::new(task, is_mandatory), rt);

tokio/src/runtime/blocking/schedule.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
#[cfg(feature = "test-util")]
22
use crate::runtime::scheduler;
3-
use crate::runtime::task::{self, Task, TaskHarnessScheduleHooks};
3+
use crate::runtime::task::{self, Task};
44
use crate::runtime::Handle;
5+
#[cfg(tokio_unstable)]
6+
use crate::runtime::{OptionalTaskHooksFactory, OptionalTaskHooksFactoryRef};
57

68
/// `task::Schedule` implementation that does nothing (except some bookkeeping
79
/// in test-util builds). This is unique to the blocking scheduler as tasks
@@ -12,7 +14,8 @@ use crate::runtime::Handle;
1214
pub(crate) struct BlockingSchedule {
1315
#[cfg(feature = "test-util")]
1416
handle: Handle,
15-
hooks: TaskHarnessScheduleHooks,
17+
#[cfg(tokio_unstable)]
18+
hooks_factory: OptionalTaskHooksFactory,
1619
}
1720

1821
impl BlockingSchedule {
@@ -33,9 +36,8 @@ impl BlockingSchedule {
3336
BlockingSchedule {
3437
#[cfg(feature = "test-util")]
3538
handle: handle.clone(),
36-
hooks: TaskHarnessScheduleHooks {
37-
task_terminate_callback: handle.inner.hooks().task_terminate_callback.clone(),
38-
},
39+
#[cfg(tokio_unstable)]
40+
hooks_factory: handle.inner.hooks_factory(),
3941
}
4042
}
4143
}
@@ -62,9 +64,13 @@ impl task::Schedule for BlockingSchedule {
6264
unreachable!();
6365
}
6466

65-
fn hooks(&self) -> TaskHarnessScheduleHooks {
66-
TaskHarnessScheduleHooks {
67-
task_terminate_callback: self.hooks.task_terminate_callback.clone(),
68-
}
67+
#[cfg(tokio_unstable)]
68+
fn hooks_factory(&self) -> OptionalTaskHooksFactory {
69+
self.hooks_factory.clone()
70+
}
71+
72+
#[cfg(tokio_unstable)]
73+
fn hooks_factory_ref(&self) -> OptionalTaskHooksFactoryRef<'_> {
74+
self.hooks_factory.as_ref().map(AsRef::as_ref)
6975
}
7076
}

tokio/src/runtime/builder.rs

Lines changed: 22 additions & 215 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
#![cfg_attr(loom, allow(unused_imports))]
22

3+
use crate::runtime::blocking::BlockingPool;
34
use crate::runtime::handle::Handle;
4-
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime, TaskCallback};
5+
use crate::runtime::scheduler::CurrentThread;
6+
use crate::runtime::{blocking, driver, Callback, HistogramBuilder, Runtime};
57
#[cfg(tokio_unstable)]
6-
use crate::runtime::{metrics::HistogramConfiguration, LocalOptions, LocalRuntime, TaskMeta};
8+
use crate::runtime::{
9+
metrics::HistogramConfiguration, LocalOptions, LocalRuntime, OptionalTaskHooksFactory,
10+
TaskHookHarnessFactory,
11+
};
712
use crate::util::rand::{RngSeed, RngSeedGenerator};
8-
9-
use crate::runtime::blocking::BlockingPool;
10-
use crate::runtime::scheduler::CurrentThread;
1113
use std::fmt;
1214
use std::io;
15+
#[cfg(tokio_unstable)]
16+
use std::sync::Arc;
1317
use std::thread::ThreadId;
1418
use std::time::Duration;
1519

@@ -85,19 +89,8 @@ pub struct Builder {
8589
/// To run after each thread is unparked.
8690
pub(super) after_unpark: Option<Callback>,
8791

88-
/// To run before each task is spawned.
89-
pub(super) before_spawn: Option<TaskCallback>,
90-
91-
/// To run before each poll
9292
#[cfg(tokio_unstable)]
93-
pub(super) before_poll: Option<TaskCallback>,
94-
95-
/// To run after each poll
96-
#[cfg(tokio_unstable)]
97-
pub(super) after_poll: Option<TaskCallback>,
98-
99-
/// To run after each task is terminated.
100-
pub(super) after_termination: Option<TaskCallback>,
93+
pub(super) task_hook_harness_factory: OptionalTaskHooksFactory,
10194

10295
/// Customizable keep alive timeout for `BlockingPool`
10396
pub(super) keep_alive: Option<Duration>,
@@ -311,13 +304,8 @@ impl Builder {
311304
before_park: None,
312305
after_unpark: None,
313306

314-
before_spawn: None,
315-
after_termination: None,
316-
317307
#[cfg(tokio_unstable)]
318-
before_poll: None,
319-
#[cfg(tokio_unstable)]
320-
after_poll: None,
308+
task_hook_harness_factory: None,
321309

322310
keep_alive: None,
323311

@@ -706,188 +694,19 @@ impl Builder {
706694
self
707695
}
708696

709-
/// Executes function `f` just before a task is spawned.
710-
///
711-
/// `f` is called within the Tokio context, so functions like
712-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
713-
/// invoked immediately.
714-
///
715-
/// This can be used for bookkeeping or monitoring purposes.
716-
///
717-
/// Note: There can only be one spawn callback for a runtime; calling this function more
718-
/// than once replaces the last callback defined, rather than adding to it.
719-
///
720-
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
721-
///
722-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
723-
/// may break in 1.x releases. See [the documentation on unstable
724-
/// features][unstable] for details.
725-
///
726-
/// [unstable]: crate#unstable-features
727-
///
728-
/// # Examples
729-
///
730-
/// ```
731-
/// # use tokio::runtime;
732-
/// # pub fn main() {
733-
/// let runtime = runtime::Builder::new_current_thread()
734-
/// .on_task_spawn(|_| {
735-
/// println!("spawning task");
736-
/// })
737-
/// .build()
738-
/// .unwrap();
697+
/// Factory method for producing "fallback" task hook harnesses.
739698
///
740-
/// runtime.block_on(async {
741-
/// tokio::task::spawn(std::future::ready(()));
742-
///
743-
/// for _ in 0..64 {
744-
/// tokio::task::yield_now().await;
745-
/// }
746-
/// })
747-
/// # }
748-
/// ```
699+
/// The order of operations for assigning the hook harness for a task are as follows:
700+
/// 1. [`crate::task::spawn_with_hooks`], if used.
701+
/// 2. [`crate::runtime::task_hooks::TaskHookHarnessFactory`], if it returns something other than [Option::None].
702+
/// 3. This function.
749703
#[cfg(all(not(loom), tokio_unstable))]
750704
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
751-
pub fn on_task_spawn<F>(&mut self, f: F) -> &mut Self
752-
where
753-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
754-
{
755-
self.before_spawn = Some(std::sync::Arc::new(f));
756-
self
757-
}
758-
759-
/// Executes function `f` just before a task is polled
760-
///
761-
/// `f` is called within the Tokio context, so functions like
762-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
763-
/// invoked immediately.
764-
///
765-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
766-
/// may break in 1.x releases. See [the documentation on unstable
767-
/// features][unstable] for details.
768-
///
769-
/// [unstable]: crate#unstable-features
770-
///
771-
/// # Examples
772-
///
773-
/// ```
774-
/// # use std::sync::{atomic::AtomicUsize, Arc};
775-
/// # use tokio::task::yield_now;
776-
/// # pub fn main() {
777-
/// let poll_start_counter = Arc::new(AtomicUsize::new(0));
778-
/// let poll_start = poll_start_counter.clone();
779-
/// let rt = tokio::runtime::Builder::new_multi_thread()
780-
/// .enable_all()
781-
/// .on_before_task_poll(move |meta| {
782-
/// println!("task {} is about to be polled", meta.id())
783-
/// })
784-
/// .build()
785-
/// .unwrap();
786-
/// let task = rt.spawn(async {
787-
/// yield_now().await;
788-
/// });
789-
/// let _ = rt.block_on(task);
790-
///
791-
/// # }
792-
/// ```
793-
#[cfg(tokio_unstable)]
794-
pub fn on_before_task_poll<F>(&mut self, f: F) -> &mut Self
705+
pub fn hook_harness_factory<T>(&mut self, hooks: T) -> &mut Self
795706
where
796-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
707+
T: TaskHookHarnessFactory + Send + Sync + 'static,
797708
{
798-
self.before_poll = Some(std::sync::Arc::new(f));
799-
self
800-
}
801-
802-
/// Executes function `f` just after a task is polled
803-
///
804-
/// `f` is called within the Tokio context, so functions like
805-
/// [`tokio::spawn`](crate::spawn) can be called, and may result in this callback being
806-
/// invoked immediately.
807-
///
808-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
809-
/// may break in 1.x releases. See [the documentation on unstable
810-
/// features][unstable] for details.
811-
///
812-
/// [unstable]: crate#unstable-features
813-
///
814-
/// # Examples
815-
///
816-
/// ```
817-
/// # use std::sync::{atomic::AtomicUsize, Arc};
818-
/// # use tokio::task::yield_now;
819-
/// # pub fn main() {
820-
/// let poll_stop_counter = Arc::new(AtomicUsize::new(0));
821-
/// let poll_stop = poll_stop_counter.clone();
822-
/// let rt = tokio::runtime::Builder::new_multi_thread()
823-
/// .enable_all()
824-
/// .on_after_task_poll(move |meta| {
825-
/// println!("task {} completed polling", meta.id());
826-
/// })
827-
/// .build()
828-
/// .unwrap();
829-
/// let task = rt.spawn(async {
830-
/// yield_now().await;
831-
/// });
832-
/// let _ = rt.block_on(task);
833-
///
834-
/// # }
835-
/// ```
836-
#[cfg(tokio_unstable)]
837-
pub fn on_after_task_poll<F>(&mut self, f: F) -> &mut Self
838-
where
839-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
840-
{
841-
self.after_poll = Some(std::sync::Arc::new(f));
842-
self
843-
}
844-
845-
/// Executes function `f` just after a task is terminated.
846-
///
847-
/// `f` is called within the Tokio context, so functions like
848-
/// [`tokio::spawn`](crate::spawn) can be called.
849-
///
850-
/// This can be used for bookkeeping or monitoring purposes.
851-
///
852-
/// Note: There can only be one task termination callback for a runtime; calling this
853-
/// function more than once replaces the last callback defined, rather than adding to it.
854-
///
855-
/// This *does not* support [`LocalSet`](crate::task::LocalSet) at this time.
856-
///
857-
/// **Note**: This is an [unstable API][unstable]. The public API of this type
858-
/// may break in 1.x releases. See [the documentation on unstable
859-
/// features][unstable] for details.
860-
///
861-
/// [unstable]: crate#unstable-features
862-
///
863-
/// # Examples
864-
///
865-
/// ```
866-
/// # use tokio::runtime;
867-
/// # pub fn main() {
868-
/// let runtime = runtime::Builder::new_current_thread()
869-
/// .on_task_terminate(|_| {
870-
/// println!("killing task");
871-
/// })
872-
/// .build()
873-
/// .unwrap();
874-
///
875-
/// runtime.block_on(async {
876-
/// tokio::task::spawn(std::future::ready(()));
877-
///
878-
/// for _ in 0..64 {
879-
/// tokio::task::yield_now().await;
880-
/// }
881-
/// })
882-
/// # }
883-
/// ```
884-
#[cfg(all(not(loom), tokio_unstable))]
885-
#[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
886-
pub fn on_task_terminate<F>(&mut self, f: F) -> &mut Self
887-
where
888-
F: Fn(&TaskMeta<'_>) + Send + Sync + 'static,
889-
{
890-
self.after_termination = Some(std::sync::Arc::new(f));
709+
self.task_hook_harness_factory = Some(Arc::new(hooks));
891710
self
892711
}
893712

@@ -1508,12 +1327,8 @@ impl Builder {
15081327
Config {
15091328
before_park: self.before_park.clone(),
15101329
after_unpark: self.after_unpark.clone(),
1511-
before_spawn: self.before_spawn.clone(),
15121330
#[cfg(tokio_unstable)]
1513-
before_poll: self.before_poll.clone(),
1514-
#[cfg(tokio_unstable)]
1515-
after_poll: self.after_poll.clone(),
1516-
after_termination: self.after_termination.clone(),
1331+
task_hook_factory: self.task_hook_harness_factory.clone(),
15171332
global_queue_interval: self.global_queue_interval,
15181333
event_interval: self.event_interval,
15191334
local_queue_capacity: self.local_queue_capacity,
@@ -1662,12 +1477,8 @@ cfg_rt_multi_thread! {
16621477
Config {
16631478
before_park: self.before_park.clone(),
16641479
after_unpark: self.after_unpark.clone(),
1665-
before_spawn: self.before_spawn.clone(),
1666-
#[cfg(tokio_unstable)]
1667-
before_poll: self.before_poll.clone(),
16681480
#[cfg(tokio_unstable)]
1669-
after_poll: self.after_poll.clone(),
1670-
after_termination: self.after_termination.clone(),
1481+
task_hook_factory: self.task_hook_harness_factory.clone(),
16711482
global_queue_interval: self.global_queue_interval,
16721483
event_interval: self.event_interval,
16731484
local_queue_capacity: self.local_queue_capacity,
@@ -1715,12 +1526,8 @@ cfg_rt_multi_thread! {
17151526
Config {
17161527
before_park: self.before_park.clone(),
17171528
after_unpark: self.after_unpark.clone(),
1718-
before_spawn: self.before_spawn.clone(),
1719-
after_termination: self.after_termination.clone(),
1720-
#[cfg(tokio_unstable)]
1721-
before_poll: self.before_poll.clone(),
17221529
#[cfg(tokio_unstable)]
1723-
after_poll: self.after_poll.clone(),
1530+
task_hook_factory: self.task_hook_harness_factory.clone(),
17241531
global_queue_interval: self.global_queue_interval,
17251532
event_interval: self.event_interval,
17261533
local_queue_capacity: self.local_queue_capacity,

0 commit comments

Comments
 (0)