Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Adds multi-commit checkpoint batching in Sui. #17955

Merged
merged 13 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3504,14 +3504,16 @@ impl AuthorityPerEpochStore {
Ok(())
}

pub fn last_built_checkpoint_commit_height(&self) -> SuiResult<Option<CheckpointHeight>> {
pub fn last_built_checkpoint_builder_summary(
&self,
) -> SuiResult<Option<BuilderCheckpointSummary>> {
Ok(self
.tables()?
.builder_checkpoint_summary_v2
.unbounded_iter()
.skip_to_last()
.next()
.and_then(|(_, b)| b.checkpoint_height))
.map(|(_, s)| s))
}

pub fn last_built_checkpoint_summary(
Expand Down
53 changes: 34 additions & 19 deletions crates/sui-core/src/checkpoints/checkpoint_executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,7 @@ async fn execute_checkpoint(
// get_unexecuted_transactions()
// - Second, we execute all remaining transactions.

let (execution_digests, all_tx_digests, executable_txns, randomness_round) =
let (execution_digests, all_tx_digests, executable_txns, randomness_rounds) =
get_unexecuted_transactions(
checkpoint.clone(),
transaction_cache_reader,
Expand Down Expand Up @@ -748,9 +748,9 @@ async fn execute_checkpoint(

// Once execution is complete, we know that any randomness contained in this checkpoint has
// been successfully included in a checkpoint certified by a quorum of validators.
if let Some(round) = randomness_round {
// RandomnessManager is only present on validators.
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
// (RandomnessManager/RandomnessReporter is only present on validators.)
if let Some(randomness_reporter) = epoch_store.randomness_reporter() {
for round in randomness_rounds {
debug!(
?round,
"notifying RandomnessReporter that randomness update was executed in checkpoint"
Expand Down Expand Up @@ -980,7 +980,7 @@ fn extract_end_of_epoch_tx(
}

// Given a checkpoint, filter out any already executed transactions, then return the remaining
// execution digests, transaction digests, transactions to be executed, and randomness round
// execution digests, transaction digests, transactions to be executed, and randomness rounds
// (if any) included in the checkpoint.
#[allow(clippy::type_complexity)]
fn get_unexecuted_transactions(
Expand All @@ -992,7 +992,7 @@ fn get_unexecuted_transactions(
Vec<ExecutionDigests>,
Vec<TransactionDigest>,
Vec<(VerifiedExecutableTransaction, TransactionEffectsDigest)>,
Option<RandomnessRound>,
Vec<RandomnessRound>,
) {
let checkpoint_sequence = checkpoint.sequence_number();
let full_contents = checkpoint_store
Expand Down Expand Up @@ -1039,27 +1039,42 @@ fn get_unexecuted_transactions(
assert!(change_epoch_tx.data().intent_message().value.is_end_of_epoch_tx());
});

// Look for a randomness state update tx. It must be first if it exists, because all other
// transactions in a checkpoint that includes a randomness state update are causally
// dependent on it.
let randomness_round = if let Some(first_digest) = execution_digests.first() {
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
let randomness_rounds = if let Some(version_specific_data) =
checkpoint.version_specific_data(epoch_store.protocol_config())
{
// With version-specific data, randomness rounds are stored in the checkpoint summary.
version_specific_data.into_v1().randomness_rounds
} else {
// Before version-specific data, checkpoint batching must be disabled. In this case,
// randomness state update tx must be first if it exists, because all other
// transactions in a checkpoint that includes a randomness state update are causally
// dependent on it.
assert_eq!(
0,
epoch_store
.protocol_config()
.min_checkpoint_interval_ms_as_option()
.unwrap_or_default(),
);
if let Some(first_digest) = execution_digests.first() {
let maybe_randomness_tx = cache_reader.get_transaction_block(&first_digest.transaction)
.expect("read cannot fail")
.unwrap_or_else(||
panic!(
"state-sync should have ensured that transaction with digest {first_digest:?} exists for checkpoint: {}",
checkpoint.sequence_number()
)
);
if let TransactionKind::RandomnessStateUpdate(rsu) =
maybe_randomness_tx.data().transaction_data().kind()
{
Some(rsu.randomness_round)
if let TransactionKind::RandomnessStateUpdate(rsu) =
maybe_randomness_tx.data().transaction_data().kind()
{
vec![rsu.randomness_round]
} else {
Vec::new()
}
} else {
None
Vec::new()
}
} else {
None
};

let all_tx_digests: Vec<TransactionDigest> =
Expand Down Expand Up @@ -1141,7 +1156,7 @@ fn get_unexecuted_transactions(
execution_digests,
all_tx_digests,
executable_txns,
randomness_round,
randomness_rounds,
)
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sui-core/src/checkpoints/checkpoint_executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use typed_store::Map;
/// picks up where it left off in the event of a mid-epoch node crash.
#[tokio::test]
pub async fn test_checkpoint_executor_crash_recovery() {
telemetry_subscribers::init_for_testing();

let buffer_size = num_cpus::get() * 2;
let tempdir = tempdir().unwrap();
let checkpoint_store = CheckpointStore::new(tempdir.path());
Expand Down
Loading
Loading