Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a shutdown watchdog to subspace binaries #3170

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/subspace-farmer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ rand = "0.8.5"
rayon = "1.10.0"
schnellru = "0.2.3"
schnorrkel = "0.11.4"
scopeguard = "1.2.0"
serde = { version = "1.0.110", features = ["derive"] }
serde_json = "1.0.128"
static_assertions = "1.1.0"
Expand Down Expand Up @@ -100,3 +101,6 @@ binary = [
"dep:supports-color",
"dep:tracing-subscriber",
]

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::commands::cluster::cache::{cache, CacheArgs};
use crate::commands::cluster::controller::{controller, ControllerArgs};
use crate::commands::cluster::farmer::{farmer, FarmerArgs};
use crate::commands::cluster::plotter::{plotter, PlotterArgs};
use crate::utils::shutdown_signal;
use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
use anyhow::anyhow;
use async_nats::ServerAddr;
use backoff::ExponentialBackoff;
Expand All @@ -22,6 +22,7 @@ use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::utils::AsyncJoinOnDrop;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_proof_of_space::Table;
use tokio::runtime::Handle;

/// Arguments for cluster
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -87,6 +88,13 @@ where
PosTable: Table,
{
let signal = shutdown_signal();
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
// TODO: make sure this runs before anything else is dropped, because drops can hang.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};

let ClusterArgs {
shared_args,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::commands::shared::network::{configure_network, NetworkArgs};
use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
use crate::utils::shutdown_signal;
use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
use anyhow::anyhow;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use backoff::ExponentialBackoff;
Expand Down Expand Up @@ -54,6 +54,7 @@ use subspace_kzg::Kzg;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_networking::utils::piece_provider::PieceProvider;
use subspace_proof_of_space::Table;
use tokio::runtime::Handle;
use tokio::sync::{Barrier, Semaphore};
use tracing::{error, info, info_span, warn, Instrument};

Expand Down Expand Up @@ -303,6 +304,13 @@ where
PosTable: Table,
{
let signal = shutdown_signal();
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
// TODO: make sure this runs before anything else is dropped, because drops can hang.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};

let FarmingArgs {
node_rpc_url,
Expand Down
84 changes: 84 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
use std::process::exit;
use std::time::Duration;
use std::{process, thread};
use tokio::runtime::{Handle, Runtime};
use tokio::signal;
use tracing::{debug, error, info};

/// The amount of time we wait for tasks to finish when shutting down.
///
/// [`spawn_shutdown_watchdog`] sleeps for this long before it starts forcing an exit.
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);

/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
/// user to trace the process, before exiting.
///
/// [`spawn_shutdown_watchdog`] waits this long *after* [`SHUTDOWN_TIMEOUT`] has elapsed, so a
/// forced exit happens after the sum of both timeouts.
pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);

pub(crate) fn raise_fd_limit() {
match fdlimit::raise_fd_limit() {
Expand Down Expand Up @@ -52,3 +64,75 @@ pub(crate) async fn shutdown_signal() {

tracing::info!("Received Ctrl+C, shutting down farmer...");
}

/// Spawns a watchdog thread which forces the process to exit if shutdown takes longer than
/// [`SHUTDOWN_TIMEOUT`], for example because an async task is blocking. If a second shutdown
/// signal (Ctrl-C) is received while waiting, the process exits immediately.
///
/// After [`SHUTDOWN_TIMEOUT`] elapses, the watchdog logs an error, waits a further
/// [`TRACE_TIMEOUT`] so that logs can flush and the user (or the task dump) can inspect the
/// process, and then calls `exit(1)`.
///
/// If compiled with `RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump"`, logs backtraces of
/// the async tasks blocking shutdown on `runtime_handle`. In other configurations
/// `runtime_handle` is unused.
///
/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
/// this spawned thread will never be called.
///
/// # Panics
///
/// The spawned threads panic if a helper tokio runtime can't be created.
#[cfg_attr(
    not(all(tokio_unstable, tokio_taskdump)),
    expect(unused_variables, reason = "handle only used in some configs")
)]
pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
    // instead of sleep() and exit()

    thread::spawn(move || {
        // Shut down immediately if we get a second Ctrl-C.
        //
        // A tokio runtime that's shutting down will cancel pending futures, so we need to
        // wait for ctrl_c() on a separate runtime.
        thread::spawn(|| {
            debug!("waiting for a second shutdown signal");
            Runtime::new()
                .expect("creating a runtime to wait for shutdown signal failed")
                .block_on(async {
                    shutdown_signal().await;
                    info!("second shutdown signal received, shutting down immediately");
                    exit(1);
                });
        });

        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
        thread::sleep(SHUTDOWN_TIMEOUT);

        // Force a shutdown if a task is blocking.
        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
        info!(
            "run `flamegraph --pid {}` or similar to generate a stack dump",
            process::id()
        );

        // Log all the async tasks and spawn_blocking() tasks that are still running.
        //
        // A tokio runtime that's shutting down will cancel a dump at its first await
        // point, so we need to call dump() on a separate runtime.
        #[cfg(all(tokio_unstable, tokio_taskdump))]
        thread::spawn(move || {
            use tracing::warn;

            error!(
                ?SHUTDOWN_TIMEOUT,
                "shutdown timed out, trying to dump blocking tasks"
            );
            Runtime::new()
                .expect("creating a runtime to dump blocking tasks failed")
                .block_on(async move {
                    // Dump the tasks of the runtime that is shutting down, not the helper
                    // runtime this future is running on.
                    let dump = runtime_handle.dump().await;
                    for (task_number, task) in dump.tasks().iter().enumerate() {
                        let trace = task.trace();
                        // The trace is not a primitive tracing value, so record it via `Display`.
                        warn!(task_number, %trace, "blocking task backtrace");
                    }
                });
        });

        // Give the log messages time to flush, and any dumps time to finish.
        thread::sleep(TRACE_TIMEOUT);
        exit(1);
    });
}
4 changes: 4 additions & 0 deletions crates/subspace-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ hex = "0.4.3"
jsonrpsee = { version = "0.24.5", features = ["server"] }
mimalloc = "0.1.43"
parking_lot = "0.12.2"
scopeguard = "1.2.0"
subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" }
subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
Expand All @@ -39,3 +40,6 @@ thiserror = "1.0.64"
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
85 changes: 83 additions & 2 deletions crates/subspace-gateway/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,24 @@ pub(crate) mod run;

use crate::commands::run::RunOptions;
use clap::Parser;
use std::panic;
use std::process::exit;
use std::time::Duration;
use std::{panic, process, thread};
use tokio::runtime::{Handle, Runtime};
use tokio::signal;
use tracing::level_filters::LevelFilter;
use tracing::{debug, warn};
use tracing::{debug, error, info, warn};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};

