Skip to content

Commit b75fc0f

Browse files
committed
Add content validation for accepted content
1 parent 439cad3 commit b75fc0f

File tree

7 files changed

+131
-85
lines changed

7 files changed

+131
-85
lines changed

trin-core/src/portalnet/overlay.rs

Lines changed: 85 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use rand::seq::IteratorRandom;
5050
use ssz::{Decode, Encode};
5151
use ssz_types::VariableList;
5252
use tokio::sync::mpsc::UnboundedSender;
53+
use tokio::sync::Mutex;
5354
use tracing::{debug, warn};
5455

5556
pub use super::overlay_service::{OverlayRequestError, RequestDirection};
@@ -116,12 +117,13 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
116117
phantom_content_key: PhantomData<TContentKey>,
117118
/// Associate a metric with the overlay network.
118119
phantom_metric: PhantomData<TMetric>,
119-
/// Declare the Validator type for a given overlay network.
120-
phantom_validator: PhantomData<TValidator>,
120+
/// Non-blocking validator that makes requests to this/other overlay networks (or infura) to
121+
/// validate accepted content.
122+
validator: Arc<Mutex<TValidator>>,
121123
}
122124

123125
impl<
124-
TContentKey: OverlayContentKey + Send + Sync,
126+
TContentKey: 'static + OverlayContentKey + Send + Sync,
125127
TMetric: Metric + Send + Sync,
126128
TValidator: 'static + Validator<TContentKey> + Send + Sync,
127129
> OverlayProtocol<TContentKey, TMetric, TValidator>
@@ -135,7 +137,7 @@ where
135137
storage: Arc<RwLock<PortalStorage>>,
136138
data_radius: U256,
137139
protocol: ProtocolId,
138-
validator: TValidator,
140+
validator: Arc<Mutex<TValidator>>,
139141
) -> Self {
140142
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
141143
discovery.local_enr().node_id().into(),
@@ -156,7 +158,7 @@ where
156158
protocol.clone(),
157159
utp_listener_tx.clone(),
158160
config.enable_metrics,
159-
validator,
161+
Arc::clone(&validator),
160162
config.query_timeout,
161163
config.query_peer_timeout,
162164
config.query_parallelism,
@@ -176,7 +178,7 @@ where
176178
utp_listener_tx,
177179
phantom_content_key: PhantomData,
178180
phantom_metric: PhantomData,
179-
phantom_validator: PhantomData,
181+
validator,
180182
}
181183
}
182184

@@ -253,35 +255,60 @@ where
253255
));
254256
}
255257

256-
// TODO: Verify overlay content data with an Oracle
257-
258-
// Temporarily store content key/value pairs to propagate here
259-
let mut content_keys_values: Vec<(TContentKey, ByteList)> = Vec::new();
260-
261-
// Try to store the content into the database and propagate gossip the content
258+
// Validate, and try to store the content into the database and propagate gossip the content
262259
for (content_key, content_value) in content_keys.into_iter().zip(content_values.to_vec()) {
263260
match TContentKey::try_from(content_key) {
264261
Ok(key) => {
265-
// Store accepted content in DB
266-
self.store_overlay_content(&key, content_value.clone());
267-
content_keys_values.push((key, content_value))
262+
// Spawn a task that...
263+
// - Validates accepted content (this step requires a dedicated task since it
264+
// might require non-blocking requests to this/other overlay networks)
265+
// - Checks if content should be stored, and stores it if true
266+
// - Propagate all validated content
267+
let validator = Arc::clone(&self.validator);
268+
let storage = Arc::clone(&self.storage);
269+
let kbuckets = Arc::clone(&self.kbuckets);
270+
let request_tx = self.request_tx.clone();
271+
272+
tokio::spawn(async move {
273+
// Validated received content
274+
let mut lock = validator.lock().await;
275+
if let Err(err) = lock.validate_content(&key, &content_value.to_vec()).await
276+
{
277+
// Skip storing & propagating content if it's not valid
278+
error!("Unable to validate received content: {err:?}");
279+
return;
280+
};
281+
282+
// Check if data should be stored, and store if true.
283+
let _ = storage
284+
.write()
285+
.store_if_should(&key, &content_value.to_vec());
286+
287+
// Propagate all validated content, whether or not it was stored.
288+
OverlayProtocol::<TContentKey, TMetric, TValidator>::propagate_gossip(
289+
(key, content_value),
290+
kbuckets,
291+
request_tx,
292+
);
293+
});
268294
}
269295
Err(err) => {
270-
return Err(anyhow!(
271-
"Unexpected error while decoding overlay content key: {err}"
272-
));
296+
warn!("Unexpected error while decoding overlay content key: {err}");
297+
continue;
273298
}
274299
}
275300
}
276-
// Propagate gossip accepted content
277-
self.propagate_gossip(content_keys_values)?;
278301
Ok(())
279302
}
280303

