Skip to content

Commit

Permalink
Adds multi-commit checkpoint batching in Sui.
Browse files Browse the repository at this point in the history
Adds `version_specific_data` to `CheckpointSummary` to keep track
of which `RandomnessRound`s are present in a checkpoint.

Batching is configurable by a minimum interval based on the
commit timestamp.
  • Loading branch information
aschran committed May 28, 2024
1 parent daa3383 commit 6ced907
Show file tree
Hide file tree
Showing 15 changed files with 1,061 additions and 71 deletions.
6 changes: 4 additions & 2 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3504,14 +3504,16 @@ impl AuthorityPerEpochStore {
Ok(())
}

pub fn last_built_checkpoint_commit_height(&self) -> SuiResult<Option<CheckpointHeight>> {
pub fn last_built_checkpoint_builder_summary(
&self,
) -> SuiResult<Option<BuilderCheckpointSummary>> {
Ok(self
.tables()?
.builder_checkpoint_summary_v2
.unbounded_iter()
.skip_to_last()
.next()
.and_then(|(_, b)| b.checkpoint_height))
.map(|(_, s)| s))
}

pub fn last_built_checkpoint_summary(
Expand Down
58 changes: 39 additions & 19 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use sui_types::crypto::RandomnessRound;
use sui_types::effects::{TransactionEffects, TransactionEffectsAPI};
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::message_envelope::Message;
use sui_types::messages_checkpoint::CheckpointVersionSpecificData;
use sui_types::transaction::TransactionKind;
use sui_types::{
base_types::{ExecutionDigests, TransactionDigest, TransactionEffectsDigest},
Expand Down Expand Up @@ -715,7 +716,7 @@ async fn execute_checkpoint(
// get_unexecuted_transactions()
// - Second, we execute all remaining transactions.

let (execution_digests, all_tx_digests, executable_txns, randomness_round) =
let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
get_unexecuted_transactions(
checkpoint.clone(),
transaction_cache_reader,
Expand Down Expand Up @@ -748,9 +749,9 @@ async fn execute_checkpoint(

// Once execution is complete, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by quorum of validators.
if let Some(round) = randomness_round {
// RandomnessManager is only present on validators.
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
Expand Down Expand Up @@ -980,7 +981,7 @@ fn extract_end_of_epoch_tx(
}

// Given a checkpoint, filter out any already executed transactions, then return the remaining
// execution digests, transaction digests, transactions to be executed, and randomness round
// execution digests, transaction digests, transactions to be executed, and randomness rounds
// (if any) included in the checkpoint.
#[allow(clippy::type_complexity)]
fn get_unexecuted_transactions(
Expand All @@ -992,7 +993,7 @@ fn get_unexecuted_transactions(
Vec<ExecutionDigests>,
Vec<TransactionDigest>,
Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
Option<RandomnessRound>,
Vec<RandomnessRound>,
) {
let checkpoint_sequence = checkpoint.sequence_number();
let full_contents = checkpoint_store
Expand Down Expand Up @@ -1039,27 +1040,46 @@ fn get_unexecuted_transactions(
assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
});

// Look for a randomness state update tx. It must be first if it exists, because all other
// transactions in a checkpoint that includes a randomness state update are causally
// dependent on it.
let randomness_round = if let Some(first_digest) = execution_digests.first() {
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
let randomness_rounds = if epoch_store
.protocol_config()
.checkpoint_summary_version_specific_data()
{
// With version-specific data, randomness rounds are stored in checkpoint summary.
let version_specific_data: CheckpointVersionSpecificData =
bcs::from_bytes(&checkpoint.version_specific_data)
.expect("version_specific_data should deserialize"); //TODO-DNS is this safe?
version_specific_data.into_v1().randomness_rounds
} else {
// Before version-specific data, checkpoint batching must be disabled. In this case,
// randomness state update tx must be first if it exists, because all otherk
// transactions in a checkpoint that includes a randomness state update are causally
// dependent on it.
assert_eq!(
0,
epoch_store
.protocol_config()
.min_checkpoint_interval_ms_as_option()
.unwrap_or_default(),
);
if let Some(first_digest) = execution_digests.first() {
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
.expect("read cannot fail")
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
if let TransactionKind::RandomnessStateUpdate(rsu) =
maybe_randomness_tx.data().transaction_data().kind()
{
Some(rsu.randomness_round)
if let TransactionKind::RandomnessStateUpdate(rsu) =
maybe_randomness_tx.data().transaction_data().kind()
{
vec![rsu.randomness_round]
} else {
Vec::new()
}
} else {
None
Vec::new()
}
} else {
None
};

let all_tx_digests: Vec<TransactionDigest> =
Expand Down Expand Up @@ -1141,7 +1161,7 @@ fn get_unexecuted_transactions(
execution_digests,
all_tx_digests,
executable_txns,
randomness_round,
randomness_rounds,
)
}

Expand Down
130 changes: 100 additions & 30 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ use sui_macros::fail_point;
use sui_network::default_mysten_network_config;
use sui_types::base_types::ConciseableName;
use sui_types::executable_transaction::VerifiedExecutableTransaction;
use sui_types::messages_checkpoint::CheckpointCommitment;
use sui_types::messages_checkpoint::{
CheckpointCommitment, CheckpointVersionSpecificData, CheckpointVersionSpecificDataV1,
};
use sui_types::sui_system_state::epoch_start_sui_system_state::EpochStartSystemStateTrait;

use crate::authority::authority_per_epoch_store::AuthorityPerEpochStore;
Expand Down Expand Up @@ -932,28 +934,67 @@ impl CheckpointBuilder {
}
Ok(false) => (),
};
let mut last = self

// Collect info about the most recently built checkpoint.
let summary = self
.epoch_store
.last_built_checkpoint_commit_height()
.last_built_checkpoint_builder_summary()
.expect("epoch should not have ended");
let mut last_height = summary.clone().and_then(|s| s.checkpoint_height);
let mut last_timestamp = summary.map(|s| s.summary.timestamp_ms);

let min_checkpoint_interval_ms = self
.epoch_store
.protocol_config()
.min_checkpoint_interval_ms_as_option()
.unwrap_or_default();
let mut grouped_pending_checkpoints = Vec::new();
for (height, pending) in self
.epoch_store
.get_pending_checkpoints(last)
.get_pending_checkpoints(last_height)
.expect("unexpected epoch store error")
{
last = Some(height);
// Group PendingCheckpoints until minimum interval has elapsed.
let current_timestamp = pending.details().timestamp_ms;
let can_build = match last_timestamp {
Some(last_timestamp) => {
current_timestamp >= last_timestamp + min_checkpoint_interval_ms
}
None => true,
};
grouped_pending_checkpoints.push(pending);
if !can_build {
debug!(
checkpoint_commit_height = height,
last_timestamp = ?last_timestamp,
?current_timestamp,
"waiting for more PendingCheckpoints: minimum interval not yet elapsed"
);
continue;
}

// Min interval has elasped, we can now coalesce and build a checkpoint.
last_height = Some(height);
last_timestamp = Some(current_timestamp);
debug!(
checkpoint_commit_height = height,
"Making checkpoint at commit height"
);
if let Err(e) = self.make_checkpoint(height, pending).await {
if let Err(e) = self
.make_checkpoint(std::mem::take(&mut grouped_pending_checkpoints))
.await
{
error!("Error while making checkpoint, will retry in 1s: {:?}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
self.metrics.checkpoint_errors.inc();
continue 'main;
}
//TODO-DNS need to fix checkpoint executor not to assume RSU is first in a checkpoint (or that there's only one)
}
debug!("Waiting for more checkpoints from consensus after processing {last:?}");
debug!(
"Waiting for more checkpoints from consensus after processing {last_height:?}; {} pending checkpoints left unprocessed until next interval",
grouped_pending_checkpoints.len(),
);
match select(self.exit.changed().boxed(), self.notify.notified().boxed()).await {
Either::Left(_) => {
// break loop on exit signal
Expand All @@ -965,20 +1006,20 @@ impl CheckpointBuilder {
info!("Shutting down CheckpointBuilder");
}

#[instrument(level = "debug", skip_all, fields(?height))]
async fn make_checkpoint(
&self,
height: CheckpointHeight,
pending: PendingCheckpointV2,
) -> anyhow::Result<()> {
let pending = pending.into_v2();
#[instrument(level = "debug", skip_all, fields(last_height = pendings.last().unwrap().details().checkpoint_height))]
async fn make_checkpoint(&self, pendings: Vec<PendingCheckpointV2>) -> anyhow::Result<()> {
let last_details = pendings.last().unwrap().details().clone();
let roots = pendings
.into_iter()
.flat_map(|pending| pending.into_v2().roots)
.collect::<Vec<_>>();
self.metrics
.checkpoint_roots_count
.inc_by(pending.roots.len() as u64);
.inc_by(roots.len() as u64);

let root_digests = self
.epoch_store
.notify_read_executed_digests(&pending.roots)
.notify_read_executed_digests(&roots)
.in_monitored_scope("CheckpointNotifyDigests")
.await?;
let root_effects = self
Expand All @@ -993,8 +1034,9 @@ impl CheckpointBuilder {
let _scope = monitored_scope("CheckpointBuilder::causal_sort");
CausalOrder::causal_sort(unsorted)
};
let new_checkpoint = self.create_checkpoints(sorted, pending.details).await?;
self.write_checkpoints(height, new_checkpoint).await?;
let new_checkpoints = self.create_checkpoints(sorted, &last_details).await?;
self.write_checkpoints(last_details.checkpoint_height, new_checkpoints)
.await?;
Ok(())
}

Expand Down Expand Up @@ -1129,7 +1171,7 @@ impl CheckpointBuilder {
async fn create_checkpoints(
&self,
all_effects: Vec<TransactionEffects>,
details: PendingCheckpointInfo,
details: &PendingCheckpointInfo,
) -> anyhow::Result<Vec<(CheckpointSummary, CheckpointContents)>> {
let _scope = monitored_scope("CheckpointBuilder::create_checkpoints");
let total = all_effects.len();
Expand Down Expand Up @@ -1167,6 +1209,7 @@ impl CheckpointBuilder {
let mut all_effects_and_transaction_sizes = Vec::with_capacity(all_effects.len());
let mut transactions = Vec::with_capacity(all_effects.len());
let mut transaction_keys = Vec::with_capacity(all_effects.len());
let mut randomness_rounds = BTreeMap::new();
{
let _guard = monitored_scope("CheckpointBuilder::wait_for_transactions_sequenced");
debug!(
Expand All @@ -1181,18 +1224,26 @@ impl CheckpointBuilder {
{
let (transaction, size) = transaction_and_size
.unwrap_or_else(|| panic!("Could not find executed transaction {:?}", effects));
// ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
// processed before we reach here
if !matches!(
transaction.inner().transaction_data().kind(),
match transaction.inner().transaction_data().kind() {
TransactionKind::ConsensusCommitPrologue(_)
| TransactionKind::ConsensusCommitPrologueV2(_)
| TransactionKind::AuthenticatorStateUpdate(_)
| TransactionKind::RandomnessStateUpdate(_)
) {
transaction_keys.push(SequencedConsensusTransactionKey::External(
ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
));
| TransactionKind::ConsensusCommitPrologueV2(_)
| TransactionKind::AuthenticatorStateUpdate(_) => {
// ConsensusCommitPrologue and AuthenticatorStateUpdate are guaranteed to be
// processed before we reach here.
}
TransactionKind::RandomnessStateUpdate(rsu) => {
// RandomnessStateUpdate does not come via consensus, so no need to include
// it in the call to `consensus_messages_processed_notify`.
randomness_rounds
.insert(*effects.transaction_digest(), rsu.randomness_round);
}
_ => {
// All other tx should be included in the call to
// `consensus_messages_processed_notify`.
transaction_keys.push(SequencedConsensusTransactionKey::External(
ConsensusTransactionKey::Certificate(*effects.transaction_digest()),
));
}
}
transactions.push(transaction);
all_effects_and_transaction_sizes.push((effects, size));
Expand Down Expand Up @@ -1300,6 +1351,24 @@ impl CheckpointBuilder {
None
};

let version_specific_data = if self
.epoch_store
.protocol_config()
.checkpoint_summary_version_specific_data()
{
let matching_randomness_rounds: Vec<_> = effects
.iter()
.filter_map(|e| randomness_rounds.get(e.transaction_digest()))
.copied()
.collect();
let data = CheckpointVersionSpecificData::V1(CheckpointVersionSpecificDataV1 {
randomness_rounds: matching_randomness_rounds,
});
bcs::to_bytes(&data)?
} else {
Vec::new()
};

let contents = CheckpointContents::new_with_digests_and_signatures(
effects.iter().map(TransactionEffects::execution_digests),
signatures,
Expand All @@ -1322,6 +1391,7 @@ impl CheckpointBuilder {
epoch_rolling_gas_cost_summary,
end_of_epoch_data,
timestamp_ms,
version_specific_data,
);
summary.report_checkpoint_age_ms(&self.metrics.last_created_checkpoint_age_ms);
if last_checkpoint_of_epoch {
Expand Down
1 change: 1 addition & 0 deletions crates/sui-core/src/unit_tests/batch_verification_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ fn gen_ckpts(
GasCostSummary::default(),
None,
0,
Vec::new(),
),
k,
name,
Expand Down
1 change: 1 addition & 0 deletions crates/sui-data-ingestion-core/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ fn mock_checkpoint_data_bytes(seq_number: CheckpointSequenceNumber) -> Vec<u8> {
GasCostSummary::default(),
None,
0,
Vec::new(),
);

let sign_infos: Vec<_> = keys
Expand Down
4 changes: 3 additions & 1 deletion crates/sui-open-rpc/spec/openrpc.json
Original file line number Diff line number Diff line change
Expand Up @@ -1359,7 +1359,7 @@
"name": "Result",
"value": {
"minSupportedProtocolVersion": "1",
"maxSupportedProtocolVersion": "49",
"maxSupportedProtocolVersion": "50",
"protocolVersion": "6",
"featureFlags": {
"accept_zklogin_in_multisig": false,
Expand All @@ -1368,6 +1368,7 @@
"allow_receiving_object_id": false,
"ban_entry_init": false,
"bridge": false,
"checkpoint_summary_version_specific_data": false,
"commit_root_state_digest": false,
"consensus_order_end_of_epoch_last": true,
"disable_invariant_violation_check_in_swap_loc": false,
Expand Down Expand Up @@ -1859,6 +1860,7 @@
"max_verifier_meter_ticks_per_function": {
"u64": "6000000"
},
"min_checkpoint_interval_ms": null,
"min_move_binary_format_version": null,
"move_binary_format_version": {
"u32": "6"
Expand Down
Loading

0 comments on commit 6ced907

Please sign in to comment.