
Store changes to persist data columns #6073

Merged (8 commits, Aug 2, 2024)
2 changes: 2 additions & 0 deletions beacon_node/store/src/errors.rs
@@ -27,6 +27,8 @@ pub enum Error {
AnchorInfoConcurrentMutation,
/// The store's `blob_info` was mutated concurrently, the latest modification wasn't applied.
BlobInfoConcurrentMutation,
/// The store's `data_column_info` was mutated concurrently, the latest modification wasn't applied.
DataColumnInfoConcurrentMutation,
/// The block or state is unavailable due to weak subjectivity sync.
HistoryUnavailable,
/// State reconstruction cannot commence because not all historic blocks are known.
278 changes: 264 additions & 14 deletions beacon_node/store/src/hot_cold_store.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions beacon_node/store/src/leveldb_store.rs
@@ -270,6 +270,10 @@ impl db_key::Key for BytesKey {
}

impl BytesKey {
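/// Return `true` iff this `BytesKey` begins with the raw bytes of `prefix`.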
pub fn starts_with(&self, prefix: &Self) -> bool {
self.key.starts_with(&prefix.key)
}

/// Return `true` iff this `BytesKey` was created with the given `column`.
pub fn matches_column(&self, column: DBColumn) -> bool {
self.key.starts_with(column.as_bytes())
33 changes: 30 additions & 3 deletions beacon_node/store/src/lib.rs
@@ -44,6 +44,8 @@ use std::sync::Arc;
use strum::{EnumString, IntoStaticStr};
pub use types::*;

const DATA_COLUMN_DB_KEY_SIZE: usize = 32 + 8;

pub type ColumnIter<'a, K> = Box<dyn Iterator<Item = Result<(K, Vec<u8>), Error>> + 'a>;
pub type ColumnKeyIter<'a, K> = Box<dyn Iterator<Item = Result<K, Error>> + 'a>;

@@ -109,9 +111,7 @@ pub trait KeyValueStore<E: EthSpec>: Sync + Send + Sized + 'static {
Box::new(std::iter::empty())
}

fn iter_raw_keys(&self, _column: DBColumn, _prefix: &[u8]) -> RawKeyIter {
Box::new(std::iter::empty())
}
fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter;

/// Iterate through all keys in a particular column.
fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K>;
@@ -150,6 +150,28 @@ pub fn get_col_from_key(key: &[u8]) -> Option<String> {
String::from_utf8(key[0..3].to_vec()).ok()
}

pub fn get_data_column_key(block_root: &Hash256, column_index: &ColumnIndex) -> Vec<u8> {
let mut result = block_root.as_bytes().to_vec();
result.extend_from_slice(&column_index.to_le_bytes());
result
}

pub fn parse_data_column_key(data: Vec<u8>) -> Result<(Hash256, ColumnIndex), Error> {
if data.len() != DBColumn::BeaconDataColumn.key_size() {
return Err(Error::InvalidKey);
}
// `split_at(32)` panics only if the slice is shorter than 32 bytes, which the
// length check above rules out.
let (block_root_bytes, column_index_bytes) = data.split_at(32);
let block_root = Hash256::from_slice(block_root_bytes);
// `column_index_bytes` is guaranteed to be exactly 8 bytes by the length check above.
let column_index = ColumnIndex::from_le_bytes(
column_index_bytes
.try_into()
.map_err(|_| Error::InvalidKey)?,
);
Ok((block_root, column_index))
}
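
// Illustrative sketch, not part of this diff: a round trip through the two key
// helpers above. Data column keys are 40 bytes: the 32-byte block root followed
// by the little-endian `ColumnIndex`. The test module name is hypothetical.
#[cfg(test)]
mod data_column_key_round_trip {
    use super::*;

    #[test]
    fn round_trip() {
        let block_root = Hash256::repeat_byte(0xab);
        let column_index: ColumnIndex = 7;

        let key = get_data_column_key(&block_root, &column_index);
        assert_eq!(key.len(), DBColumn::BeaconDataColumn.key_size());

        let (root, index) = parse_data_column_key(key).unwrap();
        assert_eq!(root, block_root);
        assert_eq!(index, column_index);
    }
}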

#[must_use]
#[derive(Clone)]
pub enum KeyValueStoreOp {
Expand Down Expand Up @@ -210,11 +232,13 @@ pub enum StoreOp<'a, E: EthSpec> {
PutBlock(Hash256, Arc<SignedBeaconBlock<E>>),
PutState(Hash256, &'a BeaconState<E>),
PutBlobs(Hash256, BlobSidecarList<E>),
PutDataColumns(Hash256, DataColumnSidecarList<E>),
PutStateSummary(Hash256, HotStateSummary),
PutStateTemporaryFlag(Hash256),
DeleteStateTemporaryFlag(Hash256),
DeleteBlock(Hash256),
DeleteBlobs(Hash256),
DeleteDataColumns(Hash256, Vec<ColumnIndex>),
DeleteState(Hash256, Option<Slot>),
DeleteExecutionPayload(Hash256),
KeyValueOp(KeyValueStoreOp),
Expand All @@ -230,6 +254,8 @@ pub enum DBColumn {
BeaconBlock,
#[strum(serialize = "blb")]
BeaconBlob,
#[strum(serialize = "bdc")]
BeaconDataColumn,
/// For full `BeaconState`s in the hot database (finalized or fork-boundary states).
#[strum(serialize = "ste")]
BeaconState,
@@ -317,6 +343,7 @@ impl DBColumn {
| Self::BeaconHistoricalRoots
| Self::BeaconHistoricalSummaries
| Self::BeaconRandaoMixes => 8,
Self::BeaconDataColumn => DATA_COLUMN_DB_KEY_SIZE,
}
}
}
14 changes: 13 additions & 1 deletion beacon_node/store/src/memory_store.rs
@@ -1,6 +1,6 @@
use crate::{
get_key_for_col, leveldb_store::BytesKey, ColumnIter, ColumnKeyIter, DBColumn, Error,
ItemStore, Key, KeyValueStore, KeyValueStoreOp,
ItemStore, Key, KeyValueStore, KeyValueStoreOp, RawKeyIter,
};
use parking_lot::{Mutex, MutexGuard, RwLock};
use std::collections::BTreeMap;
@@ -100,6 +100,18 @@ impl<E: EthSpec> KeyValueStore<E> for MemoryStore<E> {
}))
}

fn iter_raw_keys(&self, column: DBColumn, prefix: &[u8]) -> RawKeyIter {
let start_key = BytesKey::from_vec(get_key_for_col(column.as_str(), prefix));
let keys = self
.db
.read()
.range(start_key.clone()..)
.take_while(|(k, _)| k.starts_with(&start_key))
.filter_map(|(k, _)| k.remove_column_variable(column).map(|k| k.to_vec()))
.collect::<Vec<_>>();
Box::new(keys.into_iter().map(Ok))
}
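
// Illustrative sketch, not part of this diff: how a caller could combine the prefix
// iteration above with `parse_data_column_key` to list the column indices stored for
// a block root. The helper name is hypothetical and assumes `crate::{parse_data_column_key,
// DBColumn}` and `types::ColumnIndex` are in scope; it is an example, not the store's API.
fn stored_column_indices<E: EthSpec>(
    store: &MemoryStore<E>,
    block_root: &Hash256,
) -> Result<Vec<ColumnIndex>, Error> {
    store
        // Keys come back with the column prefix stripped, i.e. as the raw
        // 40-byte `block_root ++ column_index` keys.
        .iter_raw_keys(DBColumn::BeaconDataColumn, block_root.as_bytes())
        .map(|key| key.and_then(|k| parse_data_column_key(k).map(|(_, index)| index)))
        .collect()
}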

fn iter_column_keys<K: Key>(&self, column: DBColumn) -> ColumnKeyIter<K> {
Box::new(self.iter_column(column).map(|res| res.map(|(k, _)| k)))
}
28 changes: 28 additions & 0 deletions beacon_node/store/src/metadata.rs
@@ -16,6 +16,7 @@ pub const PRUNING_CHECKPOINT_KEY: Hash256 = Hash256::repeat_byte(3);
pub const COMPACTION_TIMESTAMP_KEY: Hash256 = Hash256::repeat_byte(4);
pub const ANCHOR_INFO_KEY: Hash256 = Hash256::repeat_byte(5);
pub const BLOB_INFO_KEY: Hash256 = Hash256::repeat_byte(6);
pub const DATA_COLUMN_INFO_KEY: Hash256 = Hash256::repeat_byte(7);

/// State upper limit value used to indicate that a node is not storing historic states.
pub const STATE_UPPER_LIMIT_NO_RETAIN: Slot = Slot::new(u64::MAX);
@@ -152,3 +153,30 @@ impl StoreItem for BlobInfo {
Ok(Self::from_ssz_bytes(bytes)?)
}
}

/// Database parameters relevant to data column sync.
#[derive(Debug, PartialEq, Eq, Clone, Encode, Decode, Serialize, Deserialize, Default)]
pub struct DataColumnInfo {
/// The slot after which data columns are or *will be* available (>=).
///
/// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which
/// data columns will be available.
///
/// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is
/// not yet known.
pub oldest_data_column_slot: Option<Slot>,
}

impl StoreItem for DataColumnInfo {
fn db_column() -> DBColumn {
DBColumn::BeaconMeta
}

fn as_store_bytes(&self) -> Vec<u8> {
self.as_ssz_bytes()
}

fn from_store_bytes(bytes: &[u8]) -> Result<Self, Error> {
Ok(Self::from_ssz_bytes(bytes)?)
}
}
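
// Illustrative sketch, not part of this diff: persisting and reloading the new
// `DataColumnInfo` under `DATA_COLUMN_INFO_KEY` via the `StoreItem` machinery.
// The helper name is hypothetical, and it assumes the crate's `ItemStore` trait
// (with `put_item`/`get_item`) and `EthSpec` are in scope; treat the exact
// signatures as an assumption rather than part of this change.
fn store_data_column_info_example<E: EthSpec, S: ItemStore<E>>(
    store: &S,
) -> Result<Option<DataColumnInfo>, Error> {
    let info = DataColumnInfo {
        // `None` indicates that the EIP-7594 fork epoch is not yet known.
        oldest_data_column_slot: None,
    };
    store.put_item(&DATA_COLUMN_INFO_KEY, &info)?;
    store.get_item::<DataColumnInfo>(&DATA_COLUMN_INFO_KEY)
}
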
7 changes: 7 additions & 0 deletions beacon_node/store/src/metrics.rs
@@ -151,6 +151,13 @@ pub static BEACON_BLOBS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> = LazyLock
"Number of hits to the store's blob cache",
)
});
pub static BEACON_DATA_COLUMNS_CACHE_HIT_COUNT: LazyLock<Result<IntCounter>> =
LazyLock::new(|| {
try_create_int_counter(
"store_beacon_data_columns_cache_hit_total",
"Number of hits to the store's data column cache",
)
});

/// Updates the global metrics registry with store-related information.
pub fn scrape_for_metrics(db_path: &Path, freezer_db_path: &Path) {