Skip to content

Commit f8eb000

Browse files
authored
Merge pull request #331 from njgheorghita/validation
Initial ValidationOracle implementation
2 parents 95d128b + b0c4817 commit f8eb000

29 files changed

+1539
-364
lines changed

Cargo.lock

Lines changed: 1004 additions & 279 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ repository = "https://github.com/ethereum/trin"
1313
discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" }
1414
ethereum-types = "0.12.1"
1515
hex = "0.4.3"
16-
log = "0.4.14"
16+
log = "0.4.17"
1717
prometheus_exporter = "0.8.4"
1818
rand = "0.8.4"
1919
rlp = "0.5.0"

ethportal-peertest/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ rust-version = "1.58.0"
77
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
88

99
[dependencies]
10-
anyhow = "1.0.52"
10+
anyhow = "1.0.57"
1111
clap = "2.33.3"
1212
discv5 = { git = "https://github.com/sigp/discv5.git", branch = "master" }
1313
futures = "0.3.21"

ethportal-peertest/src/jsonrpc.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,7 @@ pub fn make_ipc_request(ipc_path: &str, request: &JsonRpcRequest) -> anyhow::Res
363363
stream.flush().unwrap();
364364
let deser = serde_json::Deserializer::from_reader(stream);
365365
let next_obj = deser.into_iter::<Value>().next();
366-
let response_obj = next_obj.ok_or(anyhow!("Empty JsonRpc response"))?;
366+
let response_obj = next_obj.ok_or_else(|| anyhow!("Empty JsonRpc response"))?;
367367
get_response_result(response_obj)
368368
}
369369

ethportal-peertest/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub async fn launch_node(id: u16, bootnode_enr: Option<&SszEnr>) -> anyhow::Resu
4242
Some(enr) => {
4343
let external_addr = format!(
4444
"{}:{}",
45-
enr.ip().expect("bootnode must have IP"),
45+
enr.ip4().expect("bootnode must have IP"),
4646
discovery_port
4747
);
4848
let enr_base64 = enr.to_base64();

newsfragments/331.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Initial HeaderOracle implementation.

src/lib.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::sync::Arc;
1+
use std::sync::{Arc, RwLock};
22

33
use log::debug;
44
use tokio::sync::mpsc;
@@ -14,6 +14,7 @@ use trin_core::{
1414
discovery::Discovery, events::PortalnetEvents, storage::PortalStorage,
1515
types::messages::PortalnetConfig,
1616
},
17+
types::validation::HeaderOracle,
1718
utils::bootnodes::parse_bootnodes,
1819
utp::stream::UtpListener,
1920
};
@@ -22,12 +23,11 @@ use trin_state::initialize_state_network;
2223