/// The amount of time we wait for tasks to finish when shutting down.
///
/// [`spawn_shutdown_watchdog`] sleeps for this long before it starts forcing an exit.
pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);

/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
/// user to trace the process, before exiting.
///
/// [`spawn_shutdown_watchdog`] waits this long *after* [`SHUTDOWN_TIMEOUT`] has elapsed, so a
/// forced exit happens after the sum of both timeouts.
pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);

/// Commands for working with a gateway.
#[derive(Debug, Parser)]
#[clap(about, version)]
Expand Down Expand Up @@ -102,3 +111,75 @@ pub(crate) async fn shutdown_signal() {

tracing::info!("Received Ctrl+C, shutting down gateway...");
}

/// Spawns a watchdog thread which forces the process to exit if shutdown takes longer than
/// [`SHUTDOWN_TIMEOUT`], for example because an async task is blocking. If a second shutdown
/// signal (Ctrl-C) is received while waiting, the process exits immediately.
///
/// After [`SHUTDOWN_TIMEOUT`] elapses, the watchdog logs an error, waits a further
/// [`TRACE_TIMEOUT`] so that logs can flush and the user (or the task dump) can inspect the
/// process, and then calls `exit(1)`.
///
/// If compiled with `RUSTFLAGS="--cfg tokio_unstable --cfg tokio_taskdump"`, logs backtraces of
/// the async tasks blocking shutdown on `runtime_handle`. In other configurations
/// `runtime_handle` is unused.
///
/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
/// this spawned thread will never be called.
///
/// # Panics
///
/// The spawned threads panic if a helper tokio runtime can't be created.
#[cfg_attr(
    not(all(tokio_unstable, tokio_taskdump)),
    expect(unused_variables, reason = "handle only used in some configs")
)]
pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
    // instead of sleep() and exit()

    thread::spawn(move || {
        // Shut down immediately if we get a second Ctrl-C.
        //
        // A tokio runtime that's shutting down will cancel pending futures, so we need to
        // wait for ctrl_c() on a separate runtime.
        thread::spawn(|| {
            debug!("waiting for a second shutdown signal");
            Runtime::new()
                .expect("creating a runtime to wait for shutdown signal failed")
                .block_on(async {
                    shutdown_signal().await;
                    info!("second shutdown signal received, shutting down immediately");
                    exit(1);
                });
        });

        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
        thread::sleep(SHUTDOWN_TIMEOUT);

        // Force a shutdown if a task is blocking.
        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
        info!(
            "run `flamegraph --pid {}` or similar to generate a stack dump",
            process::id()
        );

        // Log all the async tasks and spawn_blocking() tasks that are still running.
        //
        // A tokio runtime that's shutting down will cancel a dump at its first await
        // point, so we need to call dump() on a separate runtime.
        #[cfg(all(tokio_unstable, tokio_taskdump))]
        thread::spawn(move || {
            use tracing::warn;

            error!(
                ?SHUTDOWN_TIMEOUT,
                "shutdown timed out, trying to dump blocking tasks"
            );
            Runtime::new()
                .expect("creating a runtime to dump blocking tasks failed")
                .block_on(async move {
                    // Dump the tasks of the runtime that is shutting down, not the helper
                    // runtime this future is running on.
                    let dump = runtime_handle.dump().await;
                    for (task_number, task) in dump.tasks().iter().enumerate() {
                        let trace = task.trace();
                        // The trace is not a primitive tracing value, so record it via `Display`.
                        warn!(task_number, %trace, "blocking task backtrace");
                    }
                });
        });

        // Give the log messages time to flush, and any dumps time to finish.
        thread::sleep(TRACE_TIMEOUT);
        exit(1);
    });
}
10 changes: 9 additions & 1 deletion crates/subspace-gateway/src/commands/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod rpc;

