diff --git a/newsfragments/373.fixed.md b/newsfragments/373.fixed.md new file mode 100644 index 000000000..86713ed29 --- /dev/null +++ b/newsfragments/373.fixed.md @@ -0,0 +1 @@ +Remove blocking async from overlay service loop via spawning new tasks for utp & content validation. diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 49a30b08a..d4e36c4b4 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -121,7 +121,7 @@ pub struct OverlayProtocol { } impl< - TContentKey: OverlayContentKey + Send + Sync, + TContentKey: 'static + OverlayContentKey + Send + Sync, TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, > OverlayProtocol @@ -135,7 +135,7 @@ where storage: Arc>, data_radius: U256, protocol: ProtocolId, - validator: TValidator, + validator: Arc, ) -> Self { let kbuckets = Arc::new(RwLock::new(KBucketsTable::new( discovery.local_enr().node_id().into(), diff --git a/trin-core/src/portalnet/overlay_service.rs b/trin-core/src/portalnet/overlay_service.rs index 3c5b5b698..d8ec7a6d7 100644 --- a/trin-core/src/portalnet/overlay_service.rs +++ b/trin-core/src/portalnet/overlay_service.rs @@ -317,11 +317,11 @@ pub struct OverlayService { /// Metrics reporting component metrics: Option, /// Validator for overlay network content. - validator: TValidator, + validator: Arc, } impl< - TContentKey: OverlayContentKey + Send + Sync, + TContentKey: 'static + OverlayContentKey + Send + Sync, TMetric: Metric + Send + Sync, TValidator: 'static + Validator + Send + Sync, > OverlayService @@ -343,7 +343,7 @@ where protocol: ProtocolId, utp_listener_sender: UnboundedSender, enable_metrics: bool, - validator: TValidator, + validator: Arc, query_timeout: Duration, query_peer_timeout: Duration, query_parallelism: usize, @@ -486,7 +486,7 @@ where // Perform background processing. match response.response { - Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id).await, + Ok(response) => self.process_response(response, active_request.destination, active_request.request, active_request.query_id), Err(error) => self.process_request_failure(response.request_id, active_request.destination, error), } @@ -1108,7 +1108,7 @@ where } /// Processes a response to an outgoing request from some source node. - async fn process_response( + fn process_response( &mut self, response: Response, source: Enr, @@ -1171,7 +1171,6 @@ where } }; self.process_content(content, source, find_content_request, query_id) - .await } Response::Accept(accept) => { let offer_request = match request { @@ -1182,10 +1181,7 @@ where } }; - if let Err(err) = self - .process_accept(accept, source, offer_request.content_keys) - .await - { + if let Err(err) = self.process_accept(accept, source, offer_request.content_keys) { error!("Error processing ACCEPT response in overlay service: {err}") } } @@ -1193,7 +1189,7 @@ where } /// Process ACCEPT response - pub async fn process_accept( + pub fn process_accept( &self, response: Accept, enr: Enr, @@ -1227,17 +1223,22 @@ where self.utp_listener_tx .send(utp_request).map_err(|err| anyhow!("Unable to send Connect request to UtpListener when processing ACCEPT message: {err}"))?; - let mut conn = rx.await?; - // Handle STATE packet for SYN - let mut buf = [0; BUF_SIZE]; - conn.recv(&mut buf).await?; + let storage = Arc::clone(&self.storage); + let response_clone = response.clone(); + + tokio::spawn(async move { + let mut conn = rx.await.unwrap(); + // Handle STATE packet for SYN + let mut buf = [0; BUF_SIZE]; + conn.recv(&mut buf).await.unwrap(); - let content_items = self.provide_requested_content(&response, content_keys_offered)?; + let content_items = + Self::provide_requested_content(storage, &response_clone, content_keys_offered) + .expect("Unable to provide requested content for acceptor: {msg:?}"); - let content_payload = ContentPayloadList::new(content_items) - .map_err(|err| anyhow!("Unable to build content payload: {err:?}"))?; + let content_payload = ContentPayloadList::new(content_items) + .expect("Unable to build content payload: {msg:?}"); - tokio::spawn(async move { // send the content to the acceptor over a uTP stream if let Err(err) = conn.send_to(&content_payload.as_ssz_bytes()).await { warn!("Error sending content {err}"); @@ -1297,7 +1298,7 @@ where } /// Processes a Content response. - async fn process_content( + fn process_content( &mut self, content: Content, source: Enr, @@ -1315,8 +1316,7 @@ where self.protocol, id ), Content::Content(content) => { - self.process_received_content(content.clone(), request) - .await; + self.process_received_content(content.clone(), request); // TODO: Should we only advance the query if the content has been validated? if let Some(query_id) = query_id { self.advance_find_content_query_with_content(&query_id, source, content.into()); @@ -1332,7 +1332,7 @@ where } } - async fn process_received_content(&mut self, content: ByteList, request: FindContent) { + fn process_received_content(&mut self, content: ByteList, request: FindContent) { let content_key = match TContentKey::try_from(request.content_key) { Ok(val) => val, Err(msg) => { @@ -1344,19 +1344,23 @@ where match should_store { Ok(val) => { if val { - // validate content before storing - if let Err(err) = self - .validator - .validate_content(&content_key, &content.to_vec()) - .await - { - error!("Unable to validate received content: {err:?}"); - return; - }; + let validator = Arc::clone(&self.validator); + let storage = Arc::clone(&self.storage); + // Spawn task that validates content before storing. + // Allows for non-blocking requests to this/other overlay services. + tokio::spawn(async move { + if let Err(err) = validator + .validate_content(&content_key, &content.to_vec()) + .await + { + warn!("Unable to validate received content: {err:?}"); + return; + }; - if let Err(err) = self.storage.write().store(&content_key, &content.into()) { - error!("Content received, but not stored: {err}") - } + if let Err(err) = storage.write().store(&content_key, &content.into()) { + error!("Content received, but not stored: {err}") + } + }); } else { debug!( "Content received, but not stored: Content is already stored or its distance falls outside current radius." @@ -1456,7 +1460,7 @@ where /// Provide the requested content key and content value for the acceptor fn provide_requested_content( - &self, + storage: Arc>, accept_message: &Accept, content_keys_offered: Vec, ) -> anyhow::Result> { @@ -1469,7 +1473,7 @@ where .zip(content_keys_offered.iter()) { if i == true { - match self.storage.read().get(key) { + match storage.read().get(key) { Ok(content) => match content { Some(content) => content_items.push(content.into()), None => return Err(anyhow!("Unable to read offered content!")), @@ -1993,7 +1997,7 @@ mod tests { let (request_tx, request_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = mpsc::unbounded_channel(); let metrics = None; - let validator = MockValidator {}; + let validator = Arc::new(MockValidator {}); let service = OverlayService { discovery, diff --git a/trin-core/src/types/validation.rs b/trin-core/src/types/validation.rs index 31edc7f28..8c12bce84 100644 --- a/trin-core/src/types/validation.rs +++ b/trin-core/src/types/validation.rs @@ -42,7 +42,7 @@ impl Default for HeaderOracle { impl HeaderOracle { // Currently falls back to infura, to be updated to use canonical block indices network. - pub fn get_hash_at_height(&mut self, block_number: u64) -> anyhow::Result { + pub fn get_hash_at_height(&self, block_number: u64) -> anyhow::Result { let hex_number = format!("0x:{:02X}", block_number); let request = JsonRequest { jsonrpc: "2.0".to_string(), @@ -70,7 +70,7 @@ impl HeaderOracle { Ok(infura_hash.to_owned()) } - pub fn get_header_by_hash(&mut self, block_hash: H256) -> anyhow::Result
{ + pub fn get_header_by_hash(&self, block_hash: H256) -> anyhow::Result
{ let block_hash = format!("0x{:02X}", block_hash); let request = JsonRequest { jsonrpc: "2.0".to_string(), @@ -101,7 +101,7 @@ impl HeaderOracle { #[async_trait] pub trait Validator { async fn validate_content( - &mut self, + &self, content_key: &TContentKey, content: &[u8], ) -> anyhow::Result<()> @@ -115,7 +115,7 @@ pub struct MockValidator {} #[async_trait] impl Validator for MockValidator { async fn validate_content( - &mut self, + &self, _content_key: &IdentityContentKey, _content: &[u8], ) -> anyhow::Result<()> diff --git a/trin-core/tests/overlay.rs b/trin-core/tests/overlay.rs index 0455c9c2d..db322d1c3 100644 --- a/trin-core/tests/overlay.rs +++ b/trin-core/tests/overlay.rs @@ -40,7 +40,7 @@ async fn init_overlay( let overlay_config = OverlayConfig::default(); // Ignore all uTP events let (utp_listener_tx, _) = unbounded_channel::(); - let validator = MockValidator {}; + let validator = Arc::new(MockValidator {}); OverlayProtocol::new( overlay_config, diff --git a/trin-history/src/network.rs b/trin-history/src/network.rs index f6f7348fb..f41c7e177 100644 --- a/trin-history/src/network.rs +++ b/trin-history/src/network.rs @@ -41,7 +41,7 @@ impl HistoryNetwork { ..Default::default() }; let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = ChainHistoryValidator { header_oracle }; + let validator = Arc::new(ChainHistoryValidator { header_oracle }); let overlay = OverlayProtocol::new( config, discovery, diff --git a/trin-history/src/validation.rs b/trin-history/src/validation.rs index bea0d832b..1ce3c303d 100644 --- a/trin-history/src/validation.rs +++ b/trin-history/src/validation.rs @@ -22,7 +22,7 @@ pub struct ChainHistoryValidator { #[async_trait] impl Validator for ChainHistoryValidator { async fn validate_content( - &mut self, + &self, content_key: &HistoryContentKey, content: &[u8], ) -> anyhow::Result<()> @@ -212,7 +212,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -240,7 +240,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -269,7 +269,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = HistoryContentKey::BlockHeader(BlockHeader { chain_id: 1, block_hash: header.hash().0, @@ -294,7 +294,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_body_key(); chain_history_validator @@ -327,7 +327,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_body_key(); chain_history_validator @@ -348,7 +348,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_receipts_key(); chain_history_validator @@ -379,7 +379,7 @@ mod tests { infura_url, ..HeaderOracle::default() })); - let mut chain_history_validator = ChainHistoryValidator { header_oracle }; + let chain_history_validator = ChainHistoryValidator { header_oracle }; let content_key = block_14764013_receipts_key(); chain_history_validator diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index c95a505b2..af2714d58 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -42,9 +42,9 @@ impl StateNetwork { let trie = EthTrie::new(Arc::new(triedb)); let storage = Arc::new(RwLock::new(PortalStorage::new(storage_config).unwrap())); - let validator = StateValidator { + let validator = Arc::new(StateValidator { header_oracle: HeaderOracle::default(), - }; + }); let config = OverlayConfig { bootnode_enrs: portal_config.bootnode_enrs.clone(), enable_metrics: portal_config.enable_metrics, diff --git a/trin-state/src/validation.rs b/trin-state/src/validation.rs index fc9e16525..b02215fcf 100644 --- a/trin-state/src/validation.rs +++ b/trin-state/src/validation.rs @@ -12,7 +12,7 @@ pub struct StateValidator { #[async_trait] impl Validator for StateValidator { async fn validate_content( - &mut self, + &self, _content_key: &StateContentKey, _content: &[u8], ) -> anyhow::Result<()>