Skip to content

Commit 77e0cb6

Browse files
committed
revert refactor
1 parent 8688065 commit 77e0cb6

File tree

7 files changed

+136
-141
lines changed

7 files changed

+136
-141
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

Lines changed: 61 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,7 @@ use crate::chain_config::ChainConfig;
2323
use crate::data_availability_checker::{
2424
Availability, AvailabilityCheckError, AvailableBlock, DataAvailabilityChecker,
2525
};
26-
use crate::data_column_verification::{
27-
DataColumnsSameBlock, GossipDataColumnError, GossipVerifiedDataColumn,
28-
};
26+
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
2927
use crate::early_attester_cache::EarlyAttesterCache;
3028
use crate::errors::{BeaconChainError as Error, BlockProductionError};
3129
use crate::eth1_chain::{Eth1Chain, Eth1ChainBackend};
@@ -2970,13 +2968,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29702968

29712969
/// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
29722970
/// imported or errors.
2973-
pub async fn process_gossip_data_column(
2971+
pub async fn process_gossip_data_columns(
29742972
self: &Arc<Self>,
2975-
data_column: GossipVerifiedDataColumn<T>,
2973+
data_columns: Vec<GossipVerifiedDataColumn<T>>,
29762974
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
2975+
let Ok((slot, block_root)) = data_columns
2976+
.iter()
2977+
.map(|c| (c.slot(), c.block_root()))
2978+
.unique()
2979+
.exactly_one()
2980+
else {
2981+
return Err(BlockError::InternalError(
2982+
"Columns should be from the same block".to_string(),
2983+
));
2984+
};
2985+
29772986
// If this block has already been imported to forkchoice it must have been available, so
29782987
// we don't need to process its samples again.
2979-
let block_root = data_column.block_root();
29802988
if self
29812989
.canonical_head
29822990
.fork_choice_read_lock()
@@ -2986,7 +2994,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
29862994
}
29872995

29882996
let r = self
2989-
.check_gossip_data_columns_availability_and_import(data_column)
2997+
.check_gossip_data_columns_availability_and_import(slot, block_root, data_columns)
29902998
.await;
29912999
self.remove_notified_custody_columns(&block_root, r)
29923000
}
@@ -3029,11 +3037,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30293037
/// imported or errors.
30303038
pub async fn process_rpc_custody_columns(
30313039
self: &Arc<Self>,
3032-
custody_columns: DataColumnsSameBlock<T::EthSpec>,
3040+
custody_columns: DataColumnSidecarList<T::EthSpec>,
30333041
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
3042+
let Ok((slot, block_root)) = custody_columns
3043+
.iter()
3044+
.map(|c| (c.slot(), c.block_root()))
3045+
.unique()
3046+
.exactly_one()
3047+
else {
3048+
return Err(BlockError::InternalError(
3049+
"Columns should be from the same block".to_string(),
3050+
));
3051+
};
3052+
30343053
// If this block has already been imported to forkchoice it must have been available, so
30353054
// we don't need to process its columns again.
3036-
let block_root = custody_columns.block_root();
30373055
if self
30383056
.canonical_head
30393057
.fork_choice_read_lock()
@@ -3045,7 +3063,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30453063
// TODO(das): custody column SSE event
30463064

30473065
let r = self
3048-
.check_rpc_custody_columns_availability_and_import(custody_columns)
3066+
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
30493067
.await;
30503068
self.remove_notified(&block_root, r)
30513069
}
@@ -3328,16 +3346,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33283346
/// if so, otherwise caches the data column in the data availability checker.
33293347
async fn check_gossip_data_columns_availability_and_import(
33303348
self: &Arc<Self>,
3331-
data_column: GossipVerifiedDataColumn<T>,
3349+
slot: Slot,
3350+
block_root: Hash256,
3351+
data_columns: Vec<GossipVerifiedDataColumn<T>>,
33323352
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
33333353
if let Some(slasher) = self.slasher.as_ref() {
3334-
slasher.accept_block_header(data_column.signed_block_header());
3354+
for data_colum in &data_columns {
3355+
slasher.accept_block_header(data_colum.signed_block_header());
3356+
}
33353357
}
33363358

3337-
let slot = data_column.slot();
3338-
let availability = self
3339-
.data_availability_checker
3340-
.put_gossip_data_column(data_column)?;
3359+
let availability = self.data_availability_checker.put_gossip_data_columns(
3360+
slot,
3361+
block_root,
3362+
data_columns,
3363+
)?;
33413364

33423365
self.process_availability(slot, availability).await
33433366
}
@@ -3385,34 +3408,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33853408
/// if so, otherwise caches the columns in the data availability checker.
33863409
async fn check_rpc_custody_columns_availability_and_import(
33873410
self: &Arc<Self>,
3388-
custody_columns: DataColumnsSameBlock<T::EthSpec>,
3411+
slot: Slot,
3412+
block_root: Hash256,
3413+
custody_columns: DataColumnSidecarList<T::EthSpec>,
33893414
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
33903415
// Need to scope this to ensure the lock is dropped before calling `process_availability`
33913416
// Even an explicit drop is not enough to convince the borrow checker.
33923417
{
33933418
let mut slashable_cache = self.observed_slashable.write();
3394-
let header = custody_columns.signed_block_header();
3395-
let block_root = custody_columns.block_root();
3396-
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
3397-
slashable_cache
3398-
.observe_slashable(
3399-
header.message.slot,
3400-
header.message.proposer_index,
3401-
block_root,
3402-
)
3403-
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
3404-
if let Some(slasher) = self.slasher.as_ref() {
3405-
slasher.accept_block_header(header.clone());
3419+
// Assumes all items in custody_columns are for the same block_root
3420+
if let Some(column) = custody_columns.first() {
3421+
let header = &column.signed_block_header;
3422+
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
3423+
slashable_cache
3424+
.observe_slashable(
3425+
header.message.slot,
3426+
header.message.proposer_index,
3427+
block_root,
3428+
)
3429+
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
3430+
if let Some(slasher) = self.slasher.as_ref() {
3431+
slasher.accept_block_header(header.clone());
3432+
}
34063433
}
34073434
}
34083435
}
34093436

34103437
// This slot value is purely informative for the consumers of
34113438
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
3412-
let slot = custody_columns.slot();
3413-
let availability = self
3414-
.data_availability_checker
3415-
.put_rpc_custody_columns(custody_columns)?;
3439+
let availability = self.data_availability_checker.put_rpc_custody_columns(
3440+
block_root,
3441+
slot.epoch(T::EthSpec::slots_per_epoch()),
3442+
custody_columns,
3443+
)?;
34163444

34173445
self.process_availability(slot, availability).await
34183446
}

