From eb19deb75115a5f6ea040ac7e2294b480235956c Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Thu, 11 Jul 2024 16:33:40 +1000 Subject: [PATCH 1/4] Update `is_available` check to support PeerDAS. --- .../src/data_availability_checker.rs | 13 ++- .../src/data_availability_checker/error.rs | 2 + .../overflow_lru_cache.rs | 87 ++++++++++++++++--- .../src/data_column_verification.rs | 13 +++ beacon_node/beacon_chain/src/lib.rs | 1 + consensus/types/src/chain_spec.rs | 32 +++++++ 6 files changed, 134 insertions(+), 14 deletions(-) create mode 100644 beacon_node/beacon_chain/src/data_column_verification.rs diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index e0347d81c39..f4d695540e1 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -74,7 +74,18 @@ impl DataAvailabilityChecker { log: &Logger, spec: ChainSpec, ) -> Result { - let overflow_cache = OverflowLRUCache::new(OVERFLOW_LRU_CAPACITY, store, spec.clone())?; + // TODO(das): support supernode or custom custody requirement + let custody_subnet_count = spec.custody_requirement as usize; + let custody_column_count = + custody_subnet_count.saturating_mul(spec.data_columns_per_subnet()); + + let overflow_cache = OverflowLRUCache::new( + OVERFLOW_LRU_CAPACITY, + store, + custody_column_count, + spec.clone(), + )?; + Ok(Self { availability_cache: Arc::new(overflow_cache), slot_clock, diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index d22f6b2cc9f..bb92b0b6322 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -10,6 +10,7 @@ pub enum Error { blob_commitment: KzgCommitment, block_commitment: KzgCommitment, }, + UnableToDetermineImportRequirement, Unexpected, SszTypes(ssz_types::Error), MissingBlobs, @@ -41,6 +42,7 @@ impl Error { | Error::Unexpected | Error::ParentStateMissing(_) | Error::BlockReplayError(_) + | Error::UnableToDetermineImportRequirement | Error::RebuildingStateCaches(_) | Error::SlotClockError => ErrorCategory::Internal, Error::Kzg(_) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index adc1a1e202c..3ef9010ebf0 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -34,6 +34,7 @@ use crate::block_verification_types::{ AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock, }; use crate::data_availability_checker::{Availability, AvailabilityCheckError}; +use crate::data_column_verification::KzgVerifiedCustodyDataColumn; use crate::store::{DBColumn, KeyValueStore}; use crate::BeaconChainTypes; use lru::LruCache; @@ -54,9 +55,15 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock}; pub struct PendingComponents { pub block_root: Hash256, pub verified_blobs: FixedVector>, E::MaxBlobsPerBlock>, + pub verified_data_columns: Vec>, pub executed_block: Option>, } +pub enum BlockImportRequirement { + AllBlobs, + CustodyColumns(usize), +} + impl PendingComponents { /// Returns an immutable reference to the cached block. 
pub fn get_cached_block(&self) -> &Option> { @@ -109,6 +116,11 @@ impl PendingComponents { self.get_cached_blobs().iter().flatten().count() } + /// Returns the number of data columns that have been received and are stored in the cache. + pub fn num_received_data_columns(&self) -> usize { + self.verified_data_columns.len() + } + /// Inserts a block into the cache. pub fn insert_block(&mut self, block: DietAvailabilityPendingExecutedBlock) { *self.get_cached_block_mut() = Some(block) @@ -165,15 +177,29 @@ impl PendingComponents { self.merge_blobs(reinsert); } - /// Checks if the block and all of its expected blobs are available in the cache. + /// Checks if the block and all of its expected blobs or custody columns (post-PeerDAS) are + /// available in the cache. /// - /// Returns `true` if both the block exists and the number of received blobs matches the number - /// of expected blobs. - pub fn is_available(&self) -> bool { - if let Some(num_expected_blobs) = self.num_expected_blobs() { - num_expected_blobs == self.num_received_blobs() - } else { - false + /// Returns `true` if both the block exists and the number of received blobs / custody columns + /// matches the number of expected blobs / custody columns. + pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool { + match block_import_requirement { + BlockImportRequirement::AllBlobs => { + if let Some(num_expected_blobs) = self.num_expected_blobs() { + num_expected_blobs == self.num_received_blobs() + } else { + false + } + } + BlockImportRequirement::CustodyColumns(num_expected_columns) => { + let num_received_data_columns = self.num_received_data_columns(); + if let Some(num_expected_blobs) = self.num_expected_blobs() { + // No data columns when there are 0 blobs + num_expected_blobs == 0 || *num_expected_columns == num_received_data_columns + } else { + false + } + } } } @@ -182,6 +208,7 @@ impl PendingComponents { Self { block_root, verified_blobs: FixedVector::default(), + verified_data_columns: vec![], executed_block: None, } } @@ -201,6 +228,7 @@ impl PendingComponents { let Self { block_root, verified_blobs, + verified_data_columns: _, executed_block, } = self; @@ -538,12 +566,16 @@ pub struct OverflowLRUCache { maintenance_lock: Mutex<()>, /// The capacity of the LRU cache capacity: NonZeroUsize, + /// The number of data columns the node is custodying. 
+ custody_column_count: usize, + spec: ChainSpec, } impl OverflowLRUCache { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, + custody_column_count: usize, spec: ChainSpec, ) -> Result { let overflow_store = OverflowStore(beacon_store.clone()); @@ -552,9 +584,11 @@ impl OverflowLRUCache { Ok(Self { critical: RwLock::new(critical), overflow_store, - state_cache: StateLRUCache::new(beacon_store, spec), + state_cache: StateLRUCache::new(beacon_store, spec.clone()), maintenance_lock: Mutex::new(()), capacity, + custody_column_count, + spec, }) } @@ -598,6 +632,24 @@ impl OverflowLRUCache { f(self.critical.read().peek_pending_components(block_root)) } + fn block_import_requirement( + &self, + pending_components: &PendingComponents, + ) -> Result { + let epoch = pending_components + .epoch() + .ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?; + + let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch); + if peer_das_enabled { + Ok(BlockImportRequirement::CustodyColumns( + self.custody_column_count, + )) + } else { + Ok(BlockImportRequirement::AllBlobs) + } + } + pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, @@ -621,7 +673,8 @@ impl OverflowLRUCache { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); - if pending_components.is_available() { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(&block_import_requirement) { write_lock.put_pending_components( block_root, pending_components.clone(), @@ -665,7 +718,8 @@ impl OverflowLRUCache { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - if pending_components.is_available() { + let block_import_requirement = self.block_import_requirement(&pending_components)?; + if pending_components.is_available(&block_import_requirement) { write_lock.put_pending_components( block_root, pending_components.clone(), @@ -970,6 +1024,7 @@ mod test { use types::{ExecPayload, MinimalEthSpec}; const LOW_VALIDATOR_COUNT: usize = 32; + const DEFAULT_TEST_CUSTODY_COLUMN_COUNT: usize = 8; fn get_store_with_spec( db_path: &TempDir, @@ -1190,8 +1245,13 @@ mod test { let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); let cache = Arc::new( - OverflowLRUCache::::new(capacity_non_zero, test_store, spec.clone()) - .expect("should create cache"), + OverflowLRUCache::::new( + capacity_non_zero, + test_store, + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, + spec.clone(), + ) + .expect("should create cache"), ); (harness, cache, chain_db_path) } @@ -1709,6 +1769,7 @@ mod test { let recovered_cache = OverflowLRUCache::::new( new_non_zero_usize(capacity), harness.chain.store.clone(), + DEFAULT_TEST_CUSTODY_COLUMN_COUNT, harness.chain.spec.clone(), ) .expect("should recover cache"); diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs new file mode 100644 index 00000000000..014b8b49968 --- /dev/null +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -0,0 +1,13 @@ +use derivative::Derivative; +use ssz_derive::{Decode, Encode}; +use std::sync::Arc; +use types::data_column_sidecar::DataColumnSidecar; +use types::EthSpec; + +/// Data column that we must custody and has completed kzg verification +#[derive(Debug, Derivative, Clone, Encode, Decode)] +#[derivative(PartialEq, Eq)] +#[ssz(struct_behaviour = "transparent")] +pub struct 
KzgVerifiedCustodyDataColumn { + data: Arc>, +} diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index 466ab0b67e7..0fd000ff002 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -19,6 +19,7 @@ pub mod canonical_head; pub mod capella_readiness; pub mod chain_config; pub mod data_availability_checker; +mod data_column_verification; pub mod deneb_readiness; mod early_attester_cache; pub mod electra_readiness; diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 0f61f74efe1..11f08719d75 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -193,6 +193,9 @@ pub struct ChainSpec { /* * DAS params */ + pub eip7594_fork_epoch: Option, + pub custody_requirement: u64, + pub data_column_sidecar_subnet_count: u64, pub number_of_columns: usize, /* @@ -392,6 +395,13 @@ impl ChainSpec { } } + /// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`. + pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { + self.eip7594_fork_epoch.map_or(false, |eip7594_fork_epoch| { + block_epoch >= eip7594_fork_epoch + }) + } + /// For a given `BeaconState`, return the whistleblower reward quotient associated with its variant. pub fn whistleblower_reward_quotient_for_state( &self, @@ -587,6 +597,12 @@ impl ChainSpec { } } + pub fn data_columns_per_subnet(&self) -> usize { + self.number_of_columns + .safe_div(self.data_column_sidecar_subnet_count as usize) + .expect("Subnet count must be greater than 0") + } + /// Returns a `ChainSpec` compatible with the Ethereum Foundation specification. pub fn mainnet() -> Self { Self { @@ -777,6 +793,12 @@ impl ChainSpec { }) .expect("calculation does not overflow"), + /* + * DAS params + */ + eip7594_fork_epoch: None, + custody_requirement: 1, + data_column_sidecar_subnet_count: 32, number_of_columns: 128, /* @@ -880,6 +902,10 @@ impl ChainSpec { electra_fork_epoch: None, max_pending_partials_per_withdrawals_sweep: u64::checked_pow(2, 0) .expect("pow does not overflow"), + /* + * DAS params + */ + eip7594_fork_epoch: None, // Other network_id: 2, // lighthouse testnet network id deposit_chain_id: 5, @@ -1081,6 +1107,12 @@ impl ChainSpec { }) .expect("calculation does not overflow"), + /* + * DAS params + */ + eip7594_fork_epoch: None, + custody_requirement: 1, + data_column_sidecar_subnet_count: 32, number_of_columns: 128, /* From 9a83a7c4f8e59f8c5ba51755e32f0683cb695900 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Fri, 19 Jul 2024 14:58:57 +1000 Subject: [PATCH 2/4] Simplify code using `map_or` --- .../overflow_lru_cache.rs | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 52973c9364e..f94ba5ecbaa 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -153,21 +153,19 @@ impl PendingComponents { /// matches the number of expected blobs / custody columns. 
pub fn is_available(&self, block_import_requirement: &BlockImportRequirement) -> bool { match block_import_requirement { - BlockImportRequirement::AllBlobs => { - if let Some(num_expected_blobs) = self.num_expected_blobs() { + BlockImportRequirement::AllBlobs => self + .num_expected_blobs() + .map_or(false, |num_expected_blobs| { num_expected_blobs == self.num_received_blobs() - } else { - false - } - } + }), BlockImportRequirement::CustodyColumns(num_expected_columns) => { let num_received_data_columns = self.num_received_data_columns(); - if let Some(num_expected_blobs) = self.num_expected_blobs() { - // No data columns when there are 0 blobs - num_expected_blobs == 0 || *num_expected_columns == num_received_data_columns - } else { - false - } + // No data columns when there are 0 blobs + self.num_expected_blobs() + .map_or(false, |num_expected_blobs| { + num_expected_blobs == 0 + || *num_expected_columns == num_received_data_columns + }) } } } From 379f40a345a4276f836f9bc24ef8e75b68b54056 Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 22 Jul 2024 17:59:11 +1000 Subject: [PATCH 3/4] Remove `epoch` method from `PendingComponents` --- beacon_node/beacon_chain/src/beacon_chain.rs | 3 +- .../beacon_chain/src/blob_verification.rs | 7 ++- .../src/data_availability_checker.rs | 10 ++-- .../overflow_lru_cache.rs | 47 ++++--------------- consensus/types/src/blob_sidecar.rs | 9 +++- 5 files changed, 32 insertions(+), 44 deletions(-) diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 19ee3d116c1..7ebd011cf46 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -3288,9 +3288,10 @@ impl BeaconChain { } } } + let epoch = slot.epoch(T::EthSpec::slots_per_epoch()); let availability = self .data_availability_checker - .put_rpc_blobs(block_root, blobs)?; + .put_rpc_blobs(block_root, epoch, blobs)?; self.process_availability(slot, availability).await } diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index 2b62a83194b..bba58675933 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -16,7 +16,9 @@ use ssz_types::VariableList; use std::time::Duration; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; -use types::{BeaconStateError, BlobSidecar, EthSpec, Hash256, SignedBeaconBlockHeader, Slot}; +use types::{ + BeaconStateError, BlobSidecar, Epoch, EthSpec, Hash256, SignedBeaconBlockHeader, Slot, +}; /// An error occurred while validating a gossip blob. 
#[derive(Debug)] @@ -223,6 +225,9 @@ impl GossipVerifiedBlob { pub fn slot(&self) -> Slot { self.blob.blob.slot() } + pub fn epoch(&self) -> Epoch { + self.blob.blob.epoch() + } pub fn index(&self) -> u64 { self.blob.blob.index } diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 5b334cb5bed..90d4b6081e4 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -152,6 +152,7 @@ impl DataAvailabilityChecker { pub fn put_rpc_blobs( &self, block_root: Hash256, + epoch: Epoch, blobs: FixedBlobSidecarList, ) -> Result, AvailabilityCheckError> { let Some(kzg) = self.kzg.as_ref() else { @@ -168,7 +169,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::Kzg)?; self.availability_cache - .put_kzg_verified_blobs(block_root, verified_blobs) + .put_kzg_verified_blobs(block_root, epoch, verified_blobs) } /// Check if we've cached other blobs for this block. If it completes a set and we also @@ -180,8 +181,11 @@ impl DataAvailabilityChecker { &self, gossip_blob: GossipVerifiedBlob, ) -> Result, AvailabilityCheckError> { - self.availability_cache - .put_kzg_verified_blobs(gossip_blob.block_root(), vec![gossip_blob.into_inner()]) + self.availability_cache.put_kzg_verified_blobs( + gossip_blob.block_root(), + gossip_blob.epoch(), + vec![gossip_blob.into_inner()], + ) } /// Check if we have all the blobs for a block. Returns `Availability` which has information diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 8adae5cca41..09e88beef0d 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -238,35 +238,6 @@ impl PendingComponents { AvailableExecutedBlock::new(available_block, import_data, payload_verification_outcome), ))) } - - /// Returns the epoch of the block if it is cached, otherwise returns the epoch of the first blob. - pub fn epoch(&self) -> Option { - self.executed_block - .as_ref() - .map(|pending_block| pending_block.as_block().epoch()) - .or_else(|| { - for maybe_blob in self.verified_blobs.iter() { - if maybe_blob.is_some() { - return maybe_blob.as_ref().map(|kzg_verified_blob| { - kzg_verified_blob - .as_blob() - .slot() - .epoch(E::slots_per_epoch()) - }); - } - } - - if let Some(kzg_verified_data_column) = self.verified_data_columns.first() { - let epoch = kzg_verified_data_column - .as_data_column() - .slot() - .epoch(E::slots_per_epoch()); - return Some(epoch); - } - - None - }) - } } /// This is the main struct for this module. 
Outside methods should @@ -340,12 +311,8 @@ impl DataAvailabilityCheckerInner { fn block_import_requirement( &self, - pending_components: &PendingComponents, + epoch: Epoch, ) -> Result { - let epoch = pending_components - .epoch() - .ok_or(AvailabilityCheckError::UnableToDetermineImportRequirement)?; - let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch); if peer_das_enabled { Ok(BlockImportRequirement::CustodyColumns( @@ -359,6 +326,7 @@ impl DataAvailabilityCheckerInner { pub fn put_kzg_verified_blobs>>( &self, block_root: Hash256, + epoch: Epoch, kzg_verified_blobs: I, ) -> Result, AvailabilityCheckError> { let mut fixed_blobs = FixedVector::default(); @@ -380,7 +348,7 @@ impl DataAvailabilityCheckerInner { // Merge in the blobs. pending_components.merge_blobs(fixed_blobs); - let block_import_requirement = self.block_import_requirement(&pending_components)?; + let block_import_requirement = self.block_import_requirement(epoch)?; if pending_components.is_available(&block_import_requirement) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore @@ -402,6 +370,7 @@ impl DataAvailabilityCheckerInner { ) -> Result, AvailabilityCheckError> { let mut write_lock = self.critical.write(); let block_root = executed_block.import_data.block_root; + let epoch = executed_block.block.epoch(); // register the block to get the diet block let diet_executed_block = self @@ -418,7 +387,7 @@ impl DataAvailabilityCheckerInner { pending_components.merge_block(diet_executed_block); // Check if we have all components and entire set is consistent. - let block_import_requirement = self.block_import_requirement(&pending_components)?; + let block_import_requirement = self.block_import_requirement(epoch)?; if pending_components.is_available(&block_import_requirement) { write_lock.put(block_root, pending_components.clone()); // No need to hold the write lock anymore @@ -690,6 +659,7 @@ mod test { let (pending_block, blobs) = availability_pending_block(&harness).await; let root = pending_block.import_data.block_root; + let epoch = pending_block.block.epoch(); let blobs_expected = pending_block.num_blobs_expected(); assert_eq!( @@ -738,7 +708,7 @@ mod test { for (blob_index, gossip_blob) in blobs.into_iter().enumerate() { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone()) + .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone()) .expect("should put blob"); if blob_index == blobs_expected - 1 { assert!(matches!(availability, Availability::Available(_))); @@ -760,11 +730,12 @@ mod test { "should have expected number of blobs" ); let root = pending_block.import_data.block_root; + let epoch = pending_block.block.epoch(); let mut kzg_verified_blobs = vec![]; for gossip_blob in blobs { kzg_verified_blobs.push(gossip_blob.into_inner()); let availability = cache - .put_kzg_verified_blobs(root, kzg_verified_blobs.clone()) + .put_kzg_verified_blobs(root, epoch, kzg_verified_blobs.clone()) .expect("should put blob"); assert_eq!( availability, diff --git a/consensus/types/src/blob_sidecar.rs b/consensus/types/src/blob_sidecar.rs index 1f60f429db5..6b32523c35f 100644 --- a/consensus/types/src/blob_sidecar.rs +++ b/consensus/types/src/blob_sidecar.rs @@ -1,7 +1,7 @@ use crate::test_utils::TestRandom; use crate::{ beacon_block_body::BLOB_KZG_COMMITMENTS_INDEX, BeaconBlockHeader, BeaconStateError, Blob, - EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList, 
+ Epoch, EthSpec, FixedVector, Hash256, SignedBeaconBlockHeader, Slot, VariableList, }; use crate::{KzgProofs, SignedBeaconBlock}; use bls::Signature; @@ -160,6 +160,13 @@ impl BlobSidecar { self.signed_block_header.message.slot } + pub fn epoch(&self) -> Epoch { + self.signed_block_header + .message + .slot + .epoch(E::slots_per_epoch()) + } + pub fn block_root(&self) -> Hash256 { self.signed_block_header.message.tree_hash_root() } From b2befe35b3a08b0d80a7783879e20e0adf8f51da Mon Sep 17 00:00:00 2001 From: Jimmy Chen Date: Mon, 22 Jul 2024 18:16:10 +1000 Subject: [PATCH 4/4] Add `put_kzg_verified_data_columns` method. --- .../overflow_lru_cache.rs | 66 +++++++++++++++++++ .../src/data_column_verification.rs | 6 +- 2 files changed, 69 insertions(+), 3 deletions(-) diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index 09e88beef0d..dfe369cc47f 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -46,6 +46,16 @@ impl PendingComponents { &self.verified_blobs } + /// Returns an immutable reference to the cached data column. + pub fn get_cached_data_column( + &self, + data_column_index: u64, + ) -> Option<&KzgVerifiedCustodyDataColumn> { + self.verified_data_columns + .iter() + .find(|d| d.index() == data_column_index) + } + /// Returns a mutable reference to the cached block. pub fn get_cached_block_mut(&mut self) -> &mut Option> { &mut self.executed_block @@ -85,6 +95,15 @@ impl PendingComponents { self.get_cached_blobs().iter().flatten().count() } + /// Checks if a data column of a given index exists in the cache. + /// + /// Returns: + /// - `true` if a data column for the given index exists. + /// - `false` otherwise. + fn data_column_exists(&self, data_column_index: u64) -> bool { + self.get_cached_data_column(data_column_index).is_some() + } + /// Returns the number of data columns that have been received and are stored in the cache. pub fn num_received_data_columns(&self) -> usize { self.verified_data_columns.len() @@ -137,6 +156,18 @@ impl PendingComponents { } } + /// Merges a given set of data columns into the cache. + fn merge_data_columns>>( + &mut self, + kzg_verified_data_columns: I, + ) { + for data_column in kzg_verified_data_columns { + if !self.data_column_exists(data_column.index()) { + self.verified_data_columns.push(data_column); + } + } + } + /// Inserts a new block and revalidates the existing blobs against it. /// /// Blobs that don't match the new block's commitments are evicted. @@ -362,6 +393,41 @@ impl DataAvailabilityCheckerInner { } } + // TODO(das): gossip and rpc code paths to be implemented. + #[allow(dead_code)] + pub fn put_kzg_verified_data_columns< + I: IntoIterator>, + >( + &self, + block_root: Hash256, + epoch: Epoch, + kzg_verified_data_columns: I, + ) -> Result, AvailabilityCheckError> { + let mut write_lock = self.critical.write(); + + // Grab existing entry or create a new entry. + let mut pending_components = write_lock + .pop_entry(&block_root) + .map(|(_, v)| v) + .unwrap_or_else(|| PendingComponents::empty(block_root)); + + // Merge in the data columns. 
+        pending_components.merge_data_columns(kzg_verified_data_columns);
+
+        let block_import_requirement = self.block_import_requirement(epoch)?;
+        if pending_components.is_available(&block_import_requirement) {
+            write_lock.put(block_root, pending_components.clone());
+            // No need to hold the write lock anymore
+            drop(write_lock);
+            pending_components.make_available(|diet_block| {
+                self.state_cache.recover_pending_executed_block(diet_block)
+            })
+        } else {
+            write_lock.put(block_root, pending_components);
+            Ok(Availability::MissingComponents(block_root))
+        }
+    }
+
     /// Check if we have all the blobs for a block. If we do, return the Availability variant that
     /// triggers import of the block.
     pub fn put_pending_executed_block(
diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs
index 457a62dbd15..da848ccc470 100644
--- a/beacon_node/beacon_chain/src/data_column_verification.rs
+++ b/beacon_node/beacon_chain/src/data_column_verification.rs
@@ -1,7 +1,7 @@
 use derivative::Derivative;
 use ssz_derive::{Decode, Encode};
 use std::sync::Arc;
-use types::data_column_sidecar::DataColumnSidecar;
+use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar};
 use types::EthSpec;
 
 /// Data column that we must custody and has completed kzg verification
@@ -13,7 +13,7 @@ pub struct KzgVerifiedCustodyDataColumn<E: EthSpec> {
 }
 
 impl<E: EthSpec> KzgVerifiedCustodyDataColumn<E> {
-    pub fn as_data_column(&self) -> &DataColumnSidecar<E> {
-        &self.data
+    pub fn index(&self) -> ColumnIndex {
+        self.data.index
     }
 }
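
Taken together, the series derives the node's custody column count from the DAS parameters in `ChainSpec`, picks a `BlockImportRequirement` for the block's epoch, and checks availability against that requirement. Below is a minimal, self-contained Rust sketch of that decision flow only; `SpecSketch`, `PendingSketch`, and the concrete numbers in `main` are illustrative stand-ins and assumptions, not the actual Lighthouse types or APIs.

// Standalone sketch of the import-requirement logic added in this series.
// All types here are simplified stand-ins for the Lighthouse ones; only the
// decision logic mirrors the patches.

/// Post-PeerDAS a block needs its custody columns; pre-PeerDAS it needs all blobs.
enum BlockImportRequirement {
    AllBlobs,
    CustodyColumns(usize),
}

/// Assumed stand-in for the DAS fields added to `ChainSpec`.
struct SpecSketch {
    eip7594_fork_epoch: Option<u64>,
    custody_requirement: u64,
    data_column_sidecar_subnet_count: u64,
    number_of_columns: usize,
}

impl SpecSketch {
    /// Mirrors `ChainSpec::data_columns_per_subnet` (sketch uses plain division).
    fn data_columns_per_subnet(&self) -> usize {
        self.number_of_columns / self.data_column_sidecar_subnet_count as usize
    }

    /// Mirrors `ChainSpec::is_peer_das_enabled_for_epoch`.
    fn is_peer_das_enabled_for_epoch(&self, epoch: u64) -> bool {
        self.eip7594_fork_epoch.map_or(false, |fork| epoch >= fork)
    }
}

/// Simplified cache entry: counts stand in for the verified blob / column vectors.
struct PendingSketch {
    num_expected_blobs: Option<usize>, // `None` until the block itself arrives
    num_received_blobs: usize,
    num_received_columns: usize,
}

impl PendingSketch {
    /// Mirrors `PendingComponents::is_available` after the `map_or` cleanup in patch 2.
    fn is_available(&self, requirement: &BlockImportRequirement) -> bool {
        match requirement {
            BlockImportRequirement::AllBlobs => self
                .num_expected_blobs
                .map_or(false, |expected| expected == self.num_received_blobs),
            BlockImportRequirement::CustodyColumns(expected_columns) => self
                .num_expected_blobs
                // A block with zero blobs has no data columns to wait for.
                .map_or(false, |expected| {
                    expected == 0 || *expected_columns == self.num_received_columns
                }),
        }
    }
}

/// Mirrors the per-epoch requirement selection and the custody column count
/// computed in `DataAvailabilityChecker::new`.
fn block_import_requirement(spec: &SpecSketch, epoch: u64) -> BlockImportRequirement {
    if spec.is_peer_das_enabled_for_epoch(epoch) {
        let custody_columns =
            spec.custody_requirement as usize * spec.data_columns_per_subnet();
        BlockImportRequirement::CustodyColumns(custody_columns)
    } else {
        BlockImportRequirement::AllBlobs
    }
}

fn main() {
    // Example numbers taken from the mainnet defaults in this series
    // (128 columns, 32 subnets, custody requirement 1); the fork epoch is assumed.
    let spec = SpecSketch {
        eip7594_fork_epoch: Some(10),
        custody_requirement: 1,
        data_column_sidecar_subnet_count: 32,
        number_of_columns: 128,
    };

    let requirement = block_import_requirement(&spec, 12); // post-fork epoch
    let pending = PendingSketch {
        num_expected_blobs: Some(3),
        num_received_blobs: 0,
        num_received_columns: 4,
    };
    println!("available = {}", pending.is_available(&requirement)); // prints: available = true
}

With the defaults introduced above (128 columns over 32 subnets, custody requirement of 1), a node custodies 4 columns, and a post-fork block with a non-zero blob count becomes available once those 4 custody columns are received.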