Skip to content

Commit

Permalink
Replace local execution with wait for checkpoint execution (#19629)
Browse files Browse the repository at this point in the history
## Description 

This PR replaces active local execution with waiting for checkpoint
execution.
Some cleanup is still needed later to remove all the local execution
code, but we can do that separately.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
lxfind authored Oct 1, 2024
1 parent 045352d commit ff1aedd
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 39 deletions.
2 changes: 2 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,8 @@ impl AuthorityState {
/// For such transaction, we don't have to wait for consensus to set shared object
/// locks because we already know the shared object versions based on the effects.
/// This function can be called by a fullnode only.
// TODO: This function is no longer needed. Remove it and clean up all the
// related functions.
#[instrument(level = "trace", skip_all)]
pub async fn fullnode_execute_certificate_with_effects(
&self,
Expand Down
57 changes: 18 additions & 39 deletions crates/sui-core/src/transaction_orchestrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@ use std::sync::Arc;
use std::time::Duration;
use sui_storage::write_path_pending_tx_log::WritePathPendingTransactionLog;
use sui_types::base_types::TransactionDigest;
use sui_types::effects::{TransactionEffectsAPI, VerifiedCertifiedTransactionEffects};
use sui_types::error::{SuiError, SuiResult};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::quorum_driver_types::{
ExecuteTransactionRequestType, ExecuteTransactionRequestV3, ExecuteTransactionResponseV3,
FinalizedEffects, IsTransactionExecutedLocally, QuorumDriverEffectsQueueResult,
Expand Down Expand Up @@ -118,8 +116,7 @@ where
let pending_tx_log_clone = pending_tx_log.clone();
let _local_executor_handle = {
spawn_monitored_task!(async move {
Self::loop_execute_finalized_tx_locally(effects_receiver, pending_tx_log_clone)
.await;
Self::loop_pending_transaction_log(effects_receiver, pending_tx_log_clone).await;
})
};
Self::schedule_txes_in_log(pending_tx_log.clone(), quorum_driver_handler.clone());
Expand Down Expand Up @@ -161,15 +158,9 @@ where
request_type,
ExecuteTransactionRequestType::WaitForLocalExecution
) {
let executable_tx = VerifiedExecutableTransaction::new_from_quorum_execution(
transaction,
response.effects_cert.executed_epoch(),
);
let executed_locally = Self::execute_finalized_tx_locally_with_timeout(
let executed_locally = Self::wait_for_finalized_tx_executed_locally_with_timeout(
&self.validator_state,
&epoch_store,
&executable_tx,
&response.effects_cert,
&transaction,
&self.metrics,
)
.await
Expand Down Expand Up @@ -350,27 +341,13 @@ where
})
}

#[instrument(name = "tx_orchestrator_execute_finalized_tx_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
async fn execute_finalized_tx_locally_with_timeout(
#[instrument(name = "tx_orchestrator_wait_for_finalized_tx_executed_locally_with_timeout", level = "debug", skip_all, fields(tx_digest = ?transaction.digest()), err)]
async fn wait_for_finalized_tx_executed_locally_with_timeout(
validator_state: &Arc<AuthorityState>,
epoch_store: &Arc<AuthorityPerEpochStore>,
transaction: &VerifiedExecutableTransaction,
effects_cert: &VerifiedCertifiedTransactionEffects,
transaction: &VerifiedTransaction,
metrics: &TransactionOrchestratorMetrics,
) -> SuiResult {
// TODO: attempt a finalized tx at most once per request.
// Every WaitForLocalExecution request will be attempted to execute twice,
// one from the subscriber queue, one from the proactive execution before
// returning results to clients. This is not insanely bad because:
// 1. it's possible that one attempt finishes before the other, so there's
// zero extra work except DB checks
// 2. an up-to-date fullnode should have minimal overhead to sync parents
// (for one extra time)
// 3. at the end of day, the tx will be executed at most once per lock guard.
let tx_digest = transaction.digest();
if validator_state.is_tx_already_executed(tx_digest)? {
return Ok(());
}
let tx_digest = *transaction.digest();
metrics.local_execution_in_flight.inc();
let _metrics_guard =
scopeguard::guard(metrics.local_execution_in_flight.clone(), |in_flight| {
Expand All @@ -382,14 +359,15 @@ where
} else {
metrics.local_execution_latency_single_writer.start_timer()
};
debug!(?tx_digest, "Executing finalized tx locally.");
debug!(
?tx_digest,
"Waiting for finalized tx to be executed locally."
);
match timeout(
LOCAL_EXECUTION_TIMEOUT,
validator_state.fullnode_execute_certificate_with_effects(
transaction,
effects_cert,
epoch_store,
),
validator_state
.get_transaction_cache_reader()
.notify_read_executed_effects_digests(&[tx_digest]),
)
.instrument(error_span!(
"transaction_orchestrator::local_execution",
Expand All @@ -400,7 +378,7 @@ where
Err(_elapsed) => {
debug!(
?tx_digest,
"Executing tx locally by orchestrator timed out within {:?}.",
"Waiting for finalized tx to be executed locally timed out within {:?}.",
LOCAL_EXECUTION_TIMEOUT
);
metrics.local_execution_timeout.inc();
Expand All @@ -409,7 +387,7 @@ where
Ok(Err(err)) => {
debug!(
?tx_digest,
"Executing tx locally by orchestrator failed with error: {:?}", err
"Waiting for finalized tx to be executed locally failed with error: {:?}", err
);
metrics.local_execution_failure.inc();
Err(SuiError::TransactionOrchestratorLocalExecutionError {
Expand All @@ -423,7 +401,8 @@ where
}
}

async fn loop_execute_finalized_tx_locally(
// TODO: Potentially clean up this function and the pending transaction log.
async fn loop_pending_transaction_log(
mut effects_receiver: Receiver<QuorumDriverEffectsQueueResult>,
pending_transaction_log: Arc<WritePathPendingTransactionLog>,
) {
Expand Down
6 changes: 6 additions & 0 deletions crates/sui-e2e-tests/tests/traffic_control_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use jsonrpsee::{
rpc_params,
};
use std::fs::File;
use std::num::NonZeroUsize;
use std::time::Duration;
use sui_core::authority_client::make_network_authority_clients_with_network_config;
use sui_core::authority_client::AuthorityAPI;
Expand Down Expand Up @@ -45,6 +46,7 @@ async fn test_validator_traffic_control_noop() -> Result<(), anyhow::Error> {
..Default::default()
};
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(4).unwrap())
.with_policy_config(Some(policy_config))
.build();
let test_cluster = TestClusterBuilder::new()
Expand Down Expand Up @@ -88,6 +90,7 @@ async fn test_validator_traffic_control_ok() -> Result<(), anyhow::Error> {
..Default::default()
};
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(4).unwrap())
.with_policy_config(Some(policy_config))
.build();
let test_cluster = TestClusterBuilder::new()
Expand Down Expand Up @@ -133,6 +136,7 @@ async fn test_validator_traffic_control_dry_run() -> Result<(), anyhow::Error> {
..Default::default()
};
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(4).unwrap())
.with_policy_config(Some(policy_config))
.build();
let test_cluster = TestClusterBuilder::new()
Expand Down Expand Up @@ -217,6 +221,7 @@ async fn test_validator_traffic_control_error_blocked() -> Result<(), anyhow::Er
..Default::default()
};
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(4).unwrap())
.with_policy_config(Some(policy_config))
.build();
let committee = network_config.committee_with_network();
Expand Down Expand Up @@ -396,6 +401,7 @@ async fn test_validator_traffic_control_error_delegated() -> Result<(), anyhow::
drain_timeout_secs: 10,
};
let network_config = ConfigBuilder::new_with_temp_dir()
.committee_size(NonZeroUsize::new(4).unwrap())
.with_policy_config(Some(policy_config))
.with_firewall_config(Some(firewall_config))
.build();
Expand Down
2 changes: 2 additions & 0 deletions crates/sui-swarm-config/src/network_config_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ impl ConfigBuilder {
rng: Some(OsRng),
config_directory: config_directory.as_ref().into(),
supported_protocol_versions_config: None,
// FIXME: A network with only 1 validator does not have liveness.
// We need to change this. There are some tests that depend on it though.
committee: CommitteeConfig::Size(NonZeroUsize::new(1).unwrap()),
genesis_config: None,
reference_gas_price: None,
Expand Down

0 comments on commit ff1aedd

Please sign in to comment.