Skip to content

Commit 202904b

Browse files
committed
Write initial ValidationOracle implementation.
1 parent 26deb62 commit 202904b

File tree

21 files changed

+375
-71
lines changed

21 files changed

+375
-71
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/lib.rs

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use trin_core::{
1414
discovery::Discovery, events::PortalnetEvents, storage::PortalStorage,
1515
types::messages::PortalnetConfig,
1616
},
17+
types::validation::ValidationOracle,
1718
utils::bootnodes::parse_bootnodes,
19+
utils::infura::get_infura_url,
1820
utp::stream::UtpListener,
1921
};
2022
use trin_history::initialize_history_network;
@@ -27,6 +29,7 @@ pub async fn run_trin(
2729
trin_config.display_config();
2830

2931
let bootnode_enrs = parse_bootnodes(&trin_config.bootnodes)?;
32+
let infura_url = get_infura_url(&infura_project_id);
3033

3134
let portalnet_config = PortalnetConfig {
3235
external_addr: trin_config.external_addr,
@@ -35,6 +38,7 @@ pub async fn run_trin(
3538
no_stun: trin_config.no_stun,
3639
enable_metrics: trin_config.enable_metrics,
3740
bootnode_enrs,
41+
infura_url: infura_url.clone(),
3842
..Default::default()
3943
};
4044

@@ -60,33 +64,48 @@ pub async fn run_trin(
6064
let storage_config =
6165
PortalStorage::setup_config(discovery.local_enr().node_id(), trin_config.kb)?;
6266

67+
// Initialize validation oracle
68+
let validation_oracle = ValidationOracle {
69+
infura_url: portalnet_config.infura_url.clone(),
70+
..ValidationOracle::default()
71+
};
72+
6373
debug!("Selected networks to spawn: {:?}", trin_config.networks);
74+
// Initialize chain history sub-network service and event handlers, if selected
75+
let (
76+
history_handler,
77+
history_network_task,
78+
history_event_tx,
79+
history_jsonrpc_tx,
80+
_validation_oracle,
81+
) = if trin_config
82+
.networks
83+
.iter()
84+
.any(|val| val == HISTORY_NETWORK)
85+
{
86+
initialize_history_network(
87+
&discovery,
88+
utp_listener_tx.clone(),
89+
portalnet_config.clone(),
90+
storage_config.clone(),
91+
validation_oracle.clone(),
92+
)
93+
.await
94+
} else {
95+
(None, None, None, None, None)
96+
};
97+
6498
// Initialize state sub-network service and event handlers, if selected
6599
let (state_handler, state_network_task, state_event_tx, state_jsonrpc_tx) =
66100
if trin_config.networks.iter().any(|val| val == STATE_NETWORK) {
67101
initialize_state_network(
68-
&discovery,
69-
utp_listener_tx.clone(),
70-
portalnet_config.clone(),
71-
storage_config.clone(),
72-
)
73-
.await
74-
} else {
75-
(None, None, None, None)
76-
};
77-
78-
// Initialize chain history sub-network service and event handlers, if selected
79-
let (history_handler, history_network_task, history_event_tx, history_jsonrpc_tx) =
80-
if trin_config
81-
.networks
82-
.iter()
83-
.any(|val| val == HISTORY_NETWORK)
84-
{
85-
initialize_history_network(
86102
&discovery,
87103
utp_listener_tx,
88104
portalnet_config.clone(),
89105
storage_config.clone(),
106+
// todo: validation oracle gets passed into state network
107+
// - for StateValidator to use
108+
// - for oracle to pick up handle to state_jsonrpc_tx
90109
)
91110
.await
92111
} else {
@@ -103,7 +122,7 @@ pub async fn run_trin(
103122
tokio::task::spawn_blocking(|| {
104123
launch_jsonrpc_server(
105124
jsonrpc_trin_config,
106-
infura_project_id,
125+
infura_url,
107126
portal_jsonrpc_tx,
108127
live_server_tx,
109128
json_exiter_clone,

src/main.rs

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
use std::env;
2-
31
use trin_core::cli::TrinConfig;
2+
use trin_core::utils::infura::fetch_infura_id_from_env;
43

54
use trin::run_trin;
65

@@ -11,13 +10,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
1110
println!("Launching trin");
1211

1312
let trin_config = TrinConfig::from_cli();
14-
let infura_project_id = match env::var("TRIN_INFURA_PROJECT_ID") {
15-
Ok(val) => val,
16-
Err(_) => panic!(
17-
"Must supply Infura key as environment variable, like:\n\
18-
TRIN_INFURA_PROJECT_ID=\"your-key-here\" trin"
19-
),
20-
};
13+
let infura_project_id = fetch_infura_id_from_env();
2114

2215
let exiter = run_trin(trin_config, infura_project_id).await?;
2316

trin-core/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ rust-version = "1.58.0"
77
[dependencies]
88
anyhow = "1.0.52"
99
async-recursion = "1.0.0"
10+
async-trait = "0.1.53"
1011
base64 = "0.13.0"
1112
bytes = "1.1.0"
1213
clap = "2.33.3"

trin-core/src/jsonrpc/service.rs

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl Default for JsonRpcExiter {
5858

5959
pub fn launch_jsonrpc_server(
6060
trin_config: TrinConfig,
61-
infura_project_id: String,
61+
infura_url: String,
6262
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
6363
live_server_tx: tokio::sync::mpsc::Sender<bool>,
6464
json_rpc_exiter: Arc<JsonRpcExiter>,
@@ -68,19 +68,13 @@ pub fn launch_jsonrpc_server(
6868
match trin_config.web3_transport.as_str() {
6969
"ipc" => launch_ipc_client(
7070
pool,
71-
infura_project_id,
71+
infura_url,
7272
&trin_config.web3_ipc_path,
7373
portal_tx,
7474
live_server_tx,
7575
json_rpc_exiter,
7676
),
77-
"http" => launch_http_client(
78-
pool,
79-
infura_project_id,
80-
trin_config,
81-
portal_tx,
82-
live_server_tx,
83-
),
77+
"http" => launch_http_client(pool, infura_url, trin_config, portal_tx, live_server_tx),
8478
val => panic!("Unsupported web3 transport: {}", val),
8579
}
8680
}
@@ -126,7 +120,7 @@ fn get_listener_result(ipc_path: &str) -> tokio::io::Result<uds_windows::UnixLis
126120

127121
fn launch_ipc_client(
128122
pool: ThreadPool,
129-
infura_project_id: String,
123+
infura_url: String,
130124
ipc_path: &str,
131125
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
132126
live_server_tx: tokio::sync::mpsc::Sender<bool>,
@@ -174,10 +168,9 @@ fn launch_ipc_client(
174168
Err(_) => break, // Socket exited
175169
};
176170
debug!("New IPC client: {:?}", stream.peer_addr().unwrap());
177-
let infura_project_id = infura_project_id.clone();
171+
let infura_url = infura_url.clone();
178172
let portal_tx = portal_tx.clone();
179173
pool.execute(move || {
180-
let infura_url = get_infura_url(&infura_project_id);
181174
let mut rx = stream.try_clone().unwrap();
182175
let mut tx = stream;
183176
serve_ipc_client(&mut rx, &mut tx, &infura_url, portal_tx);
@@ -192,7 +185,7 @@ fn launch_ipc_client(
192185

193186
fn launch_http_client(
194187
pool: ThreadPool,
195-
infura_project_id: String,
188+
infura_url: String,
196189
trin_config: TrinConfig,
197190
portal_tx: UnboundedSender<PortalJsonRpcRequest>,
198191
live_server_tx: tokio::sync::mpsc::Sender<bool>,
@@ -212,10 +205,9 @@ fn launch_http_client(
212205
for stream in listener.incoming() {
213206
match stream {
214207
Ok(stream) => {
215-
let infura_project_id = infura_project_id.clone();
208+
let infura_url = infura_url.clone();
216209
let portal_tx = portal_tx.clone();
217210
pool.execute(move || {
218-
let infura_url = get_infura_url(&infura_project_id);
219211
serve_http_client(stream, &infura_url, portal_tx);
220212
});
221213
}
@@ -386,7 +378,7 @@ fn dispatch_trin_request(
386378
}
387379

388380
// Handle all requests served by infura
389-
fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
381+
pub fn dispatch_infura_request(obj: JsonRequest, infura_url: &str) -> Result<String, String> {
390382
match proxy_to_url(&obj, infura_url) {
391383
Ok(result_body) => Ok(std::str::from_utf8(&result_body).unwrap().to_owned()),
392384
Err(err) => Err(json!({
@@ -447,7 +439,3 @@ fn proxy_to_url(request: &JsonRequest, url: &str) -> io::Result<Vec<u8>> {
447439
)),
448440
}
449441
}
450-
451-
fn get_infura_url(infura_project_id: &str) -> String {
452-
return format!("https://mainnet.infura.io:443/v3/{}", infura_project_id);
453-
}

trin-core/src/portalnet/overlay.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use crate::portalnet::{
1515
},
1616
};
1717

18+
use crate::types::validation::Validator;
1819
use crate::utp::{
1920
stream::{UtpListenerRequest, UtpSocket, BUF_SIZE},
2021
trin_helpers::{UtpAccept, UtpMessage},
@@ -65,7 +66,7 @@ impl Default for OverlayConfig {
6566
/// implement the overlay protocol and the overlay protocol is where we can encapsulate the logic for
6667
/// handling common network requests/responses.
6768
#[derive(Clone)]
68-
pub struct OverlayProtocol<TContentKey, TMetric> {
69+
pub struct OverlayProtocol<TContentKey, TMetric, TValidator> {
6970
/// Reference to the underlying discv5 protocol
7071
pub discovery: Arc<Discovery>,
7172
/// Reference to the database instance
@@ -86,10 +87,15 @@ pub struct OverlayProtocol<TContentKey, TMetric> {
8687
phantom_content_key: PhantomData<TContentKey>,
8788
/// Associate a metric with the overlay network.
8889
phantom_metric: PhantomData<TMetric>,
90+
/// Declare the Validator type for a given overlay network.
91+
phantom_validator: PhantomData<TValidator>,
8992
}
9093

91-
impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
92-
OverlayProtocol<TContentKey, TMetric>
94+
impl<
95+
TContentKey: OverlayContentKey + Send,
96+
TMetric: Metric + Send,
97+
TValidator: 'static + Validator<TContentKey> + Send,
98+
> OverlayProtocol<TContentKey, TMetric, TValidator>
9399
{
94100
pub async fn new(
95101
config: OverlayConfig,
@@ -98,6 +104,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
98104
storage: Arc<RwLock<PortalStorage>>,
99105
data_radius: U256,
100106
protocol: ProtocolId,
107+
validator: TValidator,
101108
) -> Self {
102109
let kbuckets = Arc::new(RwLock::new(KBucketsTable::new(
103110
discovery.local_enr().node_id().into(),
@@ -108,7 +115,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
108115
)));
109116

110117
let data_radius = Arc::new(data_radius);
111-
let request_tx = OverlayService::<TContentKey, TMetric>::spawn(
118+
let request_tx = OverlayService::<TContentKey, TMetric, TValidator>::spawn(
112119
Arc::clone(&discovery),
113120
Arc::clone(&storage),
114121
Arc::clone(&kbuckets),
@@ -118,6 +125,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
118125
protocol.clone(),
119126
utp_listener_tx.clone(),
120127
config.enable_metrics,
128+
validator,
121129
)
122130
.await
123131
.unwrap();
@@ -132,6 +140,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
132140
utp_listener_tx,
133141
phantom_content_key: PhantomData,
134142
phantom_metric: PhantomData,
143+
phantom_validator: PhantomData,
135144
}
136145
}
137146

0 commit comments

Comments
 (0)