diff --git a/Cargo.lock b/Cargo.lock
index ddb3b847d4..8b43ae9137 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1,6 +1,6 @@
 # This file is automatically @generated by Cargo.
 # It is not intended for manual editing.
-version = 3
+version = 4
 
 [[package]]
 name = "Inflector"
@@ -12570,6 +12570,7 @@ dependencies = [
  "rayon",
  "schnellru",
  "schnorrkel",
+ "scopeguard",
  "serde",
  "serde_json",
  "ss58-registry",
@@ -12643,6 +12644,7 @@ dependencies = [
  "jsonrpsee",
  "mimalloc",
  "parking_lot 0.12.3",
+ "scopeguard",
  "subspace-core-primitives",
  "subspace-data-retrieval",
  "subspace-erasure-coding",
@@ -12846,6 +12848,7 @@ dependencies = [
  "sc-telemetry",
  "sc-transaction-pool-api",
  "sc-utils",
+ "scopeguard",
  "serde_json",
  "sp-api",
  "sp-blockchain",
diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml
index 331c9731d6..9fad81628d 100644
--- a/crates/subspace-farmer/Cargo.toml
+++ b/crates/subspace-farmer/Cargo.toml
@@ -48,6 +48,7 @@ rand = "0.8.5"
 rayon = "1.10.0"
 schnellru = "0.2.3"
 schnorrkel = "0.11.4"
+scopeguard = "1.2.0"
 serde = { version = "1.0.110", features = ["derive"] }
 serde_json = "1.0.128"
 static_assertions = "1.1.0"
@@ -100,3 +101,6 @@ binary = [
     "dep:supports-color",
     "dep:tracing-subscriber",
 ]
+
+[lints.rust]
+unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
index ba042d06f5..ded5961afb 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
@@ -7,7 +7,7 @@ use crate::commands::cluster::cache::{cache, CacheArgs};
 use crate::commands::cluster::controller::{controller, ControllerArgs};
 use crate::commands::cluster::farmer::{farmer, FarmerArgs};
 use crate::commands::cluster::plotter::{plotter, PlotterArgs};
-use crate::utils::shutdown_signal;
+use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
 use anyhow::anyhow;
 use async_nats::ServerAddr;
 use backoff::ExponentialBackoff;
@@ -22,6 +22,7 @@ use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::utils::AsyncJoinOnDrop;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_proof_of_space::Table;
+use tokio::runtime::Handle;
 
 /// Arguments for cluster
 #[derive(Debug, Parser)]
@@ -87,6 +88,13 @@ where
     PosTable: Table,
 {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let ClusterArgs {
         shared_args,
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index 0cbe578a92..0bbf69a273 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -1,6 +1,6 @@
 use crate::commands::shared::network::{configure_network, NetworkArgs};
 use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
-use crate::utils::shutdown_signal;
+use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
 use anyhow::anyhow;
 use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
 use backoff::ExponentialBackoff;
@@ -54,6 +54,7 @@ use subspace_kzg::Kzg;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_networking::utils::piece_provider::PieceProvider;
 use subspace_proof_of_space::Table;
+use tokio::runtime::Handle;
 use tokio::sync::{Barrier, Semaphore};
 use tracing::{error, info, info_span, warn, Instrument};
 
@@ -303,6 +304,13 @@ where
     PosTable: Table,
 {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let FarmingArgs {
         node_rpc_url,
diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
index bd9ca957ab..56470111b9 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs
@@ -1,4 +1,16 @@
+use std::process::exit;
+use std::time::Duration;
+use std::{process, thread};
+use tokio::runtime::{Handle, Runtime};
 use tokio::signal;
+use tracing::{debug, error, info};
+
+/// The amount of time we wait for tasks to finish when shutting down.
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
+/// user to trace the process, before exiting.
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);
 
 pub(crate) fn raise_fd_limit() {
     match fdlimit::raise_fd_limit() {
@@ -52,3 +64,75 @@ pub(crate) async fn shutdown_signal() {
 
     tracing::info!("Received Ctrl+C, shutting down farmer...");
 }
+
+/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
+/// blocking. If a second Ctrl-C is received, the thread will force a shutdown immediately.
+///
+/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
+/// blocking shutdown on `runtime_handle`.
+///
+/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
+/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
+/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
+/// this spawned thread will never be called.
+#[cfg_attr(
+    not(all(tokio_unstable, tokio_taskdump)),
+    expect(unused_variables, reason = "handle only used in some configs")
+)]
+pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
+    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
+    // instead of sleep() and exit()
+
+    thread::spawn(move || {
+        // Shut down immediately if we get a second Ctrl-C.
+        //
+        // A tokio runtime that's shutting down will cancel pending futures, so we need to
+        // wait for ctrl_c() on a separate runtime.
+        thread::spawn(|| {
+            debug!("waiting for a second shutdown signal");
+            Runtime::new()
+                .expect("creating a runtime to wait for shutdown signal failed")
+                .block_on(async {
+                    let _ = shutdown_signal().await;
+                    info!("second shutdown signal received, shutting down immediately");
+                    exit(1);
+                });
+        });
+
+        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
+        thread::sleep(SHUTDOWN_TIMEOUT);
+
+        // Force a shutdown if a task is blocking.
+        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
+        info!(
+            "run `flamegraph --pid {}` or similar to generate a stack dump",
+            process::id()
+        );
+
+        // Log all the async tasks and spawn_blocking() tasks that are still running.
+        //
+        // A tokio runtime that's shutting down will cancel a dump at its first await
+        // point, so we need to call dump() on a separate runtime.
+        #[cfg(all(tokio_unstable, tokio_taskdump))]
+        thread::spawn(move || {
+            use tracing::warn;
+
+            error!(
+                ?SHUTDOWN_TIMEOUT,
+                "shutdown timed out, trying to dump blocking tasks"
+            );
+            Runtime::new()
+                .expect("creating a runtime to dump blocking tasks failed")
+                .block_on(async move {
+                    for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() {
+                        let trace = task.trace();
+                        warn!(task_number, trace, "blocking task backtrace");
+                    }
+                });
+        });
+
+        // Give the log messages time to flush, and any dumps time to finish.
+        thread::sleep(TRACE_TIMEOUT);
+        exit(1);
+    });
+}
diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml
index e272074658..0b76a8b500 100644
--- a/crates/subspace-gateway/Cargo.toml
+++ b/crates/subspace-gateway/Cargo.toml
@@ -26,6 +26,7 @@ hex = "0.4.3"
 jsonrpsee = { version = "0.24.5", features = ["server"] }
 mimalloc = "0.1.43"
 parking_lot = "0.12.2"
+scopeguard = "1.2.0"
 subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" }
 subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" }
 subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" }
@@ -39,3 +40,6 @@ thiserror = "1.0.64"
 tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] }
 tracing = "0.1.40"
 tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
+
+[lints.rust]
+unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
diff --git a/crates/subspace-gateway/src/commands.rs b/crates/subspace-gateway/src/commands.rs
index 159afc558f..63174e0cb1 100644
--- a/crates/subspace-gateway/src/commands.rs
+++ b/crates/subspace-gateway/src/commands.rs
@@ -4,15 +4,24 @@ pub(crate) mod run;
 
 use crate::commands::run::RunOptions;
 use clap::Parser;
-use std::panic;
 use std::process::exit;
+use std::time::Duration;
+use std::{panic, process, thread};
+use tokio::runtime::{Handle, Runtime};
 use tokio::signal;
 use tracing::level_filters::LevelFilter;
-use tracing::{debug, warn};
+use tracing::{debug, error, info, warn};
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::{fmt, EnvFilter, Layer};
 
+/// The amount of time we wait for tasks to finish when shutting down.
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
+/// user to trace the process, before exiting.
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);
+
 /// Commands for working with a gateway.
 #[derive(Debug, Parser)]
 #[clap(about, version)]
@@ -102,3 +111,75 @@ pub(crate) async fn shutdown_signal() {
 
     tracing::info!("Received Ctrl+C, shutting down gateway...");
 }
+
+/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
+/// blocking. If a second Ctrl-C is received, the thread will force a shutdown immediately.
+///
+/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
+/// blocking shutdown on `runtime_handle`.
+///
+/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
+/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
+/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
+/// this spawned thread will never be called.
+#[cfg_attr(
+    not(all(tokio_unstable, tokio_taskdump)),
+    expect(unused_variables, reason = "handle only used in some configs")
+)]
+pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
+    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
+    // instead of sleep() and exit()
+
+    thread::spawn(move || {
+        // Shut down immediately if we get a second Ctrl-C.
+        //
+        // A tokio runtime that's shutting down will cancel pending futures, so we need to
+        // wait for ctrl_c() on a separate runtime.
+        thread::spawn(|| {
+            debug!("waiting for a second shutdown signal");
+            Runtime::new()
+                .expect("creating a runtime to wait for shutdown signal failed")
+                .block_on(async {
+                    let _ = shutdown_signal().await;
+                    info!("second shutdown signal received, shutting down immediately");
+                    exit(1);
+                });
+        });
+
+        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
+        thread::sleep(SHUTDOWN_TIMEOUT);
+
+        // Force a shutdown if a task is blocking.
+        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
+        info!(
+            "run `flamegraph --pid {}` or similar to generate a stack dump",
+            process::id()
+        );
+
+        // Log all the async tasks and spawn_blocking() tasks that are still running.
+        //
+        // A tokio runtime that's shutting down will cancel a dump at its first await
+        // point, so we need to call dump() on a separate runtime.
+        #[cfg(all(tokio_unstable, tokio_taskdump))]
+        thread::spawn(move || {
+            use tracing::warn;
+
+            error!(
+                ?SHUTDOWN_TIMEOUT,
+                "shutdown timed out, trying to dump blocking tasks"
+            );
+            Runtime::new()
+                .expect("creating a runtime to dump blocking tasks failed")
+                .block_on(async move {
+                    for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() {
+                        let trace = task.trace();
+                        warn!(task_number, trace, "blocking task backtrace");
+                    }
+                });
+        });
+
+        // Give the log messages time to flush, and any dumps time to finish.
+        thread::sleep(TRACE_TIMEOUT);
+        exit(1);
+    });
+}
diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs
index ace0f5333b..f22acffe79 100644
--- a/crates/subspace-gateway/src/commands/run.rs
+++ b/crates/subspace-gateway/src/commands/run.rs
@@ -6,7 +6,7 @@ mod rpc;
 
 use crate::commands::run::network::{configure_network, NetworkArgs};
 use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
