From e7ee886aca352eb8bbd682cb18c863e3b35723f9 Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Fri, 23 Feb 2024 20:03:58 -0800 Subject: [PATCH] Only start overload monitor when sui node is a validator (#16309) ## Description As the title says. Note that this change does not handle the validator -> fullnode -> validator flow. ## Test Plan Unit test added. Also did a round of cluster testing to make sure that overload monitor can start on validators. --- If your changes are not user-facing and do not break anything, you can skip the following section. Otherwise, please briefly describe what has changed under the Release Notes section. ### Type of Change (Check all that apply) - [ ] protocol change - [ ] user-visible impact - [ ] breaking change for client SDKs - [ ] breaking change for FNs (FN binary must upgrade) - [ ] breaking change for validators or node operators (must upgrade binaries) - [ ] breaking change for on-chain data layout - [ ] necessitate either a data wipe or data migration ### Release notes --- crates/sui-core/src/authority.rs | 10 +---- crates/sui-core/src/lib.rs | 2 +- crates/sui-core/src/overload_monitor.rs | 33 +++++++++++++-- .../src/unit_tests/overload_monitor_tests.rs | 41 +++++++++++++++++++ crates/sui-node/src/lib.rs | 26 ++++++++++++ 5 files changed, 99 insertions(+), 13 deletions(-) create mode 100644 crates/sui-core/src/unit_tests/overload_monitor_tests.rs diff --git a/crates/sui-core/src/authority.rs b/crates/sui-core/src/authority.rs index 26ff5abec2ba8..197e47fef51e8 100644 --- a/crates/sui-core/src/authority.rs +++ b/crates/sui-core/src/authority.rs @@ -145,9 +145,7 @@ use crate::execution_driver::execution_process; use crate::metrics::LatencyObserver; use crate::metrics::RateTracker; use crate::module_cache_metrics::ResolverMetrics; -use crate::overload_monitor::{ - overload_monitor, overload_monitor_accept_tx, AuthorityOverloadInfo, -}; +use crate::overload_monitor::{overload_monitor_accept_tx, AuthorityOverloadInfo}; use 
crate::stake_aggregator::StakeAggregator; use crate::state_accumulator::{AccumulatorStore, StateAccumulator, WrappedObject}; use crate::subscription_handler::SubscriptionHandler; @@ -2566,12 +2564,6 @@ impl AuthorityState { rx_execution_shutdown, )); - // Don't start the overload monitor when max_load_shedding_percentage is 0. - if authority_overload_config.max_load_shedding_percentage > 0 { - let authority_state = Arc::downgrade(&state); - spawn_monitored_task!(overload_monitor(authority_state, authority_overload_config)); - } - // TODO: This doesn't belong to the constructor of AuthorityState. state .create_owner_index_if_empty(genesis_objects, &epoch_store) diff --git a/crates/sui-core/src/lib.rs b/crates/sui-core/src/lib.rs index 7ebb71bca0ba9..213df582e6dae 100644 --- a/crates/sui-core/src/lib.rs +++ b/crates/sui-core/src/lib.rs @@ -22,7 +22,7 @@ mod execution_driver; pub mod metrics; pub mod module_cache_metrics; pub mod mysticeti_adapter; -mod overload_monitor; +pub mod overload_monitor; pub(crate) mod post_consensus_tx_reorder; pub mod quorum_driver; pub mod safe_client; diff --git a/crates/sui-core/src/overload_monitor.rs b/crates/sui-core/src/overload_monitor.rs index 437ae881eccc4..de67ac7fb9d3f 100644 --- a/crates/sui-core/src/overload_monitor.rs +++ b/crates/sui-core/src/overload_monitor.rs @@ -17,6 +17,10 @@ use tokio::time::sleep; use tracing::{debug, info}; use twox_hash::XxHash64; +#[cfg(test)] +#[path = "unit_tests/overload_monitor_tests.rs"] +pub mod overload_monitor_tests; + #[derive(Default)] pub struct AuthorityOverloadInfo { /// Whether the authority is overloaded. @@ -442,6 +446,20 @@ mod tests { assert!(!check_authority_overload(&authority, &config)); } + // Creates an AuthorityState and starts an overload monitor that monitors its metrics. 
+ async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) { + let overload_config = AuthorityOverloadConfig::default(); + let state = TestAuthorityBuilder::new() + .with_authority_overload_config(overload_config.clone()) + .build() + .await; + let authority_state = Arc::downgrade(&state); + let monitor_handle = tokio::spawn(async move { + overload_monitor(authority_state, overload_config).await; + }); + (state, monitor_handle) + } + // Starts a load generator that generates a steady workload, and also allow it to accept // burst of request through `burst_rx`. // Request tracking is done by the overload monitor inside `authority`. @@ -565,7 +583,7 @@ mod tests { min_dropping_rate: f64, max_dropping_rate: f64, ) { - let state = TestAuthorityBuilder::new().build().await; + let (state, monitor_handle) = start_overload_monitor().await; let (tx, rx) = unbounded_channel(); let (_burst_tx, burst_rx) = unbounded_channel(); @@ -593,6 +611,9 @@ mod tests { / total_requests.load(Ordering::SeqCst) as f64; assert!(min_dropping_rate <= dropped_ratio); assert!(dropped_ratio <= max_dropping_rate); + + monitor_handle.abort(); + let _ = monitor_handle.await; } // Tests that when request generation rate is slower than execution rate, no requests should be dropped. @@ -624,7 +645,7 @@ mod tests { #[tokio::test(flavor = "current_thread", start_paused = true)] pub async fn test_workload_single_spike() { telemetry_subscribers::init_for_testing(); - let state = TestAuthorityBuilder::new().build().await; + let (state, monitor_handle) = start_overload_monitor().await; let (tx, rx) = unbounded_channel(); let (burst_tx, burst_rx) = unbounded_channel(); @@ -653,6 +674,9 @@ mod tests { // No requests should be dropped. 
assert_eq!(dropped_requests.load(Ordering::SeqCst), 0); + + monitor_handle.abort(); + let _ = monitor_handle.await; } // Tests that when there are regular spikes that keep queueing latency consistently high, @@ -660,7 +684,7 @@ mod tests { #[tokio::test(flavor = "current_thread", start_paused = true)] pub async fn test_workload_consistent_short_spike() { telemetry_subscribers::init_for_testing(); - let state = TestAuthorityBuilder::new().build().await; + let (state, monitor_handle) = start_overload_monitor().await; let (tx, rx) = unbounded_channel(); let (burst_tx, burst_rx) = unbounded_channel(); @@ -695,6 +719,9 @@ mod tests { // execution rate. assert!(0.4 < dropped_ratio); assert!(dropped_ratio < 0.6); + + monitor_handle.abort(); + let _ = monitor_handle.await; } // Tests that the ratio of rejected transactions created randomly matches load shedding percentage in diff --git a/crates/sui-core/src/unit_tests/overload_monitor_tests.rs b/crates/sui-core/src/unit_tests/overload_monitor_tests.rs new file mode 100644 index 0000000000000..31323f32a41df --- /dev/null +++ b/crates/sui-core/src/unit_tests/overload_monitor_tests.rs @@ -0,0 +1,41 @@ +// Copyright (c) Mysten Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Tests that overload monitor only starts on validators. +#[cfg(msim)] +mod simtests { + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::sync::Arc; + use sui_macros::register_fail_point; + use sui_macros::sim_test; + use test_cluster::TestClusterBuilder; + + #[sim_test] + async fn overload_monitor_in_different_nodes() { + telemetry_subscribers::init_for_testing(); + + // Uses a fail point to count the number of nodes that start overload monitor. 
+ let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + register_fail_point("starting_overload_monitor", move || { + counter_clone.fetch_add(1, Ordering::SeqCst); + }); + + // Creates a cluster, and tests that number of nodes with overload monitor is equal to + // the number of validators. + let test_cluster = TestClusterBuilder::new().build().await; + let nodes_with_overload_monitor = counter.load(Ordering::SeqCst); + assert_eq!( + nodes_with_overload_monitor, + test_cluster.swarm.validator_node_handles().len() + ); + + // Tests (indirectly) that fullnodes don't run overload monitor. + assert!( + test_cluster.swarm.all_nodes().collect::<Vec<_>>().len() > nodes_with_overload_monitor + ); + } +} + +// TODO: move other overload related tests from execution_driver_tests.rs to here. diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs index 825d2f8d7dd38..ae25bd2c255a2 100644 --- a/crates/sui-node/src/lib.rs +++ b/crates/sui-node/src/lib.rs @@ -79,6 +79,7 @@ use sui_core::epoch::epoch_metrics::EpochMetrics; use sui_core::epoch::reconfiguration::ReconfigurationInitiator; use sui_core::execution_cache::{ExecutionCache, ExecutionCacheReconfigAPI}; use sui_core::module_cache_metrics::ResolverMetrics; +use sui_core::overload_monitor::overload_monitor; use sui_core::signature_verifier::SignatureVerifierMetrics; use sui_core::state_accumulator::StateAccumulator; use sui_core::storage::RocksDbStore; @@ -95,6 +96,7 @@ use sui_json_rpc::read_api::ReadApi; use sui_json_rpc::transaction_builder_api::TransactionBuilderApi; use sui_json_rpc::transaction_execution_api::TransactionExecutionApi; use sui_json_rpc::JsonRpcServerBuilder; +use sui_macros::fail_point; use sui_macros::{fail_point_async, replay_log}; use sui_network::api::ValidatorServer; use sui_network::discovery; @@ -130,6 +132,7 @@ pub mod metrics; pub struct ValidatorComponents { validator_server_handle: JoinHandle>, + validator_overload_monitor_handle: Option>, 
consensus_manager: ConsensusManager, consensus_epoch_data_remover: EpochDataRemover, consensus_adapter: Arc, @@ -1087,6 +1090,24 @@ impl SuiNode { ) .await?; + // Starts an overload monitor that monitors the execution of the authority. + // Don't start the overload monitor when max_load_shedding_percentage is 0. + let validator_overload_monitor_handle = if config + .authority_overload_config + .max_load_shedding_percentage + > 0 + { + let authority_state = Arc::downgrade(&state); + let overload_config = config.authority_overload_config.clone(); + fail_point!("starting_overload_monitor"); + Some(spawn_monitored_task!(overload_monitor( + authority_state, + overload_config, + ))) + } else { + None + }; + Self::start_epoch_specific_validator_components( config, state.clone(), @@ -1098,6 +1119,7 @@ impl SuiNode { consensus_epoch_data_remover, accumulator, validator_server_handle, + validator_overload_monitor_handle, checkpoint_metrics, sui_node_metrics, sui_tx_validator_metrics, @@ -1116,6 +1138,7 @@ impl SuiNode { consensus_epoch_data_remover: EpochDataRemover, accumulator: Arc, validator_server_handle: JoinHandle>, + validator_overload_monitor_handle: Option>, checkpoint_metrics: Arc, sui_node_metrics: Arc, sui_tx_validator_metrics: Arc, @@ -1199,6 +1222,7 @@ impl SuiNode { Ok(ValidatorComponents { validator_server_handle, + validator_overload_monitor_handle, consensus_manager, consensus_epoch_data_remover, consensus_adapter, @@ -1484,6 +1508,7 @@ impl SuiNode { // in the new epoch. let new_validator_components = if let Some(ValidatorComponents { validator_server_handle, + validator_overload_monitor_handle, consensus_manager, consensus_epoch_data_remover, consensus_adapter, @@ -1526,6 +1551,7 @@ impl SuiNode { consensus_epoch_data_remover, self.accumulator.clone(), validator_server_handle, + validator_overload_monitor_handle, checkpoint_metrics, self.metrics.clone(), sui_tx_validator_metrics,