Skip to content

Commit

Permalink
ref(system): Spawn with custom task ID (#4262)
Browse files Browse the repository at this point in the history
Allow spawning a task with a service name.
---------

Co-authored-by: Sebastian Zivota <[email protected]>
  • Loading branch information
jjbayer and loewenheim authored Nov 22, 2024
1 parent a11a419 commit 4614953
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 40 deletions.
119 changes: 83 additions & 36 deletions relay-system/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,76 +2,95 @@ use futures::Future;
use tokio::task::JoinHandle;

use crate::statsd::SystemCounters;
use crate::Service;

/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
/// Spawns an instrumented task with an automatically generated [`TaskId`].
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
/// Returns a [`JoinHandle`].
#[macro_export]
macro_rules! spawn {
($future:expr) => {{
static _TASK_ID: ::std::sync::OnceLock<$crate::TaskId> = ::std::sync::OnceLock::new();
let task_id = _TASK_ID.get_or_init(|| (*::std::panic::Location::caller()).into());
$crate::_spawn_inner(task_id, $future)
static _PARTS: ::std::sync::OnceLock<(String, String, String)> =
::std::sync::OnceLock::new();
let (id, file, line) = _PARTS.get_or_init(|| {
let caller = *::std::panic::Location::caller();
let id = format!("{}:{}", caller.file(), caller.line());
(id, caller.file().to_owned(), caller.line().to_string())
});
$crate::spawn(
$crate::TaskId::_from_location(id.as_str(), file.as_str(), line.as_str()),
$future,
)
}};
}

#[doc(hidden)]
/// Spawns a new asynchronous task, returning a [`JoinHandle`] for it.
///
/// This is in instrumented spawn variant of Tokio's [`tokio::spawn`].
#[allow(clippy::disallowed_methods)]
pub fn _spawn_inner<F>(task_id: &'static TaskId, future: F) -> JoinHandle<F::Output>
pub fn spawn<F>(task_id: TaskId, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
tokio::spawn(Task::new(task_id, future))
}

/// An internal id for a spawned task.
#[doc(hidden)]
/// An identifier for tasks spawned by [`spawn()`], used to log metrics.
pub struct TaskId {
id: String,
file: String,
line: String,
id: &'static str,
file: Option<&'static str>,
line: Option<&'static str>,
}

impl From<std::panic::Location<'_>> for TaskId {
fn from(value: std::panic::Location<'_>) -> Self {
impl TaskId {
/// Create a task ID based on the service's name.
pub fn for_service<S: Service>() -> Self {
Self {
id: S::name(),
file: None,
line: None,
}
}

#[doc(hidden)]
pub fn _from_location(id: &'static str, file: &'static str, line: &'static str) -> Self {
Self {
id: format!("{}:{}", value.file(), value.line()),
file: value.file().to_owned(),
line: value.line().to_string(),
id,
file: Some(file),
line: Some(line),
}
}

fn emit_metric(&self, metric: SystemCounters) {
let Self { id, file, line } = self;
relay_statsd::metric!(
counter(metric) += 1,
id = id,
file = file.unwrap_or_default(),
line = line.unwrap_or_default()
);
}
}

pin_project_lite::pin_project! {
/// Wraps a future and emits related task metrics.
struct Task<T> {
id: &'static TaskId,
id: TaskId,
#[pin]
inner: T,
}

impl<T> PinnedDrop for Task<T> {
fn drop(this: Pin<&mut Self>) {
let this = this.project();
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskTerminated) += 1,
id = this.id.id.as_str(),
file = this.id.file.as_str(),
line = this.id.line.as_str(),
);
this.id.emit_metric(SystemCounters::RuntimeTaskTerminated);
}
}
}

impl<T> Task<T> {
fn new(id: &'static TaskId, inner: T) -> Self {
relay_statsd::metric!(
counter(SystemCounters::RuntimeTaskCreated) += 1,
id = id.id.as_str(),
file = id.file.as_str(),
line = id.line.as_str(),
);
fn new(id: TaskId, inner: T) -> Self {
id.emit_metric(SystemCounters::RuntimeTaskCreated);
Self { id, inner }
}
}
Expand All @@ -92,6 +111,8 @@ impl<T: Future> Future for Task<T> {
mod tests {
use insta::assert_debug_snapshot;

use crate::{Service, TaskId};

#[test]
fn test_spawn_spawns_a_future() {
let rt = tokio::runtime::Builder::new_current_thread()
Expand All @@ -107,15 +128,41 @@ mod tests {
#[cfg(not(windows))]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:103,file:relay-system/src/runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system/src/runtime.rs:124,file:relay-system/src/runtime.rs,line:124",
"runtime.task.spawn.terminated:1|c|#id:relay-system/src/runtime.rs:124,file:relay-system/src/runtime.rs,line:124",
]
"###);
#[cfg(windows)]
assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:103,file:relay-system\\src\\runtime.rs,line:103",
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime.rs:124,file:relay-system\\src\\runtime.rs,line:124",
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime.rs:124,file:relay-system\\src\\runtime.rs,line:124",
]
"###);
}

#[test]
fn test_spawn_with_custom_id() {
struct Foo;
impl Service for Foo {
type Interface = ();
async fn run(self, _rx: crate::Receiver<Self::Interface>) {}
}

let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();

let captures = relay_statsd::with_capturing_test_client(|| {
rt.block_on(async {
let _ = crate::spawn(TaskId::for_service::<Foo>(), async {}).await;
})
});

assert_debug_snapshot!(captures, @r###"
[
"runtime.task.spawn.created:1|c|#id:relay_system::runtime::tests::test_spawn_with_custom_id::Foo,file:,line:",
"runtime.task.spawn.terminated:1|c|#id:relay_system::runtime::tests::test_spawn_with_custom_id::Foo,file:,line:",
]
"###);
}
Expand Down
7 changes: 4 additions & 3 deletions relay-system/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tokio::time::MissedTickBehavior;

use crate::spawn;
use crate::statsd::SystemGauges;
use crate::{spawn, TaskId};

/// Interval for recording backlog metrics on service channels.
const BACKLOG_INTERVAL: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -1009,7 +1009,7 @@ pub trait Service: Sized {
/// for tests.
fn start_detached(self) -> Addr<Self::Interface> {
let (addr, rx) = channel(Self::name());
spawn!(self.run(rx));
spawn(TaskId::for_service::<Self>(), self.run(rx));
addr
}

Expand Down Expand Up @@ -1043,7 +1043,8 @@ impl ServiceRunner {

/// Starts a service and starts tracking its join handle, given a predefined receiver.
pub fn start_with<S: Service>(&mut self, service: S, rx: Receiver<S::Interface>) {
self.0.push(spawn!(service.run(rx)));
self.0
.push(spawn(TaskId::for_service::<S>(), service.run(rx)));
}

/// Awaits until all services have finished.
Expand Down
2 changes: 1 addition & 1 deletion relay-system/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use relay_statsd::{CounterMetric, GaugeMetric};
pub enum SystemCounters {
/// Number of runtime tasks created/spawned.
///
/// Every call to [`spawn`](`crate::spawn`) increases this counter by one.
/// Every call to [`spawn`](`crate::spawn()`) increases this counter by one.
///
/// This metric is tagged with:
/// - `id`: A unique identifier for the task, derived from its location in code.
Expand Down

0 comments on commit 4614953

Please sign in to comment.