Skip to content

Commit 5b3bae1

Browse files
committed
Add content validation for accepted content
1 parent ae4423c commit 5b3bae1

File tree

10 files changed

+144
-52
lines changed

10 files changed

+144
-52
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ethportal-peertest/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ anyhow = "1.0.57"
1111
clap = "2.33.3"
1212
discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" }
1313
futures = "0.3.21"
14-
hyper = { version = "0.14", features = ["full"] }
1514
hex = "0.4.3"
15+
httpmock = "0.6.6"
16+
hyper = { version = "0.14", features = ["full"] }
1617
log = "0.4.14"
1718
rocksdb = "0.18.0"
1819
serde_json = "1.0.59"

ethportal-peertest/src/lib.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,54 @@ use std::net::{IpAddr, Ipv4Addr};
99
use std::{sync::Arc, thread, time};
1010

1111
use futures::future;
12+
use httpmock::prelude::{MockServer, POST};
13+
use serde_json::json;
1214

1315
use trin_core::{
1416
cli::TrinConfig, jsonrpc::service::JsonRpcExiter, portalnet::types::messages::SszEnr,
1517
};
1618

19+
fn setup_mock_infura_server() -> MockServer {
20+
let server = MockServer::start();
21+
server.mock(|when, then| {
22+
// setup up a mock infura response for validating accepted content inside
23+
// test_offer_accept scenario
24+
when.method(POST)
25+
.body_contains("eth_getBlockByNumber");
26+
then.status(200)
27+
.header("content-type", "application/json")
28+
.json_body(json!({
29+
"jsonrpc": "2.0",
30+
"id": 0,
31+
"result": {
32+
"baseFeePerGas": "0x1aae1651b6",
33+
"difficulty": "0x327bd7ad3116ce",
34+
"extraData": "0x457468657265756d50504c4e532f326d696e6572735f55534133",
35+
"gasLimit": "0x1c9c364",
36+
"gasUsed": "0x140db1",
37+
"hash": "0x720704f3aa11c53cf344ea069db95cecb81ad7453c8f276b2a1062979611f09c",
38+
"logsBloom": "0x00200000400000001000400080080000000000010004010001000008000000002000110000000000000090020001110402008000080208040010000000a8000000000000000000210822000900205020000000000160020020000400800040000000000042080000000400004008084020001000001004004000001000000000000001000000110000040000010200844040048101000008002000404810082002800000108020000200408008000100000000000000002020000b0001008060090200020000005000040000000000000040000000202101000000a00002000003420000800400000020100002000000000000000c000400000010000001001",
39+
"miner": "0x00192fb10df37c9fb26829eb2cc623cd1bf599e8",
40+
"mixHash": "0xf1a32e24eb62f01ec3f2b3b5893f7be9062fbf5482bc0d490a54352240350e26",
41+
"nonce": "0x2087fbb243327696",
42+
"number": "0xe147ed",
43+
"parentHash": "0x2c58e3212c085178dbb1277e2f3c24b3f451267a75a234945c1581af639f4a7a",
44+
"receiptsRoot": "0x168a3827607627e781941dc777737fc4b6beb69a8b139240b881992b35b854ea",
45+
"sha3Uncles": "0x58a694212e0416353a4d3865ccf475496b55af3a3d3b002057000741af973191",
46+
"size": "0x1f96",
47+
"stateRoot": "0x67a9fb631f4579f9015ef3c6f1f3830dfa2dc08afe156f750e90022134b9ebf6",
48+
"timestamp": "0x627d9afa",
49+
"totalDifficulty": "0xa55e1baf12dfa3fc50c",
50+
// transactions have been left out of response
51+
"transactions": [],
52+
"transactionsRoot": "0x18a2978fc62cd1a23e90de920af68c0c3af3330327927cda4c005faccefb5ce7",
53+
"uncles": ["0x817d4158df626cd8e9a20da9552c51a0d43f22b25de0b4dc5a089d81af899c70"]
54+
}
55+
}));
56+
});
57+
server
58+
}
59+
1760
pub struct PeertestNode {
1861
pub enr: SszEnr,
1962
pub web3_ipc_path: String,
@@ -88,7 +131,9 @@ pub async fn launch_node(id: u16, bootnode_enr: Option<&SszEnr>) -> anyhow::Resu
88131
}
89132
};
90133
let web3_ipc_path = trin_config.web3_ipc_path.clone();
91-
let exiter = trin::run_trin(trin_config, String::new()).await.unwrap();
134+
let server = setup_mock_infura_server();
135+
let mock_infura_url = server.url("/");
136+
let exiter = trin::run_trin(trin_config, mock_infura_url).await.unwrap();
92137
let enr = get_enode(&web3_ipc_path)?;
93138

94139
// Short sleep to make sure all peertest nodes can connect

newsfragments/376.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Added history network content validation for accepted content.

