Skip to content

Commit

Permalink
Build CLI: Combine jobs state singletons into one
Browse files Browse the repository at this point in the history
Manage keeping track on spawned jobs, cancellation, shutting down and
fast fail in one struct that is used as a singleton.
  • Loading branch information
AmmarAbouZor committed Aug 29, 2024
1 parent 0f7dac8 commit 9b65024
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 91 deletions.
23 changes: 0 additions & 23 deletions cli/src/fail_fast.rs

This file was deleted.

41 changes: 0 additions & 41 deletions cli/src/jobs_runner/cancellation.rs

This file was deleted.

83 changes: 83 additions & 0 deletions cli/src/jobs_runner/jobs_state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Manages the state of the running jobs, keeping track on all their spawned tasks, providing
//! method to gracefully close them within a timeout and keeping info if the task should fail fast.

use std::{sync::OnceLock, time::Duration};

use tokio::time::timeout;
use tokio_util::{sync::CancellationToken, task::TaskTracker};

use crate::tracker::get_tracker;

/// Duration to wait for jobs when cancellation is invoked.
pub const TIMEOUT_DURATION: Duration = Duration::from_secs(2);

/// [`JobsState`] singleton
static JOBS_STATE: OnceLock<JobsState> = OnceLock::new();

/// Manages the state of the running jobs, keeping track on all their spawned tasks, providing
/// method to gracefully close them within a timeout.
/// It keeps the info in task should fail fast too.
#[derive(Debug)]
pub struct JobsState {
cancellation_token: CancellationToken,
task_tracker: TaskTracker,
fail_fast: bool,
}

impl JobsState {
fn new(fail_fast: bool) -> Self {
Self {
cancellation_token: CancellationToken::new(),
task_tracker: TaskTracker::new(),
fail_fast,
}
}

/// Provides a reference for [`JobsState`] struct, initializing it with default values
/// if not initializing before.
pub fn get() -> &'static JobsState {
JOBS_STATE.get_or_init(|| JobsState::new(false))
}

/// Initialize jobs state struct setting the fail fast option
///
/// # Panics
/// This function panics if [`JobsState`] already have been initialized or retrieved
/// before the function call
pub fn init(fail_fast: bool) {
JOBS_STATE
.set(JobsState::new(fail_fast))
.expect("Jobs state can't be initialized twice");
}

/// Returns a reference to the shared [`CancellationToken`] across all the tasks in process
pub fn cancellation_token(&self) -> &CancellationToken {
&self.cancellation_token
}

/// Returns a reference to the [`TaskTracker`] that will be used to spawn all
/// the task across the process.
pub fn task_tracker(&self) -> &TaskTracker {
&self.task_tracker
}

/// Closes the tasks trackers and waits for spawned jobs to exit gracefully within the [`TIMEOUT_DURATION`]
pub async fn graceful_shutdown(&self) {
self.task_tracker.close();

if timeout(TIMEOUT_DURATION, self.task_tracker.wait())
.await
.is_err()
{
let tracker = get_tracker();
tracker.print("Graceful shutdown timed out");
}
}

/// Gets if the processes should be cancelled once any task fails.
/// This function will set the value of fail fast to false if it doesn't
/// contain a value before.
pub fn fail_fast(&self) -> bool {
self.fail_fast
}
}
22 changes: 13 additions & 9 deletions cli/src/jobs_runner/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Manages running the provided main job for the given targets after resolving the job
//! dependencies, then it manages running them concurrently when possible.

pub mod cancellation;
mod job_definition;
pub mod jobs_resolver;
pub mod jobs_state;

use cancellation::{cancellation_token, graceful_shutdown, task_tracker};
use std::{collections::BTreeMap, ops::Not};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

Expand All @@ -14,12 +13,12 @@ pub use job_definition::JobDefinition;
use crate::{
checksum_records::{ChecksumCompareResult, ChecksumRecords},
cli_args::UiMode,
fail_fast::fail_fast,
job_type::JobType,
log_print::{print_log_separator, print_report},
spawner::SpawnResult,
target::Target,
tracker::get_tracker,
JobsState,
};

use anyhow::Result;
Expand Down Expand Up @@ -93,13 +92,15 @@ pub async fn run(targets: &[Target], main_job: JobType) -> Result<SpawnResultsCo

results.push(result);

let jobs_state = JobsState::get();

