detect loss of liveness in tests (#17299)
In simtests, and also if the `PANIC_ON_NEW_CHECKPOINT_TIMEOUT` env var
is set, we panic if we go too long without receiving a certified
checkpoint. This condition should encompass more or less everything we
mean when we say "loss of liveness".

In order to get tests to pass under this stricter condition, several
fixes were required:
- changes to the crash failpoints to prevent crashes from happening too
close together in time
- disabling pruning in crash tests

Additionally, some tests cannot pass in this condition, so we suppress
the panic on a per-test basis.
mystenmark authored Apr 25, 2024
1 parent 0649dff commit 4611063
Showing 5 changed files with 129 additions and 9 deletions.
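As the commit message notes, tests that intentionally halt the network suppress the new panic on a per-test basis. A minimal sketch of that opt-out pattern, mirroring the additions to checkpoint_tests.rs and transaction_orchestrator_tests.rs below (the test name here is hypothetical):

#[sim_test]
async fn my_test_that_intentionally_stalls_checkpoints() {
    #[cfg(msim)]
    {
        use sui_core::authority::{init_checkpoint_timeout_config, CheckpointTimeoutConfig};
        // Still log the "no new synced checkpoints" warning after 2s, but do not
        // treat the stall as a loss of liveness.
        init_checkpoint_timeout_config(CheckpointTimeoutConfig {
            timeout: std::time::Duration::from_secs(2),
            panic_on_timeout: false,
        });
    }
    // ... test body ...
}
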
65 changes: 58 additions & 7 deletions crates/sui-benchmark/tests/simtest.rs
@@ -38,7 +38,7 @@ mod test {
use sui_types::full_checkpoint_content::CheckpointData;
use sui_types::messages_checkpoint::VerifiedCheckpoint;
use test_cluster::{TestCluster, TestClusterBuilder};
use tracing::{error, info};
use tracing::{error, info, trace};
use typed_store::traits::Map;

struct DeadValidator {
@@ -148,9 +148,11 @@
fn handle_failpoint(
dead_validator: Arc<Mutex<Option<DeadValidator>>>,
keep_alive_nodes: HashSet<sui_simulator::task::NodeId>,
grace_period: Arc<Mutex<Option<Instant>>>,
probability: f64,
) {
let mut dead_validator = dead_validator.lock().unwrap();
let mut grace_period = grace_period.lock().unwrap();
let cur_node = sui_simulator::current_simnode_id();

if keep_alive_nodes.contains(&cur_node) {
@@ -167,16 +169,36 @@
// otherwise, possibly fail the current node
let mut rng = thread_rng();
if rng.gen_range(0.0..1.0) < probability {
error!("Matched probability threshold for failpoint. Failing...");
// clear grace period if expired
if let Some(t) = *grace_period {
if t < Instant::now() {
*grace_period = None;
}
}

// check if any node is in grace period
if grace_period.is_some() {
trace!(?cur_node, "grace period in effect, not failing node");
return;
}

let restart_after = Duration::from_millis(rng.gen_range(10000..20000));
let dead_until = Instant::now() + restart_after;

// Prevent the same node from being restarted again rapidly.
let alive_until = dead_until + Duration::from_millis(rng.gen_range(5000..30000));
*grace_period = Some(alive_until);

error!(?cur_node, ?dead_until, ?alive_until, "killing node");

*dead_validator = Some(DeadValidator {
node_id: cur_node,
dead_until: Instant::now() + restart_after,
dead_until,
});

// must manually release lock before calling kill_current_node, which panics
// and would poison the lock.
drop(grace_period);
drop(dead_validator);

sui_simulator::task::kill_current_node(Some(restart_after));
@@ -231,13 +253,19 @@ mod test {
#[sim_test(config = "test_config()")]
async fn test_simulated_load_reconfig_with_crashes_and_delays() {
sui_protocol_config::ProtocolConfig::poison_get_for_min_version();
let test_cluster = build_test_cluster(4, 1000).await;

let test_cluster = init_test_cluster_builder(4, 1000)
.with_num_unpruned_validators(4)
.build()
.await;

let dead_validator_orig: Arc<Mutex<Option<DeadValidator>>> = Default::default();
let grace_period: Arc<Mutex<Option<Instant>>> = Default::default();

let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes = get_keep_alive_nodes(&test_cluster);
let keep_alive_nodes_clone = keep_alive_nodes.clone();
let grace_period_clone = grace_period.clone();
register_fail_points(
&[
"batch-write-before",
@@ -250,23 +278,36 @@
"highest-executed-checkpoint",
],
move || {
handle_failpoint(dead_validator.clone(), keep_alive_nodes_clone.clone(), 0.02);
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes_clone.clone(),
grace_period_clone.clone(),
0.02,
);
},
);

let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes_clone = keep_alive_nodes.clone();
let grace_period_clone = grace_period.clone();
register_fail_point_async("crash", move || {
let dead_validator = dead_validator.clone();
let keep_alive_nodes_clone = keep_alive_nodes_clone.clone();
let grace_period_clone = grace_period_clone.clone();
async move {
handle_failpoint(dead_validator.clone(), keep_alive_nodes_clone.clone(), 0.01);
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes_clone.clone(),
grace_period_clone.clone(),
0.01,
);
}
});

// Narwhal & Consensus 2.0 fail points.
let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes_clone = keep_alive_nodes.clone();
let grace_period_clone = grace_period.clone();
register_fail_points(
&[
"narwhal-rpc-response",
@@ -280,6 +321,7 @@
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes_clone.clone(),
grace_period_clone.clone(),
0.001,
);
},
@@ -288,13 +330,16 @@

let dead_validator = dead_validator_orig.clone();
let keep_alive_nodes_clone = keep_alive_nodes.clone();
let grace_period_clone = grace_period.clone();
register_fail_point_async("consensus-rpc-response", move || {
let dead_validator = dead_validator.clone();
let keep_alive_nodes_clone = keep_alive_nodes_clone.clone();
let grace_period_clone = grace_period_clone.clone();
async move {
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes_clone.clone(),
grace_period_clone.clone(),
0.001,
);
}
@@ -313,8 +358,14 @@

let dead_validator: Arc<Mutex<Option<DeadValidator>>> = Default::default();
let keep_alive_nodes = get_keep_alive_nodes(&test_cluster);
let grace_period: Arc<Mutex<Option<Instant>>> = Default::default();
register_fail_points(&["before-open-new-epoch-store"], move || {
handle_failpoint(dead_validator.clone(), keep_alive_nodes.clone(), 1.0);
handle_failpoint(
dead_validator.clone(),
keep_alive_nodes.clone(),
grace_period.clone(),
1.0,
);
});
test_simulated_load(TestInitData::new(&test_cluster).await, 120).await;
}
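The grace-period change above amounts to a shared cool-down window across all crash failpoints: once a node has been scheduled to be killed, no further crash is injected until some time after it comes back. A stripped-down illustration of that idea (a sketch only; the helper name is hypothetical and the real logic lives in handle_failpoint above):

use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

// Returns true only if no crash has been scheduled recently; otherwise the
// caller should skip crashing. The shared Option<Instant> acts as a
// "do not crash again until" marker across every registered failpoint.
fn may_crash_now(grace_period: &Arc<Mutex<Option<Instant>>>, cool_down: Duration) -> bool {
    let mut guard = grace_period.lock().unwrap();
    match *guard {
        Some(until) if until > Instant::now() => false, // still inside the grace period
        _ => {
            *guard = Some(Instant::now() + cool_down); // open a new grace period
            true
        }
    }
}
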
5 changes: 5 additions & 0 deletions crates/sui-core/src/authority.rs
@@ -157,6 +157,11 @@ use crate::subscription_handler::SubscriptionHandler;
use crate::transaction_input_loader::TransactionInputLoader;
use crate::transaction_manager::TransactionManager;

#[cfg(msim)]
pub use crate::checkpoints::checkpoint_executor::{
init_checkpoint_timeout_config, CheckpointTimeoutConfig,
};

#[cfg(msim)]
use sui_types::committee::CommitteeTrait;
use sui_types::execution_config_utils::to_binary_config;
50 changes: 48 additions & 2 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
@@ -70,7 +70,49 @@ type CheckpointExecutionBuffer =
/// The interval to log checkpoint progress, in # of checkpoints processed.
const CHECKPOINT_PROGRESS_LOG_COUNT_INTERVAL: u64 = 5000;

const SCHEDULING_EVENT_FUTURE_TIMEOUT_MS: u64 = 2000;
#[derive(Debug, Clone, Copy)]
pub struct CheckpointTimeoutConfig {
pub timeout: Duration,
pub panic_on_timeout: bool,
}

// We use a thread local so that the config can be overridden on a per-test basis. This means
// that get_scheduling_timeout() can be called multiple times in a multithreaded context, but
// the function is still very cheap to call so this is okay.
thread_local! {
static SCHEDULING_TIMEOUT: once_cell::sync::OnceCell<CheckpointTimeoutConfig> =
once_cell::sync::OnceCell::new();
}

#[cfg(msim)]
pub fn init_checkpoint_timeout_config(config: CheckpointTimeoutConfig) {
SCHEDULING_TIMEOUT.with(|s| {
s.set(config).expect("SchedulingTimeoutConfig already set");
});
}

fn get_scheduling_timeout() -> CheckpointTimeoutConfig {
fn inner() -> CheckpointTimeoutConfig {
let panic_on_timeout = cfg!(msim)
|| std::env::var("PANIC_ON_NEW_CHECKPOINT_TIMEOUT")
.map_or(false, |s| s == "true" || s == "1");

// if we are panicking on timeout default to a longer timeout than if we are simply logging.
let timeout = Duration::from_millis(
std::env::var("NEW_CHECKPOINT_TIMEOUT_MS")
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or(if panic_on_timeout { 20000 } else { 2000 }),
);

CheckpointTimeoutConfig {
timeout,
panic_on_timeout,
}
}

SCHEDULING_TIMEOUT.with(|s| *s.get_or_init(inner))
}

#[derive(PartialEq, Eq, Debug)]
pub enum StopReason {
@@ -192,7 +234,8 @@ impl CheckpointExecutor {
.as_ref()
.map(|c| c.network_total_transactions)
.unwrap_or(0);
let scheduling_timeout = Duration::from_millis(SCHEDULING_EVENT_FUTURE_TIMEOUT_MS);
let scheduling_timeout_config = get_scheduling_timeout();
let scheduling_timeout = scheduling_timeout_config.timeout;

loop {
// If we have executed the last checkpoint of the current epoch, stop.
@@ -265,6 +308,9 @@
warn!(
"Received no new synced checkpoints for {scheduling_timeout:?}. Next checkpoint to be scheduled: {next_to_schedule}",
);
if scheduling_timeout_config.panic_on_timeout {
panic!("No new synced checkpoints received for {scheduling_timeout:?}");
}
fail_point!("cp_exec_scheduling_timeout_reached");
},
Ok(Ok(checkpoint)) => {
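Outside of msim builds, the same panic can be enabled through the environment variables read by get_scheduling_timeout() above. One way a non-simtest harness might exercise that path (a sketch, not part of this commit; the calls must run before the checkpoint executor first resolves the config, since the result is cached per thread in a OnceCell):

// Opt a non-simtest run into the liveness panic.
std::env::set_var("PANIC_ON_NEW_CHECKPOINT_TIMEOUT", "1");
// Optional: override the timeout in milliseconds (defaults to 20000 when
// panicking, 2000 when only logging a warning).
std::env::set_var("NEW_CHECKPOINT_TIMEOUT_MS", "30000");
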
10 changes: 10 additions & 0 deletions crates/sui-e2e-tests/tests/checkpoint_tests.rs
@@ -44,6 +44,16 @@ async fn basic_checkpoints_integration_test() {

#[sim_test]
async fn checkpoint_split_brain_test() {
#[cfg(msim)]
{
// this test intentionally halts the network by causing a fork, so we cannot panic on
// loss of liveness
use sui_core::authority::{init_checkpoint_timeout_config, CheckpointTimeoutConfig};
init_checkpoint_timeout_config(CheckpointTimeoutConfig {
timeout: Duration::from_secs(2),
panic_on_timeout: false,
});
}
let committee_size = 9;
// count number of nodes that have reached split brain condition
let count_split_brain_nodes: Arc<Mutex<AtomicUsize>> = Default::default();
8 changes: 8 additions & 0 deletions crates/sui-e2e-tests/tests/transaction_orchestrator_tests.rs
@@ -104,6 +104,14 @@ async fn test_blocking_execution() -> Result<(), anyhow::Error> {

#[sim_test]
async fn test_fullnode_wal_log() -> Result<(), anyhow::Error> {
#[cfg(msim)]
{
use sui_core::authority::{init_checkpoint_timeout_config, CheckpointTimeoutConfig};
init_checkpoint_timeout_config(CheckpointTimeoutConfig {
timeout: Duration::from_secs(2),
panic_on_timeout: false,
});
}
telemetry_subscribers::init_for_testing();
let mut test_cluster = TestClusterBuilder::new()
.with_epoch_duration_ms(600000)
