Skip to content

Commit

Permalink
Preserve unfinalized fork choice tip states in state cache during pru…
Browse files Browse the repository at this point in the history
…ning
  • Loading branch information
povi committed Jan 15, 2025
1 parent 23091c1 commit 89bc724
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 21 deletions.
15 changes: 4 additions & 11 deletions fork_choice_control/src/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use arc_swap::Guard;
use eth2_libp2p::GossipId;
use execution_engine::ExecutionEngine;
use fork_choice_store::{
AggregateAndProofOrigin, AttestationItem, ChainLink, Segment, StateCacheProcessor, Store,
AggregateAndProofOrigin, AttestationItem, ChainLink, StateCacheProcessor, Store,
};
use helper_functions::misc;
use itertools::Itertools as _;
Expand Down Expand Up @@ -35,7 +35,7 @@ use crate::{
};

#[cfg(test)]
use ::{clock::Tick, types::phase0::consts::GENESIS_SLOT};
use ::{clock::Tick, fork_choice_store::Segment, types::phase0::consts::GENESIS_SLOT};

// TODO(Grandine Team): There is currently no way to persist payload statuses.
// We previously treated blocks loaded from the database as optimistic.
Expand Down Expand Up @@ -182,15 +182,8 @@ where
}

store
.unfinalized()
.values()
.filter_map(Segment::last_non_invalid_block)
.map(|unfinalized_block| {
(
&unfinalized_block.chain_link,
unfinalized_block.is_optimistic(),
)
})
.unfinalized_fork_tips()
.map(|chain_link| (chain_link, chain_link.is_optimistic()))
.map_into()
.collect_vec()
}
Expand Down
6 changes: 3 additions & 3 deletions fork_choice_store/src/state_cache_processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::time::Duration;
use std::{backtrace::Backtrace, sync::Arc};
use std::{backtrace::Backtrace, collections::HashSet, sync::Arc};

use anyhow::{bail, Result};
use features::Feature;
Expand Down Expand Up @@ -65,8 +65,8 @@ impl<P: Preset> StateCacheProcessor<P> {
.get_or_insert_with(block_root, slot, ignore_missing_rewards, f)
}

pub fn prune(&self, last_pruned_slot: Slot) -> Result<()> {
self.state_cache.prune(last_pruned_slot)
pub fn prune(&self, last_pruned_slot: Slot, preserved_states: &HashSet<H256>) -> Result<()> {
self.state_cache.prune(last_pruned_slot, preserved_states)
}

pub fn try_state_at_slot(
Expand Down
23 changes: 19 additions & 4 deletions fork_choice_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,13 @@ impl<P: Preset> Store<P> {
Some(&mut self.unfinalized[segment_id][*position].chain_link)
}

pub fn unfinalized_fork_tips(&self) -> impl Iterator<Item = &ChainLink<P>> {
self.unfinalized()
.values()
.filter_map(Segment::last_non_invalid_block)
.map(|unfinalized_block| &unfinalized_block.chain_link)
}

// TODO(Grandine Team): The Optimistic Sync specification says that a node whose forks are all
// non-viable due to invalid payloads should be considered optimistic, but
// it's not clear if that means Eth Beacon Node API responses should have
Expand Down Expand Up @@ -1900,7 +1907,7 @@ impl<P: Preset> Store<P> {
self.update_head_segment_id();

self.blob_cache.on_slot(new_tick.slot);
self.prune_state_cache()?;
self.prune_state_cache(true)?;

let changes = if self.reorganized(old_head_segment_id) {
ApplyTickChanges::Reorganized {
Expand Down Expand Up @@ -2462,12 +2469,12 @@ impl<P: Preset> Store<P> {
self.accepted_blob_sidecars
.retain(|(slot, _, _), _| finalized_slot <= *slot);
self.prune_checkpoint_states();
self.prune_state_cache().ok();
self.prune_state_cache(false).ok();
self.aggregate_and_proof_supersets
.prune(self.finalized_epoch());
}

fn prune_state_cache(&self) -> Result<()> {
fn prune_state_cache(&self, preserve_unfinalized_fork_tips: bool) -> Result<()> {
let retain_slots =
self.store_config.max_epochs_to_retain_states_in_cache * P::SlotsPerEpoch::U64;

Expand All @@ -2476,7 +2483,15 @@ impl<P: Preset> Store<P> {
.saturating_sub(retain_slots)
.max(self.finalized_slot());

self.state_cache.prune(prune_slot)
let fork_tip_block_roots = if preserve_unfinalized_fork_tips {
self.unfinalized_fork_tips()
.map(|chain_link| chain_link.block_root)
.collect()
} else {
[].into()
};

self.state_cache.prune(prune_slot, &fork_tip_block_roots)
}

/// Applies changes to [`Store.latest_messages`] and computes changes to attesting balances.
Expand Down
29 changes: 26 additions & 3 deletions state_cache/src/state_cache.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use core::time::Duration;
use std::sync::Arc;
use std::{collections::HashSet, sync::Arc};

use anyhow::{anyhow, Result};
use im::{HashMap, OrdMap};
Expand Down Expand Up @@ -198,9 +198,14 @@ impl<P: Preset> StateCache<P> {
lengths.into_iter().sum::<usize>().pipe(Ok)
}

pub fn prune(&self, last_pruned_slot: Slot) -> Result<()> {
pub fn prune(&self, last_pruned_slot: Slot, preserved_states: &HashSet<H256>) -> Result<()> {
for (block_root, state_map_lock) in self.all_state_map_locks()? {
let mut state_map = self.try_lock_map(&state_map_lock, block_root)?;

if preserved_states.contains(&block_root) {
continue;
}

let (_, retained) = state_map.split(&last_pruned_slot);
*state_map = retained;
}
Expand Down Expand Up @@ -346,7 +351,7 @@ mod tests {
fn test_state_cache_prune() -> Result<()> {
let cache = new_test_cache()?;

cache.prune(2)?;
cache.prune(2, &[].into())?;

assert_eq!(cache.before_or_at_slot(ROOT_1, 1)?, None);
assert_eq!(cache.before_or_at_slot(ROOT_2, 2)?, None);
Expand All @@ -361,6 +366,24 @@ mod tests {

assert_eq!(cache.len()?, 2);

cache.insert(ROOT_1, (state_at_slot(1), None))?;
cache.insert(ROOT_1, (state_at_slot(2), None))?;
cache.insert(ROOT_2, (state_at_slot(2), None))?;

cache.prune(2, &[ROOT_1].into())?;

assert_eq!(
cache.before_or_at_slot(ROOT_1, 1)?,
Some((state_at_slot(1), None)),
);
assert_eq!(
cache.before_or_at_slot(ROOT_1, 2)?,
Some((state_at_slot(2), None)),
);
assert_eq!(cache.before_or_at_slot(ROOT_2, 2)?, None);

assert_eq!(cache.len()?, 4);

Ok(())
}

Expand Down

0 comments on commit 89bc724

Please sign in to comment.