Skip to content

Commit 5169e03

Browse files
authored
Add PeerDAS RPC import boilerplate (#6238)
* Add PeerDAS RPC import boilerplate * revert refactor * Remove allow
1 parent a91f432 commit 5169e03

File tree

7 files changed

+205
-9
lines changed

7 files changed

+205
-9
lines changed

beacon_node/beacon_chain/src/beacon_chain.rs

+76
Original file line numberDiff line numberDiff line change
@@ -3033,6 +3033,41 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
30333033
self.remove_notified(&block_root, r)
30343034
}
30353035

3036+
/// Cache the columns in the processing cache, process it, then evict it from the cache if it was
3037+
/// imported or errors.
3038+
pub async fn process_rpc_custody_columns(
3039+
self: &Arc<Self>,
3040+
custody_columns: DataColumnSidecarList<T::EthSpec>,
3041+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
3042+
let Ok((slot, block_root)) = custody_columns
3043+
.iter()
3044+
.map(|c| (c.slot(), c.block_root()))
3045+
.unique()
3046+
.exactly_one()
3047+
else {
3048+
return Err(BlockError::InternalError(
3049+
"Columns should be from the same block".to_string(),
3050+
));
3051+
};
3052+
3053+
// If this block has already been imported to forkchoice it must have been available, so
3054+
// we don't need to process its columns again.
3055+
if self
3056+
.canonical_head
3057+
.fork_choice_read_lock()
3058+
.contains_block(&block_root)
3059+
{
3060+
return Err(BlockError::BlockIsAlreadyKnown(block_root));
3061+
}
3062+
3063+
// TODO(das): custody column SSE event
3064+
3065+
let r = self
3066+
.check_rpc_custody_columns_availability_and_import(slot, block_root, custody_columns)
3067+
.await;
3068+
self.remove_notified(&block_root, r)
3069+
}
3070+
30363071
/// Remove any block components from the *processing cache* if we no longer require them. If the
30373072
/// block was imported full or erred, we no longer require them.
30383073
fn remove_notified(
@@ -3369,6 +3404,47 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
33693404
self.process_availability(slot, availability).await
33703405
}
33713406

3407+
/// Checks if the provided columns can make any cached blocks available, and imports immediately
3408+
/// if so, otherwise caches the columns in the data availability checker.
3409+
async fn check_rpc_custody_columns_availability_and_import(
3410+
self: &Arc<Self>,
3411+
slot: Slot,
3412+
block_root: Hash256,
3413+
custody_columns: DataColumnSidecarList<T::EthSpec>,
3414+
) -> Result<AvailabilityProcessingStatus, BlockError<T::EthSpec>> {
3415+
// Need to scope this to ensure the lock is dropped before calling `process_availability`
3416+
// Even an explicit drop is not enough to convince the borrow checker.
3417+
{
3418+
let mut slashable_cache = self.observed_slashable.write();
3419+
// Assumes all items in custody_columns are for the same block_root
3420+
if let Some(column) = custody_columns.first() {
3421+
let header = &column.signed_block_header;
3422+
if verify_header_signature::<T, BlockError<T::EthSpec>>(self, header).is_ok() {
3423+
slashable_cache
3424+
.observe_slashable(
3425+
header.message.slot,
3426+
header.message.proposer_index,
3427+
block_root,
3428+
)
3429+
.map_err(|e| BlockError::BeaconChainError(e.into()))?;
3430+
if let Some(slasher) = self.slasher.as_ref() {
3431+
slasher.accept_block_header(header.clone());
3432+
}
3433+
}
3434+
}
3435+
}
3436+
3437+
// This slot value is purely informative for the consumers of
3438+
// `AvailabilityProcessingStatus::MissingComponents` to log an error with a slot.
3439+
let availability = self.data_availability_checker.put_rpc_custody_columns(
3440+
block_root,
3441+
slot.epoch(T::EthSpec::slots_per_epoch()),
3442+
custody_columns,
3443+
)?;
3444+
3445+
self.process_availability(slot, availability).await
3446+
}
3447+
33723448
/// Imports a fully available block. Otherwise, returns `AvailabilityProcessingStatus::MissingComponents`
33733449
///
33743450
/// An error is returned if the block was unable to be imported. It may be partially imported

