From 1890278d900f229e4aaf57b9d324b99dd0779e6d Mon Sep 17 00:00:00 2001 From: Michael Sproul Date: Thu, 19 Sep 2024 17:19:49 +1000 Subject: [PATCH] Simplify database anchor (#6397) * Simplify database anchor * Update beacon_node/store/src/reconstruct.rs * Add migration for anchor * Fix and simplify light_client store tests * Fix incompatible config test --- .../beacon_chain/src/block_verification.rs | 19 --- .../beacon_chain/src/historical_blocks.rs | 9 +- beacon_node/beacon_chain/src/migrate.rs | 2 +- .../src/schema_change/migration_schema_v22.rs | 37 ++-- beacon_node/beacon_chain/tests/store_tests.rs | 157 ++--------------- beacon_node/client/src/notifier.rs | 36 ++-- .../lighthouse_network/src/types/globals.rs | 2 +- .../src/types/sync_state.rs | 2 - .../network_beacon_processor/sync_methods.rs | 10 -- .../network/src/sync/backfill_sync/mod.rs | 94 ++++------- beacon_node/store/src/config.rs | 28 +-- beacon_node/store/src/forwards_iter.rs | 32 ++-- beacon_node/store/src/hot_cold_store.rs | 159 +++++++----------- beacon_node/store/src/metadata.rs | 31 ++++ beacon_node/store/src/reconstruct.rs | 21 ++- common/eth2/src/lighthouse.rs | 2 +- database_manager/src/lib.rs | 17 +- 17 files changed, 219 insertions(+), 439 deletions(-) diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index 55547aaa18c..960dd79a729 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -905,9 +905,6 @@ impl GossipVerifiedBlock { let block_root = get_block_header_root(block_header); - // Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any. - check_block_against_anchor_slot(block.message(), chain)?; - // Do not gossip a block from a finalized slot. check_block_against_finalized_slot(block.message(), block_root, chain)?; @@ -1138,9 +1135,6 @@ impl SignatureVerifiedBlock { .fork_name(&chain.spec) .map_err(BlockError::InconsistentFork)?; - // Check the anchor slot before loading the parent, to avoid spurious lookups. - check_block_against_anchor_slot(block.message(), chain)?; - let (mut parent, block) = load_parent(block, chain)?; let state = cheap_state_advance_to_obtain_committees::<_, BlockError>( @@ -1775,19 +1769,6 @@ impl ExecutionPendingBlock { } } -/// Returns `Ok(())` if the block's slot is greater than the anchor block's slot (if any). -fn check_block_against_anchor_slot( - block: BeaconBlockRef<'_, T::EthSpec>, - chain: &BeaconChain, -) -> Result<(), BlockError> { - if let Some(anchor_slot) = chain.store.get_anchor_slot() { - if block.slot() <= anchor_slot { - return Err(BlockError::WeakSubjectivityConflict); - } - } - Ok(()) -} - /// Returns `Ok(())` if the block is later than the finalized slot on `chain`. /// /// Returns an error if the block is earlier or equal to the finalized slot, or there was an error diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index bb9b57a25c6..7ff09035036 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -33,8 +33,6 @@ pub enum HistoricalBlockError { InvalidSignature, /// Transitory error, caller should retry with the same blocks. ValidatorPubkeyCacheTimeout, - /// No historical sync needed. - NoAnchorInfo, /// Logic error: should never occur. 
IndexOutOfBounds, } @@ -62,10 +60,7 @@ impl BeaconChain { &self, mut blocks: Vec>, ) -> Result { - let anchor_info = self - .store - .get_anchor_info() - .ok_or(HistoricalBlockError::NoAnchorInfo)?; + let anchor_info = self.store.get_anchor_info(); let blob_info = self.store.get_blob_info(); let data_column_info = self.store.get_data_column_info(); @@ -263,7 +258,7 @@ impl BeaconChain { let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot); anchor_and_blob_batch.push( self.store - .compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?, + .compare_and_set_anchor_info(anchor_info, new_anchor)?, ); self.store.hot_db.do_atomically(anchor_and_blob_batch)?; diff --git a/beacon_node/beacon_chain/src/migrate.rs b/beacon_node/beacon_chain/src/migrate.rs index 529f69a4353..37a2e8917ba 100644 --- a/beacon_node/beacon_chain/src/migrate.rs +++ b/beacon_node/beacon_chain/src/migrate.rs @@ -216,7 +216,7 @@ impl, Cold: ItemStore> BackgroundMigrator( ) -> Result<(), Error> { info!(log, "Upgrading from v21 to v22"); - let old_anchor = db.get_anchor_info(); + let mut old_anchor = db.get_anchor_info(); + + // If the anchor was uninitialized in the old schema (`None`), this represents a full archive + // node. + if old_anchor == ANCHOR_UNINITIALIZED { + old_anchor = ANCHOR_FOR_ARCHIVE_NODE; + } + let split_slot = db.get_split_slot(); let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?; @@ -70,9 +79,7 @@ pub fn upgrade_to_v22( // Write the block roots in the new format in a new column. Similar to above, we do this // separately from deleting the old format block roots so that this is crash safe. - let oldest_block_slot = old_anchor - .as_ref() - .map_or(Slot::new(0), |a| a.oldest_block_slot); + let oldest_block_slot = old_anchor.oldest_block_slot; write_new_schema_block_roots::( &db, genesis_block_root, @@ -90,22 +97,12 @@ pub fn upgrade_to_v22( // If we crash after commiting this change, then there will be some leftover cruft left in the // freezer database, but no corruption because all the new-format data has already been written // above. - let new_anchor = if let Some(old_anchor) = &old_anchor { - AnchorInfo { - state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, - state_lower_limit: Slot::new(0), - ..old_anchor.clone() - } - } else { - AnchorInfo { - anchor_slot: Slot::new(0), - oldest_block_slot: Slot::new(0), - oldest_block_parent: Hash256::ZERO, - state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, - state_lower_limit: Slot::new(0), - } + let new_anchor = AnchorInfo { + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + ..old_anchor.clone() }; - let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, Some(new_anchor))?]; + let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?]; db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?; // Finally, clean up the old-format data from the freezer database. 
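
For readers skimming the migration above: the core of this change is that the database anchor is no longer an `Option<AnchorInfo>`. Under the old schema a missing anchor meant "full archive node"; the new schema encodes that case, and the uninitialized case, as sentinel `AnchorInfo` values, and the v22 migration maps the old representation onto them. Below is a minimal standalone sketch of that mapping, not the real `store` crate code: `Slot` is modelled as `u64`, `Hash256` as `[u8; 32]`, and `migrate_anchor_v22` is an illustrative free function standing in for the anchor-handling part of `upgrade_to_v22`.

// Minimal standalone model of the anchor simplification in this patch.
#[derive(Clone, Debug, PartialEq, Eq)]
struct AnchorInfo {
    anchor_slot: u64,
    oldest_block_slot: u64,
    oldest_block_parent: [u8; 32],
    state_upper_limit: u64,
    state_lower_limit: u64,
}

/// Sentinel meaning "all historic blocks & states available" (full archive node).
const ANCHOR_FOR_ARCHIVE_NODE: AnchorInfo = AnchorInfo {
    anchor_slot: 0,
    oldest_block_slot: 0,
    oldest_block_parent: [0; 32],
    state_upper_limit: 0,
    state_lower_limit: 0,
};

/// Sentinel used only before the anchor has been initialised on first start-up.
const ANCHOR_UNINITIALIZED: AnchorInfo = AnchorInfo {
    anchor_slot: u64::MAX,
    oldest_block_slot: u64::MAX,
    oldest_block_parent: [0; 32],
    state_upper_limit: u64::MAX,
    state_lower_limit: 0,
};

/// Dummy state upper limit meaning "historic states are not retained".
const STATE_UPPER_LIMIT_NO_RETAIN: u64 = u64::MAX;

/// Sketch of how the v22 migration rewrites the anchor.
fn migrate_anchor_v22(old: AnchorInfo) -> AnchorInfo {
    // A missing on-disk anchor now loads as ANCHOR_UNINITIALIZED; under the old
    // schema "no anchor" meant full archive node, so map it accordingly.
    let old = if old == ANCHOR_UNINITIALIZED {
        ANCHOR_FOR_ARCHIVE_NODE
    } else {
        old
    };
    // The migration drops old-format historic states, so the upper limit becomes
    // the "no retain" sentinel and the lower limit resets to genesis; states can
    // be rebuilt later by `reconstruct_historic_states`.
    AnchorInfo {
        state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
        state_lower_limit: 0,
        ..old
    }
}

fn main() {
    // Archive node: block history stays complete (oldest_block_slot == 0).
    let archive = migrate_anchor_v22(ANCHOR_UNINITIALIZED);
    assert_eq!(archive.oldest_block_slot, 0);
    assert_eq!(archive.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);

    // Checkpoint-synced node: existing block limits carry over unchanged.
    let checkpoint = migrate_anchor_v22(AnchorInfo {
        anchor_slot: 1024,
        oldest_block_slot: 512,
        ..ANCHOR_FOR_ARCHIVE_NODE
    });
    assert_eq!(checkpoint.oldest_block_slot, 512);
    assert_eq!(checkpoint.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);
}
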
diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index ddf5b101c10..4a772bf5173 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -109,12 +109,8 @@ async fn light_client_bootstrap_test() { return; }; - let checkpoint_slot = Slot::new(E::slots_per_epoch() * 6); let db_path = tempdir().unwrap(); - let log = test_logger(); - - let seconds_per_slot = spec.seconds_per_slot; - let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::()); + let store = get_store_generic(&db_path, StoreConfig::default(), spec.clone()); let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let all_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); let num_initial_slots = E::slots_per_epoch() * 7; @@ -132,71 +128,6 @@ async fn light_client_bootstrap_test() { ) .await; - let wss_block_root = harness - .chain - .block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev) - .unwrap() - .unwrap(); - let wss_state_root = harness - .chain - .state_root_at_slot(checkpoint_slot) - .unwrap() - .unwrap(); - let wss_block = harness - .chain - .store - .get_full_block(&wss_block_root) - .unwrap() - .unwrap(); - let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap(); - let wss_state = store - .get_state(&wss_state_root, Some(checkpoint_slot)) - .unwrap() - .unwrap(); - - let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone()); - - let mock = - mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone()); - - // Initialise a new beacon chain from the finalized checkpoint. - // The slot clock must be set to a time ahead of the checkpoint state. - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(harness.chain.genesis_time), - Duration::from_secs(seconds_per_slot), - ); - slot_clock.set_slot(harness.get_current_slot().as_u64()); - - let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1); - - let beacon_chain = BeaconChainBuilder::>::new(MinimalEthSpec) - .store(store.clone()) - .custom_spec(test_spec::()) - .task_executor(harness.chain.task_executor.clone()) - .logger(log.clone()) - .weak_subjectivity_state( - wss_state, - wss_block.clone(), - wss_blobs_opt.clone(), - genesis_state, - ) - .unwrap() - .store_migrator_config(MigratorConfig::default().blocking()) - .dummy_eth1_backend() - .expect("should build dummy backend") - .slot_clock(slot_clock) - .shutdown_sender(shutdown_tx) - .chain_config(ChainConfig::default()) - .event_handler(Some(ServerSentEventHandler::new_with_capacity( - log.clone(), - 1, - ))) - .execution_layer(Some(mock.el)) - .kzg(kzg) - .build() - .expect("should build"); - let current_state = harness.get_current_state(); if ForkName::Electra == current_state.fork_name_unchecked() { @@ -204,7 +135,8 @@ async fn light_client_bootstrap_test() { return; } - let finalized_checkpoint = beacon_chain + let finalized_checkpoint = harness + .chain .canonical_head .cached_head() .finalized_checkpoint(); @@ -239,11 +171,7 @@ async fn light_client_updates_test() { }; let num_final_blocks = E::slots_per_epoch() * 2; - let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9); let db_path = tempdir().unwrap(); - let log = test_logger(); - - let seconds_per_slot = spec.seconds_per_slot; let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::()); let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); let all_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); @@ -260,33 
+188,6 @@ async fn light_client_updates_test() { ) .await; - let wss_block_root = harness - .chain - .block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev) - .unwrap() - .unwrap(); - let wss_state_root = harness - .chain - .state_root_at_slot(checkpoint_slot) - .unwrap() - .unwrap(); - let wss_block = harness - .chain - .store - .get_full_block(&wss_block_root) - .unwrap() - .unwrap(); - let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap(); - let wss_state = store - .get_state(&wss_state_root, Some(checkpoint_slot)) - .unwrap() - .unwrap(); - - let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone()); - - let mock = - mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone()); - harness.advance_slot(); harness .extend_chain_with_light_client_data( @@ -296,46 +197,6 @@ async fn light_client_updates_test() { ) .await; - // Initialise a new beacon chain from the finalized checkpoint. - // The slot clock must be set to a time ahead of the checkpoint state. - let slot_clock = TestingSlotClock::new( - Slot::new(0), - Duration::from_secs(harness.chain.genesis_time), - Duration::from_secs(seconds_per_slot), - ); - slot_clock.set_slot(harness.get_current_slot().as_u64()); - - let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1); - - let beacon_chain = BeaconChainBuilder::>::new(MinimalEthSpec) - .store(store.clone()) - .custom_spec(test_spec::()) - .task_executor(harness.chain.task_executor.clone()) - .logger(log.clone()) - .weak_subjectivity_state( - wss_state, - wss_block.clone(), - wss_blobs_opt.clone(), - genesis_state, - ) - .unwrap() - .store_migrator_config(MigratorConfig::default().blocking()) - .dummy_eth1_backend() - .expect("should build dummy backend") - .slot_clock(slot_clock) - .shutdown_sender(shutdown_tx) - .chain_config(ChainConfig::default()) - .event_handler(Some(ServerSentEventHandler::new_with_capacity( - log.clone(), - 1, - ))) - .execution_layer(Some(mock.el)) - .kzg(kzg) - .build() - .expect("should build"); - - let beacon_chain = Arc::new(beacon_chain); - let current_state = harness.get_current_state(); if ForkName::Electra == current_state.fork_name_unchecked() { @@ -351,7 +212,8 @@ async fn light_client_updates_test() { // fetch a range of light client updates. right now there should only be one light client update // in the db. - let lc_updates = beacon_chain + let lc_updates = harness + .chain .get_light_client_updates(sync_period, 100) .unwrap(); @@ -371,7 +233,8 @@ async fn light_client_updates_test() { .await; // we should now have two light client updates in the db - let lc_updates = beacon_chain + let lc_updates = harness + .chain .get_light_client_updates(sync_period, 100) .unwrap(); @@ -2646,11 +2509,11 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { } // Anchor slot is still set to the slot of the checkpoint block. - assert_eq!(store.get_anchor_slot(), Some(wss_block.slot())); + assert_eq!(store.get_anchor_info().anchor_slot, wss_block.slot()); // Reconstruct states. store.clone().reconstruct_historic_states(None).unwrap(); - assert_eq!(store.get_anchor_slot(), None); + assert_eq!(store.get_anchor_info().anchor_slot, 0); } /// Test that blocks and attestations that refer to states around an unaligned split state are @@ -3489,7 +3352,7 @@ async fn prune_historic_states() { .unwrap(); // Check that anchor info is updated. 
- let anchor_info = store.get_anchor_info().unwrap(); + let anchor_info = store.get_anchor_info(); assert_eq!(anchor_info.state_lower_limit, 0); assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN); diff --git a/beacon_node/client/src/notifier.rs b/beacon_node/client/src/notifier.rs index 632188014eb..4330c45bcde 100644 --- a/beacon_node/client/src/notifier.rs +++ b/beacon_node/client/src/notifier.rs @@ -45,10 +45,7 @@ pub fn spawn_notifier( let mut current_sync_state = network.sync_state(); // Store info if we are required to do a backfill sync. - let original_anchor_slot = beacon_chain - .store - .get_anchor_info() - .map(|ai| ai.oldest_block_slot); + let original_oldest_block_slot = beacon_chain.store.get_anchor_info().oldest_block_slot; let interval_future = async move { // Perform pre-genesis logging. @@ -141,22 +138,17 @@ pub fn spawn_notifier( match current_sync_state { SyncState::BackFillSyncing { .. } => { // Observe backfilling sync info. - if let Some(oldest_slot) = original_anchor_slot { - if let Some(current_anchor_slot) = beacon_chain - .store - .get_anchor_info() - .map(|ai| ai.oldest_block_slot) - { - sync_distance = current_anchor_slot - .saturating_sub(beacon_chain.genesis_backfill_slot); - speedo - // For backfill sync use a fake slot which is the distance we've progressed from the starting `oldest_block_slot`. - .observe( - oldest_slot.saturating_sub(current_anchor_slot), - Instant::now(), - ); - } - } + let current_oldest_block_slot = + beacon_chain.store.get_anchor_info().oldest_block_slot; + sync_distance = current_oldest_block_slot + .saturating_sub(beacon_chain.genesis_backfill_slot); + speedo + // For backfill sync use a fake slot which is the distance we've progressed + // from the starting `original_oldest_block_slot`. + .observe( + original_oldest_block_slot.saturating_sub(current_oldest_block_slot), + Instant::now(), + ); } SyncState::SyncingFinalized { .. } | SyncState::SyncingHead { .. 
} @@ -213,14 +205,14 @@ pub fn spawn_notifier( "Downloading historical blocks"; "distance" => distance, "speed" => sync_speed_pretty(speed), - "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot).saturating_sub(beacon_chain.genesis_backfill_slot))), + "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_oldest_block_slot.saturating_sub(beacon_chain.genesis_backfill_slot))), ); } else { info!( log, "Downloading historical blocks"; "distance" => distance, - "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_anchor_slot.unwrap_or(current_slot).saturating_sub(beacon_chain.genesis_backfill_slot))), + "est_time" => estimated_time_pretty(speedo.estimated_time_till_slot(original_oldest_block_slot.saturating_sub(beacon_chain.genesis_backfill_slot))), ); } } else if !is_backfilling && last_backfill_log_slot.is_some() { diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index ac78e2cb01e..f739a7c952d 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -72,7 +72,7 @@ impl NetworkGlobals { peers: RwLock::new(PeerDB::new(trusted_peers, disable_peer_scoring, log)), gossipsub_subscriptions: RwLock::new(HashSet::new()), sync_state: RwLock::new(SyncState::Stalled), - backfill_state: RwLock::new(BackFillState::NotRequired), + backfill_state: RwLock::new(BackFillState::Paused), custody_subnets, custody_columns, spec, diff --git a/beacon_node/lighthouse_network/src/types/sync_state.rs b/beacon_node/lighthouse_network/src/types/sync_state.rs index b82e63bd9c0..aa8a11e6674 100644 --- a/beacon_node/lighthouse_network/src/types/sync_state.rs +++ b/beacon_node/lighthouse_network/src/types/sync_state.rs @@ -35,8 +35,6 @@ pub enum BackFillState { Syncing, /// A backfill sync has completed. Completed, - /// A backfill sync is not required. - NotRequired, /// Too many failed attempts at backfilling. Consider it failed. Failed, } diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index c21054dab50..43d3c71940b 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -656,16 +656,6 @@ impl NetworkBeaconProcessor { peer_action: None, } } - HistoricalBlockError::NoAnchorInfo => { - warn!(self.log, "Backfill not required"); - - ChainSegmentFailed { - message: String::from("no_anchor_info"), - // There is no need to do a historical sync, this is not a fault of - // the peer. - peer_action: None, - } - } HistoricalBlockError::IndexOutOfBounds => { error!( self.log, diff --git a/beacon_node/network/src/sync/backfill_sync/mod.rs b/beacon_node/network/src/sync/backfill_sync/mod.rs index 946d25237bf..8031f843dce 100644 --- a/beacon_node/network/src/sync/backfill_sync/mod.rs +++ b/beacon_node/network/src/sync/backfill_sync/mod.rs @@ -163,21 +163,18 @@ impl BackFillSync { // If, for some reason a backfill has already been completed (or we've used a trusted // genesis root) then backfill has been completed. 
- let (state, current_start) = match beacon_chain.store.get_anchor_info() { - Some(anchor_info) => { - if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) { - (BackFillState::Completed, Epoch::new(0)) - } else { - ( - BackFillState::Paused, - anchor_info - .oldest_block_slot - .epoch(T::EthSpec::slots_per_epoch()), - ) - } - } - None => (BackFillState::NotRequired, Epoch::new(0)), - }; + let anchor_info = beacon_chain.store.get_anchor_info(); + let (state, current_start) = + if anchor_info.block_backfill_complete(beacon_chain.genesis_backfill_slot) { + (BackFillState::Completed, Epoch::new(0)) + } else { + ( + BackFillState::Paused, + anchor_info + .oldest_block_slot + .epoch(T::EthSpec::slots_per_epoch()), + ) + }; let bfs = BackFillSync { batches: BTreeMap::new(), @@ -253,25 +250,13 @@ impl BackFillSync { self.set_state(BackFillState::Syncing); // Obtain a new start slot, from the beacon chain and handle possible errors. - match self.reset_start_epoch() { - Err(ResetEpochError::SyncCompleted) => { - error!(self.log, "Backfill sync completed whilst in failed status"); - self.set_state(BackFillState::Completed); - return Err(BackFillError::InvalidSyncState(String::from( - "chain completed", - ))); - } - Err(ResetEpochError::NotRequired) => { - error!( - self.log, - "Backfill sync not required whilst in failed status" - ); - self.set_state(BackFillState::NotRequired); - return Err(BackFillError::InvalidSyncState(String::from( - "backfill not required", - ))); - } - Ok(_) => {} + if let Err(e) = self.reset_start_epoch() { + let ResetEpochError::SyncCompleted = e; + error!(self.log, "Backfill sync completed whilst in failed status"); + self.set_state(BackFillState::Completed); + return Err(BackFillError::InvalidSyncState(String::from( + "chain completed", + ))); } debug!(self.log, "Resuming a failed backfill sync"; "start_epoch" => self.current_start); @@ -279,9 +264,7 @@ impl BackFillSync { // begin requesting blocks from the peer pool, until all peers are exhausted. self.request_batches(network)?; } - BackFillState::Completed | BackFillState::NotRequired => { - return Ok(SyncStart::NotSyncing) - } + BackFillState::Completed => return Ok(SyncStart::NotSyncing), } Ok(SyncStart::Syncing { @@ -313,10 +296,7 @@ impl BackFillSync { peer_id: &PeerId, network: &mut SyncNetworkContext, ) -> Result<(), BackFillError> { - if matches!( - self.state(), - BackFillState::Failed | BackFillState::NotRequired - ) { + if matches!(self.state(), BackFillState::Failed) { return Ok(()); } @@ -1142,17 +1122,14 @@ impl BackFillSync { /// This errors if the beacon chain indicates that backfill sync has already completed or is /// not required. 
fn reset_start_epoch(&mut self) -> Result<(), ResetEpochError> { - if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() { - if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { - Err(ResetEpochError::SyncCompleted) - } else { - self.current_start = anchor_info - .oldest_block_slot - .epoch(T::EthSpec::slots_per_epoch()); - Ok(()) - } + let anchor_info = self.beacon_chain.store.get_anchor_info(); + if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { + Err(ResetEpochError::SyncCompleted) } else { - Err(ResetEpochError::NotRequired) + self.current_start = anchor_info + .oldest_block_slot + .epoch(T::EthSpec::slots_per_epoch()); + Ok(()) } } @@ -1160,13 +1137,12 @@ impl BackFillSync { fn check_completed(&mut self) -> bool { if self.would_complete(self.current_start) { // Check that the beacon chain agrees - if let Some(anchor_info) = self.beacon_chain.store.get_anchor_info() { - // Conditions that we have completed a backfill sync - if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { - return true; - } else { - error!(self.log, "Backfill out of sync with beacon chain"); - } + let anchor_info = self.beacon_chain.store.get_anchor_info(); + // Conditions that we have completed a backfill sync + if anchor_info.block_backfill_complete(self.beacon_chain.genesis_backfill_slot) { + return true; + } else { + error!(self.log, "Backfill out of sync with beacon chain"); } } false @@ -1195,6 +1171,4 @@ impl BackFillSync { enum ResetEpochError { /// The chain has already completed. SyncCompleted, - /// Backfill is not required. - NotRequired, } diff --git a/beacon_node/store/src/config.rs b/beacon_node/store/src/config.rs index 40c33528d9e..6a9f4f2a91d 100644 --- a/beacon_node/store/src/config.rs +++ b/beacon_node/store/src/config.rs @@ -124,12 +124,10 @@ impl StoreConfig { &self, on_disk_config: &OnDiskStoreConfig, split: &Split, - anchor: Option<&AnchorInfo>, + anchor: &AnchorInfo, ) -> Result<(), StoreConfigError> { // Allow changing the hierarchy exponents if no historic states are stored. 
- // anchor == None implies full archive node thus all historic states - let no_historic_states_stored = - anchor.map_or(false, |anchor| anchor.no_historic_states_stored(split.slot)); + let no_historic_states_stored = anchor.no_historic_states_stored(split.slot); let hierarchy_config_changed = if let Ok(on_disk_hierarchy_config) = on_disk_config.hierarchy_config() { *on_disk_hierarchy_config != self.hierarchy_config @@ -247,7 +245,10 @@ impl StoreItem for OnDiskStoreConfig { #[cfg(test)] mod test { use super::*; - use crate::{metadata::STATE_UPPER_LIMIT_NO_RETAIN, AnchorInfo, Split}; + use crate::{ + metadata::{ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN}, + AnchorInfo, Split, + }; use ssz::DecodeError; use types::{Hash256, Slot}; @@ -261,7 +262,7 @@ mod test { )); let split = Split::default(); assert!(store_config - .check_compatibility(&on_disk_config, &split, None) + .check_compatibility(&on_disk_config, &split, &ANCHOR_UNINITIALIZED) .is_ok()); } @@ -275,21 +276,22 @@ mod test { }); let split = Split::default(); assert!(store_config - .check_compatibility(&on_disk_config, &split, None) + .check_compatibility(&on_disk_config, &split, &ANCHOR_UNINITIALIZED) .is_ok()); } #[test] fn check_compatibility_hierarchy_config_incompatible() { - let store_config = StoreConfig { - ..Default::default() - }; + let store_config = StoreConfig::default(); let on_disk_config = OnDiskStoreConfig::V22(OnDiskStoreConfigV22::new(HierarchyConfig { exponents: vec![5, 8, 11, 13, 16, 18, 21], })); - let split = Split::default(); + let split = Split { + slot: Slot::new(32), + ..Default::default() + }; assert!(store_config - .check_compatibility(&on_disk_config, &split, None) + .check_compatibility(&on_disk_config, &split, &ANCHOR_FOR_ARCHIVE_NODE) .is_err()); } @@ -310,7 +312,7 @@ mod test { state_lower_limit: Slot::new(0), }; assert!(store_config - .check_compatibility(&on_disk_config, &split, Some(&anchor)) + .check_compatibility(&on_disk_config, &split, &anchor) .is_ok()); } diff --git a/beacon_node/store/src/forwards_iter.rs b/beacon_node/store/src/forwards_iter.rs index adffc576dd8..e0f44f3affb 100644 --- a/beacon_node/store/src/forwards_iter.rs +++ b/beacon_node/store/src/forwards_iter.rs @@ -99,27 +99,19 @@ impl, Cold: ItemStore> HotColdDB fn freezer_upper_bound_for_state_roots(&self, start_slot: Slot) -> Option { let split_slot = self.get_split_slot(); - let anchor_info = self.get_anchor_info(); + let anchor = self.get_anchor_info(); - match anchor_info { - Some(anchor) => { - if start_slot <= anchor.state_lower_limit { - // Starting slot is prior to lower limit, so that's the upper limit. We can't - // iterate past the lower limit into the gap. The +1 accounts for exclusivity. - Some(anchor.state_lower_limit + 1) - } else if start_slot >= anchor.state_upper_limit { - // Starting slot is after the upper limit, so the split is the upper limit. - // The split state's root is not available in the freezer so this is exclusive. - Some(split_slot) - } else { - // In the gap, nothing is available. - None - } - } - None => { - // No anchor indicates that all state roots up to the split slot are available. - Some(split_slot) - } + if start_slot >= anchor.state_upper_limit { + // Starting slot is after the upper limit, so the split is the upper limit. + // The split state's root is not available in the freezer so this is exclusive. + Some(split_slot) + } else if start_slot <= anchor.state_lower_limit { + // Starting slot is prior to lower limit, so that's the upper limit. 
We can't + // iterate past the lower limit into the gap. The +1 accounts for exclusivity. + Some(anchor.state_lower_limit + 1) + } else { + // In the gap, nothing is available. + None } } } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 02553b5e76c..be42f57a610 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -7,9 +7,9 @@ use crate::leveldb_store::{BytesKey, LevelDB}; use crate::memory_store::MemoryStore; use crate::metadata::{ AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion, - ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, - DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, - STATE_UPPER_LIMIT_NO_RETAIN, + ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_INFO_KEY, ANCHOR_UNINITIALIZED, BLOB_INFO_KEY, + COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION, DATA_COLUMN_INFO_KEY, + PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN, }; use crate::state_cache::{PutStateOutcome, StateCache}; use crate::{ @@ -55,7 +55,7 @@ pub struct HotColdDB, Cold: ItemStore> { /// greater than or equal are in the hot DB. pub(crate) split: RwLock, /// The starting slots for the range of blocks & states stored in the database. - anchor_info: RwLock>, + anchor_info: RwLock, /// The starting slots for the range of blobs stored in the database. blob_info: RwLock, /// The starting slots for the range of data columns stored in the database. @@ -209,7 +209,7 @@ impl HotColdDB, MemoryStore> { let db = HotColdDB { split: RwLock::new(Split::default()), - anchor_info: RwLock::new(None), + anchor_info: RwLock::new(ANCHOR_UNINITIALIZED), blob_info: RwLock::new(BlobInfo::default()), data_column_info: RwLock::new(DataColumnInfo::default()), cold_db: MemoryStore::open(), @@ -247,14 +247,17 @@ impl HotColdDB, LevelDB> { let hierarchy = config.hierarchy_config.to_moduli()?; + let hot_db = LevelDB::open(hot_path)?; + let anchor_info = RwLock::new(Self::load_anchor_info(&hot_db)?); + let db = HotColdDB { split: RwLock::new(Split::default()), - anchor_info: RwLock::new(None), + anchor_info, blob_info: RwLock::new(BlobInfo::default()), data_column_info: RwLock::new(DataColumnInfo::default()), cold_db: LevelDB::open(cold_path)?, blobs_db: LevelDB::open(blobs_db_path)?, - hot_db: LevelDB::open(hot_path)?, + hot_db, block_cache: Mutex::new(BlockCache::new(config.block_cache_size)), state_cache: Mutex::new(StateCache::new(config.state_cache_size)), hdiff_buffer_cache: Mutex::new(LruCache::new(config.hdiff_buffer_cache_size)), @@ -274,7 +277,6 @@ impl HotColdDB, LevelDB> { // because some migrations load states and depend on the split. if let Some(split) = db.load_split()? { *db.split.write() = split; - *db.anchor_info.write() = db.load_anchor_info()?; info!( db.log, @@ -370,7 +372,7 @@ impl HotColdDB, LevelDB> { let split = db.get_split_info(); let anchor = db.get_anchor_info(); db.config - .check_compatibility(&disk_config, &split, anchor.as_ref())?; + .check_compatibility(&disk_config, &split, &anchor)?; // Inform user if hierarchy config is changing. 
if let Ok(hierarchy_config) = disk_config.hierarchy_config() { @@ -467,20 +469,19 @@ impl, Cold: ItemStore> HotColdDB hdiff_buffer_cache_byte_size as i64, ); - if let Some(anchor_info) = self.get_anchor_info() { - metrics::set_gauge( - &metrics::STORE_BEACON_ANCHOR_SLOT, - anchor_info.anchor_slot.as_u64() as i64, - ); - metrics::set_gauge( - &metrics::STORE_BEACON_OLDEST_BLOCK_SLOT, - anchor_info.oldest_block_slot.as_u64() as i64, - ); - metrics::set_gauge( - &metrics::STORE_BEACON_STATE_LOWER_LIMIT, - anchor_info.state_lower_limit.as_u64() as i64, - ); - } + let anchor_info = self.get_anchor_info(); + metrics::set_gauge( + &metrics::STORE_BEACON_ANCHOR_SLOT, + anchor_info.anchor_slot.as_u64() as i64, + ); + metrics::set_gauge( + &metrics::STORE_BEACON_OLDEST_BLOCK_SLOT, + anchor_info.oldest_block_slot.as_u64() as i64, + ); + metrics::set_gauge( + &metrics::STORE_BEACON_STATE_LOWER_LIMIT, + anchor_info.state_lower_limit.as_u64() as i64, + ); } /// Store a block and update the LRU cache. @@ -2071,23 +2072,23 @@ impl, Cold: ItemStore> HotColdDB }; let anchor_info = if state_upper_limit == 0 && anchor_slot == 0 { // Genesis archive node: no anchor because we *will* store all states. - None + ANCHOR_FOR_ARCHIVE_NODE } else { - Some(AnchorInfo { + AnchorInfo { anchor_slot, oldest_block_slot: anchor_slot, oldest_block_parent: block.parent_root(), state_upper_limit, state_lower_limit: self.spec.genesis_slot, - }) + } }; - self.compare_and_set_anchor_info(None, anchor_info) + self.compare_and_set_anchor_info(ANCHOR_UNINITIALIZED, anchor_info) } /// Get a clone of the store's anchor info. /// /// To do mutations, use `compare_and_set_anchor_info`. - pub fn get_anchor_info(&self) -> Option { + pub fn get_anchor_info(&self) -> AnchorInfo { self.anchor_info.read_recursive().clone() } @@ -2100,8 +2101,8 @@ impl, Cold: ItemStore> HotColdDB /// is not correct. pub fn compare_and_set_anchor_info( &self, - prev_value: Option, - new_value: Option, + prev_value: AnchorInfo, + new_value: AnchorInfo, ) -> Result { let mut anchor_info = self.anchor_info.write(); if *anchor_info == prev_value { @@ -2116,39 +2117,26 @@ impl, Cold: ItemStore> HotColdDB /// As for `compare_and_set_anchor_info`, but also writes the anchor to disk immediately. pub fn compare_and_set_anchor_info_with_write( &self, - prev_value: Option, - new_value: Option, + prev_value: AnchorInfo, + new_value: AnchorInfo, ) -> Result<(), Error> { let kv_store_op = self.compare_and_set_anchor_info(prev_value, new_value)?; self.hot_db.do_atomically(vec![kv_store_op]) } - /// Load the anchor info from disk, but do not set `self.anchor_info`. - fn load_anchor_info(&self) -> Result, Error> { - self.hot_db.get(&ANCHOR_INFO_KEY) + /// Load the anchor info from disk. + fn load_anchor_info(hot_db: &Hot) -> Result { + Ok(hot_db + .get(&ANCHOR_INFO_KEY)? + .unwrap_or(ANCHOR_UNINITIALIZED)) } /// Store the given `anchor_info` to disk. /// /// The argument is intended to be `self.anchor_info`, but is passed manually to avoid issues /// with recursive locking. - fn store_anchor_info_in_batch(&self, anchor_info: &Option) -> KeyValueStoreOp { - if let Some(ref anchor_info) = anchor_info { - anchor_info.as_kv_store_op(ANCHOR_INFO_KEY) - } else { - KeyValueStoreOp::DeleteKey(get_key_for_col( - DBColumn::BeaconMeta.into(), - ANCHOR_INFO_KEY.as_slice(), - )) - } - } - - /// If an anchor exists, return its `anchor_slot` field. 
- pub fn get_anchor_slot(&self) -> Option { - self.anchor_info - .read_recursive() - .as_ref() - .map(|a| a.anchor_slot) + fn store_anchor_info_in_batch(&self, anchor_info: &AnchorInfo) -> KeyValueStoreOp { + anchor_info.as_kv_store_op(ANCHOR_INFO_KEY) } /// Initialize the `BlobInfo` when starting from genesis or a checkpoint. @@ -2296,7 +2284,7 @@ impl, Cold: ItemStore> HotColdDB /// instance. pub fn get_historic_state_limits(&self) -> (Slot, Slot) { // If checkpoint sync is used then states in the hot DB will always be available, but may - // become unavailable as finalisation advances due to the lack of a restore point in the + // become unavailable as finalisation advances due to the lack of a snapshot in the // database. For this reason we take the minimum of the split slot and the // restore-point-aligned `state_upper_limit`, which should be set _ahead_ of the checkpoint // slot during initialisation. @@ -2307,20 +2295,16 @@ impl, Cold: ItemStore> HotColdDB // a new restore point will be created at that slot, making all states from 4096 onwards // permanently available. let split_slot = self.get_split_slot(); - self.anchor_info - .read_recursive() - .as_ref() - .map_or((split_slot, self.spec.genesis_slot), |a| { - (a.state_lower_limit, min(a.state_upper_limit, split_slot)) - }) + let anchor = self.anchor_info.read_recursive(); + ( + anchor.state_lower_limit, + min(anchor.state_upper_limit, split_slot), + ) } /// Return the minimum slot such that blocks are available for all subsequent slots. pub fn get_oldest_block_slot(&self) -> Slot { - self.anchor_info - .read_recursive() - .as_ref() - .map_or(self.spec.genesis_slot, |anchor| anchor.oldest_block_slot) + self.anchor_info.read_recursive().oldest_block_slot } /// Return the in-memory configuration used by the database. @@ -2502,7 +2486,7 @@ impl, Cold: ItemStore> HotColdDB "Pruning finalized payloads"; "info" => "you may notice degraded I/O performance while this runs" ); - let anchor_slot = self.get_anchor_info().map(|info| info.anchor_slot); + let anchor_slot = self.get_anchor_info().anchor_slot; let mut ops = vec![]; let mut last_pruned_block_root = None; @@ -2543,7 +2527,7 @@ impl, Cold: ItemStore> HotColdDB ops.push(StoreOp::DeleteExecutionPayload(block_root)); } - if Some(slot) == anchor_slot { + if slot == anchor_slot { info!( self.log, "Payload pruning reached anchor state"; @@ -2650,16 +2634,15 @@ impl, Cold: ItemStore> HotColdDB } // Sanity checks. - if let Some(anchor) = self.get_anchor_info() { - if oldest_blob_slot < anchor.oldest_block_slot { - error!( - self.log, - "Oldest blob is older than oldest block"; - "oldest_blob_slot" => oldest_blob_slot, - "oldest_block_slot" => anchor.oldest_block_slot - ); - return Err(HotColdDBError::BlobPruneLogicError.into()); - } + let anchor = self.get_anchor_info(); + if oldest_blob_slot < anchor.oldest_block_slot { + error!( + self.log, + "Oldest blob is older than oldest block"; + "oldest_blob_slot" => oldest_blob_slot, + "oldest_block_slot" => anchor.oldest_block_slot + ); + return Err(HotColdDBError::BlobPruneLogicError.into()); } // Iterate block roots forwards from the oldest blob slot. @@ -2760,25 +2743,15 @@ impl, Cold: ItemStore> HotColdDB ) -> Result<(), Error> { // Update the anchor to use the dummy state upper limit and disable historic state storage. 
let old_anchor = self.get_anchor_info(); - let new_anchor = if let Some(old_anchor) = old_anchor.clone() { - AnchorInfo { - state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, - state_lower_limit: Slot::new(0), - ..old_anchor.clone() - } - } else { - AnchorInfo { - anchor_slot: Slot::new(0), - oldest_block_slot: Slot::new(0), - oldest_block_parent: Hash256::zero(), - state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, - state_lower_limit: Slot::new(0), - } + let new_anchor = AnchorInfo { + state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN, + state_lower_limit: Slot::new(0), + ..old_anchor.clone() }; // Commit the anchor change immediately: if the cold database ops fail they can always be // retried, and we can't do them atomically with this change anyway. - self.compare_and_set_anchor_info_with_write(old_anchor, Some(new_anchor))?; + self.compare_and_set_anchor_info_with_write(old_anchor, new_anchor)?; // Stage freezer data for deletion. Do not bother loading and deserializing values as this // wastes time and is less schema-agnostic. My hope is that this method will be useful for @@ -2983,11 +2956,7 @@ pub fn migrate_database, Cold: ItemStore>( // Do not try to store states if a restore point is yet to be stored, or will never be // stored (see `STATE_UPPER_LIMIT_NO_RETAIN`). Make an exception for the genesis state // which always needs to be copied from the hot DB to the freezer and should not be deleted. - if slot != 0 - && anchor_info - .as_ref() - .map_or(false, |anchor| slot < anchor.state_upper_limit) - { + if slot != 0 && slot < anchor_info.state_upper_limit { debug!(store.log, "Pruning finalized state"; "slot" => slot); continue; } diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 081203ea6de..3f076a767ac 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -21,6 +21,27 @@ pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7); /// State upper limit value used to indicate that a node is not storing historic states. pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX); +/// The `AnchorInfo` encoding full availability of all historic blocks & states. +pub const ANCHOR_FOR_ARCHIVE_NODE: AnchorInfo = AnchorInfo { + anchor_slot: Slot::new(0), + oldest_block_slot: Slot::new(0), + oldest_block_parent: Hash256::ZERO, + state_upper_limit: Slot::new(0), + state_lower_limit: Slot::new(0), +}; + +/// The `AnchorInfo` encoding an uninitialized anchor. +/// +/// This value should never exist except on initial start-up prior to the anchor being initialised +/// by `init_anchor_info`. +pub const ANCHOR_UNINITIALIZED: AnchorInfo = AnchorInfo { + anchor_slot: Slot::new(u64::MAX), + oldest_block_slot: Slot::new(u64::MAX), + oldest_block_parent: Hash256::ZERO, + state_upper_limit: Slot::new(u64::MAX), + state_lower_limit: Slot::new(0), +}; + #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] pub struct SchemaVersion(pub u64); @@ -140,10 +161,20 @@ impl AnchorInfo { self.oldest_block_slot <= target_slot } + /// Return true if all historic states are stored, i.e. if state reconstruction is complete. + pub fn all_historic_states_stored(&self) -> bool { + self.state_lower_limit == self.state_upper_limit + } + /// Return true if no historic states other than genesis are stored in the database. 
pub fn no_historic_states_stored(&self, split_slot: Slot) -> bool { self.state_lower_limit == 0 && self.state_upper_limit >= split_slot } + + /// Return true if no historic states other than genesis *will ever be stored*. + pub fn full_state_pruning_enabled(&self) -> bool { + self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN + } } impl StoreItem for AnchorInfo { diff --git a/beacon_node/store/src/reconstruct.rs b/beacon_node/store/src/reconstruct.rs index 856660ae8b9..56ab04249c2 100644 --- a/beacon_node/store/src/reconstruct.rs +++ b/beacon_node/store/src/reconstruct.rs @@ -1,5 +1,6 @@ //! Implementation of historic state reconstruction (given complete block history). use crate::hot_cold_store::{HotColdDB, HotColdDBError}; +use crate::metadata::ANCHOR_FOR_ARCHIVE_NODE; use crate::metrics; use crate::{Error, ItemStore}; use itertools::{process_results, Itertools}; @@ -21,10 +22,12 @@ where self: &Arc, num_blocks: Option, ) -> Result<(), Error> { - let Some(mut anchor) = self.get_anchor_info() else { - // Nothing to do, history is complete. + let mut anchor = self.get_anchor_info(); + + // Nothing to do, history is complete. + if anchor.all_historic_states_stored() { return Ok(()); - }; + } // Check that all historic blocks are known. if anchor.oldest_block_slot != 0 { @@ -132,7 +135,7 @@ where self.cold_db.do_atomically(std::mem::take(&mut io_batch))?; // Update anchor. - let old_anchor = Some(anchor.clone()); + let old_anchor = anchor.clone(); if reconstruction_complete { // The two limits have met in the middle! We're done! @@ -146,17 +149,17 @@ where }); } - self.compare_and_set_anchor_info_with_write(old_anchor, None)?; + self.compare_and_set_anchor_info_with_write( + old_anchor, + ANCHOR_FOR_ARCHIVE_NODE, + )?; return Ok(()); } else { // The lower limit has been raised, store it. anchor.state_lower_limit = slot; - self.compare_and_set_anchor_info_with_write( - old_anchor, - Some(anchor.clone()), - )?; + self.compare_and_set_anchor_info_with_write(old_anchor, anchor.clone())?; } // If this is the end of the batch, return Ok. The caller will run another diff --git a/common/eth2/src/lighthouse.rs b/common/eth2/src/lighthouse.rs index e978d922450..309d8228aaf 100644 --- a/common/eth2/src/lighthouse.rs +++ b/common/eth2/src/lighthouse.rs @@ -361,7 +361,7 @@ pub struct DatabaseInfo { pub schema_version: u64, pub config: StoreConfig, pub split: Split, - pub anchor: Option, + pub anchor: AnchorInfo, pub blob_info: BlobInfo, } diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index a439ceb69b4..04e0df193f4 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -16,7 +16,6 @@ use slog::{info, warn, Logger}; use std::fs; use std::io::Write; use std::path::PathBuf; -use store::metadata::STATE_UPPER_LIMIT_NO_RETAIN; use store::{ errors::Error, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, @@ -433,18 +432,12 @@ pub fn prune_states( // Check that the user has confirmed they want to proceed. if !prune_config.confirm { - match db.get_anchor_info() { - Some(anchor_info) - if anchor_info.state_lower_limit == 0 - && anchor_info.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN => - { - info!(log, "States have already been pruned"); - return Ok(()); - } - _ => { - info!(log, "Ready to prune states"); - } + if db.get_anchor_info().full_state_pruning_enabled() { + info!(log, "States have already been pruned"); + return Ok(()); } + + info!(log, "Ready to prune states"); warn!( log, "Pruning states is irreversible";
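
The section above ends inside the `prune_states` confirmation check, but the shape of the new call sites is visible throughout the patch: callers no longer match on `Option<AnchorInfo>`, they query the anchor directly via the new predicates added in `metadata.rs`. The following is a standalone sketch of those two predicates and how pruning and reconstruction code branches on them, with `u64` standing in for `Slot` and the fields not needed for the predicates omitted; it is an illustration of the behaviour described in the diff, not the actual `store` crate implementation.

// Standalone sketch of the two new AnchorInfo predicates.
const STATE_UPPER_LIMIT_NO_RETAIN: u64 = u64::MAX;

#[derive(Clone, Debug)]
struct AnchorInfo {
    state_upper_limit: u64,
    state_lower_limit: u64,
}

impl AnchorInfo {
    /// True once state reconstruction is complete: the lower and upper limits have met.
    fn all_historic_states_stored(&self) -> bool {
        self.state_lower_limit == self.state_upper_limit
    }

    /// True if no historic state other than genesis will ever be stored.
    fn full_state_pruning_enabled(&self) -> bool {
        self.state_lower_limit == 0 && self.state_upper_limit == STATE_UPPER_LIMIT_NO_RETAIN
    }
}

fn main() {
    // Anchor of a node that has pruned (or never stored) historic states:
    // `prune_states` can exit early, as in the database_manager check above.
    let pruned = AnchorInfo {
        state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
        state_lower_limit: 0,
    };
    assert!(pruned.full_state_pruning_enabled());
    assert!(!pruned.all_historic_states_stored());

    // Anchor of a full archive node: `reconstruct_historic_states` returns early
    // because there is nothing left to rebuild.
    let archive = AnchorInfo {
        state_upper_limit: 0,
        state_lower_limit: 0,
    };
    assert!(archive.all_historic_states_stored());
    assert!(!archive.full_state_pruning_enabled());
}
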