Simplify database anchor (#6397)
* Simplify database anchor

* Update beacon_node/store/src/reconstruct.rs

* Add migration for anchor

* Fix and simplify light_client store tests

* Fix incompatible config test
michaelsproul committed Sep 19, 2024
1 parent 3d90ac6 commit 1890278
Showing 17 changed files with 219 additions and 439 deletions.
19 changes: 0 additions & 19 deletions beacon_node/beacon_chain/src/block_verification.rs
@@ -905,9 +905,6 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {

let block_root = get_block_header_root(block_header);

// Disallow blocks that conflict with the anchor (weak subjectivity checkpoint), if any.
check_block_against_anchor_slot(block.message(), chain)?;

// Do not gossip a block from a finalized slot.
check_block_against_finalized_slot(block.message(), block_root, chain)?;

@@ -1138,9 +1135,6 @@ impl<T: BeaconChainTypes> SignatureVerifiedBlock<T> {
.fork_name(&chain.spec)
.map_err(BlockError::InconsistentFork)?;

// Check the anchor slot before loading the parent, to avoid spurious lookups.
check_block_against_anchor_slot(block.message(), chain)?;

let (mut parent, block) = load_parent(block, chain)?;

let state = cheap_state_advance_to_obtain_committees::<_, BlockError>(
@@ -1775,19 +1769,6 @@ impl<T: BeaconChainTypes> ExecutionPendingBlock<T> {
}
}

/// Returns `Ok(())` if the block's slot is greater than the anchor block's slot (if any).
fn check_block_against_anchor_slot<T: BeaconChainTypes>(
block: BeaconBlockRef<'_, T::EthSpec>,
chain: &BeaconChain<T>,
) -> Result<(), BlockError> {
if let Some(anchor_slot) = chain.store.get_anchor_slot() {
if block.slot() <= anchor_slot {
return Err(BlockError::WeakSubjectivityConflict);
}
}
Ok(())
}

/// Returns `Ok(())` if the block is later than the finalized slot on `chain`.
///
/// Returns an error if the block is earlier or equal to the finalized slot, or there was an error
9 changes: 2 additions & 7 deletions beacon_node/beacon_chain/src/historical_blocks.rs
@@ -33,8 +33,6 @@ pub enum HistoricalBlockError {
InvalidSignature,
/// Transitory error, caller should retry with the same blocks.
ValidatorPubkeyCacheTimeout,
/// No historical sync needed.
NoAnchorInfo,
/// Logic error: should never occur.
IndexOutOfBounds,
}
@@ -62,10 +60,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
mut blocks: Vec<AvailableBlock<T::EthSpec>>,
) -> Result<usize, Error> {
let anchor_info = self
.store
.get_anchor_info()
.ok_or(HistoricalBlockError::NoAnchorInfo)?;
let anchor_info = self.store.get_anchor_info();
let blob_info = self.store.get_blob_info();
let data_column_info = self.store.get_data_column_info();

@@ -263,7 +258,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let backfill_complete = new_anchor.block_backfill_complete(self.genesis_backfill_slot);
anchor_and_blob_batch.push(
self.store
.compare_and_set_anchor_info(Some(anchor_info), Some(new_anchor))?,
.compare_and_set_anchor_info(anchor_info, new_anchor)?,
);
self.store.hot_db.do_atomically(anchor_and_blob_batch)?;

2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/migrate.rs
@@ -216,7 +216,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
// Schedule another reconstruction batch if required and we have access to the
// channel for requeueing.
if let Some(tx) = opt_tx {
if db.get_anchor_info().is_some() {
if !db.get_anchor_info().all_historic_states_stored() {
if let Err(e) = tx.send(Notification::Reconstruction) {
error!(
log,
37 changes: 17 additions & 20 deletions beacon_node/beacon_chain/src/schema_change/migration_schema_v22.rs
@@ -5,7 +5,9 @@ use store::chunked_iter::ChunkedVectorIter;
use store::{
chunked_vector::BlockRootsChunked,
get_key_for_col,
metadata::{SchemaVersion, STATE_UPPER_LIMIT_NO_RETAIN},
metadata::{
SchemaVersion, ANCHOR_FOR_ARCHIVE_NODE, ANCHOR_UNINITIALIZED, STATE_UPPER_LIMIT_NO_RETAIN,
},
partial_beacon_state::PartialBeaconState,
AnchorInfo, DBColumn, Error, HotColdDB, KeyValueStore, KeyValueStoreOp,
};
@@ -43,7 +45,14 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
) -> Result<(), Error> {
info!(log, "Upgrading from v21 to v22");

let old_anchor = db.get_anchor_info();
let mut old_anchor = db.get_anchor_info();

// If the anchor was uninitialized in the old schema (`None`), this represents a full archive
// node.
if old_anchor == ANCHOR_UNINITIALIZED {
old_anchor = ANCHOR_FOR_ARCHIVE_NODE;
}

let split_slot = db.get_split_slot();
let genesis_state_root = genesis_state_root.ok_or(Error::GenesisStateUnknown)?;

@@ -70,9 +79,7 @@

// Write the block roots in the new format in a new column. Similar to above, we do this
// separately from deleting the old format block roots so that this is crash safe.
let oldest_block_slot = old_anchor
.as_ref()
.map_or(Slot::new(0), |a| a.oldest_block_slot);
let oldest_block_slot = old_anchor.oldest_block_slot;
write_new_schema_block_roots::<T>(
&db,
genesis_block_root,
@@ -90,22 +97,12 @@ pub fn upgrade_to_v22<T: BeaconChainTypes>(
// If we crash after committing this change, then there will be some leftover cruft left in the
// freezer database, but no corruption because all the new-format data has already been written
// above.
let new_anchor = if let Some(old_anchor) = &old_anchor {
AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
}
} else {
AnchorInfo {
anchor_slot: Slot::new(0),
oldest_block_slot: Slot::new(0),
oldest_block_parent: Hash256::ZERO,
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
}
let new_anchor = AnchorInfo {
state_upper_limit: STATE_UPPER_LIMIT_NO_RETAIN,
state_lower_limit: Slot::new(0),
..old_anchor.clone()
};
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, Some(new_anchor))?];
let hot_ops = vec![db.compare_and_set_anchor_info(old_anchor, new_anchor)?];
db.store_schema_version_atomically(SchemaVersion(22), hot_ops)?;

// Finally, clean up the old-format data from the freezer database.
157 changes: 10 additions & 147 deletions beacon_node/beacon_chain/tests/store_tests.rs
Expand Up @@ -109,12 +109,8 @@ async fn light_client_bootstrap_test() {
return;
};

let checkpoint_slot = Slot::new(E::slots_per_epoch() * 6);
let db_path = tempdir().unwrap();
let log = test_logger();

let seconds_per_slot = spec.seconds_per_slot;
let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::<E>());
let store = get_store_generic(&db_path, StoreConfig::default(), spec.clone());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
let num_initial_slots = E::slots_per_epoch() * 7;
@@ -132,79 +128,15 @@
)
.await;

let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
let wss_state = store
.get_state(&wss_state_root, Some(checkpoint_slot))
.unwrap()
.unwrap();

let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());

let mock =
mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone());

// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());

let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);

let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.task_executor(harness.chain.task_executor.clone())
.logger(log.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.execution_layer(Some(mock.el))
.kzg(kzg)
.build()
.expect("should build");

let current_state = harness.get_current_state();

if ForkName::Electra == current_state.fork_name_unchecked() {
// TODO(electra) fix beacon state `compute_merkle_proof`
return;
}

let finalized_checkpoint = beacon_chain
let finalized_checkpoint = harness
.chain
.canonical_head
.cached_head()
.finalized_checkpoint();
@@ -239,11 +171,7 @@ async fn light_client_updates_test() {
};

let num_final_blocks = E::slots_per_epoch() * 2;
let checkpoint_slot = Slot::new(E::slots_per_epoch() * 9);
let db_path = tempdir().unwrap();
let log = test_logger();

let seconds_per_slot = spec.seconds_per_slot;
let store = get_store_generic(&db_path, StoreConfig::default(), test_spec::<E>());
let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT);
let all_validators = (0..LOW_VALIDATOR_COUNT).collect::<Vec<_>>();
@@ -260,33 +188,6 @@
)
.await;

let wss_block_root = harness
.chain
.block_root_at_slot(checkpoint_slot, WhenSlotSkipped::Prev)
.unwrap()
.unwrap();
let wss_state_root = harness
.chain
.state_root_at_slot(checkpoint_slot)
.unwrap()
.unwrap();
let wss_block = harness
.chain
.store
.get_full_block(&wss_block_root)
.unwrap()
.unwrap();
let wss_blobs_opt = harness.chain.store.get_blobs(&wss_block_root).unwrap();
let wss_state = store
.get_state(&wss_state_root, Some(checkpoint_slot))
.unwrap()
.unwrap();

let kzg = spec.deneb_fork_epoch.map(|_| KZG.clone());

let mock =
mock_execution_layer_from_parts(&harness.spec, harness.runtime.task_executor.clone());

harness.advance_slot();
harness
.extend_chain_with_light_client_data(
@@ -296,46 +197,6 @@
)
.await;

// Initialise a new beacon chain from the finalized checkpoint.
// The slot clock must be set to a time ahead of the checkpoint state.
let slot_clock = TestingSlotClock::new(
Slot::new(0),
Duration::from_secs(harness.chain.genesis_time),
Duration::from_secs(seconds_per_slot),
);
slot_clock.set_slot(harness.get_current_slot().as_u64());

let (shutdown_tx, _shutdown_rx) = futures::channel::mpsc::channel(1);

let beacon_chain = BeaconChainBuilder::<DiskHarnessType<E>>::new(MinimalEthSpec)
.store(store.clone())
.custom_spec(test_spec::<E>())
.task_executor(harness.chain.task_executor.clone())
.logger(log.clone())
.weak_subjectivity_state(
wss_state,
wss_block.clone(),
wss_blobs_opt.clone(),
genesis_state,
)
.unwrap()
.store_migrator_config(MigratorConfig::default().blocking())
.dummy_eth1_backend()
.expect("should build dummy backend")
.slot_clock(slot_clock)
.shutdown_sender(shutdown_tx)
.chain_config(ChainConfig::default())
.event_handler(Some(ServerSentEventHandler::new_with_capacity(
log.clone(),
1,
)))
.execution_layer(Some(mock.el))
.kzg(kzg)
.build()
.expect("should build");

let beacon_chain = Arc::new(beacon_chain);

let current_state = harness.get_current_state();

if ForkName::Electra == current_state.fork_name_unchecked() {
@@ -351,7 +212,8 @@

// fetch a range of light client updates. right now there should only be one light client update
// in the db.
let lc_updates = beacon_chain
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();

Expand All @@ -371,7 +233,8 @@ async fn light_client_updates_test() {
.await;

// we should now have two light client updates in the db
let lc_updates = beacon_chain
let lc_updates = harness
.chain
.get_light_client_updates(sync_period, 100)
.unwrap();

@@ -2646,11 +2509,11 @@ async fn weak_subjectivity_sync_test(slots: Vec<Slot>, checkpoint_slot: Slot) {
}

// Anchor slot is still set to the slot of the checkpoint block.
assert_eq!(store.get_anchor_slot(), Some(wss_block.slot()));
assert_eq!(store.get_anchor_info().anchor_slot, wss_block.slot());

// Reconstruct states.
store.clone().reconstruct_historic_states(None).unwrap();
assert_eq!(store.get_anchor_slot(), None);
assert_eq!(store.get_anchor_info().anchor_slot, 0);
}

/// Test that blocks and attestations that refer to states around an unaligned split state are
@@ -3489,7 +3352,7 @@ async fn prune_historic_states() {
.unwrap();

// Check that anchor info is updated.
let anchor_info = store.get_anchor_info().unwrap();
let anchor_info = store.get_anchor_info();
assert_eq!(anchor_info.state_lower_limit, 0);
assert_eq!(anchor_info.state_upper_limit, STATE_UPPER_LIMIT_NO_RETAIN);