tests/self_peertest.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
#[cfg(test)]
22
mod test {
3-
use ethportal_peertest as peertest;
43
use std::net::{IpAddr, Ipv4Addr};
54
use std::{thread, time};
5+
6+
use ethportal_peertest as peertest;
67
use trin_core::cli::TrinConfig;
78

89
// Logs don't show up when trying to use test_log here, maybe because of multi_thread

trin-core/src/portalnet/overlay.rs

Lines changed: 73 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,14 @@ use discv5::{
4444
};
4545
use ethereum_types::U256;
4646
use futures::channel::oneshot;
47+
use futures::future::join_all;
4748
use log::error;
4849
use parking_lot::RwLock;
4950
use rand::seq::IteratorRandom;
5051
use ssz::{Decode, Encode};
5152
use ssz_types::VariableList;
5253
use tokio::sync::mpsc::UnboundedSender;
54+
use tokio::task::JoinHandle;
5355
use tracing::{debug, warn};
5456

5557
pub use super::overlay_service::{OverlayRequestError, RequestDirection};
@@ -116,8 +118,8 @@ pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
116118
phantom_content_key: PhantomData<TContentKey>,
117119
/// Associate a metric with the overlay network.
118120
phantom_metric: PhantomData<TMetric>,
119-
/// Declare the Validator type for a given overlay network.
120-
phantom_validator: PhantomData<TValidator>,
121+
/// Accepted content validator that makes requests to this/other overlay networks (or infura)
122+
validator: Arc<TValidator>,
121123
}
122124

123125
impl<
@@ -156,7 +158,7 @@ where
156158
protocol.clone(),
157159
utp_listener_tx.clone(),
158160
config.enable_metrics,
159-
validator,
161+
Arc::clone(&validator),
160162
config.query_timeout,
161163
config.query_peer_timeout,
162164
config.query_parallelism,
@@ -176,7 +178,7 @@ where
176178
utp_listener_tx,
177179
phantom_content_key: PhantomData,
178180
phantom_metric: PhantomData,
179-
phantom_validator: PhantomData,
181+
validator,
180182
}
181183
}
182184

@@ -253,35 +255,71 @@ where
253255
));
254256
}
255257

256-
// TODO: Verify overlay content data with an Oracle
257-
258-
// Temporarily store content key/value pairs to propagate here
259-
let mut content_keys_values: Vec<(TContentKey, ByteList)> = Vec::new();
258+
let validator = Arc::clone(&self.validator);
259+
let storage = Arc::clone(&self.storage);
260+
let kbuckets = Arc::clone(&self.kbuckets);
261+
let command_tx = self.command_tx.clone();
260262

261-
// Try to store the content into the database and propagate gossip the content
262-
for (content_key, content_value) in content_keys.into_iter().zip(content_values.to_vec()) {
263-
match TContentKey::try_from(content_key) {
264-
Ok(key) => {
265-
// Store accepted content in DB
266-
self.store_overlay_content(&key, content_value.clone());
267-
content_keys_values.push((key, content_value))
268-
}
269-
Err(err) => {
270-
return Err(anyhow!(
271-
"Unexpected error while decoding overlay content key: {err}"
272-
));
273-
}
274-
}
275-
}
276-
// Propagate gossip accepted content
277-
self.propagate_gossip(content_keys_values)?;
263+
// Spawn a task that spawns a validation task for each piece of content,
264+
// collects validated content and propagates it via gossip.
265+
tokio::spawn(async move {
266+
let handles: Vec<JoinHandle<_>> = content_keys
267+
.into_iter()
268+
.zip(content_values.to_vec())
269+
.map(
270+
|(content_key, content_value)| match TContentKey::try_from(content_key) {
271+
Ok(key) => {
272+
// Spawn a task that...
273+
// - Validates accepted content (this step requires a dedicated task since it
274+
// might require non-blocking requests to this/other overlay networks)
275+
// - Checks if validated content should be stored, and stores it if true
276+
// - Propagate all validated content
277+
let validator = Arc::clone(&validator);
278+
let storage = Arc::clone(&storage);
279+
Some(tokio::spawn(async move {
280+
// Validated received content
281+
validator
282+
.validate_content(&key, &content_value.to_vec())
283+
.await
284+
// Skip storing & propagating content if it's not valid
285+
.expect("Unable to validate received content: {err:?}");
286+
287+
// Check if data should be stored, and store if true.
288+
// Ignore error since all validated content is propagated.
289+
let _ = storage
290+
.write()
291+
.store_if_should(&key, &content_value.to_vec());
292+
293+
(key, content_value)
294+
}))
295+
}
296+
Err(err) => {
297+
warn!("Unexpected error while decoding overlay content key: {err}");
298+
None
299+
}
300+
},
301+
)
302+
.flatten()
303+
.collect();
304+
let validated_content = join_all(handles)
305+
.await
306+
.into_iter()
307+
.filter_map(|content| content.ok())
308+
.collect();
309+
// Propagate all validated content, whether or not it was stored.
310+
Self::propagate_gossip(validated_content, kbuckets, command_tx);
311+
});
278312
Ok(())
279313
}
280314