beacon_node/beacon_chain/src/data_availability_checker.rs

+34-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@ mod error;
2323
mod overflow_lru_cache;
2424
mod state_lru_cache;
2525

26-
use crate::data_column_verification::{GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn};
26+
use crate::data_column_verification::{
27+
GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn,
28+
};
2729
pub use error::{Error as AvailabilityCheckError, ErrorCategory as AvailabilityCheckErrorCategory};
2830
use types::non_zero_usize::new_non_zero_usize;
2931

@@ -187,6 +189,37 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
187189
.put_kzg_verified_blobs(block_root, epoch, verified_blobs)
188190
}
189191

192+
/// Put a list of custody columns received via RPC into the availability cache. This performs KZG
193+
/// verification on the blobs in the list.
194+
pub fn put_rpc_custody_columns(
195+
&self,
196+
block_root: Hash256,
197+
epoch: Epoch,
198+
custody_columns: DataColumnSidecarList<T::EthSpec>,
199+
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
200+
let Some(kzg) = self.kzg.as_ref() else {
201+
return Err(AvailabilityCheckError::KzgNotInitialized);
202+
};
203+
204+
// TODO(das): report which column is invalid for proper peer scoring
205+
// TODO(das): batch KZG verification here
206+
let verified_custody_columns = custody_columns
207+
.iter()
208+
.map(|column| {
209+
Ok(KzgVerifiedCustodyDataColumn::from_asserted_custody(
210+
KzgVerifiedDataColumn::new(column.clone(), kzg)
211+
.map_err(AvailabilityCheckError::Kzg)?,
212+
))
213+
})
214+
.collect::<Result<Vec<_>, AvailabilityCheckError>>()?;
215+
216+
self.availability_cache.put_kzg_verified_data_columns(
217+
block_root,
218+
epoch,
219+
verified_custody_columns,
220+
)
221+
}
222+
190223
/// Check if we've cached other blobs for this block. If it completes a set and we also
191224
/// have a block cached, return the `Availability` variant triggering block import.
192225
/// Otherwise cache the blob sidecar.

beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs

-2
Original file line numberDiff line numberDiff line change
@@ -442,8 +442,6 @@ impl<T: BeaconChainTypes> DataAvailabilityCheckerInner<T> {
442442
}
443443
}
444444