use crate::commands::run::network::{configure_network, NetworkArgs};
use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
use crate::commands::shutdown_signal;
use crate::commands::{shutdown_signal, spawn_shutdown_watchdog};
use crate::piece_getter::DsnPieceGetter;
use crate::piece_validator::SegmentCommitmentPieceValidator;
use anyhow::anyhow;
Expand All @@ -20,6 +20,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher;
use subspace_erasure_coding::ErasureCoding;
use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig};
use subspace_kzg::Kzg;
use tokio::runtime::Handle;
use tracing::info;

/// The default size limit, based on the maximum block size in some domains.
Expand Down Expand Up @@ -58,6 +59,13 @@ pub(crate) struct GatewayOptions {
/// Default run command for gateway
pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
let signal = shutdown_signal();
// The async runtime can wait forever for tasks to yield or finish.
// This watchdog runs on shutdown, and makes sure the process exits within a timeout,
// or when the user sends a second Ctrl-C.
// TODO: make sure this runs before anything else is dropped, because drops can hang.
scopeguard::defer! {
spawn_shutdown_watchdog(Handle::current());
};

let RunOptions {
gateway:
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-gateway/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ async fn main() -> anyhow::Result<()> {
commands::run::run(run_options).await?;
}
}

Ok(())
}
3 changes: 3 additions & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,6 @@ features = [
rand = "0.8.5"
# TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved
libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" }

[lints.rust]
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
Loading
Loading