@@ -116,8 +116,8 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
116
116
phantom_content_key : PhantomData < TContentKey > ,
117
117
/// Associate a metric with the overlay network.
118
118
phantom_metric : PhantomData < TMetric > ,
119
- /// Declare the Validator type for a given overlay network.
120
- phantom_validator : PhantomData < TValidator > ,
119
+ /// Accepted content validator that makes requests to this/other overlay networks (or infura)
120
+ validator : Arc < TValidator > ,
121
121
}
122
122
123
123
impl <
@@ -156,7 +156,7 @@ where
156
156
protocol. clone ( ) ,
157
157
utp_listener_tx. clone ( ) ,
158
158
config. enable_metrics ,
159
- validator,
159
+ Arc :: clone ( & validator) ,
160
160
config. query_timeout ,
161
161
config. query_peer_timeout ,
162
162
config. query_parallelism ,
@@ -176,7 +176,7 @@ where
176
176
utp_listener_tx,
177
177
phantom_content_key : PhantomData ,
178
178
phantom_metric : PhantomData ,
179
- phantom_validator : PhantomData ,
179
+ validator ,
180
180
}
181
181
}
182
182
@@ -253,35 +253,54 @@ where
253
253
) ) ;
254
254
}
255
255
256
- // TODO: Verify overlay content data with an Oracle
257
-
258
- // Temporarily store content key/value pairs to propagate here
259
- let mut content_keys_values: Vec < ( TContentKey , ByteList ) > = Vec :: new ( ) ;
260
-
261
- // Try to store the content into the database and propagate gossip the content
262
256
for ( content_key, content_value) in content_keys. into_iter ( ) . zip ( content_values. to_vec ( ) ) {
263
257
match TContentKey :: try_from ( content_key) {
264
258
Ok ( key) => {
265
- // Store accepted content in DB
266
- self . store_overlay_content ( & key, content_value. clone ( ) ) ;
267
- content_keys_values. push ( ( key, content_value) )
259
+ // Spawn a task that...
260
+ // - Validates accepted content (this step requires a dedicated task since it
261
+ // might require non-blocking requests to this/other overlay networks)
262
+ // - Checks if validated content should be stored, and stores it if true
263
+ // - Propagate all validated content
264
+ let validator = Arc :: clone ( & self . validator ) ;
265
+ let storage = Arc :: clone ( & self . storage ) ;
266
+ let kbuckets = Arc :: clone ( & self . kbuckets ) ;
267
+ let request_tx = self . request_tx . clone ( ) ;
268
+
269
+ tokio:: spawn ( async move {
270
+ // Validated received content
271
+ validator
272
+ . validate_content ( & key, & content_value. to_vec ( ) )
273
+ . await
274
+ // Skip storing & propagating content if it's not valid
275
+ . expect ( "Unable to validate received content: {err:?}" ) ;
276
+
277
+ // Check if data should be stored, and store if true.
278
+ // Ignore error since all validated content is propagated.
279
+ let _ = storage
280
+ . write ( )
281
+ . store_if_should ( & key, & content_value. to_vec ( ) ) ;
282
+
283
+ // Propagate all validated content, whether or not it was stored.
284
+ Self :: propagate_gossip ( ( key, content_value) , kbuckets, request_tx) ;
285
+ } ) ;
268
286
}
269
287
Err ( err) => {
270
- return Err ( anyhow ! (
271
- "Unexpected error while decoding overlay content key: {err}"
272
- ) ) ;
288
+ warn ! ( "Unexpected error while decoding overlay content key: {err}" ) ;
289
+ continue ;
273
290
}
274
291
}
275
292
}
276
- // Propagate gossip accepted content
277
- self . propagate_gossip ( content_keys_values) ?;
278
293
Ok ( ( ) )
279
294
}
280
295
281
296
/// Propagate gossip accepted content via OFFER/ACCEPT:
282
- fn propagate_gossip ( & self , content : Vec < ( TContentKey , ByteList ) > ) -> anyhow:: Result < ( ) > {
297
+ fn propagate_gossip (
298
+ content_key_value : ( TContentKey , ByteList ) ,
299
+ kbuckets : Arc < RwLock < KBucketsTable < NodeId , Node > > > ,
300
+ request_tx : UnboundedSender < OverlayRequest > ,
301
+ ) {
283
302
// Get all nodes from overlay routing table
284
- let kbuckets = self . kbuckets . read ( ) ;
303
+ let kbuckets = kbuckets. read ( ) ;
285
304
let all_nodes: Vec < & kbucket:: Node < NodeId , Node > > = kbuckets
286
305
. buckets_iter ( )
287
306
. map ( |kbucket| {
@@ -297,37 +316,44 @@ where
297
316
let mut enrs_and_content: HashMap < String , Vec < RawContentKey > > = HashMap :: new ( ) ;
298
317
299
318
// Filter all nodes from overlay routing table where XOR_distance(content_id, nodeId) < node radius
300
- for content_key_value in content {
301
- let interested_enrs: Vec < Enr > = all_nodes
302
- . clone ( )
303
- . into_iter ( )
304
- . filter ( |node| {
305
- XorMetric :: distance (
306
- & content_key_value. 0 . content_id ( ) ,
307
- & node. key . preimage ( ) . raw ( ) ,
308
- ) < node. value . data_radius ( )
309
- } )
310
- . map ( |node| node. value . enr ( ) )
311
- . collect ( ) ;
312
-
313
- // Continue if no nodes are interested in the content
314
- if interested_enrs. is_empty ( ) {
315
- debug ! ( "No nodes interested in neighborhood gossip: content_key={} num_nodes_checked={}" ,
316
- hex_encode( content_key_value. 0 . into( ) ) , all_nodes. len( ) ) ;
317
- continue ;
318
- }
319
+ let interested_enrs: Vec < Enr > = all_nodes
320
+ . clone ( )
321
+ . into_iter ( )
322
+ . filter ( |node| {
323
+ XorMetric :: distance (
324
+ & content_key_value. 0 . content_id ( ) ,
325
+ & node. key . preimage ( ) . raw ( ) ,
326
+ ) < node. value . data_radius ( )
327
+ } )
328
+ . map ( |node| node. value . enr ( ) )
329
+ . collect ( ) ;
319
330
320
- // Get log2 random ENRs to gossip
321
- let random_enrs = log2_random_enrs ( interested_enrs) ?;
331
+ // Continue if no nodes are interested in the content
332
+ if interested_enrs. is_empty ( ) {
333
+ debug ! (
334
+ "No nodes interested in neighborhood gossip: content_key={} num_nodes_checked={}" ,
335
+ hex_encode( content_key_value. 0 . into( ) ) ,
336
+ all_nodes. len( )
337
+ ) ;
338
+ return ;
339
+ }
322
340
323
- // Temporarily store all randomly selected nodes with the content of interest.
324
- // We want this so we can offer all the content to interested node in one request.
325
- for enr in random_enrs {
326
- enrs_and_content
327
- . entry ( enr. to_base64 ( ) )
328
- . or_default ( )
329
- . push ( content_key_value. clone ( ) . 0 . into ( ) ) ;
341
+ // Get log2 random ENRs to gossip
342
+ let random_enrs = match log2_random_enrs ( interested_enrs) {
343
+ Ok ( val) => val,
344
+ Err ( msg) => {
345
+ debug ! ( "No available enrs for gossip: {msg}" ) ;
346
+ return ;
330
347
}
348
+ } ;
349
+
350
+ // Temporarily store all randomly selected nodes with the content of interest.
351
+ // We want this so we can offer all the content to interested node in one request.
352
+ for enr in random_enrs {
353
+ enrs_and_content
354
+ . entry ( enr. to_base64 ( ) )
355
+ . or_default ( )
356
+ . push ( content_key_value. clone ( ) . 0 . into ( ) ) ;
331
357
}
332
358
333
359
// Create and send OFFER overlay request to the interested nodes
@@ -351,24 +377,10 @@ where
351
377
None ,
352
378
) ;
353
379
354
- if let Err ( err) = self . request_tx . send ( overlay_request) {
380
+ if let Err ( err) = request_tx. send ( overlay_request) {
355
381
error ! ( "Unable to send OFFER request to overlay: {err}." )
356
382
}
357
383
}
358
- Ok ( ( ) )
359
- }
360
-
361
- /// Try to store overlay content into database
362
- fn store_overlay_content ( & self , content_key : & TContentKey , value : ByteList ) {
363
- let should_store = self . storage . read ( ) . should_store ( content_key) ;
364
- match should_store {
365
- Ok ( _) => {
366
- if let Err ( err) = self . storage . write ( ) . store ( content_key, & value. into ( ) ) {
367
- warn ! ( "Unable to store accepted content: {err}" ) ;
368
- }
369
- }
370
- Err ( err) => error ! ( "Unable to determine whether to store accepted content: {err}" ) ,
371
- }
372
384
}
373
385
374
386
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
0 commit comments