Skip to content
This repository was archived by the owner on Feb 3, 2023. It is now read-only.

install a helper to panic if all tokio's executor threads are blocked by blocking tasks #2121

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ workflows:
only:
- develop
- final-exam
- tokio-deadlock-panic
- wire-message-receipts
- docker-build-trycp-server:
requires:
Expand All @@ -284,6 +285,7 @@ workflows:
only:
- develop
- final-exam
- tokio-deadlock-panic
- wire-message-receipts
- docker-build-circle-build:
requires:
Expand Down
115 changes: 115 additions & 0 deletions crates/sim2h/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,70 @@ use std::{
use holochain_locksmith::Mutex;
use holochain_metrics::{config::MetricPublisherConfig, Metric};

/// install a tokio task that reports if it gets cpu time once per second
/// setup a system OS thread that checks the channel once every `check_ms`
/// if we haven't gotten a tokio report in the timeframe, panic!
/// we probably have locked up all of tokio's executor threads with blocking
fn tokio_deadlock_panic(check_ms: u128) {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

let (mut send, mut recv) = tokio::sync::mpsc::channel(10);

let cont = Arc::new(AtomicBool::new(true));

struct D(Arc<AtomicBool>);

impl Drop for D {
fn drop(&mut self) {
self.0.store(false, Ordering::Relaxed);
}
}

impl D {
fn cont(&self) -> bool {
self.0.load(Ordering::Relaxed)
}
}

let d = D(cont.clone());

tokio::task::spawn(async move {
loop {
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
// this is mostly to move `d` into this block
// if this task is dropped because our runtime is dropped
// then the thread below will also exit
if !d.cont() {
return;
}
if let Err(_) = send.send(std::time::Instant::now()).await {
error!("TOKIO DEADLOCK PANIC - task ending, shutting down?");
return;
}
}
});

// use an OS thread to check that tokio is still processing tasks
std::thread::spawn(move || {
let mut last_inst = std::time::Instant::now();
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
if !cont.load(Ordering::Relaxed) {
return;
}
while let Ok(inst) = recv.try_recv() {
last_inst = inst;
}
if last_inst.elapsed().as_millis() >= check_ms {
panic!("TOKIO DEADLOCK PANIC - no tick from tokio task");
}
}
});
}

lazy_static! {
static ref SET_THREAD_PANIC_FATAL: bool = {
let orig_handler = std::panic::take_hook();
Expand All @@ -68,6 +132,7 @@ lazy_static! {
orig_handler(panic_info);
std::process::exit(1);
}));
tokio_deadlock_panic(5000);
true
};
}
Expand Down Expand Up @@ -1405,3 +1470,53 @@ async fn missing_aspects_resync(sim2h_handle: Sim2hHandle, _schedule_guard: Sche
}
trace!("sim2h gossip full loop in {} ms (ok to be long, this task is broken into multiple sub-loops)", gossip_full_start.elapsed().as_millis());
}

#[cfg(test)]
mod tests {
#[allow(unused)]
use super::*;

#[test]
fn check_tokio_panic() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

let did_panic = Arc::new(AtomicBool::new(false));
let did_panic_clone = did_panic.clone();

// it's hard to test panics in threads with rust's test framework
// replace the panic handler - then put it back at the end of this test
let orig_panic_handler = std::panic::take_hook();
std::panic::set_hook(Box::new(move |_| {
// yay, we panic!d
did_panic_clone.store(true, Ordering::Relaxed);
}));

tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.thread_name("tokio-panic-thread")
.enable_all()
.build()
.unwrap()
.block_on(async move {
tokio_deadlock_panic(2);

tokio::task::spawn(async move {
// use a thread sleep on purpose to block the task executor
std::thread::sleep(std::time::Duration::from_secs(3));
});

for _ in 0..3 {
tokio::time::delay_for(std::time::Duration::from_secs(1)).await;
}
});

std::panic::set_hook(orig_panic_handler);
if !did_panic.load(Ordering::Relaxed) {
panic!("got to end without panicing - tokio panic not working?");
}
}
}