Skip to content

Commit 4b50f97

Browse files
committed
Refactor portal, history and state event handlers
1 parent 4b70ddb commit 4b50f97

File tree

3 files changed

+137
-129
lines changed

3 files changed

+137
-129
lines changed

trin-core/src/portalnet/events.rs

Lines changed: 86 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -63,100 +63,100 @@ impl PortalnetEvents {
6363
loop {
6464
tokio::select! {
6565
Some(event) = self.protocol_receiver.recv() => {
66-
let request = match event {
67-
Discv5Event::TalkRequest(r) => r,
68-
Discv5Event::NodeInserted { node_id, replaced } => {
69-
match replaced {
70-
Some(old_node_id) => {
71-
debug!(
72-
"Received NodeInserted(node_id={}, replaced={})",
73-
node_id, old_node_id,
74-
);
75-
}
76-
None => {
77-
debug!("Received NodeInserted(node_id={})", node_id);
78-
}
79-
}
80-
continue;
81-
}
82-
event => {
83-
debug!("Got discv5 event {:?}", event);
84-
continue;
85-
}
86-
};
87-
88-
let protocol_id = ProtocolId::from_str(&hex::encode_upper(request.protocol()));
89-
90-
match protocol_id {
91-
Ok(protocol) => match protocol {
92-
ProtocolId::History => {
93-
match &self.history_overlay_sender {
94-
Some(tx) => tx.send(request).unwrap(),
95-
None => warn!("History event handler not initialized!"),
96-
};
97-
}
98-
ProtocolId::State => {
99-
match &self.state_overlay_sender {
100-
Some(tx) => tx.send(request).unwrap(),
101-
None => warn!("State event handler not initialized!"),
102-
};
103-
}
104-
ProtocolId::Utp => {
105-
if let Err(err) = self.utp_listener_sender.send(request) {
106-
warn! {"Error sending uTP request to uTP listener: {err}"};
107-
}
108-
}
109-
_ => {
110-
warn!(
111-
"Received TalkRequest on unknown protocol from={} protocol={} body={}",
112-
request.node_id(),
113-
hex::encode_upper(request.protocol()),
114-
hex::encode(request.body()),
115-
);
116-
}
117-
},
118-
Err(_) => warn!("Unable to decode protocol id"),
119-
}
120-
},
121-
Some(event) = self.utp_listener_receiver.recv() => {
122-
match event {
123-
UtpListenerEvent::ClosedStream(_, ref protocol_id, _) => {
124-
match protocol_id {
125-
ProtocolId::History =>{
126-
match &self.history_utp_sender {
127-
Some(tx) => tx.send(event).unwrap(),
128-
None => warn!("History uTP event handler not initialized!"),
129-
};
66+
let talk_req = match event {
67+
Discv5Event::TalkRequest(r) => r,
68+
Discv5Event::NodeInserted { node_id, replaced } => {
69+
match replaced {
70+
Some(old_node_id) => {
71+
debug!(
72+
"Received NodeInserted(node_id={}, replaced={})",
73+
node_id, old_node_id,
74+
);
13075
}
131-
ProtocolId::State => {
132-
match &self.state_utp_sender {
133-
Some(tx) => tx.send(event).unwrap(),
134-
None => warn!("State uTP event handler not initialized!"),
135-
};
76+
None => {
77+
debug!("Received NodeInserted(node_id={})", node_id);
13678
}
137-
_ => warn!("Unsupported protocol id event to dispatch")
13879
}
80+
continue;
13981
}
140-
UtpListenerEvent::ResetStream(ref protocol_id, _) => {
141-
match protocol_id {
142-
ProtocolId::History =>{
143-
match &self.history_utp_sender {
144-
Some(tx) => tx.send(event).unwrap(),
145-
None => warn!("History uTP event handler not initialized!"),
146-
};
147-
}
148-
ProtocolId::State => {
149-
match &self.state_utp_sender {
150-
Some(tx) => tx.send(event).unwrap(),
151-
None => warn!("State uTP event handler not initialized!"),
152-
};
153-
}
154-
_ => warn!("Unsupported protocol id event to dispatch")
155-
}
82+
event => {
83+
debug!("Got discv5 event {:?}", event);
84+
continue;
15685
}
86+
};
87+
self.dispatch_discv5_talk_req(talk_req);
88+
},
89+
Some(event) = self.utp_listener_receiver.recv() => self.handle_utp_listener_event(event)
90+
}
91+
}
92+
}
93+
94+
/// Dispatch Discv5 TalkRequest event to overlay networks or uTP listener
95+
fn dispatch_discv5_talk_req(&self, request: TalkRequest) {
96+
let protocol_id = ProtocolId::from_str(&hex::encode_upper(request.protocol()));
97+
98+
match protocol_id {
99+
Ok(protocol) => match protocol {
100+
ProtocolId::History => {
101+
match &self.history_overlay_sender {
102+
Some(tx) => tx.send(request).unwrap(),
103+
None => warn!("History event handler not initialized!"),
104+
};
105+
}
106+
ProtocolId::State => {
107+
match &self.state_overlay_sender {
108+
Some(tx) => tx.send(request).unwrap(),
109+
None => warn!("State event handler not initialized!"),
110+
};
111+
}
112+
ProtocolId::Utp => {
113+
if let Err(err) = self.utp_listener_sender.send(request) {
114+
warn! {"Error sending uTP request to uTP listener: {err}"};
157115
}
158116
}
117+
_ => {
118+
warn!(
119+
"Received TalkRequest on unknown protocol from={} protocol={} body={}",
120+
request.node_id(),
121+
hex::encode_upper(request.protocol()),
122+
hex::encode(request.body()),
123+
);
124+
}
125+
},
126+
Err(_) => warn!("Unable to decode protocol id"),
127+
}
128+
}
129+
130+
/// Handle uTP listener event
131+
fn handle_utp_listener_event(&self, event: UtpListenerEvent) {
132+
let event_clone = event.clone();
133+
134+
match event {
135+
UtpListenerEvent::ClosedStream(_, ref protocol_id, _) => {
136+
self.dispatch_utp_listener_event(event_clone, &protocol_id)
137+
}
138+
UtpListenerEvent::ResetStream(ref protocol_id, _) => {
139+
self.dispatch_utp_listener_event(event_clone, &protocol_id)
140+
}
141+
}
142+
}
143+
144+
/// Dispatch uTP listener event to overlay network
145+
fn dispatch_utp_listener_event(&self, event: UtpListenerEvent, protocol_id: &ProtocolId) {
146+
match protocol_id {
147+
ProtocolId::History => {
148+
match &self.history_utp_sender {
149+
Some(tx) => tx.send(event).unwrap(),
150+
None => warn!("History uTP event handler not initialized!"),
151+
};
152+
}
153+
ProtocolId::State => {
154+
match &self.state_utp_sender {
155+
Some(tx) => tx.send(event).unwrap(),
156+
None => warn!("State uTP event handler not initialized!"),
157+
};
159158
}
159+
_ => warn!("Unsupported protocol id event to dispatch"),
160160
}
161161
}
162162
}

trin-history/src/events.rs

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,31 +16,35 @@ impl HistoryEvents {
1616
loop {
1717
tokio::select! {
1818
Some(talk_request) = self.event_rx.recv() => {
19-
let reply = match self
20-
.network
21-
.overlay
22-
.process_one_request(&talk_request)
23-
.instrument(tracing::info_span!("history_network"))
24-
.await
25-
{
26-
Ok(response) => {
27-
debug!("Sending reply: {:?}", response);
28-
Message::from(response).into()
29-
}
30-
Err(error) => {
31-
error!("Failed to process portal history event: {}", error);
32-
error.to_string().into_bytes()
33-
}
34-
};
35-
36-
if let Err(error) = talk_request.respond(reply) {
37-
warn!("Failed to send reply: {}", error);
19+
self.handle_history_talk_request(talk_request).await;
3820
}
39-
}
4021
Some(event) = self.utp_listener_rx.recv() => {
4122
self.network.overlay.process_utp_event(event);
4223
}
4324
}
4425
}
4526
}
27+
28+
/// Handle history network TalkRequest event
29+
async fn handle_history_talk_request(&self, talk_request: TalkRequest) {
30+
let reply = match self
31+
.network
32+
.overlay
33+
.process_one_request(&talk_request)
34+
.instrument(tracing::info_span!("history_network"))
35+
.await
36+
{
37+
Ok(response) => {
38+
debug!("Sending reply: {:?}", response);
39+
Message::from(response).into()
40+
}
41+
Err(error) => {
42+
error!("Failed to process portal history event: {error}");
43+
error.to_string().into_bytes()
44+
}
45+
};
46+
if let Err(error) = talk_request.respond(reply) {
47+
warn!("Failed to send reply: {error}");
48+
}
49+
}
4650
}

trin-state/src/events.rs

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,36 @@ impl StateEvents {
1515
pub async fn start(mut self) {
1616
loop {
1717
tokio::select! {
18-
Some(talk_request) = self.event_rx.recv() => {
19-
let reply = match self
20-
.network
21-
.overlay
22-
.process_one_request(&talk_request)
23-
.instrument(tracing::info_span!("state_network"))
24-
.await
25-
{
26-
Ok(response) => {
27-
debug!("Sending reply: {:?}", response);
28-
Message::from(response).into()
29-
}
30-
Err(error) => {
31-
error!("Failed to process portal state event: {}", error);
32-
error.to_string().into_bytes()
33-
}
34-
};
35-
36-
if let Err(error) = talk_request.respond(reply) {
37-
warn!("Failed to send reply: {}", error);
18+
Some(talk_request) = self.event_rx.recv() => {
19+
self.handle_state_talk_request(talk_request).await;
3820
}
39-
}
40-
Some(event) = self.utp_listener_rx.recv() => {
21+
Some(event) = self.utp_listener_rx.recv() => {
4122
self.network.overlay.process_utp_event(event);
4223
}
43-
}
24+
}
25+
}
26+
}
27+
28+
/// Handle state network TalkRequest event
29+
async fn handle_state_talk_request(&self, talk_request: TalkRequest) {
30+
let reply = match self
31+
.network
32+
.overlay
33+
.process_one_request(&talk_request)
34+
.instrument(tracing::info_span!("state_network"))
35+
.await
36+
{
37+
Ok(response) => {
38+
debug!("Sending reply: {:?}", response);
39+
Message::from(response).into()
40+
}
41+
Err(error) => {
42+
error!("Failed to process portal state event: {error}");
43+
error.to_string().into_bytes()
44+
}
45+
};
46+
if let Err(error) = talk_request.respond(reply) {
47+
warn!("Failed to send reply: {error}");
4448
}
4549
}
4650
}

0 commit comments

Comments
 (0)