Skip to content

Commit

Permalink
feat: better CI support (#98)
Browse files Browse the repository at this point in the history
* feat: `--no-watch` flag

* feat: `--exit-after` flag

* fix: make sure that tasks get to start

* fix: resolve suggestions and fix TODOs

* fix: `README.md`

* wip: partial fix for flakey test

* fix: flakey test

* refactor: syncify `StdoutTerminated` impl

* fix: alt commands for tests on windows

* fix: replace TIMEOUT with ping in windows test lol

* fix: pythonify test

* fix: use python itself as the entrypoint for tests
  • Loading branch information
Yohe-Am authored Oct 19, 2023
1 parent e639698 commit 1330a27
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 54 deletions.
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,19 @@ complete example.
See `whiz --help` for more information.

| Flags | Description |
| ------------------- | ---------------------------- |
| -f, --file \<FILE\> | specify the config file |
| -h, --help | print help information |
| --list-jobs | list all the available jobs |
| -r, --run \<JOB\> | run specific jobs |
| -t, --timestamp | enable timestamps in logging |
| -v, --verbose | enable verbose mode |
| -V, --version | print whiz version |
| Flags | Description |
| ------------------- | ------------------------------------------------- |
| -f, --file \<FILE\> | Specify the config file |
| -h, --help | Print help information |
| --list-jobs | List all the available jobs |
| -r, --run \<JOB\> | Run specific jobs |
| -t, --timestamp | Enable timestamps in logging |
| -v, --verbose | Enable verbose mode |
| -V, --version | Print whiz version |
| -V, --version | Print whiz version |
| --watch | Globally enable/disable fs watching |
| --exit-after | Exit whiz after all tasks are done. Useful for CI |


### Key bindings

Expand Down
175 changes: 149 additions & 26 deletions src/actors/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
use shlex;

use crate::config::color::ColorOption;
use crate::actors::grim_reaper::PermaDeathInvite;
use crate::config::{
pipe::{OutputRedirection, Pipe},
Config, Task,
Expand Down Expand Up @@ -87,6 +88,30 @@ impl Child {
}
}

fn wait_or_kill(&mut self, dur: Duration) -> Result<bool> {
if let Child::Process(p) = self {
match p.wait_timeout(dur)? {
Some(status) => {
*self = Child::Exited(status);
Ok(true)
}
None => {
p.terminate()?;
p.kill()?;
let _status = p.wait()?;
if p.wait_timeout(Duration::from_millis(500))?.is_none() {
p.kill()?;
p.wait()?;
}
*self = Self::Killed;
Ok(true)
}
}
} else {
Ok(false)
}
}

fn exit_status(&mut self) -> Option<ExitStatus> {
match &self {
Child::Process(_) => None,
Expand All @@ -97,35 +122,66 @@ impl Child {
}
}

pub struct CommandActor {
op_name: String,
operator: Task,
pub struct CommandActorsBuilder {
config: Config,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
arbiter: Arbiter,
child: Child,
nexts: Vec<Addr<CommandActor>>,
cwd: PathBuf,
self_addr: Option<Addr<CommandActor>>,
pending_upstream: BTreeMap<String, usize>,
base_dir: PathBuf,
verbose: bool,
started_at: DateTime<Local>,
env: Vec<(String, String)>,
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
colors_map: HashMap<String, Vec<ColorOption>>,
pipes_map: HashMap<String, Vec<Pipe>>,
watch_enabled_globally: bool,
}

impl CommandActor {
pub async fn from_config(
config: &Config,
impl CommandActorsBuilder {
pub fn new(
config: Config,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
base_dir: PathBuf,
verbose: bool,
pipes_map: HashMap<String, Vec<Pipe>>,
colors_map: HashMap<String, Vec<ColorOption>>,
) -> Result<Vec<Addr<CommandActor>>> {
) -> Self {
Self {
config,
console,
watcher,
base_dir,
verbose: false,
pipes_map: Default::default(),
watch_enabled_globally: true,
colors_map,
}
}

pub fn pipes_map(self, pipes_map: HashMap<String, Vec<Pipe>>) -> Self {
Self { pipes_map, ..self }
}

pub fn verbose(self, toggle: bool) -> Self {
Self {
verbose: toggle,
..self
}
}

pub fn globally_enable_watch(self, toggle: bool) -> Self {
Self {
watch_enabled_globally: toggle,
..self
}
}

pub async fn build(self) -> Result<HashMap<String, Addr<CommandActor>>> {
let Self {
config,
console,
watcher,
base_dir,
verbose,
pipes_map,
watch_enabled_globally,
colors_map,
} = self;
let mut shared_env = HashMap::from_iter(std::env::vars());
shared_env.extend(lade_sdk::resolve(&config.env, &shared_env)?);
let shared_env = lade_sdk::hydrate(shared_env, base_dir.clone()).await?;
Expand Down Expand Up @@ -173,6 +229,7 @@ impl CommandActor {
task_pipes,
colors,
op.entrypoint.clone(),
watch_enabled_globally,
)
.start();

Expand All @@ -182,9 +239,32 @@ impl CommandActor {
commands.insert(op_name, actor);
}

Ok(commands.values().map(|i| i.to_owned()).collect::<Vec<_>>())
Ok(commands)
}
}

pub struct CommandActor {
op_name: String,
operator: Task,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
arbiter: Arbiter,
child: Child,
nexts: Vec<Addr<CommandActor>>,
cwd: PathBuf,
self_addr: Option<Addr<CommandActor>>,
pending_upstream: BTreeMap<String, usize>,
verbose: bool,
started_at: DateTime<Local>,
env: Vec<(String, String)>,
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
watch: bool,
death_invite: Option<PermaDeathInvite>,
}

impl CommandActor {
#[allow(clippy::too_many_arguments)]
pub fn new(
op_name: String,
Expand All @@ -198,6 +278,7 @@ impl CommandActor {
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
watch: bool,
) -> Self {
Self {
op_name,
Expand All @@ -216,6 +297,8 @@ impl CommandActor {
pipes,
colors,
entrypoint,
watch,
death_invite: None,
}
}

Expand Down Expand Up @@ -409,6 +492,17 @@ impl CommandActor {

Ok(())
}

fn accept_death_invite(&mut self, cx: &mut Context<Self>) {
if let Some(invite) = self.death_invite.take() {
let status = match &self.child {
Child::Killed => ExitStatus::Other(1),
Child::Exited(val) => *val,
child => panic!("invalid death invite acceptance: {child:?}"),
};
invite.rsvp::<Self, Context<Self>>(self.op_name.clone(), status, cx);
}
}
}

impl Actor for CommandActor {
Expand All @@ -426,7 +520,7 @@ impl Actor for CommandActor {

let watches = self.operator.watch.resolve();

if !watches.is_empty() {
if self.watch && !watches.is_empty() {
let mut on = GlobSetBuilder::new();
for pattern in self.operator.watch.resolve() {
on.add(
Expand Down Expand Up @@ -527,7 +621,7 @@ impl Handler<Reload> for CommandActor {
self.send_will_reload();
}
Reload::Watch(files) => {
self.log_info(format!("RELOAD: files {} changed", files));
self.log_info(format!("RELOAD: file changed: {files} "));
self.send_will_reload();
}
Reload::Op(op_name) => {
Expand Down Expand Up @@ -586,6 +680,7 @@ impl Handler<WaitStatus> for CommandActor {
Box::pin(f)
}
}

#[derive(Message)]
#[rtype(result = "()")]
struct StdoutTerminated {
Expand All @@ -595,15 +690,24 @@ struct StdoutTerminated {
impl Handler<StdoutTerminated> for CommandActor {
type Result = ();

fn handle(&mut self, msg: StdoutTerminated, _: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: StdoutTerminated, cx: &mut Self::Context) -> Self::Result {
if msg.started_at == self.started_at {
self.ensure_stopped();
// since there's a chance that child might not be done by this point
// wait for it die for a maximum of 1 seconds
// before pulling the plug
if self
.child
.wait_or_kill(Duration::from_millis(1000))
.unwrap()
{
self.send_reload();
}
let exit = self.child.exit_status();

self.console.do_send(PanelStatus {
panel_name: self.op_name.clone(),
status: exit,
});
self.accept_death_invite(cx);
}
}
}
Expand All @@ -616,6 +720,25 @@ impl Handler<PoisonPill> for CommandActor {
type Result = ();

fn handle(&mut self, _: PoisonPill, ctx: &mut Context<Self>) -> Self::Result {
self.accept_death_invite(ctx);
ctx.stop();
}
}

impl Handler<PermaDeathInvite> for CommandActor {
type Result = ();

fn handle(&mut self, evt: PermaDeathInvite, cx: &mut Context<Self>) -> Self::Result {
self.child.poll(false).unwrap();
let status = match &self.child {
Child::Killed => Some(ExitStatus::Other(1)),
Child::Exited(val) => Some(*val),
_ => None,
};
if let Some(status) = status {
evt.rsvp::<Self, Self::Context>(self.op_name.clone(), status, cx);
} else {
self.death_invite = Some(evt);
}
}
}
Loading

0 comments on commit 1330a27

Please sign in to comment.