Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shutdown watchdog to subspace binaries #3170

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ rand = "0.8.5"
rayon = "1.10.0"
schnellru = "0.2.3"
schnorrkel = "0.11.4"
scopeguard = "1.2.0"
serde = { version = "1.0.110", features = ["derive"] }
serde_json = "1.0.128"
static_assertions = "1.1.0"
Expand Down Expand Up @@ -100,3 +101,6 @@ binary = [
"dep:supports-color",
"dep:tracing-subscriber",
]

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
11 changes: 11 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
mod commands;
mod utils;

use crate::utils::spawn_shutdown_watchdog;
use clap::Parser;
use std::fs;
use std::path::PathBuf;
use subspace_farmer::single_disk_farm::{ScrubTarget, SingleDiskFarm};
use subspace_proof_of_space::chia::ChiaTable;
use tokio::runtime::Handle;
use tracing::info;
use tracing_subscriber::filter::LevelFilter;
use tracing_subscriber::prelude::*;
Expand Down Expand Up @@ -97,9 +99,18 @@ async fn main() -> anyhow::Result<()> {

match command {
Command::Farm(farming_args) => {
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};
commands::farm::farm::<PosTable>(farming_args).await?;
}
Command::Cluster(cluster_args) => {
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};
commands::cluster::cluster::<PosTable>(cluster_args).await?;
}
Command::Benchmark(benchmark_args) => {
Expand Down
84 changes: 84 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
use std::process::exit;
use std::time::Duration;
use std::{process, thread};
use tokio::runtime::{Handle, Runtime};
use tokio::signal;
use tracing::{debug, error, info};

/// The amount of time we wait for tasks to finish when shutting down.
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);

/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
/// user to trace the process, before exiting.
pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);

pub(crate) fn raise_fd_limit() {
match fdlimit::raise_fd_limit() {
Expand Down Expand Up @@ -52,3 +64,75 @@ pub(crate) async fn shutdown_signal() {

tracing::info!("Received Ctrl+C, shutting down farmer...");
}

/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
/// blocking. If a second Ctrl-C is received, the thread will force a shut down immediately.
///
/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
/// blocking shutdown on `runtime_handle`.
///
/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
/// this spawned thread will never be called.
#[cfg_attr(
not(all(tokio_unstable, tokio_taskdump)),
expect(unused_variables, reason = "handle only used in some configs")
)]
pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
// TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
// instead of sleep() and exit()

thread::spawn(move || {
// Shut down immediately if we get a second Ctrl-C.
//
// A tokio runtime that's shutting down will cancel pending futures, so we need to
// wait for ctrl_c() on a separate runtime.
thread::spawn(|| {
debug!("waiting for a second shutdown signal");
Runtime::new()
.expect("creating a runtime to wait for shutdown signal failed")
.block_on(async {
let _ = shutdown_signal().await;
info!("second shutdown signal received, shutting down immediately");
exit(1);
});
});

debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
thread::sleep(SHUTDOWN_TIMEOUT);

// Force a shutdown if a task is blocking.
error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
info!(
"run `flamegraph --pid {}` or similar to generate a stack dump",
process::id()
);

// Log all the async tasks and spawn_blocking() tasks that are still running.
//
// A tokio runtime that's shutting down will cancel a dump at its first await
// point, so we need to call dump() on a separate runtime.
#[cfg(all(tokio_unstable, tokio_taskdump))]
thread::spawn(move || {
use tracing::warn;

error!(
?SHUTDOWN_TIMEOUT,
"shutdown timed out, trying to dump blocking tasks"
);
Runtime::new()
.expect("creating a runtime to dump blocking tasks failed")
.block_on(async move {
for (task_number, task) in handle.dump().await.tasks().iter().enumerate() {
let trace = task.trace();
warn!(task_number, trace, "blocking task backtrace");
}
});
});

// Give the log messages time to flush, and any dumps time to finish.
thread::sleep(TRACE_TIMEOUT);
exit(1);
});
}
4 changes: 4 additions & 0 deletions crates/subspace-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hex = "0.4.3"
jsonrpsee = { version = "0.24.5", features = ["server"] }
mimalloc = "0.1.43"
parking_lot = "0.12.2"
scopeguard = "1.2.0"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
Expand All @@ -39,3 +40,6 @@ thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
85 changes: 84 additions & 1 deletion crates/subspace-gateway/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,24 @@ pub(crate) mod run;

