From b75f2661e0dbd525ba633f4b3c1ae5f6c5b83c83 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Fri, 8 Jul 2022 14:58:58 -0400 Subject: [PATCH 1/3] Spawn non-blocking task for content validation --- newsfragments/373.fixed.md | 1 + trin-core/src/portalnet/overlay.rs | 5 +- trin-core/src/portalnet/overlay_service.rs | 66 +++++++++++----------- trin-core/tests/overlay.rs | 4 +- trin-history/src/network.rs | 3 +- trin-state/src/network.rs | 5 +- 6 files changed, 44 insertions(+), 40 deletions(-) create mode 100644 newsfragments/373.fixed.md diff --git a/newsfragments/373.fixed.md b/newsfragments/373.fixed.md new file mode 100644 index 000000000..86713ed29 --- /dev/null +++ b/newsfragments/373.fixed.md @@ -0,0 +1 @@ +Remove blocking async from overlay service loop via spawning new tasks for utp & content validation. diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 49a30b08a..91b757067 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -50,6 +50,7 @@ use rand::seq::IteratorRandom; use ssz::{Decode, Encode}; use ssz_types::VariableList; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::Mutex; use tracing::{debug, warn}; pub use super::overlay_service::{OverlayRequestError, RequestDirection}; @@ -121,7 +122,7 @@ pub struct OverlayProtocol { } impl< - TContentKey: OverlayContentKey + Send + Sync, + TContentKey: 'static + OverlayContentKey + Send + Sync, TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, > OverlayProtocol @@ -135,7 +136,7 @@ where storage: Arc>, data_radius: U256, protocol: ProtocolId, - validator: TValidator, + validator: Arc>, ) -> Self { let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( discovery.local_enr().node_id().into(), diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 3c5b5b698..dde9e60b6 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -68,6 +68,7 @@ use ssz::Encode; use ssz_types::{BitList, VariableList}; use thiserror::Error; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; +use tokio::sync::Mutex; /// Maximum number of ENRs in response to FindNodes. pub const FIND_NODES_MAX_NODES: usize = 32; @@ -317,11 +318,11 @@ pub struct OverlayService { /// Metrics reporting component metrics: Option, /// Validator for overlay network content. - validator: TValidator, + validator: Arc>, } impl< - TContentKey: OverlayContentKey + Send + Sync, + TContentKey: 'static + OverlayContentKey + Send + Sync, TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, > OverlayService @@ -343,7 +344,7 @@ where protocol: ProtocolId, utp_listener_sender: UnboundedSender, enable_metrics: bool, - validator: TValidator, + validator: Arc>, query_timeout: Duration, query_peer_timeout: Duration, query_parallelism: usize, @@ -486,7 +487,7 @@ where // Perform background processing. match response.response { - Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await, + Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id), Err(error) => self.process_request_failure(response.request_id, active_request.destination, error), } @@ -1108,7 +1109,7 @@ where } /// Processes a response to an outgoing request from some source node. - async fn process_response( + fn process_response( &mut self, response: Response, source: Enr, @@ -1171,7 +1172,6 @@ where } }; self.process_content(content, source, find_content_request, query_id) - .await } Response::Accept(accept) => { let offer_request = match request { @@ -1182,10 +1182,7 @@ where } }; - if let Err(err) = self - .process_accept(accept, source, offer_request.content_keys) - .await - { + if let Err(err) = self.process_accept(accept, source, offer_request.content_keys) { error!("Error processing ACCEPT response in overlay service: {err}") } } @@ -1193,7 +1190,7 @@ where } /// Process ACCEPT response - pub async fn process_accept( + pub fn process_accept( &self, response: Accept, enr: Enr, @@ -1227,17 +1224,17 @@ where self.utp_listener_tx .send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; - let mut conn = rx.await?; - // Handle STATE packet for SYN - let mut buf = [0; BUF_SIZE]; - conn.recv(&mut buf).await?; - let content_items = self.provide_requested_content(&response, content_keys_offered)?; let content_payload = ContentPayloadList::new(content_items) .map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?; tokio::spawn(async move { + let mut conn = rx.await.unwrap(); + // Handle STATE packet for SYN + let mut buf = [0; BUF_SIZE]; + conn.recv(&mut buf).await.unwrap(); + // send the content to the acceptor over a uTP stream if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await { warn!("Error sending content {err}"); @@ -1297,7 +1294,7 @@ where } /// Processes a Content response. - async fn process_content( + fn process_content( &mut self, content: Content, source: Enr, @@ -1315,8 +1312,7 @@ where self.protocol, id ), Content::Content(content) => { - self.process_received_content(content.clone(), request) - .await; + self.process_received_content(content.clone(), request); // TODO: Should we only advance the query if the content has been validated? if let Some(query_id) = query_id { self.advance_find_content_query_with_content(&query_id, source, content.into()); @@ -1332,7 +1328,7 @@ where } } - async fn process_received_content(&mut self, content: ByteList, request: FindContent) { + fn process_received_content(&mut self, content: ByteList, request: FindContent) { let content_key = match TContentKey::try_from(request.content_key) { Ok(val) => val, Err(msg) => { @@ -1344,19 +1340,23 @@ where match should_store { Ok(val) => { if val { - // validate content before storing - if let Err(err) = self - .validator - .validate_content(&content_key, &content.to_vec()) - .await - { - error!("Unable to validate received content: {err:?}"); - return; - }; + let validator = Arc::clone(&self.validator); + let storage = Arc::clone(&self.storage); + // Spawn task that validates content before storing. + // Allows for non-blocking requests to this/other overlay services. + tokio::spawn(async move { + let mut lock = validator.lock().await; + if let Err(err) = + lock.validate_content(&content_key, &content.to_vec()).await + { + error!("Unable to validate received content: {err:?}"); + return; + }; - if let Err(err) = self.storage.write().store(&content_key, &content.into()) { - error!("Content received, but not stored: {err}") - } + if let Err(err) = storage.write().store(&content_key, &content.into()) { + error!("Content received, but not stored: {err}") + } + }); } else { debug!( "Content received, but not stored: Content is already stored or its distance falls outside current radius." @@ -1993,7 +1993,7 @@ mod tests { let (request_tx, request_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = mpsc::unbounded_channel(); let metrics = None; - let validator = MockValidator {}; + let validator = Arc::new(Mutex::new(MockValidator {})); let service = OverlayService { discovery, diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs index 0455c9c2d..4fa0acb51 100644 --- a/trin-core/tests/overlay.rs +++ b/trin-core/tests/overlay.rs @@ -21,7 +21,7 @@ use discv5::Discv5Event; use ethereum_types::U256; use parking_lot::RwLock; use tokio::{ - sync::{mpsc, mpsc::unbounded_channel}, + sync::{mpsc, mpsc::unbounded_channel, Mutex}, time::{self, Duration}, }; @@ -40,7 +40,7 @@ async fn init_overlay( let overlay_config = OverlayConfig::default(); // Ignore all uTP events let (utp_listener_tx, _) = unbounded_channel::(); - let validator = MockValidator {}; + let validator = Arc::new(Mutex::new(MockValidator {})); OverlayProtocol::new( overlay_config, diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index f6f7348fb..ad313622e 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock as StdRwLock}; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::Mutex; use trin_core::{ portalnet::{ @@ -41,7 +42,7 @@ impl HistoryNetwork { ..Default::default() }; let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = ChainHistoryValidator { header_oracle }; + let validator = Arc::new(Mutex::new(ChainHistoryValidator { header_oracle })); let overlay = OverlayProtocol::new( config, discovery, diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index c95a505b2..276bb7e55 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -5,6 +5,7 @@ use eth_trie::EthTrie; use log::{debug, error}; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::Mutex; use trin_core::{ portalnet::{ discovery::Discovery, @@ -42,9 +43,9 @@ impl StateNetwork { let trie = EthTrie::new(Arc::new(triedb)); let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = StateValidator { + let validator = Arc::new(Mutex::new(StateValidator { header_oracle: HeaderOracle::default(), - }; + })); let config = OverlayConfig { bootnode_enrs: portal_config.bootnode_enrs.clone(), enable_metrics: portal_config.enable_metrics, From 57d734a47db9fab8d930a0b025c6cc705a3268fd Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Tue, 12 Jul 2022 08:14:45 -0400 Subject: [PATCH 2/3] Move provide_requested_content into spawned task --- trin-core/src/portalnet/overlay_service.rs | 30 +++++++++++++++++----- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index dde9e60b6..97717b377 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -1224,10 +1224,8 @@ where self.utp_listener_tx .send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; - let content_items = self.provide_requested_content(&response, content_keys_offered)?; - - let content_payload = ContentPayloadList::new(content_items) - .map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?; + let storage = Arc::clone(&self.storage); + let response_clone = response.clone(); tokio::spawn(async move { let mut conn = rx.await.unwrap(); @@ -1235,6 +1233,26 @@ where let mut buf = [0; BUF_SIZE]; conn.recv(&mut buf).await.unwrap(); + let content_items = match Self::provide_requested_content( + storage, + &response_clone, + content_keys_offered, + ) { + Ok(val) => val, + Err(msg) => { + warn!("Unable to provide requested content for acceptor: {msg:?}"); + return; + } + }; + + let content_payload = match ContentPayloadList::new(content_items) { + Ok(val) => val, + Err(msg) => { + warn!("Unable to build content payload: {msg:?}"); + return; + } + }; + // send the content to the acceptor over a uTP stream if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await { warn!("Error sending content {err}"); @@ -1456,7 +1474,7 @@ where /// Provide the requested content key and content value for the acceptor fn provide_requested_content( - &self, + storage: Arc>, accept_message: &Accept, content_keys_offered: Vec, ) -> anyhow::Result> { @@ -1469,7 +1487,7 @@ where .zip(content_keys_offered.iter()) { if i == true { - match self.storage.read().get(key) { + match storage.read().get(key) { Ok(content) => match content { Some(content) => content_items.push(content.into()), None => return Err(anyhow!("Unable to read offered content!")), From b609d93c6c33a4e2a2fee5b94349de1d48ce4cb4 Mon Sep 17 00:00:00 2001 From: Nick Gheorghita Date: Wed, 20 Jul 2022 11:28:10 -0400 Subject: [PATCH 3/3] Remove mutex and self mut ref on Validator --- trin-core/src/portalnet/overlay.rs | 3 +- trin-core/src/portalnet/overlay_service.rs | 38 +++++++--------------- trin-core/src/types/validation.rs | 8 ++--- trin-core/tests/overlay.rs | 4 +-- trin-history/src/network.rs | 3 +- trin-history/src/validation.rs | 16 ++++----- trin-state/src/network.rs | 5 ++- trin-state/src/validation.rs | 2 +- 8 files changed, 31 insertions(+), 48 deletions(-) diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 91b757067..d4e36c4b4 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -50,7 +50,6 @@ use rand::seq::IteratorRandom; use ssz::{Decode, Encode}; use ssz_types::VariableList; use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; use tracing::{debug, warn}; pub use super::overlay_service::{OverlayRequestError, RequestDirection}; @@ -136,7 +135,7 @@ where storage: Arc>, data_radius: U256, protocol: ProtocolId, - validator: Arc>, + validator: Arc, ) -> Self { let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( discovery.local_enr().node_id().into(), diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 97717b377..d8ec7a6d7 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -68,7 +68,6 @@ use ssz::Encode; use ssz_types::{BitList, VariableList}; use thiserror::Error; use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender}; -use tokio::sync::Mutex; /// Maximum number of ENRs in response to FindNodes. pub const FIND_NODES_MAX_NODES: usize = 32; @@ -318,7 +317,7 @@ pub struct OverlayService { /// Metrics reporting component metrics: Option, /// Validator for overlay network content. - validator: Arc>, + validator: Arc, } impl< @@ -344,7 +343,7 @@ where protocol: ProtocolId, utp_listener_sender: UnboundedSender, enable_metrics: bool, - validator: Arc>, + validator: Arc, query_timeout: Duration, query_peer_timeout: Duration, query_parallelism: usize, @@ -1233,25 +1232,12 @@ where let mut buf = [0; BUF_SIZE]; conn.recv(&mut buf).await.unwrap(); - let content_items = match Self::provide_requested_content( - storage, - &response_clone, - content_keys_offered, - ) { - Ok(val) => val, - Err(msg) => { - warn!("Unable to provide requested content for acceptor: {msg:?}"); - return; - } - }; + let content_items = + Self::provide_requested_content(storage, &response_clone, content_keys_offered) + .expect("Unable to provide requested content for acceptor: {msg:?}"); - let content_payload = match ContentPayloadList::new(content_items) { - Ok(val) => val, - Err(msg) => { - warn!("Unable to build content payload: {msg:?}"); - return; - } - }; + let content_payload = ContentPayloadList::new(content_items) + .expect("Unable to build content payload: {msg:?}"); // send the content to the acceptor over a uTP stream if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await { @@ -1363,11 +1349,11 @@ where // Spawn task that validates content before storing. // Allows for non-blocking requests to this/other overlay services. tokio::spawn(async move { - let mut lock = validator.lock().await; - if let Err(err) = - lock.validate_content(&content_key, &content.to_vec()).await + if let Err(err) = validator + .validate_content(&content_key, &content.to_vec()) + .await { - error!("Unable to validate received content: {err:?}"); + warn!("Unable to validate received content: {err:?}"); return; }; @@ -2011,7 +1997,7 @@ mod tests { let (request_tx, request_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = mpsc::unbounded_channel(); let metrics = None; - let validator = Arc::new(Mutex::new(MockValidator {})); + let validator = Arc::new(MockValidator {}); let service = OverlayService { discovery, diff --git a/trin-core/src/types/validation.rs b/trin-core/src/types/validation.rs index 31edc7f28..8c12bce84 100644 --- a/trin-core/src/types/validation.rs +++ b/trin-core/src/types/validation.rs @@ -42,7 +42,7 @@ impl Default for HeaderOracle { impl HeaderOracle { // Currently falls back to infura, to be updated to use canonical block indices network. - pub fn get_hash_at_height(&mut self, block_number: u64) -> anyhow::Result { + pub fn get_hash_at_height(&self, block_number: u64) -> anyhow::Result { let hex_number = format!("0x:{:02X}", block_number); let request = JsonRequest { jsonrpc: "2.0".to_string(), @@ -70,7 +70,7 @@ impl HeaderOracle { Ok(infura_hash.to_owned()) } - pub fn get_header_by_hash(&mut self, block_hash: H256) -> anyhow::Result
{ + pub fn get_header_by_hash(&self, block_hash: H256) -> anyhow::Result
{ let block_hash = format!("0x{:02X}", block_hash); let request = JsonRequest { jsonrpc: "2.0".to_string(), @@ -101,7 +101,7 @@ impl HeaderOracle { #[async_trait] pub trait Validator { async fn validate_content( - &mut self, + &self, content_key: &TContentKey, content: &[u8], ) -> anyhow::Result<()> @@ -115,7 +115,7 @@ pub struct MockValidator {} #[async_trait] impl Validator for MockValidator { async fn validate_content( - &mut self, + &self, _content_key: &IdentityContentKey, _content: &[u8], ) -> anyhow::Result<()> diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs index 4fa0acb51..db322d1c3 100644 --- a/trin-core/tests/overlay.rs +++ b/trin-core/tests/overlay.rs @@ -21,7 +21,7 @@ use discv5::Discv5Event; use ethereum_types::U256; use parking_lot::RwLock; use tokio::{ - sync::{mpsc, mpsc::unbounded_channel, Mutex}, + sync::{mpsc, mpsc::unbounded_channel}, time::{self, Duration}, }; @@ -40,7 +40,7 @@ async fn init_overlay( let overlay_config = OverlayConfig::default(); // Ignore all uTP events let (utp_listener_tx, _) = unbounded_channel::(); - let validator = Arc::new(Mutex::new(MockValidator {})); + let validator = Arc::new(MockValidator {}); OverlayProtocol::new( overlay_config, diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index ad313622e..f41c7e177 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -3,7 +3,6 @@ use std::sync::{Arc, RwLock as StdRwLock}; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; use trin_core::{ portalnet::{ @@ -42,7 +41,7 @@ impl HistoryNetwork { ..Default::default() }; let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = Arc::new(Mutex::new(ChainHistoryValidator { header_oracle })); + let validator = Arc::new(ChainHistoryValidator { header_oracle }); let overlay = OverlayProtocol::new( config, discovery, diff --git a/trin-history/src/validation.rs b/trin-history/src/validation.rs index bea0d832b..1ce3c303d 100644 --- a/trin-history/src/validation.rs +++ b/trin-history/src/validation.rs @@ -22,7 +22,7 @@ pub struct ChainHistoryValidator { #[async_trait] impl Validator for ChainHistoryValidator { async fn validate_content( - &mut self, + &self, content_key: &HistoryContentKey, content: &[u8], ) -> anyhow::Result<()> @@ -212,7 +212,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -240,7 +240,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -269,7 +269,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -294,7 +294,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_body_key(); chain_history_validator @@ -327,7 +327,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_body_key(); chain_history_validator @@ -348,7 +348,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_receipts_key(); chain_history_validator @@ -379,7 +379,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_receipts_key(); chain_history_validator diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index 276bb7e55..af2714d58 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -5,7 +5,6 @@ use eth_trie::EthTrie; use log::{debug, error}; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; -use tokio::sync::Mutex; use trin_core::{ portalnet::{ discovery::Discovery, @@ -43,9 +42,9 @@ impl StateNetwork { let trie = EthTrie::new(Arc::new(triedb)); let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = Arc::new(Mutex::new(StateValidator { + let validator = Arc::new(StateValidator { header_oracle: HeaderOracle::default(), - })); + }); let config = OverlayConfig { bootnode_enrs: portal_config.bootnode_enrs.clone(), enable_metrics: portal_config.enable_metrics, diff --git a/trin-state/src/validation.rs b/trin-state/src/validation.rs index fc9e16525..b02215fcf 100644 --- a/trin-state/src/validation.rs +++ b/trin-state/src/validation.rs @@ -12,7 +12,7 @@ pub struct StateValidator { #[async_trait] impl Validator for StateValidator { async fn validate_content( - &mut self, + &self, _content_key: &StateContentKey, _content: &[u8], ) -> anyhow::Result<()>