Implement PeerDAS Fulu fork activation #6795

Open: wants to merge 21 commits into base: unstable

The diff below shows changes from 16 of the 21 commits.

Commits:
2e11554 Implement PeerDAS Fulu fork activation. (jimmygchen, Jan 13, 2025)
cd77b2c Update spec tests. (jimmygchen, Jan 13, 2025)
b029342 Fix compilation and update Kurtosis test config for PeerDAS. (jimmygchen, Jan 13, 2025)
64e44e1 Fix failing tests now `fulu` fork is included. (jimmygchen, Jan 14, 2025)
0c9d64b Merge remote-tracking branch 'origin/unstable' into jimmy/lh-2271-act… (jimmygchen, Jan 15, 2025)
4e25302 Address review comments and fix lint. (jimmygchen, Jan 15, 2025)
8cdf82e Use engine v4 methods for Fulu (v5 methods do not exist yet). Update … (jimmygchen, Jan 15, 2025)
4d407fe Merge remote-tracking branch 'origin/unstable' into jimmy/lh-2271-act… (jimmygchen, Jan 15, 2025)
8980832 Update Fulu spec tests. Revert back to testing Fulu as "feature", bec… (jimmygchen, Jan 17, 2025)
6d5b5ed Merge remote-tracking branch 'origin/unstable' into jimmy/lh-2271-act… (jimmygchen, Jan 17, 2025)
b7da075 More test fixes for Fulu. (jimmygchen, Jan 20, 2025)
b63a6c4 Merge remote-tracking branch 'origin/unstable' into jimmy/lh-2271-act… (jimmygchen, Jan 20, 2025)
eff9a5b More test fixes for Fulu. (jimmygchen, Jan 20, 2025)
614f984 Fix range sync to select custody peers from its syncing chain instead… (jimmygchen, Jan 21, 2025)
0e8f671 Merge branch 'unstable' into jimmy/lh-2271-activate-peerdas-at-fulu-f… (jimmygchen, Jan 21, 2025)
e813532 Skip blob pruning tests for Fulu. (jimmygchen, Jan 21, 2025)
b3da74b Merge remote-tracking branch 'origin/unstable' into jimmy/lh-2271-act… (jimmygchen, Jan 23, 2025)
492c1c6 Use pre-computed data columns for testing and fix tests. (jimmygchen, Jan 24, 2025)
e21b31e More beacon chain test fixes. (jimmygchen, Jan 24, 2025)
d8cba4b Revert change: select peers from chain for custody by range requests (dapplion, Jan 28, 2025)
38c7f05 Improve request identification in range sync test (dapplion, Jan 28, 2025)
12 changes: 12 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -1249,6 +1249,17 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         self.store.get_blobs(block_root).map_err(Error::from)
     }
 
+    /// Returns the data columns at the given root, if any.
+    ///
+    /// ## Errors
+    /// May return a database error.
+    pub fn get_data_columns(
+        &self,
+        block_root: &Hash256,
+    ) -> Result<Option<DataColumnSidecarList<T::EthSpec>>, Error> {
+        self.store.get_data_columns(block_root).map_err(Error::from)
+    }
+
     /// Returns the data columns at the given root, if any.
     ///
     /// ## Errors
@@ -5850,6 +5861,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

         let kzg = self.kzg.as_ref();
 
+        // TODO(fulu): we no longer need blob proofs from PeerDAS and could avoid computing.
         kzg_utils::validate_blobs::<T::EthSpec>(
             kzg,
             expected_kzg_commitments,

Review thread on the `kzg_utils::validate_blobs` call:

Collaborator: Why are we validating blobs in the first place if we have to compute the proofs? And what do you mean we don't need the KZG proofs anymore?

jimmygchen (Member, Author), Jan 25, 2025: Good question - the blob KZG proofs come from the EL (the tx sender provides them and the EL validates them, not sure if this has changed), so we should verify them, but I could be wrong. We also verify them again before we publish them.

Blob KZG proofs are no longer used from PeerDAS onwards - we don't send BlobSidecars to peers anymore, and we only use the cell KZG proofs, so validating these blob proofs may not be useful.
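To make the review discussion above concrete, here is a minimal, self-contained sketch of the gating that the `TODO(fulu)` hints at: skip blob-proof validation once PeerDAS is active for the block's epoch. Everything below is a simplified stand-in written for this note, not Lighthouse code; only the shape of `is_peer_das_enabled_for_epoch` mirrors a method that actually appears in this PR.

```rust
// Simplified stand-ins; not the real Lighthouse types.
struct ChainSpec {
    fulu_fork_epoch: Option<u64>,
}

impl ChainSpec {
    // Mirrors the shape of `is_peer_das_enabled_for_epoch` used in this PR.
    fn is_peer_das_enabled_for_epoch(&self, epoch: u64) -> bool {
        self.fulu_fork_epoch.is_some_and(|fork_epoch| epoch >= fork_epoch)
    }
}

// Placeholder for `kzg_utils::validate_blobs`.
fn validate_blob_proofs() -> Result<(), String> {
    Ok(())
}

fn maybe_validate_blobs(spec: &ChainSpec, block_epoch: u64) -> Result<(), String> {
    if !spec.is_peer_das_enabled_for_epoch(block_epoch) {
        // Pre-Fulu: BlobSidecars are gossiped to peers, so blob KZG proofs matter.
        validate_blob_proofs()?;
    }
    // Post-Fulu: only cell KZG proofs on DataColumnSidecars are verified.
    Ok(())
}

fn main() {
    let spec = ChainSpec { fulu_fork_epoch: Some(100) };
    assert!(maybe_validate_blobs(&spec, 99).is_ok()); // pre-Fulu path
    assert!(maybe_validate_blobs(&spec, 100).is_ok()); // PeerDAS path, skips blob proofs
}
```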
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/data_column_verification.rs
@@ -699,7 +699,7 @@ mod test {
 
     #[tokio::test]
     async fn empty_data_column_sidecars_fails_validation() {
-        let spec = ForkName::latest().make_genesis_spec(E::default_spec());
+        let spec = ForkName::Fulu.make_genesis_spec(E::default_spec());
         let harness = BeaconChainHarness::builder(E::default())
             .spec(spec.into())
             .deterministic_keypairs(64)
12 changes: 9 additions & 3 deletions beacon_node/beacon_chain/src/fetch_blobs.rs
@@ -14,7 +14,7 @@ use crate::{metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes
 use execution_layer::json_structures::BlobAndProofV1;
 use execution_layer::Error as ExecutionLayerError;
 use metrics::{inc_counter, inc_counter_by, TryExt};
-use slog::{debug, error, o, Logger};
+use slog::{debug, error, o, warn, Logger};
 use ssz_types::FixedVector;
 use state_processing::per_block_processing::deneb::kzg_commitment_to_versioned_hash;
 use std::sync::Arc;
@@ -248,8 +248,14 @@ fn spawn_compute_and_publish_data_columns_task<T: BeaconChainTypes>(
         }
     };
 
-    if let Err(e) = data_columns_sender.send(all_data_columns.clone()) {
-        error!(log, "Failed to send computed data columns"; "error" => ?e);
+    if data_columns_sender.send(all_data_columns.clone()).is_err() {
+        // Data column receiver has been dropped - this may not be an issue if the block is
+        // already fully imported. This should not happen after the race condition
+        // described in #6816 is fixed.
+        warn!(
+            log,
+            "Failed to send computed data columns";
+        );
     };
 
     // Check indices from cache before sending the columns, to make sure we don't
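The switch from `error!` to `warn!` above reflects that a failed send only means the receiver was dropped, which can be benign if the block was already imported. A std-only sketch of that channel behavior (the real sender in `fetch_blobs.rs` may be a different channel type, e.g. a oneshot):

```rust
// Sending on a channel whose receiver was dropped fails, which can be
// benign if the consumer already finished its work.
use std::sync::mpsc;

fn main() {
    let (data_columns_sender, data_columns_receiver) = mpsc::channel::<Vec<u8>>();

    // Simulate the receiver going away, e.g. because the block was already
    // fully imported via another path.
    drop(data_columns_receiver);

    if data_columns_sender.send(vec![0u8; 32]).is_err() {
        // The PR logs this with `warn!` rather than `error!`.
        eprintln!("Failed to send computed data columns");
    }
}
```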
11 changes: 6 additions & 5 deletions beacon_node/beacon_chain/src/fulu_readiness.rs
@@ -1,7 +1,7 @@
 //! Provides tools for checking if a node is ready for the Fulu upgrade.
 
 use crate::{BeaconChain, BeaconChainTypes};
-use execution_layer::http::{ENGINE_GET_PAYLOAD_V5, ENGINE_NEW_PAYLOAD_V5};
+use execution_layer::http::{ENGINE_GET_PAYLOAD_V4, ENGINE_NEW_PAYLOAD_V4};
 use serde::{Deserialize, Serialize};
 use std::fmt;
 use std::time::Duration;
@@ -87,14 +87,15 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
             Ok(capabilities) => {
                 let mut missing_methods = String::from("Required Methods Unsupported:");
                 let mut all_good = true;
-                if !capabilities.get_payload_v5 {
+                // TODO(fulu) switch to v5 when the EL is ready
+                if !capabilities.get_payload_v4 {
                     missing_methods.push(' ');
-                    missing_methods.push_str(ENGINE_GET_PAYLOAD_V5);
+                    missing_methods.push_str(ENGINE_GET_PAYLOAD_V4);
                     all_good = false;
                 }
-                if !capabilities.new_payload_v5 {
+                if !capabilities.new_payload_v4 {
                     missing_methods.push(' ');
-                    missing_methods.push_str(ENGINE_NEW_PAYLOAD_V5);
+                    missing_methods.push_str(ENGINE_NEW_PAYLOAD_V4);
                     all_good = false;
                 }
 
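The readiness check above follows a simple pattern: probe the EL's capabilities once, then accumulate the names of any unsupported engine methods into a single report. A simplified, hypothetical model of that pattern (`Capabilities` here is a stand-in for the real exchange-capabilities result; the method names are the standard Engine API identifiers):

```rust
// Stand-in for the capabilities returned by the EL's capability exchange.
struct Capabilities {
    get_payload_v4: bool,
    new_payload_v4: bool,
}

// Collect the names of all unsupported methods and report them at once.
fn check_required_methods(caps: &Capabilities) -> Result<(), String> {
    let mut missing_methods = String::from("Required Methods Unsupported:");
    let mut all_good = true;
    for (supported, name) in [
        (caps.get_payload_v4, "engine_getPayloadV4"),
        (caps.new_payload_v4, "engine_newPayloadV4"),
    ] {
        if !supported {
            missing_methods.push(' ');
            missing_methods.push_str(name);
            all_good = false;
        }
    }
    if all_good {
        Ok(())
    } else {
        Err(missing_methods)
    }
}

fn main() {
    let caps = Capabilities { get_payload_v4: true, new_payload_v4: false };
    // Prints: Required Methods Unsupported: engine_newPayloadV4
    if let Err(report) = check_required_methods(&caps) {
        println!("{report}");
    }
}
```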
158 changes: 141 additions & 17 deletions beacon_node/beacon_chain/src/test_utils.rs
@@ -1,8 +1,9 @@
 use crate::blob_verification::GossipVerifiedBlob;
 use crate::block_verification_types::{AsBlock, RpcBlock};
+use crate::data_column_verification::CustodyDataColumn;
+use crate::kzg_utils::blobs_to_data_column_sidecars;
 use crate::observed_operations::ObservationOutcome;
 pub use crate::persisted_beacon_chain::PersistedBeaconChain;
-use crate::BeaconBlockResponseWrapper;
 pub use crate::{
     beacon_chain::{BEACON_CHAIN_DB_KEY, ETH1_CACHE_DB_KEY, FORK_CHOICE_DB_KEY, OP_POOL_DB_KEY},
     migrate::MigratorConfig,
@@ -16,6 +17,7 @@ use crate::{
     BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler,
     StateSkipConfig,
 };
+use crate::{get_block_root, BeaconBlockResponseWrapper};
 use bls::get_withdrawal_credentials;
 use eth2::types::SignedBlockContentsTuple;
 use execution_layer::test_utils::generate_genesis_header;
@@ -104,7 +106,7 @@ static KZG_NO_PRECOMP: LazyLock<Arc<Kzg>> = LazyLock::new(|| {
 });
 
 pub fn get_kzg(spec: &ChainSpec) -> Arc<Kzg> {
-    if spec.eip7594_fork_epoch.is_some() {
+    if spec.fulu_fork_epoch.is_some() {
         KZG_PEERDAS.clone()
     } else if spec.deneb_fork_epoch.is_some() {
         KZG.clone()
@@ -755,15 +757,13 @@ where
     pub fn get_head_block(&self) -> RpcBlock<E> {
         let block = self.chain.head_beacon_block();
        let block_root = block.canonical_root();
-        let blobs = self.chain.get_blobs(&block_root).unwrap().blobs();
-        RpcBlock::new(Some(block_root), block, blobs).unwrap()
+        self.build_rpc_block_from_store_blobs(Some(block_root), block)
     }
 
     pub fn get_full_block(&self, block_root: &Hash256) -> RpcBlock<E> {
         let block = self.chain.get_blinded_block(block_root).unwrap().unwrap();
         let full_block = self.chain.store.make_full_block(block_root, block).unwrap();
-        let blobs = self.chain.get_blobs(block_root).unwrap().blobs();
-        RpcBlock::new(Some(*block_root), Arc::new(full_block), blobs).unwrap()
+        self.build_rpc_block_from_store_blobs(Some(*block_root), Arc::new(full_block))
     }
 
     pub fn get_all_validators(&self) -> Vec<usize> {
@@ -2264,22 +2264,19 @@ where
         self.set_current_slot(slot);
         let (block, blob_items) = block_contents;
 
-        let sidecars = blob_items
-            .map(|(proofs, blobs)| BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec))
-            .transpose()
-            .unwrap();
+        let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?;
         let block_hash: SignedBeaconBlockHash = self
             .chain
             .process_block(
                 block_root,
-                RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
+                rpc_block,
                 NotifyExecutionLayer::Yes,
                 BlockImportSource::RangeSync,
                 || Ok(()),
             )
             .await?
             .try_into()
-            .unwrap();
+            .expect("block blobs are available");
         self.chain.recompute_head_at_current_slot().await;
         Ok(block_hash)
     }
@@ -2290,16 +2287,13 @@
     ) -> Result<SignedBeaconBlockHash, BlockError> {
         let (block, blob_items) = block_contents;
 
-        let sidecars = blob_items
-            .map(|(proofs, blobs)| BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec))
-            .transpose()
-            .unwrap();
         let block_root = block.canonical_root();
+        let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?;
         let block_hash: SignedBeaconBlockHash = self
             .chain
             .process_block(
                 block_root,
-                RpcBlock::new(Some(block_root), block, sidecars).unwrap(),
+                rpc_block,
                 NotifyExecutionLayer::Yes,
                 BlockImportSource::RangeSync,
                 || Ok(()),
@@ -2311,6 +2305,82 @@
         Ok(block_hash)
     }
 
+    /// Builds an `RpcBlock` from a `SignedBeaconBlock` and blobs or data columns retrieved from
+    /// the database.
+    pub fn build_rpc_block_from_store_blobs(
+        &self,
+        block_root: Option<Hash256>,
+        block: Arc<SignedBeaconBlock<E>>,
+    ) -> RpcBlock<E> {
+        let block_root = block_root.unwrap_or_else(|| get_block_root(&block));
+        let has_blobs = block
+            .message()
+            .body()
+            .blob_kzg_commitments()
+            .is_ok_and(|c| !c.is_empty());
+        if !has_blobs {
+            return RpcBlock::new_without_blobs(Some(block_root), block);
+        }
+
+        // Blobs are stored as data columns from Fulu (PeerDAS)
+        if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
+            let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap();
+            let custody_columns = columns
+                .into_iter()
+                .map(CustodyDataColumn::from_asserted_custody)
+                .collect::<Vec<_>>();
+            RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns, &self.spec)
+                .unwrap()
+        } else {
+            let blobs = self.chain.get_blobs(&block_root).unwrap().blobs();
+            RpcBlock::new(Some(block_root), block, blobs).unwrap()
+        }
+    }
+
+    /// Builds an `RpcBlock` from a `SignedBeaconBlock` and `BlobsList`.
+    fn build_rpc_block_from_blobs(
+        &self,
+        block_root: Hash256,
+        block: Arc<SignedBeaconBlock<E, FullPayload<E>>>,
+        blob_items: Option<(KzgProofs<E>, BlobsList<E>)>,
+    ) -> Result<RpcBlock<E>, BlockError> {
+        Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) {
+            let sampling_column_count = self
+                .chain
+                .data_availability_checker
+                .get_sampling_column_count();
+
+            let columns = blob_items
+                .map(|(_proofs, blobs)| {
+                    blobs_to_data_column_sidecars(
+                        &blobs.iter().collect::<Vec<_>>(),
+                        &block,
+                        &self.chain.kzg,
+                        &self.spec,
+                    )
+                    .map(|column_sidecars| {
+                        column_sidecars
+                            .into_iter()
+                            .take(sampling_column_count)
+                            .map(CustodyDataColumn::from_asserted_custody)
+                            .collect::<Vec<_>>()
+                    })
+                })
+                .transpose()
+                .expect("should convert blobs to columns")
+                .unwrap_or_default();
+            RpcBlock::new_with_custody_columns(Some(block_root), block, columns, &self.spec)?
+        } else {
+            let blobs = blob_items
+                .map(|(proofs, blobs)| {
+                    BlobSidecar::build_sidecars(blobs, &block, proofs, &self.spec)
+                })
+                .transpose()
+                .unwrap();
+            RpcBlock::new(Some(block_root), block, blobs)?
+        })
+    }
+
     pub fn process_attestations(&self, attestations: HarnessAttestations<E>) {
         let num_validators = self.validator_keypairs.len();
         let mut unaggregated = Vec::with_capacity(num_validators);
@@ -2984,6 +3054,60 @@ where
 
         Ok(())
     }
+
+    /// Simulate some of the blobs / data columns being seen on gossip.
+    /// Converts the blobs to data columns if the slot is Fulu or later.
+    pub async fn process_gossip_blobs_or_columns<'a>(
+        &self,
+        block: &SignedBeaconBlock<E>,
+        blobs: impl Iterator<Item = &'a Blob<E>>,
+        proofs: impl Iterator<Item = &'a KzgProof>,
+        custody_columns_opt: Option<HashSet<ColumnIndex>>,
+    ) {
+        let is_peerdas_enabled = self.chain.spec.is_peer_das_enabled_for_epoch(block.epoch());
+        if is_peerdas_enabled {
+            let sidecars = blobs_to_data_column_sidecars(
+                &blobs.collect::<Vec<_>>(),
+                block,
+                &self.chain.kzg,
+                &self.spec,
+            )
+            .unwrap();
+
+            let custody_columns = custody_columns_opt.unwrap_or_else(|| {
+                let spec = &self.chain.spec;
+                let sampling_size = spec.sampling_size(spec.custody_requirement).unwrap();
+                (0..sampling_size).collect()
+            });
+
+            let verified_columns = sidecars
+                .into_iter()
+                .filter(|c| custody_columns.contains(&c.index))
+                .map(|sidecar| {
+                    let column_index = sidecar.index;
+                    self.chain
+                        .verify_data_column_sidecar_for_gossip(sidecar, column_index)
+                })
+                .collect::<Result<Vec<_>, _>>()
+                .unwrap();
+
+            self.chain
+                .process_gossip_data_columns(verified_columns, || Ok(()))
+                .await
+                .unwrap();
+        } else {
+            for (i, (kzg_proof, blob)) in proofs.into_iter().zip(blobs).enumerate() {
+                let sidecar =
+                    Arc::new(BlobSidecar::new(i, blob.clone(), block, *kzg_proof).unwrap());
+                let gossip_blob = GossipVerifiedBlob::new(sidecar, i as u64, &self.chain)
+                    .expect("should obtain gossip verified blob");
+                self.chain
+                    .process_gossip_blob(gossip_blob)
+                    .await
+                    .expect("should import valid gossip verified blob");
+            }
+        }
+    }
 }
 
 // Junk `Debug` impl to satisfy certain trait bounds during testing.
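One detail worth calling out in `build_rpc_block_from_blobs` above: after converting blobs to data column sidecars, the harness keeps only the first `sampling_column_count` columns as custody columns. A self-contained sketch of that selection step, using simplified stand-in types (PeerDAS defines 128 data columns per block):

```rust
// `DataColumn` is a simplified stand-in for the real `DataColumnSidecar`.
struct DataColumn {
    index: u64,
}

// Mirrors `.take(sampling_column_count)` in the harness helper: only the
// first `sampling_column_count` columns are treated as custody columns.
fn select_custody_columns(
    all_columns: Vec<DataColumn>,
    sampling_column_count: usize,
) -> Vec<DataColumn> {
    all_columns
        .into_iter()
        .take(sampling_column_count)
        .collect()
}

fn main() {
    // PeerDAS defines 128 data columns per block.
    let columns: Vec<DataColumn> = (0u64..128).map(|index| DataColumn { index }).collect();
    let custody = select_custody_columns(columns, 8);
    assert_eq!(custody.len(), 8);
    assert_eq!(custody.last().map(|c| c.index), Some(7));
}
```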
15 changes: 3 additions & 12 deletions beacon_node/beacon_chain/tests/attestation_production.rs
@@ -1,7 +1,6 @@
 #![cfg(not(debug_assertions))]
 
 use beacon_chain::attestation_simulator::produce_unaggregated_attestation;
-use beacon_chain::block_verification_types::RpcBlock;
 use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy};
 use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS;
 use beacon_chain::{metrics, StateSkipConfig, WhenSlotSkipped};
@@ -155,7 +154,6 @@ async fn produces_attestations() {
             .store
             .make_full_block(&block_root, blinded_block)
             .unwrap();
-        let blobs = chain.get_blobs(&block_root).unwrap().blobs();
 
         let epoch_boundary_slot = state
             .current_epoch()
@@ -223,8 +221,7 @@ async fn produces_attestations() {
             assert_eq!(data.target.root, target_root, "bad target root");
 
             let rpc_block =
-                RpcBlock::<MainnetEthSpec>::new(None, Arc::new(block.clone()), blobs.clone())
-                    .unwrap();
+                harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(block.clone()));
             let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(
                 available_block,
             ) = chain
@@ -296,14 +293,8 @@ async fn early_attester_cache_old_request() {
         .get_block(&head.beacon_block_root)
         .unwrap();
 
-    let head_blobs = harness
-        .chain
-        .get_blobs(&head.beacon_block_root)
-        .expect("should get blobs")
-        .blobs();
-
-    let rpc_block =
-        RpcBlock::<MainnetEthSpec>::new(None, head.beacon_block.clone(), head_blobs).unwrap();
+    let rpc_block = harness
+        .build_rpc_block_from_store_blobs(Some(head.beacon_block_root), head.beacon_block.clone());
     let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) =
         harness
             .chain