From 89bc724a1948ff778d1d060494cef5ae5ce04925 Mon Sep 17 00:00:00 2001 From: Povilas Liubauskas Date: Wed, 15 Jan 2025 15:52:41 +0200 Subject: [PATCH] Preserve unfinalized fork choice tip states in state cache during pruning --- fork_choice_control/src/queries.rs | 15 +++------- .../src/state_cache_processor.rs | 6 ++-- fork_choice_store/src/store.rs | 23 ++++++++++++--- state_cache/src/state_cache.rs | 29 +++++++++++++++++-- 4 files changed, 52 insertions(+), 21 deletions(-) diff --git a/fork_choice_control/src/queries.rs b/fork_choice_control/src/queries.rs index 65bf130b..1b35ef28 100644 --- a/fork_choice_control/src/queries.rs +++ b/fork_choice_control/src/queries.rs @@ -6,7 +6,7 @@ use arc_swap::Guard; use eth2_libp2p::GossipId; use execution_engine::ExecutionEngine; use fork_choice_store::{ - AggregateAndProofOrigin, AttestationItem, ChainLink, Segment, StateCacheProcessor, Store, + AggregateAndProofOrigin, AttestationItem, ChainLink, StateCacheProcessor, Store, }; use helper_functions::misc; use itertools::Itertools as _; @@ -35,7 +35,7 @@ use crate::{ }; #[cfg(test)] -use ::{clock::Tick, types::phase0::consts::GENESIS_SLOT}; +use ::{clock::Tick, fork_choice_store::Segment, types::phase0::consts::GENESIS_SLOT}; // TODO(Grandine Team): There is currently no way to persist payload statuses. // We previously treated blocks loaded from the database as optimistic. @@ -182,15 +182,8 @@ where } store - .unfinalized() - .values() - .filter_map(Segment::last_non_invalid_block) - .map(|unfinalized_block| { - ( - &unfinalized_block.chain_link, - unfinalized_block.is_optimistic(), - ) - }) + .unfinalized_fork_tips() + .map(|chain_link| (chain_link, chain_link.is_optimistic())) .map_into() .collect_vec() } diff --git a/fork_choice_store/src/state_cache_processor.rs b/fork_choice_store/src/state_cache_processor.rs index c4b9ecce..41dce29c 100644 --- a/fork_choice_store/src/state_cache_processor.rs +++ b/fork_choice_store/src/state_cache_processor.rs @@ -1,5 +1,5 @@ use core::time::Duration; -use std::{backtrace::Backtrace, sync::Arc}; +use std::{backtrace::Backtrace, collections::HashSet, sync::Arc}; use anyhow::{bail, Result}; use features::Feature; @@ -65,8 +65,8 @@ impl StateCacheProcessor

{ .get_or_insert_with(block_root, slot, ignore_missing_rewards, f) } - pub fn prune(&self, last_pruned_slot: Slot) -> Result<()> { - self.state_cache.prune(last_pruned_slot) + pub fn prune(&self, last_pruned_slot: Slot, preserved_states: &HashSet) -> Result<()> { + self.state_cache.prune(last_pruned_slot, preserved_states) } pub fn try_state_at_slot( diff --git a/fork_choice_store/src/store.rs b/fork_choice_store/src/store.rs index 042fe8c5..a2599f2f 100644 --- a/fork_choice_store/src/store.rs +++ b/fork_choice_store/src/store.rs @@ -547,6 +547,13 @@ impl Store

{ Some(&mut self.unfinalized[segment_id][*position].chain_link) } + pub fn unfinalized_fork_tips(&self) -> impl Iterator> { + self.unfinalized() + .values() + .filter_map(Segment::last_non_invalid_block) + .map(|unfinalized_block| &unfinalized_block.chain_link) + } + // TODO(Grandine Team): The Optimistic Sync specification says that a node whose forks are all // non-viable due to invalid payloads should be considered optimistic, but // it's not clear if that means Eth Beacon Node API responses should have @@ -1900,7 +1907,7 @@ impl Store

{ self.update_head_segment_id(); self.blob_cache.on_slot(new_tick.slot); - self.prune_state_cache()?; + self.prune_state_cache(true)?; let changes = if self.reorganized(old_head_segment_id) { ApplyTickChanges::Reorganized { @@ -2462,12 +2469,12 @@ impl Store

{ self.accepted_blob_sidecars .retain(|(slot, _, _), _| finalized_slot <= *slot); self.prune_checkpoint_states(); - self.prune_state_cache().ok(); + self.prune_state_cache(false).ok(); self.aggregate_and_proof_supersets .prune(self.finalized_epoch()); } - fn prune_state_cache(&self) -> Result<()> { + fn prune_state_cache(&self, preserve_unfinalized_fork_tips: bool) -> Result<()> { let retain_slots = self.store_config.max_epochs_to_retain_states_in_cache * P::SlotsPerEpoch::U64; @@ -2476,7 +2483,15 @@ impl Store

{ .saturating_sub(retain_slots) .max(self.finalized_slot()); - self.state_cache.prune(prune_slot) + let fork_tip_block_roots = if preserve_unfinalized_fork_tips { + self.unfinalized_fork_tips() + .map(|chain_link| chain_link.block_root) + .collect() + } else { + [].into() + }; + + self.state_cache.prune(prune_slot, &fork_tip_block_roots) } /// Applies changes to [`Store.latest_messages`] and computes changes to attesting balances. diff --git a/state_cache/src/state_cache.rs b/state_cache/src/state_cache.rs index 88002c00..8d26f207 100644 --- a/state_cache/src/state_cache.rs +++ b/state_cache/src/state_cache.rs @@ -1,5 +1,5 @@ use core::time::Duration; -use std::sync::Arc; +use std::{collections::HashSet, sync::Arc}; use anyhow::{anyhow, Result}; use im::{HashMap, OrdMap}; @@ -198,9 +198,14 @@ impl StateCache

{ lengths.into_iter().sum::().pipe(Ok) } - pub fn prune(&self, last_pruned_slot: Slot) -> Result<()> { + pub fn prune(&self, last_pruned_slot: Slot, preserved_states: &HashSet) -> Result<()> { for (block_root, state_map_lock) in self.all_state_map_locks()? { let mut state_map = self.try_lock_map(&state_map_lock, block_root)?; + + if preserved_states.contains(&block_root) { + continue; + } + let (_, retained) = state_map.split(&last_pruned_slot); *state_map = retained; } @@ -346,7 +351,7 @@ mod tests { fn test_state_cache_prune() -> Result<()> { let cache = new_test_cache()?; - cache.prune(2)?; + cache.prune(2, &[].into())?; assert_eq!(cache.before_or_at_slot(ROOT_1, 1)?, None); assert_eq!(cache.before_or_at_slot(ROOT_2, 2)?, None); @@ -361,6 +366,24 @@ mod tests { assert_eq!(cache.len()?, 2); + cache.insert(ROOT_1, (state_at_slot(1), None))?; + cache.insert(ROOT_1, (state_at_slot(2), None))?; + cache.insert(ROOT_2, (state_at_slot(2), None))?; + + cache.prune(2, &[ROOT_1].into())?; + + assert_eq!( + cache.before_or_at_slot(ROOT_1, 1)?, + Some((state_at_slot(1), None)), + ); + assert_eq!( + cache.before_or_at_slot(ROOT_1, 2)?, + Some((state_at_slot(2), None)), + ); + assert_eq!(cache.before_or_at_slot(ROOT_2, 2)?, None); + + assert_eq!(cache.len()?, 4); + Ok(()) }