@@ -23,6 +23,7 @@ use crate::chain_config::ChainConfig;
23
23
use crate :: data_availability_checker:: {
24
24
Availability , AvailabilityCheckError , AvailableBlock , DataAvailabilityChecker ,
25
25
} ;
26
+ use crate :: data_column_verification:: { GossipDataColumnError , GossipVerifiedDataColumn } ;
26
27
use crate :: early_attester_cache:: EarlyAttesterCache ;
27
28
use crate :: errors:: { BeaconChainError as Error , BlockProductionError } ;
28
29
use crate :: eth1_chain:: { Eth1Chain , Eth1ChainBackend } ;
@@ -2118,6 +2119,19 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
2118
2119
} )
2119
2120
}
2120
2121
2122
+ pub fn verify_data_column_sidecar_for_gossip (
2123
+ self : & Arc < Self > ,
2124
+ data_column_sidecar : Arc < DataColumnSidecar < T :: EthSpec > > ,
2125
+ subnet_id : u64 ,
2126
+ ) -> Result < GossipVerifiedDataColumn < T > , GossipDataColumnError > {
2127
+ metrics:: inc_counter ( & metrics:: DATA_COLUMN_SIDECAR_PROCESSING_REQUESTS ) ;
2128
+ let _timer = metrics:: start_timer ( & metrics:: DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES ) ;
2129
+ GossipVerifiedDataColumn :: new ( data_column_sidecar, subnet_id, self ) . map ( |v| {
2130
+ metrics:: inc_counter ( & metrics:: DATA_COLUMN_SIDECAR_PROCESSING_SUCCESSES ) ;
2131
+ v
2132
+ } )
2133
+ }
2134
+
2121
2135
pub fn verify_blob_sidecar_for_gossip (
2122
2136
self : & Arc < Self > ,
2123
2137
blob_sidecar : Arc < BlobSidecar < T :: EthSpec > > ,
@@ -2964,6 +2978,39 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
2964
2978
self . remove_notified ( & block_root, r)
2965
2979
}
2966
2980
2981
+ /// Cache the data columns in the processing cache, process it, then evict it from the cache if it was
2982
+ /// imported or errors.
2983
+ pub async fn process_gossip_data_columns (
2984
+ self : & Arc < Self > ,
2985
+ data_columns : Vec < GossipVerifiedDataColumn < T > > ,
2986
+ ) -> Result < AvailabilityProcessingStatus , BlockError < T :: EthSpec > > {
2987
+ let Ok ( block_root) = data_columns
2988
+ . iter ( )
2989
+ . map ( |c| c. block_root ( ) )
2990
+ . unique ( )
2991
+ . exactly_one ( )
2992
+ else {
2993
+ return Err ( BlockError :: InternalError (
2994
+ "Columns should be from the same block" . to_string ( ) ,
2995
+ ) ) ;
2996
+ } ;
2997
+
2998
+ // If this block has already been imported to forkchoice it must have been available, so
2999
+ // we don't need to process its samples again.
3000
+ if self
3001
+ . canonical_head
3002
+ . fork_choice_read_lock ( )
3003
+ . contains_block ( & block_root)
3004
+ {
3005
+ return Err ( BlockError :: BlockIsAlreadyKnown ( block_root) ) ;
3006
+ }
3007
+
3008
+ let r = self
3009
+ . check_gossip_data_columns_availability_and_import ( data_columns)
3010
+ . await ;
3011
+ self . remove_notified_custody_columns ( & block_root, r)
3012
+ }
3013
+
2967
3014
/// Cache the blobs in the processing cache, process it, then evict it from the cache if it was
2968
3015
/// imported or errors.
2969
3016
pub async fn process_rpc_blobs (
@@ -3013,6 +3060,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3013
3060
r
3014
3061
}
3015
3062
3063
+ /// Remove any block components from the *processing cache* if we no longer require them. If the
3064
+ /// block was imported full or erred, we no longer require them.
3065
+ fn remove_notified_custody_columns (
3066
+ & self ,
3067
+ block_root : & Hash256 ,
3068
+ r : Result < AvailabilityProcessingStatus , BlockError < T :: EthSpec > > ,
3069
+ ) -> Result < AvailabilityProcessingStatus , BlockError < T :: EthSpec > > {
3070
+ let has_missing_components =
3071
+ matches ! ( r, Ok ( AvailabilityProcessingStatus :: MissingComponents ( _, _) ) ) ;
3072
+ if !has_missing_components {
3073
+ self . reqresp_pre_import_cache . write ( ) . remove ( block_root) ;
3074
+ }
3075
+ r
3076
+ }
3077
+
3016
3078
/// Wraps `process_block` in logic to cache the block's commitments in the processing cache
3017
3079
/// and evict if the block was imported or errored.
3018
3080
pub async fn process_block_with_early_caching < B : IntoExecutionPendingBlock < T > > (
@@ -3257,6 +3319,31 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
3257
3319
self . process_availability ( slot, availability) . await
3258
3320
}
3259
3321
3322
+ /// Checks if the provided data column can make any cached blocks available, and imports immediately
3323
+ /// if so, otherwise caches the data column in the data availability checker.
3324
+ async fn check_gossip_data_columns_availability_and_import (
3325
+ self : & Arc < Self > ,
3326
+ data_columns : Vec < GossipVerifiedDataColumn < T > > ,
3327
+ ) -> Result < AvailabilityProcessingStatus , BlockError < T :: EthSpec > > {
3328
+ if let Some ( slasher) = self . slasher . as_ref ( ) {
3329
+ for data_colum in & data_columns {
3330
+ slasher. accept_block_header ( data_colum. signed_block_header ( ) ) ;
3331
+ }
3332
+ }
3333
+
3334
+ let Ok ( slot) = data_columns. iter ( ) . map ( |c| c. slot ( ) ) . unique ( ) . exactly_one ( ) else {
3335
+ return Err ( BlockError :: InternalError (
3336
+ "Columns for the same block should have matching slot" . to_string ( ) ,
3337
+ ) ) ;
3338
+ } ;
3339
+
3340
+ let availability = self
3341
+ . data_availability_checker
3342
+ . put_gossip_data_columns ( data_columns) ?;
3343
+
3344
+ self . process_availability ( slot, availability) . await
3345
+ }
3346
+
3260
3347
/// Checks if the provided blobs can make any cached blocks available, and imports immediately
3261
3348
/// if so, otherwise caches the blob in the data availability checker.
3262
3349
async fn check_rpc_blob_availability_and_import (
0 commit comments