From caca9248743bd383b990c41aa4184cf5f92c9179 Mon Sep 17 00:00:00 2001 From: Jake Goulding Date: Tue, 5 Dec 2023 13:00:28 -0500 Subject: [PATCH] Track CPU usage for spawned execution We log the request parameters when a program uses more than 60 seconds of CPU time. Hopefully this can help us see if there are any people deliberately abusing the system. In the future, we could inform the user that they may kill the process if it's unintended. --- compiler/base/orchestrator/Cargo.lock | 151 ++++++++++++++++-- compiler/base/orchestrator/Cargo.toml | 3 + compiler/base/orchestrator/src/coordinator.rs | 32 +++- compiler/base/orchestrator/src/message.rs | 9 ++ compiler/base/orchestrator/src/worker.rs | 97 ++++++++++- ui/Cargo.lock | 30 ++++ ui/src/server_axum/websocket.rs | 12 +- 7 files changed, 319 insertions(+), 15 deletions(-) diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock index d3aa01369..ab0fbe4ff 100644 --- a/compiler/base/orchestrator/Cargo.lock +++ b/compiler/base/orchestrator/Cargo.lock @@ -78,6 +78,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "bytes" version = "1.5.0" @@ -111,6 +117,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -218,6 +234,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "indexmap" version = "2.1.0" @@ -246,6 +268,12 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +[[package]] +name = "linux-raw-sys" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" + [[package]] name = "log" version = "0.4.20" @@ -275,7 +303,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -323,6 +351,7 @@ dependencies = [ "futures", "modify-cargo-toml", "once_cell", + "procfs", "serde", "serde_json", "snafu", @@ -392,6 +421,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags", + "hex", +] + [[package]] name = "quote" version = "1.0.33" @@ -482,6 +534,19 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.38.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "ryu" version = "1.0.15" @@ -640,7 +705,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -815,7 +880,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -824,13 +898,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -839,42 +928,84 @@ version = "0.48.5" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + 
[[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" diff --git a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml index f79af0d14..736d838b1 100644 --- a/compiler/base/orchestrator/Cargo.toml +++ b/compiler/base/orchestrator/Cargo.toml @@ -20,6 +20,9 @@ tokio-util = { version = "0.7.8", default-features = false, features = ["io", "i toml = { version = "0.8.2", default-features = false, features = ["parse", "display"] } tracing = { version = "0.1.37", default-features = false, features = ["attributes"] } +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { version = "0.16.0", default-features = false } + [dev-dependencies] assert_matches = "1.5.0" assertables = "7.0.1" diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 4f84a4213..01c1c1c9e 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -29,9 +29,9 @@ use tracing::{instrument, trace, trace_span, warn, Instrument}; use crate::{ bincode_input_closed, message::{ - CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, 
ExecuteCommandResponse, - JobId, Multiplexed, OneToOneResponse, ReadFileRequest, ReadFileResponse, SerializedError, - WorkerMessage, WriteFileRequest, + CommandStatistics, CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, + ExecuteCommandResponse, JobId, Multiplexed, OneToOneResponse, ReadFileRequest, + ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, }, DropErrorDetailsExt, }; @@ -1164,8 +1164,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = v; + drop(stdin_tx); + drop(status_rx); + let task = async { task.await?.map_err(VersionError::from) }; let o = WithOutput::try_absorb(task, stdout_rx, stderr_rx).await?; Ok(if o.success { Some(o.stdout) } else { None }) @@ -1191,9 +1195,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self.begin_execute(token, request).await?; drop(stdin_tx); + drop(status_rx); + WithOutput::try_absorb(task, stdout_rx, stderr_rx).await } @@ -1225,6 +1232,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await @@ -1250,6 +1258,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1301,12 +1310,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1391,12 +1402,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1471,12 +1484,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { 
@@ -1540,12 +1555,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1612,12 +1629,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1652,6 +1671,7 @@ impl Container { let (stdin_tx, mut stdin_rx) = mpsc::channel(8); let (stdout_tx, stdout_rx) = mpsc::channel(8); let (stderr_tx, stderr_rx) = mpsc::channel(8); + let (status_tx, status_rx) = mpsc::channel(8); let (to_worker_tx, mut from_worker_rx) = self .commander @@ -1703,6 +1723,9 @@ impl Container { WorkerMessage::StderrPacket(packet) => { stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */); } + WorkerMessage::CommandStatistics(stats) => { + status_tx.send(stats).await.ok(/* Receiver gone, that's OK */); + } _ => return UnexpectedMessageSnafu.fail(), } }, @@ -1719,6 +1742,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1739,6 +1763,7 @@ pub struct ActiveExecution { pub stdin_tx: mpsc::Sender, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, + pub status_rx: mpsc::Receiver, } impl fmt::Debug for ActiveExecution { @@ -1999,6 +2024,7 @@ struct SpawnCargo { stdin_tx: mpsc::Sender, stdout_rx: mpsc::Receiver, stderr_rx: mpsc::Receiver, + status_rx: mpsc::Receiver, } #[derive(Debug, Snafu)] diff --git a/compiler/base/orchestrator/src/message.rs b/compiler/base/orchestrator/src/message.rs index fe728d5b4..226850c7e 100644 --- a/compiler/base/orchestrator/src/message.rs +++ b/compiler/base/orchestrator/src/message.rs @@ -46,6 +46,7 @@ pub enum WorkerMessage { ExecuteCommand(ExecuteCommandResponse), StdoutPacket(String), StderrPacket(String), + 
CommandStatistics(CommandStatistics), Error(SerializedError), } @@ -73,6 +74,7 @@ impl_narrow_to_broad!( DeleteFile => DeleteFileResponse, ReadFile => ReadFileResponse, ExecuteCommand => ExecuteCommandResponse, + CommandStatistics => CommandStatistics, ); impl_broad_to_narrow_with_error!( @@ -136,6 +138,13 @@ pub struct ExecuteCommandResponse { pub exit_detail: String, } +#[derive(Debug, Serialize, Deserialize)] +pub struct CommandStatistics { + pub total_time_secs: f64, + pub virtual_memory_bytes: u64, + pub resident_set_size_bytes: u64, +} + #[derive(Debug, Serialize, Deserialize)] pub struct SerializedError(pub String); diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs index 9931dccd6..d010a8380 100644 --- a/compiler/base/orchestrator/src/worker.rs +++ b/compiler/base/orchestrator/src/worker.rs @@ -447,6 +447,12 @@ impl ProcessState { } }; + let statistics_task = tokio::task::spawn_blocking({ + let child_id = child.id(); + let worker_msg_tx = worker_msg_tx.clone(); + move || stream_command_statistics(child_id, worker_msg_tx) + }); + let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr); self.kill_tokens.insert(job_id, token.clone()); @@ -455,7 +461,7 @@ impl ProcessState { let stdin_shutdown_tx = self.stdin_shutdown_tx.clone(); async move { worker_msg_tx - .send(process_end(token, child, task_set, stdin_shutdown_tx, job_id).await) + .send(process_end(token, child, task_set, statistics_task, stdin_shutdown_tx, job_id).await) .await .context(UnableToSendExecuteCommandResponseSnafu) } @@ -590,6 +596,7 @@ async fn process_end( token: CancellationToken, mut child: Child, mut task_set: JoinSet>, + statistics_task: tokio::task::JoinHandle>, stdin_shutdown_tx: mpsc::Sender, job_id: JobId, ) -> Result { @@ -613,6 +620,9 @@ async fn process_end( .context(StdioTaskFailedSnafu)?; } + statistics_task.await.context(StatisticsTaskPanickedSnafu)? 
+ .context(StatisticsTaskFailedSnafu)?; + let success = status.success(); let exit_detail = extract_exit_detail(status); @@ -754,6 +764,12 @@ pub enum ProcessError { #[snafu(display("The command's stdio task failed"))] StdioTaskFailed { source: StdioError }, + #[snafu(display("The command's statistics task panicked"))] + StatisticsTaskPanicked { source: tokio::task::JoinError }, + + #[snafu(display("The command's statistics task failed"))] + StatisticsTaskFailed { source: CommandStatisticsError }, + #[snafu(display("Failed to send the command started response to the coordinator"))] UnableToSendExecuteCommandStartedResponse { source: MultiplexingSenderError }, @@ -767,6 +783,85 @@ pub enum ProcessError { ProcessTaskPanicked { source: tokio::task::JoinError }, } +#[cfg(not(target_os = "linux"))] +fn stream_command_statistics( + _child_id: Option, + _worker_msg_tx: MultiplexingSender, +) -> Result<(), CommandStatisticsError> { + Ok(()) +} + +#[cfg(target_os = "linux")] +fn stream_command_statistics( + child_id: Option, + worker_msg_tx: MultiplexingSender, +) -> Result<(), CommandStatisticsError> { + use crate::message::CommandStatistics; + use command_statistics_error::*; + use procfs::process::Process; + use std::time::Duration; + + const STATISTIC_INTERVAL: Duration = Duration::from_secs(1); + + let process_id = child_id.context(ChildIdMissingSnafu)?; + + let process_id = process_id + .try_into() + .context(ProcessIdOutOfRangeSnafu { process_id })?; + let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?; + + let ticks_per_second = procfs::ticks_per_second(); + let page_size = procfs::page_size(); + + loop { + let Ok(stat) = process.stat() else { + // Process no longer running + break; + }; + let state = stat.state().context(UnableToGetStatSnafu { process_id }); + + let total_time_ticks = stat.utime + stat.stime; + let total_time_secs = total_time_ticks as f64 / ticks_per_second as f64; + + let virtual_memory_bytes = stat.vsize; + let 
resident_set_size_bytes = stat.rss * page_size; + + let stats = CommandStatistics { + total_time_secs, + virtual_memory_bytes, + resident_set_size_bytes, + }; + + let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats)); + if sent.is_err() { + // No one listening anymore + break; + } + + std::thread::sleep(STATISTIC_INTERVAL); + } + + Ok(()) +} + +#[derive(Debug, Snafu)] +#[snafu(module)] +pub enum CommandStatisticsError { + #[snafu(display("The child did not have a process ID"))] + ChildIdMissing, + + #[snafu(display("The process ID {process_id} could not be converted"))] + ProcessIdOutOfRange { source: std::num::TryFromIntError, process_id: u32 }, + + #[cfg(target_os = "linux")] + #[snafu(display("The process ID {process_id} is not valid"))] + InvalidProcess { source: procfs::ProcError, process_id: i32 }, + + #[cfg(target_os = "linux")] + #[snafu(display("Could not read the stats for process ID {process_id}"))] + UnableToGetStat { source: procfs::ProcError, process_id: i32 }, +} + fn stream_stdio( coordinator_tx: MultiplexingSender, mut stdin_rx: mpsc::Receiver, diff --git a/ui/Cargo.lock b/ui/Cargo.lock index 68bf17db0..633a35f76 100644 --- a/ui/Cargo.lock +++ b/ui/Cargo.lock @@ -549,6 +549,12 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "0.2.11" @@ -1018,6 +1024,7 @@ dependencies = [ "bincode", "futures", "modify-cargo-toml", + "procfs", "serde", "serde_json", "snafu", @@ -1136,6 +1143,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.4.1", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.4.1", + "hex", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index 43c31d93f..4675e4865 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -589,8 +589,9 @@ async fn handle_execute_inner( stdin_tx, mut stdout_rx, mut stderr_rx, + mut status_rx, } = coordinator - .begin_execute(token.clone(), req) + .begin_execute(token.clone(), req.clone()) .await .context(BeginSnafu)?; @@ -613,6 +614,8 @@ async fn handle_execute_inner( .await }; + let mut reported = false; + let status = loop { tokio::select! { status = &mut task => break status, @@ -644,6 +647,13 @@ async fn handle_execute_inner( let sent = send_stderr(stderr).await; abandon_if_closed!(sent); }, + + Some(status) = status_rx.recv() => { + if !reported && status.total_time_secs > 60.0 { + error!("Request consumed more than 60s of CPU time: {req:?}"); + reported = true; + } + } } };