Skip to content

Commit

Permalink
Capture recent output for nicer error messages
Browse files Browse the repository at this point in the history
This will include recent output in the Slack error messages.
  • Loading branch information
snoyberg committed Jun 13, 2024
1 parent 8bf2ae8 commit b3f3a58
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 34 deletions.
98 changes: 66 additions & 32 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use parking_lot::Mutex;
use reqwest::Url;
use signal_hook::consts::{SIGINT, SIGTERM};
use std::{
collections::VecDeque,
io::{Read, Write},
process::{Child, Command, ExitStatus, Stdio},
str::FromStr,
Expand Down Expand Up @@ -48,6 +49,9 @@ pub(crate) struct Cli {
/// Arguments to the process
#[arg(required = false)]
pub(crate) args: Vec<String>,
/// How many lines of output should we store for error messages?
#[arg(long, default_value_t = 10)]
pub(crate) output_lines: usize,
}

#[derive(Debug)]
Expand Down Expand Up @@ -82,45 +86,59 @@ impl Cli {
let mut command = Command::new(&self.command);
command.args(&self.args[..]);

if self.task_output_timeout.is_some() {
command.stdout(Stdio::piped()).stderr(Stdio::piped());
}
command.stdout(Stdio::piped()).stderr(Stdio::piped());

let mut child = command
.spawn()
.context(format!("Failed to spawn {}", self.command))?;

let (send, recv) = mpsc::channel::<MainMessage>();
let send = SendMainMessage(send);
let max_recent_output = self.output_lines;
let recent_output = Arc::new(Mutex::new(VecDeque::with_capacity(max_recent_output)));

match self.task_output_timeout {
Some(task_output_timeout) => {
let last_output = Arc::new(Mutex::new(Instant::now()));
let child_stdout = child.stdout.take().context("child stdout is None")?;
let child_stderr = child.stderr.take().context("child stderr is None")?;
let send_clone = send.clone();
let last_output_clone = last_output.clone();
std::thread::spawn(|| {
process_std_handle(child_stdout, send_clone, StdType::Stdout, last_output_clone)
});
let send_clone = send.clone();
let last_output_clone = last_output.clone();
std::thread::spawn(|| {
process_std_handle(child_stderr, send_clone, StdType::Stderr, last_output_clone)
});
let send_clone = send.clone();
std::thread::spawn(move || {
detect_deadlock(
last_output,
send_clone,
Duration::from_secs(task_output_timeout),
)
});
}
None => {
anyhow::ensure!(child.stdout.is_none());
anyhow::ensure!(child.stderr.is_none());
}
// Always capture output so we can keep recent output available for error messages.
let last_output = Arc::new(Mutex::new(Instant::now()));
{
let child_stdout = child.stdout.take().context("child stdout is None")?;
let child_stderr = child.stderr.take().context("child stderr is None")?;
let send_clone = send.clone();
let last_output_clone = last_output.clone();
let recent_output_clone = recent_output.clone();
std::thread::spawn(move || {
process_std_handle(
child_stdout,
send_clone,
StdType::Stdout,
last_output_clone,
recent_output_clone,
max_recent_output,
)
});
let send_clone = send.clone();
let last_output_clone = last_output.clone();
let recent_output_clone = recent_output.clone();
std::thread::spawn(move || {
process_std_handle(
child_stderr,
send_clone,
StdType::Stderr,
last_output_clone,
recent_output_clone,
max_recent_output,
)
});
}

if let Some(task_output_timeout) = self.task_output_timeout {
let send_clone = send.clone();
std::thread::spawn(move || {
detect_deadlock(
last_output,
send_clone,
Duration::from_secs(task_output_timeout),
)
});
}

let send_clone = send.clone();
Expand Down Expand Up @@ -174,7 +192,12 @@ impl Cli {
self.app_version,
self.image_url,
);
let result = slack_app.send_notification(&e);
let mut msg = String::new();
for line in &*recent_output.lock() {
msg.push_str(line);
msg.push('\n');
}
let result = slack_app.send_notification(&e, &msg);
if let Err(err) = result {
eprintln!("Slack notification failed: {err:?}");
}
Expand All @@ -190,6 +213,8 @@ fn process_std_handle(
send: SendMainMessage,
std_type: StdType,
last_output: Arc<Mutex<Instant>>,
recent_output: Arc<Mutex<VecDeque<String>>>,
max_recent_output: usize,
) {
let mut buffer = [0u8; 4096];
loop {
Expand Down Expand Up @@ -217,6 +242,15 @@ fn process_std_handle(
send.send(MainMessage::Error(e));
break;
}

let mut guard = recent_output.lock();
for line in buffer.split(|x| *x == b'\n') {
if guard.len() >= max_recent_output {
guard.pop_front();
}
let line = line.strip_suffix(&[b'\r']).unwrap_or(line);
guard.push_back(String::from_utf8_lossy(line).into_owned());
}
}
Err(e) => {
send.send(MainMessage::Error(e));
Expand Down
15 changes: 13 additions & 2 deletions src/slack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,11 @@ impl SlackApp {
)
}

pub(crate) fn send_notification(&self, message: &anyhow::Error) -> Result<()> {
pub(crate) fn send_notification(
&self,
message: &anyhow::Error,
latest_output: &str,
) -> Result<()> {
let description = self.compute_description();
let mut value = serde_json::json!(
{
Expand All @@ -69,7 +73,14 @@ impl SlackApp {
"type": "mrkdwn",
"text": description
},
}
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": latest_output,
}
},
]
});
if let Some(image_url) = &self.app_info.image_url {
Expand Down

0 comments on commit b3f3a58

Please sign in to comment.