if failed {
failed_jobs.push(job_def.target);

if fail_fast() {
cancellation_token().cancel();
if jobs_state.fail_fast() {
jobs_state.cancellation_token().cancel();

graceful_shutdown().await;
jobs_state.graceful_shutdown().await;

return Ok(results);
}
Expand Down Expand Up @@ -132,7 +133,7 @@ pub async fn run(targets: &[Target], main_job: JobType) -> Result<SpawnResultsCo
}

// Skip spawning new jobs if cancel is already invoked.
if cancellation_token().is_cancelled() {
if jobs_state.cancellation_token().is_cancelled() {
return Ok(results);
}

Expand All @@ -155,7 +156,8 @@ fn spawn_jobs(
checksum_compare_map: &mut BTreeMap<Target, ChecksumCompareResult>,
failed_jobs: &[Target],
) -> Result<()> {
let task_tracker = task_tracker();
let jobs_state = JobsState::get();
let task_tracker = jobs_state.task_tracker();
for (job_def, phase) in jobs_status.iter_mut() {
let JobPhase::Awaiting(deps) = phase else {
continue;
Expand Down Expand Up @@ -207,7 +209,9 @@ fn spawn_jobs(
None => panic!("Spawned jobs already resolved and must have return value."),
};

if sender.send((job_def, result)).is_err() && !cancellation_token().is_cancelled() {
if sender.send((job_def, result)).is_err()
&& !jobs_state.cancellation_token().is_cancelled()
{
let tracker = get_tracker();
tracker.print(format!(
"Error: Job results can't be sent to receiver. Job: {job_def:?}"
Expand Down
22 changes: 11 additions & 11 deletions cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ mod chipmunk_runner;
mod cli_args;
mod dev_environment;
mod dev_tools;
mod fail_fast;
mod fstools;
mod job_type;
mod jobs_runner;
Expand All @@ -25,16 +24,16 @@ use clap::Parser;
use cli_args::{CargoCli, Command, UiMode};
use console::style;
use dev_environment::{print_env_info, resolve_dev_tools};
use fail_fast::set_fail_fast;
use job_type::JobType;
use jobs_runner::cancellation::{cancellation_token, graceful_shutdown};
use location::init_location;
use log_print::{print_log_separator, print_report};
use release::do_release;
use target::Target;
use tokio::signal;
use tracker::{get_tracker, init_tracker};

pub use jobs_runner::jobs_state::JobsState;

use crate::cli_args::EnvironmentCommand;

#[tokio::main]
Expand All @@ -54,9 +53,10 @@ async fn main() -> Result<(), Error> {
return main_res
}
_ = signal::ctrl_c() => {
let jobs_state = JobsState::get();
// Cancel all the running tasks and wait for them to return.
cancellation_token().cancel();
graceful_shutdown().await;
jobs_state.cancellation_token().cancel();
jobs_state.graceful_shutdown().await;

// Shutdown the tracker channels.
let tracker = get_tracker();
Expand Down Expand Up @@ -99,7 +99,7 @@ async fn main_process(command: Command) -> Result<(), Error> {
fail_fast,
ui_mode,
} => {
set_fail_fast(fail_fast);
JobsState::init(fail_fast);
init_tracker(ui_mode);
resolve_dev_tools()?;
let targets = get_targets_or_all(target);
Expand All @@ -112,15 +112,15 @@ async fn main_process(command: Command) -> Result<(), Error> {
fail_fast,
ui_mode,
} => {
set_fail_fast(fail_fast);
JobsState::init(fail_fast);
init_tracker(ui_mode);
resolve_dev_tools()?;
let targets = get_targets_or_all(target);
let results = jobs_runner::run(&targets, JobType::Build { production }).await?;
(JobType::Build { production }, results)
}
Command::Clean { target, ui_mode } => {
set_fail_fast(false);
JobsState::init(false);
init_tracker(ui_mode);
resolve_dev_tools()?;
let targets = get_targets_or_all(target);
Expand All @@ -133,7 +133,7 @@ async fn main_process(command: Command) -> Result<(), Error> {
fail_fast,
ui_mode,
} => {
set_fail_fast(fail_fast);
JobsState::init(fail_fast);
init_tracker(ui_mode);
resolve_dev_tools()?;
let targets = get_targets_or_all(target);
Expand All @@ -144,7 +144,7 @@ async fn main_process(command: Command) -> Result<(), Error> {
production,
no_fail_fast,
} => {
set_fail_fast(!no_fail_fast);
JobsState::init(!no_fail_fast);
init_tracker(Default::default());
resolve_dev_tools()?;
let results = jobs_runner::run(&[Target::App], JobType::Build { production }).await?;
Expand Down Expand Up @@ -173,7 +173,7 @@ async fn main_process(command: Command) -> Result<(), Error> {
fail_fast,
development,
} => {
set_fail_fast(fail_fast);
JobsState::init(fail_fast);
let ui_mode = if verbose {
UiMode::PrintImmediately
} else {
Expand Down
8 changes: 3 additions & 5 deletions cli/src/spawner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
//! their output and status to the `Tracker`.

use crate::{
jobs_runner::{cancellation::cancellation_token, JobDefinition},
location::get_root,
target::ProcessCommand,
tracker::get_tracker,
jobs_runner::JobDefinition, location::get_root, target::ProcessCommand, tracker::get_tracker,
JobsState,
};
use anyhow::Context;
use core::panic;
Expand Down Expand Up @@ -125,7 +123,7 @@ pub async fn spawn(
let mut stdout_buf = BufReader::new(stdout);
let mut stderr_buf = BufReader::new(stderr);

let cancel = cancellation_token();
let cancel = JobsState::get().cancellation_token();
loop {
let mut stdout_line = String::new();
let mut stderr_line = String::new();
Expand Down
5 changes: 3 additions & 2 deletions cli/src/target/wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ use anyhow::Context;
use crate::{
fstools,
job_type::JobType,
jobs_runner::{cancellation::cancellation_token, JobDefinition},
jobs_runner::JobDefinition,
spawner::{spawn, spawn_blocking, SpawnResult},
JobsState,
};

use super::{ProcessCommand, Target};
Expand Down Expand Up @@ -95,7 +96,7 @@ pub async fn run_test(production: bool) -> Result<SpawnResult, anyhow::Error> {

let specs_dir_path: PathBuf = ["spec", "build", "spec"].iter().collect();

let cancel = cancellation_token();
let cancel = JobsState::get().cancellation_token();

for spec in TEST_SPECS {
let spec_file_name = format!("session.{spec}.spec.js");
Expand Down

0 comments on commit 9b65024

Please sign in to comment.