445-
// TODO(das): rpc code paths to be implemented.
446-
#[allow(dead_code)]
447445
pub fn put_kzg_verified_data_columns<
448446
I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
449447
>(

beacon_node/beacon_chain/src/data_column_verification.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ impl<T: BeaconChainTypes> GossipVerifiedDataColumn<T> {
177177
pub fn id(&self) -> DataColumnIdentifier {
178178
DataColumnIdentifier {
179179
block_root: self.block_root,
180-
index: self.data_column.data_column_index(),
180+
index: self.data_column.index(),
181181
}
182182
}
183183

@@ -221,7 +221,7 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
221221
self.data.clone()
222222
}
223223

224-
pub fn data_column_index(&self) -> u64 {
224+
pub fn index(&self) -> ColumnIndex {
225225
self.data.index
226226
}
227227
}

beacon_node/beacon_processor/src/lib.rs

+14-3
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ pub struct BeaconProcessorQueueLengths {
108108
unknown_light_client_update_queue: usize,
109109
rpc_block_queue: usize,
110110
rpc_blob_queue: usize,
111+
rpc_custody_column_queue: usize,
111112
chain_segment_queue: usize,
112113
backfill_chain_segment: usize,
113114
gossip_block_queue: usize,
@@ -163,6 +164,7 @@ impl BeaconProcessorQueueLengths {
163164
unknown_light_client_update_queue: 128,
164165
rpc_block_queue: 1024,
165166
rpc_blob_queue: 1024,
167+
rpc_custody_column_queue: 1024,
166168
chain_segment_queue: 64,
167169
backfill_chain_segment: 64,
168170
gossip_block_queue: 1024,
@@ -228,6 +230,7 @@ pub const GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE: &str = "light_client_optimistic
228230
pub const RPC_BLOCK: &str = "rpc_block";
229231
pub const IGNORED_RPC_BLOCK: &str = "ignored_rpc_block";
230232
pub const RPC_BLOBS: &str = "rpc_blob";
233+
pub const RPC_CUSTODY_COLUMN: &str = "rpc_custody_column";
231234
pub const CHAIN_SEGMENT: &str = "chain_segment";
232235
pub const CHAIN_SEGMENT_BACKFILL: &str = "chain_segment_backfill";
233236
pub const STATUS_PROCESSING: &str = "status_processing";
@@ -606,6 +609,7 @@ pub enum Work<E: EthSpec> {
606609
RpcBlobs {
607610
process_fn: AsyncFn,
608611
},
612+
RpcCustodyColumn(AsyncFn),
609613
IgnoredRpcBlock {
610614
process_fn: BlockingFn,
611615
},
@@ -653,6 +657,7 @@ impl<E: EthSpec> Work<E> {
653657
Work::GossipLightClientOptimisticUpdate(_) => GOSSIP_LIGHT_CLIENT_OPTIMISTIC_UPDATE,
654658
Work::RpcBlock { .. } => RPC_BLOCK,
655659
Work::RpcBlobs { .. } => RPC_BLOBS,
660+
Work::RpcCustodyColumn { .. } => RPC_CUSTODY_COLUMN,
656661
Work::IgnoredRpcBlock { .. } => IGNORED_RPC_BLOCK,
657662
Work::ChainSegment { .. } => CHAIN_SEGMENT,
658663
Work::ChainSegmentBackfill(_) => CHAIN_SEGMENT_BACKFILL,
@@ -815,6 +820,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
815820
// Using a FIFO queue since blocks need to be imported sequentially.
816821
let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
817822
let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
823+
let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
818824
let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
819825
let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
820826
let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
@@ -970,6 +976,8 @@ impl<E: EthSpec> BeaconProcessor<E> {
970976
self.spawn_worker(item, idle_tx);
971977
} else if let Some(item) = rpc_blob_queue.pop() {
972978
self.spawn_worker(item, idle_tx);
979+
} else if let Some(item) = rpc_custody_column_queue.pop() {
980+
self.spawn_worker(item, idle_tx);
973981
// Check delayed blocks before gossip blocks, the gossip blocks might rely
974982
// on the delayed ones.
975983
} else if let Some(item) = delayed_block_queue.pop() {
@@ -1262,6 +1270,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
12621270
rpc_block_queue.push(work, work_id, &self.log)
12631271
}
12641272
Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id, &self.log),
1273+
Work::RpcCustodyColumn { .. } => {
1274+
rpc_custody_column_queue.push(work, work_id, &self.log)
1275+
}
12651276
Work::ChainSegment { .. } => {
12661277
chain_segment_queue.push(work, work_id, &self.log)
12671278
}
@@ -1497,9 +1508,9 @@ impl<E: EthSpec> BeaconProcessor<E> {
14971508
beacon_block_root: _,
14981509
process_fn,
14991510
} => task_spawner.spawn_async(process_fn),
1500-
Work::RpcBlock { process_fn } | Work::RpcBlobs { process_fn } => {
1501-
task_spawner.spawn_async(process_fn)
1502-
}
1511+
Work::RpcBlock { process_fn }
1512+
| Work::RpcBlobs { process_fn }
1513+
| Work::RpcCustodyColumn(process_fn) => task_spawner.spawn_async(process_fn),
15031514
Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
15041515
Work::GossipBlock(work)
15051516
| Work::GossipBlobSidecar(work)

beacon_node/network/src/network_beacon_processor/mod.rs

+24
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,30 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
476476
})
477477
}
478478

479+
/// Create a new `Work` event for some custody columns. `process_rpc_custody_columns` reports
480+
/// the result back to sync.
481+
pub fn send_rpc_custody_columns(
482+
self: &Arc<Self>,
483+
block_root: Hash256,
484+
custody_columns: DataColumnSidecarList<T::EthSpec>,
485+
seen_timestamp: Duration,
486+
process_type: BlockProcessType,
487+
) -> Result<(), Error<T::EthSpec>> {
488+
let s = self.clone();
489+
self.try_send(BeaconWorkEvent {
490+
drop_during_sync: false,
491+
work: Work::RpcCustodyColumn(Box::pin(async move {
492+
s.process_rpc_custody_columns(
493+
block_root,
494+
custody_columns,
495+
seen_timestamp,
496+
process_type,
497+
)
498+
.await;
499+
})),
500+
})
501+
}
502+
479503
/// Create a new work event to import `blocks` as a beacon chain segment.
480504
pub fn send_chain_segment(
481505
self: &Arc<Self>,

beacon_node/network/src/network_beacon_processor/sync_methods.rs

+55-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use store::KzgCommitment;
2424
use tokio::sync::mpsc;
2525
use types::beacon_block_body::format_kzg_commitments;
2626
use types::blob_sidecar::FixedBlobSidecarList;
27-
use types::BlockImportSource;
27+
use types::{BlockImportSource, DataColumnSidecarList};
2828
use types::{Epoch, Hash256};
2929

3030
/// Id associated to a batch processing request, either a sync batch or a parent lookup.
@@ -307,6 +307,60 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
307307
});
308308
}
309309

