diff --git a/cli/src/fail_fast.rs b/cli/src/fail_fast.rs deleted file mode 100644 index 4e3e4c28d..000000000 --- a/cli/src/fail_fast.rs +++ /dev/null @@ -1,23 +0,0 @@ -//! Manages the state of fail fast option when development commands are running. - -use std::sync::OnceLock; - -static FAIL_FAST_STATE: OnceLock = OnceLock::new(); - -/// Sets the value for fail fast for jobs. -/// -/// # Panics -/// This function panics if fail fast value already have been set or retrieved -/// before the function call -pub fn set_fail_fast(fail_fast: bool) { - if FAIL_FAST_STATE.set(fail_fast).is_err() { - panic!("Fail fast must be set once only"); - } -} - -/// Gets if the processes should be cancelled once any task fails. -/// This function will set the value of fail fast to false if it doesn't -/// contain a value before. -pub fn fail_fast() -> bool { - *FAIL_FAST_STATE.get_or_init(|| false) -} diff --git a/cli/src/jobs_runner/cancellation.rs b/cli/src/jobs_runner/cancellation.rs deleted file mode 100644 index 56e7df37e..000000000 --- a/cli/src/jobs_runner/cancellation.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! Manage cancellation of the running jobs, providing method for gracefully shutdown in case of -//! cancelling or fail-fast. - -use std::{sync::OnceLock, time::Duration}; - -use tokio::time::timeout; -use tokio_util::{sync::CancellationToken, task::TaskTracker}; - -use crate::tracker::get_tracker; - -/// Duration to wait for jobs when cancellation is invoked. -pub const TIMEOUT_DURATION: Duration = Duration::from_secs(2); - -/// Returns a reference to the shared [`CancellationToken`] across all the tasks in process -pub fn cancellation_token() -> &'static CancellationToken { - static CANCELLATION_TOKEN: OnceLock = OnceLock::new(); - - CANCELLATION_TOKEN.get_or_init(CancellationToken::default) -} - -/// Returns a reference to the [`TaskTracker`] that will be used to spawn all -/// the task across the process. -pub fn task_tracker() -> &'static TaskTracker { - static TASK_TRACKER: OnceLock = OnceLock::new(); - - TASK_TRACKER.get_or_init(TaskTracker::new) -} - -/// Closes the tasks trackers and waits for spawned jobs to exit gracefully within the [`TIMEOUT_DURATION`] -pub async fn graceful_shutdown() { - let task_tracker = task_tracker(); - task_tracker.close(); - - if timeout(TIMEOUT_DURATION, task_tracker.wait()) - .await - .is_err() - { - let tracker = get_tracker(); - tracker.print("Graceful shutdown timed out"); - } -} diff --git a/cli/src/jobs_runner/jobs_state.rs b/cli/src/jobs_runner/jobs_state.rs new file mode 100644 index 000000000..d700861d7 --- /dev/null +++ b/cli/src/jobs_runner/jobs_state.rs @@ -0,0 +1,83 @@ +//! Manages the state of the running jobs, keeping track on all their spawned tasks, providing +//! method to gracefully close them within a timeout and keeping info if the task should fail fast. + +use std::{sync::OnceLock, time::Duration}; + +use tokio::time::timeout; +use tokio_util::{sync::CancellationToken, task::TaskTracker}; + +use crate::tracker::get_tracker; + +/// Duration to wait for jobs when cancellation is invoked. +pub const TIMEOUT_DURATION: Duration = Duration::from_secs(2); + +/// [`JobsState`] singleton +static JOBS_STATE: OnceLock = OnceLock::new(); + +/// Manages the state of the running jobs, keeping track on all their spawned tasks, providing +/// method to gracefully close them within a timeout. +/// It keeps the info in task should fail fast too. +#[derive(Debug)] +pub struct JobsState { + cancellation_token: CancellationToken, + task_tracker: TaskTracker, + fail_fast: bool, +} + +impl JobsState { + fn new(fail_fast: bool) -> Self { + Self { + cancellation_token: CancellationToken::new(), + task_tracker: TaskTracker::new(), + fail_fast, + } + } + + /// Provides a reference for [`JobsState`] struct, initializing it with default values + /// if not initializing before. + pub fn get() -> &'static JobsState { + JOBS_STATE.get_or_init(|| JobsState::new(false)) + } + + /// Initialize jobs state struct setting the fail fast option + /// + /// # Panics + /// This function panics if [`JobsState`] already have been initialized or retrieved + /// before the function call + pub fn init(fail_fast: bool) { + JOBS_STATE + .set(JobsState::new(fail_fast)) + .expect("Jobs state can't be initialized twice"); + } + + /// Returns a reference to the shared [`CancellationToken`] across all the tasks in process + pub fn cancellation_token(&self) -> &CancellationToken { + &self.cancellation_token + } + + /// Returns a reference to the [`TaskTracker`] that will be used to spawn all + /// the task across the process. + pub fn task_tracker(&self) -> &TaskTracker { + &self.task_tracker + } + + /// Closes the tasks trackers and waits for spawned jobs to exit gracefully within the [`TIMEOUT_DURATION`] + pub async fn graceful_shutdown(&self) { + self.task_tracker.close(); + + if timeout(TIMEOUT_DURATION, self.task_tracker.wait()) + .await + .is_err() + { + let tracker = get_tracker(); + tracker.print("Graceful shutdown timed out"); + } + } + + /// Gets if the processes should be cancelled once any task fails. + /// This function will set the value of fail fast to false if it doesn't + /// contain a value before. + pub fn fail_fast(&self) -> bool { + self.fail_fast + } +} diff --git a/cli/src/jobs_runner/mod.rs b/cli/src/jobs_runner/mod.rs index 131bec6ef..a00096698 100644 --- a/cli/src/jobs_runner/mod.rs +++ b/cli/src/jobs_runner/mod.rs @@ -1,11 +1,10 @@ //! Manages running the provided main job for the given targets after resolving the job //! dependencies, then it manages running them concurrently when possible. -pub mod cancellation; mod job_definition; pub mod jobs_resolver; +pub mod jobs_state; -use cancellation::{cancellation_token, graceful_shutdown, task_tracker}; use std::{collections::BTreeMap, ops::Not}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; @@ -14,12 +13,12 @@ pub use job_definition::JobDefinition; use crate::{ checksum_records::{ChecksumCompareResult, ChecksumRecords}, cli_args::UiMode, - fail_fast::fail_fast, job_type::JobType, log_print::{print_log_separator, print_report}, spawner::SpawnResult, target::Target, tracker::get_tracker, + JobsState, }; use anyhow::Result; @@ -93,13 +92,15 @@ pub async fn run(targets: &[Target], main_job: JobType) -> Result Result, failed_jobs: &[Target], ) -> Result<()> { - let task_tracker = task_tracker(); + let jobs_state = JobsState::get(); + let task_tracker = jobs_state.task_tracker(); for (job_def, phase) in jobs_status.iter_mut() { let JobPhase::Awaiting(deps) = phase else { continue; @@ -207,7 +209,9 @@ fn spawn_jobs( None => panic!("Spawned jobs already resolved and must have return value."), }; - if sender.send((job_def, result)).is_err() && !cancellation_token().is_cancelled() { + if sender.send((job_def, result)).is_err() + && !jobs_state.cancellation_token().is_cancelled() + { let tracker = get_tracker(); tracker.print(format!( "Error: Job results can't be sent to receiver. Job: {job_def:?}" diff --git a/cli/src/main.rs b/cli/src/main.rs index ce5936ca5..d34079076 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -6,7 +6,6 @@ mod chipmunk_runner; mod cli_args; mod dev_environment; mod dev_tools; -mod fail_fast; mod fstools; mod job_type; mod jobs_runner; @@ -25,9 +24,7 @@ use clap::Parser; use cli_args::{CargoCli, Command, UiMode}; use console::style; use dev_environment::{print_env_info, resolve_dev_tools}; -use fail_fast::set_fail_fast; use job_type::JobType; -use jobs_runner::cancellation::{cancellation_token, graceful_shutdown}; use location::init_location; use log_print::{print_log_separator, print_report}; use release::do_release; @@ -35,6 +32,8 @@ use target::Target; use tokio::signal; use tracker::{get_tracker, init_tracker}; +pub use jobs_runner::jobs_state::JobsState; + use crate::cli_args::EnvironmentCommand; #[tokio::main] @@ -54,9 +53,10 @@ async fn main() -> Result<(), Error> { return main_res } _ = signal::ctrl_c() => { + let jobs_state = JobsState::get(); // Cancel all the running tasks and wait for them to return. - cancellation_token().cancel(); - graceful_shutdown().await; + jobs_state.cancellation_token().cancel(); + jobs_state.graceful_shutdown().await; // Shutdown the tracker channels. let tracker = get_tracker(); @@ -99,7 +99,7 @@ async fn main_process(command: Command) -> Result<(), Error> { fail_fast, ui_mode, } => { - set_fail_fast(fail_fast); + JobsState::init(fail_fast); init_tracker(ui_mode); resolve_dev_tools()?; let targets = get_targets_or_all(target); @@ -112,7 +112,7 @@ async fn main_process(command: Command) -> Result<(), Error> { fail_fast, ui_mode, } => { - set_fail_fast(fail_fast); + JobsState::init(fail_fast); init_tracker(ui_mode); resolve_dev_tools()?; let targets = get_targets_or_all(target); @@ -120,7 +120,7 @@ async fn main_process(command: Command) -> Result<(), Error> { (JobType::Build { production }, results) } Command::Clean { target, ui_mode } => { - set_fail_fast(false); + JobsState::init(false); init_tracker(ui_mode); resolve_dev_tools()?; let targets = get_targets_or_all(target); @@ -133,7 +133,7 @@ async fn main_process(command: Command) -> Result<(), Error> { fail_fast, ui_mode, } => { - set_fail_fast(fail_fast); + JobsState::init(fail_fast); init_tracker(ui_mode); resolve_dev_tools()?; let targets = get_targets_or_all(target); @@ -144,7 +144,7 @@ async fn main_process(command: Command) -> Result<(), Error> { production, no_fail_fast, } => { - set_fail_fast(!no_fail_fast); + JobsState::init(!no_fail_fast); init_tracker(Default::default()); resolve_dev_tools()?; let results = jobs_runner::run(&[Target::App], JobType::Build { production }).await?; @@ -173,7 +173,7 @@ async fn main_process(command: Command) -> Result<(), Error> { fail_fast, development, } => { - set_fail_fast(fail_fast); + JobsState::init(fail_fast); let ui_mode = if verbose { UiMode::PrintImmediately } else { diff --git a/cli/src/spawner.rs b/cli/src/spawner.rs index f201ebf32..f8b7d2cea 100644 --- a/cli/src/spawner.rs +++ b/cli/src/spawner.rs @@ -2,10 +2,8 @@ //! their output and status to the `Tracker`. use crate::{ - jobs_runner::{cancellation::cancellation_token, JobDefinition}, - location::get_root, - target::ProcessCommand, - tracker::get_tracker, + jobs_runner::JobDefinition, location::get_root, target::ProcessCommand, tracker::get_tracker, + JobsState, }; use anyhow::Context; use core::panic; @@ -125,7 +123,7 @@ pub async fn spawn( let mut stdout_buf = BufReader::new(stdout); let mut stderr_buf = BufReader::new(stderr); - let cancel = cancellation_token(); + let cancel = JobsState::get().cancellation_token(); loop { let mut stdout_line = String::new(); let mut stderr_line = String::new(); diff --git a/cli/src/target/wrapper.rs b/cli/src/target/wrapper.rs index 36f8afaa2..beb966934 100644 --- a/cli/src/target/wrapper.rs +++ b/cli/src/target/wrapper.rs @@ -5,8 +5,9 @@ use anyhow::Context; use crate::{ fstools, job_type::JobType, - jobs_runner::{cancellation::cancellation_token, JobDefinition}, + jobs_runner::JobDefinition, spawner::{spawn, spawn_blocking, SpawnResult}, + JobsState, }; use super::{ProcessCommand, Target}; @@ -95,7 +96,7 @@ pub async fn run_test(production: bool) -> Result { let specs_dir_path: PathBuf = ["spec", "build", "spec"].iter().collect(); - let cancel = cancellation_token(); + let cancel = JobsState::get().cancellation_token(); for spec in TEST_SPECS { let spec_file_name = format!("session.{spec}.spec.js");