Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Subnet Sampling for PeerDAS #6410

Merged
merged 7 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3836,6 +3836,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
}

if let Some(data_columns) = data_columns {
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec.
dapplion marked this conversation as resolved.
Show resolved Hide resolved
if !data_columns.is_empty() {
debug!(
self.log, "Writing data_columns to store";
Expand Down
23 changes: 12 additions & 11 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,19 +103,22 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
spec: ChainSpec,
) -> Result<Self, AvailabilityCheckError> {
let spec = Arc::new(spec);

let custody_subnet_count = if import_all_data_columns {
spec.data_column_sidecar_subnet_count as usize
} else {
spec.custody_requirement as usize
};

let custody_column_count =
custody_subnet_count.saturating_mul(spec.data_columns_per_subnet());
let subnet_sampling_size =
std::cmp::max(custody_subnet_count, spec.samples_per_slot as usize);
let sampling_column_count =
subnet_sampling_size.saturating_mul(spec.data_columns_per_subnet());

let inner = DataAvailabilityCheckerInner::new(
OVERFLOW_LRU_CAPACITY,
store,
custody_column_count,
sampling_column_count,
spec.clone(),
)?;
Ok(Self {
Expand All @@ -126,10 +129,8 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

pub fn get_custody_columns_count(&self) -> usize {
self.availability_cache
.custody_subnet_count()
.saturating_mul(self.spec.data_columns_per_subnet())
pub fn get_sampling_column_count(&self) -> usize {
self.availability_cache.sampling_column_count()
}

/// Checks if the block root is currenlty in the availability cache awaiting import because
Expand All @@ -142,9 +143,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.get_execution_valid_block(block_root)
}

/// Return the set of imported blob indexes for `block_root`. Returns None if there is no block
/// Return the set of cached blob indexes for `block_root`. Returns None if there is no block
/// component for `block_root`.
pub fn imported_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_blob_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| {
Expand All @@ -157,9 +158,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
})
}

/// Return the set of imported custody column indexes for `block_root`. Returns None if there is
/// Return the set of cached custody column indexes for `block_root`. Returns None if there is
/// no block component for `block_root`.
pub fn imported_custody_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
pub fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
self.availability_cache
.peek_pending_components(block_root, |components| {
components.map(|components| components.get_cached_data_columns_indices())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct PendingComponents<E: EthSpec> {

pub enum BlockImportRequirement {
AllBlobs,
CustodyColumns(usize),
ColumnSampling(usize),
}

impl<E: EthSpec> PendingComponents<E> {
Expand Down Expand Up @@ -210,7 +210,7 @@ impl<E: EthSpec> PendingComponents<E> {
.map_or(false, |num_expected_blobs| {
num_expected_blobs == self.num_received_blobs()
}),
BlockImportRequirement::CustodyColumns(num_expected_columns) => {
BlockImportRequirement::ColumnSampling(num_expected_columns) => {
let num_received_data_columns = self.num_received_data_columns();
// No data columns when there are 0 blobs
self.num_expected_blobs()
Expand Down Expand Up @@ -281,7 +281,7 @@ impl<E: EthSpec> PendingComponents<E> {
};
(Some(VariableList::new(verified_blobs)?), None)
}
BlockImportRequirement::CustodyColumns(_) => {
BlockImportRequirement::ColumnSampling(_) => {
let verified_data_columns = verified_data_columns
.into_iter()
.map(|d| d.into_inner())
Expand Down Expand Up @@ -353,28 +353,28 @@ pub struct DataAvailabilityCheckerInner<T: BeaconChainTypes> {
/// This cache holds a limited number of states in memory and reconstructs them
/// from disk when necessary. This is necessary until we merge tree-states
state_cache: StateLRUCache<T>,
/// The number of data columns the node is custodying.
custody_column_count: usize,
/// The number of data columns the node is sampling via subnet sampling.
sampling_column_count: usize,
spec: Arc<ChainSpec>,
}

impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
pub fn new(
capacity: NonZeroUsize,
beacon_store: BeaconStore<T>,
custody_column_count: usize,
sampling_column_count: usize,
spec: Arc<ChainSpec>,
) -> Result<Self, AvailabilityCheckError> {
Ok(Self {
critical: RwLock::new(LruCache::new(capacity)),
state_cache: StateLRUCache::new(beacon_store, spec.clone()),
custody_column_count,
sampling_column_count,
spec,
})
}

pub fn custody_subnet_count(&self) -> usize {
self.custody_column_count
pub fn sampling_column_count(&self) -> usize {
self.sampling_column_count
}

/// Returns true if the block root is known, without altering the LRU ordering
Expand Down Expand Up @@ -440,8 +440,8 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
) -> Result<BlockImportRequirement, AvailabilityCheckError> {
let peer_das_enabled = self.spec.is_peer_das_enabled_for_epoch(epoch);
if peer_das_enabled {
Ok(BlockImportRequirement::CustodyColumns(
self.custody_column_count,
Ok(BlockImportRequirement::ColumnSampling(
self.sampling_column_count,
))
} else {
Ok(BlockImportRequirement::AllBlobs)
Expand All @@ -456,7 +456,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
block_import_requirement: &BlockImportRequirement,
pending_components: &PendingComponents<T::EthSpec>,
) -> bool {
let BlockImportRequirement::CustodyColumns(num_expected_columns) = block_import_requirement
let BlockImportRequirement::ColumnSampling(num_expected_columns) = block_import_requirement
else {
return false;
};
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/historical_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Blobs are stored per block, and data columns are each stored individually
let n_blob_ops_per_block = if self.spec.is_peer_das_scheduled() {
self.data_availability_checker.get_custody_columns_count()
// TODO(das): `available_block includes all sampled columns, but we only need to store
// custody columns. To be clarified in spec PR.
self.data_availability_checker.get_sampling_column_count()
} else {
1
};
Expand Down
9 changes: 4 additions & 5 deletions beacon_node/http_api/src/publish_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,13 @@ pub async fn publish_block<T: BeaconChainTypes, B: IntoGossipVerifiedBlockConten
}

if let Some(gossip_verified_data_columns) = gossip_verified_data_columns {
let custody_columns_indices = &network_globals.custody_columns;

let custody_columns = gossip_verified_data_columns
let sampling_columns_indices = &network_globals.sampling_columns;
let sampling_columns = gossip_verified_data_columns
.into_iter()
.filter(|data_column| custody_columns_indices.contains(&data_column.index()))
.filter(|data_column| sampling_columns_indices.contains(&data_column.index()))
.collect();

if let Err(e) = Box::pin(chain.process_gossip_data_columns(custody_columns)).await {
if let Err(e) = Box::pin(chain.process_gossip_data_columns(sampling_columns)).await {
let msg = format!("Invalid data column: {e}");
return if let BroadcastValidation::Gossip = validation_level {
Err(warp_utils::reject::broadcast_without_import(msg))
Expand Down
48 changes: 31 additions & 17 deletions beacon_node/lighthouse_network/src/types/globals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pub struct NetworkGlobals<E: EthSpec> {
pub sync_state: RwLock<SyncState>,
/// The current state of the backfill sync.
pub backfill_state: RwLock<BackFillState>,
/// The computed custody subnets and columns is stored to avoid re-computing.
pub custody_subnets: Vec<DataColumnSubnetId>,
pub custody_columns: Vec<ColumnIndex>,
/// The computed sampling subnets and columns is stored to avoid re-computing.
pub sampling_subnets: Vec<DataColumnSubnetId>,
pub sampling_columns: Vec<ColumnIndex>,
pub spec: ChainSpec,
}

Expand All @@ -42,24 +42,31 @@ impl<E: EthSpec> NetworkGlobals<E> {
log: &slog::Logger,
spec: ChainSpec,
) -> Self {
let (custody_subnets, custody_columns) = if spec.is_peer_das_scheduled() {
let (sampling_subnets, sampling_columns) = if spec.is_peer_das_scheduled() {
let node_id = enr.node_id().raw();

let custody_subnet_count = local_metadata
.custody_subnet_count()
.copied()
.expect("custody subnet count must be set if PeerDAS is scheduled");
let custody_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
enr.node_id().raw(),
custody_subnet_count,

let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);

let sampling_subnets = DataColumnSubnetId::compute_custody_subnets::<E>(
node_id,
subnet_sampling_size,
&spec,
)
.expect("custody subnet count must be valid")
.expect("sampling subnet count must be valid")
.collect::<Vec<_>>();
let custody_columns = custody_subnets

let sampling_columns = sampling_subnets
.iter()
.flat_map(|subnet| subnet.columns::<E>(&spec))
.sorted()
.collect();
(custody_subnets, custody_columns)

(sampling_subnets, sampling_columns)
} else {
(vec![], vec![])
};
Expand All @@ -73,8 +80,8 @@ impl<E: EthSpec> NetworkGlobals<E> {
gossipsub_subscriptions: RwLock::new(HashSet::new()),
sync_state: RwLock::new(SyncState::Stalled),
backfill_state: RwLock::new(BackFillState::NotRequired),
custody_subnets,
custody_columns,
sampling_subnets,
sampling_columns,
spec,
}
}
Expand Down Expand Up @@ -191,32 +198,39 @@ mod test {
use types::{Epoch, EthSpec, MainnetEthSpec as E};

#[test]
fn test_custody_subnets() {
fn test_sampling_subnets() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
let metadata = get_metadata(custody_subnet_count);

let globals =
NetworkGlobals::<E>::new_test_globals_with_metadata(vec![], metadata, &log, spec);
assert_eq!(globals.custody_subnets.len(), custody_subnet_count as usize);
assert_eq!(
globals.sampling_subnets.len(),
subnet_sampling_size as usize
);
}

#[test]
fn test_custody_columns() {
fn test_sampling_columns() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let custody_columns_count = spec.number_of_columns / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
let metadata = get_metadata(custody_subnet_count);

let globals =
NetworkGlobals::<E>::new_test_globals_with_metadata(vec![], metadata, &log, spec);
assert_eq!(globals.custody_columns.len(), custody_columns_count);
assert_eq!(
globals.sampling_columns.len(),
subnet_sampling_size as usize
);
}

fn get_metadata(custody_subnet_count: u64) -> MetaData<E> {
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}
} else {
for column_subnet in &self.network_globals.custody_subnets {
for column_subnet in &self.network_globals.sampling_subnets {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic =
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/block_lookups/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::network_beacon_processor::NetworkBeaconProcessor;
use crate::sync::manager::{BlockProcessType, SyncManager};
use crate::sync::sampling::SamplingConfig;
use crate::sync::peer_sampling::SamplingConfig;
use crate::sync::{SamplingId, SyncMessage};
use crate::NetworkMessage;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@
use super::backfill_sync::{BackFillSync, ProcessResult, SyncStart};
use super::block_lookups::BlockLookups;
use super::network_context::{BlockOrBlob, RangeRequestId, RpcEvent, SyncNetworkContext};
use super::peer_sampling::{Sampling, SamplingConfig, SamplingResult};
use super::peer_sync_info::{remote_sync_type, PeerSyncType};
use super::range_sync::{RangeSync, RangeSyncType, EPOCHS_PER_BATCH};
use super::sampling::{Sampling, SamplingConfig, SamplingResult};
use crate::network_beacon_processor::{ChainSegmentProcessId, NetworkBeaconProcessor};
use crate::service::NetworkMessage;
use crate::status::ToStatusMessage;
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ mod block_lookups;
mod block_sidecar_coupling;
pub mod manager;
mod network_context;
mod peer_sampling;
mod peer_sync_info;
mod range_sync;
mod sampling;

pub use lighthouse_network::service::api_types::SamplingId;
pub use manager::{BatchProcessResult, SyncMessage};
Expand Down
18 changes: 9 additions & 9 deletions beacon_node/network/src/sync/network_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,13 +387,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
false
};

let (expects_custody_columns, num_of_custody_column_req) =
let (expects_columns, num_of_column_req) =
if matches!(batch_type, ByRangeRequestType::BlocksAndColumns) {
let custody_indexes = self.network_globals().custody_columns.clone();
let column_indexes = self.network_globals().sampling_columns.clone();
let mut num_of_custody_column_req = 0;

for (peer_id, columns_by_range_request) in
self.make_columns_by_range_requests(request, &custody_indexes)?
self.make_columns_by_range_requests(request, &column_indexes)?
{
requested_peers.push(peer_id);

Expand All @@ -417,15 +417,15 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
num_of_custody_column_req += 1;
}

(Some(custody_indexes), Some(num_of_custody_column_req))
(Some(column_indexes), Some(num_of_custody_column_req))
} else {
(None, None)
};

let info = RangeBlockComponentsRequest::new(
expected_blobs,
expects_custody_columns,
num_of_custody_column_req,
expects_columns,
num_of_column_req,
requested_peers,
);
self.range_block_components_requests
Expand Down Expand Up @@ -637,7 +637,7 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let imported_blob_indexes = self
.chain
.data_availability_checker
.imported_blob_indexes(&block_root)
.cached_blob_indexes(&block_root)
.unwrap_or_default();
// Include only the blob indexes not yet imported (received through gossip)
let indices = (0..expected_blobs as u64)
Expand Down Expand Up @@ -755,13 +755,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
let custody_indexes_imported = self
.chain
.data_availability_checker
.imported_custody_column_indexes(&block_root)
.cached_data_column_indexes(&block_root)
.unwrap_or_default();

// Include only the blob indexes not yet imported (received through gossip)
let custody_indexes_to_fetch = self
.network_globals()
.custody_columns
.sampling_columns
.clone()
.into_iter()
.filter(|index| !custody_indexes_imported.contains(index))
Expand Down
Loading
Loading