replay: only recache tower stats for computed banks on tower adoption (#1632)

* replay: only recache tower stats for computed banks on tower adoption

* pr feedback: refactor and add test
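
In short, when the replay stage adopts a newer on chain tower from a frozen bank's vote state, it now refreshes the cached tower stats only for frozen banks whose fork stats were already computed in this iteration; uncomputed banks are skipped because they will pick up the adopted tower when they are computed as usual. Below is a minimal, self-contained sketch of that guard; the ForkStats fields and function names are simplified stand-ins for illustration, not the actual replay-stage types:

use std::collections::HashMap;

#[derive(Default)]
struct ForkStats {
    computed: bool,
    tower_stats_refreshed: bool, // stand-in for the cached tower stats
}

fn recache_for_computed_banks(progress: &mut HashMap<u64, ForkStats>, frozen_slots: &[u64]) {
    for slot in frozen_slots {
        let stats = progress
            .get_mut(slot)
            .expect("all frozen banks must have fork stats");
        if !stats.computed {
            // Not computed yet this iteration: skip it; it will pick up the
            // adopted tower when it is computed later in the pass.
            continue;
        }
        // Stand-in for Self::cache_tower_stats(progress, tower, slot, ancestors).
        stats.tower_stats_refreshed = true;
    }
}

fn main() {
    let mut progress: HashMap<u64, ForkStats> =
        (0u64..=6).map(|slot| (slot, ForkStats::default())).collect();
    // Pretend slot 3 was computed in an earlier pass and slot 4 was not.
    progress.get_mut(&3).unwrap().computed = true;
    let frozen_slots: Vec<u64> = (0..=6).collect();
    recache_for_computed_banks(&mut progress, &frozen_slots);
    assert!(progress[&3].tower_stats_refreshed); // computed slot is recached
    assert!(!progress[&4].tower_stats_refreshed); // uncomputed slot is left untouched
}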
AshwinSekar authored Jun 12, 2024
1 parent 3e077b7 commit 000ca4c
Showing 1 changed file with 201 additions and 97 deletions.
298 changes: 201 additions & 97 deletions core/src/replay_stage.rs
@@ -3372,103 +3372,15 @@ impl ReplayStage {
.expect("All frozen banks must exist in the Progress map")
.computed;
if !is_computed {
// Check if our tower is behind, if so (and the feature migration flag is in use)
// overwrite with the newer bank.
if let Some(vote_account) = bank.get_vote_account(my_vote_pubkey) {
if let Ok(mut bank_vote_state) = vote_account.vote_state().cloned() {
if bank_vote_state.last_voted_slot()
> tower.vote_state.last_voted_slot()
{
info!(
"Frozen bank vote state slot {:?}
is newer than our local vote state slot {:?},
adopting the bank vote state as our own.
Bank votes: {:?}, root: {:?},
Local votes: {:?}, root: {:?}",
bank_vote_state.last_voted_slot(),
tower.vote_state.last_voted_slot(),
bank_vote_state.votes,
bank_vote_state.root_slot,
tower.vote_state.votes,
tower.vote_state.root_slot
);

if let Some(local_root) = tower.vote_state.root_slot {
if bank_vote_state
.root_slot
.map(|bank_root| local_root > bank_root)
.unwrap_or(true)
{
// If the local root is larger than this on chain vote state
// root (possible due to supermajority roots being set on
// startup), then we need to adjust the tower
bank_vote_state.root_slot = Some(local_root);
bank_vote_state
.votes
.retain(|lockout| lockout.slot() > local_root);
info!(
"Local root is larger than on chain root,
overwrote bank root {:?} and updated votes {:?}",
bank_vote_state.root_slot, bank_vote_state.votes
);

if let Some(first_vote) = bank_vote_state.votes.front() {
assert!(ancestors
.get(&first_vote.slot())
.expect(
"Ancestors map must contain an
entry for all slots on this fork
greater than `local_root` and less
than `bank_slot`"
)
.contains(&local_root));
}
}
}

tower.vote_state.root_slot = bank_vote_state.root_slot;
tower.vote_state.votes = bank_vote_state.votes;

let last_voted_slot = tower.vote_state.last_voted_slot().unwrap_or(
// If our local root is higher than the highest slot in `bank_vote_state` due to
// supermajority roots, then it's expected that the vote state will be empty.
// In this case we use the root as our last vote. This root cannot be None, because
// `tower.vote_state.last_voted_slot()` is None only if `tower.vote_state.root_slot`
// is Some.
tower
.vote_state
.root_slot
.expect("root_slot cannot be None here"),
);
// This is safe because `last_voted_slot` is now equal to
// `bank_vote_state.last_voted_slot()` or `local_root`.
// Since this vote state is contained in `bank`, which we have frozen,
// we must have frozen all slots contained in `bank_vote_state`,
// and by definition we must have frozen `local_root`.
//
// If `bank` is a duplicate, since we are able to replay it successfully, any slots
// in its vote state must also be part of the duplicate fork, and thus present in our
// progress map.
//
// Finally if both `bank` and `bank_vote_state.last_voted_slot()` are duplicate,
// we must have the compatible versions of both duplicates in order to replay `bank`
// successfully, so we are once again guaranteed that `bank_vote_state.last_voted_slot()`
// is present in progress map.
tower.update_last_vote_from_vote_state(
progress
.get_hash(last_voted_slot)
.expect("Must exist for us to have frozen descendant"),
bank.feature_set
.is_active(&feature_set::enable_tower_sync_ix::id()),
);
// Since we are updating our tower we need to update associated caches for previously computed
// slots as well.
for slot in frozen_banks.iter().map(|b| b.slot()) {
Self::cache_tower_stats(progress, tower, slot, ancestors);
}
}
}
}
// Check if our tower is behind; if so, adopt the on chain tower from this bank
Self::adopt_on_chain_tower_if_behind(
my_vote_pubkey,
ancestors,
frozen_banks,
tower,
progress,
bank,
);
let computed_bank_state = Tower::collect_vote_lockouts(
my_vote_pubkey,
bank_slot,
@@ -3533,6 +3445,117 @@ impl ReplayStage {
new_stats
}

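    /// If the vote state for `my_vote_pubkey` in `bank` is newer than our local tower,
    /// adopt it as our own, then recache tower stats for every frozen bank whose fork
    /// stats have already been computed this iteration.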
fn adopt_on_chain_tower_if_behind(
my_vote_pubkey: &Pubkey,
ancestors: &HashMap<Slot, HashSet<Slot>>,
frozen_banks: &[Arc<Bank>],
tower: &mut Tower,
progress: &mut ProgressMap,
bank: &Arc<Bank>,
) {
let Some(vote_account) = bank.get_vote_account(my_vote_pubkey) else {
return;
};
let Ok(mut bank_vote_state) = vote_account.vote_state().cloned() else {
return;
};
if bank_vote_state.last_voted_slot() <= tower.vote_state.last_voted_slot() {
return;
}
info!(
"Frozen bank vote state slot {:?}
is newer than our local vote state slot {:?},
adopting the bank vote state as our own.
Bank votes: {:?}, root: {:?},
Local votes: {:?}, root: {:?}",
bank_vote_state.last_voted_slot(),
tower.vote_state.last_voted_slot(),
bank_vote_state.votes,
bank_vote_state.root_slot,
tower.vote_state.votes,
tower.vote_state.root_slot
);

if let Some(local_root) = tower.vote_state.root_slot {
if bank_vote_state
.root_slot
.map(|bank_root| local_root > bank_root)
.unwrap_or(true)
{
// If the local root is larger than this on chain vote state
// root (possible due to supermajority roots being set on
// startup), then we need to adjust the tower
bank_vote_state.root_slot = Some(local_root);
bank_vote_state
.votes
.retain(|lockout| lockout.slot() > local_root);
info!(
"Local root is larger than on chain root,
overwrote bank root {:?} and updated votes {:?}",
bank_vote_state.root_slot, bank_vote_state.votes
);

if let Some(first_vote) = bank_vote_state.votes.front() {
assert!(ancestors
.get(&first_vote.slot())
.expect(
"Ancestors map must contain an entry for all slots on this fork \
greater than `local_root` and less than `bank_slot`"
)
.contains(&local_root));
}
}
}

tower.vote_state.root_slot = bank_vote_state.root_slot;
tower.vote_state.votes = bank_vote_state.votes;

let last_voted_slot = tower.vote_state.last_voted_slot().unwrap_or(
// If our local root is higher than the highest slot in `bank_vote_state` due to
// supermajority roots, then it's expected that the vote state will be empty.
// In this case we use the root as our last vote. This root cannot be None, because
// `tower.vote_state.last_voted_slot()` is None only if `tower.vote_state.root_slot`
// is Some.
tower
.vote_state
.root_slot
.expect("root_slot cannot be None here"),
);
// This is safe because `last_voted_slot` is now equal to
// `bank_vote_state.last_voted_slot()` or `local_root`.
// Since this vote state is contained in `bank`, which we have frozen,
// we must have frozen all slots contained in `bank_vote_state`,
// and by definition we must have frozen `local_root`.
//
// If `bank` is a duplicate, since we are able to replay it successfully, any slots
// in its vote state must also be part of the duplicate fork, and thus present in our
// progress map.
//
// Finally if both `bank` and `bank_vote_state.last_voted_slot()` are duplicate,
// we must have the compatible versions of both duplicates in order to replay `bank`
// successfully, so we are once again guaranteed that `bank_vote_state.last_voted_slot()`
// is present in progress map.
tower.update_last_vote_from_vote_state(
progress
.get_hash(last_voted_slot)
.expect("Must exist for us to have frozen descendant"),
bank.feature_set
.is_active(&feature_set::enable_tower_sync_ix::id()),
);
// Since we are updating our tower we need to update associated caches for previously computed
// slots as well.
for slot in frozen_banks.iter().map(|b| b.slot()) {
if !progress
.get_fork_stats(slot)
.expect("All frozen banks must exist in fork stats")
.computed
{
continue;
}
Self::cache_tower_stats(progress, tower, slot, ancestors);
}
}

fn cache_tower_stats(
progress: &mut ProgressMap,
tower: &Tower,
@@ -8846,6 +8869,87 @@ pub(crate) mod tests {
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
}

#[test]
fn test_tower_adopt_from_bank_cache_only_computed() {
solana_logger::setup_with_default(
"error,solana_core::replay_stage=info,solana_core::consensus=info",
);
        /*
            Fork structure:
                 slot 0
                   |
                 slot 1
                 /    \
            slot 3     |
              |      slot 2
            slot 4     |
                     slot 5
                       |
                     slot 6
            At some point we voted 0 - 6, but we are now sitting with an
            outdated tower that has only voted up to slot 1.
        */

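        // One validator votes down the 0-1-2-5-6 fork; the remaining two vote 0-1-3-4.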
let generate_votes = |pubkeys: Vec<Pubkey>| {
pubkeys
.into_iter()
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
.collect()
};
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
let (vote_simulator, _blockstore) =
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
let (bank_forks, mut progress) = (vote_simulator.bank_forks, vote_simulator.progress);
let bank_hash = |slot| bank_forks.read().unwrap().bank_hash(slot).unwrap();
let my_vote_pubkey = vote_simulator.vote_pubkeys[0];
let mut tower = Tower::default();
tower.node_pubkey = vote_simulator.node_pubkeys[0];
tower.record_vote(0, bank_hash(0));
tower.record_vote(1, bank_hash(1));

let mut frozen_banks: Vec<_> = bank_forks
.read()
.unwrap()
.frozen_banks()
.values()
.cloned()
.collect();
let ancestors = &bank_forks.read().unwrap().ancestors();

// slot 3 was computed in a previous iteration and failed threshold check, but was not locked out
let fork_stats_3 = progress.get_fork_stats_mut(3).unwrap();
fork_stats_3.vote_threshold = vec![ThresholdDecision::FailedThreshold(4, 4000)];
fork_stats_3.is_locked_out = false;
fork_stats_3.computed = true;
// slot 4 is yet to be computed.
let fork_stats_4 = progress.get_fork_stats_mut(4).unwrap();
fork_stats_4.computed = false;

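        // Sort by slot so that index == slot, then adopt the on chain tower from the
        // tip of the fork (slot 6).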
frozen_banks.sort_by_key(|bank| bank.slot());
let bank_6 = frozen_banks.get(6).unwrap();
assert_eq!(bank_6.slot(), 6);

ReplayStage::adopt_on_chain_tower_if_behind(
&my_vote_pubkey,
ancestors,
&frozen_banks,
&mut tower,
&mut progress,
bank_6,
);

// slot 3 should now pass the threshold check but be locked out.
let fork_stats_3 = progress.get_fork_stats(3).unwrap();
assert!(fork_stats_3.vote_threshold.is_empty());
assert!(fork_stats_3.is_locked_out);
assert!(fork_stats_3.computed);
// slot 4 should be untouched since it is yet to be computed.
let fork_stats_4 = progress.get_fork_stats(4).unwrap();
assert!(!fork_stats_4.is_locked_out);
assert!(!fork_stats_4.computed);
}

#[test]
fn test_tower_load_missing() {
let tower_file = tempdir().unwrap().into_path();