diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 19ee3d116c1..7ebd011cf46 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -3288,9 +3288,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
                 }
             }
         }
+        let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
         let availability = self
             .data_availability_checker
-            .put_rpc_blobs(block_root, blobs)?;
+            .put_rpc_blobs(block_root, epoch, blobs)?;
 
         self.process_availability(slot, availability).await
     }
diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs
index 2b62a83194b..bba58675933 100644
--- a/beacon_node/beacon_chain/src/blob_verification.rs
+++ b/beacon_node/beacon_chain/src/blob_verification.rs
@@ -16,7 +16,9 @@ use ssz_types::VariableList;
 use std::time::Duration;
 use tree_hash::TreeHash;
 use types::blob_sidecar::BlobIdentifier;
-use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot};
+use types::{
+    BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot,
+};
 
 /// An error occurred while validating a gossip blob.
 #[derive(Debug)]
@@ -223,6 +225,9 @@ impl<T: BeaconChainTypes> GossipVerifiedBlob<T> {
     pub fn slot(&self) -> Slot {
         self.blob.blob.slot()
     }
+    pub fn epoch(&self) -> Epoch {
+        self.blob.blob.epoch()
+    }
     pub fn index(&self) -> u64 {
         self.blob.blob.index
     }
diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs
index 2431769ddb0..90d4b6081e4 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker.rs
@@ -94,8 +94,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
         log: &Logger,
         spec: ChainSpec,
     ) -> Result<Self, AvailabilityCheckError> {
-        let overflow_cache =
-            DataAvailabilityCheckerInner::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?;
+        // TODO(das): support supernode or custom custody requirement
+        let custody_subnet_count = spec.custody_requirement as usize;
+        let custody_column_count =
+            custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());
+
+        let overflow_cache = DataAvailabilityCheckerInner::new(
+            OVERFLOW_LRU_CAPACITY,
+            store,
+            custody_column_count,
+            spec.clone(),
+        )?;
         Ok(Self {
             availability_cache: Arc::new(overflow_cache),
             slot_clock,
@@ -143,6 +152,7 @@
     pub fn put_rpc_blobs(
         &self,
         block_root: Hash256,
+        epoch: Epoch,
         blobs: FixedBlobSidecarList<T::EthSpec>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let Some(kzg) = self.kzg.as_ref() else {
@@ -159,7 +169,7 @@
             .map_err(AvailabilityCheckError::Kzg)?;
 
         self.availability_cache
-            .put_kzg_verified_blobs(block_root, verified_blobs)
+            .put_kzg_verified_blobs(block_root, epoch, verified_blobs)
     }
 
     /// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -171,8 +181,11 @@
         &self,
         gossip_blob: GossipVerifiedBlob<T>,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
-        self.availability_cache
-            .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()])
+        self.availability_cache.put_kzg_verified_blobs(
+            gossip_blob.block_root(),
+            gossip_blob.epoch(),
+            vec![gossip_blob.into_inner()],
+        )
     }
 
     /// Check if we have all the blobs for a block. Returns `Availability` which has information
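The custody target is resolved once, at checker construction. A minimal standalone sketch of the same arithmetic, with hypothetical values standing in for `spec.custody_requirement` and `spec.data_columns_per_subnet()`:

```rust
// Hypothetical inputs; the real values come from ChainSpec.
fn custody_column_count(custody_subnet_count: u64, data_columns_per_subnet: usize) -> usize {
    // saturating_mul avoids overflow on misconfigured or adversarial specs.
    (custody_subnet_count as usize).saturating_mul(data_columns_per_subnet)
}

fn main() {
    // e.g. a custody requirement of 4 subnets and 4 columns per subnet
    // means the node must hold 16 columns before importing a block.
    assert_eq!(custody_column_count(4, 4), 16);
}
```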
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
index d22f6b2cc9f..bb92b0b6322 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs
@@ -10,6 +10,7 @@ pub enum Error {
         blob_commitment: KzgCommitment,
         block_commitment: KzgCommitment,
     },
+    UnableToDetermineImportRequirement,
     Unexpected,
     SszTypes(ssz_types::Error),
     MissingBlobs,
@@ -41,6 +42,7 @@ impl Error {
             | Error::Unexpected
             | Error::ParentStateMissing(_)
             | Error::BlockReplayError(_)
+            | Error::UnableToDetermineImportRequirement
             | Error::RebuildingStateCaches(_)
             | Error::SlotClockError => ErrorCategory::Internal,
             Error::Kzg(_)
diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
index 5e0513c8d30..dfe369cc47f 100644
--- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
+++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs
@@ -5,6 +5,7 @@ use crate::block_verification_types::{
     AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
 };
 use crate::data_availability_checker::{Availability, AvailabilityCheckError};
+use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
 use crate::BeaconChainTypes;
 use lru::LruCache;
 use parking_lot::RwLock;
@@ -23,9 +24,15 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
 pub struct PendingComponents<E: EthSpec> {
     pub block_root: Hash256,
     pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
+    pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
     pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
 }
 
+pub enum BlockImportRequirement {
+    AllBlobs,
+    CustodyColumns(usize),
+}
+
 impl<E: EthSpec> PendingComponents<E> {
     /// Returns an immutable reference to the cached block.
     pub fn get_cached_block(&self) -> &Option<DietAvailabilityPendingExecutedBlock<E>> {
@@ -39,6 +46,16 @@ impl<E: EthSpec> PendingComponents<E> {
         &self.verified_blobs
     }
 
+    /// Returns an immutable reference to the cached data column.
+    pub fn get_cached_data_column(
+        &self,
+        data_column_index: u64,
+    ) -> Option<&KzgVerifiedCustodyDataColumn<E>> {
+        self.verified_data_columns
+            .iter()
+            .find(|d| d.index() == data_column_index)
+    }
+
     /// Returns a mutable reference to the cached block.
     pub fn get_cached_block_mut(&mut self) -> &mut Option<DietAvailabilityPendingExecutedBlock<E>> {
         &mut self.executed_block
@@ -78,6 +95,20 @@ impl<E: EthSpec> PendingComponents<E> {
         self.get_cached_blobs().iter().flatten().count()
     }
 
+    /// Checks if a data column of a given index exists in the cache.
+    ///
+    /// Returns:
+    /// - `true` if a data column for the given index exists.
+    /// - `false` otherwise.
+    fn data_column_exists(&self, data_column_index: u64) -> bool {
+        self.get_cached_data_column(data_column_index).is_some()
+    }
+
+    /// Returns the number of data columns that have been received and are stored in the cache.
+    pub fn num_received_data_columns(&self) -> usize {
+        self.verified_data_columns.len()
+    }
+
     /// Inserts a block into the cache.
     pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock<E>) {
         *self.get_cached_block_mut() = Some(block)
     }
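`PendingComponents` keeps columns in a plain `Vec` and identifies them by column index rather than by value, which is what makes the later merge idempotent. A simplified model of that bookkeeping, with bare `u64` indices standing in for `KzgVerifiedCustodyDataColumn<E>`:

```rust
// Illustrative stand-in for the column accessors added above.
struct Pending {
    verified_data_columns: Vec<u64>, // column indices only, for illustration
}

impl Pending {
    fn get_cached_data_column(&self, index: u64) -> Option<&u64> {
        self.verified_data_columns.iter().find(|&&i| i == index)
    }

    fn data_column_exists(&self, index: u64) -> bool {
        self.get_cached_data_column(index).is_some()
    }

    fn num_received_data_columns(&self) -> usize {
        self.verified_data_columns.len()
    }
}

fn main() {
    let pending = Pending { verified_data_columns: vec![0, 5, 7] };
    assert!(pending.data_column_exists(5));
    assert!(!pending.data_column_exists(6));
    assert_eq!(pending.num_received_data_columns(), 3);
}
```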
@@ -125,6 +156,18 @@
         }
     }
 
+    /// Merges a given set of data columns into the cache.
+    fn merge_data_columns<I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<E>>>(
+        &mut self,
+        kzg_verified_data_columns: I,
+    ) {
+        for data_column in kzg_verified_data_columns {
+            if !self.data_column_exists(data_column.index()) {
+                self.verified_data_columns.push(data_column);
+            }
+        }
+    }
+
     /// Inserts a new block and revalidates the existing blobs against it.
     ///
     /// Blobs that don't match the new block's commitments are evicted.
@@ -134,15 +177,27 @@
         self.merge_blobs(reinsert);
     }
 
-    /// Checks if the block and all of its expected blobs are available in the cache.
+    /// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are
+    /// available in the cache.
     ///
-    /// Returns `true` if both the block exists and the number of received blobs matches the number
-    /// of expected blobs.
-    pub fn is_available(&self) -> bool {
-        if let Some(num_expected_blobs) = self.num_expected_blobs() {
-            num_expected_blobs == self.num_received_blobs()
-        } else {
-            false
+    /// Returns `true` if both the block exists and the number of received blobs / custody columns
+    /// matches the number of expected blobs / custody columns.
+    pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool {
+        match block_import_requirement {
+            BlockImportRequirement::AllBlobs => self
+                .num_expected_blobs()
+                .map_or(false, |num_expected_blobs| {
+                    num_expected_blobs == self.num_received_blobs()
+                }),
+            BlockImportRequirement::CustodyColumns(num_expected_columns) => {
+                let num_received_data_columns = self.num_received_data_columns();
+                // No data columns when there are 0 blobs
+                self.num_expected_blobs()
+                    .map_or(false, |num_expected_blobs| {
+                        num_expected_blobs == 0
+                            || *num_expected_columns == num_received_data_columns
+                    })
+            }
         }
     }
 
@@ -151,6 +206,7 @@
         Self {
             block_root,
             verified_blobs: FixedVector::default(),
+            verified_data_columns: vec![],
             executed_block: None,
         }
     }
@@ -170,6 +226,7 @@
         let Self {
             block_root,
             verified_blobs,
+            verified_data_columns: _,
             executed_block,
         } = self;
 
@@ -222,17 +279,23 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
     /// This cache holds a limited number of states in memory and reconstructs them
     /// from disk when necessary. This is necessary until we merge tree-states
     state_cache: StateLRUCache<T>,
+    /// The number of data columns the node is custodying.
+    custody_column_count: usize,
+    spec: ChainSpec,
 }
 
 impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
     pub fn new(
         capacity: NonZeroUsize,
         beacon_store: BeaconStore<T>,
+        custody_column_count: usize,
         spec: ChainSpec,
     ) -> Result<Self, AvailabilityCheckError> {
         Ok(Self {
             critical: RwLock::new(LruCache::new(capacity)),
-            state_cache: StateLRUCache::new(beacon_store, spec),
+            state_cache: StateLRUCache::new(beacon_store, spec.clone()),
+            custody_column_count,
+            spec,
         })
     }
 
@@ -277,9 +340,24 @@
         f(self.critical.read().peek(block_root))
     }
 
+    fn block_import_requirement(
+        &self,
+        epoch: Epoch,
+    ) -> Result<BlockImportRequirement, AvailabilityCheckError> {
+        let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
+        if peer_das_enabled {
+            Ok(BlockImportRequirement::CustodyColumns(
+                self.custody_column_count,
+            ))
+        } else {
+            Ok(BlockImportRequirement::AllBlobs)
+        }
+    }
+
     pub fn put_kzg_verified_blobs<I: IntoIterator<Item = KzgVerifiedBlob<T::EthSpec>>>(
         &self,
         block_root: Hash256,
+        epoch: Epoch,
         kzg_verified_blobs: I,
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let mut fixed_blobs = FixedVector::default();
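The availability decision is the heart of this change: pre-PeerDAS epochs still require every expected blob, while PeerDAS epochs require the configured number of custody columns, with the exception that a zero-blob block is trivially available. A sketch of the same decision as a free function over plain counts (names are illustrative, not from the diff):

```rust
// `num_expected_blobs` is `None` until the block itself has been cached,
// mirroring `PendingComponents::num_expected_blobs`.
enum BlockImportRequirement {
    AllBlobs,
    CustodyColumns(usize),
}

fn is_available(
    requirement: &BlockImportRequirement,
    num_expected_blobs: Option<usize>,
    num_received_blobs: usize,
    num_received_columns: usize,
) -> bool {
    match requirement {
        BlockImportRequirement::AllBlobs => {
            num_expected_blobs.map_or(false, |expected| expected == num_received_blobs)
        }
        BlockImportRequirement::CustodyColumns(expected_columns) => {
            // A zero-blob block has no data columns to wait for.
            num_expected_blobs.map_or(false, |expected| {
                expected == 0 || *expected_columns == num_received_columns
            })
        }
    }
}

fn main() {
    let req = BlockImportRequirement::CustodyColumns(8);
    assert!(!is_available(&req, None, 0, 8)); // block not yet seen
    assert!(is_available(&req, Some(0), 0, 0)); // zero-blob block
    assert!(is_available(&req, Some(3), 0, 8)); // all custody columns received
    assert!(is_available(&BlockImportRequirement::AllBlobs, Some(2), 2, 0));
}
```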
@@ -301,7 +379,43 @@
         // Merge in the blobs.
         pending_components.merge_blobs(fixed_blobs);
 
-        if pending_components.is_available() {
+        let block_import_requirement = self.block_import_requirement(epoch)?;
+        if pending_components.is_available(&block_import_requirement) {
+            write_lock.put(block_root, pending_components.clone());
+            // No need to hold the write lock anymore
+            drop(write_lock);
+            pending_components.make_available(|diet_block| {
+                self.state_cache.recover_pending_executed_block(diet_block)
+            })
+        } else {
+            write_lock.put(block_root, pending_components);
+            Ok(Availability::MissingComponents(block_root))
+        }
+    }
+
+    // TODO(das): gossip and rpc code paths to be implemented.
+    #[allow(dead_code)]
+    pub fn put_kzg_verified_data_columns<
+        I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
+    >(
+        &self,
+        block_root: Hash256,
+        epoch: Epoch,
+        kzg_verified_data_columns: I,
+    ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
+        let mut write_lock = self.critical.write();
+
+        // Grab existing entry or create a new entry.
+        let mut pending_components = write_lock
+            .pop_entry(&block_root)
+            .map(|(_, v)| v)
+            .unwrap_or_else(|| PendingComponents::empty(block_root));
+
+        // Merge in the data columns.
+        pending_components.merge_data_columns(kzg_verified_data_columns);
+
+        let block_import_requirement = self.block_import_requirement(epoch)?;
+        if pending_components.is_available(&block_import_requirement) {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
@@ -322,6 +436,7 @@
     ) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
         let mut write_lock = self.critical.write();
         let block_root = executed_block.import_data.block_root;
+        let epoch = executed_block.block.epoch();
 
         // register the block to get the diet block
         let diet_executed_block = self
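`put_kzg_verified_data_columns` mirrors `put_kzg_verified_blobs`: pop the entry, merge, re-check availability, and either consume the entry or re-insert it, all under one write lock, so concurrent callers never observe a half-merged entry. A simplified model of that pop-merge-reinsert pattern, with `std` types standing in for `parking_lot::RwLock` and `lru::LruCache`:

```rust
use std::collections::HashMap;
use std::sync::Mutex;

struct Cache {
    // block_root -> received column indices
    critical: Mutex<HashMap<u64, Vec<u64>>>,
}

impl Cache {
    /// Returns `true` when the entry became complete (the real code returns
    /// `Availability::Available` / `Availability::MissingComponents`).
    fn put_columns(&self, block_root: u64, columns: Vec<u64>, required: usize) -> bool {
        let mut lock = self.critical.lock().unwrap();
        // Grab the existing entry or create a new one (cf. pop_entry + unwrap_or_else).
        let mut pending = lock.remove(&block_root).unwrap_or_default();
        // Merge, skipping duplicate indices (cf. merge_data_columns).
        for c in columns {
            if !pending.contains(&c) {
                pending.push(c);
            }
        }
        if pending.len() >= required {
            true // complete; the real code hands the entry to make_available
        } else {
            lock.insert(block_root, pending); // re-insert and keep waiting
            false
        }
    }
}

fn main() {
    let cache = Cache { critical: Mutex::new(HashMap::new()) };
    assert!(!cache.put_columns(1, vec![0, 1], 4));
    assert!(cache.put_columns(1, vec![1, 2, 3], 4)); // duplicate index 1 is ignored
}
```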
@@ -338,7 +453,8 @@
         pending_components.merge_block(diet_executed_block);
 
         // Check if we have all components and entire set is consistent.
-        if pending_components.is_available() {
+        let block_import_requirement = self.block_import_requirement(epoch)?;
+        if pending_components.is_available(&block_import_requirement) {
             write_lock.put(block_root, pending_components.clone());
             // No need to hold the write lock anymore
             drop(write_lock);
@@ -401,6 +517,7 @@ mod test {
     use types::{ExecPayload, MinimalEthSpec};
 
     const LOW_VALIDATOR_COUNT: usize = 32;
+    const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8;
 
     fn get_store_with_spec(
         db_path: &TempDir,
@@ -588,8 +705,13 @@
         let test_store = harness.chain.store.clone();
         let capacity_non_zero = new_non_zero_usize(capacity);
         let cache = Arc::new(
-            DataAvailabilityCheckerInner::<T>::new(capacity_non_zero, test_store, spec.clone())
-                .expect("should create cache"),
+            DataAvailabilityCheckerInner::<T>::new(
+                capacity_non_zero,
+                test_store,
+                DEFAULT_TEST_CUSTODY_COLUMN_COUNT,
+                spec.clone(),
+            )
+            .expect("should create cache"),
         );
         (harness, cache, chain_db_path)
     }
@@ -603,6 +725,7 @@
         let (pending_block, blobs) = availability_pending_block(&harness).await;
 
         let root = pending_block.import_data.block_root;
+        let epoch = pending_block.block.epoch();
         let blobs_expected = pending_block.num_blobs_expected();
 
         assert_eq!(
@@ -651,7 +774,7 @@
         for (blob_index, gossip_blob) in blobs.into_iter().enumerate() {
             kzg_verified_blobs.push(gossip_blob.into_inner());
             let availability = cache
-                .put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
+                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
                 .expect("should put blob");
             if blob_index == blobs_expected - 1 {
                 assert!(matches!(availability, Availability::Available(_)));
@@ -673,11 +796,12 @@
             "should have expected number of blobs"
         );
         let root = pending_block.import_data.block_root;
+        let epoch = pending_block.block.epoch();
         let mut kzg_verified_blobs = vec![];
         for gossip_blob in blobs {
             kzg_verified_blobs.push(gossip_blob.into_inner());
             let availability = cache
-                .put_kzg_verified_blobs(root, kzg_verified_blobs.clone())
+                .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone())
                 .expect("should put blob");
             assert_eq!(
                 availability,
diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs
new file mode 100644
index 00000000000..da848ccc470
--- /dev/null
+++ b/beacon_node/beacon_chain/src/data_column_verification.rs
@@ -0,0 +1,19 @@
+use derivative::Derivative;
+use ssz_derive::{Decode, Encode};
+use std::sync::Arc;
+use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar};
+use types::EthSpec;
+
+/// Data column that we must custody and has completed kzg verification
+#[derive(Debug, Derivative, Clone, Encode, Decode)]
+#[derivative(PartialEq, Eq)]
+#[ssz(struct_behaviour = "transparent")]
+pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
+    data: Arc<DataColumnSidecar<E>>,
+}
+
+impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
+    pub fn index(&self) -> ColumnIndex {
+        self.data.index
+    }
+}
diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs
index 466ab0b67e7..0fd000ff002 100644
--- a/beacon_node/beacon_chain/src/lib.rs
+++ b/beacon_node/beacon_chain/src/lib.rs
@@ -19,6 +19,7 @@ pub mod canonical_head;
 pub mod capella_readiness;
 pub mod chain_config;
 pub mod data_availability_checker;
+mod data_column_verification;
 pub mod deneb_readiness;
 mod early_attester_cache;
 pub mod electra_readiness;
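`KzgVerifiedCustodyDataColumn` is a newtype: it attaches a type-level claim ("this column is custody-relevant and KZG-verified") without changing the representation, which `#[ssz(struct_behaviour = "transparent")]` preserves on the wire. A dependency-free sketch of the pattern; the `assume_verified` constructor is hypothetical, since this diff deliberately leaves the verification entry points to the `TODO(das)` follow-ups:

```rust
use std::sync::Arc;

// Stand-in for types::DataColumnSidecar<E>.
struct DataColumnSidecar {
    index: u64,
    // ...column bytes and KZG proofs elided
}

// Plain newtype; in the real code the SSZ encoding is transparently
// that of the single inner field.
struct KzgVerifiedCustodyDataColumn {
    data: Arc<DataColumnSidecar>, // private: only constructible via a checked path
}

impl KzgVerifiedCustodyDataColumn {
    // Hypothetical constructor, for illustration only.
    fn assume_verified(data: Arc<DataColumnSidecar>) -> Self {
        Self { data }
    }

    fn index(&self) -> u64 {
        self.data.index
    }
}

fn main() {
    let col =
        KzgVerifiedCustodyDataColumn::assume_verified(Arc::new(DataColumnSidecar { index: 3 }));
    assert_eq!(col.index(), 3);
}
```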
diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs
index 1f60f429db5..6b32523c35f 100644
--- a/consensus/types/src/blob_sidecar.rs
+++ b/consensus/types/src/blob_sidecar.rs
@@ -1,7 +1,7 @@
 use crate::test_utils::TestRandom;
 use crate::{
     beacon_block_body::BLOB_KZG_COMMITMENTS_INDEX, BeaconBlockHeader, BeaconStateError, Blob,
-    EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList,
+    Epoch, EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList,
 };
 use crate::{KzgProofs, SignedBeaconBlock};
 use bls::Signature;
@@ -160,6 +160,13 @@ impl<E: EthSpec> BlobSidecar<E> {
     pub fn slot(&self) -> Slot {
         self.signed_block_header.message.slot
     }
 
+    pub fn epoch(&self) -> Epoch {
+        self.signed_block_header
+            .message
+            .slot
+            .epoch(E::slots_per_epoch())
+    }
+
     pub fn block_root(&self) -> Hash256 {
         self.signed_block_header.message.tree_hash_root()
     }
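`BlobSidecar::epoch` is pure arithmetic: the epoch is the slot integer-divided by `slots_per_epoch` (32 on mainnet). A worked example:

```rust
// Slot -> epoch conversion, as performed by Slot::epoch in the types crate.
fn epoch_of(slot: u64, slots_per_epoch: u64) -> u64 {
    slot / slots_per_epoch
}

fn main() {
    assert_eq!(epoch_of(0, 32), 0);
    assert_eq!(epoch_of(31, 32), 0); // last slot of epoch 0
    assert_eq!(epoch_of(32, 32), 1); // first slot of epoch 1
    assert_eq!(epoch_of(100_000, 32), 3125);
}
```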