281304
/// Propagate gossip accepted content via OFFER/ACCEPT:
282-
fn propagate_gossip(&self, content: Vec<(TContentKey, ByteList)>) -> anyhow::Result<()> {
305+
fn propagate_gossip(
306+
content_key_value: (TContentKey, ByteList),
307+
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
308+
request_tx: UnboundedSender<OverlayRequest>,
309+
) {
283310
// Get all nodes from overlay routing table
284-
let kbuckets = self.kbuckets.read();
311+
let kbuckets = kbuckets.read();
285312
let all_nodes: Vec<&kbucket::Node<NodeId, Node>> = kbuckets
286313
.buckets_iter()
287314
.map(|kbucket| {
@@ -297,37 +324,44 @@ where
297324
let mut enrs_and_content: HashMap<String, Vec<RawContentKey>> = HashMap::new();
298325

299326
// Filter all nodes from overlay routing table where XOR_distance(content_id, nodeId) < node radius
300-
for content_key_value in content {
301-
let interested_enrs: Vec<Enr> = all_nodes
302-
.clone()
303-
.into_iter()
304-
.filter(|node| {
305-
XorMetric::distance(
306-
&content_key_value.0.content_id(),
307-
&node.key.preimage().raw(),
308-
) < node.value.data_radius()
309-
})
310-
.map(|node| node.value.enr())
311-
.collect();
312-
313-
// Continue if no nodes are interested in the content
314-
if interested_enrs.is_empty() {
315-
debug!("No nodes interested in neighborhood gossip: content_key={} num_nodes_checked={}",
316-
hex_encode(content_key_value.0.into()), all_nodes.len());
317-
continue;
318-
}
327+
let interested_enrs: Vec<Enr> = all_nodes
328+
.clone()
329+
.into_iter()
330+
.filter(|node| {
331+
XorMetric::distance(
332+
&content_key_value.0.content_id(),
333+
&node.key.preimage().raw(),
334+
) < node.value.data_radius()
335+
})
336+
.map(|node| node.value.enr())
337+
.collect();
319338

320-
// Get log2 random ENRs to gossip
321-
let random_enrs = log2_random_enrs(interested_enrs)?;
339+
// Continue if no nodes are interested in the content
340+
if interested_enrs.is_empty() {
341+
debug!(
342+
"No nodes interested in neighborhood gossip: content_key={} num_nodes_checked={}",
343+
hex_encode(content_key_value.0.into()),
344+
all_nodes.len()
345+
);
346+
return;
347+
}
322348

323-
// Temporarily store all randomly selected nodes with the content of interest.
324-
// We want this so we can offer all the content to interested node in one request.
325-
for enr in random_enrs {
326-
enrs_and_content
327-
.entry(enr.to_base64())
328-
.or_default()
329-
.push(content_key_value.clone().0.into());
349+
// Get log2 random ENRs to gossip
350+
let random_enrs = match log2_random_enrs(interested_enrs) {
351+
Ok(val) => val,
352+
Err(msg) => {
353+
debug!("No available enrs for gossip: {msg}");
354+
return;
330355
}
356+
};
357+
358+
// Temporarily store all randomly selected nodes with the content of interest.
359+
// We want this so we can offer all the content to interested node in one request.
360+
for enr in random_enrs {
361+
enrs_and_content
362+
.entry(enr.to_base64())
363+
.or_default()
364+
.push(content_key_value.clone().0.into());
331365
}
332366

333367
// Create and send OFFER overlay request to the interested nodes
@@ -351,24 +385,10 @@ where
351385
None,
352386
);
353387

354-
if let Err(err) = self.request_tx.send(overlay_request) {
388+
if let Err(err) = request_tx.send(overlay_request) {
355389
error!("Unable to send OFFER request to overlay: {err}.")
356390
}
357391
}
358-
Ok(())
359-
}
360-
361-
/// Try to store overlay content into database
362-
fn store_overlay_content(&self, content_key: &TContentKey, value: ByteList) {
363-
let should_store = self.storage.read().should_store(content_key);
364-
match should_store {
365-
Ok(_) => {
366-
if let Err(err) = self.storage.write().store(content_key, &value.into()) {
367-
warn!("Unable to store accepted content: {err}");
368-
}
369-
}
370-
Err(err) => error!("Unable to determine whether to store accepted content: {err}"),
371-
}
372392
}
373393

374394
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.

trin-core/src/portalnet/overlay_service.rs

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use ssz::Encode;
6868
use ssz_types::{BitList, VariableList};
6969
use thiserror::Error;
7070
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
71+
use tokio::sync::Mutex;
7172

7273
/// Maximum number of ENRs in response to FindNodes.
7374
pub const FIND_NODES_MAX_NODES: usize = 32;
@@ -317,11 +318,11 @@ pub struct OverlayService<TContentKey, TMetric, TValidator> {
317318
/// Metrics reporting component
318319
metrics: Option<OverlayMetrics>,
319320
/// Validator for overlay network content.
320-
validator: TValidator,
321+
validator: Arc<Mutex<TValidator>>,
321322
}
322323

323324
impl<
324-
TContentKey: OverlayContentKey + Send + Sync,
325+
TContentKey: 'static + OverlayContentKey + Send + Sync,
325326
TMetric: Metric + Send + Sync,
326327
TValidator: 'static + Validator<TContentKey> + Send + Sync,
327328
> OverlayService<TContentKey, TMetric, TValidator>
@@ -343,7 +344,7 @@ where
343344
protocol: ProtocolId,
344345
utp_listener_sender: UnboundedSender<UtpListenerRequest>,
345346
enable_metrics: bool,
346-
validator: TValidator,
347+
validator: Arc<Mutex<TValidator>>,
347348
query_timeout: Duration,
348349
query_peer_timeout: Duration,
349350
query_parallelism: usize,
@@ -1317,6 +1318,7 @@ where
13171318
Content::Content(content) => {
13181319
self.process_received_content(content.clone(), request)
13191320
.await;
1321+
// huh?
13201322
// TODO: Should we only advance the query if the content has been validated?
13211323
if let Some(query_id) = query_id {
13221324
self.advance_find_content_query_with_content(&query_id, source, content.into());
@@ -1344,19 +1346,23 @@ where
13441346
match should_store {
13451347
Ok(val) => {
13461348
if val {
1347-
// validate content before storing
1348-
if let Err(err) = self
1349-
.validator
1350-
.validate_content(&content_key, &content.to_vec())
1351-
.await
1352-
{
1353-
error!("Unable to validate received content: {err:?}");
1354-
return;
1355-
};
1349+
let validator = Arc::clone(&self.validator);
1350+
let storage = Arc::clone(&self.storage);
1351+
// Spawn task that validates content before storing.
1352+
// Allows for non-blocking requests to this/other overlay services.
1353+
tokio::spawn(async move {
1354+
let mut lock = validator.lock().await;
1355+
if let Err(err) =
1356+
lock.validate_content(&content_key, &content.to_vec()).await
1357+
{
1358+
error!("Unable to validate received content: {err:?}");
1359+
return;
1360+
};
13561361

1357-
if let Err(err) = self.storage.write().store(&content_key, &content.into()) {
1358-
error!("Content received, but not stored: {err}")
1359-
}
1362+
if let Err(err) = storage.write().store(&content_key, &content.into()) {
1363+
error!("Content received, but not stored: {err}")
1364+
}
1365+
});
13601366
} else {
13611367
debug!(
13621368
"Content received, but not stored: Content is already stored or its distance falls outside current radius."
@@ -1993,7 +1999,7 @@ mod tests {
19931999
let (request_tx, request_rx) = mpsc::unbounded_channel();
19942000
let (response_tx, response_rx) = mpsc::unbounded_channel();
19952001
let metrics = None;
1996-
let validator = MockValidator {};
2002+
let validator = Arc::new(Mutex::new(MockValidator {}));
19972003

19982004
let service = OverlayService {
19992005
discovery,

trin-core/src/portalnet/storage.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,21 @@ impl PortalStorage {
136136
}
137137
}
138138

139+
/// Public method for automatically storing content after a `should_store` check.
140+
pub fn store_if_should(
141+
&mut self,
142+
key: &impl OverlayContentKey,
143+
value: &Vec<u8>,
144+
) -> Result<bool, PortalStorageError> {
145+
match self.should_store(key)? {
146+
true => {
147+
let _ = self.store(key, value)?;
148+
Ok(true)
149+
}
150+
false => Ok(false),
151+
}
152+
}
153+
139154
/// Public method for storing a given value for a given content-key.
140155
pub fn store(
141156
&mut self,

trin-core/tests/overlay.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use discv5::Discv5Event;
2121
use ethereum_types::U256;
2222
use parking_lot::RwLock;
2323
use tokio::{
24+
sync::Mutex,
2425
sync::{mpsc, mpsc::unbounded_channel},
2526
time::{self, Duration},
2627
};
@@ -40,7 +41,7 @@ async fn init_overlay(
4041
let overlay_config = OverlayConfig::default();
4142
// Ignore all uTP events
4243
let (utp_listener_tx, _) = unbounded_channel::<UtpListenerRequest>();
43-
let validator = MockValidator {};
44+
let validator = Arc::new(Mutex::new(MockValidator {}));
4445

4546
OverlayProtocol::new(
4647
overlay_config,

trin-history/src/network.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock as StdRwLock};
33

44
use parking_lot::RwLock;
55
use tokio::sync::mpsc::UnboundedSender;
6+
use tokio::sync::Mutex;
67

78
use trin_core::{
89
portalnet::{
@@ -41,7 +42,7 @@ impl HistoryNetwork {
4142
..Default::default()
4243
};
4344
let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
44-
let validator = ChainHistoryValidator { header_oracle };
45+
let validator = Arc::new(Mutex::new(ChainHistoryValidator { header_oracle }));
4546
let overlay = OverlayProtocol::new(
4647
config,
4748
discovery,

trin-state/src/events.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ impl StateEvents {
1515
pub async fn start(mut self) {
1616
loop {
1717
tokio::select! {
18+
// blocking async!
1819
Some(talk_request) = self.event_rx.recv() => {
1920
self.handle_state_talk_request(talk_request).await;
2021
}

trin-state/src/network.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use eth_trie::EthTrie;
55
use log::{debug, error};
66
use parking_lot::RwLock;
77
use tokio::sync::mpsc::UnboundedSender;
8+
use tokio::sync::Mutex;
9+
810
use trin_core::{
911
portalnet::{
1012
discovery::Discovery,
@@ -42,9 +44,9 @@ impl StateNetwork {
4244
let trie = EthTrie::new(Arc::new(triedb));
4345

4446
let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
45-
let validator = StateValidator {
47+
let validator = Arc::new(Mutex::new(StateValidator {
4648
header_oracle: HeaderOracle::default(),
47-
};
49+
}));
4850
let config = OverlayConfig {
4951
bootnode_enrs: portal_config.bootnode_enrs.clone(),
5052
enable_metrics: portal_config.enable_metrics,

0 commit comments

Comments
 (0)