Skip to content

Commit

Permalink
Rework randomness state machine & network loop to use watermarks (#18083
Browse files Browse the repository at this point in the history
)

## Description 

We don't need to track individual pending rounds, everything can operate
more simply with tracking highest completed round (as reported by
checkpoint executor) and highest requested round (as reported by
consensus handler).

This change resolves a race bug when checkpoint execution is ahead of
consensus.

## Test plan 

Verified with unit tests. Manual test in synthetic environment.
  • Loading branch information
aschran authored Jun 12, 2024
1 parent f7e47e4 commit 54cb6c5
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 190 deletions.
32 changes: 3 additions & 29 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,8 +511,9 @@ pub struct AuthorityEpochTables {
/// Records the final output of DKG after completion, including the public VSS key and
/// any local private shares.
pub(crate) dkg_output: DBMap<u64, dkg::Output<PkG, EncG>>,
/// RandomnessRound numbers that are still pending generation.
pub(crate) randomness_rounds_pending: DBMap<RandomnessRound, ()>,
/// This table is no longer used (can be removed when DBMap supports removing tables)
#[allow(dead_code)]
randomness_rounds_pending: DBMap<RandomnessRound, ()>,
/// Holds the value of the next RandomnessRound to be generated.
pub(crate) randomness_next_round: DBMap<u64, RandomnessRound>,
/// Holds the value of the highest completed RandomnessRound (as reported to RandomnessReporter).
Expand Down Expand Up @@ -666,31 +667,6 @@ impl AuthorityEpochTables {
batch.write()?;
Ok(())
}

pub fn check_and_fix_consistency(&self) {
if let Some(randomness_highest_completed_round) = self
.randomness_highest_completed_round
.get(&crate::epoch::randomness::SINGLETON_KEY)
.expect("typed_store should not fail")
{
let old_randomness_rounds = self
.randomness_rounds_pending
.unbounded_iter()
.map(|(round, _)| round)
.take_while(|round| *round <= randomness_highest_completed_round)
.collect::<Vec<_>>();
// TODO: enable this debug_assert once race is fixed.
// debug_assert!(old_randomness_rounds.is_empty());
if !old_randomness_rounds.is_empty() {
error!("Found {} pending randomness rounds that are older than the highest completed round {randomness_highest_completed_round}. Removing them now.", old_randomness_rounds.len());
};
for round in old_randomness_rounds {
self.randomness_rounds_pending
.remove(&round)
.expect("typed_store should not fail");
}
}
}
}

pub(crate) const MUTEX_TABLE_SIZE: usize = 1024;
Expand All @@ -715,8 +691,6 @@ impl AuthorityPerEpochStore {
let epoch_id = committee.epoch;

let tables = AuthorityEpochTables::open(epoch_id, parent_path, db_options.clone());
tables.check_and_fix_consistency();

let end_of_publish =
StakeAggregator::from_iter(committee.clone(), tables.end_of_publish.unbounded_iter());
let reconfig_state = tables
Expand Down
24 changes: 9 additions & 15 deletions crates/sui-core/src/epoch/randomness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,15 @@ impl RandomnessManager {
"random beacon: starting from next_randomness_round={}",
rm.next_randomness_round.0
);
for result in tables.randomness_rounds_pending.safe_iter() {
let (round, _) = result.expect("typed_store should not fail");
if highest_completed_round + 1 < rm.next_randomness_round {
info!(
"random beacon: resuming generation for randomness round {}",
round.0
"random beacon: resuming generation for randomness rounds from {} to {}",
highest_completed_round + 1,
rm.next_randomness_round - 1,
);
network_handle.send_partial_signatures(committee.epoch(), round);
for r in highest_completed_round.0 + 1..rm.next_randomness_round.0 {
network_handle.send_partial_signatures(committee.epoch(), RandomnessRound(r));
}
}

Some(rm)
Expand Down Expand Up @@ -589,10 +591,6 @@ impl RandomnessManager {
.checked_add(1)
.expect("RandomnessRound should not overflow");

batch.insert_batch(
&tables.randomness_rounds_pending,
std::iter::once((randomness_round, ())),
)?;
batch.insert_batch(
&tables.randomness_next_round,
std::iter::once((SINGLETON_KEY, self.next_randomness_round)),
Expand Down Expand Up @@ -693,20 +691,16 @@ impl RandomnessReporter {
.epoch_store
.upgrade()
.ok_or(SuiError::EpochEnded(self.epoch))?;
epoch_store
.tables()?
.randomness_rounds_pending
.remove(&round)?;
let mut highest_completed_round = self.highest_completed_round.lock();
if round > *highest_completed_round {
*highest_completed_round = round;
epoch_store
.tables()?
.randomness_highest_completed_round
.insert(&SINGLETON_KEY, &highest_completed_round)?;
self.network_handle
.complete_round(epoch_store.committee().epoch(), round);
}
self.network_handle
.complete_round(epoch_store.committee().epoch(), round);
Ok(())
}
}
Expand Down
5 changes: 2 additions & 3 deletions crates/sui-network/src/randomness/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,13 @@ impl UnstartedRandomness {
peer_share_ids: None,
dkg_output: None,
aggregation_threshold: 0,
pending_tasks: BTreeSet::new(),
highest_requested_round: BTreeMap::new(),
send_tasks: BTreeMap::new(),
round_request_time: BTreeMap::new(),
future_epoch_partial_sigs: BTreeMap::new(),
received_partial_sigs: BTreeMap::new(),
completed_sigs: BTreeSet::new(),
completed_rounds: BTreeSet::new(),
recovered_last_completed_round: None,
highest_completed_round: BTreeMap::new(),
},
handle,
)
Expand Down
Loading

0 comments on commit 54cb6c5

Please sign in to comment.