use crate::commands::run::RunOptions;
use clap::Parser;
use std::process::exit;
use std::time::Duration;
use std::{process, thread};
use tokio::runtime::{Handle, Runtime};
use tokio::signal;
use tracing::level_filters::LevelFilter;
use tracing::{debug, warn};
use tracing::{debug, error, info, warn};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};

/// The amount of time we wait for tasks to finish when shutting down.
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);

/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
/// user to trace the process, before exiting.
pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);

/// Commands for working with a gateway.
#[derive(Debug, Parser)]
#[clap(about, version)]
Expand Down Expand Up @@ -90,3 +101,75 @@ pub(crate) async fn shutdown_signal() {

tracing::info!("Received Ctrl+C, shutting down gateway...");
}

/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
/// blocking. If a second Ctrl-C is received, the thread will force a shut down immediately.
///
/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
/// blocking shutdown on `runtime_handle`.
///
/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
/// this spawned thread will never be called.
#[cfg_attr(
not(all(tokio_unstable, tokio_taskdump)),
expect(unused_variables, reason = "handle only used in some configs")
)]
pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
// TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
// instead of sleep() and exit()

thread::spawn(move || {
// Shut down immediately if we get a second Ctrl-C.
//
// A tokio runtime that's shutting down will cancel pending futures, so we need to
// wait for ctrl_c() on a separate runtime.
thread::spawn(|| {
debug!("waiting for a second shutdown signal");
Runtime::new()
.expect("creating a runtime to wait for shutdown signal failed")
.block_on(async {
let _ = shutdown_signal().await;
info!("second shutdown signal received, shutting down immediately");
exit(1);
});
});

debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
thread::sleep(SHUTDOWN_TIMEOUT);

// Force a shutdown if a task is blocking.
error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
info!(
"run `flamegraph --pid {}` or similar to generate a stack dump",
process::id()
);

// Log all the async tasks and spawn_blocking() tasks that are still running.
//
// A tokio runtime that's shutting down will cancel a dump at its first await
// point, so we need to call dump() on a separate runtime.
#[cfg(all(tokio_unstable, tokio_taskdump))]
thread::spawn(move || {
use tracing::warn;

error!(
?SHUTDOWN_TIMEOUT,
"shutdown timed out, trying to dump blocking tasks"
);
Runtime::new()
.expect("creating a runtime to dump blocking tasks failed")
.block_on(async move {
for (task_number, task) in handle.dump().await.tasks().iter().enumerate() {
let trace = task.trace();
warn!(task_number, trace, "blocking task backtrace");
}
});
});

// Give the log messages time to flush, and any dumps time to finish.
thread::sleep(TRACE_TIMEOUT);
exit(1);
});
}
10 changes: 9 additions & 1 deletion crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ mod node_client;
mod piece_getter;
mod piece_validator;

use crate::commands::{init_logger, raise_fd_limit, Command};
use crate::commands::{init_logger, raise_fd_limit, spawn_shutdown_watchdog, Command};
use clap::Parser;
use tokio::runtime::Handle;

#[global_allocator]
static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
Expand All @@ -20,8 +21,15 @@ async fn main() -> anyhow::Result<()> {

match command {
Command::Run(run_options) => {
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};
commands::run::run(run_options).await?;
}
}

Ok(())
}
4 changes: 4 additions & 0 deletions crates/subspace-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ sc-telemetry = { git = "https://github.com/subspace/polkadot-sdk", rev = "587181
sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
sc-network-sync = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
scopeguard = "1.2.0"
serde_json = "1.0.128"
sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
sp-blockchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
Expand Down Expand Up @@ -98,3 +99,6 @@ runtime-benchmarks = [
"subspace-runtime/runtime-benchmarks",
"subspace-service/runtime-benchmarks",
]

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
9 changes: 8 additions & 1 deletion crates/subspace-node/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::commands::run::consensus::{
use crate::commands::run::domain::{
create_domain_configuration, run_domain, DomainOptions, DomainStartOptions,
};
use crate::commands::shared::init_logger;
use crate::commands::shared::{init_logger, spawn_shutdown_watchdog};
use crate::{set_default_ss58_version, Error, PosTable};
use clap::Parser;
use cross_domain_message_gossip::GossipWorkerBuilder;
Expand All @@ -27,6 +27,7 @@ use std::env;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_runtime::{Block, RuntimeApi};
use subspace_service::config::ChainSyncMode;
use tokio::runtime::Handle;
use tracing::{debug, error, info, info_span, warn};

/// Options for running a node
Expand Down Expand Up @@ -74,6 +75,12 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
raise_fd_limit();

let signals = Signals::capture()?;
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};

let RunOptions {
consensus,
Expand Down
Loading
Loading