From 28c1f2807b190414d2b5ffe0b5b71ec941fa128c Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 25 Oct 2024 12:00:00 +1000 Subject: [PATCH 1/5] Add a shutdown watchdog fn to subspace binaries --- crates/subspace-farmer/Cargo.toml | 3 + .../src/bin/subspace-farmer/utils.rs | 84 ++++++++++++++++++ crates/subspace-gateway/Cargo.toml | 3 + crates/subspace-gateway/src/commands.rs | 85 +++++++++++++++++- crates/subspace-node/Cargo.toml | 3 + crates/subspace-node/src/commands/shared.rs | 88 ++++++++++++++++++- 6 files changed, 264 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 331c9731d6..4323db39f5 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -100,3 +100,6 @@ binary = [ "dep:supports-color", "dep:tracing-subscriber", ] + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs index bd9ca957ab..56470111b9 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/utils.rs @@ -1,4 +1,16 @@ +use std::process::exit; +use std::time::Duration; +use std::{process, thread}; +use tokio::runtime::{Handle, Runtime}; use tokio::signal; +use tracing::{debug, error, info}; + +/// The amount of time we wait for tasks to finish when shutting down. +pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60); + +/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the +/// user to trace the process, before exiting. 
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15); pub(crate) fn raise_fd_limit() { match fdlimit::raise_fd_limit() { @@ -52,3 +64,75 @@ pub(crate) async fn shutdown_signal() { tracing::info!("Received Ctrl+C, shutting down farmer..."); } + +/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is +/// blocking. If a second Ctrl-C is received, the thread will force a shut down immediately. +/// +/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks +/// blocking shutdown on `runtime_handle`. +/// +/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for +/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is +/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in +/// this spawned thread will never be called. +#[cfg_attr( + not(all(tokio_unstable, tokio_taskdump)), + expect(unused_variables, reason = "handle only used in some configs") +)] +pub fn spawn_shutdown_watchdog(runtime_handle: Handle) { + // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout() + // instead of sleep() and exit() + + thread::spawn(move || { + // Shut down immediately if we get a second Ctrl-C. + // + // A tokio runtime that's shutting down will cancel pending futures, so we need to + // wait for ctrl_c() on a separate runtime. + thread::spawn(|| { + debug!("waiting for a second shutdown signal"); + Runtime::new() + .expect("creating a runtime to wait for shutdown signal failed") + .block_on(async { + let _ = shutdown_signal().await; + info!("second shutdown signal received, shutting down immediately"); + exit(1); + }); + }); + + debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down"); + thread::sleep(SHUTDOWN_TIMEOUT); + + // Force a shutdown if a task is blocking. 
+ error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit"); + info!( + "run `flamegraph --pid {}` or similar to generate a stack dump", + process::id() + ); + + // Log all the async tasks and spawn_blocking() tasks that are still running. + // + // A tokio runtime that's shutting down will cancel a dump at its first await + // point, so we need to call dump() on a separate runtime. + #[cfg(all(tokio_unstable, tokio_taskdump))] + thread::spawn(move || { + use tracing::warn; + + error!( + ?SHUTDOWN_TIMEOUT, + "shutdown timed out, trying to dump blocking tasks" + ); + Runtime::new() + .expect("creating a runtime to dump blocking tasks failed") + .block_on(async move { + for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() { + let trace = task.trace(); + warn!(task_number, %trace, "blocking task backtrace"); + } + }); + }); + + // Give the log messages time to flush, and any dumps time to finish. + thread::sleep(TRACE_TIMEOUT); + exit(1); + }); +} diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index e272074658..6989a5754a 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -39,3 +39,6 @@ thiserror = "1.0.64" tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "macros"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] } diff --git a/crates/subspace-gateway/src/commands.rs b/crates/subspace-gateway/src/commands.rs index cbdf1190c6..a466cefaa7 100644 --- a/crates/subspace-gateway/src/commands.rs +++ b/crates/subspace-gateway/src/commands.rs @@ -4,13 +4,24 @@ pub(crate) mod run; use crate::commands::run::RunOptions; use clap::Parser; +use std::process::exit; +use std::time::Duration; +use std::{process, thread}; +use tokio::runtime::{Handle, Runtime}; use tokio::signal; use 
tracing::level_filters::LevelFilter; -use tracing::{debug, warn}; +use tracing::{debug, error, info, warn}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{fmt, EnvFilter, Layer}; +/// The amount of time we wait for tasks to finish when shutting down. +pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60); + +/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the +/// user to trace the process, before exiting. +pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15); + /// Commands for working with a gateway. #[derive(Debug, Parser)] #[clap(about, version)] @@ -90,3 +101,75 @@ pub(crate) async fn shutdown_signal() { tracing::info!("Received Ctrl+C, shutting down gateway..."); } + +/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is +/// blocking. If a second Ctrl-C is received, the thread will force a shut down immediately. +/// +/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks +/// blocking shutdown on `runtime_handle`. +/// +/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for +/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is +/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in +/// this spawned thread will never be called. +#[cfg_attr( + not(all(tokio_unstable, tokio_taskdump)), + expect(unused_variables, reason = "handle only used in some configs") +)] +pub fn spawn_shutdown_watchdog(runtime_handle: Handle) { + // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout() + // instead of sleep() and exit() + + thread::spawn(move || { + // Shut down immediately if we get a second Ctrl-C. 
+ // + // A tokio runtime that's shutting down will cancel pending futures, so we need to + // wait for ctrl_c() on a separate runtime. + thread::spawn(|| { + debug!("waiting for a second shutdown signal"); + Runtime::new() + .expect("creating a runtime to wait for shutdown signal failed") + .block_on(async { + let _ = shutdown_signal().await; + info!("second shutdown signal received, shutting down immediately"); + exit(1); + }); + }); + + debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down"); + thread::sleep(SHUTDOWN_TIMEOUT); + + // Force a shutdown if a task is blocking. + error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit"); + info!( + "run `flamegraph --pid {}` or similar to generate a stack dump", + process::id() + ); + + // Log all the async tasks and spawn_blocking() tasks that are still running. + // + // A tokio runtime that's shutting down will cancel a dump at its first await + // point, so we need to call dump() on a separate runtime. + #[cfg(all(tokio_unstable, tokio_taskdump))] + thread::spawn(move || { + use tracing::warn; + + error!( + ?SHUTDOWN_TIMEOUT, + "shutdown timed out, trying to dump blocking tasks" + ); + Runtime::new() + .expect("creating a runtime to dump blocking tasks failed") + .block_on(async move { + for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() { + let trace = task.trace(); + warn!(task_number, %trace, "blocking task backtrace"); + } + }); + }); + + // Give the log messages time to flush, and any dumps time to finish. 
+ thread::sleep(TRACE_TIMEOUT); + exit(1); + }); +} diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml index 8b197a10ab..54ec0512f1 100644 --- a/crates/subspace-node/Cargo.toml +++ b/crates/subspace-node/Cargo.toml @@ -98,3 +98,6 @@ runtime-benchmarks = [ "subspace-runtime/runtime-benchmarks", "subspace-service/runtime-benchmarks", ] + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] } diff --git a/crates/subspace-node/src/commands/shared.rs b/crates/subspace-node/src/commands/shared.rs index 1ea4b51967..c6a42ae851 100644 --- a/crates/subspace-node/src/commands/shared.rs +++ b/crates/subspace-node/src/commands/shared.rs @@ -1,5 +1,5 @@ use clap::Parser; -use sc_cli::Error; +use sc_cli::{Error, Signals}; use sc_keystore::LocalKeystore; use sp_core::crypto::{ExposeSecret, SecretString}; use sp_core::sr25519::Pair; @@ -7,10 +7,22 @@ use sp_core::Pair as PairT; use sp_domains::KEY_TYPE; use sp_keystore::Keystore; use std::path::PathBuf; +use std::process::{self, exit}; +use std::thread; +use std::time::Duration; +use tokio::runtime::{Handle, Runtime}; +use tracing::{debug, error, info}; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; +/// The amount of time we wait for tasks to finish when shutting down. +pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60); + +/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the +/// user to trace the process, before exiting. +pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15); + /// Options used for keystore #[derive(Debug, Parser)] pub(super) struct KeystoreOptions { @@ -70,3 +82,77 @@ pub(super) fn init_logger() { ) .init(); } + +/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is +/// blocking. 
If a second Ctrl-C is received, the thread will force a shut down immediately. +/// +/// If compiled with `--cfg tokio_unstable --cfg tokio_taskdump`, logs backtraces of the async tasks +/// blocking shutdown on `runtime_handle`. +/// +/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for +/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is +/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in +/// this spawned thread will never be called. +#[cfg_attr( + not(all(tokio_unstable, tokio_taskdump)), + expect(unused_variables, reason = "handle only used in some configs") +)] +pub fn spawn_shutdown_watchdog(runtime_handle: Handle) { + // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout() + // instead of sleep() and exit() + + thread::spawn(move || { + // Shut down immediately if we get a second Ctrl-C. + // + // A tokio runtime that's shutting down will cancel pending futures, so we need to + // wait for ctrl_c() on a separate runtime. + thread::spawn(|| { + debug!("waiting for a second shutdown signal"); + Runtime::new() + .expect("creating a runtime to wait for shutdown signal failed") + .block_on(async { + let signals = Signals::capture() + .expect("creating a future to wait for shutdown signal failed"); + let _ = signals.future().await; + info!("second shutdown signal received, shutting down immediately"); + exit(1); + }); + }); + + debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down"); + thread::sleep(SHUTDOWN_TIMEOUT); + + // Force a shutdown if a task is blocking. + error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit"); + info!( + "run `flamegraph --pid {}` or similar to generate a stack dump", + process::id() + ); + + // Log all the async tasks and spawn_blocking() tasks that are still running. 
+ // + // A tokio runtime that's shutting down will cancel a dump at its first await + // point, so we need to call dump() on a separate runtime. + #[cfg(all(tokio_unstable, tokio_taskdump))] + thread::spawn(move || { + use tracing::warn; + + error!( + ?SHUTDOWN_TIMEOUT, + "shutdown timed out, trying to dump blocking tasks" + ); + Runtime::new() + .expect("creating a runtime to dump blocking tasks failed") + .block_on(async move { + for (task_number, task) in runtime_handle.dump().await.tasks().iter().enumerate() { + let trace = task.trace(); + warn!(task_number, %trace, "blocking task backtrace"); + } + }); + }); + + // Give the log messages time to flush, and any dumps time to finish. + thread::sleep(TRACE_TIMEOUT); + exit(1); + }); +} From f88533a0df751430f247d3aa775e0219bf46fd70 Mon Sep 17 00:00:00 2001 From: teor Date: Fri, 25 Oct 2024 12:55:06 +1000 Subject: [PATCH 2/5] Spawn the watchdog when the runtime should finish --- Cargo.lock | 5 ++++- crates/subspace-farmer/Cargo.toml | 1 + .../subspace-farmer/src/bin/subspace-farmer/main.rs | 11 +++++++++++ crates/subspace-gateway/Cargo.toml | 1 + crates/subspace-gateway/src/main.rs | 10 +++++++++- crates/subspace-node/Cargo.toml | 1 + crates/subspace-node/src/commands/run.rs | 9 ++++++++- 7 files changed, 35 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 53b3c329e9..f4c67110ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. 
-version = 3 +version = 4 [[package]] name = "Inflector" @@ -12560,6 +12560,7 @@ dependencies = [ "rayon", "schnellru", "schnorrkel", + "scopeguard", "serde", "serde_json", "ss58-registry", @@ -12633,6 +12634,7 @@ dependencies = [ "jsonrpsee", "mimalloc", "parking_lot 0.12.3", + "scopeguard", "subspace-core-primitives", "subspace-data-retrieval", "subspace-erasure-coding", @@ -12836,6 +12838,7 @@ dependencies = [ "sc-telemetry", "sc-transaction-pool-api", "sc-utils", + "scopeguard", "serde_json", "sp-api", "sp-blockchain", diff --git a/crates/subspace-farmer/Cargo.toml b/crates/subspace-farmer/Cargo.toml index 4323db39f5..9fad81628d 100644 --- a/crates/subspace-farmer/Cargo.toml +++ b/crates/subspace-farmer/Cargo.toml @@ -48,6 +48,7 @@ rand = "0.8.5" rayon = "1.10.0" schnellru = "0.2.3" schnorrkel = "0.11.4" +scopeguard = "1.2.0" serde = { version = "1.0.110", features = ["derive"] } serde_json = "1.0.128" static_assertions = "1.1.0" diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index ff078fa7b9..271bc41dda 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -9,11 +9,13 @@ mod commands; mod utils; +use crate::utils::spawn_shutdown_watchdog; use clap::Parser; use std::fs; use std::path::PathBuf; use subspace_farmer::single_disk_farm::{ScrubTarget, SingleDiskFarm}; use subspace_proof_of_space::chia::ChiaTable; +use tokio::runtime::Handle; use tracing::info; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::prelude::*; @@ -97,9 +99,18 @@ async fn main() -> anyhow::Result<()> { match command { Command::Farm(farming_args) => { + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. + scopeguard::defer! 
{ + spawn_shutdown_watchdog(Handle::current()); + }; commands::farm::farm::(farming_args).await?; } Command::Cluster(cluster_args) => { + scopeguard::defer! { + spawn_shutdown_watchdog(Handle::current()); + }; commands::cluster::cluster::(cluster_args).await?; } Command::Benchmark(benchmark_args) => { diff --git a/crates/subspace-gateway/Cargo.toml b/crates/subspace-gateway/Cargo.toml index 6989a5754a..0b76a8b500 100644 --- a/crates/subspace-gateway/Cargo.toml +++ b/crates/subspace-gateway/Cargo.toml @@ -26,6 +26,7 @@ hex = "0.4.3" jsonrpsee = { version = "0.24.5", features = ["server"] } mimalloc = "0.1.43" parking_lot = "0.12.2" +scopeguard = "1.2.0" subspace-core-primitives = { version = "0.1.0", path = "../subspace-core-primitives" } subspace-data-retrieval = { version = "0.1.0", path = "../../shared/subspace-data-retrieval" } subspace-erasure-coding = { version = "0.1.0", path = "../subspace-erasure-coding" } diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 82cb8b94c2..ff622d6d71 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -5,8 +5,9 @@ mod node_client; mod piece_getter; mod piece_validator; -use crate::commands::{init_logger, raise_fd_limit, Command}; +use crate::commands::{init_logger, raise_fd_limit, spawn_shutdown_watchdog, Command}; use clap::Parser; +use tokio::runtime::Handle; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -20,8 +21,15 @@ async fn main() -> anyhow::Result<()> { match command { Command::Run(run_options) => { + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. + scopeguard::defer! 
{ + spawn_shutdown_watchdog(Handle::current()); + }; commands::run::run(run_options).await?; } } + Ok(()) } diff --git a/crates/subspace-node/Cargo.toml b/crates/subspace-node/Cargo.toml index 54ec0512f1..138ac6c587 100644 --- a/crates/subspace-node/Cargo.toml +++ b/crates/subspace-node/Cargo.toml @@ -57,6 +57,7 @@ sc-telemetry = { git = "https://github.com/subspace/polkadot-sdk", rev = "587181 sc-transaction-pool-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-network-sync = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sc-utils = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } +scopeguard = "1.2.0" serde_json = "1.0.128" sp-api = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } sp-blockchain = { git = "https://github.com/subspace/polkadot-sdk", rev = "5871818e1d736f1843eb9078f886290695165c42" } diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs index a944fc5779..f05a2ac072 100644 --- a/crates/subspace-node/src/commands/run.rs +++ b/crates/subspace-node/src/commands/run.rs @@ -8,7 +8,7 @@ use crate::commands::run::consensus::{ use crate::commands::run::domain::{ create_domain_configuration, run_domain, DomainOptions, DomainStartOptions, }; -use crate::commands::shared::init_logger; +use crate::commands::shared::{init_logger, spawn_shutdown_watchdog}; use crate::{set_default_ss58_version, Error, PosTable}; use clap::Parser; use cross_domain_message_gossip::GossipWorkerBuilder; @@ -27,6 +27,7 @@ use std::env; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_runtime::{Block, RuntimeApi}; use subspace_service::config::ChainSyncMode; +use tokio::runtime::Handle; use tracing::{debug, error, info, info_span, warn}; /// Options for running a node @@ -74,6 +75,12 
@@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { raise_fd_limit(); let signals = Signals::capture()?; + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. + scopeguard::defer! { + spawn_shutdown_watchdog(Handle::current()); + }; let RunOptions { consensus, From bc43816efb53a076239ac2253f1bc2bd302ade79 Mon Sep 17 00:00:00 2001 From: teor Date: Mon, 28 Oct 2024 19:07:53 +1000 Subject: [PATCH 3/5] DEBUG: log what the farmer is doing when shutting down --- .../src/bin/subspace-farmer/commands/farm.rs | 6 +++++- crates/subspace-farmer/src/bin/subspace-farmer/main.rs | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 66aad5be88..cf6fab187e 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -842,7 +842,9 @@ where select! { // Signal future - _ = signal.fuse() => {}, + _ = signal.fuse() => { + info!("signal select branch") + }, // Networking future _ = networking_fut.fuse() => { @@ -860,6 +862,8 @@ where }, } + info!("end of async fn farm()"); + anyhow::Ok(()) } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 271bc41dda..18cfbe80a6 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -103,9 +103,11 @@ async fn main() -> anyhow::Result<()> { // This watchdog runs on shutdown, and makes sure the process exits within a timeout, // or when the user sends a second Ctrl-C. scopeguard::defer! 
{ + info!("spawning watchdog"); spawn_shutdown_watchdog(Handle::current()); }; commands::farm::farm::(farming_args).await?; + info!("end of block with defer!() and farm() calls"); } Command::Cluster(cluster_args) => { scopeguard::defer! { @@ -158,5 +160,7 @@ async fn main() -> anyhow::Result<()> { } } } + + info!("end of tokio::main()"); Ok(()) } From 2eca57da441c4921b66ddcd822114b0965fa031b Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 29 Oct 2024 11:20:20 +1000 Subject: [PATCH 4/5] Revert "DEBUG: log what the farmer is doing when shutting down" This reverts commit bc43816efb53a076239ac2253f1bc2bd302ade79. --- .../src/bin/subspace-farmer/commands/farm.rs | 6 +----- crates/subspace-farmer/src/bin/subspace-farmer/main.rs | 4 ---- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index cfd700e152..0cbe578a92 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -858,9 +858,7 @@ where select! { // Signal future - _ = signal.fuse() => { - info!("signal select branch") - }, + _ = signal.fuse() => {}, // Networking future _ = networking_fut.fuse() => { @@ -878,8 +876,6 @@ where }, } - info!("end of async fn farm()"); - anyhow::Ok(()) } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index bb3bbc416d..04e9a17475 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -112,11 +112,9 @@ async fn main() -> anyhow::Result<()> { // This watchdog runs on shutdown, and makes sure the process exits within a timeout, // or when the user sends a second Ctrl-C. scopeguard::defer! 
{ - info!("spawning watchdog"); spawn_shutdown_watchdog(Handle::current()); }; commands::farm::farm::(farming_args).await?; - info!("end of block with defer!() and farm() calls"); } Command::Cluster(cluster_args) => { scopeguard::defer! { @@ -169,7 +167,5 @@ async fn main() -> anyhow::Result<()> { } } } - - info!("end of tokio::main()"); Ok(()) } From b64b2522b33da7d8e2c8b212244c1188fc7cb5ee Mon Sep 17 00:00:00 2001 From: teor Date: Tue, 29 Oct 2024 11:49:26 +1000 Subject: [PATCH 5/5] Move shutdown watchdog to an inner scope, implement for subspace-bootstrap-node --- .../bin/subspace-farmer/commands/cluster.rs | 10 +- .../src/bin/subspace-farmer/commands/farm.rs | 10 +- .../src/bin/subspace-farmer/main.rs | 11 -- crates/subspace-gateway/src/commands/run.rs | 10 +- crates/subspace-gateway/src/main.rs | 11 +- crates/subspace-networking/Cargo.toml | 3 + .../src/bin/subspace-bootstrap-node/main.rs | 126 +++++++++++++++++- crates/subspace-node/src/commands/run.rs | 1 + 8 files changed, 154 insertions(+), 28 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs index ba042d06f5..ded5961afb 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs @@ -7,7 +7,7 @@ use crate::commands::cluster::cache::{cache, CacheArgs}; use crate::commands::cluster::controller::{controller, ControllerArgs}; use crate::commands::cluster::farmer::{farmer, FarmerArgs}; use crate::commands::cluster::plotter::{plotter, PlotterArgs}; -use crate::utils::shutdown_signal; +use crate::utils::{shutdown_signal, spawn_shutdown_watchdog}; use anyhow::anyhow; use async_nats::ServerAddr; use backoff::ExponentialBackoff; @@ -22,6 +22,7 @@ use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::utils::AsyncJoinOnDrop; use subspace_metrics::{start_prometheus_metrics_server, 
RegistryAdapter}; use subspace_proof_of_space::Table; +use tokio::runtime::Handle; /// Arguments for cluster #[derive(Debug, Parser)] @@ -87,6 +88,13 @@ where PosTable: Table, { let signal = shutdown_signal(); + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. + // TODO: make sure this runs before anything else is dropped, because drops can hang. + scopeguard::defer! { + spawn_shutdown_watchdog(Handle::current()); + }; let ClusterArgs { shared_args, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 0cbe578a92..0bbf69a273 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -1,6 +1,6 @@ use crate::commands::shared::network::{configure_network, NetworkArgs}; use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority}; -use crate::utils::shutdown_signal; +use crate::utils::{shutdown_signal, spawn_shutdown_watchdog}; use anyhow::anyhow; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use backoff::ExponentialBackoff; @@ -54,6 +54,7 @@ use subspace_kzg::Kzg; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::utils::piece_provider::PieceProvider; use subspace_proof_of_space::Table; +use tokio::runtime::Handle; use tokio::sync::{Barrier, Semaphore}; use tracing::{error, info, info_span, warn, Instrument}; @@ -303,6 +304,13 @@ where PosTable: Table, { let signal = shutdown_signal(); + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. 
+ // TODO: make sure this runs before anything else is dropped, because drops can hang. + scopeguard::defer! { + spawn_shutdown_watchdog(Handle::current()); + }; let FarmingArgs { node_rpc_url, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs index 04e9a17475..bab9a7abd2 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/main.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/main.rs @@ -9,14 +9,12 @@ mod commands; mod utils; -use crate::utils::spawn_shutdown_watchdog; use clap::Parser; use std::path::PathBuf; use std::process::exit; use std::{fs, panic}; use subspace_farmer::single_disk_farm::{ScrubTarget, SingleDiskFarm}; use subspace_proof_of_space::chia::ChiaTable; -use tokio::runtime::Handle; use tracing::info; use tracing_subscriber::filter::LevelFilter; use tracing_subscriber::prelude::*; @@ -108,18 +106,9 @@ async fn main() -> anyhow::Result<()> { match command { Command::Farm(farming_args) => { - // The async runtime can wait forever for tasks to yield or finish. - // This watchdog runs on shutdown, and makes sure the process exits within a timeout, - // or when the user sends a second Ctrl-C. - scopeguard::defer! { - spawn_shutdown_watchdog(Handle::current()); - }; commands::farm::farm::(farming_args).await?; } Command::Cluster(cluster_args) => { - scopeguard::defer! 
{ - spawn_shutdown_watchdog(Handle::current()); - }; commands::cluster::cluster::(cluster_args).await?; } Command::Benchmark(benchmark_args) => { diff --git a/crates/subspace-gateway/src/commands/run.rs b/crates/subspace-gateway/src/commands/run.rs index ace0f5333b..f22acffe79 100644 --- a/crates/subspace-gateway/src/commands/run.rs +++ b/crates/subspace-gateway/src/commands/run.rs @@ -6,7 +6,7 @@ mod rpc; use crate::commands::run::network::{configure_network, NetworkArgs}; use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT}; -use crate::commands::shutdown_signal; +use crate::commands::{shutdown_signal, spawn_shutdown_watchdog}; use crate::piece_getter::DsnPieceGetter; use crate::piece_validator::SegmentCommitmentPieceValidator; use anyhow::anyhow; @@ -20,6 +20,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher; use subspace_erasure_coding::ErasureCoding; use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig}; use subspace_kzg::Kzg; +use tokio::runtime::Handle; use tracing::info; /// The default size limit, based on the maximum block size in some domains. @@ -58,6 +59,13 @@ pub(crate) struct GatewayOptions { /// Default run command for gateway pub async fn run(run_options: RunOptions) -> anyhow::Result<()> { let signal = shutdown_signal(); + // The async runtime can wait forever for tasks to yield or finish. + // This watchdog runs on shutdown, and makes sure the process exits within a timeout, + // or when the user sends a second Ctrl-C. + // TODO: make sure this runs before anything else is dropped, because drops can hang. + scopeguard::defer! 
{ + spawn_shutdown_watchdog(Handle::current()); + }; let RunOptions { gateway: diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index e56343c1cf..ddc0f8dae8 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -5,11 +5,8 @@ mod node_client; mod piece_getter; mod piece_validator; -use crate::commands::{ - init_logger, raise_fd_limit, set_exit_on_panic, spawn_shutdown_watchdog, Command, -}; +use crate::commands::{init_logger, raise_fd_limit, set_exit_on_panic, Command}; use clap::Parser; -use tokio::runtime::Handle; #[global_allocator] static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc; @@ -24,12 +21,6 @@ async fn main() -> anyhow::Result<()> { match command { Command::Run(run_options) => { - // The async runtime can wait forever for tasks to yield or finish. - // This watchdog runs on shutdown, and makes sure the process exits within a timeout, - // or when the user sends a second Ctrl-C. - scopeguard::defer! 
{ - spawn_shutdown_watchdog(Handle::current()); - }; commands::run::run(run_options).await?; } } diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml index e90e9dac80..ab5d9cb253 100644 --- a/crates/subspace-networking/Cargo.toml +++ b/crates/subspace-networking/Cargo.toml @@ -79,3 +79,6 @@ features = [ rand = "0.8.5" # TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" } + +[lints.rust] +unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] } diff --git a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs index d554991e80..a965b28b02 100644 --- a/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs +++ b/crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs @@ -12,13 +12,16 @@ use serde::{Deserialize, Serialize}; use std::error::Error; use std::fmt::{Display, Formatter}; use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr}; -use std::panic; use std::process::exit; use std::sync::Arc; +use std::time::Duration; +use std::{panic, process, thread}; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::libp2p::multiaddr::Protocol; use subspace_networking::{peer_id, Config, KademliaMode}; -use tracing::{debug, info, Level}; +use tokio::runtime::{Handle, Runtime}; +use tokio::signal; +use tracing::{debug, error, info, Level}; use tracing_subscriber::fmt::Subscriber; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::EnvFilter; @@ -26,6 +29,13 @@ use tracing_subscriber::EnvFilter; /// Size of the LRU cache for peers. pub const KNOWN_PEERS_CACHE_SIZE: u32 = 10000; +/// The amount of time we wait for tasks to finish when shutting down. 
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60); + +/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the +/// user to trace the process, before exiting. +pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15); + #[derive(Debug, Parser)] #[clap(about, version)] enum Command { @@ -128,6 +138,109 @@ fn init_logging() { builder.init() } +#[cfg(unix)] +pub(crate) async fn shutdown_signal() { + use futures::FutureExt; + use std::pin::pin; + + futures::future::select( + pin!(signal::unix::signal(signal::unix::SignalKind::interrupt()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGINT, shutting down bootstrap node..."); + }),), + pin!(signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Setting signal handlers must never fail") + .recv() + .map(|_| { + tracing::info!("Received SIGTERM, shutting down bootstrap node..."); + }),), + ) + .await; +} + +#[cfg(not(unix))] +pub(crate) async fn shutdown_signal() { + signal::ctrl_c() + .await + .expect("Setting signal handlers must never fail"); + + tracing::info!("Received Ctrl+C, shutting down bootstrap node..."); +} + +/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is +/// blocking. If a second Ctrl-C is received, the thread will force a shut down immediately. +/// +/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks +/// blocking shutdown on `runtime_handle`. +/// +/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever for +/// all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is +/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in +/// this spawned thread will never be called. 
+#[cfg_attr( + not(all(tokio_unstable, tokio_taskdump)), + expect(unused_variables, reason = "handle only used in some configs") +)] +pub fn spawn_shutdown_watchdog(runtime_handle: Handle) { + // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout() + // instead of sleep() and exit() + + thread::spawn(move || { + // Shut down immediately if we get a second Ctrl-C. + // + // A tokio runtime that's shutting down will cancel pending futures, so we need to + // wait for shutdown_signal() on a separate runtime. + thread::spawn(|| { + debug!("waiting for a second shutdown signal"); + Runtime::new() + .expect("creating a runtime to wait for shutdown signal failed") + .block_on(async { + shutdown_signal().await; + info!("second shutdown signal received, shutting down immediately"); + exit(1); + }); + }); + + debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down"); + thread::sleep(SHUTDOWN_TIMEOUT); + + // Force a shutdown if a task is blocking. + error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit"); + info!( + "run `flamegraph --pid {}` or similar to generate a stack dump", + process::id() + ); + + // Log all the async tasks and spawn_blocking() tasks that are still running. + // + // A tokio runtime that's shutting down will cancel a dump at its first await + // point, so we need to call dump() on a separate runtime. + #[cfg(all(tokio_unstable, tokio_taskdump))] + thread::spawn(move || { + use tracing::warn; + + error!( + ?SHUTDOWN_TIMEOUT, + "shutdown timed out, trying to dump blocking tasks" + ); + Runtime::new() + .expect("creating a runtime to dump blocking tasks failed") + .block_on(async move { + let dump = runtime_handle.dump().await; + for (task_number, task) in dump.tasks().iter().enumerate() { + warn!(task_number, trace = %task.trace(), "blocking task backtrace"); + } + }); + }); + + // Give the log messages time to flush, and any dumps time to finish. 
+ thread::sleep(TRACE_TIMEOUT); + exit(1); + }); +} + #[tokio::main] async fn main() -> Result<(), Box> { set_exit_on_panic(); @@ -183,6 +296,10 @@ async fn main() -> Result<(), Box> { dsn_metrics_registry, ) }; + + // These tasks can hang on shutdown or when dropped. But there are no error returns + // here, so the only way we exit is when a task finishes. That means we can just launch + // the shutdown watchdog at the end of the block. let (node, mut node_runner) = subspace_networking::construct(config).expect("Networking stack creation failed."); @@ -210,12 +327,13 @@ async fn main() -> Result<(), Box> { .transpose()?; if let Some(prometheus_task) = prometheus_task { select! { - _ = node_runner.run().fuse() => {}, - _ = prometheus_task.fuse() => {}, + _ = node_runner.run().fuse() => {}, + _ = prometheus_task.fuse() => {}, } } else { node_runner.run().await } + spawn_shutdown_watchdog(Handle::current()); } Command::GenerateKeypair { json } => { let output = KeypairOutput::new(Keypair::generate()); diff --git a/crates/subspace-node/src/commands/run.rs b/crates/subspace-node/src/commands/run.rs index f05a2ac072..5b1cfb3b4d 100644 --- a/crates/subspace-node/src/commands/run.rs +++ b/crates/subspace-node/src/commands/run.rs @@ -78,6 +78,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> { // The async runtime can wait forever for tasks to yield or finish. // This watchdog runs on shutdown, and makes sure the process exits within a timeout, // or when the user sends a second Ctrl-C. + // TODO: make sure this runs before anything else is dropped, because drops can hang. scopeguard::defer! { spawn_shutdown_watchdog(Handle::current()); };