Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: better CI support #98

Merged
merged 13 commits into from
Oct 19, 2023
22 changes: 13 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,19 @@ complete example.

See `whiz --help` for more information.

| Flags | Description |
| ------------------- | ---------------------------- |
| -f, --file \<FILE\> | specify the config file |
| -h, --help | print help information |
| --list-jobs | list all the available jobs |
| -r, --run \<JOB\> | run specific jobs |
| -t, --timestamp | enable timestamps in logging |
| -v, --verbose | enable verbose mode |
| -V, --version | print whiz version |
| Flags | Description |
| ------------------- | ------------------------------------------------- |
| -f, --file \<FILE\> | Specify the config file |
| -h, --help | Print help information |
| --list-jobs | List all the available jobs |
| -r, --run \<JOB\> | Run specific jobs |
| -t, --timestamp | Enable timestamps in logging |
| -v, --verbose | Enable verbose mode |
| -V, --version | Print whiz version |
| -V, --version | Print whiz version |
| --watch | Globally enable/disable fs watching |
| --exit-after | Exit whiz after all tasks are done. Useful for CI |


### Key bindings

Expand Down
175 changes: 149 additions & 26 deletions src/actors/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::{
use shlex;

use crate::config::color::ColorOption;
use crate::actors::grim_reaper::PermaDeathInvite;
use crate::config::{
pipe::{OutputRedirection, Pipe},
Config, Task,
Expand Down Expand Up @@ -87,6 +88,30 @@ impl Child {
}
}

fn wait_or_kill(&mut self, dur: Duration) -> Result<bool> {
if let Child::Process(p) = self {
match p.wait_timeout(dur)? {
Some(status) => {
*self = Child::Exited(status);
Ok(true)
}
None => {
p.terminate()?;
p.kill()?;
let _status = p.wait()?;
if p.wait_timeout(Duration::from_millis(500))?.is_none() {
p.kill()?;
p.wait()?;
}
*self = Self::Killed;
Ok(true)
}
}
} else {
Ok(false)
}
}

fn exit_status(&mut self) -> Option<ExitStatus> {
match &self {
Child::Process(_) => None,
Expand All @@ -97,35 +122,66 @@ impl Child {
}
}

pub struct CommandActor {
op_name: String,
operator: Task,
pub struct CommandActorsBuilder {
config: Config,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
arbiter: Arbiter,
child: Child,
nexts: Vec<Addr<CommandActor>>,
cwd: PathBuf,
self_addr: Option<Addr<CommandActor>>,
pending_upstream: BTreeMap<String, usize>,
base_dir: PathBuf,
verbose: bool,
started_at: DateTime<Local>,
env: Vec<(String, String)>,
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
colors_map: HashMap<String, Vec<ColorOption>>,
pipes_map: HashMap<String, Vec<Pipe>>,
watch_enabled_globally: bool,
}

impl CommandActor {
pub async fn from_config(
config: &Config,
impl CommandActorsBuilder {
pub fn new(
config: Config,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
base_dir: PathBuf,
verbose: bool,
pipes_map: HashMap<String, Vec<Pipe>>,
colors_map: HashMap<String, Vec<ColorOption>>,
) -> Result<Vec<Addr<CommandActor>>> {
) -> Self {
Self {
config,
console,
watcher,
base_dir,
verbose: false,
pipes_map: Default::default(),
watch_enabled_globally: true,
colors_map,
}
}

pub fn pipes_map(self, pipes_map: HashMap<String, Vec<Pipe>>) -> Self {
Self { pipes_map, ..self }
}

pub fn verbose(self, toggle: bool) -> Self {
Self {
verbose: toggle,
..self
}
}

pub fn globally_enable_watch(self, toggle: bool) -> Self {
Self {
watch_enabled_globally: toggle,
..self
}
}

pub async fn build(self) -> Result<HashMap<String, Addr<CommandActor>>> {
let Self {
config,
console,
watcher,
base_dir,
verbose,
pipes_map,
watch_enabled_globally,
colors_map,
} = self;
let mut shared_env = HashMap::from_iter(std::env::vars());
shared_env.extend(lade_sdk::resolve(&config.env, &shared_env)?);
let shared_env = lade_sdk::hydrate(shared_env, base_dir.clone()).await?;
Expand Down Expand Up @@ -173,6 +229,7 @@ impl CommandActor {
task_pipes,
colors,
op.entrypoint.clone(),
watch_enabled_globally,
)
.start();

Expand All @@ -182,9 +239,32 @@ impl CommandActor {
commands.insert(op_name, actor);
}

Ok(commands.values().map(|i| i.to_owned()).collect::<Vec<_>>())
Ok(commands)
}
}

pub struct CommandActor {
op_name: String,
operator: Task,
console: Addr<ConsoleAct>,
watcher: Addr<WatcherAct>,
arbiter: Arbiter,
child: Child,
nexts: Vec<Addr<CommandActor>>,
cwd: PathBuf,
self_addr: Option<Addr<CommandActor>>,
pending_upstream: BTreeMap<String, usize>,
verbose: bool,
started_at: DateTime<Local>,
env: Vec<(String, String)>,
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
watch: bool,
death_invite: Option<PermaDeathInvite>,
}

impl CommandActor {
#[allow(clippy::too_many_arguments)]
pub fn new(
op_name: String,
Expand All @@ -198,6 +278,7 @@ impl CommandActor {
pipes: Vec<Pipe>,
colors: Vec<ColorOption>,
entrypoint: Option<String>,
watch: bool,
) -> Self {
Self {
op_name,
Expand All @@ -216,6 +297,8 @@ impl CommandActor {
pipes,
colors,
entrypoint,
watch,
death_invite: None,
}
}

Expand Down Expand Up @@ -409,6 +492,17 @@ impl CommandActor {

Ok(())
}

fn accept_death_invite(&mut self, cx: &mut Context<Self>) {
if let Some(invite) = self.death_invite.take() {
let status = match &self.child {
Child::Killed => ExitStatus::Other(1),
Child::Exited(val) => *val,
child => panic!("invalid death invite acceptance: {child:?}"),
};
invite.rsvp::<Self, Context<Self>>(self.op_name.clone(), status, cx);
}
}
}

impl Actor for CommandActor {
Expand All @@ -426,7 +520,7 @@ impl Actor for CommandActor {

let watches = self.operator.watch.resolve();

if !watches.is_empty() {
if self.watch && !watches.is_empty() {
let mut on = GlobSetBuilder::new();
for pattern in self.operator.watch.resolve() {
on.add(
Expand Down Expand Up @@ -527,7 +621,7 @@ impl Handler<Reload> for CommandActor {
self.send_will_reload();
}
Reload::Watch(files) => {
self.log_info(format!("RELOAD: files {} changed", files));
self.log_info(format!("RELOAD: file changed: {files} "));
self.send_will_reload();
}
Reload::Op(op_name) => {
Expand Down Expand Up @@ -586,6 +680,7 @@ impl Handler<WaitStatus> for CommandActor {
Box::pin(f)
}
}

#[derive(Message)]
#[rtype(result = "()")]
struct StdoutTerminated {
Expand All @@ -595,15 +690,24 @@ struct StdoutTerminated {
impl Handler<StdoutTerminated> for CommandActor {
type Result = ();

fn handle(&mut self, msg: StdoutTerminated, _: &mut Self::Context) -> Self::Result {
fn handle(&mut self, msg: StdoutTerminated, cx: &mut Self::Context) -> Self::Result {
if msg.started_at == self.started_at {
self.ensure_stopped();
// since there's a chance that child might not be done by this point
// wait for it die for a maximum of 1 seconds
// before pulling the plug
if self
.child
.wait_or_kill(Duration::from_millis(1000))
.unwrap()
{
self.send_reload();
}
let exit = self.child.exit_status();

self.console.do_send(PanelStatus {
panel_name: self.op_name.clone(),
status: exit,
});
self.accept_death_invite(cx);
}
}
}
Expand All @@ -616,6 +720,25 @@ impl Handler<PoisonPill> for CommandActor {
type Result = ();

fn handle(&mut self, _: PoisonPill, ctx: &mut Context<Self>) -> Self::Result {
self.accept_death_invite(ctx);
ctx.stop();
}
}

impl Handler<PermaDeathInvite> for CommandActor {
type Result = ();

fn handle(&mut self, evt: PermaDeathInvite, cx: &mut Context<Self>) -> Self::Result {
self.child.poll(false).unwrap();
let status = match &self.child {
Child::Killed => Some(ExitStatus::Other(1)),
Child::Exited(val) => Some(*val),
_ => None,
};
if let Some(status) = status {
evt.rsvp::<Self, Self::Context>(self.op_name.clone(), status, cx);
} else {
self.death_invite = Some(evt);
}
}
}
Loading