-use crate::commands::shutdown_signal;
+use crate::commands::{shutdown_signal, spawn_shutdown_watchdog};
 use crate::piece_getter::DsnPieceGetter;
 use crate::piece_validator::SegmentCommitmentPieceValidator;
 use anyhow::anyhow;
@@ -20,6 +20,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher;
 use subspace_erasure_coding::ErasureCoding;
 use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig};
 use subspace_kzg::Kzg;
+use tokio::runtime::Handle;
 use tracing::info;
 
 /// The default size limit, based on the maximum block size in some domains.
@@ -58,6 +59,13 @@ pub(crate) struct GatewayOptions {
 /// Default run command for gateway
 pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let RunOptions {
         gateway:
diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs
index d215a014cb..ddc0f8dae8 100644
--- a/crates/subspace-gateway/src/main.rs
+++ b/crates/subspace-gateway/src/main.rs
@@ -24,5 +24,6 @@ async fn main() -> anyhow::Result<()> {
             commands::run::run(run_options).await?;
         }
     }
+
     Ok(())
 }
diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml
index e90e9dac80..ab5d9cb253 100644
--- a/crates/subspace-networking/Cargo.toml
+++ b/crates/subspace-networking/Cargo.toml
@@ -79,3 +79,6 @@ features = [
 rand = "0.8.5"
 # TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved
 libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" }
+
+[lints.rust]
+unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
index d554991e80..a965b28b02 100644
--- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
+++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
@@ -12,13 +12,16 @@ use serde::{Deserialize, Serialize};
 use std::error::Error;
 use std::fmt::{Display, Formatter};
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::panic;
 use std::process::exit;
 use std::sync::Arc;
+use std::time::Duration;
+use std::{panic, process, thread};
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_networking::libp2p::multiaddr::Protocol;
 use subspace_networking::{peer_id, Config, KademliaMode};
-use tracing::{debug, info, Level};
+use tokio::runtime::{Handle, Runtime};
+use tokio::signal;
+use tracing::{debug, error, info, Level};
 use tracing_subscriber::fmt::Subscriber;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::EnvFilter;
@@ -26,6 +29,13 @@ use tracing_subscriber::EnvFilter;
 /// Size of the LRU cache for peers.
 pub const KNOWN_PEERS_CACHE_SIZE: u32 = 10000;
 
+/// The amount of time we wait for tasks to finish when shutting down.
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
+/// user to trace the process, before exiting.
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);
+
 #[derive(Debug, Parser)]
 #[clap(about, version)]
 enum Command {
@@ -128,6 +138,109 @@ fn init_logging() {
     builder.init()
 }
 
+#[cfg(unix)]
+pub(crate) async fn shutdown_signal() {
+    use futures::FutureExt;
+    use std::pin::pin;
+
+    futures::future::select(
+        pin!(signal::unix::signal(signal::unix::SignalKind::interrupt())
+            .expect("Setting signal handlers must never fail")
+            .recv()
+            .map(|_| {
+                tracing::info!("Received SIGINT, shutting down bootstrap node...");
+            }),),
+        pin!(signal::unix::signal(signal::unix::SignalKind::terminate())
+            .expect("Setting signal handlers must never fail")
+            .recv()
+            .map(|_| {
+                tracing::info!("Received SIGTERM, shutting down bootstrap node...");
+            }),),
+    )
+    .await;
+}
+
+#[cfg(not(unix))]
+pub(crate) async fn shutdown_signal() {
+    signal::ctrl_c()
+        .await
+        .expect("Setting signal handlers must never fail");
+
+    tracing::info!("Received Ctrl+C, shutting down bootstrap node...");
+}
+
+/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
+/// blocking. If a second Ctrl-C is received, the thread will force a shutdown immediately.
+///
+/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
+/// blocking shutdown on `runtime_handle`.
+///
+/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
+/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
+/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
+/// this spawned thread will never be called.
+#[cfg_attr(
+    not(all(tokio_unstable, tokio_taskdump)),
+    expect(unused_variables, reason = "handle only used in some configs")
+)]
+pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
+    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
+    // instead of sleep() and exit()
+
+    thread::spawn(move || {
+        // Shut down immediately if we get a second Ctrl-C.
+        //
+        // A tokio runtime that's shutting down will cancel pending futures, so we need to
+        // wait for the shutdown signal on a separate runtime.
+        thread::spawn(|| {
+            debug!("waiting for a second shutdown signal");
+            Runtime::new()
+                .expect("creating a runtime to wait for shutdown signal failed")
+                .block_on(async {
+                    let _ = shutdown_signal().await;
+                    info!("second shutdown signal received, shutting down immediately");
+                    exit(1);
+                });
+        });
+
+        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
+        thread::sleep(SHUTDOWN_TIMEOUT);
+
+        // Force a shutdown if a task is blocking.
+        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
+        info!(
+            "run `flamegraph --pid {}` or similar to generate a stack dump",
+            process::id()
+        );
+
+        // Log all the async tasks and spawn_blocking() tasks that are still running.
+        //
+        // A tokio runtime that's shutting down will cancel a dump at its first await
+        // point, so we need to call dump() on a separate runtime.
+        #[cfg(all(tokio_unstable, tokio_taskdump))]
+        thread::spawn(move || {
+            use tracing::warn;
+
+            error!(
+                ?SHUTDOWN_TIMEOUT,
+                "shutdown timed out, trying to dump blocking tasks"
+            );
+            Runtime::new()
+                .expect("creating a runtime to dump blocking tasks failed")
+                .block_on(async move {
+                    for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() {
+                        let trace = task.trace();
+                        warn!(task_number, trace, "blocking task backtrace");
+                    }
+                });
+        });
+
+        // Give the log messages time to flush, and any dumps time to finish.
+        thread::sleep(TRACE_TIMEOUT);
+        exit(1);
+    });
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     set_exit_on_panic();
@@ -183,6 +296,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
                 dsn_metrics_registry,
             )
         };