beacon_node/beacon_chain/src/data_availability_checker.rs

Lines changed: 29 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,15 @@ use task_executor::TaskExecutor;
1616
use types::blob_sidecar::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList};
1717
use types::{
1818
BlobSidecarList, ChainSpec, DataColumnSidecarList, Epoch, EthSpec, Hash256, SignedBeaconBlock,
19+
Slot,
1920
};
2021

2122
mod error;
2223
mod overflow_lru_cache;
2324
mod state_lru_cache;
2425

2526
use crate::data_column_verification::{
26-
DataColumnsSameBlock, GossipVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock,
27+
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
2728
};
2829
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
2930
use types::non_zero_usize::new_non_zero_usize;
@@ -187,18 +188,31 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
187188
#[allow(clippy::type_complexity)]
188189
pub fn put_rpc_custody_columns(
189190
&self,
190-
custody_columns: DataColumnsSameBlock<T::EthSpec>,
191+
block_root: Hash256,
192+
epoch: Epoch,
193+
custody_columns: DataColumnSidecarList<T::EthSpec>,
191194
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
192195
let Some(kzg) = self.kzg.as_ref() else {
193196
return Err(AvailabilityCheckError::KzgNotInitialized);
194197
};
195198

196199
// TODO(das): report which column is invalid for proper peer scoring
197200
// TODO(das): batch KZG verification here
198-
let verified_custody_columns = custody_columns.verify(kzg)?;
201+
let verified_custody_columns = custody_columns
202+
.iter()
203+
.map(|column| {
204+
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
205+
KzgVerifiedDataColumn::new(column.clone(), kzg)
206+
.map_err(AvailabilityCheckError::Kzg)?,
207+
))
208+
})
209+
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
199210

200-
self.availability_cache
201-
.put_kzg_verified_data_columns(verified_custody_columns)
211+
self.availability_cache.put_kzg_verified_data_columns(
212+
block_root,
213+
epoch,
214+
verified_custody_columns,
215+
)
202216
}
203217

204218
/// Check if we've cached other blobs for this block. If it completes a set and we also
@@ -217,18 +231,20 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
217231
)
218232
}
219233

