Skip to content

Commit

Permalink
Server side authority overload pushback (#16096)
Browse files Browse the repository at this point in the history
## Description 

This PR implements the server-side logic of authority overload pushback.
- When a txn is rejected during signing, its input objects are still locked to prevent
equivocation.
- Create a new error type, ValidatorPushbackAndRetry, to indicate that
the client should keep retrying the same request.

## Test Plan 

Unit tests added for txn selection logic
Integration tests added for testing server side logic.

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
halfprice authored Feb 13, 2024
1 parent dbaf4c8 commit 13bb49e
Show file tree
Hide file tree
Showing 11 changed files with 423 additions and 21 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ tower-http = { version = "0.3.4", features = [
] }
# tower-http = { version="0.4", features = ["trace"] }
tower-layer = "0.3.2"
twox-hash = "1.6.3"
tracing = "0.1.37"
tracing-appender = "0.2.2"
tracing-subscriber = { version = "0.3.15", default-features = false, features = [
Expand Down
12 changes: 12 additions & 0 deletions crates/sui-config/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,16 @@ pub struct OverloadThresholdConfig {
// is well under used, and will not enter load shedding mode.
#[serde(default = "default_safe_transaction_ready_rate")]
pub safe_transaction_ready_rate: u32,

// When set to true, transaction signing may be rejected when the validator
// is overloaded.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub check_system_overload_at_signing: bool,

// When set to true, transaction execution may be rejected when the validator
// is overloaded.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub check_system_overload_at_execution: bool,
// TODO: Move other thresholds here as well, including `MAX_TM_QUEUE_LENGTH`
// and `MAX_PER_OBJECT_QUEUE_LENGTH`.
}
Expand Down Expand Up @@ -758,6 +768,8 @@ impl Default for OverloadThresholdConfig {
min_load_shedding_percentage_above_hard_limit:
default_min_load_shedding_percentage_above_hard_limit(),
safe_transaction_ready_rate: default_safe_transaction_ready_rate(),
check_system_overload_at_signing: false,
check_system_overload_at_execution: false,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ tokio = { workspace = true, features = ["full", "tracing", "test-util"] }
tokio-retry.workspace = true
tokio-stream.workspace = true
tracing.workspace = true
twox-hash.workspace = true

fastcrypto.workspace = true
fastcrypto-tbls.workspace = true
Expand Down
38 changes: 35 additions & 3 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use std::collections::{BTreeMap, BTreeSet};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -144,7 +145,9 @@ use crate::execution_driver::execution_process;
use crate::metrics::LatencyObserver;
use crate::metrics::RateTracker;
use crate::module_cache_metrics::ResolverMetrics;
use crate::overload_monitor::{overload_monitor, AuthorityOverloadInfo};
use crate::overload_monitor::{
overload_monitor, overload_monitor_accept_tx, AuthorityOverloadInfo,
};
use crate::stake_aggregator::StakeAggregator;
use crate::state_accumulator::{AccumulatorStore, StateAccumulator, WrappedObject};
use crate::subscription_handler::SubscriptionHandler;
Expand Down Expand Up @@ -931,17 +934,43 @@ impl AuthorityState {
}
}

/// Returns the `check_system_overload_at_signing` flag from this authority's
/// overload threshold config. When the flag is set, transaction signing may be
/// rejected while the validator is overloaded.
pub fn check_system_overload_at_signing(&self) -> bool {
    let thresholds = &self.overload_threshold_config;
    thresholds.check_system_overload_at_signing
}

/// Returns the `check_system_overload_at_execution` flag from this authority's
/// overload threshold config. When the flag is set, transaction execution may be
/// rejected while the validator is overloaded.
pub fn check_system_overload_at_execution(&self) -> bool {
    let thresholds = &self.overload_threshold_config;
    thresholds.check_system_overload_at_execution
}

/// Runs the overload checks for `tx_data` and returns the first error encountered.
///
/// The checks run in this order:
/// 1. the authority-level overload check, only when `do_authority_overload_check` is true;
/// 2. the transaction manager's execution overload check;
/// 3. the consensus adapter's overload check.
pub(crate) fn check_system_overload(
    &self,
    consensus_adapter: &Arc<ConsensusAdapter>,
    tx_data: &SenderSignedData,
    do_authority_overload_check: bool,
) -> SuiResult {
    if do_authority_overload_check {
        self.check_authority_overload(tx_data)?;
    }

    let max_queue_age = self.max_txn_age_in_queue();
    self.transaction_manager
        .check_execution_overload(max_queue_age, tx_data)?;

    consensus_adapter.check_consensus_overload()?;
    Ok(())
}

/// Authority-level overload check for `tx_data`.
///
/// Succeeds immediately when the `is_overload` flag in `overload_info` is not set.
/// Otherwise reads the current load shedding percentage and lets
/// `overload_monitor_accept_tx` decide, based on the transaction digest, whether
/// this transaction is accepted.
fn check_authority_overload(&self, tx_data: &SenderSignedData) -> SuiResult {
    let overloaded = self.overload_info.is_overload.load(Ordering::Relaxed);
    if overloaded {
        let shedding_pct = self
            .overload_info
            .load_shedding_percentage
            .load(Ordering::Relaxed);
        overload_monitor_accept_tx(shedding_pct, tx_data.digest())
    } else {
        Ok(())
    }
}

/// Executes a transaction that's known to have correct effects.
/// For such transaction, we don't have to wait for consensus to set shared object
/// locks because we already know the shared object versions based on the effects.
Expand Down Expand Up @@ -2530,8 +2559,11 @@ impl AuthorityState {
rx_execution_shutdown,
));

let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(overload_monitor(authority_state, overload_threshold_config));
// Don't start the overload monitor when max_load_shedding_percentage is 0.
if overload_threshold_config.max_load_shedding_percentage > 0 {
let authority_state = Arc::downgrade(&state);
spawn_monitored_task!(overload_monitor(authority_state, overload_threshold_config));
}

// TODO: This doesn't belong to the constructor of AuthorityState.
state
Expand Down
49 changes: 34 additions & 15 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,15 @@ impl ValidatorService {
pub async fn execute_certificate_for_testing(
&self,
cert: CertifiedTransaction,
) -> HandleCertificateResponseV2 {
self.handle_certificate_v2(tonic::Request::new(cert))
.await
.unwrap()
.into_inner()
) -> Result<tonic::Response<HandleCertificateResponseV2>, tonic::Status> {
self.handle_certificate_v2(tonic::Request::new(cert)).await
}

pub async fn handle_transaction_for_testing(
&self,
transaction: Transaction,
) -> HandleTransactionResponse {
self.transaction(tonic::Request::new(transaction))
.await
.unwrap()
.into_inner()
) -> Result<tonic::Response<HandleTransactionResponse>, tonic::Status> {
self.transaction(tonic::Request::new(transaction)).await
}

async fn handle_transaction(
Expand Down Expand Up @@ -312,14 +306,31 @@ impl ValidatorService {
.into());
}

let overload_check_res =
state.check_system_overload(&consensus_adapter, transaction.data());
// When the authority is overloaded and decides to reject this tx, we still lock the objects
// and ask the client to retry in the future. This is because without locking, the
// input objects can be locked by a different tx in the future, however, the input objects
// may already be locked by this tx in other validators. This can cause none of the txes
// to have enough quorum to form a certificate, causing the objects to be locked for
// the entire epoch. By locking but pushing back, a retried transaction will have
// a higher chance to succeed.
let mut validator_pushback_error = None;
let overload_check_res = state.check_system_overload(
&consensus_adapter,
transaction.data(),
state.check_system_overload_at_signing(),
);
if let Err(error) = overload_check_res {
metrics
.num_rejected_tx_during_overload
.with_label_values(&[error.as_ref()])
.inc();
return Err(error.into());
// TODO: consider changing the behavior for other types of overload errors.
match error {
SuiError::ValidatorOverloadedRetryAfter { .. } => {
validator_pushback_error = Some(error)
}
_ => return Err(error.into()),
}
}

let _handle_tx_metrics_guard = metrics.handle_transaction_latency.start_timer();
Expand All @@ -345,6 +356,11 @@ impl ValidatorService {
}
})?;

if let Some(error) = validator_pushback_error {
// TODO: right now, we still sign the txn, but just don't return it. We can also skip signing
// to save more CPU.
return Err(error.into());
}
Ok(tonic::Response::new(info))
}

