From 25694344f16f9d678f713312b78950250ffd35cb Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Mon, 2 Oct 2023 17:19:34 +0300 Subject: [PATCH 01/13] feat: `--no-watch` flag --- src/actors/command.rs | 10 ++++++++-- src/args.rs | 8 ++++++++ src/main.rs | 1 + 3 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/actors/command.rs b/src/actors/command.rs index 13f8c8f..ede9f60 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -114,6 +114,7 @@ pub struct CommandActor { pipes: Vec, colors: Vec, entrypoint: Option, + no_watch: bool, } impl CommandActor { @@ -125,6 +126,7 @@ impl CommandActor { verbose: bool, pipes_map: HashMap>, colors_map: HashMap>, + global_no_watch: bool, ) -> Result>> { let mut shared_env = HashMap::from_iter(std::env::vars()); shared_env.extend(lade_sdk::resolve(&config.env, &shared_env)?); @@ -173,6 +175,8 @@ impl CommandActor { task_pipes, colors, op.entrypoint.clone(), + // TODO: consider introducing per command no_watch config + global_no_watch, ) .start(); @@ -198,6 +202,7 @@ impl CommandActor { pipes: Vec, colors: Vec, entrypoint: Option, + no_watch: bool, ) -> Self { Self { op_name, @@ -216,6 +221,7 @@ impl CommandActor { pipes, colors, entrypoint, + no_watch, } } @@ -426,7 +432,7 @@ impl Actor for CommandActor { let watches = self.operator.watch.resolve(); - if !watches.is_empty() { + if !self.no_watch && !watches.is_empty() { let mut on = GlobSetBuilder::new(); for pattern in self.operator.watch.resolve() { on.add( @@ -527,7 +533,7 @@ impl Handler for CommandActor { self.send_will_reload(); } Reload::Watch(files) => { - self.log_info(format!("RELOAD: files {} changed", files)); + self.log_info(format!("RELOAD: file changed: {files} ")); self.send_will_reload(); } Reload::Op(op_name) => { diff --git a/src/args.rs b/src/args.rs index aa20188..8024a3c 100644 --- a/src/args.rs +++ b/src/args.rs @@ -46,4 +46,12 @@ pub struct Args { /// List all the jobs set in the config file #[clap(long)] pub list_jobs: bool, + + /// Whiz will exit after all commands have finished executing. + #[clap(long)] + pub exit_after: bool, + + /// Disables triggering task reloading from any watched files + #[clap(long)] + pub no_watch: bool, } diff --git a/src/main.rs b/src/main.rs index caed9cd..9392d4e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -167,6 +167,7 @@ async fn run(args: Args) -> Result<()> { args.verbose, pipes_map, colors_map, + args.no_watch, ) .await .map_err(|err| anyhow!("error spawning commands: {}", err))?; From bb7f793f89f78d650a83797318df2ea7b437dd39 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Mon, 2 Oct 2023 21:54:48 +0300 Subject: [PATCH 02/13] feat: `--exit-after` flag --- src/actors/command.rs | 33 ++++++++++++++- src/actors/grim_reaper.rs | 84 +++++++++++++++++++++++++++++++++++++++ src/actors/mod.rs | 1 + src/main.rs | 6 ++- src/tests.rs | 75 ++++++++++++++++++++++++++++++++-- 5 files changed, 193 insertions(+), 6 deletions(-) create mode 100644 src/actors/grim_reaper.rs diff --git a/src/actors/command.rs b/src/actors/command.rs index ede9f60..c199640 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -21,6 +21,7 @@ use std::{ use shlex; use crate::config::color::ColorOption; +use crate::actors::grim_reaper::PermaDeathInvite; use crate::config::{ pipe::{OutputRedirection, Pipe}, Config, Task, @@ -115,6 +116,7 @@ pub struct CommandActor { colors: Vec, entrypoint: Option, no_watch: bool, + death_invite: Option, } impl CommandActor { @@ -127,7 +129,7 @@ impl CommandActor { pipes_map: HashMap>, colors_map: HashMap>, global_no_watch: bool, - ) -> Result>> { + ) -> Result>> { let mut shared_env = HashMap::from_iter(std::env::vars()); shared_env.extend(lade_sdk::resolve(&config.env, &shared_env)?); let shared_env = lade_sdk::hydrate(shared_env, base_dir.clone()).await?; @@ -186,7 +188,8 @@ impl CommandActor { commands.insert(op_name, actor); } - Ok(commands.values().map(|i| i.to_owned()).collect::>()) + // Ok(commands.values().map(|i| i.to_owned()).collect::>()) + Ok(commands) } #[allow(clippy::too_many_arguments)] @@ -222,6 +225,7 @@ impl CommandActor { colors, entrypoint, no_watch, + death_invite: None, } } @@ -415,6 +419,12 @@ impl CommandActor { Ok(()) } + + fn accept_death_invite(&mut self) { + if let Some(invite) = self.death_invite.take() { + invite.rsvp(self.op_name.clone(), self.child.exit_status().unwrap()); + } + } } impl Actor for CommandActor { @@ -517,6 +527,10 @@ impl Handler for CommandActor { fn handle(&mut self, msg: Reload, _: &mut Context) -> Self::Result { self.ensure_stopped(); + if self.death_invite.is_some() { + return; + } + match &msg { Reload::Start => { self.send_will_reload(); @@ -610,6 +624,7 @@ impl Handler for CommandActor { panel_name: self.op_name.clone(), status: exit, }); + self.accept_death_invite(); } } } @@ -622,6 +637,20 @@ impl Handler for CommandActor { type Result = (); fn handle(&mut self, _: PoisonPill, ctx: &mut Context) -> Self::Result { + self.accept_death_invite(); ctx.stop(); } } + +impl Handler for CommandActor { + type Result = (); + + fn handle(&mut self, evt: PermaDeathInvite, _: &mut Context) -> Self::Result { + self.child.poll(false).unwrap(); + if let Some(status) = self.child.exit_status() { + evt.rsvp(self.op_name.clone(), status); + } else { + self.death_invite = Some(evt); + } + } +} diff --git a/src/actors/grim_reaper.rs b/src/actors/grim_reaper.rs new file mode 100644 index 0000000..d78eb18 --- /dev/null +++ b/src/actors/grim_reaper.rs @@ -0,0 +1,84 @@ +use std::collections::{HashMap, HashSet}; + +use actix::prelude::*; +use subprocess::ExitStatus; + +pub struct GrimReaperActor { + live_invites: HashSet, + non_zero_deaths: HashMap, +} + +impl GrimReaperActor { + pub async fn start_new(targets: HashMap>) -> anyhow::Result<()> + where + T: Actor + Handler, + ::Context: actix::dev::ToEnvelope, + { + let reaper_addr = GrimReaperActor { + live_invites: targets.keys().cloned().collect(), + non_zero_deaths: Default::default(), + } + .start(); + for target in targets.values() { + target + .send(PermaDeathInvite { + reaper_addr: reaper_addr.clone(), + }) + .await?; + } + Ok(()) + } +} + +impl Actor for GrimReaperActor { + type Context = Context; +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct PermaDeathInvite { + reaper_addr: Addr, +} + +impl PermaDeathInvite { + pub fn rsvp(self, actor_name: String, exit_status: ExitStatus) { + // FIXME: `do_send` might fail if actor "mailbox" is full + self.reaper_addr.do_send(InviteAccepted { + actor_name, + exit_status, + }); + } +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct InviteAccepted { + actor_name: String, + exit_status: ExitStatus, +} + +impl Handler for GrimReaperActor { + type Result = (); + + fn handle(&mut self, evt: InviteAccepted, _: &mut Context) -> Self::Result { + assert!(self.live_invites.remove(&evt.actor_name)); + if !evt.exit_status.success() { + self.non_zero_deaths.insert(evt.actor_name, evt.exit_status); + } + if self.live_invites.is_empty() { + if let Some((_op_name, status)) = self.non_zero_deaths.iter().next() { + // exit with the error code of the first aberrant task + let code = match *status { + ExitStatus::Exited(code) => code as i32, + ExitStatus::Other(code) => code, + ExitStatus::Signaled(code) => code as i32, + // TODO: consider erring out on Undetermined + ExitStatus::Undetermined => 0, + }; + System::current().stop_with_code(code); + } + System::current().stop(); + } + } +} + diff --git a/src/actors/mod.rs b/src/actors/mod.rs index 2b48aa5..617c8dd 100644 --- a/src/actors/mod.rs +++ b/src/actors/mod.rs @@ -1,3 +1,4 @@ pub mod command; pub mod console; +pub mod grim_reaper; pub mod watcher; diff --git a/src/main.rs b/src/main.rs index 9392d4e..baddc78 100644 --- a/src/main.rs +++ b/src/main.rs @@ -159,7 +159,7 @@ async fn run(args: Args) -> Result<()> { let console = ConsoleActor::new(Vec::from_iter(config.ops.keys().cloned()), args.timestamp).start(); let watcher = WatcherActor::new(base_dir.clone()).start(); - CommandActor::from_config( + let cmds = CommandActor::from_config( &config, console.clone(), watcher, @@ -172,5 +172,9 @@ async fn run(args: Args) -> Result<()> { .await .map_err(|err| anyhow!("error spawning commands: {}", err))?; + if args.exit_after { + whiz::actors::grim_reaper::GrimReaperActor::start_new(cmds).await?; + } + Ok(()) } diff --git a/src/tests.rs b/src/tests.rs index 7c3d7d2..d8d96fa 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -3,6 +3,7 @@ use std::path::Path; use std::{env, future::Future}; use anyhow::{Ok, Result}; + use subprocess::ExitStatus; use crate::actors::command::WaitStatus; @@ -12,7 +13,8 @@ use crate::args::Args; use crate::{ actors::{ command::CommandActor, - console::{ConsoleActor, Output, TermEvent}, + console::{ConsoleActor, Output, PanelStatus, TermEvent}, + grim_reaper::GrimReaperActor, watcher::WatcherActor, }, config::Config, @@ -38,7 +40,10 @@ macro_rules! mock_actor { } else )* { - println!("unexpect {:?}", msg.downcast::>()); + println!("unexpected {:?} on {}", + msg.downcast::>(), + stringify!($tt) + ); Box::new(None::<()>) } })).start() @@ -72,6 +77,7 @@ fn hello() { }, _msg: RegisterPanel => Some(()), _msg: TermEvent => Some(()), + _msg: PanelStatus => Some(()), }); let watcher = mock_actor!(WatcherActor, { @@ -94,16 +100,79 @@ fn hello() { false, HashMap::new(), HashMap::new(), + false, ) .await?; - let status = commands[0].send(WaitStatus).await?; + let status = commands + .get(&"test".to_string()) + .unwrap() + .send(WaitStatus) + .await?; println!("status: {:?}", status); Ok(()) }); } +#[test] +fn test_grim_reaper() { + let system = System::with_tokio_rt(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .max_blocking_threads(1) + .enable_all() + .build() + .unwrap() + }); + + let fut = async move { + let config: Config = r#" + test: + command: ls + longtest: + command: sleep 1s; echo "wake up"; + "# + .parse()?; + + let console = mock_actor!(ConsoleActor, { + msg: Output => { + println!("---{:?}", msg.message); + Some(()) + }, + _msg: PanelStatus => Some(()), + _msg: RegisterPanel => Some(()), + _msg: TermEvent => Some(()), + }); + + let watcher = mock_actor!(WatcherActor, { + _msg: WatchGlob => Some(()), + }); + + let commands = CommandActor::from_config( + &config, + console, + watcher, + env::current_dir().unwrap(), + false, + HashMap::new(), + false, + ) + .await?; + + GrimReaperActor::start_new(commands).await?; + Ok(()) + }; + + Arbiter::current().spawn(async { fut.await.unwrap() }); + + let timer = std::time::SystemTime::now(); + assert_eq!(0, system.run_with_code().unwrap()); + let elapsed = timer.elapsed().unwrap(); + assert!(elapsed.as_secs_f64() > 1.0); + assert!(elapsed.as_secs_f64() < 2.0); +} + #[test] fn config_search_recursive() { assert!(env::current_dir().is_ok()); From 395f8f2f2b7812987fd4c9f7881885cbf6ae6b5a Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Mon, 2 Oct 2023 22:08:40 +0300 Subject: [PATCH 03/13] fix: make sure that tasks get to start --- src/actors/command.rs | 9 +++------ src/actors/grim_reaper.rs | 6 +++++- src/args.rs | 2 +- src/tests.rs | 7 +++++-- 4 files changed, 14 insertions(+), 10 deletions(-) diff --git a/src/actors/command.rs b/src/actors/command.rs index c199640..7c8122b 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -527,10 +527,6 @@ impl Handler for CommandActor { fn handle(&mut self, msg: Reload, _: &mut Context) -> Self::Result { self.ensure_stopped(); - if self.death_invite.is_some() { - return; - } - match &msg { Reload::Start => { self.send_will_reload(); @@ -606,6 +602,7 @@ impl Handler for CommandActor { Box::pin(f) } } + #[derive(Message)] #[rtype(result = "()")] struct StdoutTerminated { @@ -647,8 +644,8 @@ impl Handler for CommandActor { fn handle(&mut self, evt: PermaDeathInvite, _: &mut Context) -> Self::Result { self.child.poll(false).unwrap(); - if let Some(status) = self.child.exit_status() { - evt.rsvp(self.op_name.clone(), status); + if let Child::Exited(status) = &self.child { + evt.rsvp(self.op_name.clone(), status.clone()); } else { self.death_invite = Some(evt); } diff --git a/src/actors/grim_reaper.rs b/src/actors/grim_reaper.rs index d78eb18..6352bab 100644 --- a/src/actors/grim_reaper.rs +++ b/src/actors/grim_reaper.rs @@ -3,6 +3,9 @@ use std::collections::{HashMap, HashSet}; use actix::prelude::*; use subprocess::ExitStatus; +/// This is responsible for exiting whiz when all tasks are done. +/// It `send`s it's targets `PermaDeathInvite` which and when all +/// have been `rsvp`d, terminates the Actix runtime and thus the program. pub struct GrimReaperActor { live_invites: HashSet, non_zero_deaths: HashMap, @@ -42,6 +45,8 @@ pub struct PermaDeathInvite { impl PermaDeathInvite { pub fn rsvp(self, actor_name: String, exit_status: ExitStatus) { + // TODO: consider asserting death by recieving the target actor's + // `Context` and using `stop` // FIXME: `do_send` might fail if actor "mailbox" is full self.reaper_addr.do_send(InviteAccepted { actor_name, @@ -81,4 +86,3 @@ impl Handler for GrimReaperActor { } } } - diff --git a/src/args.rs b/src/args.rs index 8024a3c..30db99d 100644 --- a/src/args.rs +++ b/src/args.rs @@ -47,7 +47,7 @@ pub struct Args { #[clap(long)] pub list_jobs: bool, - /// Whiz will exit after all commands have finished executing. + /// Whiz will exit after all tasks have finished executing. #[clap(long)] pub exit_after: bool, diff --git a/src/tests.rs b/src/tests.rs index d8d96fa..66f6b23 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -130,8 +130,12 @@ fn test_grim_reaper() { let config: Config = r#" test: command: ls - longtest: + long_test_dep: command: sleep 1s; echo "wake up"; + long_test: + command: echo "my que to enter" + depends_on: + - long_test_dep "# .parse()?; @@ -170,7 +174,6 @@ fn test_grim_reaper() { assert_eq!(0, system.run_with_code().unwrap()); let elapsed = timer.elapsed().unwrap(); assert!(elapsed.as_secs_f64() > 1.0); - assert!(elapsed.as_secs_f64() < 2.0); } #[test] From 998f2e2ff216e5d0198f892099e497c3785d6c64 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Oct 2023 22:49:14 +0300 Subject: [PATCH 04/13] fix: resolve suggestions and fix TODOs --- README.md | 29 +++++---- src/actors/command.rs | 132 +++++++++++++++++++++++++++----------- src/actors/grim_reaper.rs | 27 +++++--- src/args.rs | 8 ++- src/main.rs | 31 +++++---- src/tests.rs | 33 +++------- 6 files changed, 156 insertions(+), 104 deletions(-) diff --git a/README.md b/README.md index 88d5ed3..22c6817 100644 --- a/README.md +++ b/README.md @@ -84,23 +84,24 @@ See `whiz --help` for more information. | -t, --timestamp | enable timestamps in logging | | -v, --verbose | enable verbose mode | | -V, --version | print whiz version | +| -V, --version | print whiz version | +| --watch | globally enable/disable fs watching | + ### Key bindings -| Keys | Action | -| ------------ | ----------------------------------- | -| l, RighArrow | go to next tab | -| h, LeftArrow | go to previous tab | -| k, Ctl + p | scroll up one line | -| j, Ctl + n | scroll down one line | -| Ctl + u | scroll up half page | -| Ctl + d | scroll down half page | -| Ctl + b | scroll up full page | -| Ctl + f | scroll down full page | -| 0 | go to last tab | -| 1-9 | go to the tab at the given position | -| q, Ctl + c | exit the program | -| r | rerun the job in the current tab | +| Flags | Description | +| ------------------- | ------------------------------------------------- | +| -f, --file \ | Specify the config file | +| -h, --help | Print help information | +| --list-jobs | List all the available jobs | +| -r, --run \ | Run specific jobs | +| -t, --timestamp | Enable timestamps in logging | +| -v, --verbose | Enable verbose mode | +| -V, --version | Print whiz version | +| -V, --version | Print whiz version | +| --watch | Globally enable/disable fs watching | +| --exit-after | Exit whiz after all tasks are done. Useful for CI | ## Development diff --git a/src/actors/command.rs b/src/actors/command.rs index 7c8122b..f77a862 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -98,38 +98,66 @@ impl Child { } } -pub struct CommandActor { - op_name: String, - operator: Task, +pub struct CommandActorsBuilder { + config: Config, console: Addr, watcher: Addr, - arbiter: Arbiter, - child: Child, - nexts: Vec>, - cwd: PathBuf, - self_addr: Option>, - pending_upstream: BTreeMap, + base_dir: PathBuf, verbose: bool, - started_at: DateTime, - env: Vec<(String, String)>, - pipes: Vec, - colors: Vec, - entrypoint: Option, - no_watch: bool, - death_invite: Option, + colors_map: HashMap>, + pipes_map: HashMap>, + watch_enabled_globally: bool, } -impl CommandActor { - pub async fn from_config( - config: &Config, +impl CommandActorsBuilder { + pub fn new( + config: Config, console: Addr, watcher: Addr, base_dir: PathBuf, - verbose: bool, - pipes_map: HashMap>, colors_map: HashMap>, - global_no_watch: bool, - ) -> Result>> { + ) -> Self { + Self { + config, + console, + watcher, + base_dir, + verbose: false, + pipes_map: Default::default(), + watch_enabled_globally: true, + colors_map, + } + } + + pub fn pipes_map(self, pipes_map: HashMap>) -> Self { + Self { pipes_map, ..self } + } + + pub fn verbose(self, toggle: bool) -> Self { + Self { + verbose: toggle, + ..self + } + } + + pub fn globally_enable_watch(self, toggle: bool) -> Self { + Self { + watch_enabled_globally: toggle, + ..self + } + } + + pub async fn build(self) -> Result>> { + let Self { + config, + console, + watcher, + base_dir, + verbose, + pipes_map, + watch_enabled_globally, + colors_map, + } = self; let mut shared_env = HashMap::from_iter(std::env::vars()); shared_env.extend(lade_sdk::resolve(&config.env, &shared_env)?); let shared_env = lade_sdk::hydrate(shared_env, base_dir.clone()).await?; @@ -177,8 +205,7 @@ impl CommandActor { task_pipes, colors, op.entrypoint.clone(), - // TODO: consider introducing per command no_watch config - global_no_watch, + watch_enabled_globally, ) .start(); @@ -188,10 +215,32 @@ impl CommandActor { commands.insert(op_name, actor); } - // Ok(commands.values().map(|i| i.to_owned()).collect::>()) Ok(commands) } +} + +pub struct CommandActor { + op_name: String, + operator: Task, + console: Addr, + watcher: Addr, + arbiter: Arbiter, + child: Child, + nexts: Vec>, + cwd: PathBuf, + self_addr: Option>, + pending_upstream: BTreeMap, + verbose: bool, + started_at: DateTime, + env: Vec<(String, String)>, + pipes: Vec, + colors: Vec, + entrypoint: Option, + watch: bool, + death_invite: Option, +} +impl CommandActor { #[allow(clippy::too_many_arguments)] pub fn new( op_name: String, @@ -205,7 +254,7 @@ impl CommandActor { pipes: Vec, colors: Vec, entrypoint: Option, - no_watch: bool, + watch: bool, ) -> Self { Self { op_name, @@ -224,7 +273,7 @@ impl CommandActor { pipes, colors, entrypoint, - no_watch, + watch, death_invite: None, } } @@ -420,9 +469,13 @@ impl CommandActor { Ok(()) } - fn accept_death_invite(&mut self) { + fn accept_death_invite(&mut self, cx: &mut Context) { if let Some(invite) = self.death_invite.take() { - invite.rsvp(self.op_name.clone(), self.child.exit_status().unwrap()); + invite.rsvp::>( + self.op_name.clone(), + self.child.exit_status().unwrap(), + cx, + ); } } } @@ -442,7 +495,7 @@ impl Actor for CommandActor { let watches = self.operator.watch.resolve(); - if !self.no_watch && !watches.is_empty() { + if self.watch && !watches.is_empty() { let mut on = GlobSetBuilder::new(); for pattern in self.operator.watch.resolve() { on.add( @@ -612,7 +665,7 @@ struct StdoutTerminated { impl Handler for CommandActor { type Result = (); - fn handle(&mut self, msg: StdoutTerminated, _: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: StdoutTerminated, cx: &mut Self::Context) -> Self::Result { if msg.started_at == self.started_at { self.ensure_stopped(); let exit = self.child.exit_status(); @@ -621,7 +674,7 @@ impl Handler for CommandActor { panel_name: self.op_name.clone(), status: exit, }); - self.accept_death_invite(); + self.accept_death_invite(cx); } } } @@ -634,7 +687,7 @@ impl Handler for CommandActor { type Result = (); fn handle(&mut self, _: PoisonPill, ctx: &mut Context) -> Self::Result { - self.accept_death_invite(); + self.accept_death_invite(ctx); ctx.stop(); } } @@ -642,10 +695,15 @@ impl Handler for CommandActor { impl Handler for CommandActor { type Result = (); - fn handle(&mut self, evt: PermaDeathInvite, _: &mut Context) -> Self::Result { + fn handle(&mut self, evt: PermaDeathInvite, cx: &mut Context) -> Self::Result { self.child.poll(false).unwrap(); - if let Child::Exited(status) = &self.child { - evt.rsvp(self.op_name.clone(), status.clone()); + let status = match &self.child { + Child::Killed => Some(ExitStatus::Other(1)), + Child::Exited(val) => Some(*val), + _ => None, + }; + if let Some(status) = status { + evt.rsvp::(self.op_name.clone(), status, cx); } else { self.death_invite = Some(evt); } diff --git a/src/actors/grim_reaper.rs b/src/actors/grim_reaper.rs index 6352bab..e8777e1 100644 --- a/src/actors/grim_reaper.rs +++ b/src/actors/grim_reaper.rs @@ -44,14 +44,19 @@ pub struct PermaDeathInvite { } impl PermaDeathInvite { - pub fn rsvp(self, actor_name: String, exit_status: ExitStatus) { - // TODO: consider asserting death by recieving the target actor's - // `Context` and using `stop` - // FIXME: `do_send` might fail if actor "mailbox" is full - self.reaper_addr.do_send(InviteAccepted { - actor_name, - exit_status, - }); + pub fn rsvp(self, actor_name: String, exit_status: ExitStatus, invitee_cx: &mut C) + where + T: Actor + Handler, + ::Context: actix::dev::ToEnvelope, + C: actix::ActorContext, + { + self.reaper_addr + .try_send(InviteAccepted { + actor_name, + exit_status, + }) + .expect("GrimReaperActor mailbox is closed or its mailbox full"); + invitee_cx.stop(); } } @@ -77,8 +82,10 @@ impl Handler for GrimReaperActor { ExitStatus::Exited(code) => code as i32, ExitStatus::Other(code) => code, ExitStatus::Signaled(code) => code as i32, - // TODO: consider erring out on Undetermined - ExitStatus::Undetermined => 0, + ExitStatus::Undetermined => { + eprintln!("ERROR: task {_op_name} exited with Undetermined status"); + 1 + } }; System::current().stop_with_code(code); } diff --git a/src/args.rs b/src/args.rs index 30db99d..ae3c8f1 100644 --- a/src/args.rs +++ b/src/args.rs @@ -36,6 +36,7 @@ pub struct Args { #[clap(short, long)] pub verbose: bool, + /// Enable timestamps in logging #[clap(short, long)] pub timestamp: bool, @@ -48,10 +49,11 @@ pub struct Args { pub list_jobs: bool, /// Whiz will exit after all tasks have finished executing. + /// This disables fs watching despite any values given to the `watch` flag. #[clap(long)] pub exit_after: bool, - /// Disables triggering task reloading from any watched files - #[clap(long)] - pub no_watch: bool, + /// Globally toggle triggering task reloading from any watched files + #[clap(long, default_value = "true", value_name = "WATCH")] + pub watch: Option, } diff --git a/src/main.rs b/src/main.rs index baddc78..914a1af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,9 @@ use clap::Parser; use self_update::{backends::github::Update, cargo_crate_version, update::UpdateStatus}; use semver::Version; use tokio::time::{sleep, Duration as TokioDuration}; +use whiz::actors::command::CommandActorsBuilder; use whiz::{ - actors::{command::CommandActor, console::ConsoleActor, watcher::WatcherActor}, + actors::{console::ConsoleActor, watcher::WatcherActor}, args::Command, config::Config, global_config::GlobalConfig, @@ -111,9 +112,8 @@ fn main() -> Result<()> { }) }); - let _ = system.run(); - - Ok(()) + let code = system.run_with_code()?; + std::process::exit(code); } async fn run(args: Args) -> Result<()> { @@ -159,18 +159,17 @@ async fn run(args: Args) -> Result<()> { let console = ConsoleActor::new(Vec::from_iter(config.ops.keys().cloned()), args.timestamp).start(); let watcher = WatcherActor::new(base_dir.clone()).start(); - let cmds = CommandActor::from_config( - &config, - console.clone(), - watcher, - base_dir.clone(), - args.verbose, - pipes_map, - colors_map, - args.no_watch, - ) - .await - .map_err(|err| anyhow!("error spawning commands: {}", err))?; + let cmds = CommandActorsBuilder::new(config, console.clone(), watcher, base_dir.clone(), colors_map) + .verbose(args.verbose) + .pipes_map(pipes_map) + .globally_enable_watch(if args.exit_after { + false + } else { + args.watch.unwrap_or(true) + }) + .build() + .await + .map_err(|err| anyhow!("error spawning commands: {}", err))?; if args.exit_after { whiz::actors::grim_reaper::GrimReaperActor::start_new(cmds).await?; diff --git a/src/tests.rs b/src/tests.rs index 66f6b23..a85b1be 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::path::Path; use std::{env, future::Future}; @@ -6,13 +5,12 @@ use anyhow::{Ok, Result}; use subprocess::ExitStatus; -use crate::actors::command::WaitStatus; +use crate::actors::command::{CommandActorsBuilder, WaitStatus}; use crate::actors::console::RegisterPanel; use crate::actors::watcher::WatchGlob; use crate::args::Args; use crate::{ actors::{ - command::CommandActor, console::{ConsoleActor, Output, PanelStatus, TermEvent}, grim_reaper::GrimReaperActor, watcher::WatcherActor, @@ -92,17 +90,10 @@ fn hello() { )) .await?; - let commands = CommandActor::from_config( - &config, - console, - watcher, - env::current_dir().unwrap(), - false, - HashMap::new(), - HashMap::new(), - false, - ) - .await?; + let commands = + CommandActorsBuilder::new(config, console, watcher, env::current_dir().unwrap(), HashMap::new()) + .build() + .await?; let status = commands .get(&"test".to_string()) @@ -153,16 +144,10 @@ fn test_grim_reaper() { _msg: WatchGlob => Some(()), }); - let commands = CommandActor::from_config( - &config, - console, - watcher, - env::current_dir().unwrap(), - false, - HashMap::new(), - false, - ) - .await?; + let commands = + CommandActorsBuilder::new(config, console, watcher, env::current_dir().unwrap()) + .build() + .await?; GrimReaperActor::start_new(commands).await?; Ok(()) From 531928440c748fe492d464bdca9b99582022e659 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 3 Oct 2023 22:57:50 +0300 Subject: [PATCH 05/13] fix: `README.md` --- README.md | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/README.md b/README.md index 22c6817..ebc1c20 100644 --- a/README.md +++ b/README.md @@ -75,21 +75,6 @@ complete example. See `whiz --help` for more information. -| Flags | Description | -| ------------------- | ---------------------------- | -| -f, --file \ | specify the config file | -| -h, --help | print help information | -| --list-jobs | list all the available jobs | -| -r, --run \ | run specific jobs | -| -t, --timestamp | enable timestamps in logging | -| -v, --verbose | enable verbose mode | -| -V, --version | print whiz version | -| -V, --version | print whiz version | -| --watch | globally enable/disable fs watching | - - -### Key bindings - | Flags | Description | | ------------------- | ------------------------------------------------- | | -f, --file \ | Specify the config file | @@ -103,6 +88,24 @@ See `whiz --help` for more information. | --watch | Globally enable/disable fs watching | | --exit-after | Exit whiz after all tasks are done. Useful for CI | + +### Key bindings + +| Keys | Action | +| ------------ | ----------------------------------- | +| l, RighArrow | go to next tab | +| h, LeftArrow | go to previous tab | +| k, Ctl + p | scroll up one line | +| j, Ctl + n | scroll down one line | +| Ctl + u | scroll up half page | +| Ctl + d | scroll down half page | +| Ctl + b | scroll up full page | +| Ctl + f | scroll down full page | +| 0 | go to last tab | +| 1-9 | go to the tab at the given position | +| q, Ctl + c | exit the program | +| r | rerun the job in the current tab | + ## Development ```bash From 8d605af32fee9d9159cd9d5e258b7039601661b0 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Thu, 5 Oct 2023 21:55:58 +0300 Subject: [PATCH 06/13] wip: partial fix for flakey test --- src/actors/command.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/actors/command.rs b/src/actors/command.rs index f77a862..d2d686e 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -471,9 +471,14 @@ impl CommandActor { fn accept_death_invite(&mut self, cx: &mut Context) { if let Some(invite) = self.death_invite.take() { + let status = match &self.child { + Child::Killed => ExitStatus::Other(1), + Child::Exited(val) => *val, + child => panic!("invalid death invite acceptance: {child:?}"), + }; invite.rsvp::>( self.op_name.clone(), - self.child.exit_status().unwrap(), + status, cx, ); } From 540d8a51b55b11e63209fd937cc0b528e4a7bb45 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Thu, 5 Oct 2023 22:55:26 +0300 Subject: [PATCH 07/13] fix: flakey test --- src/actors/command.rs | 51 ++++++++++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 15 deletions(-) diff --git a/src/actors/command.rs b/src/actors/command.rs index d2d686e..8f542f0 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -1,4 +1,4 @@ -use actix::clock::sleep; +use actix::clock::{sleep, timeout}; use actix::prelude::*; use anyhow::{Context as ErrorContext, Result}; @@ -476,11 +476,7 @@ impl CommandActor { Child::Exited(val) => *val, child => panic!("invalid death invite acceptance: {child:?}"), }; - invite.rsvp::>( - self.op_name.clone(), - status, - cx, - ); + invite.rsvp::>(self.op_name.clone(), status, cx); } } } @@ -668,19 +664,44 @@ struct StdoutTerminated { } impl Handler for CommandActor { - type Result = (); + type Result = ResponseActFuture; fn handle(&mut self, msg: StdoutTerminated, cx: &mut Self::Context) -> Self::Result { - if msg.started_at == self.started_at { - self.ensure_stopped(); - let exit = self.child.exit_status(); - - self.console.do_send(PanelStatus { - panel_name: self.op_name.clone(), + // early exit + if msg.started_at != self.started_at { + return Box::pin(async {}.into_actor(self).map(|_, _, _| ())); + } + let addr = cx.address(); + // since there's a chance that child might not be done by this point + // wait for it die for a maximum of 1 seconds + // polling every 20 millis + // before pulling the plug + let fut = timeout(Duration::from_secs(1), async move { + loop { + if let Some(status) = addr.send(GetStatus).await.unwrap().unwrap() { + return status; + } + sleep(Duration::from_millis(20)).await; + } + }) + .into_actor(self) + .map(|res, act, cx| { + let exit = if let Ok(status) = res { + act.send_reload(); // signal any dependents, this boy's goin down + Some(status) + } else { + // timeout and child is not still dead + // pull the plug + act.ensure_stopped(); + act.child.exit_status() + }; + act.console.do_send(PanelStatus { + panel_name: act.op_name.clone(), status: exit, }); - self.accept_death_invite(cx); - } + act.accept_death_invite(cx); + }); + Box::pin(fut) } } From cfc36c358aad0d332d560131e42b267e2ab663d4 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Fri, 6 Oct 2023 16:12:39 +0300 Subject: [PATCH 08/13] refactor: syncify `StdoutTerminated` impl --- src/actors/command.rs | 75 +++++++++++++++++++++++-------------------- src/tests.rs | 7 ++-- 2 files changed, 46 insertions(+), 36 deletions(-) diff --git a/src/actors/command.rs b/src/actors/command.rs index 8f542f0..cbbe208 100644 --- a/src/actors/command.rs +++ b/src/actors/command.rs @@ -1,4 +1,4 @@ -use actix::clock::{sleep, timeout}; +use actix::clock::sleep; use actix::prelude::*; use anyhow::{Context as ErrorContext, Result}; @@ -88,6 +88,30 @@ impl Child { } } + fn wait_or_kill(&mut self, dur: Duration) -> Result { + if let Child::Process(p) = self { + match p.wait_timeout(dur)? { + Some(status) => { + *self = Child::Exited(status); + Ok(true) + } + None => { + p.terminate()?; + p.kill()?; + let _status = p.wait()?; + if p.wait_timeout(Duration::from_millis(500))?.is_none() { + p.kill()?; + p.wait()?; + } + *self = Self::Killed; + Ok(true) + } + } + } else { + Ok(false) + } + } + fn exit_status(&mut self) -> Option { match &self { Child::Process(_) => None, @@ -664,44 +688,27 @@ struct StdoutTerminated { } impl Handler for CommandActor { - type Result = ResponseActFuture; + type Result = (); fn handle(&mut self, msg: StdoutTerminated, cx: &mut Self::Context) -> Self::Result { - // early exit - if msg.started_at != self.started_at { - return Box::pin(async {}.into_actor(self).map(|_, _, _| ())); - } - let addr = cx.address(); - // since there's a chance that child might not be done by this point - // wait for it die for a maximum of 1 seconds - // polling every 20 millis - // before pulling the plug - let fut = timeout(Duration::from_secs(1), async move { - loop { - if let Some(status) = addr.send(GetStatus).await.unwrap().unwrap() { - return status; - } - sleep(Duration::from_millis(20)).await; + if msg.started_at == self.started_at { + // since there's a chance that child might not be done by this point + // wait for it die for a maximum of 1 seconds + // before pulling the plug + if self + .child + .wait_or_kill(Duration::from_millis(1000)) + .unwrap() + { + self.send_reload(); } - }) - .into_actor(self) - .map(|res, act, cx| { - let exit = if let Ok(status) = res { - act.send_reload(); // signal any dependents, this boy's goin down - Some(status) - } else { - // timeout and child is not still dead - // pull the plug - act.ensure_stopped(); - act.child.exit_status() - }; - act.console.do_send(PanelStatus { - panel_name: act.op_name.clone(), + let exit = self.child.exit_status(); + self.console.do_send(PanelStatus { + panel_name: self.op_name.clone(), status: exit, }); - act.accept_death_invite(cx); - }); - Box::pin(fut) + self.accept_death_invite(cx); + } } } diff --git a/src/tests.rs b/src/tests.rs index a85b1be..98179d5 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -122,7 +122,7 @@ fn test_grim_reaper() { test: command: ls long_test_dep: - command: sleep 1s; echo "wake up"; + command: sleep 1; echo "wake up"; long_test: command: echo "my que to enter" depends_on: @@ -158,7 +158,10 @@ fn test_grim_reaper() { let timer = std::time::SystemTime::now(); assert_eq!(0, system.run_with_code().unwrap()); let elapsed = timer.elapsed().unwrap(); - assert!(elapsed.as_secs_f64() > 1.0); + assert!( + elapsed.as_millis() >= 1000, + "test took less than a second: {elapsed:?}" + ); } #[test] From 28c03591b918b76ee30c8f3f52f2aa5b49e2911f Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Fri, 6 Oct 2023 16:52:04 +0300 Subject: [PATCH 09/13] fix: alt commands for tests on windows --- src/tests.rs | 39 +++++++++++++++++++++++++++++---------- 1 file changed, 29 insertions(+), 10 deletions(-) diff --git a/src/tests.rs b/src/tests.rs index 98179d5..f7a67a5 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -118,17 +118,36 @@ fn test_grim_reaper() { }); let fut = async move { - let config: Config = r#" - test: - command: ls - long_test_dep: - command: sleep 1; echo "wake up"; - long_test: - command: echo "my que to enter" - depends_on: - - long_test_dep + let config_raw = { + #[cfg(not(target_os = "windows"))] + { + r#" +test: + command: ls +long_test_dep: + command: sleep 1; echo "wake up"; +long_test: + command: echo "my que to enter" + depends_on: + - long_test_dep "# - .parse()?; + } + + #[cfg(target_os = "windows")] + { + r#" +test: + command: dir +long_test_dep: + command: TIMEOUT /T 1 && echo "wake up" +long_test: + command: echo "my que to enter" + depends_on: + - long_test_dep + "# + } + }; + let config: Config = config_raw.parse()?; let console = mock_actor!(ConsoleActor, { msg: Output => { From 0bc92fea81c7454f3130de0583e3e55b455fc3ec Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Sun, 8 Oct 2023 01:02:09 +0300 Subject: [PATCH 10/13] fix: replace TIMEOUT with ping in windows test lol --- src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests.rs b/src/tests.rs index f7a67a5..06f740c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -139,7 +139,7 @@ long_test: test: command: dir long_test_dep: - command: TIMEOUT /T 1 && echo "wake up" + command: PING 1.1.1.2 -n 1 -w 1000 >NUL && echo "wake up" long_test: command: echo "my que to enter" depends_on: From 38fc2b7fafde2bb630251eab2209721931e201c5 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Sun, 8 Oct 2023 01:19:11 +0300 Subject: [PATCH 11/13] fix: pythonify test --- src/tests.rs | 30 +++++------------------------- 1 file changed, 5 insertions(+), 25 deletions(-) diff --git a/src/tests.rs b/src/tests.rs index 06f740c..6599638 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -118,35 +118,15 @@ fn test_grim_reaper() { }); let fut = async move { - let config_raw = { - #[cfg(not(target_os = "windows"))] - { - r#" + let config_raw =r#" test: - command: ls + command: python3 -c 'print("hello whiz")' long_test_dep: - command: sleep 1; echo "wake up"; + command: python3 -c 'import time; time.sleep(1); print("wake up")' long_test: - command: echo "my que to enter" + command: python3 -c 'print("my que to enter")' depends_on: - - long_test_dep - "# - } - - #[cfg(target_os = "windows")] - { - r#" -test: - command: dir -long_test_dep: - command: PING 1.1.1.2 -n 1 -w 1000 >NUL && echo "wake up" -long_test: - command: echo "my que to enter" - depends_on: - - long_test_dep - "# - } - }; + - long_test_dep"#; let config: Config = config_raw.parse()?; let console = mock_actor!(ConsoleActor, { From 470ff5144213c8935eb8e79d1ba24244b00f75b1 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Tue, 17 Oct 2023 20:48:17 +0300 Subject: [PATCH 12/13] fix: use python itself as the entrypoint for tests --- src/tests.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/tests.rs b/src/tests.rs index 6599638..522c213 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -120,11 +120,14 @@ fn test_grim_reaper() { let fut = async move { let config_raw =r#" test: - command: python3 -c 'print("hello whiz")' + entrypoint: 'python3 -c' + command: 'print("hello whiz")' long_test_dep: - command: python3 -c 'import time; time.sleep(1); print("wake up")' + entrypoint: 'python3 -c' + command: 'import time; time.sleep(1); print("wake up")' long_test: - command: python3 -c 'print("my que to enter")' + entrypoint: 'python3 -c' + command: 'print("my que to enter")' depends_on: - long_test_dep"#; let config: Config = config_raw.parse()?; From 820504e713b87bffe09a242f2a82d3895a7c7255 Mon Sep 17 00:00:00 2001 From: Yohe-Am <56622350+Yohe-Am@users.noreply.github.com> Date: Thu, 19 Oct 2023 11:01:06 +0000 Subject: [PATCH 13/13] fix: rebase bugs --- src/main.rs | 28 +++++++++++++++++----------- src/tests.rs | 28 +++++++++++++++++++--------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/main.rs b/src/main.rs index 914a1af..fd0d8a3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -159,17 +159,23 @@ async fn run(args: Args) -> Result<()> { let console = ConsoleActor::new(Vec::from_iter(config.ops.keys().cloned()), args.timestamp).start(); let watcher = WatcherActor::new(base_dir.clone()).start(); - let cmds = CommandActorsBuilder::new(config, console.clone(), watcher, base_dir.clone(), colors_map) - .verbose(args.verbose) - .pipes_map(pipes_map) - .globally_enable_watch(if args.exit_after { - false - } else { - args.watch.unwrap_or(true) - }) - .build() - .await - .map_err(|err| anyhow!("error spawning commands: {}", err))?; + let cmds = CommandActorsBuilder::new( + config, + console.clone(), + watcher, + base_dir.clone(), + colors_map, + ) + .verbose(args.verbose) + .pipes_map(pipes_map) + .globally_enable_watch(if args.exit_after { + false + } else { + args.watch.unwrap_or(true) + }) + .build() + .await + .map_err(|err| anyhow!("error spawning commands: {}", err))?; if args.exit_after { whiz::actors::grim_reaper::GrimReaperActor::start_new(cmds).await?; diff --git a/src/tests.rs b/src/tests.rs index 522c213..65cb336 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -90,10 +90,15 @@ fn hello() { )) .await?; - let commands = - CommandActorsBuilder::new(config, console, watcher, env::current_dir().unwrap(), HashMap::new()) - .build() - .await?; + let commands = CommandActorsBuilder::new( + config, + console, + watcher, + env::current_dir().unwrap(), + Default::default(), + ) + .build() + .await?; let status = commands .get(&"test".to_string()) @@ -118,7 +123,7 @@ fn test_grim_reaper() { }); let fut = async move { - let config_raw =r#" + let config_raw = r#" test: entrypoint: 'python3 -c' command: 'print("hello whiz")' @@ -146,10 +151,15 @@ long_test: _msg: WatchGlob => Some(()), }); - let commands = - CommandActorsBuilder::new(config, console, watcher, env::current_dir().unwrap()) - .build() - .await?; + let commands = CommandActorsBuilder::new( + config, + console, + watcher, + env::current_dir().unwrap(), + Default::default(), + ) + .build() + .await?; GrimReaperActor::start_new(commands).await?; Ok(())