Skip to content

Commit 6f0541d

Browse files
committed
Do not panic when SIGPIPE is received
1 parent be05555 commit 6f0541d

File tree

2 files changed

+51
-11
lines changed

2 files changed

+51
-11
lines changed

crates/hyperqueue/src/bin/hq.rs

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,6 @@ use clap::{CommandFactory, FromArgMatches};
22
use clap_complete::generate;
33
use cli_table::ColorChoice;
44
use colored::Colorize;
5-
use std::io;
6-
use std::io::IsTerminal;
7-
use std::panic::PanicHookInfo;
8-
95
use hyperqueue::HQ_VERSION;
106
use hyperqueue::client::commands::autoalloc::command_autoalloc;
117
use hyperqueue::client::commands::doc::command_doc;
@@ -16,7 +12,7 @@ use hyperqueue::client::commands::job::{
1612
};
1713
use hyperqueue::client::commands::journal::command_journal;
1814
use hyperqueue::client::commands::outputlog::command_reader;
19-
use hyperqueue::client::commands::server::command_server;
15+
use hyperqueue::client::commands::server::{ServerCommand, ServerOpts, command_server};
2016
use hyperqueue::client::commands::submit::command::{SubmitJobConfOpts, open_job};
2117
use hyperqueue::client::commands::submit::{
2218
JobSubmitFileOpts, JobSubmitOpts, submit_computation, submit_computation_from_job_file,
@@ -40,7 +36,7 @@ use hyperqueue::client::task::{
4036
use hyperqueue::common::cli::{
4137
ColorPolicy, CommonOpts, DeploySshOpts, GenerateCompletionOpts, HwDetectOpts, JobCommand,
4238
JobProgressOpts, JobWaitOpts, OptsWithMatches, RootOptions, SubCommand, WorkerAddressOpts,
43-
WorkerCommand, WorkerInfoOpts, WorkerListOpts, WorkerStopOpts, WorkerWaitOpts,
39+
WorkerCommand, WorkerInfoOpts, WorkerListOpts, WorkerOpts, WorkerStopOpts, WorkerWaitOpts,
4440
get_task_id_selector, get_task_selector,
4541
};
4642
use hyperqueue::common::setup::setup_logging;
@@ -52,6 +48,10 @@ use hyperqueue::transfer::messages::{
5248
use hyperqueue::worker::hwdetect::{
5349
detect_additional_resources, detect_cpus, prune_hyper_threading,
5450
};
51+
use nix::sys::signal::{SigHandler, Signal};
52+
use std::io;
53+
use std::io::IsTerminal;
54+
use std::panic::PanicHookInfo;
5555
use tako::resources::{CPU_RESOURCE_NAME, ResourceDescriptor, ResourceDescriptorItem};
5656

5757
#[cfg(feature = "jemalloc")]
@@ -382,6 +382,17 @@ environment variable, and attach the logs to the issue, to provide us more infor
382382
};
383383
}
384384

385+
#[cfg(unix)]
386+
fn reset_sigpipe() {
387+
unsafe {
388+
nix::sys::signal::signal(Signal::SIGPIPE, SigHandler::SigDfl)
389+
.expect("cannot reset sigpipe");
390+
}
391+
}
392+
393+
#[cfg(not(unix))]
394+
fn reset_sigpipe() {}
395+
385396
#[tokio::main(flavor = "current_thread")]
386397
async fn main() -> hyperqueue::Result<()> {
387398
// Augment panics - first print the error and backtrace like normally,
@@ -419,6 +430,35 @@ async fn main() -> hyperqueue::Result<()> {
419430

420431
let gsettings = make_global_settings(top_opts.common);
421432

433+
let is_cli_like = match &top_opts.subcmd {
434+
SubCommand::Server(ServerOpts {
435+
subcmd: ServerCommand::Start(_),
436+
}) => false,
437+
SubCommand::Worker(WorkerOpts {
438+
subcmd: WorkerCommand::Start(_),
439+
}) => false,
440+
#[cfg(feature = "dashboard")]
441+
SubCommand::Dashboard(_) => false,
442+
_ => true,
443+
};
444+
445+
if is_cli_like {
446+
// When our stdout is attached to a pipe and the pipe is closed,
447+
// it manifests as an I/O error, because the Rust runtime ignores
448+
// SIGPIPE by default.
449+
// This in turn causes `println!` to panic, which is not ideal,
450+
// because it crashes HQ when used with Unix CLI utilities (such as `head`).
451+
// Therefore, we reset SIGPIPE to its default behavior (terminate the process)
452+
// to avoid the panics.
453+
// See https://github.com/It4innovations/hyperqueue/issues/851.
454+
// However, we only do this for client commands, which are short running and
455+
// designed to be combined with other CLI tools.
456+
// Enabling this for server and workers has unintended consequences, for example
457+
// when a worker writes to the stdin of a task that has already closed it, then
458+
// this would terminate the worker.
459+
reset_sigpipe();
460+
}
461+
422462
let result = match top_opts.subcmd {
423463
SubCommand::Server(opts) => command_server(&gsettings, opts).await,
424464
SubCommand::Worker(opts) => match opts.subcmd {

crates/hyperqueue/src/client/commands/server.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::time::Duration;
1919
#[derive(Parser)]
2020
pub struct ServerOpts {
2121
#[clap(subcommand)]
22-
subcmd: ServerCommand,
22+
pub subcmd: ServerCommand,
2323
}
2424

2525
#[derive(Parser)]
@@ -57,7 +57,7 @@ pub struct GenerateAccessOpts {
5757
}
5858

5959
#[derive(Parser)]
60-
enum ServerCommand {
60+
pub enum ServerCommand {
6161
/// Start the HyperQueue server
6262
Start(ServerStartOpts),
6363
/// Stop the HyperQueue server, if it is running
@@ -69,7 +69,7 @@ enum ServerCommand {
6969
}
7070

7171
#[derive(Parser)]
72-
struct ServerStartOpts {
72+
pub struct ServerStartOpts {
7373
/// Hostname/IP of the machine under which is visible to others, default: hostname
7474
#[arg(long)]
7575
host: Option<String>,
@@ -119,10 +119,10 @@ struct ServerStartOpts {
119119
}
120120

121121
#[derive(Parser)]
122-
struct ServerStopOpts {}
122+
pub struct ServerStopOpts {}
123123

124124
#[derive(Parser)]
125-
struct ServerInfoOpts {}
125+
pub struct ServerInfoOpts {}
126126

127127
pub async fn command_server(gsettings: &GlobalSettings, opts: ServerOpts) -> anyhow::Result<()> {
128128
match opts.subcmd {

0 commit comments

Comments
 (0)