From 80b416ff0229ea27e0948b712cd03f37cb686305 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Thu, 11 Jul 2024 11:38:06 +1000
Subject: [PATCH 1/6] Store changes to persist data columns.

Co-authored-by: dapplion <35266934+dapplion@users.noreply.github.com>
---
 beacon_node/store/src/errors.rs         |   2 +
 beacon_node/store/src/hot_cold_store.rs | 274 ++++++++++++++++++++++--
 beacon_node/store/src/leveldb_store.rs  |   4 +
 beacon_node/store/src/lib.rs            |  32 ++-
 beacon_node/store/src/memory_store.rs   |  14 +-
 beacon_node/store/src/metadata.rs       |  28 +++
 beacon_node/store/src/metrics.rs        |   4 +
 consensus/types/src/chain_spec.rs       |  20 ++
 8 files changed, 360 insertions(+), 18 deletions(-)

diff --git a/beacon_node/store/src/errors.rs b/beacon_node/store/src/errors.rs
index 91e6a920ba3..e3b2d327b0a 100644
--- a/beacon_node/store/src/errors.rs
+++ b/beacon_node/store/src/errors.rs
@@ -27,6 +27,8 @@ pub enum Error {
     AnchorInfoConcurrentMutation,
     /// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
     BlobInfoConcurrentMutation,
+    /// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
+    DataColumnInfoConcurrentMutation,
     /// The block or state is unavailable due to weak subjectivity sync.
     HistoryUnavailable,
     /// State reconstruction cannot commence because not all historic blocks are known.
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 9c247c983a9..9133ef4d70f 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -12,12 +12,13 @@ use crate::leveldb_store::BytesKey;
 use crate::leveldb_store::LevelDB;
 use crate::memory_store::MemoryStore;
 use crate::metadata::{
-    AnchorInfo, BlobInfo, CompactionTimestamp, PruningCheckpoint, SchemaVersion, ANCHOR_INFO_KEY,
-    BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
-    PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY, STATE_UPPER_LIMIT_NO_RETAIN,
+    AnchorInfo, BlobInfo, CompactionTimestamp, DataColumnInfo, PruningCheckpoint, SchemaVersion,
+    ANCHOR_INFO_KEY, BLOB_INFO_KEY, COMPACTION_TIMESTAMP_KEY, CONFIG_KEY, CURRENT_SCHEMA_VERSION,
+    DATA_COLUMN_INFO_KEY, PRUNING_CHECKPOINT_KEY, SCHEMA_VERSION_KEY, SPLIT_KEY,
+    STATE_UPPER_LIMIT_NO_RETAIN,
 };
-use crate::metrics;
 use crate::state_cache::{PutStateOutcome, StateCache};
+use crate::{get_data_column_key, metrics, parse_data_column_key};
 use crate::{
     get_key_for_col, ChunkWriter, DBColumn, DatabaseBlock, Error, ItemStore, KeyValueStoreOp,
     PartialBeaconState, StoreItem, StoreOp,
@@ -35,11 +36,13 @@ use state_processing::{
     SlotProcessingError,
 };
 use std::cmp::min;
+use std::collections::HashMap;
 use std::marker::PhantomData;
 use std::num::NonZeroUsize;
 use std::path::Path;
 use std::sync::Arc;
 use std::time::Duration;
+use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidecarList};
 use types::*;
 
 /// On-disk database that stores finalized states efficiently.
@@ -57,6 +60,8 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
     anchor_info: RwLock<Option<AnchorInfo>>,
     /// The starting slots for the range of blobs stored in the database.
     blob_info: RwLock<BlobInfo>,
+    /// The starting slots for the range of data columns stored in the database.
+    data_column_info: RwLock<DataColumnInfo>,
     pub(crate) config: StoreConfig,
     /// Cold database containing compact historical data.
     pub cold_db: Cold,
@@ -86,6 +91,7 @@ pub struct HotColdDB<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> {
 struct BlockCache<E: EthSpec> {
     block_cache: LruCache<Hash256, SignedBeaconBlock<E>>,
     blob_cache: LruCache<Hash256, BlobSidecarList<E>>,
+    data_column_cache: LruCache<Hash256, HashMap<ColumnIndex, Arc<DataColumnSidecar<E>>>>,
 }
 
 impl<E: EthSpec> BlockCache<E> {
@@ -93,6 +99,7 @@
         Self {
             block_cache: LruCache::new(size),
             blob_cache: LruCache::new(size),
+            data_column_cache: LruCache::new(size),
         }
     }
     pub fn put_block(&mut self, block_root: Hash256, block: SignedBeaconBlock<E>) {
@@ -101,12 +108,26 @@
     pub fn put_blobs(&mut self, block_root: Hash256, blobs: BlobSidecarList<E>) {
         self.blob_cache.put(block_root, blobs);
     }
+    pub fn put_data_column(&mut self, block_root: Hash256, data_column: Arc<DataColumnSidecar<E>>) {
+        self.data_column_cache
+            .get_or_insert_mut(block_root, Default::default)
+            .insert(data_column.index, data_column);
+    }
     pub fn get_block<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a SignedBeaconBlock<E>> {
         self.block_cache.get(block_root)
     }
     pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList<E>> {
         self.blob_cache.get(block_root)
     }
+    pub fn get_data_column<'a>(
+        &'a mut self,
+        block_root: &Hash256,
+        column_index: &ColumnIndex,
+    ) -> Option<&'a Arc<DataColumnSidecar<E>>> {
+        self.data_column_cache
+            .get(block_root)
+            .and_then(|map| map.get(column_index))
+    }
     pub fn delete_block(&mut self, block_root: &Hash256) {
         let _ = self.block_cache.pop(block_root);
     }
@@ -180,6 +201,7 @@ impl<E: EthSpec> HotColdDB<E, MemoryStore<E>, MemoryStore<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: MemoryStore::open(),
             blobs_db: MemoryStore::open(),
             hot_db: MemoryStore::open(),
@@ -216,6 +238,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             split: RwLock::new(Split::default()),
             anchor_info: RwLock::new(None),
             blob_info: RwLock::new(BlobInfo::default()),
+            data_column_info: RwLock::new(DataColumnInfo::default()),
             cold_db: LevelDB::open(cold_path)?,
             blobs_db: LevelDB::open(blobs_db_path)?,
             hot_db: LevelDB::open(hot_path)?,
@@ -294,11 +317,35 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             },
         };
         db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
+
+        let data_column_info = db.load_data_column_info()?;
+        let new_data_column_info = match &data_column_info {
+            // TODO[das]: update to EIP-7594 fork
+            Some(data_column_info) => {
+                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                let oldest_data_column_slot =
+                    data_column_info.oldest_data_column_slot.or(deneb_fork_slot);
+                DataColumnInfo {
+                    oldest_data_column_slot,
+                }
+            }
+            // First start.
+            None => DataColumnInfo {
+                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                oldest_data_column_slot: deneb_fork_slot,
+            },
+        };
+        db.compare_and_set_data_column_info_with_write(
+            <_>::default(),
+            new_data_column_info.clone(),
+        )?;
+
         info!(
             db.log,
             "Blob DB initialized";
             "path" => ?blobs_db_path,
             "oldest_blob_slot" => ?new_blob_info.oldest_blob_slot,
+            "oldest_data_column_slot" => ?new_data_column_info.oldest_data_column_slot,
         );
 
         // Ensure that the schema version of the on-disk database matches the software.
@@ -626,6 +673,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         ops.push(KeyValueStoreOp::PutKeyValue(db_key, blobs.as_ssz_bytes()));
     }
 
+    pub fn data_columns_as_kv_store_ops(
+        &self,
+        block_root: &Hash256,
+        data_columns: DataColumnSidecarList<E>,
+        ops: &mut Vec<KeyValueStoreOp>,
+    ) {
+        for data_column in data_columns {
+            let db_key = get_key_for_col(
+                DBColumn::BeaconDataColumn.into(),
+                &get_data_column_key(block_root, &data_column.index),
+            );
+            ops.push(KeyValueStoreOp::PutKeyValue(
+                db_key,
+                data_column.as_ssz_bytes(),
+            ));
+        }
+    }
+
     pub fn put_state_summary(
         &self,
         state_root: &Hash256,
@@ -909,6 +974,14 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     self.blobs_as_kv_store_ops(&block_root, blobs, &mut key_value_batch);
                 }
 
+                StoreOp::PutDataColumns(block_root, data_columns) => {
+                    self.data_columns_as_kv_store_ops(
+                        &block_root,
+                        data_columns,
+                        &mut key_value_batch,
+                    );
+                }
+
                 StoreOp::PutStateSummary(state_root, summary) => {
                     key_value_batch.push(summary.as_kv_store_op(state_root));
                 }
@@ -933,6 +1006,16 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
                     key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
                 }
 
+                StoreOp::DeleteDataColumns(block_root, column_indices) => {
+                    for index in column_indices {
+                        let key = get_key_for_col(
+                            DBColumn::BeaconDataColumn.into(),
+                            &get_data_column_key(&block_root, &index),
+                        );
+                        key_value_batch.push(KeyValueStoreOp::DeleteKey(key));
+                    }
+                }
+
                 StoreOp::DeleteState(state_root, slot) => {
                     let state_summary_key =
                         get_key_for_col(DBColumn::BeaconStateSummary.into(), state_root.as_bytes());
@@ -963,9 +1046,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         batch: Vec<StoreOp<E>>,
     ) -> Result<(), Error> {
         let mut blobs_to_delete = Vec::new();
+        let mut data_columns_to_delete = Vec::new();
         let (blobs_ops, hot_db_ops): (Vec<StoreOp<E>>, Vec<StoreOp<E>>) =
             batch.into_iter().partition(|store_op| match store_op {
-                StoreOp::PutBlobs(_, _) => true,
+                StoreOp::PutBlobs(_, _) | StoreOp::PutDataColumns(_, _) => true,
                 StoreOp::DeleteBlobs(block_root) => {
                     match self.get_blobs(block_root) {
                         Ok(Some(blob_sidecar_list)) => {
                             blobs_to_delete.push((*block_root, blob_sidecar_list));
                         }
                         Err(e) => {
                             error!(
                                 self.log, "Error getting blobs";
                                 "block_root" => %block_root,
                                 "error" => ?e
                             );
                         }
                         _ => (),
                     }
                     true
                 }
+                StoreOp::DeleteDataColumns(block_root, indices) => {
+                    match indices
+                        .iter()
+                        .map(|index| self.get_data_column(block_root, index))
+                        .collect::<Result<Vec<_>, _>>()
+                    {
+                        Ok(data_column_sidecar_list_opt) => {
+                            let data_column_sidecar_list = data_column_sidecar_list_opt
+                                .into_iter()
+                                .flatten()
+                                .collect::<Vec<_>>();
+                            // Must push the same number of items as StoreOp::DeleteDataColumns items to
+                            // prevent a `HotColdDBError::Rollback` error below in case of rollback
+                            data_columns_to_delete.push((*block_root, data_column_sidecar_list));
+                        }
+                        Err(e) => {
+                            error!(
+                                self.log, "Error getting data columns";
+                                "block_root" => %block_root,
+                                "error" => ?e
+                            );
+                        }
+                    }
+                    true
+                }
                 StoreOp::PutBlock(_, _) | StoreOp::DeleteBlock(_) => false,
                 _ => false,
             });
@@ -1013,10 +1122,20 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         for op in blob_cache_ops.iter_mut() {
             let reverse_op = match op {
                 StoreOp::PutBlobs(block_root, _) => StoreOp::DeleteBlobs(*block_root),
+                StoreOp::PutDataColumns(block_root, data_columns) => {
+                    let indices = data_columns.iter().map(|c| c.index).collect();
+                    StoreOp::DeleteDataColumns(*block_root, indices)
+                }
                 StoreOp::DeleteBlobs(_) => match blobs_to_delete.pop() {
                     Some((block_root, blobs)) => StoreOp::PutBlobs(block_root, blobs),
                     None => return Err(HotColdDBError::Rollback.into()),
                 },
+                StoreOp::DeleteDataColumns(_, _) => match data_columns_to_delete.pop() {
+                    Some((block_root, data_columns)) => {
+                        StoreOp::PutDataColumns(block_root, data_columns)
+                    }
+                    None => return Err(HotColdDBError::Rollback.into()),
+                },
                 _ => return Err(HotColdDBError::Rollback.into()),
             };
             *op = reverse_op;
@@ -1034,6 +1153,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
                 StoreOp::PutBlobs(_, _) => (),
 
+                StoreOp::PutDataColumns(_, _) => (),
+
                 StoreOp::PutState(_, _) => (),
 
                 StoreOp::PutStateSummary(_, _) => (),
@@ -1053,6 +1174,8 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
 
                 StoreOp::DeleteBlobs(_) => (),
 
+                StoreOp::DeleteDataColumns(_, _) => (),
+
                 StoreOp::DeleteExecutionPayload(_) => (),
 
                 StoreOp::KeyValueOp(_) => (),
@@ -1552,6 +1675,45 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         }
     }
 
+    /// Fetch all keys in the data_column column with prefix `block_root`
+    pub fn get_data_column_keys(&self, block_root: Hash256) -> Result<Vec<ColumnIndex>, Error> {
+        self.blobs_db
+            .iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_bytes())
+            .map(|key| key.and_then(|key| parse_data_column_key(key).map(|key| key.1)))
+            .collect()
+    }
+
+    /// Fetch a single data_column for a given block from the store.
+    pub fn get_data_column(
+        &self,
+        block_root: &Hash256,
+        column_index: &ColumnIndex,
+    ) -> Result<Option<Arc<DataColumnSidecar<E>>>, Error> {
+        // Check the cache.
+        if let Some(data_column) = self
+            .block_cache
+            .lock()
+            .get_data_column(block_root, column_index)
+        {
+            metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT);
+            return Ok(Some(data_column.clone()));
+        }
+
+        match self.blobs_db.get_bytes(
+            DBColumn::BeaconDataColumn.into(),
+            &get_data_column_key(block_root, column_index),
+        )? {
+            Some(ref data_column_bytes) => {
+                let data_column = Arc::new(DataColumnSidecar::from_ssz_bytes(data_column_bytes)?);
+                self.block_cache
+                    .lock()
+                    .put_data_column(*block_root, data_column.clone());
+                Ok(Some(data_column))
+            }
+            None => Ok(None),
+        }
+    }
+
     /// Get a reference to the `ChainSpec` used by the database.
     pub fn get_chain_spec(&self) -> &ChainSpec {
         &self.spec
@@ -1748,6 +1910,24 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         self.blob_info.read_recursive().clone()
     }
 
+    /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
+    pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
+        let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
+            std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
+        });
+        let data_column_info = DataColumnInfo {
+            oldest_data_column_slot,
+        };
+        self.compare_and_set_data_column_info(self.get_data_column_info(), data_column_info)
+    }
+
+    /// Get a clone of the store's data column info.
+    ///
+    /// To do mutations, use `compare_and_set_data_column_info`.
+    pub fn get_data_column_info(&self) -> DataColumnInfo {
+        self.data_column_info.read_recursive().clone()
+    }
+
     /// Atomically update the blob info from `prev_value` to `new_value`.
     ///
     /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
@@ -1793,6 +1973,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         blob_info.as_kv_store_op(BLOB_INFO_KEY)
     }
 
+    /// Atomically update the data column info from `prev_value` to `new_value`.
+    ///
+    /// Return a `KeyValueStoreOp` which should be written to disk, possibly atomically with other
+    /// values.
+    ///
+    /// Return a `DataColumnInfoConcurrentMutation` error if the `prev_value` provided
+    /// is not correct.
+    pub fn compare_and_set_data_column_info(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<KeyValueStoreOp, Error> {
+        let mut data_column_info = self.data_column_info.write();
+        if *data_column_info == prev_value {
+            let kv_op = self.store_data_column_info_in_batch(&new_value);
+            *data_column_info = new_value;
+            Ok(kv_op)
+        } else {
+            Err(Error::DataColumnInfoConcurrentMutation)
+        }
+    }
+
+    /// As for `compare_and_set_data_column_info`, but also writes the data column info to disk immediately.
+    pub fn compare_and_set_data_column_info_with_write(
+        &self,
+        prev_value: DataColumnInfo,
+        new_value: DataColumnInfo,
+    ) -> Result<(), Error> {
+        let kv_store_op = self.compare_and_set_data_column_info(prev_value, new_value)?;
+        self.hot_db.do_atomically(vec![kv_store_op])
+    }
+
+    /// Load the data column info from disk, but do not set `self.data_column_info`.
+    fn load_data_column_info(&self) -> Result<Option<DataColumnInfo>, Error> {
+        self.hot_db.get(&DATA_COLUMN_INFO_KEY)
+    }
+
+    /// Store the given `data_column_info` to disk.
+    ///
+    /// The argument is intended to be `self.data_column_info`, but is passed manually to avoid issues
+    /// with recursive locking.
+    fn store_data_column_info_in_batch(
+        &self,
+        data_column_info: &DataColumnInfo,
+    ) -> KeyValueStoreOp {
+        data_column_info.as_kv_store_op(DATA_COLUMN_INFO_KEY)
+    }
+
     /// Return the slot-window describing the available historic states.
     ///
     /// Returns `(lower_limit, upper_limit)`.
@@ -2285,15 +2513,33 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
             }
         };
 
-        if Some(block_root) != last_pruned_block_root && self.blobs_exist(&block_root)? {
-            trace!(
-                self.log,
-                "Pruning blobs of block";
-                "slot" => slot,
-                "block_root" => ?block_root,
-            );
-            last_pruned_block_root = Some(block_root);
-            ops.push(StoreOp::DeleteBlobs(block_root));
+        if Some(block_root) != last_pruned_block_root {
+            if self
+                .spec
+                .is_peer_das_enabled_for_epoch(slot.epoch(E::slots_per_epoch()))
+            {
+                // data columns
+                let indices = self.get_data_column_keys(block_root)?;
+                if !indices.is_empty() {
+                    trace!(
+                        self.log,
+                        "Pruning data columns of block";
+                        "slot" => slot,
+                        "block_root" => ?block_root,
+                    );
+                    last_pruned_block_root = Some(block_root);
+                    ops.push(StoreOp::DeleteDataColumns(block_root, indices));
+                }
+            } else if self.blobs_exist(&block_root)? {
+                trace!(
+                    self.log,
+                    "Pruning blobs of block";
+                    "slot" => slot,
+                    "block_root" => ?block_root,
+                );
+                last_pruned_block_root = Some(block_root);
+                ops.push(StoreOp::DeleteBlobs(block_root));
+            }
         }
 
         if slot >= end_slot {
diff --git a/beacon_node/store/src/leveldb_store.rs b/beacon_node/store/src/leveldb_store.rs
index ffd55c16a04..b224319ae4f 100644
--- a/beacon_node/store/src/leveldb_store.rs
+++ b/beacon_node/store/src/leveldb_store.rs
@@ -255,6 +255,10 @@ impl db_key::Key for BytesKey {
 }
 
 impl BytesKey {
+    pub fn starts_with(&self, prefix: &Self) -> bool {
+        self.key.starts_with(&prefix.key)
+    }
+
     /// Return `true` iff this `BytesKey` was created with the given `column`.
     pub fn matches_column(&self, column: DBColumn) -> bool {
         self.key.starts_with(column.as_bytes())
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 0247bea5541..07ad352d83a 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -42,6 +42,7 @@ pub use metrics::scrape_for_metrics;
 use parking_lot::MutexGuard;
 use std::sync::Arc;
 use strum::{EnumString, IntoStaticStr};
+use types::data_column_sidecar::{ColumnIndex, DataColumnSidecarList};
 pub use types::*;
 
 pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
@@ -109,9 +110,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
         Box::new(std::iter::empty())
     }
 
-    fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
-        Box::new(std::iter::empty())
-    }
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;
 
     /// Iterate through all keys in a particular column.
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter;
@@ -143,6 +142,28 @@ pub fn get_key_for_col(column: &str, key: &[u8]) -> Vec<u8> {
     result
 }
 
+pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
+    let mut result = block_root.as_bytes().to_vec();
+    result.extend_from_slice(&column_index.to_le_bytes());
+    result
+}
+
+pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
+    if data.len() != 32 + 8 {
+        return Err(Error::InvalidKey);
+    }
+    // `split_at(32)` cannot panic because the length check above guarantees the key is 40 bytes
+    let (block_root_bytes, column_index_bytes) = data.split_at(32);
+    let block_root = Hash256::from_slice(block_root_bytes);
+    // `column_index_bytes` is guaranteed to be 8 bytes by the length check above
+    let column_index = ColumnIndex::from_le_bytes(
+        column_index_bytes
+            .try_into()
+            .expect("slice with incorrect length"),
+    );
+    Ok((block_root, column_index))
+}
+
 #[must_use]
 #[derive(Clone)]
 pub enum KeyValueStoreOp {
@@ -203,11 +224,13 @@ pub enum StoreOp<'a, E: EthSpec> {
     PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
     PutState(Hash256, &'a BeaconState<E>),
     PutBlobs(Hash256, BlobSidecarList<E>),
+    PutDataColumns(Hash256, DataColumnSidecarList<E>),
     PutStateSummary(Hash256, HotStateSummary),
     PutStateTemporaryFlag(Hash256),
     DeleteStateTemporaryFlag(Hash256),
     DeleteBlock(Hash256),
     DeleteBlobs(Hash256),
+    DeleteDataColumns(Hash256, Vec<ColumnIndex>),
     DeleteState(Hash256, Option<Slot>),
     DeleteExecutionPayload(Hash256),
     KeyValueOp(KeyValueStoreOp),
@@ -223,6 +246,8 @@ pub enum DBColumn {
     BeaconBlock,
     #[strum(serialize = "blb")]
     BeaconBlob,
+    #[strum(serialize = "bdc")]
+    BeaconDataColumn,
     /// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
     #[strum(serialize = "ste")]
     BeaconState,
@@ -310,6 +335,7 @@ impl DBColumn {
             | Self::BeaconHistoricalRoots
             | Self::BeaconHistoricalSummaries
             | Self::BeaconRandaoMixes => 8,
+            Self::BeaconDataColumn => 32 + 8,
         }
     }
 }
diff --git a/beacon_node/store/src/memory_store.rs b/beacon_node/store/src/memory_store.rs
index 302d2c2add2..4c7bfdf10ff 100644
--- a/beacon_node/store/src/memory_store.rs
+++ b/beacon_node/store/src/memory_store.rs
@@ -1,6 +1,6 @@
 use crate::{
     get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
-    ItemStore, Key, KeyValueStore, KeyValueStoreOp,
+    ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter,
 };
 use parking_lot::{Mutex, MutexGuard, RwLock};
 use std::collections::BTreeMap;
@@ -100,6 +100,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
         }))
     }
 
+    fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
+        let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
+        let keys = self
+            .db
+            .read()
+            .range(start_key.clone()..)
+            .take_while(|(k, _)| k.starts_with(&start_key))
+            .filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
+            .collect::<Vec<_>>();
+        Box::new(keys.into_iter().map(Ok))
+    }
+
     fn iter_column_keys(&self, column: DBColumn) -> ColumnKeyIter {
         Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
     }
diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs
index a22dc4aab4c..0c93251fe2e 100644
--- a/beacon_node/store/src/metadata.rs
+++ b/beacon_node/store/src/metadata.rs
@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
 pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
 pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
 pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
+pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);
 
 /// State upper limit value used to indicate that a node is not storing historic states.
 pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -152,3 +153,30 @@ impl StoreItem for BlobInfo {
         Ok(Self::from_ssz_bytes(bytes)?)
     }
 }
+
+/// Database parameters relevant to data column sync.
+#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
+pub struct DataColumnInfo {
+    /// The slot after which data columns are or *will be* available (>=).
+    ///
+    /// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
+    /// data columns will be available.
+    ///
+    /// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
+    /// not yet known.
+    pub oldest_data_column_slot: Option<Slot>,
+}
+
+impl StoreItem for DataColumnInfo {
+    fn db_column() -> DBColumn {
+        DBColumn::BeaconMeta
+    }
+
+    fn as_store_bytes(&self) -> Vec<u8> {
+        self.as_ssz_bytes()
+    }
+
+    fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
+        Ok(Self::from_ssz_bytes(bytes)?)
+    }
+}
diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs
index f8dbbfec988..e8fc55c9114 100644
--- a/beacon_node/store/src/metrics.rs
+++ b/beacon_node/store/src/metrics.rs
@@ -98,6 +98,10 @@ lazy_static! {
         "store_beacon_blobs_cache_hit_total",
         "Number of hits to the store's blob cache"
     );
+    pub static ref BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: Result<IntCounter> = try_create_int_counter(
+        "store_beacon_data_columns_cache_hit_total",
+        "Number of hits to the store's data column cache"
+    );
 }
 
 /// Updates the global metrics registry with store-related information.
diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs
index 0f61f74efe1..ed94d2a2acc 100644
--- a/consensus/types/src/chain_spec.rs
+++ b/consensus/types/src/chain_spec.rs
@@ -193,6 +193,7 @@ pub struct ChainSpec {
     /*
      * DAS params
      */
+    pub eip7594_fork_epoch: Option<Epoch>,
     pub number_of_columns: usize,
 
     /*
@@ -392,6 +393,13 @@ impl ChainSpec {
         }
     }
 
+    /// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`.
+    pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool {
+        self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| {
+            block_epoch >= eip7594_fork_epoch
+        })
+    }
+
     /// For a given `BeaconState`, return the whistleblower reward quotient associated with its variant.
     pub fn whistleblower_reward_quotient_for_state(
         &self,
@@ -777,6 +785,10 @@ impl ChainSpec {
             })
             .expect("calculation does not overflow"),
 
+            /*
+             * DAS params
+             */
+            eip7594_fork_epoch: None,
             number_of_columns: 128,
 
             /*
@@ -880,6 +892,10 @@ impl ChainSpec {
             electra_fork_epoch: None,
             max_pending_partials_per_withdrawals_sweep: u64::checked_pow(2, 0)
                 .expect("pow does not overflow"),
+            /*
+             * DAS params
+             */
+            eip7594_fork_epoch: None,
             // Other
             network_id: 2, // lighthouse testnet network id
             deposit_chain_id: 5,
@@ -1081,6 +1097,10 @@ impl ChainSpec {
             })
             .expect("calculation does not overflow"),
 
+            /*
+             * DAS params
+             */
+            eip7594_fork_epoch: None,
             number_of_columns: 128,
 
             /*

From a3e3b20f1b664c64b704ed476bc5d1a372899492 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Thu, 11 Jul 2024 11:50:20 +1000
Subject: [PATCH 2/6] Update to use `eip7594_fork_epoch` for data column slot
 in Store.

---
 beacon_node/store/src/hot_cold_store.rs | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 9133ef4d70f..5867cc70f0c 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -319,12 +319,15 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
         db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?;
 
         let data_column_info = db.load_data_column_info()?;
+        let eip7594_fork_slot = db
+            .spec
+            .eip7594_fork_epoch
+            .map(|epoch| epoch.start_slot(E::slots_per_epoch()));
         let new_data_column_info = match &data_column_info {
-            // TODO[das]: update to EIP-7594 fork
             Some(data_column_info) => {
                 // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
                 let oldest_data_column_slot =
-                    data_column_info.oldest_data_column_slot.or(deneb_fork_slot);
+                    data_column_info.oldest_data_column_slot.or(eip7594_fork_slot);
                 DataColumnInfo {
                     oldest_data_column_slot,
                 }
@@ -332,7 +335,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             // First start.
             None => DataColumnInfo {
                 // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
-                oldest_data_column_slot: deneb_fork_slot,
+                oldest_data_column_slot: eip7594_fork_slot,
             },
         };
         db.compare_and_set_data_column_info_with_write(
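
The fallback logic changed by patch 2 is easiest to verify in isolation: an `oldest_data_column_slot` already persisted on disk always takes precedence, and the scheduled fork slot is only used when nothing is stored yet. The following standalone sketch (not part of the patch; plain `u64` stands in for `Slot`) demonstrates the `Option::or` precedence being relied on:

// Sketch of the precedence used when initializing DataColumnInfo:
// a persisted value wins; the fork slot is only a first-start fallback.
fn resolve_oldest_data_column_slot(
    persisted: Option<u64>,         // value loaded from the on-disk DataColumnInfo, if any
    eip7594_fork_slot: Option<u64>, // None when the fork epoch is not yet scheduled
) -> Option<u64> {
    // `Option::or` keeps `persisted` when it is `Some`, otherwise falls back.
    persisted.or(eip7594_fork_slot)
}

fn main() {
    assert_eq!(resolve_oldest_data_column_slot(Some(100), Some(200)), Some(100));
    assert_eq!(resolve_oldest_data_column_slot(None, Some(200)), Some(200));
    assert_eq!(resolve_oldest_data_column_slot(None, None), None);
}
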
From 9f5e74bca7de57b5e979ba691ab918dc49b80704 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Thu, 11 Jul 2024 12:25:54 +1000
Subject: [PATCH 3/6] Fix formatting.

---
 beacon_node/store/src/hot_cold_store.rs | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 5867cc70f0c..ac5d8975439 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -326,8 +326,9 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
         let new_data_column_info = match &data_column_info {
             Some(data_column_info) => {
                 // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
-                let oldest_data_column_slot =
-                    data_column_info.oldest_data_column_slot.or(eip7594_fork_slot);
+                let oldest_data_column_slot = data_column_info
+                    .oldest_data_column_slot
+                    .or(eip7594_fork_slot);
                 DataColumnInfo {
                     oldest_data_column_slot,
                 }

From c9390d4f9e4abf6c55fc6b19215f4b437946f83c Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Mon, 22 Jul 2024 22:37:21 +1000
Subject: [PATCH 4/6] Minor refactor.

---
 beacon_node/store/src/lib.rs | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index 3daf4a87d74..d3e3534c25b 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -45,6 +45,8 @@ use strum::{EnumString, IntoStaticStr};
 use types::data_column_sidecar::{ColumnIndex, DataColumnSidecarList};
 pub use types::*;
 
+const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
+
 pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
 pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;
 
@@ -156,7 +158,7 @@ pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) ->
 }
 
 pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
-    if data.len() != 32 + 8 {
+    if data.len() != DATA_COLUMN_DB_KEY_SIZE {
         return Err(Error::InvalidKey);
     }
     // `split_at(32)` cannot panic because the length check above guarantees the key is 40 bytes
@@ -166,7 +168,7 @@ pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Er
     let column_index = ColumnIndex::from_le_bytes(
         column_index_bytes
             .try_into()
-            .expect("slice with incorrect length"),
+            .map_err(|_| Error::InvalidKey)?,
     );
     Ok((block_root, column_index))
 }
@@ -342,7 +344,7 @@ impl DBColumn {
             | Self::BeaconHistoricalRoots
             | Self::BeaconHistoricalSummaries
             | Self::BeaconRandaoMixes => 8,
-            Self::BeaconDataColumn => 32 + 8,
+            Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
         }
    }
 }
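
Patch 4's constant makes the on-disk key layout explicit: 32 bytes of block root followed by an 8-byte little-endian column index, 40 bytes in total. A standalone round-trip sketch of that layout (not the store's real types; `[u8; 32]` stands in for `Hash256` and `Option` for the store's `Result`):

// Sketch of the 40-byte data column key used by get_data_column_key /
// parse_data_column_key above.
const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;

fn encode_key(block_root: &[u8; 32], column_index: u64) -> Vec<u8> {
    let mut key = block_root.to_vec();
    key.extend_from_slice(&column_index.to_le_bytes());
    key
}

fn decode_key(data: &[u8]) -> Option<([u8; 32], u64)> {
    if data.len() != DATA_COLUMN_DB_KEY_SIZE {
        return None;
    }
    // Cannot panic: the length check above guarantees 40 bytes.
    let (root_bytes, index_bytes) = data.split_at(32);
    let block_root: [u8; 32] = root_bytes.try_into().ok()?;
    let column_index = u64::from_le_bytes(index_bytes.try_into().ok()?);
    Some((block_root, column_index))
}

fn main() {
    let root = [0xab; 32];
    let key = encode_key(&root, 7);
    assert_eq!(key.len(), DATA_COLUMN_DB_KEY_SIZE);
    assert_eq!(decode_key(&key), Some((root, 7)));
}

Note that because the index is little-endian, keys sharing a block root are not lexicographically ordered by index; prefix iteration over the leading 32-byte root (as `iter_raw_keys` does) is unaffected by this.
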
From 9b2016875a3922331e5390e9ed7f115df87479b3 Mon Sep 17 00:00:00 2001
From: Jimmy Chen
Date: Thu, 1 Aug 2024 17:13:53 +1000
Subject: [PATCH 5/6] Init data column info at PeerDAS epoch instead of Deneb
 fork epoch. Address review comments.

---
 beacon_node/store/src/hot_cold_store.rs |  2 +-
 beacon_node/store/src/lib.rs            |  3 +--
 beacon_node/store/src/metrics.rs        | 13 +++++++------
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index ac5d8975439..14ee28d5417 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -1916,7 +1916,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
     /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint.
     pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result<KeyValueStoreOp, Error> {
-        let oldest_data_column_slot = self.spec.deneb_fork_epoch.map(|fork_epoch| {
+        let oldest_data_column_slot = self.spec.eip7594_fork_epoch.map(|fork_epoch| {
             std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch()))
         });
         let data_column_info = DataColumnInfo {
diff --git a/beacon_node/store/src/lib.rs b/beacon_node/store/src/lib.rs
index d3e3534c25b..1f8cc8ca019 100644
--- a/beacon_node/store/src/lib.rs
+++ b/beacon_node/store/src/lib.rs
@@ -42,7 +42,6 @@ pub use metrics::scrape_for_metrics;
 use parking_lot::MutexGuard;
 use std::sync::Arc;
 use strum::{EnumString, IntoStaticStr};
-use types::data_column_sidecar::{ColumnIndex, DataColumnSidecarList};
 pub use types::*;
 
 const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;
@@ -156,7 +157,7 @@ pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) ->
 }
 
 pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
-    if data.len() != DATA_COLUMN_DB_KEY_SIZE {
+    if data.len() != DBColumn::BeaconDataColumn.key_size() {
         return Err(Error::InvalidKey);
     }
     // `split_at(32)` cannot panic because the length check above guarantees the key is 40 bytes
diff --git a/beacon_node/store/src/metrics.rs b/beacon_node/store/src/metrics.rs
index 50e6a4212cc..902c440be86 100644
--- a/beacon_node/store/src/metrics.rs
+++ b/beacon_node/store/src/metrics.rs
@@ -151,12 +151,13 @@ pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock
         "Number of hits to the store's blob cache",
     )
 });
-pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock::new(|| {
-    try_create_int_counter(
-        "store_beacon_data_columns_cache_hit_total",
-        "Number of hits to the store's data column cache"
-    )
-});
+pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
+    LazyLock::new(|| {
+        try_create_int_counter(
+            "store_beacon_data_columns_cache_hit_total",
+            "Number of hits to the store's data column cache",
+        )
+    });
 
 /// Updates the global metrics registry with store-related information.
 pub fn scrape_for_metrics(db_path: &Path, freezer_db_path: &Path) {

From 99308a7f8083551dfc3608a9e023cb306a3a2a89 Mon Sep 17 00:00:00 2001
From: Michael Sproul
Date: Fri, 2 Aug 2024 14:57:03 +1000
Subject: [PATCH 6/6] Remove Deneb-related comments

---
 beacon_node/store/src/hot_cold_store.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index 14ee28d5417..8b144c1be93 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -325,7 +325,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             .map(|epoch| epoch.start_slot(E::slots_per_epoch()));
         let new_data_column_info = match &data_column_info {
             Some(data_column_info) => {
-                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                // Set the oldest data column slot to the fork slot if it is not yet set.
                 let oldest_data_column_slot = data_column_info
                     .oldest_data_column_slot
                     .or(eip7594_fork_slot);
@@ -335,7 +335,7 @@ impl<E: EthSpec> HotColdDB<E, LevelDB<E>, LevelDB<E>> {
             }
             // First start.
             None => DataColumnInfo {
-                // Set the oldest data column slot to the Deneb fork slot if it is not yet set.
+                // Set the oldest data column slot to the fork slot if it is not yet set.
                 oldest_data_column_slot: eip7594_fork_slot,
             },
         };
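
Taken together, the series stores each data column under its own 40-byte key while caching columns per block. The nested cache shape from patch 1, where all columns of one block share a single LRU slot, can be sketched standalone as follows (a sketch assuming the `lru` crate, which provides `get_or_insert_mut`; `String` stands in for the real `Arc<DataColumnSidecar<E>>` payload):

// Sketch of BlockCache's data column cache: an LRU keyed by block root,
// each entry holding a HashMap from column index to column.
use lru::LruCache;
use std::collections::HashMap;
use std::num::NonZeroUsize;

type BlockRoot = [u8; 32];
type ColumnIndex = u64;

struct DataColumnCache {
    cache: LruCache<BlockRoot, HashMap<ColumnIndex, String>>,
}

impl DataColumnCache {
    fn new(size: NonZeroUsize) -> Self {
        Self { cache: LruCache::new(size) }
    }

    // All columns for one block share a single LRU entry, so inserting many
    // columns of the same block consumes at most one cache slot.
    fn put(&mut self, root: BlockRoot, index: ColumnIndex, column: String) {
        self.cache
            .get_or_insert_mut(root, HashMap::new)
            .insert(index, column);
    }

    fn get(&mut self, root: &BlockRoot, index: &ColumnIndex) -> Option<&String> {
        self.cache.get(root).and_then(|columns| columns.get(index))
    }
}

fn main() {
    let mut cache = DataColumnCache::new(NonZeroUsize::new(8).unwrap());
    cache.put([1; 32], 0, "column 0".into());
    cache.put([1; 32], 1, "column 1".into());
    assert_eq!(cache.get(&[1; 32], &0).map(String::as_str), Some("column 0"));
    assert!(cache.get(&[2; 32], &0).is_none());
}
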