diff --git a/compiler/base/orchestrator/Cargo.lock b/compiler/base/orchestrator/Cargo.lock index d3aa01369..ab0fbe4ff 100644 --- a/compiler/base/orchestrator/Cargo.lock +++ b/compiler/base/orchestrator/Cargo.lock @@ -78,6 +78,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "327762f6e5a765692301e5bb513e0d9fef63be86bbc14528052b1cd3e6f03e07" + [[package]] name = "bytes" version = "1.5.0" @@ -111,6 +117,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" +[[package]] +name = "errno" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -218,6 +234,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "indexmap" version = "2.1.0" @@ -246,6 +268,12 @@ version = "0.2.150" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +[[package]] +name = "linux-raw-sys" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4cd1a83af159aa67994778be9070f0ae1bd732942279cabb14f86f986a21456" + [[package]] name = "log" version = "0.4.20" @@ -275,7 +303,7 @@ checksum = "3dce281c5e46beae905d4de1870d8b1509a9142b62eedf18b443b011ca8343d0" dependencies = [ "libc", "wasi", - "windows-sys", + 
"windows-sys 0.48.0", ] [[package]] @@ -323,6 +351,7 @@ dependencies = [ "futures", "modify-cargo-toml", "once_cell", + "procfs", "serde", "serde_json", "snafu", @@ -392,6 +421,29 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags", + "hex", +] + [[package]] name = "quote" version = "1.0.33" @@ -482,6 +534,19 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustix" +version = "0.38.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9470c4bf8246c8daf25f9598dca807fb6510347b1e1cfa55749113850c79d88a" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.52.0", +] + [[package]] name = "ryu" version = "1.0.15" @@ -640,7 +705,7 @@ dependencies = [ "pin-project-lite", "signal-hook-registry", "tokio-macros", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -815,7 +880,16 @@ version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ - "windows-targets", + "windows-targets 0.48.5", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", ] [[package]] @@ -824,13 
+898,28 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", ] [[package]] @@ -839,42 +928,84 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" 
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" diff --git 
a/compiler/base/orchestrator/Cargo.toml b/compiler/base/orchestrator/Cargo.toml index f79af0d14..736d838b1 100644 --- a/compiler/base/orchestrator/Cargo.toml +++ b/compiler/base/orchestrator/Cargo.toml @@ -20,6 +20,9 @@ tokio-util = { version = "0.7.8", default-features = false, features = ["io", "i toml = { version = "0.8.2", default-features = false, features = ["parse", "display"] } tracing = { version = "0.1.37", default-features = false, features = ["attributes"] } +[target.'cfg(target_os = "linux")'.dependencies] +procfs = { version = "0.16.0", default-features = false } + [dev-dependencies] assert_matches = "1.5.0" assertables = "7.0.1" diff --git a/compiler/base/orchestrator/src/coordinator.rs b/compiler/base/orchestrator/src/coordinator.rs index 4f84a4213..01c1c1c9e 100644 --- a/compiler/base/orchestrator/src/coordinator.rs +++ b/compiler/base/orchestrator/src/coordinator.rs @@ -29,9 +29,9 @@ use tracing::{instrument, trace, trace_span, warn, Instrument}; use crate::{ bincode_input_closed, message::{ - CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, ExecuteCommandResponse, - JobId, Multiplexed, OneToOneResponse, ReadFileRequest, ReadFileResponse, SerializedError, - WorkerMessage, WriteFileRequest, + CommandStatistics, CoordinatorMessage, DeleteFileRequest, ExecuteCommandRequest, + ExecuteCommandResponse, JobId, Multiplexed, OneToOneResponse, ReadFileRequest, + ReadFileResponse, SerializedError, WorkerMessage, WriteFileRequest, }, DropErrorDetailsExt, }; @@ -1164,8 +1164,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = v; + drop(stdin_tx); + drop(status_rx); + let task = async { task.await?.map_err(VersionError::from) }; let o = WithOutput::try_absorb(task, stdout_rx, stderr_rx).await?; Ok(if o.success { Some(o.stdout) } else { None }) @@ -1191,9 +1195,12 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self.begin_execute(token, request).await?; drop(stdin_tx); + drop(status_rx); + 
WithOutput::try_absorb(task, stdout_rx, stderr_rx).await } @@ -1225,6 +1232,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await @@ -1250,6 +1258,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1301,12 +1310,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1391,12 +1402,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let commander = self.commander.clone(); let task = async move { @@ -1471,12 +1484,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1540,12 +1555,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1612,12 +1629,14 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, } = self .spawn_cargo_task(token, execute_cargo) .await .context(CouldNotStartCargoSnafu)?; drop(stdin_tx); + drop(status_rx); let task = async move { let ExecuteCommandResponse { @@ -1652,6 +1671,7 @@ impl Container { let (stdin_tx, mut stdin_rx) = mpsc::channel(8); let (stdout_tx, stdout_rx) = mpsc::channel(8); let (stderr_tx, stderr_rx) = mpsc::channel(8); + let (status_tx, status_rx) = mpsc::channel(8); let (to_worker_tx, mut from_worker_rx) = self .commander @@ -1703,6 +1723,9 @@ impl Container { 
WorkerMessage::StderrPacket(packet) => { stderr_tx.send(packet).await.ok(/* Receiver gone, that's OK */); } + WorkerMessage::CommandStatistics(stats) => { + status_tx.send(stats).await.ok(/* Receiver gone, that's OK */); + } _ => return UnexpectedMessageSnafu.fail(), } }, @@ -1719,6 +1742,7 @@ impl Container { stdin_tx, stdout_rx, stderr_rx, + status_rx, }) } @@ -1739,6 +1763,7 @@ pub struct ActiveExecution { pub stdin_tx: mpsc::Sender, pub stdout_rx: mpsc::Receiver, pub stderr_rx: mpsc::Receiver, + pub status_rx: mpsc::Receiver, } impl fmt::Debug for ActiveExecution { @@ -1999,6 +2024,7 @@ struct SpawnCargo { stdin_tx: mpsc::Sender, stdout_rx: mpsc::Receiver, stderr_rx: mpsc::Receiver, + status_rx: mpsc::Receiver, } #[derive(Debug, Snafu)] diff --git a/compiler/base/orchestrator/src/message.rs b/compiler/base/orchestrator/src/message.rs index fe728d5b4..226850c7e 100644 --- a/compiler/base/orchestrator/src/message.rs +++ b/compiler/base/orchestrator/src/message.rs @@ -46,6 +46,7 @@ pub enum WorkerMessage { ExecuteCommand(ExecuteCommandResponse), StdoutPacket(String), StderrPacket(String), + CommandStatistics(CommandStatistics), Error(SerializedError), } @@ -73,6 +74,7 @@ impl_narrow_to_broad!( DeleteFile => DeleteFileResponse, ReadFile => ReadFileResponse, ExecuteCommand => ExecuteCommandResponse, + CommandStatistics => CommandStatistics, ); impl_broad_to_narrow_with_error!( @@ -136,6 +138,13 @@ pub struct ExecuteCommandResponse { pub exit_detail: String, } +#[derive(Debug, Serialize, Deserialize)] +pub struct CommandStatistics { + pub total_time_secs: f64, + pub virtual_memory_bytes: u64, + pub resident_set_size_bytes: u64, +} + #[derive(Debug, Serialize, Deserialize)] pub struct SerializedError(pub String); diff --git a/compiler/base/orchestrator/src/worker.rs b/compiler/base/orchestrator/src/worker.rs index 9931dccd6..d010a8380 100644 --- a/compiler/base/orchestrator/src/worker.rs +++ b/compiler/base/orchestrator/src/worker.rs @@ -447,6 +447,12 @@ impl 
ProcessState { } }; + let statistics_task = tokio::task::spawn_blocking({ + let child_id = child.id(); + let worker_msg_tx = worker_msg_tx.clone(); + move || stream_command_statistics(child_id, worker_msg_tx) + }); + let task_set = stream_stdio(worker_msg_tx.clone(), stdin_rx, stdin, stdout, stderr); self.kill_tokens.insert(job_id, token.clone()); @@ -455,7 +461,7 @@ impl ProcessState { let stdin_shutdown_tx = self.stdin_shutdown_tx.clone(); async move { worker_msg_tx - .send(process_end(token, child, task_set, stdin_shutdown_tx, job_id).await) + .send(process_end(token, child, task_set, statistics_task, stdin_shutdown_tx, job_id).await) .await .context(UnableToSendExecuteCommandResponseSnafu) } @@ -590,6 +596,7 @@ async fn process_end( token: CancellationToken, mut child: Child, mut task_set: JoinSet>, + statistics_task: tokio::task::JoinHandle>, stdin_shutdown_tx: mpsc::Sender, job_id: JobId, ) -> Result { @@ -613,6 +620,9 @@ async fn process_end( .context(StdioTaskFailedSnafu)?; } + statistics_task.await.context(StatisticsTaskPanickedSnafu)? 
+        .context(StatisticsTaskFailedSnafu)?;
+
     let success = status.success();
     let exit_detail = extract_exit_detail(status);
 
@@ -754,6 +764,12 @@ pub enum ProcessError {
     #[snafu(display("The command's stdio task failed"))]
     StdioTaskFailed { source: StdioError },
 
+    #[snafu(display("The command's statistics task panicked"))]
+    StatisticsTaskPanicked { source: tokio::task::JoinError },
+
+    #[snafu(display("The command's statistics task failed"))]
+    StatisticsTaskFailed { source: CommandStatisticsError },
+
     #[snafu(display("Failed to send the command started response to the coordinator"))]
     UnableToSendExecuteCommandStartedResponse { source: MultiplexingSenderError },
 
@@ -767,6 +783,107 @@ pub enum ProcessError {
     ProcessTaskPanicked { source: tokio::task::JoinError },
 }
 
+/// Statistics are gathered from procfs, which is only available on
+/// Linux; on other platforms this does nothing and reports success.
+#[cfg(not(target_os = "linux"))]
+fn stream_command_statistics(
+    _child_id: Option<u32>,
+    _worker_msg_tx: MultiplexingSender,
+) -> Result<(), CommandStatisticsError> {
+    Ok(())
+}
+
+/// Periodically samples the child's CPU time and memory usage from
+/// procfs and sends each sample through `worker_msg_tx`.
+///
+/// This blocks the calling thread: it sleeps between samples and
+/// blocks on each send, so it must not run directly on an async
+/// executor thread.
+///
+/// Sampling stops, returning `Ok(())`, once the process's stat can no
+/// longer be read or the sample can no longer be sent.
+///
+/// # Errors
+///
+/// Fails if `child_id` is `None`, if the ID does not fit the process
+/// ID type used by procfs, if procfs cannot open the process, or if
+/// a stat record reports a process state that cannot be interpreted.
+#[cfg(target_os = "linux")]
+fn stream_command_statistics(
+    child_id: Option<u32>,
+    worker_msg_tx: MultiplexingSender,
+) -> Result<(), CommandStatisticsError> {
+    use crate::message::CommandStatistics;
+    use command_statistics_error::*;
+    use procfs::process::Process;
+    use std::time::Duration;
+
+    const STATISTIC_INTERVAL: Duration = Duration::from_secs(1);
+
+    let process_id = child_id.context(ChildIdMissingSnafu)?;
+
+    let process_id = process_id
+        .try_into()
+        .context(ProcessIdOutOfRangeSnafu { process_id })?;
+    let process = Process::new(process_id).context(InvalidProcessSnafu { process_id })?;
+
+    let ticks_per_second = procfs::ticks_per_second();
+    let page_size = procfs::page_size();
+
+    loop {
+        let Ok(stat) = process.stat() else {
+            // Process no longer running
+            break;
+        };
+        // A stat record whose process state cannot be interpreted is
+        // reported as an error.
+        stat.state().context(UnableToGetStatSnafu { process_id })?;
+
+        // `utime` and `stime` are measured in clock ticks.
+        let total_time_ticks = stat.utime + stat.stime;
+        let total_time_secs = total_time_ticks as f64 / ticks_per_second as f64;
+
+        // `rss` is measured in pages.
+        let virtual_memory_bytes = stat.vsize;
+        let resident_set_size_bytes = stat.rss * page_size;
+
+        let stats = CommandStatistics {
+            total_time_secs,
+            virtual_memory_bytes,
+            resident_set_size_bytes,
+        };
+
+        let sent = futures::executor::block_on(worker_msg_tx.send_ok(stats));
+        if sent.is_err() {
+            // No one listening anymore
+            break;
+        }
+
+        std::thread::sleep(STATISTIC_INTERVAL);
+    }
+
+    Ok(())
+}
+
+/// Reasons that gathering statistics for a command can fail.
+#[derive(Debug, Snafu)]
+#[snafu(module)]
+pub enum CommandStatisticsError {
+    #[snafu(display("The child did not have a process ID"))]
+    ChildIdMissing,
+
+    #[snafu(display("The process ID {process_id} could not be converted"))]
+    ProcessIdOutOfRange { source: std::num::TryFromIntError, process_id: u32 },
+
+    #[cfg(target_os = "linux")]
+    #[snafu(display("The process ID {process_id} is not valid"))]
+    InvalidProcess { source: procfs::ProcError, process_id: i32 },
+
+    #[cfg(target_os = "linux")]
+    #[snafu(display("Could not read the stats for process ID {process_id}"))]
+    UnableToGetStat { source: procfs::ProcError, process_id: i32 },
+}
+
 fn stream_stdio(
     coordinator_tx: MultiplexingSender,
     mut stdin_rx: mpsc::Receiver,
diff --git a/ui/Cargo.lock b/ui/Cargo.lock
index 68bf17db0..633a35f76 100644
--- a/ui/Cargo.lock
+++ b/ui/Cargo.lock
@@ -549,6 +549,12 @@ version = "0.3.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
 
+[[package]]
+name = "hex"
+version = "0.4.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
+
 [[package]]
 name = "http"
 version = "0.2.11"
@@ -1018,6 +1024,7 @@ dependencies = [
  "bincode",
  "futures",
  "modify-cargo-toml",
+ "procfs",
  "serde",
  "serde_json",
  "snafu",
@@ -1136,6 +1143,29 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "procfs"
+version = "0.16.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = 
"731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.4.1", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.4.1", + "hex", +] + [[package]] name = "prometheus" version = "0.13.3" diff --git a/ui/src/server_axum/websocket.rs b/ui/src/server_axum/websocket.rs index 43c31d93f..4675e4865 100644 --- a/ui/src/server_axum/websocket.rs +++ b/ui/src/server_axum/websocket.rs @@ -589,8 +589,9 @@ async fn handle_execute_inner( stdin_tx, mut stdout_rx, mut stderr_rx, + mut status_rx, } = coordinator - .begin_execute(token.clone(), req) + .begin_execute(token.clone(), req.clone()) .await .context(BeginSnafu)?; @@ -613,6 +614,8 @@ async fn handle_execute_inner( .await }; + let mut reported = false; + let status = loop { tokio::select! { status = &mut task => break status, @@ -644,6 +647,13 @@ async fn handle_execute_inner( let sent = send_stderr(stderr).await; abandon_if_closed!(sent); }, + + Some(status) = status_rx.recv() => { + if !reported && status.total_time_secs > 60.0 { + error!("Request consumed more than 60s of CPU time: {req:?}"); + reported = true; + } + } } };