220-
pub fn put_gossip_data_column(
234+
pub fn put_gossip_data_columns(
221235
&self,
222-
gossip_data_column: GossipVerifiedDataColumn<T>,
236+
slot: Slot,
237+
block_root: Hash256,
238+
gossip_data_columns: Vec<GossipVerifiedDataColumn<T>>,
223239
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
224-
let custody_column = gossip_data_column.into_inner();
225-
226-
// Will never error as there's exactly one column
227-
let custody_columns_same_block = KzgVerifiedDataColumnsSameBlock::new(vec![custody_column])
228-
.expect("exactly one column is always in same block");
240+
let epoch = slot.epoch(T::EthSpec::slots_per_epoch());
241+
let custody_columns = gossip_data_columns
242+
.into_iter()
243+
.map(|c| KzgVerifiedCustodyDataColumn::from_asserted_custody(c.into_inner()))
244+
.collect::<Vec<_>>();
229245

230246
self.availability_cache
231-
.put_kzg_verified_data_columns(custody_columns_same_block)
247+
.put_kzg_verified_data_columns(block_root, epoch, custody_columns)
232248
}
233249

234250
/// Check if we have all the blobs for a block. Returns `Availability` which has information

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::block_verification_types::{
55
AvailabilityPendingExecutedBlock, AvailableBlock, AvailableExecutedBlock,
66
};
77
use crate::data_availability_checker::{Availability, AvailabilityCheckError};
8-
use crate::data_column_verification::{KzgVerifiedDataColumn, KzgVerifiedDataColumnsSameBlock};
8+
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
99
use crate::BeaconChainTypes;
1010
use lru::LruCache;
1111
use parking_lot::RwLock;
@@ -24,7 +24,7 @@ use types::{BlobSidecar, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock};
2424
pub struct PendingComponents<E: EthSpec> {
2525
pub block_root: Hash256,
2626
pub verified_blobs: FixedVector<Option<KzgVerifiedBlob<E>>, E::MaxBlobsPerBlock>,
27-
pub verified_data_columns: Vec<KzgVerifiedDataColumn<E>>,
27+
pub verified_data_columns: Vec<KzgVerifiedCustodyDataColumn<E>>,
2828
pub executed_block: Option<DietAvailabilityPendingExecutedBlock<E>>,
2929
}
3030

@@ -50,7 +50,7 @@ impl<E: EthSpec> PendingComponents<E> {
5050
pub fn get_cached_data_column(
5151
&self,
5252
data_column_index: u64,
53-
) -> Option<&KzgVerifiedDataColumn<E>> {
53+
) -> Option<&KzgVerifiedCustodyDataColumn<E>> {
5454
self.verified_data_columns
5555
.iter()
5656
.find(|d| d.index() == data_column_index)
@@ -157,7 +157,10 @@ impl<E: EthSpec> PendingComponents<E> {
157157
}
158158

159159
/// Merges a given set of data columns into the cache.
160-
fn merge_data_columns(&mut self, kzg_verified_data_columns: &[KzgVerifiedDataColumn<E>]) {
160+
fn merge_data_columns<I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<E>>>(
161+
&mut self,
162+
kzg_verified_data_columns: I,
163+
) {
161164
for data_column in kzg_verified_data_columns {
162165
if !self.data_column_exists(data_column.index()) {
163166
self.verified_data_columns.push(data_column.clone());
@@ -258,7 +261,7 @@ impl<E: EthSpec> PendingComponents<E> {
258261
BlockImportRequirement::CustodyColumns(_) => {
259262
let verified_data_columns = verified_data_columns
260263
.into_iter()
261-
.map(|d| d.to_data_column())
264+
.map(|d| d.into_inner())
262265
.collect();
263266
(None, Some(verified_data_columns))
264267
}
@@ -427,13 +430,15 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
427430
}
428431
}
429432

430-
pub fn put_kzg_verified_data_columns(
433+
pub fn put_kzg_verified_data_columns<
434+
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
435+
>(
431436
&self,
432-
data_columns: KzgVerifiedDataColumnsSameBlock<T::EthSpec>,
437+
block_root: Hash256,
438+
epoch: Epoch,
439+
kzg_verified_data_columns: I,
433440
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
434441
let mut write_lock = self.critical.write();
435-
let block_root = data_columns.block_root();
436-
let epoch = data_columns.slot().epoch(T::EthSpec::slots_per_epoch());
437442

438443
// Grab existing entry or create a new entry.
439444
let mut pending_components = write_lock
@@ -442,7 +447,7 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
442447
.unwrap_or_else(|| PendingComponents::empty(block_root));
443448

444449
// Merge in the data columns.
445-
pending_components.merge_data_columns(data_columns.columns());
450+
pending_components.merge_data_columns(kzg_verified_data_columns);
446451

447452
let block_import_requirement = self.block_import_requirement(epoch)?;
448453
if pending_components.is_available(&block_import_requirement) {

0 commit comments

Comments
 (0)