Skip to content

Commit b75f266

Browse files
committed
Spawn non-blocking task for content validation
1 parent d6d8f72 commit b75f266

File tree

6 files changed

+44
-40
lines changed

6 files changed

+44
-40
lines changed

newsfragments/373.fixed.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Remove blocking async from overlay service loop via spawning new tasks for utp & content validation.

trin-core/src/portalnet/overlay.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use rand::seq::IteratorRandom;
5050
use ssz::{Decode, Encode};
5151
use ssz_types::VariableList;
5252
use tokio::sync::mpsc::UnboundedSender;
53+
use tokio::sync::Mutex;
5354
use tracing::{debug, warn};
5455

5556
pub use super::overlay_service::{OverlayRequestError, RequestDirection};
@@ -121,7 +122,7 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
121122
}
122123

123124
impl<
124-
TContentKey: OverlayContentKey + Send + Sync,
125+
TContentKey: 'static + OverlayContentKey + Send + Sync,
125126
TMetric: Metric + Send + Sync,
126127
TValidator: 'static + Validator<TContentKey> + Send + Sync,
127128
> OverlayProtocol<TContentKey, TMetric, TValidator>
@@ -135,7 +136,7 @@ where
135136
storage: Arc<RwLock<PortalStorage>>,
136137
data_radius: U256,
137138
protocol: ProtocolId,
138-
validator: TValidator,
139+
validator: Arc<Mutex<TValidator>>,
139140
) -> Self {
140141
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
141142
discovery.local_enr().node_id().into(),

trin-core/src/portalnet/overlay_service.rs

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ use ssz::Encode;
6868
use ssz_types::{BitList, VariableList};
6969
use thiserror::Error;
7070
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
71+
use tokio::sync::Mutex;
7172

7273
/// Maximum number of ENRs in response to FindNodes.
7374
pub const FIND_NODES_MAX_NODES: usize = 32;
@@ -317,11 +318,11 @@ pub struct OverlayService<TContentKey, TMetric, TValidator> {
317318
/// Metrics reporting component
318319
metrics: Option<OverlayMetrics>,
319320
/// Validator for overlay network content.
320-
validator: TValidator,
321+
validator: Arc<Mutex<TValidator>>,
321322
}
322323

323324
impl<
324-
TContentKey: OverlayContentKey + Send + Sync,
325+
TContentKey: 'static + OverlayContentKey + Send + Sync,
325326
TMetric: Metric + Send + Sync,
326327
TValidator: 'static + Validator<TContentKey> + Send + Sync,
327328
> OverlayService<TContentKey, TMetric, TValidator>
@@ -343,7 +344,7 @@ where
343344
protocol: ProtocolId,
344345
utp_listener_sender: UnboundedSender<UtpListenerRequest>,
345346
enable_metrics: bool,
346-
validator: TValidator,
347+
validator: Arc<Mutex<TValidator>>,
347348
query_timeout: Duration,
348349
query_peer_timeout: Duration,
349350
query_parallelism: usize,
@@ -486,7 +487,7 @@ where
486487

487488
// Perform background processing.
488489
match response.response {
489-
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await,
490+
Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id),
490491
Err(error) => self.process_request_failure(response.request_id, active_request.destination, error),
491492
}
492493

@@ -1108,7 +1109,7 @@ where
11081109
}
11091110

11101111
/// Processes a response to an outgoing request from some source node.
1111-
async fn process_response(
1112+
fn process_response(
11121113
&mut self,
11131114
response: Response,
11141115
source: Enr,
@@ -1171,7 +1172,6 @@ where
11711172
}
11721173
};
11731174
self.process_content(content, source, find_content_request, query_id)
1174-
.await
11751175
}
11761176
Response::Accept(accept) => {
11771177
let offer_request = match request {
@@ -1182,18 +1182,15 @@ where
11821182
}
11831183
};
11841184

1185-
if let Err(err) = self
1186-
.process_accept(accept, source, offer_request.content_keys)
1187-
.await
1188-
{
1185+
if let Err(err) = self.process_accept(accept, source, offer_request.content_keys) {
11891186
error!("Error processing ACCEPT response in overlay service: {err}")
11901187
}
11911188
}
11921189
}
11931190
}
11941191