281315
/// Propagate gossip accepted content via OFFER/ACCEPT:
282-
fn propagate_gossip(&self, content: Vec<(TContentKey, ByteList)>) -> anyhow::Result<()> {
316+
fn propagate_gossip(
317+
content: Vec<(TContentKey, ByteList)>,
318+
kbuckets: Arc<RwLock<KBucketsTable<NodeId, Node>>>,
319+
command_tx: UnboundedSender<OverlayCommand<TContentKey>>,
320+
) {
283321
// Get all nodes from overlay routing table
284-
let kbuckets = self.kbuckets.read();
322+
let kbuckets = kbuckets.read();
285323
let all_nodes: Vec<&kbucket::Node<NodeId, Node>> = kbuckets
286324
.buckets_iter()
287325
.map(|kbucket| {
@@ -318,7 +356,13 @@ where
318356
}
319357

320358
// Get log2 random ENRs to gossip
321-
let random_enrs = log2_random_enrs(interested_enrs)?;
359+
let random_enrs = match log2_random_enrs(interested_enrs) {
360+
Ok(val) => val,
361+
Err(msg) => {
362+
debug!("Error calculating log2 random enrs for gossip propagation: {msg}");
363+
return;
364+
}
365+
};
322366

323367
// Temporarily store all randomly selected nodes with the content of interest.
324368
// We want this so we can offer all the content to interested node in one request.
@@ -351,27 +395,10 @@ where
351395
None,
352396
);
353397

354-
if let Err(err) = self
355-
.command_tx
356-
.send(OverlayCommand::Request(overlay_request))
357-
{
398+
if let Err(err) = command_tx.send(OverlayCommand::Request(overlay_request)) {
358399
error!("Unable to send OFFER request to overlay: {err}.")
359400
}
360401
}
361-
Ok(())
362-
}
363-
364-
/// Try to store overlay content into database
365-
fn store_overlay_content(&self, content_key: &TContentKey, value: ByteList) {
366-
let should_store = self.storage.read().should_store(content_key);
367-
match should_store {
368-
Ok(_) => {
369-
if let Err(err) = self.storage.write().store(content_key, &value.into()) {
370-
warn!("Unable to store accepted content: {err}");
371-
}
372-
}
373-
Err(err) => error!("Unable to determine whether to store accepted content: {err}"),
374-
}
375402
}
376403

377404
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.

trin-core/src/portalnet/storage.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,21 @@ impl PortalStorage {
144144
}
145145
}
146146

147+
/// Public method for automatically storing content after a `should_store` check.
148+
pub fn store_if_should(
149+
&mut self,
150+
key: &impl OverlayContentKey,
151+
value: &Vec<u8>,
152+
) -> Result<bool, PortalStorageError> {
153+
match self.should_store(key)? {
154+
true => {
155+
self.store(key, value)?;
156+
Ok(true)
157+
}
158+
false => Ok(false),
159+
}
160+
}
161+
147162
/// Public method for storing a given value for a given content-key.
148163
pub fn store(
149164
&mut self,

trin-core/src/types/validation.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ impl Default for HeaderOracle {
4343
impl HeaderOracle {
4444
// Currently falls back to infura, to be updated to use canonical block indices network.
4545
pub fn get_hash_at_height(&self, block_number: u64) -> anyhow::Result<String> {
46-
let hex_number = format!("0x:{:02X}", block_number);
46+
let hex_number = format!("0x{:02X}", block_number);
4747
let request = JsonRequest {
4848
jsonrpc: "2.0".to_string(),
4949
params: Params::Array(vec![json!(hex_number), json!(false)]),

trin-core/src/utp/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1430,12 +1430,12 @@ impl UtpStream {
14301430
)
14311431
.await
14321432
{
1433-
let msg = format!("Unavle to send FIN packet: {msg}");
1433+
let msg = format!("Unable to send FIN packet: {msg}");
14341434
debug!("{msg}");
14351435
return Err(anyhow!(msg));
14361436
}
14371437

1438-
debug!("CLosing connection, sent {:?}", packet);
1438+
debug!("Closing connection, sent {:?}", packet);
14391439
self.state = StreamState::FinSent;
14401440

14411441
// Receive JAKE

trin-state/src/network.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use eth_trie::EthTrie;
55
use log::{debug, error};
66
use parking_lot::RwLock;
77
use tokio::sync::mpsc::UnboundedSender;
8+
89
use trin_core::{
910
portalnet::{
1011
discovery::Discovery,

0 commit comments

Comments
 (0)