Only start overload monitor when sui node is a validator (#16309)
## Description 

As the title says: the overload monitor is now started only when the sui node is a validator.

Note that this change does not yet handle the validator -> fullnode -> validator flow.
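
Concretely, the spawn site moves out of `AuthorityState::new` (which runs on every node) and into the validator-component construction path in `SuiNode`, so fullnodes never reach it. A condensed sketch of the new gating, using the names from the `crates/sui-node/src/lib.rs` hunk shown in full in the diff below:

```rust
// Runs only while constructing validator components, never on fullnodes.
// Monitoring stays disabled when load shedding is turned off entirely.
let validator_overload_monitor_handle = if config
    .authority_overload_config
    .max_load_shedding_percentage
    > 0
{
    // Weak reference: the monitor must not keep AuthorityState alive.
    let authority_state = Arc::downgrade(&state);
    let overload_config = config.authority_overload_config.clone();
    fail_point!("starting_overload_monitor");
    Some(spawn_monitored_task!(overload_monitor(
        authority_state,
        overload_config,
    )))
} else {
    None
};
```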

## Test Plan 

Unit test added. Also ran a round of cluster testing to verify that the
overload monitor starts on validators.
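
The cluster test leans on the fail point added in this change: the production path fires a named fail point, and the sim test registers a callback under the same name to count how many nodes reach it. A minimal sketch of that pairing, assuming only the `sui_macros` fail-point API already used in the diff below:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use sui_macros::{fail_point, register_fail_point};

// Production side: a no-op unless a handler is registered for the name
// (e.g. in msim builds), so it costs nothing on real nodes.
fn start_monitor_sketch() {
    fail_point!("starting_overload_monitor");
    // ... spawn the monitor task here ...
}

// Test side: count how many times the fail point is hit across the cluster.
fn install_start_counter() -> Arc<AtomicUsize> {
    let counter = Arc::new(AtomicUsize::new(0));
    let counter_clone = counter.clone();
    register_fail_point("starting_overload_monitor", move || {
        counter_clone.fetch_add(1, Ordering::SeqCst);
    });
    counter
}
```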

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
halfprice authored Feb 24, 2024
1 parent 56b0816 commit e7ee886
Showing 5 changed files with 99 additions and 13 deletions.
10 changes: 1 addition & 9 deletions crates/sui-core/src/authority.rs
@@ -145,9 +145,7 @@ use crate::execution_driver::execution_process;
 use crate::metrics::LatencyObserver;
 use crate::metrics::RateTracker;
 use crate::module_cache_metrics::ResolverMetrics;
-use crate::overload_monitor::{
-    overload_monitor, overload_monitor_accept_tx, AuthorityOverloadInfo,
-};
+use crate::overload_monitor::{overload_monitor_accept_tx, AuthorityOverloadInfo};
 use crate::stake_aggregator::StakeAggregator;
 use crate::state_accumulator::{AccumulatorStore, StateAccumulator, WrappedObject};
 use crate::subscription_handler::SubscriptionHandler;
@@ -2566,12 +2564,6 @@ impl AuthorityState {
             rx_execution_shutdown,
         ));

-        // Don't start the overload monitor when max_load_shedding_percentage is 0.
-        if authority_overload_config.max_load_shedding_percentage > 0 {
-            let authority_state = Arc::downgrade(&state);
-            spawn_monitored_task!(overload_monitor(authority_state, authority_overload_config));
-        }
-
         // TODO: This doesn't belong to the constructor of AuthorityState.
         state
             .create_owner_index_if_empty(genesis_objects, &epoch_store)
2 changes: 1 addition & 1 deletion crates/sui-core/src/lib.rs
@@ -22,7 +22,7 @@ mod execution_driver;
 pub mod metrics;
 pub mod module_cache_metrics;
 pub mod mysticeti_adapter;
-mod overload_monitor;
+pub mod overload_monitor;
 pub(crate) mod post_consensus_tx_reorder;
 pub mod quorum_driver;
 pub mod safe_client;
33 changes: 30 additions & 3 deletions crates/sui-core/src/overload_monitor.rs
@@ -17,6 +17,10 @@ use tokio::time::sleep;
 use tracing::{debug, info};
 use twox_hash::XxHash64;

+#[cfg(test)]
+#[path = "unit_tests/overload_monitor_tests.rs"]
+pub mod overload_monitor_tests;
+
 #[derive(Default)]
 pub struct AuthorityOverloadInfo {
     /// Whether the authority is overloaded.
@@ -442,6 +446,20 @@ mod tests {
         assert!(!check_authority_overload(&authority, &config));
     }

+    // Creates an AuthorityState and starts an overload monitor that monitors its metrics.
+    async fn start_overload_monitor() -> (Arc<AuthorityState>, JoinHandle<()>) {
+        let overload_config = AuthorityOverloadConfig::default();
+        let state = TestAuthorityBuilder::new()
+            .with_authority_overload_config(overload_config.clone())
+            .build()
+            .await;
+        let authority_state = Arc::downgrade(&state);
+        let monitor_handle = tokio::spawn(async move {
+            overload_monitor(authority_state, overload_config).await;
+        });
+        (state, monitor_handle)
+    }
+
     // Starts a load generator that generates a steady workload, and also allows it to accept
     // bursts of requests through `burst_rx`.
     // Request tracking is done by the overload monitor inside `authority`.
@@ -565,7 +583,7 @@
         min_dropping_rate: f64,
         max_dropping_rate: f64,
     ) {
-        let state = TestAuthorityBuilder::new().build().await;
+        let (state, monitor_handle) = start_overload_monitor().await;

         let (tx, rx) = unbounded_channel();
         let (_burst_tx, burst_rx) = unbounded_channel();
@@ -593,6 +611,9 @@
             / total_requests.load(Ordering::SeqCst) as f64;
         assert!(min_dropping_rate <= dropped_ratio);
         assert!(dropped_ratio <= max_dropping_rate);
+
+        monitor_handle.abort();
+        let _ = monitor_handle.await;
     }

     // Tests that when request generation rate is slower than execution rate, no requests should be dropped.
@@ -624,7 +645,7 @@
     #[tokio::test(flavor = "current_thread", start_paused = true)]
     pub async fn test_workload_single_spike() {
         telemetry_subscribers::init_for_testing();
-        let state = TestAuthorityBuilder::new().build().await;
+        let (state, monitor_handle) = start_overload_monitor().await;

         let (tx, rx) = unbounded_channel();
         let (burst_tx, burst_rx) = unbounded_channel();
@@ -653,14 +674,17 @@

         // No requests should be dropped.
         assert_eq!(dropped_requests.load(Ordering::SeqCst), 0);
+
+        monitor_handle.abort();
+        let _ = monitor_handle.await;
     }

     // Tests that when there are regular spikes that keep queueing latency consistently high,
     // overload monitor should kick in and shed load.
     #[tokio::test(flavor = "current_thread", start_paused = true)]
     pub async fn test_workload_consistent_short_spike() {
         telemetry_subscribers::init_for_testing();
-        let state = TestAuthorityBuilder::new().build().await;
+        let (state, monitor_handle) = start_overload_monitor().await;

         let (tx, rx) = unbounded_channel();
         let (burst_tx, burst_rx) = unbounded_channel();
@@ -695,6 +719,9 @@
         // execution rate.
         assert!(0.4 < dropped_ratio);
         assert!(dropped_ratio < 0.6);
+
+        monitor_handle.abort();
+        let _ = monitor_handle.await;
     }

     // Tests that the ratio of rejected transactions created randomly matches load shedding percentage in
41 changes: 41 additions & 0 deletions crates/sui-core/src/unit_tests/overload_monitor_tests.rs
@@ -0,0 +1,41 @@
+// Copyright (c) Mysten Labs, Inc.
+// SPDX-License-Identifier: Apache-2.0
+
+// Tests that overload monitor only starts on validators.
+#[cfg(msim)]
+mod simtests {
+    use std::sync::atomic::AtomicUsize;
+    use std::sync::atomic::Ordering;
+    use std::sync::Arc;
+    use sui_macros::register_fail_point;
+    use sui_macros::sim_test;
+    use test_cluster::TestClusterBuilder;
+
+    #[sim_test]
+    async fn overload_monitor_in_different_nodes() {
+        telemetry_subscribers::init_for_testing();
+
+        // Uses a fail point to count the number of nodes that start overload monitor.
+        let counter: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
+        let counter_clone = counter.clone();
+        register_fail_point("starting_overload_monitor", move || {
+            counter_clone.fetch_add(1, Ordering::SeqCst);
+        });
+
+        // Creates a cluster, and tests that number of nodes with overload monitor is equal to
+        // the number of validators.
+        let test_cluster = TestClusterBuilder::new().build().await;
+        let nodes_with_overload_monitor = counter.load(Ordering::SeqCst);
+        assert_eq!(
+            nodes_with_overload_monitor,
+            test_cluster.swarm.validator_node_handles().len()
+        );
+
+        // Tests (indirectly) that fullnodes don't run overload monitor.
+        assert!(
+            test_cluster.swarm.all_nodes().collect::<Vec<_>>().len() > nodes_with_overload_monitor
+        );
+    }
+}
+
+// TODO: move other overload related tests from execution_driver_tests.rs to here.
26 changes: 26 additions & 0 deletions crates/sui-node/src/lib.rs
@@ -79,6 +79,7 @@ use sui_core::epoch::epoch_metrics::EpochMetrics;
 use sui_core::epoch::reconfiguration::ReconfigurationInitiator;
 use sui_core::execution_cache::{ExecutionCache, ExecutionCacheReconfigAPI};
 use sui_core::module_cache_metrics::ResolverMetrics;
+use sui_core::overload_monitor::overload_monitor;
 use sui_core::signature_verifier::SignatureVerifierMetrics;
 use sui_core::state_accumulator::StateAccumulator;
 use sui_core::storage::RocksDbStore;
@@ -95,6 +96,7 @@ use sui_json_rpc::read_api::ReadApi;
 use sui_json_rpc::transaction_builder_api::TransactionBuilderApi;
 use sui_json_rpc::transaction_execution_api::TransactionExecutionApi;
 use sui_json_rpc::JsonRpcServerBuilder;
+use sui_macros::fail_point;
 use sui_macros::{fail_point_async, replay_log};
 use sui_network::api::ValidatorServer;
 use sui_network::discovery;
@@ -130,6 +132,7 @@ pub mod metrics;

 pub struct ValidatorComponents {
     validator_server_handle: JoinHandle<Result<()>>,
+    validator_overload_monitor_handle: Option<JoinHandle<()>>,
     consensus_manager: ConsensusManager,
     consensus_epoch_data_remover: EpochDataRemover,
     consensus_adapter: Arc<ConsensusAdapter>,
@@ -1087,6 +1090,24 @@ impl SuiNode {
         )
         .await?;

+        // Starts an overload monitor that monitors the execution of the authority.
+        // Don't start the overload monitor when max_load_shedding_percentage is 0.
+        let validator_overload_monitor_handle = if config
+            .authority_overload_config
+            .max_load_shedding_percentage
+            > 0
+        {
+            let authority_state = Arc::downgrade(&state);
+            let overload_config = config.authority_overload_config.clone();
+            fail_point!("starting_overload_monitor");
+            Some(spawn_monitored_task!(overload_monitor(
+                authority_state,
+                overload_config,
+            )))
+        } else {
+            None
+        };
+
         Self::start_epoch_specific_validator_components(
             config,
             state.clone(),
@@ -1098,6 +1119,7 @@
             consensus_epoch_data_remover,
             accumulator,
             validator_server_handle,
+            validator_overload_monitor_handle,
             checkpoint_metrics,
             sui_node_metrics,
             sui_tx_validator_metrics,
@@ -1116,6 +1138,7 @@
         consensus_epoch_data_remover: EpochDataRemover,
         accumulator: Arc<StateAccumulator>,
         validator_server_handle: JoinHandle<Result<()>>,
+        validator_overload_monitor_handle: Option<JoinHandle<()>>,
         checkpoint_metrics: Arc<CheckpointMetrics>,
         sui_node_metrics: Arc<SuiNodeMetrics>,
         sui_tx_validator_metrics: Arc<SuiTxValidatorMetrics>,
@@ -1199,6 +1222,7 @@ impl SuiNode {

         Ok(ValidatorComponents {
             validator_server_handle,
+            validator_overload_monitor_handle,
             consensus_manager,
             consensus_epoch_data_remover,
             consensus_adapter,
@@ -1484,6 +1508,7 @@
         // in the new epoch.
         let new_validator_components = if let Some(ValidatorComponents {
             validator_server_handle,
+            validator_overload_monitor_handle,
             consensus_manager,
             consensus_epoch_data_remover,
             consensus_adapter,
@@ -1526,6 +1551,7 @@
                 consensus_epoch_data_remover,
                 self.accumulator.clone(),
                 validator_server_handle,
+                validator_overload_monitor_handle,
                 checkpoint_metrics,
                 self.metrics.clone(),
                 sui_tx_validator_metrics,
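
One design note on the `Arc::downgrade` in the spawn site above: passing a `Weak<AuthorityState>` to the monitor means the background task never keeps the authority state alive, and it can detect node shutdown by failing to upgrade the reference. A hypothetical sketch of how a monitor loop can use that; `MonitoredState`, `check_overload`, and the interval parameter are illustrative stand-ins, not the actual `overload_monitor` implementation:

```rust
use std::sync::Weak;
use std::time::Duration;
use tokio::time::sleep;

// Illustrative stand-ins for AuthorityState and the real overload checks.
struct MonitoredState;
fn check_overload(_state: &MonitoredState) { /* inspect queueing latency, etc. */ }

async fn overload_monitor_sketch(state: Weak<MonitoredState>, interval: Duration) {
    loop {
        // If the node has shut down, upgrade() fails and the task exits cleanly.
        let Some(state) = state.upgrade() else { break };
        check_overload(&state);
        drop(state); // don't hold a strong reference across the sleep
        sleep(interval).await;
    }
}
```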
