diff --git a/Cargo.lock b/Cargo.lock index 97748ea3c..f1ca79d4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1203,6 +1203,7 @@ dependencies = [ "discv5", "futures 0.3.21", "hex", + "httpmock", "hyper", "log 0.4.17", "rocksdb", diff --git a/ethportal-peertest/Cargo.toml b/ethportal-peertest/Cargo.toml index dda837515..99afc4cb8 100644 --- a/ethportal-peertest/Cargo.toml +++ b/ethportal-peertest/Cargo.toml @@ -11,8 +11,9 @@ anyhow = "1.0.57" clap = "2.33.3" discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" } futures = "0.3.21" -hyper = { version = "0.14", features = ["full"] } hex = "0.4.3" +httpmock = "0.6.6" +hyper = { version = "0.14", features = ["full"] } log = "0.4.14" rocksdb = "0.18.0" serde_json = "1.0.59" diff --git a/ethportal-peertest/src/lib.rs b/ethportal-peertest/src/lib.rs index d852cbfeb..0b0f618d2 100644 --- a/ethportal-peertest/src/lib.rs +++ b/ethportal-peertest/src/lib.rs @@ -9,11 +9,54 @@ use std::net::{IpAddr, Ipv4Addr}; use std::{sync::Arc, thread, time}; use futures::future; +use httpmock::prelude::{MockServer, POST}; +use serde_json::json; use trin_core::{ cli::TrinConfig, jsonrpc::service::JsonRpcExiter, portalnet::types::messages::SszEnr, }; +fn setup_mock_infura_server() -> MockServer { + let server = MockServer::start(); + server.mock(|when, then| { + // setup up a mock infura response for validating accepted content + // inside test_offer_accept scenario + when.method(POST) + .body_contains("eth_getBlockByNumber"); + then.status(200) + .header("content-type", "application/json") + .json_body(json!({ + "jsonrpc": "2.0", + "id": 0, + "result": { + "baseFeePerGas": "0x1aae1651b6", + "difficulty": "0x327bd7ad3116ce", + "extraData": "0x457468657265756d50504c4e532f326d696e6572735f55534133", + "gasLimit": "0x1c9c364", + "gasUsed": "0x140db1", + "hash": "0x720704f3aa11c53cf344ea069db95cecb81ad7453c8f276b2a1062979611f09c", + "logsBloom": "0x00200000400000001000400080080000000000010004010001000008000000002000110000000000000090020001110402008000080208040010000000a8000000000000000000210822000900205020000000000160020020000400800040000000000042080000000400004008084020001000001004004000001000000000000001000000110000040000010200844040048101000008002000404810082002800000108020000200408008000100000000000000002020000b0001008060090200020000005000040000000000000040000000202101000000a00002000003420000800400000020100002000000000000000c000400000010000001001", + "miner": "0x00192fb10df37c9fb26829eb2cc623cd1bf599e8", + "mixHash": "0xf1a32e24eb62f01ec3f2b3b5893f7be9062fbf5482bc0d490a54352240350e26", + "nonce": "0x2087fbb243327696", + "number": "0xe147ed", + "parentHash": "0x2c58e3212c085178dbb1277e2f3c24b3f451267a75a234945c1581af639f4a7a", + "receiptsRoot": "0x168a3827607627e781941dc777737fc4b6beb69a8b139240b881992b35b854ea", + "sha3Uncles": "0x58a694212e0416353a4d3865ccf475496b55af3a3d3b002057000741af973191", + "size": "0x1f96", + "stateRoot": "0x67a9fb631f4579f9015ef3c6f1f3830dfa2dc08afe156f750e90022134b9ebf6", + "timestamp": "0x627d9afa", + "totalDifficulty": "0xa55e1baf12dfa3fc50c", + // transactions have been left out of response + "transactions": [], + "transactionsRoot": "0x18a2978fc62cd1a23e90de920af68c0c3af3330327927cda4c005faccefb5ce7", + "uncles": ["0x817d4158df626cd8e9a20da9552c51a0d43f22b25de0b4dc5a089d81af899c70"] + } + })); + }); + server +} + pub struct PeertestNode { pub enr: SszEnr, pub web3_ipc_path: String, @@ -88,7 +131,9 @@ pub async fn launch_node(id: u16, bootnode_enr: Option<&SszEnr>) -> anyhow::Resu } }; let web3_ipc_path = trin_config.web3_ipc_path.clone(); - let exiter = trin::run_trin(trin_config, String::new()).await.unwrap(); + let server = setup_mock_infura_server(); + let mock_infura_url = server.url("/"); + let exiter = trin::run_trin(trin_config, mock_infura_url).await.unwrap(); let enr = get_enode(&web3_ipc_path)?; // Short sleep to make sure all peertest nodes can connect diff --git a/newsfragments/376.added.md b/newsfragments/376.added.md new file mode 100644 index 000000000..5ab05c569 --- /dev/null +++ b/newsfragments/376.added.md @@ -0,0 +1 @@ +Added history network content validation for accepted content. diff --git a/tests/self_peertest.rs b/tests/self_peertest.rs index e2ecfb5d0..2a45009a9 100644 --- a/tests/self_peertest.rs +++ b/tests/self_peertest.rs @@ -1,8 +1,9 @@ #[cfg(test)] mod test { - use ethportal_peertest as peertest; use std::net::{IpAddr, Ipv4Addr}; use std::{thread, time}; + + use ethportal_peertest as peertest; use trin_core::cli::TrinConfig; // Logs don't show up when trying to use test_log here, maybe because of multi_thread diff --git a/trin-core/src/portalnet/overlay.rs b/trin-core/src/portalnet/overlay.rs index 37ccb0bb4..6da5352d5 100644 --- a/trin-core/src/portalnet/overlay.rs +++ b/trin-core/src/portalnet/overlay.rs @@ -44,12 +44,14 @@ use discv5::{ }; use ethereum_types::U256; use futures::channel::oneshot; +use futures::future::join_all; use log::error; use parking_lot::RwLock; use rand::seq::IteratorRandom; use ssz::{Decode, Encode}; use ssz_types::VariableList; use tokio::sync::mpsc::UnboundedSender; +use tokio::task::JoinHandle; use tracing::{debug, warn}; pub use super::overlay_service::{OverlayRequestError, RequestDirection}; @@ -116,8 +118,8 @@ pub struct OverlayProtocol { phantom_content_key: PhantomData, /// Associate a metric with the overlay network. phantom_metric: PhantomData, - /// Declare the Validator type for a given overlay network. - phantom_validator: PhantomData, + /// Accepted content validator that makes requests to this/other overlay networks (or infura) + validator: Arc, } impl< @@ -156,7 +158,7 @@ where protocol.clone(), utp_listener_tx.clone(), config.enable_metrics, - validator, + Arc::clone(&validator), config.query_timeout, config.query_peer_timeout, config.query_parallelism, @@ -176,7 +178,7 @@ where utp_listener_tx, phantom_content_key: PhantomData, phantom_metric: PhantomData, - phantom_validator: PhantomData, + validator, } } @@ -253,35 +255,71 @@ where )); } - // TODO: Verify overlay content data with an Oracle - - // Temporarily store content key/value pairs to propagate here - let mut content_keys_values: Vec<(TContentKey, ByteList)> = Vec::new(); + let validator = Arc::clone(&self.validator); + let storage = Arc::clone(&self.storage); + let kbuckets = Arc::clone(&self.kbuckets); + let command_tx = self.command_tx.clone(); - // Try to store the content into the database and propagate gossip the content - for (content_key, content_value) in content_keys.into_iter().zip(content_values.to_vec()) { - match TContentKey::try_from(content_key) { - Ok(key) => { - // Store accepted content in DB - self.store_overlay_content(&key, content_value.clone()); - content_keys_values.push((key, content_value)) - } - Err(err) => { - return Err(anyhow!( - "Unexpected error while decoding overlay content key: {err}" - )); - } - } - } - // Propagate gossip accepted content - self.propagate_gossip(content_keys_values)?; + // Spawn a task that spawns a validation task for each piece of content, + // collects validated content and propagates it via gossip. + tokio::spawn(async move { + let handles: Vec> = content_keys + .into_iter() + .zip(content_values.to_vec()) + .map( + |(content_key, content_value)| match TContentKey::try_from(content_key) { + Ok(key) => { + // Spawn a task that... + // - Validates accepted content (this step requires a dedicated task since it + // might require non-blocking requests to this/other overlay networks) + // - Checks if validated content should be stored, and stores it if true + // - Propagate all validated content + let validator = Arc::clone(&validator); + let storage = Arc::clone(&storage); + Some(tokio::spawn(async move { + // Validated received content + validator + .validate_content(&key, &content_value.to_vec()) + .await + // Skip storing & propagating content if it's not valid + .expect("Unable to validate received content: {err:?}"); + + // Check if data should be stored, and store if true. + // Ignore error since all validated content is propagated. + let _ = storage + .write() + .store_if_should(&key, &content_value.to_vec()); + + (key, content_value) + })) + } + Err(err) => { + warn!("Unexpected error while decoding overlay content key: {err}"); + None + } + }, + ) + .flatten() + .collect(); + let validated_content = join_all(handles) + .await + .into_iter() + .filter_map(|content| content.ok()) + .collect(); + // Propagate all validated content, whether or not it was stored. + Self::propagate_gossip(validated_content, kbuckets, command_tx); + }); Ok(()) } /// Propagate gossip accepted content via OFFER/ACCEPT: - fn propagate_gossip(&self, content: Vec<(TContentKey, ByteList)>) -> anyhow::Result<()> { + fn propagate_gossip( + content: Vec<(TContentKey, ByteList)>, + kbuckets: Arc>>, + command_tx: UnboundedSender>, + ) { // Get all nodes from overlay routing table - let kbuckets = self.kbuckets.read(); + let kbuckets = kbuckets.read(); let all_nodes: Vec<&kbucket::Node> = kbuckets .buckets_iter() .map(|kbucket| { @@ -318,7 +356,13 @@ where } // Get log2 random ENRs to gossip - let random_enrs = log2_random_enrs(interested_enrs)?; + let random_enrs = match log2_random_enrs(interested_enrs) { + Ok(val) => val, + Err(msg) => { + debug!("Error calculating log2 random enrs for gossip propagation: {msg}"); + return; + } + }; // Temporarily store all randomly selected nodes with the content of interest. // We want this so we can offer all the content to interested node in one request. @@ -351,27 +395,10 @@ where None, ); - if let Err(err) = self - .command_tx - .send(OverlayCommand::Request(overlay_request)) - { + if let Err(err) = command_tx.send(OverlayCommand::Request(overlay_request)) { error!("Unable to send OFFER request to overlay: {err}.") } } - Ok(()) - } - - /// Try to store overlay content into database - fn store_overlay_content(&self, content_key: &TContentKey, value: ByteList) { - let should_store = self.storage.read().should_store(content_key); - match should_store { - Ok(_) => { - if let Err(err) = self.storage.write().store(content_key, &value.into()) { - warn!("Unable to store accepted content: {err}"); - } - } - Err(err) => error!("Unable to determine whether to store accepted content: {err}"), - } } /// Returns a vector of all ENR node IDs of nodes currently contained in the routing table. diff --git a/trin-core/src/portalnet/storage.rs b/trin-core/src/portalnet/storage.rs index 467f6d724..e2f608cbe 100644 --- a/trin-core/src/portalnet/storage.rs +++ b/trin-core/src/portalnet/storage.rs @@ -144,6 +144,21 @@ impl PortalStorage { } } + /// Public method for automatically storing content after a `should_store` check. + pub fn store_if_should( + &mut self, + key: &impl OverlayContentKey, + value: &Vec, + ) -> Result { + match self.should_store(key)? { + true => { + self.store(key, value)?; + Ok(true) + } + false => Ok(false), + } + } + /// Public method for storing a given value for a given content-key. pub fn store( &mut self, diff --git a/trin-core/src/types/validation.rs b/trin-core/src/types/validation.rs index 8c12bce84..52a6960f8 100644 --- a/trin-core/src/types/validation.rs +++ b/trin-core/src/types/validation.rs @@ -43,7 +43,7 @@ impl Default for HeaderOracle { impl HeaderOracle { // Currently falls back to infura, to be updated to use canonical block indices network. pub fn get_hash_at_height(&self, block_number: u64) -> anyhow::Result { - let hex_number = format!("0x:{:02X}", block_number); + let hex_number = format!("0x{:02X}", block_number); let request = JsonRequest { jsonrpc: "2.0".to_string(), params: Params::Array(vec![json!(hex_number), json!(false)]), diff --git a/trin-core/src/utp/stream.rs b/trin-core/src/utp/stream.rs index a3a9e0f93..c359f8fe7 100644 --- a/trin-core/src/utp/stream.rs +++ b/trin-core/src/utp/stream.rs @@ -1430,12 +1430,12 @@ impl UtpStream { ) .await { - let msg = format!("Unavle to send FIN packet: {msg}"); + let msg = format!("Unable to send FIN packet: {msg}"); debug!("{msg}"); return Err(anyhow!(msg)); } - debug!("CLosing connection, sent {:?}", packet); + debug!("Closing connection, sent {:?}", packet); self.state = StreamState::FinSent; // Receive JAKE diff --git a/trin-state/src/network.rs b/trin-state/src/network.rs index af2714d58..227b9e077 100644 --- a/trin-state/src/network.rs +++ b/trin-state/src/network.rs @@ -5,6 +5,7 @@ use eth_trie::EthTrie; use log::{debug, error}; use parking_lot::RwLock; use tokio::sync::mpsc::UnboundedSender; + use trin_core::{ portalnet::{ discovery::Discovery,