11951192
/// Process ACCEPT response
1196-
pub async fn process_accept(
1193+
pub fn process_accept(
11971194
&self,
11981195
response: Accept,
11991196
enr: Enr,
@@ -1227,17 +1224,17 @@ where
12271224
self.utp_listener_tx
12281225
.send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?;
12291226

1230-
let mut conn = rx.await?;
1231-
// Handle STATE packet for SYN
1232-
let mut buf = [0; BUF_SIZE];
1233-
conn.recv(&mut buf).await?;
1234-
12351227
let content_items = self.provide_requested_content(&response, content_keys_offered)?;
12361228

12371229
let content_payload = ContentPayloadList::new(content_items)
12381230
.map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?;
12391231

12401232
tokio::spawn(async move {
1233+
let mut conn = rx.await.unwrap();
1234+
// Handle STATE packet for SYN
1235+
let mut buf = [0; BUF_SIZE];
1236+
conn.recv(&mut buf).await.unwrap();
1237+
12411238
// send the content to the acceptor over a uTP stream
12421239
if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await {
12431240
warn!("Error sending content {err}");
@@ -1297,7 +1294,7 @@ where
12971294
}
12981295

12991296
/// Processes a Content response.
1300-
async fn process_content(
1297+
fn process_content(
13011298
&mut self,
13021299
content: Content,
13031300
source: Enr,
@@ -1315,8 +1312,7 @@ where
13151312
self.protocol, id
13161313
),
13171314
Content::Content(content) => {
1318-
self.process_received_content(content.clone(), request)
1319-
.await;
1315+
self.process_received_content(content.clone(), request);
13201316
// TODO: Should we only advance the query if the content has been validated?
13211317
if let Some(query_id) = query_id {
13221318
self.advance_find_content_query_with_content(&query_id, source, content.into());
@@ -1332,7 +1328,7 @@ where
13321328
}
13331329
}
13341330

1335-
async fn process_received_content(&mut self, content: ByteList, request: FindContent) {
1331+
fn process_received_content(&mut self, content: ByteList, request: FindContent) {
13361332
let content_key = match TContentKey::try_from(request.content_key) {
13371333
Ok(val) => val,
13381334
Err(msg) => {
@@ -1344,19 +1340,23 @@ where
13441340
match should_store {
13451341
Ok(val) => {
13461342
if val {
1347-
// validate content before storing
1348-
if let Err(err) = self
1349-
.validator
1350-
.validate_content(&content_key, &content.to_vec())
1351-
.await
1352-
{
1353-
error!("Unable to validate received content: {err:?}");
1354-
return;
1355-
};
1343+
let validator = Arc::clone(&self.validator);
1344+
let storage = Arc::clone(&self.storage);
1345+
// Spawn task that validates content before storing.
1346+
// Allows for non-blocking requests to this/other overlay services.
1347+
tokio::spawn(async move {
1348+
let mut lock = validator.lock().await;
1349+
if let Err(err) =
1350+
lock.validate_content(&content_key, &content.to_vec()).await
1351+
{
1352+
error!("Unable to validate received content: {err:?}");
1353+
return;
1354+
};
13561355

1357-
if let Err(err) = self.storage.write().store(&content_key, &content.into()) {
1358-
error!("Content received, but not stored: {err}")
1359-
}
1356+
if let Err(err) = storage.write().store(&content_key, &content.into()) {
1357+
error!("Content received, but not stored: {err}")
1358+
}
1359+
});
13601360
} else {
13611361
debug!(
13621362
"Content received, but not stored: Content is already stored or its distance falls outside current radius."
@@ -1993,7 +1993,7 @@ mod tests {
19931993
let (request_tx, request_rx) = mpsc::unbounded_channel();
19941994
let (response_tx, response_rx) = mpsc::unbounded_channel();
19951995
let metrics = None;
1996-
let validator = MockValidator {};
1996+
let validator = Arc::new(Mutex::new(MockValidator {}));
19971997

19981998
let service = OverlayService {
19991999
discovery,

trin-core/tests/overlay.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use discv5::Discv5Event;
2121
use ethereum_types::U256;
2222
use parking_lot::RwLock;
2323
use tokio::{
24-
sync::{mpsc, mpsc::unbounded_channel},
24+
sync::{mpsc, mpsc::unbounded_channel, Mutex},
2525
time::{self, Duration},
2626
};
2727

@@ -40,7 +40,7 @@ async fn init_overlay(
4040
let overlay_config = OverlayConfig::default();
4141
// Ignore all uTP events
4242
let (utp_listener_tx, _) = unbounded_channel::<UtpListenerRequest>();
43-
let validator = MockValidator {};
43+
let validator = Arc::new(Mutex::new(MockValidator {}));
4444

4545
OverlayProtocol::new(
4646
overlay_config,

trin-history/src/network.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock as StdRwLock};
33

44
use parking_lot::RwLock;
55
use tokio::sync::mpsc::UnboundedSender;
6+
use tokio::sync::Mutex;
67

78
use trin_core::{
89
portalnet::{
@@ -41,7 +42,7 @@ impl HistoryNetwork {
4142
..Default::default()
4243
};
4344
let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
44-
let validator = ChainHistoryValidator { header_oracle };
45+
let validator = Arc::new(Mutex::new(ChainHistoryValidator { header_oracle }));
4546
let overlay = OverlayProtocol::new(
4647
config,
4748
discovery,

trin-state/src/network.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use eth_trie::EthTrie;
55
use log::{debug, error};
66
use parking_lot::RwLock;
77
use tokio::sync::mpsc::UnboundedSender;
8+
use tokio::sync::Mutex;
89
use trin_core::{
910
portalnet::{
1011
discovery::Discovery,
@@ -42,9 +43,9 @@ impl StateNetwork {
4243
let trie = EthTrie::new(Arc::new(triedb));
4344

4445
let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap()));
45-
let validator = StateValidator {
46+
let validator = Arc::new(Mutex::new(StateValidator {
4647
header_oracle: HeaderOracle::default(),
47-
};
48+
}));
4849
let config = OverlayConfig {
4950
bootnode_enrs: portal_config.bootnode_enrs.clone(),
5051
enable_metrics: portal_config.enable_metrics,

0 commit comments

Comments
 (0)