@@ -41,12 +41,14 @@ use discv5::{
41
41
} ;
42
42
use ethereum_types:: U256 ;
43
43
use futures:: channel:: oneshot;
44
+ use futures:: future:: join_all;
44
45
use log:: error;
45
46
use parking_lot:: RwLock ;
46
47
use rand:: seq:: IteratorRandom ;
47
48
use ssz:: Encode ;
48
49
use ssz_types:: VariableList ;
49
50
use tokio:: sync:: mpsc:: UnboundedSender ;
51
+ use tokio:: task:: JoinHandle ;
50
52
use tracing:: { debug, warn} ;
51
53
52
54
pub use super :: overlay_service:: { OverlayRequestError , RequestDirection } ;
@@ -113,8 +115,8 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
113
115
phantom_content_key : PhantomData < TContentKey > ,
114
116
/// Associate a metric with the overlay network.
115
117
phantom_metric : PhantomData < TMetric > ,
116
- /// Declare the Validator type for a given overlay network.
117
- phantom_validator : PhantomData < TValidator > ,
118
+ /// Accepted content validator that makes requests to this/other overlay networks (or infura)
119
+ validator : Arc < TValidator > ,
118
120
}
119
121
120
122
impl <
@@ -153,7 +155,7 @@ where
153
155
protocol. clone ( ) ,
154
156
utp_listener_tx. clone ( ) ,
155
157
config. enable_metrics ,
156
- validator,
158
+ Arc :: clone ( & validator) ,
157
159
config. query_timeout ,
158
160
config. query_peer_timeout ,
159
161
config. query_parallelism ,
@@ -173,7 +175,7 @@ where
173
175
utp_listener_tx,
174
176
phantom_content_key : PhantomData ,
175
177
phantom_metric : PhantomData ,
176
- phantom_validator : PhantomData ,
178
+ validator ,
177
179
}
178
180
}
179
181
@@ -249,35 +251,71 @@ where
249
251
) ) ;
250
252
}
251
253
252
- // TODO: Verify overlay content data with an Oracle
253
-
254
- // Temporarily store content key/value pairs to propagate here
255
- let mut content_keys_values : Vec < ( TContentKey , ByteList ) > = Vec :: new ( ) ;
254
+ let validator = Arc :: clone ( & self . validator ) ;
255
+ let storage = Arc :: clone ( & self . storage ) ;
256
+ let kbuckets = Arc :: clone ( & self . kbuckets ) ;
257
+ let command_tx = self . command_tx . clone ( ) ;
256
258
257
- // Try to store the content into the database and propagate gossip the content
258
- for ( content_key, content_value) in content_keys. into_iter ( ) . zip ( content_values. to_vec ( ) ) {
259
- match TContentKey :: try_from ( content_key) {
260
- Ok ( key) => {
261
- // Store accepted content in DB
262
- self . store_overlay_content ( & key, content_value. clone ( ) ) ;
263
- content_keys_values. push ( ( key, content_value) )
264
- }
265
- Err ( err) => {
266
- return Err ( anyhow ! (
267
- "Unexpected error while decoding overlay content key: {err}"
268
- ) ) ;
269
- }
270
- }
271
- }
272
- // Propagate gossip accepted content
273
- self . propagate_gossip ( content_keys_values) ?;
259
+ // Spawn a task that spawns a validation task for each piece of content,
260
+ // collects validated content and propagates it via gossip.
261
+ tokio:: spawn ( async move {
262
+ let handles: Vec < JoinHandle < _ > > = content_keys
263
+ . into_iter ( )
264
+ . zip ( content_values. to_vec ( ) )
265
+ . map (
266
+ |( content_key, content_value) | match TContentKey :: try_from ( content_key) {
267
+ Ok ( key) => {
268
+ // Spawn a task that...
269
+ // - Validates accepted content (this step requires a dedicated task since it
270
+ // might require non-blocking requests to this/other overlay networks)
271
+ // - Checks if validated content should be stored, and stores it if true
272
+ // - Propagate all validated content
273
+ let validator = Arc :: clone ( & validator) ;
274
+ let storage = Arc :: clone ( & storage) ;
275
+ Some ( tokio:: spawn ( async move {
276
+ // Validated received content
277
+ validator
278
+ . validate_content ( & key, & content_value. to_vec ( ) )
279
+ . await
280
+ // Skip storing & propagating content if it's not valid
281
+ . expect ( "Unable to validate received content: {err:?}" ) ;
282
+
283
+ // Check if data should be stored, and store if true.
284
+ // Ignore error since all validated content is propagated.
285
+ let _ = storage
286
+ . write ( )
287
+ . store_if_should ( & key, & content_value. to_vec ( ) ) ;
288
+
289
+ ( key, content_value)
290
+ } ) )
291
+ }
292
+ Err ( err) => {
293
+ warn ! ( "Unexpected error while decoding overlay content key: {err}" ) ;
294
+ None
295
+ }
296
+ } ,
297
+ )
298
+ . flatten ( )
299
+ . collect ( ) ;
300
+ let validated_content = join_all ( handles)
301
+ . await
302
+ . into_iter ( )
303
+ . filter_map ( |content| content. ok ( ) )
304
+ . collect ( ) ;
305
+ // Propagate all validated content, whether or not it was stored.
306
+ Self :: propagate_gossip ( validated_content, kbuckets, command_tx) ;
307
+ } ) ;
274
308
Ok ( ( ) )
275
309
}
276
310
277
311
/// Propagate gossip accepted content via OFFER/ACCEPT:
278
- fn propagate_gossip ( & self , content : Vec < ( TContentKey , ByteList ) > ) -> anyhow:: Result < ( ) > {
312
+ fn propagate_gossip (
313
+ content : Vec < ( TContentKey , ByteList ) > ,
314
+ kbuckets : Arc < RwLock < KBucketsTable < NodeId , Node > > > ,
315
+ command_tx : UnboundedSender < OverlayCommand < TContentKey > > ,
316
+ ) {
279
317
// Get all nodes from overlay routing table
280
- let kbuckets = self . kbuckets . read ( ) ;
318
+ let kbuckets = kbuckets. read ( ) ;
281
319
let all_nodes: Vec < & kbucket:: Node < NodeId , Node > > = kbuckets
282
320
. buckets_iter ( )
283
321
. map ( |kbucket| {
@@ -312,7 +350,13 @@ where
312
350
}
313
351
314
352
// Get log2 random ENRs to gossip
315
- let random_enrs = log2_random_enrs ( interested_enrs) ?;
353
+ let random_enrs = match log2_random_enrs ( interested_enrs) {
354
+ Ok ( val) => val,
355
+ Err ( msg) => {
356
+ debug ! ( "Error calculating log2 random enrs for gossip propagation: {msg}" ) ;
357
+ return ;
358
+ }
359
+ } ;
316
360
317
361
// Temporarily store all randomly selected nodes with the content of interest.
318
362
// We want this so we can offer all the content to interested node in one request.
@@ -346,27 +390,10 @@ where
346
390
None ,
347
391
) ;
348
392
349
- if let Err ( err) = self
350
- . command_tx
351
- . send ( OverlayCommand :: Request ( overlay_request) )
352
- {
393
+ if let Err ( err) = command_tx. send ( OverlayCommand :: Request ( overlay_request) ) {
353
394
error ! ( "Unable to send OFFER request to overlay: {err}." )
354
395
}
355
396
}
356
- Ok ( ( ) )
357
- }
358
-
359
- /// Try to store overlay content into database
360
- fn store_overlay_content ( & self , content_key : & TContentKey , value : ByteList ) {
361
- let should_store = self . storage . read ( ) . should_store ( content_key) ;
362
- match should_store {
363
- Ok ( _) => {
364
- if let Err ( err) = self . storage . write ( ) . store ( content_key, & value. into ( ) ) {
365
- warn ! ( "Unable to store accepted content: {err}" ) ;
366
- }
367
- }
368
- Err ( err) => error ! ( "Unable to determine whether to store accepted content: {err}" ) ,
369
- }
370
397
}
371
398
372
399
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
0 commit comments