Skip to content

Commit 5f28053

Browse files
committed
Handle uTP listener events in overlay protocol
1 parent 836b0d9 commit 5f28053

File tree

9 files changed

+209
-89
lines changed

9 files changed

+209
-89
lines changed

newsfragments/337.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Handle global uTP listener events in portalnet event handler and dispatch them to overlay networks.

src/lib.rs

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ pub async fn run_trin(
6262

6363
debug!("Selected networks to spawn: {:?}", trin_config.networks);
6464
// Initialize state sub-network service and event handlers, if selected
65-
let (state_handler, state_network_task, state_event_tx, state_jsonrpc_tx) =
65+
let (state_handler, state_network_task, state_event_tx, state_utp_tx, state_jsonrpc_tx) =
6666
if trin_config.networks.iter().any(|val| val == STATE_NETWORK) {
6767
initialize_state_network(
6868
&discovery,
@@ -72,26 +72,31 @@ pub async fn run_trin(
7272
)
7373
.await
7474
} else {
75-
(None, None, None, None)
75+
(None, None, None, None, None)
7676
};
7777

7878
// Initialize chain history sub-network service and event handlers, if selected
79-
let (history_handler, history_network_task, history_event_tx, history_jsonrpc_tx) =
80-
if trin_config
81-
.networks
82-
.iter()
83-
.any(|val| val == HISTORY_NETWORK)
84-
{
85-
initialize_history_network(
86-
&discovery,
87-
utp_listener_tx,
88-
portalnet_config.clone(),
89-
storage_config.clone(),
90-
)
91-
.await
92-
} else {
93-
(None, None, None, None)
94-
};
79+
let (
80+
history_handler,
81+
history_network_task,
82+
history_event_tx,
83+
history_utp_tx,
84+
history_jsonrpc_tx,
85+
) = if trin_config
86+
.networks
87+
.iter()
88+
.any(|val| val == HISTORY_NETWORK)
89+
{
90+
initialize_history_network(
91+
&discovery,
92+
utp_listener_tx,
93+
portalnet_config.clone(),
94+
storage_config.clone(),
95+
)
96+
.await
97+
} else {
98+
(None, None, None, None, None)
99+
};
95100

96101
// Initialize json-rpc server
97102
let (portal_jsonrpc_tx, portal_jsonrpc_rx) = mpsc::unbounded_channel::<PortalJsonRpcRequest>();
@@ -137,7 +142,9 @@ pub async fn run_trin(
137142
portal_events_discovery,
138143
utp_listener_rx,
139144
history_event_tx,
145+
history_utp_tx,
140146
state_event_tx,
147+
state_utp_tx,
141148
utp_events_tx,
142149
)
143150
.await;

trin-core/src/portalnet/events.rs

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,35 @@ use crate::utp::stream::UtpListenerEvent;
99
use hex;
1010
use std::str::FromStr;
1111

12+
/// Main handler for portal network events
1213
pub struct PortalnetEvents {
14+
/// Discv5 service layer
1315
pub discovery: Arc<Discovery>,
16+
/// Receive Discv5 events
1417
pub protocol_receiver: mpsc::Receiver<Discv5Event>,
18+
/// Send overlay `TalkReq` to history network
19+
pub history_overlay_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
20+
/// Send uTP events with payload to history overlay network
21+
pub history_utp_sender: Option<mpsc::UnboundedSender<UtpListenerEvent>>,
22+
/// Send overlay `TalkReq` to state network
23+
pub state_overlay_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
24+
/// Send uTP events with payload to state overlay network
25+
pub state_utp_sender: Option<mpsc::UnboundedSender<UtpListenerEvent>>,
26+
/// Send TalkReq events with "utp" protocol id to `UtpListener`
27+
pub utp_listener_sender: mpsc::UnboundedSender<TalkRequest>,
28+
/// Receive uTP payload events from `UtpListener`
1529
pub utp_listener_receiver: mpsc::UnboundedReceiver<UtpListenerEvent>,
16-
pub history_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
17-
pub state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
18-
pub utp_sender: mpsc::UnboundedSender<TalkRequest>,
1930
}
2031

2132
impl PortalnetEvents {
2233
pub async fn new(
2334
discovery: Arc<Discovery>,
2435
utp_listener_receiver: mpsc::UnboundedReceiver<UtpListenerEvent>,
25-
history_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
26-
state_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
27-
utp_sender: mpsc::UnboundedSender<TalkRequest>,
36+
history_overlay_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
37+
history_utp_sender: Option<mpsc::UnboundedSender<UtpListenerEvent>>,
38+
state_overlay_sender: Option<mpsc::UnboundedSender<TalkRequest>>,
39+
state_utp_sender: Option<mpsc::UnboundedSender<UtpListenerEvent>>,
40+
utp_listener_sender: mpsc::UnboundedSender<TalkRequest>,
2841
) -> Self {
2942
let protocol_receiver = discovery
3043
.discv5
@@ -37,13 +50,15 @@ impl PortalnetEvents {
3750
discovery: Arc::clone(&discovery),
3851
protocol_receiver,
3952
utp_listener_receiver,
40-
history_sender,
41-
state_sender,
42-
utp_sender,
53+
history_overlay_sender,
54+
history_utp_sender,
55+
state_overlay_sender,
56+
state_utp_sender,
57+
utp_listener_sender,
4358
}
4459
}
4560

46-
/// Receives a request from the talkreq handler and sends a response back
61+
/// Main loop to dispatch `Discv5` and `UtpListener` events to overlay networks
4762
pub async fn start(mut self) {
4863
loop {
4964
tokio::select! {
@@ -75,19 +90,19 @@ impl PortalnetEvents {
7590
match protocol_id {
7691
Ok(protocol) => match protocol {
7792
ProtocolId::History => {
78-
match &self.history_sender {
93+
match &self.history_overlay_sender {
7994
Some(tx) => tx.send(request).unwrap(),
8095
None => warn!("History event handler not initialized!"),
8196
};
8297
}
8398
ProtocolId::State => {
84-
match &self.state_sender {
99+
match &self.state_overlay_sender {
85100
Some(tx) => tx.send(request).unwrap(),
86101
None => warn!("State event handler not initialized!"),
87102
};
88103
}
89104
ProtocolId::Utp => {
90-
if let Err(err) = self.utp_sender.send(request) {
105+
if let Err(err) = self.utp_listener_sender.send(request) {
91106
warn! {"Error sending uTP request to uTP listener: {err}"};
92107
}
93108
}
@@ -102,9 +117,44 @@ impl PortalnetEvents {
102117
},
103118
Err(_) => warn!("Unable to decode protocol id"),
104119
}
105-
}
120+
},
106121
Some(event) = self.utp_listener_receiver.recv() => {
107-
122+
match event {
123+
UtpListenerEvent::ClosedStream(_, ref protocol_id, _) => {
124+
match protocol_id {
125+
ProtocolId::History =>{
126+
match &self.history_utp_sender {
127+
Some(tx) => tx.send(event).unwrap(),
128+
None => warn!("History uTP event handler not initialized!"),
129+
};
130+
}
131+
ProtocolId::State => {
132+
match &self.state_utp_sender {
133+
Some(tx) => tx.send(event).unwrap(),
134+
None => warn!("State uTP event handler not initialized!"),
135+
};
136+
}
137+
_ => warn!("Unsupported protocol id event to dispatch")
138+
}
139+
}
140+
UtpListenerEvent::ResetStream(ref protocol_id, _) => {
141+
match protocol_id {
142+
ProtocolId::History =>{
143+
match &self.history_utp_sender {
144+
Some(tx) => tx.send(event).unwrap(),
145+
None => warn!("History uTP event handler not initialized!"),
146+
};
147+
}
148+
ProtocolId::State => {
149+
match &self.state_utp_sender {
150+
Some(tx) => tx.send(event).unwrap(),
151+
None => warn!("State uTP event handler not initialized!"),
152+
};
153+
}
154+
_ => warn!("Unsupported protocol id event to dispatch")
155+
}
156+
}
157+
}
108158
}
109159
}
110160
}

trin-core/src/portalnet/overlay.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::portalnet::{
1818
use crate::{
1919
portalnet::types::content_key::RawContentKey,
2020
utp::{
21-
stream::{UtpListenerRequest, UtpStream, BUF_SIZE},
21+
stream::{UtpListenerEvent, UtpListenerRequest, UtpStream, BUF_SIZE},
2222
trin_helpers::{UtpAccept, UtpMessage, UtpStreamId},
2323
},
2424
};
@@ -174,6 +174,15 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
174174
self.send_overlay_request(request, direction).await
175175
}
176176

177+
/// Process overlay uTP listener event
178+
pub fn process_utp_event(&self, event: UtpListenerEvent) {
179+
// TODO: Handle overlay uTP events
180+
debug!(
181+
"Got overlay uTP event: {event:?}, protocol id: {:?}",
182+
self.protocol
183+
);
184+
}
185+
177186
/// Returns a vector of all ENR node IDs of nodes currently contained in the routing table.
178187
pub fn table_entries_id(&self) -> Vec<NodeId> {
179188
self.kbuckets

trin-history/src/events.rs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,43 @@ use discv5::TalkRequest;
33
use std::sync::Arc;
44
use tokio::sync::mpsc::UnboundedReceiver;
55
use tracing::{debug, error, warn, Instrument};
6-
use trin_core::portalnet::types::messages::Message;
6+
use trin_core::{portalnet::types::messages::Message, utp::stream::UtpListenerEvent};
77

88
pub struct HistoryEvents {
99
pub network: Arc<HistoryNetwork>,
1010
pub event_rx: UnboundedReceiver<TalkRequest>,
11+
pub utp_listener_rx: UnboundedReceiver<UtpListenerEvent>,
1112
}
1213

1314
impl HistoryEvents {
14-
pub async fn process_requests(mut self) {
15-
while let Some(talk_request) = self.event_rx.recv().await {
16-
let reply = match self
17-
.network
18-
.overlay
19-
.process_one_request(&talk_request)
20-
.instrument(tracing::info_span!("history_network"))
21-
.await
22-
{
23-
Ok(response) => {
24-
debug!("Sending reply: {:?}", response);
25-
Message::from(response).into()
15+
pub async fn start(mut self) {
16+
loop {
17+
tokio::select! {
18+
Some(talk_request) = self.event_rx.recv() => {
19+
let reply = match self
20+
.network
21+
.overlay
22+
.process_one_request(&talk_request)
23+
.instrument(tracing::info_span!("history_network"))
24+
.await
25+
{
26+
Ok(response) => {
27+
debug!("Sending reply: {:?}", response);
28+
Message::from(response).into()
29+
}
30+
Err(error) => {
31+
error!("Failed to process portal history event: {}", error);
32+
error.to_string().into_bytes()
33+
}
34+
};
35+
36+
if let Err(error) = talk_request.respond(reply) {
37+
warn!("Failed to send reply: {}", error);
2638
}
27-
Err(error) => {
28-
error!("Failed to process portal history event: {}", error);
29-
error.to_string().into_bytes()
39+
}
40+
Some(event) = self.utp_listener_rx.recv() => {
41+
self.network.overlay.process_utp_event(event);
3042
}
31-
};
32-
33-
if let Err(error) = talk_request.respond(reply) {
34-
warn!("Failed to send reply: {}", error);
3543
}
3644
}
3745
}

trin-history/src/lib.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use trin_core::{
2020
types::messages::PortalnetConfig,
2121
},
2222
utils::bootnodes::parse_bootnodes,
23-
utp::stream::{UtpListener, UtpListenerRequest},
23+
utp::stream::{UtpListener, UtpListenerEvent, UtpListenerRequest},
2424
};
2525

2626
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -56,6 +56,7 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
5656
tokio::spawn(async move { utp_listener.start().await });
5757

5858
let (history_event_tx, history_event_rx) = mpsc::unbounded_channel::<TalkRequest>();
59+
let (history_utp_tx, history_utp_rx) = mpsc::unbounded_channel::<UtpListenerEvent>();
5960
let portal_events_discovery = Arc::clone(&discovery);
6061

6162
// Initialize DB config
@@ -68,6 +69,8 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
6869
portal_events_discovery,
6970
utp_listener_rx,
7071
Some(history_event_tx),
72+
Some(history_utp_tx),
73+
None,
7174
None,
7275
utp_sender,
7376
)
@@ -84,15 +87,21 @@ pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
8487
.await;
8588
let history_network = Arc::new(history_network);
8689

87-
spawn_history_network(history_network, portalnet_config, history_event_rx)
88-
.await
89-
.unwrap();
90+
spawn_history_network(
91+
history_network,
92+
portalnet_config,
93+
history_utp_rx,
94+
history_event_rx,
95+
)
96+
.await
97+
.unwrap();
9098
Ok(())
9199
}
92100

93101
type HistoryHandler = Option<HistoryRequestHandler>;
94102
type HistoryNetworkTask = Option<JoinHandle<()>>;
95103
type HistoryEventTx = Option<mpsc::UnboundedSender<TalkRequest>>;
104+
type HistoryUtpTx = Option<mpsc::UnboundedSender<UtpListenerEvent>>;
96105
type HistoryJsonRpcTx = Option<mpsc::UnboundedSender<HistoryJsonRpcRequest>>;
97106

98107
pub async fn initialize_history_network(
@@ -104,11 +113,13 @@ pub async fn initialize_history_network(
104113
HistoryHandler,
105114
HistoryNetworkTask,
106115
HistoryEventTx,
116+
HistoryUtpTx,
107117
HistoryJsonRpcTx,
108118
) {
109119
let (history_jsonrpc_tx, history_jsonrpc_rx) =
110120
mpsc::unbounded_channel::<HistoryJsonRpcRequest>();
111121
let (history_event_tx, history_event_rx) = mpsc::unbounded_channel::<TalkRequest>();
122+
let (utp_history_tx, utp_history_rx) = mpsc::unbounded_channel::<UtpListenerEvent>();
112123
let history_network = HistoryNetwork::new(
113124
Arc::clone(discovery),
114125
utp_listener_tx,
@@ -124,19 +135,22 @@ pub async fn initialize_history_network(
124135
let history_network_task = spawn_history_network(
125136
Arc::clone(&history_network),
126137
portalnet_config,
138+
utp_history_rx,
127139
history_event_rx,
128140
);
129141
(
130142
Some(history_handler),
131143
Some(history_network_task),
132144
Some(history_event_tx),
145+
Some(utp_history_tx),
133146
Some(history_jsonrpc_tx),
134147
)
135148
}
136149

137150
pub fn spawn_history_network(
138151
network: Arc<HistoryNetwork>,
139152
portalnet_config: PortalnetConfig,
153+
utp_listener_rx: mpsc::UnboundedReceiver<UtpListenerEvent>,
140154
history_event_rx: mpsc::UnboundedReceiver<TalkRequest>,
141155
) -> JoinHandle<()> {
142156
info!(
@@ -148,10 +162,11 @@ pub fn spawn_history_network(
148162
let history_events = HistoryEvents {
149163
network: Arc::clone(&network),
150164
event_rx: history_event_rx,
165+
utp_listener_rx,
151166
};
152167

153168
// Spawn history event handler
154-
tokio::spawn(history_events.process_requests());
169+
tokio::spawn(history_events.start());
155170

156171
// hacky test: make sure we establish a session with the boot node
157172
network.ping_bootnodes().await.unwrap();

0 commit comments

Comments
 (0)