+
+        // These tasks can hang on shutdown or when dropped. But there are no error returns
+        // here, so the only way we exit is when a task finishes. That means we can just launch
+        // the shutdown watchdog at the end of the block.
         let (node, mut node_runner) =
             subspace_networking::construct(config).expect("Networking stack creation failed.");
 
@@ -210,12 +327,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
            .transpose()?;
            if let Some(prometheus_task) = prometheus_task {
                select! {
-                    _ = node_runner.run().fuse() => {},
-                    _ = prometheus_task.fuse() => {},
+                    _ = node_runner.run().fuse() => {},
+                    _ = prometheus_task.fuse() => {},
                }
            } else {
                node_runner.run().await
            }
+            spawn_shutdown_watchdog(Handle::current());
        }
        Command::GenerateKeypair { json } => {
            let output = KeypairOutput::new(Keypair::generate());
diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml
index 8b197a10ab..138ac6c587 100644
--- a/crates/subspace-node/Cargo.toml
+++ b/crates/subspace-node/Cargo.toml
@@ -57,6 +57,7 @@ sc-telemetry = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
 sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
 sc-network-sync = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
 sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
+scopeguard = "1.2.0"
 serde_json = "1.0.128"
 sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
 sp-blockchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" }
@@ -98,3 +99,6 @@ runtime-benchmarks = [
     "subspace-runtime/runtime-benchmarks",
     "subspace-service/runtime-benchmarks",
 ]
+
+[lints.rust]
+unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs
index a944fc5779..5b1cfb3b4d 100644
--- a/crates/subspace-node/src/commands/run.rs
+++ b/crates/subspace-node/src/commands/run.rs
@@ -8,7 +8,7 @@ use crate::commands::run::consensus::{
 use crate::commands::run::domain::{
     create_domain_configuration, run_domain, DomainOptions, DomainStartOptions,
 };
-use crate::commands::shared::init_logger;
+use crate::commands::shared::{init_logger, spawn_shutdown_watchdog};
 use crate::{set_default_ss58_version, Error, PosTable};
 use clap::Parser;
 use cross_domain_message_gossip::GossipWorkerBuilder;
@@ -27,6 +27,7 @@ use std::env;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_runtime::{Block, RuntimeApi};
 use subspace_service::config::ChainSyncMode;
+use tokio::runtime::Handle;
 use tracing::{debug, error, info, info_span, warn};
 
 /// Options for running a node
@@ -74,6 +75,13 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
     raise_fd_limit();
 
     let signals = Signals::capture()?;
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let RunOptions {
         consensus,
diff --git a/crates/subspace-node/src/commands/shared.rs b/crates/subspace-node/src/commands/shared.rs
index 52f48d1eec..c56d0a9c2a 100644
--- a/crates/subspace-node/src/commands/shared.rs
+++ b/crates/subspace-node/src/commands/shared.rs
@@ -1,18 +1,28 @@
 use clap::Parser;
-use sc_cli::Error;
+use sc_cli::{Error, Signals};
 use sc_keystore::LocalKeystore;
 use sp_core::crypto::{ExposeSecret, SecretString};
 use sp_core::sr25519::Pair;
 use sp_core::Pair as PairT;
 use sp_domains::KEY_TYPE;
 use sp_keystore::Keystore;
-use std::panic;
 use std::path::PathBuf;
-use std::process::exit;
+use std::process::{self, exit};
+use std::time::Duration;
+use std::{panic, thread};
+use tokio::runtime::{Handle, Runtime};
+use tracing::{debug, error, info};
 use tracing_subscriber::filter::LevelFilter;
 use tracing_subscriber::prelude::*;
 use tracing_subscriber::{fmt, EnvFilter};
 
+/// The amount of time we wait for tasks to finish when shutting down.
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
+/// user to trace the process, before exiting.
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);
+
 /// Options used for keystore
 #[derive(Debug, Parser)]
 pub(super) struct KeystoreOptions {
@@ -82,3 +92,77 @@ pub(super) fn init_logger() {
         )
         .init();
 }
+
+/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
+/// blocking. If a second Ctrl-C is received, the thread will force a shutdown immediately.
+///
+/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
+/// blocking shutdown on `runtime_handle`.
+///
+/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for
+/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
+/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
+/// this spawned thread will never be called.
+#[cfg_attr(
+    not(all(tokio_unstable, tokio_taskdump)),
+    expect(unused_variables, reason = "handle only used in some configs")
+)]
+pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
+    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
+    // instead of sleep() and exit()
+
+    thread::spawn(move || {
+        // Shut down immediately if we get a second Ctrl-C.
+        //
+        // A tokio runtime that's shutting down will cancel pending futures, so we need to
+        // wait for the shutdown signal on a separate runtime.
+        thread::spawn(|| {
+            debug!("waiting for a second shutdown signal");
+            Runtime::new()
+                .expect("creating a runtime to wait for shutdown signal failed")
+                .block_on(async {
+                    let signals = Signals::capture()
+                        .expect("creating a future to wait for shutdown signal failed");
+                    let _ = signals.future().await;
+                    info!("second shutdown signal received, shutting down immediately");
+                    exit(1);
+                });
+        });
+
+        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
+        thread::sleep(SHUTDOWN_TIMEOUT);
+
+        // Force a shutdown if a task is blocking.
+        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
+        info!(
+            "run `flamegraph --pid {}` or similar to generate a stack dump",
+            process::id()
+        );
+
+        // Log all the async tasks and spawn_blocking() tasks that are still running.
+        //
+        // A tokio runtime that's shutting down will cancel a dump at its first await
+        // point, so we need to call dump() on a separate runtime.
+        #[cfg(all(tokio_unstable, tokio_taskdump))]
+        thread::spawn(move || {
+            use tracing::warn;
+
+            error!(
+                ?SHUTDOWN_TIMEOUT,
+                "shutdown timed out, trying to dump blocking tasks"
+            );
+            Runtime::new()
+                .expect("creating a runtime to dump blocking tasks failed")
+                .block_on(async move {
+                    for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() {
+                        let trace = task.trace();
+                        warn!(task_number, trace, "blocking task backtrace");
+                    }
+                });
+        });
+
+        // Give the log messages time to flush, and any dumps time to finish.
+        thread::sleep(TRACE_TIMEOUT);
+        exit(1);
+    });
+}