2324
pub async fn run_trin(
2425
trin_config: TrinConfig,
25-
infura_project_id: String,
26+
infura_url: String,
2627
) -> Result<Arc<JsonRpcExiter>, Box<dyn std::error::Error>> {
2728
trin_config.display_config();
2829

2930
let bootnode_enrs = parse_bootnodes(&trin_config.bootnodes)?;
30-
3131
let portalnet_config = PortalnetConfig {
3232
external_addr: trin_config.external_addr,
3333
private_key: trin_config.private_key.clone(),
@@ -60,6 +60,12 @@ pub async fn run_trin(
6060
let storage_config =
6161
PortalStorage::setup_config(discovery.local_enr().node_id(), trin_config.kb)?;
6262

63+
// Initialize validation oracle
64+
let header_oracle = Arc::new(RwLock::new(HeaderOracle {
65+
infura_url: infura_url.clone(),
66+
..HeaderOracle::default()
67+
}));
68+
6369
debug!("Selected networks to spawn: {:?}", trin_config.networks);
6470
// Initialize state sub-network service and event handlers, if selected
6571
let (state_handler, state_network_task, state_event_tx, state_utp_tx, state_jsonrpc_tx) =
@@ -92,6 +98,7 @@ pub async fn run_trin(
9298
utp_listener_tx,
9399
portalnet_config.clone(),
94100
storage_config.clone(),
101+
header_oracle,
95102
)
96103
.await
97104
} else {
@@ -108,7 +115,7 @@ pub async fn run_trin(
108115
tokio::task::spawn_blocking(|| {
109116
launch_jsonrpc_server(
110117
jsonrpc_trin_config,
111-
infura_project_id,
118+
infura_url,
112119
portal_jsonrpc_tx,
113120
live_server_tx,
114121
json_exiter_clone,

src/main.rs

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::env;
2-
3-
use trin_core::cli::TrinConfig;
1+
use trin_core::{cli::TrinConfig, utils::infura::build_infura_project_url_from_env};
42

53
use trin::run_trin;
64

@@ -11,15 +9,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
119
println!("Launching trin");
1210

1311
let trin_config = TrinConfig::from_cli();
14-
let infura_project_id = match env::var("TRIN_INFURA_PROJECT_ID") {
15-
Ok(val) => val,
16-
Err(_) => panic!(
17-
"Must supply Infura key as environment variable, like:\n\
18-
TRIN_INFURA_PROJECT_ID=\"your-key-here\" trin"
19-
),
20-
};
21-
22-
let exiter = run_trin(trin_config, infura_project_id).await?;
12+
let infura_url = build_infura_project_url_from_env();
13+
14+
let exiter = run_trin(trin_config, infura_url).await?;
2315

2416
tokio::signal::ctrl_c()
2517
.await

trin-core/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ edition = "2021"
55
rust-version = "1.58.0"
66

77
[dependencies]
8-
anyhow = "1.0.52"
8+
anyhow = "1.0.57"
99
async-recursion = "1.0.0"
10+
async-trait = "0.1.53"
1011
base64 = "0.13.0"
1112
bytes = "1.1.0"
1213
clap = "2.33.3"
@@ -27,7 +28,7 @@ hmac-sha256 = "1.1.1"
2728
httparse = "1.5.1"
2829
keccak-hash = "0.8.0"
2930
lazy_static = "1.4.0"
30-
log = "0.4.14"
31+
log = "0.4.17"
3132
num = "0.4.0"
3233
parking_lot = "0.11.2"
3334
prometheus_exporter = "0.8.4"

trin-core/src/jsonrpc/service.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl Default for JsonRpcExiter {
5858

5959
pub fn launch_jsonrpc_server(
6060
trin_config: TrinConfig,
61-
infura_project_id: String,
61+
infura_url: String,
6262
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
6363
live_server_tx: tokio::sync::mpsc::Sender<bool>,
6464
json_rpc_exiter: Arc<JsonRpcExiter>,
@@ -68,19 +68,13 @@ pub fn launch_jsonrpc_server(
6868
match trin_config.web3_transport.as_str() {
6969
"ipc" => launch_ipc_client(
7070
pool,
71-
infura_project_id,
71+
infura_url,
7272
&trin_config.web3_ipc_path,
7373
portal_tx,
7474
live_server_tx,
7575
json_rpc_exiter,
7676
),
77-
"http" => launch_http_client(
78-
pool,
79-
infura_project_id,
80-
trin_config,
81-
portal_tx,
82-
live_server_tx,
83-
),
77+
"http" => launch_http_client(pool, infura_url, trin_config, portal_tx, live_server_tx),
8478
val => panic!("Unsupported web3 transport: {}", val),
8579
}
8680
}
@@ -126,7 +120,7 @@ fn get_listener_result(ipc_path: &str) -> tokio::io::Result<uds_windows::UnixLis
126120

127121
fn launch_ipc_client(
128122
pool: ThreadPool,
129-
infura_project_id: String,
123+
infura_url: String,
130124
ipc_path: &str,
131125
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
132126
live_server_tx: tokio::sync::mpsc::Sender<bool>,
@@ -174,10 +168,9 @@ fn launch_ipc_client(
174168
Err(_) => break, // Socket exited
175169
};
176170
debug!("New IPC client: {:?}", stream.peer_addr().unwrap());
177-
let infura_project_id = infura_project_id.clone();
171+
let infura_url = infura_url.clone();
178172
let portal_tx = portal_tx.clone();
179173
pool.execute(move || {
180-
let infura_url = get_infura_url(&infura_project_id);
181174
let mut rx = stream.try_clone().unwrap();
182175
let mut tx = stream;
183176
serve_ipc_client(&mut rx, &mut tx, &infura_url, portal_tx);
@@ -192,7 +185,7 @@ fn launch_ipc_client(
192185

193186
fn launch_http_client(
194187
pool: ThreadPool,
195-
infura_project_id: String,
188+
infura_url: String,
196189
trin_config: TrinConfig,
197190
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
198191
live_server_tx: tokio::sync::mpsc::Sender<bool>,
@@ -212,10 +205,9 @@ fn launch_http_client(
212205
for stream in listener.incoming() {
213206
match stream {
214207
Ok(stream) => {
215-
let infura_project_id = infura_project_id.clone();
208+
let infura_url = infura_url.clone();
216209
let portal_tx = portal_tx.clone();
217210
pool.execute(move || {
218-
let infura_url = get_infura_url(&infura_project_id);
219211
serve_http_client(stream, &infura_url, portal_tx);
220212
});
221213
}
@@ -386,7 +378,7 @@ fn dispatch_trin_request(
386378
}
387379

388380
// Handle all requests served by infura
389-
fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
381+
pub fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
390382
match proxy_to_url(&obj, infura_url) {
391383
Ok(result_body) => Ok(std::str::from_utf8(&result_body).unwrap().to_owned()),
392384
Err(err) => Err(json!({
@@ -447,7 +439,3 @@ fn proxy_to_url(request: &JsonRequest, url: &str) -> io::Result<Vec<u8>> {
447439
)),
448440
}
449441
}
450-
451-
fn get_infura_url(infura_project_id: &str) -> String {
452-
return format!("https://mainnet.infura.io:443/v3/{}", infura_project_id);
453-
}

trin-core/src/portalnet/discovery.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl Discovery {
104104
if let Some(ip_address) = config.enr_address {
105105
builder.ip(ip_address);
106106
}
107-
builder.udp(config.listen_port);
107+
builder.udp4(config.listen_port);
108108
builder.build(&enr_key).unwrap()
109109
};
110110

@@ -150,7 +150,7 @@ impl Discovery {
150150
json!({
151151
"enr": self.discv5.local_enr().to_base64(),
152152
"nodeId": self.discv5.local_enr().node_id().to_string(),
153-
"ip": self.discv5.local_enr().ip().map_or("None".to_owned(), |ip| ip.to_string())
153+
"ip": self.discv5.local_enr().ip4().map_or("None".to_owned(), |ip| ip.to_string())
154154
})
155155
}
156156

trin-core/src/portalnet/overlay.rs

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
use anyhow::anyhow;
2-
use std::{collections::HashSet, marker::PhantomData, sync::Arc, time::Duration};
2+
use std::{
3+
collections::HashSet,
4+
fmt::Debug,
5+
marker::{PhantomData, Sync},
6+
sync::Arc,
7+
time::Duration,
8+
};
39

410
use super::{
511
discovery::Discovery,
@@ -17,6 +23,7 @@ use crate::portalnet::{
1723

1824
use crate::{
1925
portalnet::types::content_key::RawContentKey,
26+
types::validation::Validator,
2027
utp::{
2128
stream::{UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
2229
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
@@ -68,7 +75,7 @@ impl Default for OverlayConfig {
6875
/// implement the overlay protocol and the overlay protocol is where we can encapsulate the logic for
6976
/// handling common network requests/responses.
7077
#[derive(Clone)]
71-
pub struct OverlayProtocol<TContentKey, TMetric> {
78+
pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
7279
/// Reference to the underlying discv5 protocol
7380
pub discovery: Arc<Discovery>,
7481
/// Reference to the database instance
@@ -89,10 +96,17 @@ pub struct OverlayProtocol<TContentKey, TMetric> {
8996
phantom_content_key: PhantomData<TContentKey>,
9097
/// Associate a metric with the overlay network.
9198
phantom_metric: PhantomData<TMetric>,
99+
/// Declare the Validator type for a given overlay network.
100+
phantom_validator: PhantomData<TValidator>,
92101
}
93102

94-
impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
95-
OverlayProtocol<TContentKey, TMetric>
103+
impl<
104+
TContentKey: OverlayContentKey + Send + Sync,
105+
TMetric: Metric + Send,
106+
TValidator: 'static + Validator<TContentKey> + Send,
107+
> OverlayProtocol<TContentKey, TMetric, TValidator>
108+
where
109+
<TContentKey as TryFrom<Vec<u8>>>::Error: Debug,
96110
{
97111
pub async fn new(
98112
config: OverlayConfig,
@@ -101,6 +115,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
101115
storage: Arc<RwLock<PortalStorage>>,
102116
data_radius: U256,
103117
protocol: ProtocolId,
118+
validator: TValidator,
104119
) -> Self {
105120
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
106121
discovery.local_enr().node_id().into(),
@@ -111,7 +126,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
111126
)));
112127

113128
let data_radius = Arc::new(data_radius);
114-
let request_tx = OverlayService::<TContentKey, TMetric>::spawn(
129+
let request_tx = OverlayService::<TContentKey, TMetric, TValidator>::spawn(
115130
Arc::clone(&discovery),
116131
Arc::clone(&storage),
117132
Arc::clone(&kbuckets),
@@ -121,6 +136,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
121136
protocol.clone(),
122137
utp_listener_tx.clone(),
123138
config.enable_metrics,
139+
validator,
124140
)
125141
.await
126142
.unwrap();
@@ -135,6 +151,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
135151
utp_listener_tx,
136152
phantom_content_key: PhantomData,
137153
phantom_metric: PhantomData,
154+
phantom_validator: PhantomData,
138155
}
139156
}
140157

0 commit comments

Comments
 (0)