Expand Down Expand Up @@ -414,8 +430,11 @@ impl ValidatorService {

// 2) Verify the cert.
// Check system overload
let overload_check_res =
state.check_system_overload(&consensus_adapter, certificate.data());
let overload_check_res = state.check_system_overload(
&consensus_adapter,
certificate.data(),
state.check_system_overload_at_execution(),
);
if let Err(error) = overload_check_res {
metrics
.num_rejected_cert_during_overload
Expand Down
97 changes: 97 additions & 0 deletions crates/sui-core/src/overload_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@

use crate::authority::AuthorityState;
use std::cmp::{max, min};
use std::hash::Hasher;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::Weak;
use std::time::Duration;
use std::time::{SystemTime, UNIX_EPOCH};
use sui_config::node::OverloadThresholdConfig;
use sui_types::digests::TransactionDigest;
use sui_types::error::SuiError;
use sui_types::error::SuiResult;
use sui_types::fp_bail;
use tokio::time::sleep;
use tracing::{debug, info};
use twox_hash::XxHash64;

#[derive(Default)]
pub struct AuthorityOverloadInfo {
Expand Down Expand Up @@ -204,6 +211,40 @@ fn check_overload_signals(
(overload_status, load_shedding_percentage)
}

// Returns true when the txn with `tx_digest` should be rejected.
//
// The digest bytes are hashed with `minutes_since_epoch` as the hash seed, and the
// hash is reduced to a bucket in 0..100. The txn is rejected when its bucket falls
// below `load_shedding_percentage`, so the same digest and seed always yield the
// same decision.
fn should_reject_tx(
    load_shedding_percentage: u32,
    tx_digest: TransactionDigest,
    minutes_since_epoch: u64,
) -> bool {
    // TODO: we also need to add a secret salt (e.g. first consensus commit in the current epoch),
    // to prevent gaming the system.
    let mut seeded_hasher = XxHash64::with_seed(minutes_since_epoch);
    seeded_hasher.write(tx_digest.inner());
    let bucket = seeded_hasher.finish() % 100;
    bucket < u64::from(load_shedding_percentage)
}

// Decides whether the transaction with `tx_digest` can be accepted at the given
// `load_shedding_percentage`.
//
// Returns `Ok(())` when accepted, and `SuiError::ValidatorOverloadedRetryAfter`
// (currently with `retry_after_sec: 0`) when the transaction is rejected.
pub fn overload_monitor_accept_tx(
    load_shedding_percentage: u32,
    tx_digest: TransactionDigest,
) -> SuiResult {
    // The hash seed is the current wall-clock minute, so the decision for a given
    // digest changes from minute to minute. This gives a rejected transaction's
    // retry a chance to go through in the future.
    let secs_since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Sui did not exist prior to 1970")
        .as_secs();
    let minutes_since_epoch = secs_since_epoch / 60;

    let rejected = should_reject_tx(load_shedding_percentage, tx_digest, minutes_since_epoch);
    if rejected {
        // TODO: complete the suggestion for client retry deadline.
        fp_bail!(SuiError::ValidatorOverloadedRetryAfter { retry_after_sec: 0 });
    }
    Ok(())
}

#[cfg(test)]
#[allow(clippy::disallowed_methods)] // allow unbounded_channel() since tests are simulating txn manager execution driver interaction.
mod tests {
Expand All @@ -215,6 +256,7 @@ mod tests {
Rng, SeedableRng,
};
use std::sync::Arc;
use sui_macros::sim_test;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
Expand Down Expand Up @@ -645,4 +687,59 @@ mod tests {
assert!(0.4 < dropped_ratio);
assert!(dropped_ratio < 0.6);
}

// Checks that, for every load shedding percentage from 0 to 100, the fraction of
// randomly generated transactions that `should_reject_tx` rejects stays close to
// that percentage.
#[test]
fn test_txn_rejection_rate() {
    for rejection_percentage in 0..=100 {
        // Count rejections among 10000 random digests, all hashed with the same fixed seed.
        let reject_count = (0..10000)
            .filter(|_| {
                should_reject_tx(rejection_percentage, TransactionDigest::random(), 28455473)
            })
            .count();

        let expected_ratio = rejection_percentage as f32 / 100.0;
        let observed_ratio = reject_count as f32 / 10000.0;

        // Give it a 2% fluctuation.
        assert!(expected_ratio - 0.02 < observed_ratio);
        assert!(observed_ratio < expected_ratio + 0.02);
    }
}

// Checks that a transaction rejected in one minute window keeps being rejected
// within that window, but gets accepted in some later window.
#[sim_test]
async fn test_txn_rejection_over_time() {
    let start_time = Instant::now();
    let time_budget = Duration::from_secs(30);
    let load_shedding_percentage = 50;
    let mut minutes_since_epoch = 28455473;

    // Draw random digests until one is rejected at a 50% rejection rate
    // (or the time budget is exhausted).
    let mut digest = TransactionDigest::random();
    while !should_reject_tx(load_shedding_percentage, digest, minutes_since_epoch)
        && start_time.elapsed() < time_budget
    {
        digest = TransactionDigest::random();
    }

    // Within the same minute the decision must not change: always rejected.
    for _ in 0..100 {
        let rejected = should_reject_tx(load_shedding_percentage, digest, minutes_since_epoch);
        assert!(rejected);
    }

    // Advance one minute at a time until the digest is accepted
    // (or the time budget is exhausted).
    loop {
        minutes_since_epoch += 1;
        if !should_reject_tx(load_shedding_percentage, digest, minutes_since_epoch)
            || start_time.elapsed() >= time_budget
        {
            break;
        }
    }

    // Make sure that the tests can finish within 30 seconds.
    assert!(start_time.elapsed() < Duration::from_secs(30));
}
}
Loading

0 comments on commit 13bb49e

Please sign in to comment.