310+
pub async fn process_rpc_custody_columns(
311+
self: Arc<NetworkBeaconProcessor<T>>,
312+
block_root: Hash256,
313+
custody_columns: DataColumnSidecarList<T::EthSpec>,
314+
_seen_timestamp: Duration,
315+
process_type: BlockProcessType,
316+
) {
317+
let result = self
318+
.chain
319+
.process_rpc_custody_columns(custody_columns)
320+
.await;
321+
322+
match &result {
323+
Ok(availability) => match availability {
324+
AvailabilityProcessingStatus::Imported(hash) => {
325+
debug!(
326+
self.log,
327+
"Block components retrieved";
328+
"result" => "imported block and custody columns",
329+
"block_hash" => %hash,
330+
);
331+
self.chain.recompute_head_at_current_slot().await;
332+
}
333+
AvailabilityProcessingStatus::MissingComponents(_, _) => {
334+
debug!(
335+
self.log,
336+
"Missing components over rpc";
337+
"block_hash" => %block_root,
338+
);
339+
}
340+
},
341+
Err(BlockError::BlockIsAlreadyKnown(_)) => {
342+
debug!(
343+
self.log,
344+
"Custody columns have already been imported";
345+
"block_hash" => %block_root,
346+
);
347+
}
348+
Err(e) => {
349+
warn!(
350+
self.log,
351+
"Error when importing rpc custody columns";
352+
"error" => ?e,
353+
"block_hash" => %block_root,
354+
);
355+
}
356+
}
357+
358+
self.send_sync_message(SyncMessage::BlockComponentProcessed {
359+
process_type,
360+
result: result.into(),
361+
});
362+
}
363+
310364
/// Attempt to import the chain segment (`blocks`) to the beacon chain, informing the sync
311365
/// thread if more blocks are needed to process it.
312366
pub async fn process_chain_segment(

0 commit comments

Comments
 (0)