diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index d0c294b44ff..ca21b519f15 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -34,6 +34,7 @@ use crate::execution_payload::{get_execution_payload, NotifyExecutionLayer, Prep use crate::fork_choice_signal::{ForkChoiceSignalRx, ForkChoiceSignalTx, ForkChoiceWaitResult}; use crate::graffiti_calculator::GraffitiCalculator; use crate::head_tracker::{HeadTracker, HeadTrackerReader, SszHeadTracker}; +use crate::kzg_utils::reconstruct_blobs; use crate::light_client_finality_update_verification::{ Error as LightClientFinalityUpdateError, VerifiedLightClientFinalityUpdate, }; @@ -1249,6 +1250,55 @@ impl BeaconChain { self.store.get_blobs(block_root).map_err(Error::from) } + /// Returns the data columns at the given root, if any. + /// + /// ## Errors + /// May return a database error. + pub fn get_data_columns( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + self.store.get_data_columns(block_root).map_err(Error::from) + } + + /// Returns the blobs at the given root, if any. + /// + /// Uses the `block.epoch()` to determine whether to retrieve blobs or columns from the store. + /// + /// If at least 50% of columns are retrieved, blobs will be reconstructed and returned, + /// otherwise an error `InsufficientColumnsToReconstructBlobs` is returned. + /// + /// ## Errors + /// May return a database error. + pub fn get_or_reconstruct_blobs( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + let Some(block) = self.store.get_blinded_block(block_root)? else { + return Ok(None); + }; + + if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + if let Some(columns) = self.store.get_data_columns(block_root)? 
{ + let num_required_columns = self.spec.number_of_columns / 2; + let reconstruction_possible = columns.len() >= num_required_columns as usize; + if reconstruction_possible { + reconstruct_blobs(&self.kzg, &columns, None, &block, &self.spec) + .map(Some) + .map_err(Error::FailedToReconstructBlobs) + } else { + Err(Error::InsufficientColumnsToReconstructBlobs { + columns_found: columns.len(), + }) + } + } else { + Ok(None) + } + } else { + self.get_blobs(block_root).map(|b| b.blobs()) + } + } + /// Returns the data columns at the given root, if any. /// /// ## Errors @@ -5850,6 +5900,7 @@ impl BeaconChain { let kzg = self.kzg.as_ref(); + // TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing. kzg_utils::validate_blobs::( kzg, expected_kzg_commitments, diff --git a/beacon_node/beacon_chain/src/data_column_verification.rs b/beacon_node/beacon_chain/src/data_column_verification.rs index 1bd17485ab6..565e76704ef 100644 --- a/beacon_node/beacon_chain/src/data_column_verification.rs +++ b/beacon_node/beacon_chain/src/data_column_verification.rs @@ -699,7 +699,7 @@ mod test { #[tokio::test] async fn empty_data_column_sidecars_fails_validation() { - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::Fulu.make_genesis_spec(E::default_spec()); let harness = BeaconChainHarness::builder(E::default()) .spec(spec.into()) .deterministic_keypairs(64) diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 2a8fd4cd015..2e13ab4090e 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -226,6 +226,10 @@ pub enum BeaconChainError { EmptyRpcCustodyColumns, AttestationError(AttestationError), AttestationCommitteeIndexNotSet, + InsufficientColumnsToReconstructBlobs { + columns_found: usize, + }, + FailedToReconstructBlobs(String), } easy_from_to!(SlotProcessingError, BeaconChainError); diff --git 
a/beacon_node/beacon_chain/src/fulu_readiness.rs b/beacon_node/beacon_chain/src/fulu_readiness.rs index 71494623f83..872fe58f2ba 100644 --- a/beacon_node/beacon_chain/src/fulu_readiness.rs +++ b/beacon_node/beacon_chain/src/fulu_readiness.rs @@ -1,7 +1,7 @@ //! Provides tools for checking if a node is ready for the Fulu upgrade. use crate::{BeaconChain, BeaconChainTypes}; -use execution_layer::http::{ENGINE_GET_PAYLOAD_V5, ENGINE_NEW_PAYLOAD_V5}; +use execution_layer::http::{ENGINE_GET_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V4}; use serde::{Deserialize, Serialize}; use std::fmt; use std::time::Duration; @@ -87,14 +87,15 @@ impl BeaconChain { Ok(capabilities) => { let mut missing_methods = String::from("Required Methods Unsupported:"); let mut all_good = true; - if !capabilities.get_payload_v5 { + // TODO(fulu) switch to v5 when the EL is ready + if !capabilities.get_payload_v4 { missing_methods.push(' '); - missing_methods.push_str(ENGINE_GET_PAYLOAD_V5); + missing_methods.push_str(ENGINE_GET_PAYLOAD_V4); all_good = false; } - if !capabilities.new_payload_v5 { + if !capabilities.new_payload_v4 { missing_methods.push(' '); - missing_methods.push_str(ENGINE_NEW_PAYLOAD_V5); + missing_methods.push_str(ENGINE_NEW_PAYLOAD_V4); all_good = false; } diff --git a/beacon_node/beacon_chain/src/kzg_utils.rs b/beacon_node/beacon_chain/src/kzg_utils.rs index dcb3864f785..06cce141444 100644 --- a/beacon_node/beacon_chain/src/kzg_utils.rs +++ b/beacon_node/beacon_chain/src/kzg_utils.rs @@ -186,7 +186,7 @@ pub fn blobs_to_data_column_sidecars( .map_err(DataColumnSidecarError::BuildSidecarFailed) } -fn build_data_column_sidecars( +pub(crate) fn build_data_column_sidecars( kzg_commitments: KzgCommitments, kzg_commitments_inclusion_proof: FixedVector, signed_block_header: SignedBeaconBlockHeader, diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index ba0a2159da1..e88ce71a7b7 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ 
b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,8 +1,9 @@ +use crate::blob_verification::GossipVerifiedBlob; use crate::block_verification_types::{AsBlock, RpcBlock}; -use crate::kzg_utils::blobs_to_data_column_sidecars; +use crate::data_column_verification::CustodyDataColumn; +use crate::kzg_utils::build_data_column_sidecars; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::BeaconBlockResponseWrapper; pub use crate::{ beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY}, migrate::MigratorConfig, @@ -16,6 +17,7 @@ use crate::{ BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, StateSkipConfig, }; +use crate::{get_block_root, BeaconBlockResponseWrapper}; use bls::get_withdrawal_credentials; use eth2::types::SignedBlockContentsTuple; use execution_layer::test_utils::generate_genesis_header; @@ -74,6 +76,11 @@ pub const FORK_NAME_ENV_VAR: &str = "FORK_NAME"; // Environment variable to read if `ci_logger` feature is enabled. pub const CI_LOGGER_DIR_ENV_VAR: &str = "CI_LOGGER_DIR"; +// Pre-computed data column sidecar using a single static blob from: +// `beacon_node/execution_layer/src/test_utils/fixtures/mainnet/test_blobs_bundle.ssz` +const TEST_DATA_COLUMN_SIDECARS_SSZ: &[u8] = + include_bytes!("test_utils/fixtures/test_data_column_sidecars.ssz"); + // Default target aggregators to set during testing, this ensures an aggregator at each slot. 
// // You should mutate the `ChainSpec` prior to initialising the harness if you would like to use @@ -105,7 +112,7 @@ static KZG_NO_PRECOMP: LazyLock> = LazyLock::new(|| { }); pub fn get_kzg(spec: &ChainSpec) -> Arc { - if spec.eip7594_fork_epoch.is_some() { + if spec.fulu_fork_epoch.is_some() { KZG_PEERDAS.clone() } else if spec.deneb_fork_epoch.is_some() { KZG.clone() @@ -224,6 +231,7 @@ pub struct Builder { mock_execution_layer: Option>, testing_slot_clock: Option, validator_monitor_config: Option, + import_all_data_columns: bool, runtime: TestRuntime, log: Logger, } @@ -366,6 +374,7 @@ where mock_execution_layer: None, testing_slot_clock: None, validator_monitor_config: None, + import_all_data_columns: false, runtime, log, } @@ -458,6 +467,11 @@ where self } + pub fn import_all_data_columns(mut self, import_all_data_columns: bool) -> Self { + self.import_all_data_columns = import_all_data_columns; + self + } + pub fn execution_layer_from_url(mut self, url: &str) -> Self { assert!( self.execution_layer.is_none(), @@ -575,6 +589,7 @@ where .expect("should build dummy backend") .shutdown_sender(shutdown_tx) .chain_config(chain_config) + .import_all_data_columns(self.import_all_data_columns) .event_handler(Some(ServerSentEventHandler::new_with_capacity( log.clone(), 5, @@ -762,15 +777,13 @@ where pub fn get_head_block(&self) -> RpcBlock { let block = self.chain.head_beacon_block(); let block_root = block.canonical_root(); - let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); - RpcBlock::new(Some(block_root), block, blobs).unwrap() + self.build_rpc_block_from_store_blobs(Some(block_root), block) } pub fn get_full_block(&self, block_root: &Hash256) -> RpcBlock { let block = self.chain.get_blinded_block(block_root).unwrap().unwrap(); let full_block = self.chain.store.make_full_block(block_root, block).unwrap(); - let blobs = self.chain.get_blobs(block_root).unwrap().blobs(); - RpcBlock::new(Some(*block_root), Arc::new(full_block), blobs).unwrap() + 
self.build_rpc_block_from_store_blobs(Some(*block_root), Arc::new(full_block)) } pub fn get_all_validators(&self) -> Vec { @@ -2271,22 +2284,19 @@ where self.set_current_slot(slot); let (block, blob_items) = block_contents; - let sidecars = blob_items - .map(|(proofs, blobs)| BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec)) - .transpose() - .unwrap(); + let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( block_root, - RpcBlock::new(Some(block_root), block, sidecars).unwrap(), + rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::RangeSync, || Ok(()), ) .await? .try_into() - .unwrap(); + .expect("block blobs are available"); self.chain.recompute_head_at_current_slot().await; Ok(block_hash) } @@ -2297,16 +2307,13 @@ where ) -> Result { let (block, blob_items) = block_contents; - let sidecars = blob_items - .map(|(proofs, blobs)| BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec)) - .transpose() - .unwrap(); let block_root = block.canonical_root(); + let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( block_root, - RpcBlock::new(Some(block_root), block, sidecars).unwrap(), + rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::RangeSync, || Ok(()), @@ -2318,6 +2325,75 @@ where Ok(block_hash) } + /// Builds an `Rpc` block from a `SignedBeaconBlock` and blobs or data columns retrieved from + /// the database. 
+ pub fn build_rpc_block_from_store_blobs( + &self, + block_root: Option, + block: Arc>, + ) -> RpcBlock { + let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); + let has_blobs = block + .message() + .body() + .blob_kzg_commitments() + .is_ok_and(|c| !c.is_empty()); + if !has_blobs { + return RpcBlock::new_without_blobs(Some(block_root), block); + } + + // Blobs are stored as data columns from Fulu (PeerDAS) + if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap(); + let custody_columns = columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect::>(); + RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec) + .unwrap() + } else { + let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); + RpcBlock::new(Some(block_root), block, blobs).unwrap() + } + } + + /// Builds an `RpcBlock` from a `SignedBeaconBlock` and `BlobsList`. + fn build_rpc_block_from_blobs( + &self, + block_root: Hash256, + block: Arc>>, + blob_items: Option<(KzgProofs, BlobsList)>, + ) -> Result, BlockError> { + Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { + let sampling_column_count = self + .chain + .data_availability_checker + .get_sampling_column_count(); + + if blob_items.is_some_and(|(_, blobs)| !blobs.is_empty()) { + // Note: this method ignores the actual custody columns and just take the first + // `sampling_column_count` for testing purpose only, because the chain does not + // currently have any knowledge of the columns being custodied. + let columns = generate_data_column_sidecars_from_block(&block, &self.spec) + .into_iter() + .take(sampling_column_count) + .map(CustodyDataColumn::from_asserted_custody) + .collect::>(); + RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)? 
+ } else { + RpcBlock::new_without_blobs(Some(block_root), block) + } + } else { + let blobs = blob_items + .map(|(proofs, blobs)| { + BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec) + }) + .transpose() + .unwrap(); + RpcBlock::new(Some(block_root), block, blobs)? + }) + } + pub fn process_attestations(&self, attestations: HarnessAttestations) { let num_validators = self.validator_keypairs.len(); let mut unaggregated = Vec::with_capacity(num_validators); @@ -2991,6 +3067,56 @@ where Ok(()) } + + /// Simulate some of the blobs / data columns being seen on gossip. + /// Converts the blobs to data columns if the slot is Fulu or later. + pub async fn process_gossip_blobs_or_columns<'a>( + &self, + block: &SignedBeaconBlock, + blobs: impl Iterator>, + proofs: impl Iterator, + custody_columns_opt: Option>, + ) { + let is_peerdas_enabled = self.chain.spec.is_peer_das_enabled_for_epoch(block.epoch()); + if is_peerdas_enabled { + let custody_columns = custody_columns_opt.unwrap_or_else(|| { + let sampling_column_count = self + .chain + .data_availability_checker + .get_sampling_column_count() as u64; + (0..sampling_column_count).collect() + }); + + let verified_columns = generate_data_column_sidecars_from_block(block, &self.spec) + .into_iter() + .filter(|c| custody_columns.contains(&c.index)) + .map(|sidecar| { + let column_index = sidecar.index; + self.chain + .verify_data_column_sidecar_for_gossip(sidecar, column_index) + }) + .collect::, _>>() + .unwrap(); + + if !verified_columns.is_empty() { + self.chain + .process_gossip_data_columns(verified_columns, || Ok(())) + .await + .unwrap(); + } + } else { + for (i, (kzg_proof, blob)) in proofs.into_iter().zip(blobs).enumerate() { + let sidecar = + Arc::new(BlobSidecar::new(i, blob.clone(), block, *kzg_proof).unwrap()); + let gossip_blob = GossipVerifiedBlob::new(sidecar, i as u64, &self.chain) + .expect("should obtain gossip verified blob"); + self.chain + .process_gossip_blob(gossip_blob) + .await + 
.expect("should import valid gossip verified blob"); + } + } + } } // Junk `Debug` impl to satistfy certain trait bounds during testing. @@ -3176,10 +3302,59 @@ pub fn generate_rand_block_and_data_columns( SignedBeaconBlock>, DataColumnSidecarList, ) { - let kzg = get_kzg(spec); - let (block, blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng, spec); - let blob_refs = blobs.iter().map(|b| &b.blob).collect::>(); - let data_columns = blobs_to_data_column_sidecars(&blob_refs, &block, &kzg, spec).unwrap(); - + let (block, _blobs) = generate_rand_block_and_blobs(fork_name, num_blobs, rng, spec); + let data_columns = generate_data_column_sidecars_from_block(&block, spec); (block, data_columns) } + +/// Generate data column sidecars from pre-computed cells and proofs. +fn generate_data_column_sidecars_from_block( + block: &SignedBeaconBlock, + spec: &ChainSpec, +) -> DataColumnSidecarList { + let kzg_commitments = block.message().body().blob_kzg_commitments().unwrap(); + if kzg_commitments.is_empty() { + return vec![]; + } + + let kzg_commitments_inclusion_proof = block + .message() + .body() + .kzg_commitments_merkle_proof() + .unwrap(); + let signed_block_header = block.signed_block_header(); + + // load the precomputed column sidecar to avoid computing them for every block in the tests. + let template_data_columns = RuntimeVariableList::>::from_ssz_bytes( + TEST_DATA_COLUMN_SIDECARS_SSZ, + spec.number_of_columns as usize, + ) + .unwrap(); + + let (cells, proofs) = template_data_columns + .into_iter() + .map(|sidecar| { + let DataColumnSidecar { + column, kzg_proofs, .. 
+ } = sidecar; + // There's only one cell per column for a single blob + let cell_bytes: Vec = column.into_iter().next().unwrap().into(); + let kzg_cell = cell_bytes.try_into().unwrap(); + let kzg_proof = kzg_proofs.into_iter().next().unwrap(); + (kzg_cell, kzg_proof) + }) + .collect::<(Vec<_>, Vec<_>)>(); + + // Repeat the cells and proofs for every blob + let blob_cells_and_proofs_vec = + vec![(cells.try_into().unwrap(), proofs.try_into().unwrap()); kzg_commitments.len()]; + + build_data_column_sidecars( + kzg_commitments.clone(), + kzg_commitments_inclusion_proof, + signed_block_header, + blob_cells_and_proofs_vec, + spec, + ) + .unwrap() +} diff --git a/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars.ssz b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars.ssz new file mode 100644 index 00000000000..112dd43b047 Binary files /dev/null and b/beacon_node/beacon_chain/src/test_utils/fixtures/test_data_column_sidecars.ssz differ diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index 60001159938..621475a3ece 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,7 +1,6 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_simulator::produce_unaggregated_attestation; -use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS; use beacon_chain::{metrics, StateSkipConfig, WhenSlotSkipped}; @@ -155,7 +154,6 @@ async fn produces_attestations() { .store .make_full_block(&block_root, blinded_block) .unwrap(); - let blobs = chain.get_blobs(&block_root).unwrap().blobs(); let epoch_boundary_slot = state .current_epoch() @@ -223,8 +221,7 @@ async fn produces_attestations() { assert_eq!(data.target.root, 
target_root, "bad target root"); let rpc_block = - RpcBlock::::new(None, Arc::new(block.clone()), blobs.clone()) - .unwrap(); + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available( available_block, ) = chain @@ -296,14 +293,8 @@ async fn early_attester_cache_old_request() { .get_block(&head.beacon_block_root) .unwrap(); - let head_blobs = harness - .chain - .get_blobs(&head.beacon_block_root) - .expect("should get blobs") - .blobs(); - - let rpc_block = - RpcBlock::::new(None, head.beacon_block.clone(), head_blobs).unwrap(); + let rpc_block = harness + .build_rpc_block_from_store_blobs(Some(head.beacon_block_root), head.beacon_block.clone()); let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) = harness .chain diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 46f5befbba2..2a881b5b0f9 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; +use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::{ test_utils::{ test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, @@ -34,7 +35,12 @@ const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGT static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT)); -async fn get_chain_segment() -> (Vec>, Vec>>) { +enum DataSidecars { + Blobs(BlobSidecarList), + DataColumns(Vec>), +} + +async fn get_chain_segment() -> (Vec>, Vec>>) { let harness = get_harness(VALIDATOR_COUNT); harness @@ -46,7 +52,7 @@ async fn get_chain_segment() -> (Vec>, Vec (Vec>, Vec (Vec>, Vec>>) { - let 
harness = get_harness(VALIDATOR_COUNT); - - harness - .extend_chain( - CHAIN_SEGMENT_LENGTH, - BlockStrategy::OnCanonicalHead, - AttestationStrategy::AllValidators, - ) - .await; + .blobs() + .map(DataSidecars::Blobs) + }; - let mut segment = Vec::with_capacity(CHAIN_SEGMENT_LENGTH); - let mut segment_blobs = Vec::with_capacity(CHAIN_SEGMENT_LENGTH); - for snapshot in harness - .chain - .chain_dump() - .expect("should dump chain") - .into_iter() - .skip(1) - { - let full_block = harness - .chain - .get_block(&snapshot.beacon_block_root) - .await - .unwrap() - .unwrap(); - segment.push(BeaconSnapshot { - beacon_block_root: snapshot.beacon_block_root, - beacon_block: Arc::new(full_block), - beacon_state: snapshot.beacon_state, - }); - let blob_sidecars = harness - .chain - .get_blobs(&snapshot.beacon_block_root) - .unwrap() - .blobs(); - segment_blobs.push(blob_sidecars) + segment_sidecars.push(data_sidecars); } - (segment, segment_blobs) + (segment, segment_sidecars) } fn get_harness(validator_count: usize) -> BeaconChainHarness> { @@ -137,17 +119,35 @@ fn get_harness(validator_count: usize) -> BeaconChainHarness], - blobs: &[Option>], + chain_segment_sidecars: &[Option>], + spec: &ChainSpec, ) -> Vec> { chain_segment .iter() - .zip(blobs.iter()) - .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + .zip(chain_segment_sidecars.iter()) + .map(|(snapshot, data_sidecars)| { + let block = snapshot.beacon_block.clone(); + build_rpc_block(block, data_sidecars, spec) }) .collect() } +fn build_rpc_block( + block: Arc>, + data_sidecars: &Option>, + spec: &ChainSpec, +) -> RpcBlock { + match data_sidecars { + Some(DataSidecars::Blobs(blobs)) => { + RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + } + Some(DataSidecars::DataColumns(columns)) => { + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() + } + None => RpcBlock::new_without_blobs(None, block), + } +} + fn junk_signature() -> 
Signature { let kp = generate_deterministic_keypair(VALIDATOR_COUNT); let message = Hash256::from_slice(&[42; 32]); @@ -186,18 +186,22 @@ fn update_proposal_signatures( } } -fn update_parent_roots( - snapshots: &mut [BeaconSnapshot], - blobs: &mut [Option>], -) { +fn update_parent_roots(snapshots: &mut [BeaconSnapshot], blobs: &mut [Option>]) { for i in 0..snapshots.len() { let root = snapshots[i].beacon_block.canonical_root(); if let (Some(child), Some(child_blobs)) = (snapshots.get_mut(i + 1), blobs.get_mut(i + 1)) { let (mut block, signature) = child.beacon_block.as_ref().clone().deconstruct(); *block.parent_root_mut() = root; let new_child = Arc::new(SignedBeaconBlock::from_block(block, signature)); - if let Some(blobs) = child_blobs { - update_blob_signed_header(&new_child, blobs); + if let Some(data_sidecars) = child_blobs { + match data_sidecars { + DataSidecars::Blobs(blobs) => { + update_blob_signed_header(&new_child, blobs); + } + DataSidecars::DataColumns(columns) => { + update_data_column_signed_header(&new_child, columns); + } + } } child.beacon_block = new_child; } @@ -225,13 +229,36 @@ fn update_blob_signed_header( } } +fn update_data_column_signed_header( + signed_block: &SignedBeaconBlock, + data_columns: &mut Vec>, +) { + for old_custody_column_sidecar in data_columns.as_mut_slice() { + let old_column_sidecar = old_custody_column_sidecar.as_data_column(); + let new_column_sidecar = Arc::new(DataColumnSidecar:: { + index: old_column_sidecar.index, + column: old_column_sidecar.column.clone(), + kzg_commitments: old_column_sidecar.kzg_commitments.clone(), + kzg_proofs: old_column_sidecar.kzg_proofs.clone(), + signed_block_header: signed_block.signed_block_header(), + kzg_commitments_inclusion_proof: signed_block + .message() + .body() + .kzg_commitments_merkle_proof() + .unwrap(), + }); + *old_custody_column_sidecar = CustodyDataColumn::from_asserted_custody(new_column_sidecar); + } +} + #[tokio::test] async fn chain_segment_full_segment() { let 
harness = get_harness(VALIDATOR_COUNT); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); harness .chain @@ -267,9 +294,10 @@ async fn chain_segment_varying_chunk_size() { for chunk_size in &[1, 2, 3, 5, 31, 32, 33, 42] { let harness = get_harness(VALIDATOR_COUNT); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); harness .chain @@ -308,9 +336,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a block removed. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); blocks.remove(2); assert!( @@ -328,9 +357,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a modified parent root. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); @@ -365,9 +395,10 @@ async fn chain_segment_non_linear_slots() { * Test where a child is lower than the parent. 
*/ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); blocks[3] = RpcBlock::new_without_blobs( @@ -391,9 +422,10 @@ async fn chain_segment_non_linear_slots() { * Test where a child is equal to the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, &harness.spec) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); blocks[3] = RpcBlock::new_without_blobs( @@ -416,7 +448,7 @@ async fn chain_segment_non_linear_slots() { async fn assert_invalid_signature( chain_segment: &[BeaconSnapshot], - chain_segment_blobs: &[Option>], + chain_segment_blobs: &[Option>], harness: &BeaconChainHarness>, block_index: usize, snapshots: &[BeaconSnapshot], @@ -426,7 +458,7 @@ async fn assert_invalid_signature( .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + build_rpc_block(snapshot.beacon_block.clone(), blobs, &harness.spec) }) .collect(); @@ -453,7 +485,7 @@ async fn assert_invalid_signature( .take(block_index) .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + build_rpc_block(snapshot.beacon_block.clone(), blobs, &harness.spec) }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been @@ -468,12 +500,11 @@ async fn assert_invalid_signature( .chain .process_block( snapshots[block_index].beacon_block.canonical_root(), - 
RpcBlock::new( - None, + build_rpc_block( snapshots[block_index].beacon_block.clone(), - chain_segment_blobs[block_index].clone(), - ) - .unwrap(), + &chain_segment_blobs[block_index], + &harness.spec, + ), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -531,7 +562,7 @@ async fn invalid_signature_gossip_block() { .take(block_index) .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + build_rpc_block(snapshot.beacon_block.clone(), blobs, &harness.spec) }) .collect(); harness @@ -583,7 +614,7 @@ async fn invalid_signature_block_proposal() { .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + build_rpc_block(snapshot.beacon_block.clone(), blobs, &harness.spec) }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. @@ -891,7 +922,7 @@ async fn invalid_signature_deposit() { .iter() .zip(chain_segment_blobs.iter()) .map(|(snapshot, blobs)| { - RpcBlock::new(None, snapshot.beacon_block.clone(), blobs.clone()).unwrap() + build_rpc_block(snapshot.beacon_block.clone(), blobs, &harness.spec) }) .collect(); assert!( @@ -957,7 +988,7 @@ fn unwrap_err(result: Result) -> U { #[tokio::test] async fn block_gossip_verification() { let harness = get_harness(VALIDATOR_COUNT); - let (chain_segment, chain_segment_blobs) = get_chain_segment_with_blob_sidecars().await; + let (chain_segment, chain_segment_blobs) = get_chain_segment().await; let block_index = CHAIN_SEGMENT_LENGTH - 2; @@ -969,7 +1000,7 @@ async fn block_gossip_verification() { // Import the ancestors prior to the block we're testing. 
for (snapshot, blobs_opt) in chain_segment[0..block_index] .iter() - .zip(chain_segment_blobs.iter()) + .zip(chain_segment_blobs.into_iter()) { let gossip_verified = harness .chain @@ -988,20 +1019,8 @@ async fn block_gossip_verification() { ) .await .expect("should import valid gossip verified block"); - if let Some(blob_sidecars) = blobs_opt { - for blob_sidecar in blob_sidecars { - let blob_index = blob_sidecar.index; - let gossip_verified = harness - .chain - .verify_blob_sidecar_for_gossip(blob_sidecar.clone(), blob_index) - .expect("should obtain gossip verified blob"); - - harness - .chain - .process_gossip_blob(gossip_verified) - .await - .expect("should import valid gossip verified blob"); - } + if let Some(data_sidecars) = blobs_opt { + verify_and_process_gossip_data_sidecars(&harness, data_sidecars).await; } } @@ -1229,6 +1248,51 @@ async fn block_gossip_verification() { ); } +async fn verify_and_process_gossip_data_sidecars( + harness: &BeaconChainHarness>, + data_sidecars: DataSidecars, +) { + match data_sidecars { + DataSidecars::Blobs(blob_sidecars) => { + for blob_sidecar in blob_sidecars { + let blob_index = blob_sidecar.index; + let gossip_verified = harness + .chain + .verify_blob_sidecar_for_gossip(blob_sidecar.clone(), blob_index) + .expect("should obtain gossip verified blob"); + + harness + .chain + .process_gossip_blob(gossip_verified) + .await + .expect("should import valid gossip verified blob"); + } + } + DataSidecars::DataColumns(column_sidecars) => { + let gossip_verified = column_sidecars + .into_iter() + .map(|column_sidecar| { + let subnet_id = DataColumnSubnetId::from_column_index( + column_sidecar.index(), + &harness.spec, + ); + harness.chain.verify_data_column_sidecar_for_gossip( + column_sidecar.into_inner(), + *subnet_id, + ) + }) + .collect::, _>>() + .expect("should obtain gossip verified columns"); + + harness + .chain + .process_gossip_data_columns(gossip_verified, || Ok(())) + .await + .expect("should import valid gossip 
verified columns"); + } + } +} + #[tokio::test] async fn verify_block_for_gossip_slashing_detection() { let slasher_dir = tempdir().unwrap(); @@ -1259,20 +1323,14 @@ async fn verify_block_for_gossip_slashing_detection() { let verified_block = harness.chain.verify_block_for_gossip(block1).await.unwrap(); if let Some((kzg_proofs, blobs)) = blobs1 { - let sidecars = - BlobSidecar::build_sidecars(blobs, verified_block.block(), kzg_proofs, &spec).unwrap(); - for sidecar in sidecars { - let blob_index = sidecar.index; - let verified_blob = harness - .chain - .verify_blob_sidecar_for_gossip(sidecar, blob_index) - .unwrap(); - harness - .chain - .process_gossip_blob(verified_blob) - .await - .unwrap(); - } + harness + .process_gossip_blobs_or_columns( + verified_block.block(), + blobs.iter(), + kzg_proofs.iter(), + None, + ) + .await; } harness .chain diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index d1a38b1cdec..8654b33646d 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -1,7 +1,6 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_verification::Error as AttnError; -use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::builder::BeaconChainBuilder; use beacon_chain::data_availability_checker::AvailableBlock; use beacon_chain::schema_change::migrate_schema; @@ -82,13 +81,26 @@ fn get_harness( reconstruct_historic_states: true, ..ChainConfig::default() }; - get_harness_generic(store, validator_count, chain_config) + get_harness_generic(store, validator_count, chain_config, false) +} + +fn get_harness_import_all_data_columns( + store: Arc, BeaconNodeBackend>>, + validator_count: usize, +) -> TestHarness { + // Most tests expect to retain historic states, so we use this as the default. 
+ let chain_config = ChainConfig { + reconstruct_historic_states: true, + ..ChainConfig::default() + }; + get_harness_generic(store, validator_count, chain_config, true) } fn get_harness_generic( store: Arc, BeaconNodeBackend>>, validator_count: usize, chain_config: ChainConfig, + import_all_data_columns: bool, ) -> TestHarness { let harness = TestHarness::builder(MinimalEthSpec) .spec(store.get_chain_spec().clone()) @@ -97,6 +109,7 @@ fn get_harness_generic( .fresh_disk_store(store) .mock_execution_layer() .chain_config(chain_config) + .import_all_data_columns(import_all_data_columns) .build(); harness.advance_slot(); harness @@ -2286,7 +2299,12 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { let temp1 = tempdir().unwrap(); let full_store = get_store(&temp1); - let harness = get_harness(full_store.clone(), LOW_VALIDATOR_COUNT); + + // TODO(das): Run a supernode so the node has full blobs stored. + // This may not be required in the future if we end up implementing downloading checkpoint + // blobs from p2p peers: + // https://github.com/sigp/lighthouse/issues/6837 + let harness = get_harness_import_all_data_columns(full_store.clone(), LOW_VALIDATOR_COUNT); let all_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); @@ -2319,10 +2337,8 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .unwrap(); let wss_blobs_opt = harness .chain - .store - .get_blobs(&wss_block_root) - .unwrap() - .blobs(); + .get_or_reconstruct_blobs(&wss_block_root) + .unwrap(); let wss_state = full_store .get_state(&wss_state_root, Some(checkpoint_slot)) .unwrap() @@ -2395,14 +2411,16 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .await .unwrap() .unwrap(); + // This test may break in the future if we no longer store the full checkpoint data columns. 
let store_wss_blobs_opt = beacon_chain - .store - .get_blobs(&wss_block_root) - .unwrap() - .blobs(); + .get_or_reconstruct_blobs(&wss_block_root) + .unwrap(); assert_eq!(store_wss_block, wss_block); - assert_eq!(store_wss_blobs_opt, wss_blobs_opt); + // TODO(fulu): Remove this condition once #6760 (PeerDAS checkpoint sync) is merged. + if !beacon_chain.spec.is_peer_das_scheduled() { + assert_eq!(store_wss_blobs_opt, wss_blobs_opt); + } // Apply blocks forward to reach head. let chain_dump = harness.chain.chain_dump().unwrap(); @@ -2418,7 +2436,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .await .unwrap() .unwrap(); - let blobs = harness.chain.get_blobs(&block_root).expect("blobs").blobs(); + let slot = full_block.slot(); let state_root = full_block.state_root(); @@ -2426,7 +2444,7 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { beacon_chain .process_block( full_block.canonical_root(), - RpcBlock::new(Some(block_root), Arc::new(full_block), blobs).unwrap(), + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -2480,13 +2498,12 @@ async fn weak_subjectivity_sync_test(slots: Vec, checkpoint_slot: Slot) { .await .expect("should get block") .expect("should get block"); - let blobs = harness.chain.get_blobs(&block_root).expect("blobs").blobs(); if let MaybeAvailableBlock::Available(block) = harness .chain .data_availability_checker .verify_kzg_for_rpc_block( - RpcBlock::new(Some(block_root), Arc::new(full_block), blobs).unwrap(), + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), ) .expect("should verify kzg") { @@ -2587,7 +2604,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { reconstruct_historic_states: false, ..ChainConfig::default() }; - let harness = get_harness_generic(store.clone(), LOW_VALIDATOR_COUNT, chain_config); + let harness = 
get_harness_generic(store.clone(), LOW_VALIDATOR_COUNT, chain_config, false); let all_validators = (0..LOW_VALIDATOR_COUNT).collect::>(); @@ -3075,6 +3092,10 @@ async fn deneb_prune_blobs_happy_case() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); + if store.get_chain_spec().is_peer_das_scheduled() { + // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + return; + } let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; @@ -3122,6 +3143,10 @@ async fn deneb_prune_blobs_no_finalization() { let db_path = tempdir().unwrap(); let store = get_store(&db_path); + if store.get_chain_spec().is_peer_das_scheduled() { + // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + return; + } let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; @@ -3266,6 +3291,10 @@ async fn deneb_prune_blobs_margin_test(margin: u64) { let db_path = tempdir().unwrap(); let store = get_store_generic(&db_path, config, test_spec::()); + if store.get_chain_spec().is_peer_das_scheduled() { + // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + return; + } let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 0edda2f95b8..07d2a90df93 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -1717,7 +1717,7 @@ mod tests { #[test] fn min_queue_len() { // State with no validators. 
- let spec = ForkName::latest().make_genesis_spec(ChainSpec::mainnet()); + let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet()); let genesis_time = 0; let state = BeaconState::::new(genesis_time, Eth1Data::default(), &spec); assert_eq!(state.validators().len(), 0); diff --git a/beacon_node/execution_layer/src/engine_api/http.rs b/beacon_node/execution_layer/src/engine_api/http.rs index daf2bf6ed4b..747383754a5 100644 --- a/beacon_node/execution_layer/src/engine_api/http.rs +++ b/beacon_node/execution_layer/src/engine_api/http.rs @@ -829,7 +829,8 @@ impl HttpJsonRpc { Ok(response.into()) } - pub async fn new_payload_v5_fulu( + // TODO(fulu): switch to v5 endpoint when the EL is ready for Fulu + pub async fn new_payload_v4_fulu( &self, new_payload_request_fulu: NewPayloadRequestFulu<'_, E>, ) -> Result { @@ -844,7 +845,7 @@ impl HttpJsonRpc { let response: JsonPayloadStatusV1 = self .rpc_request( - ENGINE_NEW_PAYLOAD_V5, + ENGINE_NEW_PAYLOAD_V4, params, ENGINE_NEW_PAYLOAD_TIMEOUT * self.execution_timeout_multiplier, ) @@ -962,6 +963,19 @@ impl HttpJsonRpc { .try_into() .map_err(Error::BadResponse) } + // TODO(fulu): remove when v5 method is ready. 
+ ForkName::Fulu => { + let response: JsonGetPayloadResponseV5 = self + .rpc_request( + ENGINE_GET_PAYLOAD_V4, + params, + ENGINE_GET_PAYLOAD_TIMEOUT * self.execution_timeout_multiplier, + ) + .await?; + JsonGetPayloadResponse::V5(response) + .try_into() + .map_err(Error::BadResponse) + } _ => Err(Error::UnsupportedForkVariant(format!( "called get_payload_v4 with {}", fork_name @@ -1263,10 +1277,11 @@ impl HttpJsonRpc { } } NewPayloadRequest::Fulu(new_payload_request_fulu) => { - if engine_capabilities.new_payload_v5 { - self.new_payload_v5_fulu(new_payload_request_fulu).await + // TODO(fulu): switch to v5 endpoint when the EL is ready for Fulu + if engine_capabilities.new_payload_v4 { + self.new_payload_v4_fulu(new_payload_request_fulu).await } else { - Err(Error::RequiredMethodUnsupported("engine_newPayloadV5")) + Err(Error::RequiredMethodUnsupported("engine_newPayloadV4")) } } } @@ -1305,8 +1320,9 @@ impl HttpJsonRpc { } } ForkName::Fulu => { - if engine_capabilities.get_payload_v5 { - self.get_payload_v5(fork_name, payload_id).await + // TODO(fulu): switch to v5 when the EL is ready + if engine_capabilities.get_payload_v4 { + self.get_payload_v4(fork_name, payload_id).await } else { Err(Error::RequiredMethodUnsupported("engine_getPayloadv5")) } diff --git a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs index 0babb9d1a3e..d727d2c1590 100644 --- a/beacon_node/execution_layer/src/test_utils/handle_rpc.rs +++ b/beacon_node/execution_layer/src/test_utils/handle_rpc.rs @@ -230,7 +230,8 @@ pub async fn handle_rpc( if method == ENGINE_NEW_PAYLOAD_V1 || method == ENGINE_NEW_PAYLOAD_V2 || method == ENGINE_NEW_PAYLOAD_V3 - || method == ENGINE_NEW_PAYLOAD_V4 + // TODO(fulu): Uncomment this once v5 method is ready for Fulu + // || method == ENGINE_NEW_PAYLOAD_V4 { return Err(( format!("{} called after Fulu fork!", method), @@ -264,15 +265,16 @@ pub async fn handle_rpc( GENERIC_ERROR_CODE, )); } - if 
matches!(request, JsonExecutionPayload::V4(_)) { - return Err(( - format!( - "{} called with `ExecutionPayloadV4` after Fulu fork!", - method - ), - GENERIC_ERROR_CODE, - )); - } + // TODO(fulu): remove once we switch to v5 + // if matches!(request, JsonExecutionPayload::V4(_)) { + // return Err(( + // format!( + // "{} called with `ExecutionPayloadV4` after Fulu fork!", + // method + // ), + // GENERIC_ERROR_CODE, + // )); + // } } _ => unreachable!(), }; @@ -381,8 +383,9 @@ pub async fn handle_rpc( == ForkName::Fulu && (method == ENGINE_GET_PAYLOAD_V1 || method == ENGINE_GET_PAYLOAD_V2 - || method == ENGINE_GET_PAYLOAD_V3 - || method == ENGINE_GET_PAYLOAD_V4) + || method == ENGINE_GET_PAYLOAD_V3) + // TODO(fulu): Uncomment this once v5 method is ready for Fulu + // || method == ENGINE_GET_PAYLOAD_V4) { return Err(( format!("{} called after Fulu fork!", method), @@ -448,6 +451,22 @@ pub async fn handle_rpc( }) .unwrap() } + // TODO(fulu): remove this once we switch to v5 method + JsonExecutionPayload::V5(execution_payload) => { + serde_json::to_value(JsonGetPayloadResponseV5 { + execution_payload, + block_value: Uint256::from(DEFAULT_MOCK_EL_PAYLOAD_VALUE_WEI), + blobs_bundle: maybe_blobs + .ok_or(( + "No blobs returned despite V5 Payload".to_string(), + GENERIC_ERROR_CODE, + ))? 
+ .into(), + should_override_builder: false, + execution_requests: Default::default(), + }) + .unwrap() + } _ => unreachable!(), }), ENGINE_GET_PAYLOAD_V5 => Ok(match JsonExecutionPayload::from(response) { diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs index 7b48d64e36f..fbc92a45cce 100644 --- a/beacon_node/http_api/src/test_utils.rs +++ b/beacon_node/http_api/src/test_utils.rs @@ -8,6 +8,7 @@ use beacon_processor::{ }; use directory::DEFAULT_ROOT_DIR; use eth2::{BeaconNodeHttpClient, Timeouts}; +use lighthouse_network::rpc::methods::MetaDataV3; use lighthouse_network::{ discv5::enr::CombinedKey, libp2p::swarm::{ @@ -150,11 +151,21 @@ pub async fn create_api_server_with_config( let (network_senders, network_receivers) = NetworkSenders::new(); // Default metadata - let meta_data = MetaData::V2(MetaDataV2 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }); + let meta_data = if chain.spec.is_peer_das_scheduled() { + MetaData::V3(MetaDataV3 { + seq_number: SEQ_NUMBER, + attnets: EnrAttestationBitfield::::default(), + syncnets: EnrSyncCommitteeBitfield::::default(), + custody_group_count: chain.spec.custody_requirement, + }) + } else { + MetaData::V2(MetaDataV2 { + seq_number: SEQ_NUMBER, + attnets: EnrAttestationBitfield::::default(), + syncnets: EnrSyncCommitteeBitfield::::default(), + }) + }; + let enr_key = CombinedKey::generate_secp256k1(); let enr = Enr::builder().build(&enr_key).unwrap(); let network_config = Arc::new(NetworkConfig::default()); diff --git a/beacon_node/http_api/tests/broadcast_validation_tests.rs b/beacon_node/http_api/tests/broadcast_validation_tests.rs index db4ef002579..1baa71699c7 100644 --- a/beacon_node/http_api/tests/broadcast_validation_tests.rs +++ b/beacon_node/http_api/tests/broadcast_validation_tests.rs @@ -1,4 +1,3 @@ -use beacon_chain::blob_verification::GossipVerifiedBlob; use beacon_chain::{ 
test_utils::{AttestationStrategy, BlockStrategy}, GossipVerifiedBlock, IntoGossipVerifiedBlock, @@ -7,9 +6,10 @@ use eth2::reqwest::StatusCode; use eth2::types::{BroadcastValidation, PublishBlockRequest}; use http_api::test_utils::InteractiveTester; use http_api::{publish_blinded_block, publish_block, reconstruct_block, Config, ProvenancedBlock}; +use std::collections::HashSet; use std::sync::Arc; use types::{ - BlobSidecar, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot, + ColumnIndex, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, MainnetEthSpec, Slot, }; use warp::Rejection; use warp_utils::reject::CustomBadRequest; @@ -17,6 +17,8 @@ use warp_utils::reject::CustomBadRequest; type E = MainnetEthSpec; /* + * TODO(fulu): write PeerDAS equivalent tests for these. + * * We have the following test cases, which are duplicated for the blinded variant of the route: * * - `broadcast_validation=gossip` @@ -1375,7 +1377,7 @@ pub async fn block_seen_on_gossip_without_blobs() { // `validator_count // 32`. let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let tester = InteractiveTester::::new(Some(spec), validator_count).await; // Create some chain depth. @@ -1437,7 +1439,7 @@ pub async fn block_seen_on_gossip_with_some_blobs() { // `validator_count // 32`. let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let tester = InteractiveTester::::new(Some(spec), validator_count).await; // Create some chain depth. 
@@ -1464,8 +1466,8 @@ pub async fn block_seen_on_gossip_with_some_blobs() { blobs.0.len() ); - let partial_kzg_proofs = vec![*blobs.0.first().unwrap()]; - let partial_blobs = vec![blobs.1.first().unwrap().clone()]; + let partial_kzg_proofs = [*blobs.0.first().unwrap()]; + let partial_blobs = [blobs.1.first().unwrap().clone()]; // Simulate the block being seen on gossip. block @@ -1474,21 +1476,15 @@ pub async fn block_seen_on_gossip_with_some_blobs() { .unwrap(); // Simulate some of the blobs being seen on gossip. - for (i, (kzg_proof, blob)) in partial_kzg_proofs - .into_iter() - .zip(partial_blobs) - .enumerate() - { - let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); - let gossip_blob = - GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); - tester - .harness - .chain - .process_gossip_blob(gossip_blob) - .await - .unwrap(); - } + tester + .harness + .process_gossip_blobs_or_columns( + &block, + partial_blobs.iter(), + partial_kzg_proofs.iter(), + Some(get_custody_columns(&tester)), + ) + .await; // It should not yet be added to fork choice because all blobs have not been seen. assert!(!tester @@ -1523,7 +1519,7 @@ pub async fn blobs_seen_on_gossip_without_block() { // `validator_count // 32`. let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let tester = InteractiveTester::::new(Some(spec), validator_count).await; // Create some chain depth. @@ -1546,22 +1542,15 @@ pub async fn blobs_seen_on_gossip_without_block() { let (kzg_proofs, blobs) = blobs.expect("should have some blobs"); // Simulate the blobs being seen on gossip. 
- for (i, (kzg_proof, blob)) in kzg_proofs - .clone() - .into_iter() - .zip(blobs.clone()) - .enumerate() - { - let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); - let gossip_blob = - GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); - tester - .harness - .chain - .process_gossip_blob(gossip_blob) - .await - .unwrap(); - } + tester + .harness + .process_gossip_blobs_or_columns( + &block, + blobs.iter(), + kzg_proofs.iter(), + Some(get_custody_columns(&tester)), + ) + .await; // It should not yet be added to fork choice because the block has not been seen. assert!(!tester @@ -1596,7 +1585,7 @@ pub async fn blobs_seen_on_gossip_without_block_and_no_http_blobs() { // `validator_count // 32`. let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let tester = InteractiveTester::::new(Some(spec), validator_count).await; // Create some chain depth. @@ -1620,22 +1609,15 @@ pub async fn blobs_seen_on_gossip_without_block_and_no_http_blobs() { assert!(!blobs.is_empty()); // Simulate the blobs being seen on gossip. - for (i, (kzg_proof, blob)) in kzg_proofs - .clone() - .into_iter() - .zip(blobs.clone()) - .enumerate() - { - let sidecar = Arc::new(BlobSidecar::new(i, blob, &block, kzg_proof).unwrap()); - let gossip_blob = - GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); - tester - .harness - .chain - .process_gossip_blob(gossip_blob) - .await - .unwrap(); - } + tester + .harness + .process_gossip_blobs_or_columns( + &block, + blobs.iter(), + kzg_proofs.iter(), + Some(get_custody_columns(&tester)), + ) + .await; // It should not yet be added to fork choice because the block has not been seen. assert!(!tester @@ -1672,7 +1654,7 @@ pub async fn slashable_blobs_seen_on_gossip_cause_failure() { // `validator_count // 32`. 
let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let tester = InteractiveTester::::new(Some(spec), validator_count).await; // Create some chain depth. @@ -1697,17 +1679,15 @@ pub async fn slashable_blobs_seen_on_gossip_cause_failure() { let (kzg_proofs_b, blobs_b) = blobs_b.expect("should have some blobs"); // Simulate the blobs of block B being seen on gossip. - for (i, (kzg_proof, blob)) in kzg_proofs_b.into_iter().zip(blobs_b).enumerate() { - let sidecar = Arc::new(BlobSidecar::new(i, blob, &block_b, kzg_proof).unwrap()); - let gossip_blob = - GossipVerifiedBlob::new(sidecar, i as u64, &tester.harness.chain).unwrap(); - tester - .harness - .chain - .process_gossip_blob(gossip_blob) - .await - .unwrap(); - } + tester + .harness + .process_gossip_blobs_or_columns( + &block_b, + blobs_b.iter(), + kzg_proofs_b.iter(), + Some(get_custody_columns(&tester)), + ) + .await; // It should not yet be added to fork choice because block B has not been seen. assert!(!tester @@ -1742,7 +1722,7 @@ pub async fn duplicate_block_status_code() { // `validator_count // 32`. 
let validator_count = 64; let num_initial: u64 = 31; - let spec = ForkName::latest().make_genesis_spec(E::default_spec()); + let spec = ForkName::latest_stable().make_genesis_spec(E::default_spec()); let duplicate_block_status_code = StatusCode::IM_A_TEAPOT; let tester = InteractiveTester::::new_with_initializer_and_mutator( Some(spec), @@ -1804,3 +1784,13 @@ fn assert_server_message_error(error_response: eth2::Error, expected_message: St }; assert_eq!(err.message, expected_message); } + +fn get_custody_columns(tester: &InteractiveTester) -> HashSet { + tester + .ctx + .network_globals + .as_ref() + .unwrap() + .sampling_columns + .clone() +} diff --git a/beacon_node/lighthouse_network/src/discovery/enr.rs b/beacon_node/lighthouse_network/src/discovery/enr.rs index 062a119e0d5..80677119546 100644 --- a/beacon_node/lighthouse_network/src/discovery/enr.rs +++ b/beacon_node/lighthouse_network/src/discovery/enr.rs @@ -339,9 +339,9 @@ mod test { type E = MainnetEthSpec; - fn make_eip7594_spec() -> ChainSpec { + fn make_fulu_spec() -> ChainSpec { let mut spec = E::default_spec(); - spec.eip7594_fork_epoch = Some(Epoch::new(10)); + spec.fulu_fork_epoch = Some(Epoch::new(10)); spec } @@ -359,7 +359,7 @@ mod test { subscribe_all_data_column_subnets: false, ..NetworkConfig::default() }; - let spec = make_eip7594_spec(); + let spec = make_fulu_spec(); let enr = build_enr_with_config(config, &spec).0; @@ -375,7 +375,7 @@ mod test { subscribe_all_data_column_subnets: true, ..NetworkConfig::default() }; - let spec = make_eip7594_spec(); + let spec = make_fulu_spec(); let enr = build_enr_with_config(config, &spec).0; assert_eq!( diff --git a/beacon_node/lighthouse_network/src/rpc/codec.rs b/beacon_node/lighthouse_network/src/rpc/codec.rs index 6a70eef9bd2..2bf35b0e35e 100644 --- a/beacon_node/lighthouse_network/src/rpc/codec.rs +++ b/beacon_node/lighthouse_network/src/rpc/codec.rs @@ -485,17 +485,9 @@ fn context_bytes( RpcSuccessResponse::BlobsByRange(_) | 
RpcSuccessResponse::BlobsByRoot(_) => { return fork_context.to_context_bytes(ForkName::Deneb); } - RpcSuccessResponse::DataColumnsByRoot(d) - | RpcSuccessResponse::DataColumnsByRange(d) => { - // TODO(das): Remove deneb fork after `peerdas-devnet-2`. - return if matches!( - fork_context.spec.fork_name_at_slot::(d.slot()), - ForkName::Deneb - ) { - fork_context.to_context_bytes(ForkName::Deneb) - } else { - fork_context.to_context_bytes(ForkName::Electra) - }; + RpcSuccessResponse::DataColumnsByRoot(_) + | RpcSuccessResponse::DataColumnsByRange(_) => { + return fork_context.to_context_bytes(ForkName::Fulu); } RpcSuccessResponse::LightClientBootstrap(lc_bootstrap) => { return lc_bootstrap @@ -730,10 +722,7 @@ fn handle_rpc_response( }, SupportedProtocol::DataColumnsByRootV1 => match fork_name { Some(fork_name) => { - // TODO(das): PeerDAS is currently supported for both deneb and electra. This check - // does not advertise the topic on deneb, simply allows it to decode it. Advertise - // logic is in `SupportedTopic::currently_supported`. 
- if fork_name.deneb_enabled() { + if fork_name.fulu_enabled() { Ok(Some(RpcSuccessResponse::DataColumnsByRoot(Arc::new( DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, )))) @@ -754,7 +743,7 @@ fn handle_rpc_response( }, SupportedProtocol::DataColumnsByRangeV1 => match fork_name { Some(fork_name) => { - if fork_name.deneb_enabled() { + if fork_name.fulu_enabled() { Ok(Some(RpcSuccessResponse::DataColumnsByRange(Arc::new( DataColumnSidecar::from_ssz_bytes(decoded_buffer)?, )))) @@ -945,9 +934,10 @@ mod tests { use crate::rpc::protocol::*; use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield}; use types::{ - blob_sidecar::BlobIdentifier, BeaconBlock, BeaconBlockAltair, BeaconBlockBase, - BeaconBlockBellatrix, DataColumnIdentifier, EmptyBlock, Epoch, FixedBytesExtended, - FullPayload, Signature, Slot, + blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair, + BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock, + Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature, + SignedBeaconBlockHeader, Slot, }; type Spec = types::MainnetEthSpec; @@ -998,7 +988,17 @@ mod tests { } fn empty_data_column_sidecar() -> Arc> { - Arc::new(DataColumnSidecar::empty()) + Arc::new(DataColumnSidecar { + index: 0, + column: VariableList::new(vec![Cell::::default()]).unwrap(), + kzg_commitments: VariableList::new(vec![KzgCommitment::empty_for_testing()]).unwrap(), + kzg_proofs: VariableList::new(vec![KzgProof::empty()]).unwrap(), + signed_block_header: SignedBeaconBlockHeader { + message: BeaconBlockHeader::empty(), + signature: Signature::empty(), + }, + kzg_commitments_inclusion_proof: Default::default(), + }) } /// Bellatrix block with length < max_rpc_size. 
diff --git a/beacon_node/lighthouse_network/src/rpc/protocol.rs b/beacon_node/lighthouse_network/src/rpc/protocol.rs index 80f15c94459..eac7d67490d 100644 --- a/beacon_node/lighthouse_network/src/rpc/protocol.rs +++ b/beacon_node/lighthouse_network/src/rpc/protocol.rs @@ -554,9 +554,11 @@ impl ProtocolId { Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()), Protocol::BlobsByRange => rpc_blob_limits::(), Protocol::BlobsByRoot => rpc_blob_limits::(), - Protocol::DataColumnsByRoot => rpc_data_column_limits::(fork_context.current_fork()), + Protocol::DataColumnsByRoot => { + rpc_data_column_limits::(fork_context.current_fork(), &fork_context.spec) + } Protocol::DataColumnsByRange => { - rpc_data_column_limits::(fork_context.current_fork()) + rpc_data_column_limits::(fork_context.current_fork(), &fork_context.spec) } Protocol::Ping => RpcLimits::new( ::ssz_fixed_len(), @@ -637,13 +639,10 @@ pub fn rpc_blob_limits() -> RpcLimits { } } -// TODO(das): fix hardcoded max here -pub fn rpc_data_column_limits(fork_name: ForkName) -> RpcLimits { +pub fn rpc_data_column_limits(fork_name: ForkName, spec: &ChainSpec) -> RpcLimits { RpcLimits::new( - DataColumnSidecar::::empty().as_ssz_bytes().len(), - DataColumnSidecar::::max_size( - E::default_spec().max_blobs_per_block_by_fork(fork_name) as usize - ), + DataColumnSidecar::::min_size(), + DataColumnSidecar::::max_size(spec.max_blobs_per_block_by_fork(fork_name) as usize), ) } diff --git a/beacon_node/lighthouse_network/src/types/globals.rs b/beacon_node/lighthouse_network/src/types/globals.rs index 8cce9a0f255..c9e84e2dd11 100644 --- a/beacon_node/lighthouse_network/src/types/globals.rs +++ b/beacon_node/lighthouse_network/src/types/globals.rs @@ -223,7 +223,7 @@ mod test { fn test_sampling_subnets() { let log = logging::test_logger(); let mut spec = E::default_spec(); - spec.eip7594_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); let custody_group_count = 
spec.number_of_custody_groups / 2; let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap(); @@ -247,7 +247,7 @@ mod test { fn test_sampling_columns() { let log = logging::test_logger(); let mut spec = E::default_spec(); - spec.eip7594_fork_epoch = Some(Epoch::new(0)); + spec.fulu_fork_epoch = Some(Epoch::new(0)); let custody_group_count = spec.number_of_custody_groups / 2; let subnet_sampling_size = spec.sampling_size(custody_group_count).unwrap(); diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 1e1f3efa18c..c199d2312b0 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -283,27 +283,15 @@ impl PubsubMessage { } GossipKind::DataColumnSidecar(subnet_id) => { match fork_context.from_context_bytes(gossip_topic.fork_digest) { - // TODO(das): Remove Deneb fork - Some(fork) if fork.deneb_enabled() => { + Some(fork) if fork.fulu_enabled() => { let col_sidecar = Arc::new( DataColumnSidecar::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?, ); - let peer_das_enabled = - fork_context.spec.is_peer_das_enabled_for_epoch( - col_sidecar.slot().epoch(E::slots_per_epoch()), - ); - if peer_das_enabled { - Ok(PubsubMessage::DataColumnSidecar(Box::new(( - *subnet_id, - col_sidecar, - )))) - } else { - Err(format!( - "data_column_sidecar topic invalid for given fork digest {:?}", - gossip_topic.fork_digest - )) - } + Ok(PubsubMessage::DataColumnSidecar(Box::new(( + *subnet_id, + col_sidecar, + )))) } Some(_) | None => Err(format!( "data_column_sidecar topic invalid for given fork digest {:?}", diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 4a3fb28e102..8d07ef1a129 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -613,6 +613,11 @@ impl 
NetworkBeaconProcessor { blocks: Vec>, ) -> Result<(), Error> { let is_backfill = matches!(&process_id, ChainSegmentProcessId::BackSyncBatchId { .. }); + debug!(self.log, "Batch sending for process"; + "blocks" => blocks.len(), + "id" => ?process_id, + ); + let processor = self.clone(); let process_fn = async move { let notify_execution_layer = if processor diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 8238fa146dd..8415ece6383 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -15,7 +15,7 @@ use beacon_chain::test_utils::{ use beacon_chain::{BeaconChain, WhenSlotSkipped}; use beacon_processor::{work_reprocessing_queue::*, *}; use lighthouse_network::discovery::ConnectionId; -use lighthouse_network::rpc::methods::BlobsByRangeRequest; +use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3}; use lighthouse_network::rpc::{RequestId, SubstreamId}; use lighthouse_network::{ discv5::enr::{self, CombinedKey}, @@ -198,11 +198,21 @@ impl TestRig { let (sync_tx, _sync_rx) = mpsc::unbounded_channel(); // Default metadata - let meta_data = MetaData::V2(MetaDataV2 { - seq_number: SEQ_NUMBER, - attnets: EnrAttestationBitfield::::default(), - syncnets: EnrSyncCommitteeBitfield::::default(), - }); + let meta_data = if spec.is_peer_das_scheduled() { + MetaData::V3(MetaDataV3 { + seq_number: SEQ_NUMBER, + attnets: EnrAttestationBitfield::::default(), + syncnets: EnrSyncCommitteeBitfield::::default(), + custody_group_count: spec.custody_requirement, + }) + } else { + MetaData::V2(MetaDataV2 { + seq_number: SEQ_NUMBER, + attnets: EnrAttestationBitfield::::default(), + syncnets: EnrSyncCommitteeBitfield::::default(), + }) + }; + let enr_key = CombinedKey::generate_secp256k1(); let enr = enr::Enr::builder().build(&enr_key).unwrap(); let network_config = Arc::new(NetworkConfig::default()); @@ 
-342,6 +352,7 @@ impl TestRig { ) .unwrap(); } + pub fn enqueue_single_lookup_rpc_blobs(&self) { if let Some(blobs) = self.next_blobs.clone() { let blobs = FixedBlobSidecarList::new(blobs.into_iter().map(Some).collect::>()); @@ -350,7 +361,7 @@ impl TestRig { self.next_block.canonical_root(), blobs, std::time::Duration::default(), - BlockProcessType::SingleBlock { id: 1 }, + BlockProcessType::SingleBlob { id: 1 }, ) .unwrap(); } diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index ab654ddf778..49f73bf9c8d 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -751,11 +751,6 @@ impl NetworkService { } } - // TODO(das): This is added here for the purpose of testing, *without* having to - // activate Electra. This should happen as part of the Electra upgrade and we should - // move the subscription logic once it's ready to rebase PeerDAS on Electra, or if - // we decide to activate via the soft fork route: - // https://github.com/sigp/lighthouse/pull/5899 if self.fork_context.spec.is_peer_das_scheduled() { self.subscribe_to_peer_das_topics(&mut subscribed_topics); } @@ -806,32 +801,32 @@ impl NetworkService { } } + /// Keeping these separate from core topics because it has custom logic: + /// 1. Data column subscription logic depends on subscription configuration. + /// 2. Data column topic subscriptions will be dynamic based on validator balances due to + /// validator custody. + /// + /// TODO(das): The downside with not including it in core fork topic is - we subscribe to + /// PeerDAS topics on startup if Fulu is scheduled, rather than waiting until the fork. + /// If this is an issue we could potentially consider adding the logic to + /// `network.subscribe_new_fork_topics()`. 
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec) { - if self.subscribe_all_data_column_subnets { - for column_subnet in 0..self.fork_context.spec.data_column_sidecar_subnet_count { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = - Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into(); - let topic = - GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } - } - } + let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets { + &(0..self.fork_context.spec.data_column_sidecar_subnet_count) + .map(DataColumnSubnetId::new) + .collect() } else { - for column_subnet in &self.network_globals.sampling_subnets { - for fork_digest in self.required_gossip_fork_digests() { - let gossip_kind = Subnet::DataColumn(*column_subnet).into(); - let topic = - GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); - if self.libp2p.subscribe(topic.clone()) { - subscribed_topics.push(topic); - } else { - warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); - } + &self.network_globals.sampling_subnets + }; + + for column_subnet in column_subnets_to_subscribe.iter() { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = Subnet::DataColumn(*column_subnet).into(); + let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); } } } diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 7a234eaef04..70a3fe4f5aa 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ 
b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -321,7 +321,7 @@ mod tests { let blocks = (0..4) .map(|_| { generate_rand_block_and_data_columns::( - ForkName::Deneb, + ForkName::Fulu, NumBlobs::Number(1), &mut rng, &spec, @@ -384,7 +384,7 @@ mod tests { let blocks = (0..4) .map(|_| { generate_rand_block_and_data_columns::( - ForkName::Deneb, + ForkName::Fulu, NumBlobs::Number(1), &mut rng, &spec, diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs index e21041192d8..4135f901b1c 100644 --- a/beacon_node/network/src/sync/network_context.rs +++ b/beacon_node/network/src/sync/network_context.rs @@ -373,6 +373,7 @@ impl SyncNetworkContext { "count" => request.count(), "epoch" => epoch, "peer" => %peer_id, + "id" => id, ); let rpc_request = match request { BlocksByRangeRequest::V1(ref req) => { @@ -442,6 +443,7 @@ impl SyncNetworkContext { "epoch" => epoch, "columns" => ?columns_by_range_request.columns, "peer" => %peer_id, + "id" => id, ); self.send_network_msg(NetworkMessage::SendRequest { diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs index 341fe8667cb..9ab581950c9 100644 --- a/beacon_node/network/src/sync/tests/lookups.rs +++ b/beacon_node/network/src/sync/tests/lookups.rs @@ -43,8 +43,8 @@ use types::ForkContext; use types::{ data_column_sidecar::ColumnIndex, test_utils::{SeedableRng, TestRandom, XorShiftRng}, - BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, Epoch, EthSpec, ForkName, - Hash256, MinimalEthSpec as E, SignedBeaconBlock, Slot, + BeaconState, BeaconStateBase, BlobSidecar, DataColumnSidecar, EthSpec, ForkName, Hash256, + MinimalEthSpec as E, SignedBeaconBlock, Slot, }; const D: Duration = Duration::new(0, 0); @@ -54,12 +54,8 @@ const SAMPLING_REQUIRED_SUCCESSES: usize = 2; type DCByRootIds = Vec; type DCByRootId = (SyncRequestId, Vec); -struct TestRigConfig { - peer_das_enabled: bool, -} - impl TestRig { - fn 
test_setup_with_config(config: Option) -> Self { + pub fn test_setup() -> Self { let logger_type = if cfg!(feature = "test_logger") { LoggerType::Test } else if cfg!(feature = "ci_logger") { @@ -70,13 +66,7 @@ impl TestRig { let log = build_log(slog::Level::Trace, logger_type); // Use `fork_from_env` logic to set correct fork epochs - let mut spec = test_spec::(); - - if let Some(config) = config { - if config.peer_das_enabled { - spec.eip7594_fork_epoch = Some(Epoch::new(0)); - } - } + let spec = test_spec::(); // Initialise a new beacon chain let harness = BeaconChainHarness::>::builder(E) @@ -155,24 +145,18 @@ impl TestRig { } } - pub fn test_setup() -> Self { - Self::test_setup_with_config(None) - } - - fn test_setup_after_deneb() -> Option { + fn test_setup_after_deneb_before_fulu() -> Option { let r = Self::test_setup(); - if r.after_deneb() { + if r.after_deneb() && !r.fork_name.fulu_enabled() { Some(r) } else { None } } - fn test_setup_after_peerdas() -> Option { - let r = Self::test_setup_with_config(Some(TestRigConfig { - peer_das_enabled: true, - })); - if r.after_deneb() { + fn test_setup_after_fulu() -> Option { + let r = Self::test_setup(); + if r.fork_name.fulu_enabled() { Some(r) } else { None @@ -187,6 +171,10 @@ impl TestRig { self.fork_name.deneb_enabled() } + pub fn after_fulu(&self) -> bool { + self.fork_name.fulu_enabled() + } + fn trigger_unknown_parent_block(&mut self, peer_id: PeerId, block: Arc>) { let block_root = block.canonical_root(); self.send_sync_message(SyncMessage::UnknownParentBlock(peer_id, block, block_root)) @@ -387,7 +375,7 @@ impl TestRig { .__add_connected_peer_testing_only(false, &self.harness.spec) } - fn new_connected_supernode_peer(&mut self) -> PeerId { + pub fn new_connected_supernode_peer(&mut self) -> PeerId { self.network_globals .peers .write() @@ -1945,7 +1933,7 @@ fn test_same_chain_race_condition() { #[test] fn block_in_da_checker_skips_download() { - let Some(mut r) = TestRig::test_setup_after_deneb() else { + 
let Some(mut r) = TestRig::test_setup_after_deneb_before_fulu() else { return; }; let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); @@ -1963,7 +1951,7 @@ fn block_in_da_checker_skips_download() { #[test] fn block_in_processing_cache_becomes_invalid() { - let Some(mut r) = TestRig::test_setup_after_deneb() else { + let Some(mut r) = TestRig::test_setup_after_deneb_before_fulu() else { return; }; let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); @@ -1989,7 +1977,7 @@ fn block_in_processing_cache_becomes_invalid() { #[test] fn block_in_processing_cache_becomes_valid_imported() { - let Some(mut r) = TestRig::test_setup_after_deneb() else { + let Some(mut r) = TestRig::test_setup_after_deneb_before_fulu() else { return; }; let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); @@ -2014,7 +2002,7 @@ fn block_in_processing_cache_becomes_valid_imported() { #[ignore] #[test] fn blobs_in_da_checker_skip_download() { - let Some(mut r) = TestRig::test_setup_after_deneb() else { + let Some(mut r) = TestRig::test_setup_after_deneb_before_fulu() else { return; }; let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(1)); @@ -2033,7 +2021,7 @@ fn blobs_in_da_checker_skip_download() { #[test] fn sampling_happy_path() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; r.new_connected_peers_for_peerdas(); @@ -2050,7 +2038,7 @@ fn sampling_happy_path() { #[test] fn sampling_with_retries() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; r.new_connected_peers_for_peerdas(); @@ -2072,7 +2060,7 @@ fn sampling_with_retries() { #[test] fn sampling_avoid_retrying_same_peer() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; let peer_id_1 = r.new_connected_supernode_peer(); @@ -2093,7 
+2081,7 @@ fn sampling_avoid_retrying_same_peer() { #[test] fn sampling_batch_requests() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; let _supernode = r.new_connected_supernode_peer(); @@ -2119,7 +2107,7 @@ fn sampling_batch_requests() { #[test] fn sampling_batch_requests_not_enough_responses_returned() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; let _supernode = r.new_connected_supernode_peer(); @@ -2164,7 +2152,7 @@ fn sampling_batch_requests_not_enough_responses_returned() { #[test] fn custody_lookup_happy_path() { - let Some(mut r) = TestRig::test_setup_after_peerdas() else { + let Some(mut r) = TestRig::test_setup_after_fulu() else { return; }; let spec = E::default_spec(); @@ -2238,7 +2226,7 @@ mod deneb_only { impl DenebTester { fn new(request_trigger: RequestTrigger) -> Option { - let Some(mut rig) = TestRig::test_setup_after_deneb() else { + let Some(mut rig) = TestRig::test_setup_after_deneb_before_fulu() else { return None; }; let (block, blobs) = rig.rand_block_and_blobs(NumBlobs::Random); @@ -2963,7 +2951,7 @@ mod deneb_only { #[ignore] #[test] fn no_peer_penalty_when_rpc_response_already_known_from_gossip() { - let Some(mut r) = TestRig::test_setup_after_deneb() else { + let Some(mut r) = TestRig::test_setup_after_deneb_before_fulu() else { return; }; let (block, blobs) = r.rand_block_and_blobs(NumBlobs::Number(2)); diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs index 05d5e4a4143..cfd89f7b44e 100644 --- a/beacon_node/network/src/sync/tests/range.rs +++ b/beacon_node/network/src/sync/tests/range.rs @@ -3,8 +3,13 @@ use crate::status::ToStatusMessage; use crate::sync::manager::SLOT_IMPORT_TOLERANCE; use crate::sync::range_sync::RangeSyncType; use crate::sync::SyncMessage; +use 
beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy}; use beacon_chain::{block_verification_types::RpcBlock, EngineState, NotifyExecutionLayer}; +use lighthouse_network::rpc::methods::{ + BlobsByRangeRequest, DataColumnsByRangeRequest, OldBlocksByRangeRequest, + OldBlocksByRangeRequestV2, +}; use lighthouse_network::rpc::{RequestType, StatusMessage}; use lighthouse_network::service::api_types::{AppRequestId, Id, SyncRequestId}; use lighthouse_network::{PeerId, SyncInfo}; @@ -16,6 +21,47 @@ use types::{ const D: Duration = Duration::new(0, 0); +pub(crate) enum DataSidecars { + Blobs(BlobSidecarList), + DataColumns(Vec>), +} + +enum ByRangeDataRequestIds { + PreDeneb, + PrePeerDAS(Id, PeerId), + PostPeerDAS(Vec<(Id, PeerId)>), +} + +/// Sync tests are usually written in the form: +/// - Do some action +/// - Expect a request to be sent +/// - Complete the above request +/// +/// To make writing tests succinct, the machinery in this testing rig automatically identifies +/// _which_ request to complete. Picking the right request is critical for tests to pass, so this +/// filter allows better expressivity on the criteria to identify the right request. +#[derive(Default)] +struct RequestFilter { + peer: Option, + epoch: Option, +} + +impl RequestFilter { + fn peer(mut self, peer: PeerId) -> Self { + self.peer = Some(peer); + self + } + + fn epoch(mut self, epoch: u64) -> Self { + self.epoch = Some(epoch); + self + } +} + +fn filter() -> RequestFilter { + RequestFilter::default() +} + impl TestRig { /// Produce a head peer with an advanced head fn add_head_peer(&mut self) -> PeerId { @@ -67,7 +113,9 @@ impl TestRig { fn add_peer(&mut self, remote_info: SyncInfo) -> PeerId { // Create valid peer known to network globals - let peer_id = self.new_connected_peer(); + // TODO(fulu): Using supernode peers to ensure we have peer across all column + // subnets for syncing. 
Should add tests connecting to full node peers. + let peer_id = self.new_connected_supernode_peer(); // Send peer to sync self.send_sync_message(SyncMessage::AddPeer(peer_id, remote_info.clone())); peer_id @@ -86,11 +134,13 @@ impl TestRig { } #[track_caller] - fn expect_chain_segment(&mut self) { - self.pop_received_processor_event(|ev| { - (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) - }) - .unwrap_or_else(|e| panic!("Expect ChainSegment work event: {e:?}")); + fn expect_chain_segments(&mut self, count: usize) { + for i in 0..count { + self.pop_received_processor_event(|ev| { + (ev.work_type() == beacon_processor::WorkType::ChainSegment).then_some(()) + }) + .unwrap_or_else(|e| panic!("Expect ChainSegment work event count {i}: {e:?}")); + } } fn update_execution_engine_state(&mut self, state: EngineState) { @@ -98,39 +148,80 @@ impl TestRig { self.sync_manager.update_execution_engine_state(state); } - fn find_blocks_by_range_request(&mut self, target_peer_id: &PeerId) -> (Id, Option) { + fn find_blocks_by_range_request( + &mut self, + request_filter: RequestFilter, + ) -> ((Id, PeerId), ByRangeDataRequestIds) { + let filter_f = |peer: PeerId, start_slot: u64| { + if let Some(expected_epoch) = request_filter.epoch { + let epoch = Slot::new(start_slot).epoch(E::slots_per_epoch()).as_u64(); + if epoch != expected_epoch { + return false; + } + } + if let Some(expected_peer) = request_filter.peer { + if peer != expected_peer { + return false; + } + } + true + }; + let block_req_id = self .pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { peer_id, - request: RequestType::BlocksByRange(_), + request: + RequestType::BlocksByRange(OldBlocksByRangeRequest::V2( + OldBlocksByRangeRequestV2 { start_slot, .. 
}, + )), request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - } if peer_id == target_peer_id => Some(*id), + } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) .expect("Should have a blocks by range request"); - let blob_req_id = if self.after_deneb() { - Some( - self.pop_received_network_event(|ev| match ev { + let by_range_data_requests = if self.after_fulu() { + let mut data_columns_requests = vec![]; + while let Ok(data_columns_request) = self.pop_received_network_event(|ev| match ev { + NetworkMessage::SendRequest { + peer_id, + request: + RequestType::DataColumnsByRange(DataColumnsByRangeRequest { + start_slot, .. + }), + request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), + } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), + _ => None, + }) { + data_columns_requests.push(data_columns_request); + } + if data_columns_requests.is_empty() { + panic!("Found zero DataColumnsByRange requests"); + } + ByRangeDataRequestIds::PostPeerDAS(data_columns_requests) + } else if self.after_deneb() { + let (id, peer) = self + .pop_received_network_event(|ev| match ev { NetworkMessage::SendRequest { peer_id, - request: RequestType::BlobsByRange(_), + request: RequestType::BlobsByRange(BlobsByRangeRequest { start_slot, .. 
}), request_id: AppRequestId::Sync(SyncRequestId::RangeBlockAndBlobs { id }), - } if peer_id == target_peer_id => Some(*id), + } if filter_f(*peer_id, *start_slot) => Some((*id, *peer_id)), _ => None, }) - .expect("Should have a blobs by range request"), - ) + .expect("Should have a blobs by range request"); + ByRangeDataRequestIds::PrePeerDAS(id, peer) } else { - None + ByRangeDataRequestIds::PreDeneb }; - (block_req_id, blob_req_id) + (block_req_id, by_range_data_requests) } - fn find_and_complete_blocks_by_range_request(&mut self, target_peer_id: PeerId) { - let (blocks_req_id, blobs_req_id) = self.find_blocks_by_range_request(&target_peer_id); + fn find_and_complete_blocks_by_range_request(&mut self, request_filter: RequestFilter) { + let ((blocks_req_id, block_peer), by_range_data_request_ids) = + self.find_blocks_by_range_request(request_filter); // Complete the request with a single stream termination self.log(&format!( @@ -138,28 +229,43 @@ impl TestRig { )); self.send_sync_message(SyncMessage::RpcBlock { request_id: SyncRequestId::RangeBlockAndBlobs { id: blocks_req_id }, - peer_id: target_peer_id, + peer_id: block_peer, beacon_block: None, seen_timestamp: D, }); - if let Some(blobs_req_id) = blobs_req_id { - // Complete the request with a single stream termination - self.log(&format!( - "Completing BlobsByRange request {blobs_req_id} with empty stream" - )); - self.send_sync_message(SyncMessage::RpcBlob { - request_id: SyncRequestId::RangeBlockAndBlobs { id: blobs_req_id }, - peer_id: target_peer_id, - blob_sidecar: None, - seen_timestamp: D, - }); + match by_range_data_request_ids { + ByRangeDataRequestIds::PreDeneb => {} + ByRangeDataRequestIds::PrePeerDAS(id, peer_id) => { + // Complete the request with a single stream termination + self.log(&format!( + "Completing BlobsByRange request {id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcBlob { + request_id: SyncRequestId::RangeBlockAndBlobs { id }, + peer_id, + blob_sidecar: None, + 
seen_timestamp: D, + }); + } + ByRangeDataRequestIds::PostPeerDAS(data_column_req_ids) => { + // Complete the request with a single stream termination + for (id, peer_id) in data_column_req_ids { + self.log(&format!( + "Completing DataColumnsByRange request {id} with empty stream" + )); + self.send_sync_message(SyncMessage::RpcDataColumn { + request_id: SyncRequestId::RangeBlockAndBlobs { id }, + peer_id, + data_column: None, + seen_timestamp: D, + }); + } + } } } - async fn create_canonical_block( - &mut self, - ) -> (SignedBeaconBlock, Option>) { + async fn create_canonical_block(&mut self) -> (SignedBeaconBlock, Option>) { self.harness.advance_slot(); let block_root = self @@ -170,20 +276,38 @@ impl TestRig { AttestationStrategy::AllValidators, ) .await; - // TODO(das): this does not handle data columns yet + let store = &self.harness.chain.store; let block = store.get_full_block(&block_root).unwrap().unwrap(); - let blobs = if block.fork_name_unchecked().deneb_enabled() { - store.get_blobs(&block_root).unwrap().blobs() + let fork = block.fork_name_unchecked(); + + let data_sidecars = if fork.fulu_enabled() { + store + .get_data_columns(&block_root) + .unwrap() + .map(|columns| { + columns + .into_iter() + .map(CustodyDataColumn::from_asserted_custody) + .collect() + }) + .map(DataSidecars::DataColumns) + } else if fork.deneb_enabled() { + store + .get_blobs(&block_root) + .unwrap() + .blobs() + .map(DataSidecars::Blobs) } else { None }; - (block, blobs) + + (block, data_sidecars) } async fn remember_block( &mut self, - (block, blob_sidecars): (SignedBeaconBlock, Option>), + (block, data_sidecars): (SignedBeaconBlock, Option>), ) { // This code is kind of duplicated from Harness::process_block, but takes sidecars directly. 
let block_root = block.canonical_root(); @@ -193,7 +317,7 @@ impl TestRig { .chain .process_block( block_root, - RpcBlock::new(Some(block_root), block.into(), blob_sidecars).unwrap(), + build_rpc_block(block.into(), &data_sidecars, &self.spec), NotifyExecutionLayer::Yes, BlockImportSource::RangeSync, || Ok(()), @@ -206,6 +330,22 @@ impl TestRig { } } +fn build_rpc_block( + block: Arc>, + data_sidecars: &Option>, + spec: &ChainSpec, +) -> RpcBlock { + match data_sidecars { + Some(DataSidecars::Blobs(blobs)) => { + RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + } + Some(DataSidecars::DataColumns(columns)) => { + RpcBlock::new_with_custody_columns(None, block, columns.clone(), spec).unwrap() + } + None => RpcBlock::new_without_blobs(None, block), + } +} + #[test] fn head_chain_removed_while_finalized_syncing() { // NOTE: this is a regression test. @@ -217,14 +357,14 @@ fn head_chain_removed_while_finalized_syncing() { rig.assert_state(RangeSyncType::Head); // Sync should have requested a batch, grab the request. - let _ = rig.find_blocks_by_range_request(&head_peer); + let _ = rig.find_blocks_by_range_request(filter().peer(head_peer)); // Now get a peer with an advanced finalized epoch. let finalized_peer = rig.add_finalized_peer(); rig.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request - let _ = rig.find_blocks_by_range_request(&finalized_peer); + let _ = rig.find_blocks_by_range_request(filter().peer(finalized_peer)); // Fail the head chain by disconnecting the peer. rig.peer_disconnected(head_peer); @@ -251,14 +391,14 @@ async fn state_update_while_purging() { rig.assert_state(RangeSyncType::Head); // Sync should have requested a batch, grab the request. - let _ = rig.find_blocks_by_range_request(&head_peer); + let _ = rig.find_blocks_by_range_request(filter().peer(head_peer)); // Now get a peer with an advanced finalized epoch. 
let finalized_peer = rig.add_finalized_peer_with_root(finalized_peer_root); rig.assert_state(RangeSyncType::Finalized); // Sync should have requested a batch, grab the request - let _ = rig.find_blocks_by_range_request(&finalized_peer); + let _ = rig.find_blocks_by_range_request(filter().peer(finalized_peer)); // Now the chain knows both chains target roots. rig.remember_block(head_peer_block).await; @@ -277,15 +417,18 @@ fn pause_and_resume_on_ee_offline() { // make the ee offline rig.update_execution_engine_state(EngineState::Offline); // send the response to the request - rig.find_and_complete_blocks_by_range_request(peer1); + rig.find_and_complete_blocks_by_range_request(filter().peer(peer1).epoch(0)); // the beacon processor shouldn't have received any work rig.expect_empty_processor(); // while the ee is offline, more peers might arrive. Add a new finalized peer. - let peer2 = rig.add_finalized_peer(); + let _peer2 = rig.add_finalized_peer(); // send the response to the request - rig.find_and_complete_blocks_by_range_request(peer2); + // Don't filter requests and the columns requests may be sent to peer1 or peer2 + // We need to filter by epoch, because the previous batch eagerly sent requests for the next + // epoch for the other batch. So we can either filter by epoch or by sync type. + rig.find_and_complete_blocks_by_range_request(filter().epoch(0)); // the beacon processor shouldn't have received any work rig.expect_empty_processor(); // make the beacon processor available again. @@ -293,6 +436,6 @@ fn pause_and_resume_on_ee_offline() { // now resume range, we should have two processing requests in the beacon processor. 
rig.update_execution_engine_state(EngineState::Online); - rig.expect_chain_segment(); - rig.expect_chain_segment(); + // The head chain and finalized chain (2) should be in the processing queue + rig.expect_chain_segments(2); } diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 45b19834925..02014a05a3f 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -119,6 +119,11 @@ impl BlockCache { pub fn get_blobs<'a>(&'a mut self, block_root: &Hash256) -> Option<&'a BlobSidecarList> { self.blob_cache.get(block_root) } + pub fn get_data_columns(&mut self, block_root: &Hash256) -> Option> { + self.data_column_cache + .get(block_root) + .map(|map| map.values().cloned().collect::>()) + } pub fn get_data_column<'a>( &'a mut self, block_root: &Hash256, @@ -322,16 +327,15 @@ impl HotColdDB, BeaconNodeBackend> { db.compare_and_set_blob_info_with_write(<_>::default(), new_blob_info.clone())?; let data_column_info = db.load_data_column_info()?; - let eip7594_fork_slot = db + let fulu_fork_slot = db .spec - .eip7594_fork_epoch + .fulu_fork_epoch .map(|epoch| epoch.start_slot(E::slots_per_epoch())); let new_data_column_info = match &data_column_info { Some(data_column_info) => { // Set the oldest data column slot to the fork slot if it is not yet set. - let oldest_data_column_slot = data_column_info - .oldest_data_column_slot - .or(eip7594_fork_slot); + let oldest_data_column_slot = + data_column_info.oldest_data_column_slot.or(fulu_fork_slot); DataColumnInfo { oldest_data_column_slot, } @@ -339,7 +343,7 @@ impl HotColdDB, BeaconNodeBackend> { // First start. None => DataColumnInfo { // Set the oldest data column slot to the fork slot if it is not yet set. 
- oldest_data_column_slot: eip7594_fork_slot, + oldest_data_column_slot: fulu_fork_slot, }, }; db.compare_and_set_data_column_info_with_write( @@ -2037,6 +2041,40 @@ impl, Cold: ItemStore> HotColdDB }) } + /// Fetch columns for a given block from the store. + pub fn get_data_columns( + &self, + block_root: &Hash256, + ) -> Result>, Error> { + if let Some(columns) = self.block_cache.lock().get_data_columns(block_root) { + metrics::inc_counter(&metrics::BEACON_DATA_COLUMNS_CACHE_HIT_COUNT); + return Ok(Some(columns)); + } + + let columns = self + .blobs_db + .iter_column_from::>(DBColumn::BeaconDataColumn, block_root.as_slice()) + .take_while(|res| { + res.as_ref() + .is_ok_and(|(key, _)| key.starts_with(block_root.as_slice())) + }) + .map(|result| { + let (_key, value) = result?; + let column = DataColumnSidecar::::from_ssz_bytes(&value).map(Arc::new)?; + self.block_cache + .lock() + .put_data_column(*block_root, column.clone()); + Ok(column) + }) + .collect::, Error>>()?; + + if columns.is_empty() { + Ok(None) + } else { + Ok(Some(columns)) + } + } + /// Fetch blobs for a given block from the store. pub fn get_blobs(&self, block_root: &Hash256) -> Result, Error> { // Check the cache. @@ -2079,13 +2117,8 @@ impl, Cold: ItemStore> HotColdDB self.blobs_db .iter_column_from::>(DBColumn::BeaconDataColumn, block_root.as_slice()) .take_while(|res| { - let Ok((key, _)) = res else { return false }; - - if !key.starts_with(block_root.as_slice()) { - return false; - } - - true + res.as_ref() + .is_ok_and(|(key, _)| key.starts_with(block_root.as_slice())) }) .map(|key| key.and_then(|(key, _)| parse_data_column_key(key).map(|key| key.1))) .collect() @@ -2282,7 +2315,7 @@ impl, Cold: ItemStore> HotColdDB /// Initialize the `DataColumnInfo` when starting from genesis or a checkpoint. 
pub fn init_data_column_info(&self, anchor_slot: Slot) -> Result { - let oldest_data_column_slot = self.spec.eip7594_fork_epoch.map(|fork_epoch| { + let oldest_data_column_slot = self.spec.fulu_fork_epoch.map(|fork_epoch| { std::cmp::max(anchor_slot, fork_epoch.start_slot(E::slots_per_epoch())) }); let data_column_info = DataColumnInfo { diff --git a/beacon_node/store/src/metadata.rs b/beacon_node/store/src/metadata.rs index 3f076a767ac..1d70e105b94 100644 --- a/beacon_node/store/src/metadata.rs +++ b/beacon_node/store/src/metadata.rs @@ -225,10 +225,10 @@ impl StoreItem for BlobInfo { pub struct DataColumnInfo { /// The slot after which data columns are or *will be* available (>=). /// - /// If this slot is in the future, then it is the first slot of the EIP-7594 fork, from which + /// If this slot is in the future, then it is the first slot of the Fulu fork, from which /// data columns will be available. /// - /// If the `oldest_data_column_slot` is `None` then this means that the EIP-7594 fork epoch is + /// If the `oldest_data_column_slot` is `None` then this means that the Fulu fork epoch is /// not yet known. 
pub oldest_data_column_slot: Option, } diff --git a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml index f92de4225dc..74fe7278674 100644 --- a/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml +++ b/common/eth2_network_config/built_in_network_configs/mainnet/config.yaml @@ -53,8 +53,6 @@ ELECTRA_FORK_EPOCH: 18446744073709551615 # Fulu FULU_FORK_VERSION: 0x06000000 FULU_FORK_EPOCH: 18446744073709551615 -# PeerDAS -EIP7594_FORK_EPOCH: 18446744073709551615 # Time parameters # --------------------------------------------------------------- diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 70b4b73d528..b224cde048e 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -54,7 +54,7 @@ impl ForkChoiceTest { /// Creates a new tester with a custom chain config. pub fn new_with_chain_config(chain_config: ChainConfig) -> Self { // Run fork choice tests against the latest fork. - let spec = ForkName::latest().make_genesis_spec(ChainSpec::default()); + let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::default()); let harness = BeaconChainHarness::builder(MainnetEthSpec) .spec(spec.into()) .chain_config(chain_config) diff --git a/consensus/types/presets/gnosis/deneb.yaml b/consensus/types/presets/gnosis/deneb.yaml index 9a46a6dafe5..d25c4d3d381 100644 --- a/consensus/types/presets/gnosis/deneb.yaml +++ b/consensus/types/presets/gnosis/deneb.yaml @@ -1,6 +1,4 @@ # Gnosis preset - Deneb -# NOTE: The below are PLACEHOLDER values from Mainnet. 
-# Gnosis preset for the Deneb fork TBD: https://github.com/gnosischain/configs/tree/main/presets/gnosis # Misc # --------------------------------------------------------------- diff --git a/consensus/types/presets/gnosis/eip7594.yaml b/consensus/types/presets/gnosis/eip7594.yaml deleted file mode 100644 index 813febf26d5..00000000000 --- a/consensus/types/presets/gnosis/eip7594.yaml +++ /dev/null @@ -1,10 +0,0 @@ -# Mainnet preset - EIP7594 - -# Misc -# --------------------------------------------------------------- -# `uint64(2**6)` (= 64) -FIELD_ELEMENTS_PER_CELL: 64 -# `uint64(2 * 4096)` (= 8192) -FIELD_ELEMENTS_PER_EXT_BLOB: 8192 -# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) -KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git a/consensus/types/presets/gnosis/fulu.yaml b/consensus/types/presets/gnosis/fulu.yaml index 35a7c98fbf3..e5f3ce02122 100644 --- a/consensus/types/presets/gnosis/fulu.yaml +++ b/consensus/types/presets/gnosis/fulu.yaml @@ -1,3 +1,10 @@ # Gnosis preset - Fulu -FULU_PLACEHOLDER: 0 +# Misc +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# `uint64(2 * 4096)` (= 8192) +FIELD_ELEMENTS_PER_EXT_BLOB: 8192 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git a/consensus/types/presets/mainnet/eip7594.yaml b/consensus/types/presets/mainnet/eip7594.yaml deleted file mode 100644 index 813febf26d5..00000000000 --- a/consensus/types/presets/mainnet/eip7594.yaml +++ /dev/null @@ -1,10 +0,0 @@ -# Mainnet preset - EIP7594 - -# Misc -# --------------------------------------------------------------- -# `uint64(2**6)` (= 64) -FIELD_ELEMENTS_PER_CELL: 64 -# `uint64(2 * 4096)` (= 8192) -FIELD_ELEMENTS_PER_EXT_BLOB: 8192 -# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) -KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git 
a/consensus/types/presets/mainnet/fulu.yaml b/consensus/types/presets/mainnet/fulu.yaml index 8aa9ccdcc37..394f335f903 100644 --- a/consensus/types/presets/mainnet/fulu.yaml +++ b/consensus/types/presets/mainnet/fulu.yaml @@ -1,3 +1,10 @@ # Mainnet preset - Fulu -FULU_PLACEHOLDER: 0 +# Misc +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# `uint64(2 * 4096)` (= 8192) +FIELD_ELEMENTS_PER_EXT_BLOB: 8192 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git a/consensus/types/presets/minimal/eip7594.yaml b/consensus/types/presets/minimal/eip7594.yaml deleted file mode 100644 index 847719a4210..00000000000 --- a/consensus/types/presets/minimal/eip7594.yaml +++ /dev/null @@ -1,10 +0,0 @@ -# Minimal preset - EIP7594 - -# Misc -# --------------------------------------------------------------- -# `uint64(2**6)` (= 64) -FIELD_ELEMENTS_PER_CELL: 64 -# `uint64(2 * 4096)` (= 8192) -FIELD_ELEMENTS_PER_EXT_BLOB: 8192 -# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) -KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git a/consensus/types/presets/minimal/fulu.yaml b/consensus/types/presets/minimal/fulu.yaml index 121c9858f41..c961eb7f3ca 100644 --- a/consensus/types/presets/minimal/fulu.yaml +++ b/consensus/types/presets/minimal/fulu.yaml @@ -1,3 +1,10 @@ # Minimal preset - Fulu -FULU_PLACEHOLDER: 0 +# Misc +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# `uint64(2 * 4096)` (= 8192) +FIELD_ELEMENTS_PER_EXT_BLOB: 8192 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index 91d64f5c8eb..230805e86c6 100644 --- a/consensus/types/src/chain_spec.rs +++ 
b/consensus/types/src/chain_spec.rs @@ -198,12 +198,6 @@ pub struct ChainSpec { pub fulu_fork_version: [u8; 4], /// The Fulu fork epoch is optional, with `None` representing "Fulu never happens". pub fulu_fork_epoch: Option, - pub fulu_placeholder: u64, - - /* - * DAS params - */ - pub eip7594_fork_epoch: Option, pub number_of_columns: u64, pub number_of_custody_groups: u64, pub data_column_sidecar_subnet_count: u64, @@ -440,16 +434,16 @@ impl ChainSpec { } } - /// Returns true if the given epoch is greater than or equal to the `EIP7594_FORK_EPOCH`. + /// Returns true if the given epoch is greater than or equal to the `FULU_FORK_EPOCH`. pub fn is_peer_das_enabled_for_epoch(&self, block_epoch: Epoch) -> bool { - self.eip7594_fork_epoch - .is_some_and(|eip7594_fork_epoch| block_epoch >= eip7594_fork_epoch) + self.fulu_fork_epoch + .is_some_and(|fulu_fork_epoch| block_epoch >= fulu_fork_epoch) } - /// Returns true if `EIP7594_FORK_EPOCH` is set and is not set to `FAR_FUTURE_EPOCH`. + /// Returns true if `FULU_FORK_EPOCH` is set and is not set to `FAR_FUTURE_EPOCH`. pub fn is_peer_das_scheduled(&self) -> bool { - self.eip7594_fork_epoch - .is_some_and(|eip7594_fork_epoch| eip7594_fork_epoch != self.far_future_epoch) + self.fulu_fork_epoch + .is_some_and(|fulu_fork_epoch| fulu_fork_epoch != self.far_future_epoch) } /// Returns a full `Fork` struct for a given epoch. 
@@ -916,17 +910,11 @@ impl ChainSpec { */ fulu_fork_version: [0x06, 0x00, 0x00, 0x00], fulu_fork_epoch: None, - fulu_placeholder: 0, - - /* - * DAS params - */ - eip7594_fork_epoch: None, - number_of_columns: 128, + custody_requirement: 4, number_of_custody_groups: 128, data_column_sidecar_subnet_count: 128, + number_of_columns: 128, samples_per_slot: 8, - custody_requirement: 4, /* * Network specific @@ -1045,8 +1033,6 @@ impl ChainSpec { // Fulu fulu_fork_version: [0x06, 0x00, 0x00, 0x01], fulu_fork_epoch: None, - // PeerDAS - eip7594_fork_epoch: None, // Other network_id: 2, // lighthouse testnet network id deposit_chain_id: 5, @@ -1254,17 +1240,11 @@ impl ChainSpec { */ fulu_fork_version: [0x06, 0x00, 0x00, 0x64], fulu_fork_epoch: None, - fulu_placeholder: 0, - - /* - * DAS params - */ - eip7594_fork_epoch: None, - number_of_columns: 128, + custody_requirement: 4, number_of_custody_groups: 128, data_column_sidecar_subnet_count: 128, + number_of_columns: 128, samples_per_slot: 8, - custody_requirement: 4, /* * Network specific @@ -1408,11 +1388,6 @@ pub struct Config { #[serde(deserialize_with = "deserialize_fork_epoch")] pub fulu_fork_epoch: Option>, - #[serde(default)] - #[serde(serialize_with = "serialize_fork_epoch")] - #[serde(deserialize_with = "deserialize_fork_epoch")] - pub eip7594_fork_epoch: Option>, - #[serde(with = "serde_utils::quoted_u64")] seconds_per_slot: u64, #[serde(with = "serde_utils::quoted_u64")] @@ -1855,10 +1830,6 @@ impl Config { .fulu_fork_epoch .map(|epoch| MaybeQuoted { value: epoch }), - eip7594_fork_epoch: spec - .eip7594_fork_epoch - .map(|epoch| MaybeQuoted { value: epoch }), - seconds_per_slot: spec.seconds_per_slot, seconds_per_eth1_block: spec.seconds_per_eth1_block, min_validator_withdrawability_delay: spec.min_validator_withdrawability_delay, @@ -1945,7 +1916,6 @@ impl Config { electra_fork_version, fulu_fork_epoch, fulu_fork_version, - eip7594_fork_epoch, seconds_per_slot, seconds_per_eth1_block, 
min_validator_withdrawability_delay, @@ -2015,7 +1985,6 @@ impl Config { electra_fork_version, fulu_fork_epoch: fulu_fork_epoch.map(|q| q.value), fulu_fork_version, - eip7594_fork_epoch: eip7594_fork_epoch.map(|q| q.value), seconds_per_slot, seconds_per_eth1_block, min_validator_withdrawability_delay, diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs index b2a050e9d5e..90a914dfae3 100644 --- a/consensus/types/src/data_column_sidecar.rs +++ b/consensus/types/src/data_column_sidecar.rs @@ -133,20 +133,6 @@ impl DataColumnSidecar { .len() } - pub fn empty() -> Self { - Self { - index: 0, - column: DataColumn::::default(), - kzg_commitments: VariableList::default(), - kzg_proofs: VariableList::default(), - signed_block_header: SignedBeaconBlockHeader { - message: BeaconBlockHeader::empty(), - signature: Signature::empty(), - }, - kzg_commitments_inclusion_proof: Default::default(), - } - } - pub fn id(&self) -> DataColumnIdentifier { DataColumnIdentifier { block_root: self.block_root(), diff --git a/consensus/types/src/fork_name.rs b/consensus/types/src/fork_name.rs index b61e0a4d4a5..40557e0cb97 100644 --- a/consensus/types/src/fork_name.rs +++ b/consensus/types/src/fork_name.rs @@ -49,6 +49,13 @@ impl ForkName { *ForkName::list_all().last().unwrap() } + /// Returns the fork primarily used for testing purposes. + /// This fork serves as the baseline for many tests, and the goal + /// is to ensure features are passing on this fork. + pub fn latest_stable() -> ForkName { + ForkName::Electra + } + /// Set the activation slots in the given `ChainSpec` so that the fork named by `self` /// is the only fork in effect from genesis. 
pub fn make_genesis_spec(&self, mut spec: ChainSpec) -> ChainSpec { diff --git a/consensus/types/src/preset.rs b/consensus/types/src/preset.rs index 9a9915e458a..707d2d4697e 100644 --- a/consensus/types/src/preset.rs +++ b/consensus/types/src/preset.rs @@ -276,21 +276,6 @@ impl ElectraPreset { #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[serde(rename_all = "UPPERCASE")] pub struct FuluPreset { - #[serde(with = "serde_utils::quoted_u64")] - pub fulu_placeholder: u64, -} - -impl FuluPreset { - pub fn from_chain_spec(spec: &ChainSpec) -> Self { - Self { - fulu_placeholder: spec.fulu_placeholder, - } - } -} - -#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] -#[serde(rename_all = "UPPERCASE")] -pub struct Eip7594Preset { #[serde(with = "serde_utils::quoted_u64")] pub field_elements_per_cell: u64, #[serde(with = "serde_utils::quoted_u64")] @@ -299,7 +284,7 @@ pub struct Eip7594Preset { pub kzg_commitments_inclusion_proof_depth: u64, } -impl Eip7594Preset { +impl FuluPreset { pub fn from_chain_spec(_spec: &ChainSpec) -> Self { Self { field_elements_per_cell: E::field_elements_per_cell() as u64, @@ -357,9 +342,6 @@ mod test { let fulu: FuluPreset = preset_from_file(&preset_name, "fulu.yaml"); assert_eq!(fulu, FuluPreset::from_chain_spec::(&spec)); - - let eip7594: Eip7594Preset = preset_from_file(&preset_name, "eip7594.yaml"); - assert_eq!(eip7594, Eip7594Preset::from_chain_spec::(&spec)); } #[test] diff --git a/crypto/kzg/src/lib.rs b/crypto/kzg/src/lib.rs index 348ed785af0..2a5c6e47f59 100644 --- a/crypto/kzg/src/lib.rs +++ b/crypto/kzg/src/lib.rs @@ -21,6 +21,9 @@ pub use rust_eth_kzg::{ Cell, CellIndex as CellID, CellRef, TrustedSetup as PeerDASTrustedSetup, }; +// Note: `spec.number_of_columns` is a config and should match `CELLS_PER_EXT_BLOB` - however this +// is a constant in the KZG library - be aware that overriding `number_of_columns` will break KZG +// operations. 
pub type CellsAndKzgProofs = ([Cell; CELLS_PER_EXT_BLOB], [KzgProof; CELLS_PER_EXT_BLOB]); pub type KzgBlobRef<'a> = &'a [u8; BYTES_PER_BLOB]; diff --git a/scripts/local_testnet/network_params_das.yaml b/scripts/local_testnet/network_params_das.yaml index ab2f07a24ec..030aa2b8200 100644 --- a/scripts/local_testnet/network_params_das.yaml +++ b/scripts/local_testnet/network_params_das.yaml @@ -3,6 +3,7 @@ participants: cl_image: lighthouse:local cl_extra_params: - --subscribe-all-data-column-subnets + - --subscribe-all-subnets - --target-peers=3 count: 2 - cl_type: lighthouse @@ -11,11 +12,14 @@ participants: - --target-peers=3 count: 2 network_params: - eip7594_fork_epoch: 0 + electra_fork_epoch: 1 + fulu_fork_epoch: 2 seconds_per_slot: 6 snooper_enabled: false global_log_level: debug additional_services: - dora - - goomy_blob + - spamoor_blob - prometheus_grafana +dora_params: + image: ethpandaops/dora:fulu-support \ No newline at end of file diff --git a/testing/ef_tests/check_all_files_accessed.py b/testing/ef_tests/check_all_files_accessed.py index bf9e5d6cfa4..02a01555b4e 100755 --- a/testing/ef_tests/check_all_files_accessed.py +++ b/testing/ef_tests/check_all_files_accessed.py @@ -49,11 +49,10 @@ "bls12-381-tests/hash_to_G2", "tests/.*/eip6110", "tests/.*/whisk", - "tests/.*/eip7594", - # Fulu tests are not yet being run - "tests/.*/fulu", # TODO(electra): SingleAttestation tests are waiting on Eitan's PR - "tests/.*/electra/ssz_static/SingleAttestation" + "tests/.*/electra/ssz_static/SingleAttestation", + "tests/.*/fulu/ssz_static/SingleAttestation", + "tests/.*/fulu/ssz_static/MatrixEntry", ] diff --git a/testing/ef_tests/src/cases.rs b/testing/ef_tests/src/cases.rs index 54a142a96b6..4a202ee3d2d 100644 --- a/testing/ef_tests/src/cases.rs +++ b/testing/ef_tests/src/cases.rs @@ -91,6 +91,9 @@ pub use transition::TransitionTest; /// to return `true` for the feature in order for the feature test vector to be tested. 
#[derive(Debug, PartialEq, Clone, Copy)] pub enum FeatureName { + // TODO(fulu): to be removed once we start using Fulu types for test vectors. + // Existing SSZ types for PeerDAS (Fulu) are the same as Electra, so the test vectors get + // loaded as Electra types (default serde behaviour for untagged enums). Fulu, } diff --git a/testing/ef_tests/src/cases/compute_columns_for_custody_groups.rs b/testing/ef_tests/src/cases/compute_columns_for_custody_groups.rs index 1d0bf951bc6..8a6330d3996 100644 --- a/testing/ef_tests/src/cases/compute_columns_for_custody_groups.rs +++ b/testing/ef_tests/src/cases/compute_columns_for_custody_groups.rs @@ -21,12 +21,8 @@ impl LoadCase for ComputeColumnsForCustodyGroups { } impl Case for ComputeColumnsForCustodyGroups { - fn is_enabled_for_fork(_fork_name: ForkName) -> bool { - false - } - - fn is_enabled_for_feature(feature_name: FeatureName) -> bool { - feature_name == FeatureName::Fulu + fn is_enabled_for_fork(fork_name: ForkName) -> bool { + fork_name.fulu_enabled() } fn result(&self, _case_index: usize, _fork_name: ForkName) -> Result<(), Error> { diff --git a/testing/ef_tests/src/cases/get_custody_groups.rs b/testing/ef_tests/src/cases/get_custody_groups.rs index f8c4370aeb7..1c1294305f3 100644 --- a/testing/ef_tests/src/cases/get_custody_groups.rs +++ b/testing/ef_tests/src/cases/get_custody_groups.rs @@ -24,12 +24,8 @@ impl LoadCase for GetCustodyGroups { } impl Case for GetCustodyGroups { - fn is_enabled_for_fork(_fork_name: ForkName) -> bool { - false - } - - fn is_enabled_for_feature(feature_name: FeatureName) -> bool { - feature_name == FeatureName::Fulu + fn is_enabled_for_fork(fork_name: ForkName) -> bool { + fork_name.fulu_enabled() } fn result(&self, _case_index: usize, _fork_name: ForkName) -> Result<(), Error> { diff --git a/testing/ef_tests/src/cases/kzg_compute_cells_and_kzg_proofs.rs b/testing/ef_tests/src/cases/kzg_compute_cells_and_kzg_proofs.rs index 8df43bb2671..6ab9a8db65a 100644 --- 
a/testing/ef_tests/src/cases/kzg_compute_cells_and_kzg_proofs.rs +++ b/testing/ef_tests/src/cases/kzg_compute_cells_and_kzg_proofs.rs @@ -26,12 +26,8 @@ impl LoadCase for KZGComputeCellsAndKZGProofs { } impl Case for KZGComputeCellsAndKZGProofs { - fn is_enabled_for_fork(_fork_name: ForkName) -> bool { - false - } - - fn is_enabled_for_feature(feature_name: FeatureName) -> bool { - feature_name == FeatureName::Fulu + fn is_enabled_for_fork(fork_name: ForkName) -> bool { + fork_name.fulu_enabled() } fn result(&self, _case_index: usize, _fork_name: ForkName) -> Result<(), Error> { diff --git a/testing/ef_tests/src/cases/kzg_recover_cells_and_kzg_proofs.rs b/testing/ef_tests/src/cases/kzg_recover_cells_and_kzg_proofs.rs index 26ab4e96b59..732cb54f318 100644 --- a/testing/ef_tests/src/cases/kzg_recover_cells_and_kzg_proofs.rs +++ b/testing/ef_tests/src/cases/kzg_recover_cells_and_kzg_proofs.rs @@ -27,12 +27,8 @@ impl LoadCase for KZGRecoverCellsAndKZGProofs { } impl Case for KZGRecoverCellsAndKZGProofs { - fn is_enabled_for_fork(_fork_name: ForkName) -> bool { - false - } - - fn is_enabled_for_feature(feature_name: FeatureName) -> bool { - feature_name == FeatureName::Fulu + fn is_enabled_for_fork(fork_name: ForkName) -> bool { + fork_name.fulu_enabled() } fn result(&self, _case_index: usize, _fork_name: ForkName) -> Result<(), Error> { diff --git a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs index fc625063b11..e3edc0df0a1 100644 --- a/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs +++ b/testing/ef_tests/src/cases/kzg_verify_cell_kzg_proof_batch.rs @@ -29,12 +29,8 @@ impl LoadCase for KZGVerifyCellKZGProofBatch { } impl Case for KZGVerifyCellKZGProofBatch { - fn is_enabled_for_fork(_fork_name: ForkName) -> bool { - false - } - - fn is_enabled_for_feature(feature_name: FeatureName) -> bool { - feature_name == FeatureName::Fulu + fn is_enabled_for_fork(fork_name: ForkName) -> 
bool { + fork_name.fulu_enabled() } fn result(&self, _case_index: usize, _fork_name: ForkName) -> Result<(), Error> { diff --git a/testing/ef_tests/src/handler.rs b/testing/ef_tests/src/handler.rs index 6c0165efab1..d1ddd6a48ff 100644 --- a/testing/ef_tests/src/handler.rs +++ b/testing/ef_tests/src/handler.rs @@ -355,11 +355,14 @@ where } fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { - // This ensures we only run the tests **once** for the feature, using the types matching the - // correct fork, e.g. `Fulu` uses SSZ types from `Electra` fork as of spec test version - // `v1.5.0-beta.0`, therefore the `Fulu` tests should get included when testing Electra types. + // TODO(fulu): to be removed once Fulu types start differing from Electra. We currently run Fulu tests as a + // "feature" - this means we use Electra types for Fulu SSZ tests (except for PeerDAS types, e.g. `DataColumnSidecar`). // - // e.g. Fulu test vectors are executed in the first line below, but excluded in the 2nd + // This ensures we only run the tests **once** for `Fulu`, using the types matching the + // correct fork, e.g. `Fulu` uses SSZ types from `Electra` as of spec test version + // `v1.5.0-beta.0`, therefore the `Fulu` tests should get included when testing Deneb types. + // + // e.g. 
Fulu test vectors are executed in the 2nd line below, but excluded in the 1st // line when testing the type `AttestationElectra`: // // ``` @@ -890,6 +893,10 @@ impl Handler for GetCustodyGroupsHandler { fn handler_name(&self) -> String { "get_custody_groups".into() } + + fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { + feature_name == FeatureName::Fulu + } } #[derive(Derivative)] @@ -910,6 +917,10 @@ impl Handler for ComputeColumnsForCustodyGroupHandler fn handler_name(&self) -> String { "compute_columns_for_custody_group".into() } + + fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { + feature_name == FeatureName::Fulu + } } #[derive(Derivative)] @@ -930,6 +941,10 @@ impl Handler for KZGComputeCellsAndKZGProofHandler { fn handler_name(&self) -> String { "compute_cells_and_kzg_proofs".into() } + + fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { + feature_name == FeatureName::Fulu + } } #[derive(Derivative)] @@ -950,6 +965,10 @@ impl Handler for KZGVerifyCellKZGProofBatchHandler { fn handler_name(&self) -> String { "verify_cell_kzg_proof_batch".into() } + + fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { + feature_name == FeatureName::Fulu + } } #[derive(Derivative)] @@ -970,6 +989,10 @@ impl Handler for KZGRecoverCellsAndKZGProofHandler { fn handler_name(&self) -> String { "recover_cells_and_kzg_proofs".into() } + + fn is_enabled_for_feature(&self, feature_name: FeatureName) -> bool { + feature_name == FeatureName::Fulu + } } #[derive(Derivative)] diff --git a/testing/ef_tests/src/type_name.rs b/testing/ef_tests/src/type_name.rs index c50032a63de..285ac951a6e 100644 --- a/testing/ef_tests/src/type_name.rs +++ b/testing/ef_tests/src/type_name.rs @@ -54,6 +54,7 @@ type_name_generic!(BeaconBlockBodyBellatrix, "BeaconBlockBody"); type_name_generic!(BeaconBlockBodyCapella, "BeaconBlockBody"); type_name_generic!(BeaconBlockBodyDeneb, "BeaconBlockBody"); 
type_name_generic!(BeaconBlockBodyElectra, "BeaconBlockBody"); +type_name_generic!(BeaconBlockBodyFulu, "BeaconBlockBody"); type_name!(BeaconBlockHeader); type_name_generic!(BeaconState); type_name!(BlobIdentifier); @@ -74,12 +75,14 @@ type_name_generic!(ExecutionPayloadBellatrix, "ExecutionPayload"); type_name_generic!(ExecutionPayloadCapella, "ExecutionPayload"); type_name_generic!(ExecutionPayloadDeneb, "ExecutionPayload"); type_name_generic!(ExecutionPayloadElectra, "ExecutionPayload"); +type_name_generic!(ExecutionPayloadFulu, "ExecutionPayload"); type_name_generic!(FullPayload, "ExecutionPayload"); type_name_generic!(ExecutionPayloadHeader); type_name_generic!(ExecutionPayloadHeaderBellatrix, "ExecutionPayloadHeader"); type_name_generic!(ExecutionPayloadHeaderCapella, "ExecutionPayloadHeader"); type_name_generic!(ExecutionPayloadHeaderDeneb, "ExecutionPayloadHeader"); type_name_generic!(ExecutionPayloadHeaderElectra, "ExecutionPayloadHeader"); +type_name_generic!(ExecutionPayloadHeaderFulu, "ExecutionPayloadHeader"); type_name_generic!(ExecutionRequests); type_name_generic!(BlindedPayload, "ExecutionPayloadHeader"); type_name!(Fork); @@ -93,6 +96,7 @@ type_name_generic!(LightClientBootstrapAltair, "LightClientBootstrap"); type_name_generic!(LightClientBootstrapCapella, "LightClientBootstrap"); type_name_generic!(LightClientBootstrapDeneb, "LightClientBootstrap"); type_name_generic!(LightClientBootstrapElectra, "LightClientBootstrap"); +type_name_generic!(LightClientBootstrapFulu, "LightClientBootstrap"); type_name_generic!(LightClientFinalityUpdate); type_name_generic!(LightClientFinalityUpdateAltair, "LightClientFinalityUpdate"); type_name_generic!( @@ -104,11 +108,13 @@ type_name_generic!( LightClientFinalityUpdateElectra, "LightClientFinalityUpdate" ); +type_name_generic!(LightClientFinalityUpdateFulu, "LightClientFinalityUpdate"); type_name_generic!(LightClientHeader); type_name_generic!(LightClientHeaderAltair, "LightClientHeader"); 
type_name_generic!(LightClientHeaderCapella, "LightClientHeader"); type_name_generic!(LightClientHeaderDeneb, "LightClientHeader"); type_name_generic!(LightClientHeaderElectra, "LightClientHeader"); +type_name_generic!(LightClientHeaderFulu, "LightClientHeader"); type_name_generic!(LightClientOptimisticUpdate); type_name_generic!( LightClientOptimisticUpdateAltair, @@ -126,11 +132,16 @@ type_name_generic!( LightClientOptimisticUpdateElectra, "LightClientOptimisticUpdate" ); +type_name_generic!( + LightClientOptimisticUpdateFulu, + "LightClientOptimisticUpdate" +); type_name_generic!(LightClientUpdate); type_name_generic!(LightClientUpdateAltair, "LightClientUpdate"); type_name_generic!(LightClientUpdateCapella, "LightClientUpdate"); type_name_generic!(LightClientUpdateDeneb, "LightClientUpdate"); type_name_generic!(LightClientUpdateElectra, "LightClientUpdate"); +type_name_generic!(LightClientUpdateFulu, "LightClientUpdate"); type_name_generic!(PendingAttestation); type_name!(PendingConsolidation); type_name!(PendingPartialWithdrawal); diff --git a/testing/ef_tests/tests/tests.rs b/testing/ef_tests/tests/tests.rs index 61581128d4f..bba7efde495 100644 --- a/testing/ef_tests/tests/tests.rs +++ b/testing/ef_tests/tests/tests.rs @@ -276,9 +276,9 @@ mod ssz_static { fn attestation() { SszStaticHandler::, MinimalEthSpec>::pre_electra().run(); SszStaticHandler::, MainnetEthSpec>::pre_electra().run(); - SszStaticHandler::, MinimalEthSpec>::electra_only() + SszStaticHandler::, MinimalEthSpec>::electra_and_later() .run(); - SszStaticHandler::, MainnetEthSpec>::electra_only() + SszStaticHandler::, MainnetEthSpec>::electra_and_later() .run(); } @@ -288,9 +288,9 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::pre_electra() .run(); - SszStaticHandler::, MinimalEthSpec>::electra_only() + SszStaticHandler::, MinimalEthSpec>::electra_and_later() .run(); - SszStaticHandler::, MainnetEthSpec>::electra_only() + SszStaticHandler::, MainnetEthSpec>::electra_and_later() 
.run(); } @@ -300,9 +300,9 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::pre_electra() .run(); - SszStaticHandler::, MinimalEthSpec>::electra_only() + SszStaticHandler::, MinimalEthSpec>::electra_and_later() .run(); - SszStaticHandler::, MainnetEthSpec>::electra_only() + SszStaticHandler::, MainnetEthSpec>::electra_and_later() .run(); } @@ -314,10 +314,10 @@ mod ssz_static { SszStaticHandler::, MainnetEthSpec>::pre_electra( ) .run(); - SszStaticHandler::, MinimalEthSpec>::electra_only( + SszStaticHandler::, MinimalEthSpec>::electra_and_later( ) .run(); - SszStaticHandler::, MainnetEthSpec>::electra_only( + SszStaticHandler::, MainnetEthSpec>::electra_and_later( ) .run(); } @@ -328,10 +328,10 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::pre_electra() .run(); - SszStaticHandler::, MinimalEthSpec>::electra_only( + SszStaticHandler::, MinimalEthSpec>::electra_and_later( ) .run(); - SszStaticHandler::, MainnetEthSpec>::electra_only( + SszStaticHandler::, MainnetEthSpec>::electra_and_later( ) .run(); } @@ -361,6 +361,8 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::electra_only() .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only().run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only().run(); } // Altair and later @@ -399,6 +401,10 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::electra_only() .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only() + .run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only() + .run(); } // LightClientHeader has no internal indicator of which fork it is for, so we test it separately. @@ -430,6 +436,10 @@ mod ssz_static { SszStaticHandler::, MainnetEthSpec>::electra_only( ) .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only() + .run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only() + .run(); } // LightClientOptimisticUpdate has no internal indicator of which fork it is for, so we test it separately. 
@@ -445,6 +455,8 @@ mod ssz_static { SszStaticHandler::, MainnetEthSpec>::deneb_only().run(); SszStaticHandler::, MinimalEthSpec>::electra_only().run(); SszStaticHandler::, MainnetEthSpec>::electra_only().run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only().run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only().run(); } // LightClientFinalityUpdate has no internal indicator of which fork it is for, so we test it separately. @@ -480,6 +492,12 @@ mod ssz_static { SszStaticHandler::, MainnetEthSpec>::electra_only( ) .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only( + ) + .run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only( + ) + .run(); } // LightClientUpdate has no internal indicator of which fork it is for, so we test it separately. @@ -509,6 +527,10 @@ mod ssz_static { SszStaticHandler::, MainnetEthSpec>::electra_only( ) .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only() + .run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only() + .run(); } #[test] @@ -566,6 +588,8 @@ mod ssz_static { .run(); SszStaticHandler::, MainnetEthSpec>::electra_only() .run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only().run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only().run(); } #[test] @@ -586,6 +610,10 @@ mod ssz_static { ::electra_only().run(); SszStaticHandler::, MainnetEthSpec> ::electra_only().run(); + SszStaticHandler::, MinimalEthSpec>::fulu_only() + .run(); + SszStaticHandler::, MainnetEthSpec>::fulu_only() + .run(); } #[test] @@ -626,17 +654,17 @@ mod ssz_static { #[test] fn data_column_sidecar() { - SszStaticHandler::, MinimalEthSpec>::deneb_only() + SszStaticHandler::, MinimalEthSpec>::default() .run_for_feature(FeatureName::Fulu); - SszStaticHandler::, MainnetEthSpec>::deneb_only() + SszStaticHandler::, MainnetEthSpec>::default() .run_for_feature(FeatureName::Fulu); } #[test] fn data_column_identifier() { - SszStaticHandler::::deneb_only() + SszStaticHandler::::default() .run_for_feature(FeatureName::Fulu); - 
SszStaticHandler::::deneb_only() + SszStaticHandler::::default() .run_for_feature(FeatureName::Fulu); } @@ -901,20 +929,17 @@ fn kzg_verify_kzg_proof() { #[test] fn kzg_compute_cells_and_proofs() { - KZGComputeCellsAndKZGProofHandler::::default() - .run_for_feature(FeatureName::Fulu); + KZGComputeCellsAndKZGProofHandler::::default().run(); } #[test] fn kzg_verify_cell_proof_batch() { - KZGVerifyCellKZGProofBatchHandler::::default() - .run_for_feature(FeatureName::Fulu); + KZGVerifyCellKZGProofBatchHandler::::default().run(); } #[test] fn kzg_recover_cells_and_proofs() { - KZGRecoverCellsAndKZGProofHandler::::default() - .run_for_feature(FeatureName::Fulu); + KZGRecoverCellsAndKZGProofHandler::::default().run(); } #[test] @@ -949,14 +974,12 @@ fn rewards() { #[test] fn get_custody_groups() { - GetCustodyGroupsHandler::::default().run_for_feature(FeatureName::Fulu); - GetCustodyGroupsHandler::::default().run_for_feature(FeatureName::Fulu); + GetCustodyGroupsHandler::::default().run(); + GetCustodyGroupsHandler::::default().run() } #[test] fn compute_columns_for_custody_group() { - ComputeColumnsForCustodyGroupHandler::::default() - .run_for_feature(FeatureName::Fulu); - ComputeColumnsForCustodyGroupHandler::::default() - .run_for_feature(FeatureName::Fulu); + ComputeColumnsForCustodyGroupHandler::::default().run(); + ComputeColumnsForCustodyGroupHandler::::default().run(); }