Skip to content

Commit 25ddc68

Browse files
authored
Merge pull request #275 from ogenev/offer-accept-utp-stream
Refactor OFFER/ACCEPT uTP stream and add `portal_*Offer` json-rpc endpoint
2 parents c00e99c + 4a13c94 commit 25ddc68

File tree

11 files changed

+163
-61
lines changed

11 files changed

+163
-61
lines changed

newsfragments/275.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `portal_*Offer` JSON-RPC andpoint

trin-core/src/jsonrpc/endpoints.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub enum StateEndpoint {
1414
FindContent,
1515
FindNodes,
1616
LocalContent,
17+
Offer,
1718
Ping,
1819
}
1920

@@ -24,6 +25,7 @@ pub enum HistoryEndpoint {
2425
FindContent,
2526
FindNodes,
2627
LocalContent,
28+
Offer,
2729
Ping,
2830
RecursiveFindContent,
2931
}
@@ -77,6 +79,7 @@ impl FromStr for TrinEndpoint {
7779
"portal_historyLocalContent" => {
7880
Ok(TrinEndpoint::HistoryEndpoint(HistoryEndpoint::LocalContent))
7981
}
82+
"portal_historyOffer" => Ok(TrinEndpoint::HistoryEndpoint(HistoryEndpoint::Offer)),
8083
"portal_historyPing" => Ok(TrinEndpoint::HistoryEndpoint(HistoryEndpoint::Ping)),
8184
"portal_historyRadius" => {
8285
Ok(TrinEndpoint::HistoryEndpoint(HistoryEndpoint::DataRadius))
@@ -88,6 +91,7 @@ impl FromStr for TrinEndpoint {
8891
"portal_stateLocalContent" => {
8992
Ok(TrinEndpoint::StateEndpoint(StateEndpoint::LocalContent))
9093
}
94+
"portal_stateOffer" => Ok(TrinEndpoint::StateEndpoint(StateEndpoint::Offer)),
9195
"portal_statePing" => Ok(TrinEndpoint::StateEndpoint(StateEndpoint::Ping)),
9296
"portal_stateRadius" => Ok(TrinEndpoint::StateEndpoint(StateEndpoint::DataRadius)),
9397
_ => Err(()),

trin-core/src/jsonrpc/types.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,65 @@ impl TryFrom<[&Value; 2]> for FindContentParams {
196196
}
197197
}
198198

199+
pub struct OfferParams {
200+
pub enr: SszEnr,
201+
pub content_keys: Vec<ByteList>,
202+
}
203+
204+
impl TryFrom<Params> for OfferParams {
205+
type Error = ValidationError;
206+
207+
fn try_from(params: Params) -> Result<Self, Self::Error> {
208+
match params {
209+
Params::Array(val) => match val.len() {
210+
2 => Self::try_from([&val[0], &val[1]]),
211+
_ => Err(ValidationError::new("Expected 2 params")),
212+
},
213+
_ => Err(ValidationError::new("Expected array of params")),
214+
}
215+
}
216+
}
217+
218+
impl TryFrom<[&Value; 2]> for OfferParams {
219+
type Error = ValidationError;
220+
221+
fn try_from(params: [&Value; 2]) -> Result<Self, Self::Error> {
222+
let enr: SszEnr = params[0].try_into()?;
223+
224+
let content_keys = params[1].as_array();
225+
226+
match content_keys {
227+
Some(content_keys) => {
228+
let content_keys: Result<Vec<String>, _> = content_keys
229+
.iter()
230+
.cloned()
231+
.map(serde_json::from_value)
232+
.collect();
233+
234+
if let Ok(content_keys) = content_keys {
235+
let content_keys: Result<Vec<Vec<u8>>, _> =
236+
content_keys.iter().map(hex::decode).collect();
237+
238+
if let Ok(content_keys) = content_keys {
239+
Ok(Self {
240+
enr,
241+
content_keys: content_keys
242+
.into_iter()
243+
.map(VariableList::from)
244+
.collect(),
245+
})
246+
} else {
247+
Err(ValidationError::new("Unable to hex decode content keys"))
248+
}
249+
} else {
250+
Err(ValidationError::new("Unable to decode content keys"))
251+
}
252+
}
253+
None => Err(ValidationError::new("Required a list of content keys")),
254+
}
255+
}
256+
}
257+
199258
pub struct RecursiveFindContentParams {
200259
pub content_key: ByteList,
201260
}

trin-core/src/portalnet/overlay.rs

Lines changed: 27 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::portalnet::types::messages::{
1515
ProtocolId, Request, Response,
1616
};
1717

18-
use crate::utp::stream::{ConnectionKey, UtpListener, BUF_SIZE};
18+
use crate::utp::stream::{UtpListener, BUF_SIZE};
1919
use crate::utp::trin_helpers::{UtpAccept, UtpMessage, UtpMessageId};
2020
use discv5::{
2121
enr::NodeId,
@@ -283,15 +283,15 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
283283
// TODO: Wait for uTP transfer and return the data
284284
Ok(found_content)
285285
}
286-
Err(msg) => {
287-
warn!("Unable to handle SYN-ACK packet: {msg}");
288-
Err(OverlayRequestError::UtpError(msg.to_string()))
286+
Err(err) => {
287+
warn!("Unable to handle SYN-ACK packet: {err}");
288+
Err(OverlayRequestError::UtpError(err.to_string()))
289289
}
290290
}
291291
}
292-
Err(msg) => {
293-
warn!("Unable to initiate uTP stream with remote node {msg}");
294-
Err(OverlayRequestError::UtpError(msg.to_string()))
292+
Err(err) => {
293+
warn!("Unable to initiate uTP stream with remote node {err}");
294+
Err(OverlayRequestError::UtpError(err.to_string()))
295295
}
296296
}
297297
}
@@ -324,7 +324,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
324324
.collect();
325325
let content_keys_offered: Vec<TContentKey> = match content_keys_offered {
326326
Ok(val) => val,
327-
Err(_msg) => return Err(OverlayRequestError::DecodeError),
327+
Err(_) => return Err(OverlayRequestError::DecodeError),
328328
};
329329

330330
// Send the request and wait on the response.
@@ -338,7 +338,7 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
338338
.await
339339
{
340340
Ok(accept) => Ok(accept),
341-
Err(msg) => Err(OverlayRequestError::OfferError(msg.to_string())),
341+
Err(err) => Err(OverlayRequestError::OfferError(err.to_string())),
342342
}
343343
}
344344
Ok(_) => Err(OverlayRequestError::InvalidResponse),
@@ -352,25 +352,30 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
352352
enr: Enr,
353353
content_keys_offered: Vec<TContentKey>,
354354
) -> anyhow::Result<Accept> {
355-
let connection_id = response.connection_id.clone();
355+
let connection_id = response.connection_id.to_be();
356+
357+
// Do not initialize uTP stream if remote node doesn't have interest in the offered content keys
358+
if response.content_keys.is_zero() {
359+
return Ok(response);
360+
}
356361

357362
self.utp_listener
358363
.write()
359364
.await
360365
.listening
361-
.insert(connection_id.clone(), UtpMessageId::OfferAcceptStream);
366+
.insert(connection_id, UtpMessageId::OfferStream);
362367

363368
// initiate the connection to the acceptor
364369
let mut conn = self
365370
.utp_listener
366371
.write()
367372
.await
368-
.connect(connection_id.clone(), enr.node_id())
373+
.connect(connection_id, enr.node_id())
369374
.await?;
370375

371376
// TODO: Replace this with recv_from
372377
// Handle STATE packet for SYN
373-
let mut buf = [0; 1500];
378+
let mut buf = [0; BUF_SIZE];
374379
conn.recv(&mut buf).await?;
375380

376381
// Return to acceptor: the content key and corresponding data
@@ -398,30 +403,18 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
398403
message: content_items,
399404
};
400405

401-
let utp_listener = self.utp_listener.clone();
402406
tokio::spawn(async move {
403-
if let Some(conn) = utp_listener
404-
.write()
407+
// send the content to the acceptor over a uTP stream
408+
if let Err(err) = conn
409+
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
405410
.await
406-
.utp_connections
407-
.get_mut(&ConnectionKey {
408-
node_id: enr.node_id(),
409-
conn_id_recv: connection_id,
410-
})
411411
{
412-
// send the content to the acceptor over a uTP stream
413-
if let Err(msg) = conn
414-
.send_to(&UtpMessage::new(content_message.as_ssz_bytes()).encode()[..])
415-
.await
416-
{
417-
debug!("Error sending content {msg}");
418-
} else {
419-
// Close uTP connection
420-
if let Err(msg) = conn.close().await {
421-
debug!("Unable to close uTP connection!: {msg}")
422-
}
423-
};
424-
}
412+
warn!("Error sending content {err}");
413+
};
414+
// Close uTP connection
415+
if let Err(err) = conn.close().await {
416+
warn!("Unable to close uTP connection!: {err}")
417+
};
425418
});
426419
Ok(response)
427420
}

trin-core/src/portalnet/overlay_service.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,9 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
661661
.map_err(|e| OverlayRequestError::AcceptError(e))?;
662662
let connection_id: u16 = crate::utp::stream::rand();
663663

664+
// TODO: Pipe this with overlay DB and request only not available keys.
665+
let accept_keys = request.content_keys.clone();
666+
664667
for (i, key) in request.content_keys.iter().enumerate() {
665668
// should_store is currently a dummy function
666669
// the actual function will take ContentKey type, so we'll have to decode keys here
@@ -676,18 +679,17 @@ impl<TContentKey: OverlayContentKey + Send, TMetric: Metric + Send>
676679
.write_with_warn()
677680
.await
678681
.listening
679-
.insert(connection_id.clone(), UtpMessageId::OfferAcceptStream);
682+
.insert(connection_id.clone(), UtpMessageId::OfferStream);
680683

681684
// also listen on conn_id + 1 because this is the actual receive path for acceptor
682-
utp_listener
683-
.write_with_warn()
684-
.await
685-
.listening
686-
.insert(connection_id.clone() + 1, UtpMessageId::OfferAcceptStream);
685+
utp_listener.write_with_warn().await.listening.insert(
686+
connection_id.clone() + 1,
687+
UtpMessageId::AcceptStream(accept_keys),
688+
);
687689
});
688690

689691
let accept = Accept {
690-
connection_id,
692+
connection_id: connection_id.to_be(),
691693
content_keys: requested_keys,
692694
};
693695

trin-core/src/portalnet/types/messages.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -491,6 +491,12 @@ pub struct Accept {
491491
pub content_keys: BitList<typenum::U8>,
492492
}
493493

494+
impl Into<Value> for Accept {
495+
fn into(self) -> Value {
496+
serde_json::json!({ "connection_id": format!("{:?}", self.connection_id.to_be()) , "content_keys": self.content_keys})
497+
}
498+
}
499+
494500
#[derive(Debug, PartialEq, Clone)]
495501
pub struct SszEnr(Enr);
496502

trin-core/src/utp/stream.rs

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use discv5::enr::NodeId;
77
use discv5::{Enr, TalkRequest};
88
use log::{debug, warn};
99
use rand::Rng;
10-
use ssz::{Decode, Encode};
10+
use ssz::Encode;
1111
use std::cmp::{max, min};
1212
use std::collections::{HashMap, VecDeque};
1313
use std::convert::TryFrom;
@@ -20,7 +20,7 @@ use crate::portalnet::types::messages::Content::Content;
2020
use crate::portalnet::types::messages::ProtocolId;
2121
use crate::utp::packets::{ExtensionType, Packet, PacketType, HEADER_SIZE};
2222
use crate::utp::time::{now_microseconds, Delay, Timestamp};
23-
use crate::utp::trin_helpers::{UtpAccept, UtpMessage, UtpMessageId};
23+
use crate::utp::trin_helpers::{UtpMessage, UtpMessageId};
2424
use crate::utp::util::{abs_diff, ewma, generate_sequential_identifiers};
2525
use std::time::Duration;
2626

@@ -274,20 +274,14 @@ impl UtpListener {
274274

275275
match self.listening.get(&conn.receiver_connection_id) {
276276
Some(message_type) => match message_type {
277-
UtpMessageId::OfferAcceptStream => {
278-
match UtpAccept::from_ssz_bytes(&received_stream[..]) {
279-
Ok(payload) => {
280-
for (key, content) in payload.message {
281-
// TODO: Implement this with overlay store
282-
debug!("Store {key:?}, {content:?}");
283-
}
284-
}
285-
Err(_) => debug!("Recv malformed data on handing UtpAccept"),
286-
}
277+
UtpMessageId::AcceptStream(content_keys) => {
278+
// TODO: Implement this with overlay store and decode receiver stream if multiple content values are send
279+
debug!("Store {content_keys:?}, {received_stream:?}");
280+
}
281+
UtpMessageId::FindContentData(_content) => {
282+
// TODO: Process Content data received via uTP stream
287283
}
288-
// TODO: Process Content data received via uTP stream
289-
UtpMessageId::FindContentData(_) => {}
290-
UtpMessageId::FindContentStream => {}
284+
_ => {}
291285
},
292286
_ => warn!("uTP listening HashMap doesn't have uTP stream message type"),
293287
}
@@ -717,7 +711,7 @@ impl UtpSocket {
717711
// When SYN is send and we receive STATE, do not reply
718712
(SocketState::SynSent, PacketType::State) => {
719713
self.connected_to = src;
720-
self.ack_nr = packet.seq_nr();
714+
self.ack_nr = packet.seq_nr() - 1;
721715
self.seq_nr += 1;
722716
self.state = SocketState::Connected;
723717
self.last_acked = packet.ack_nr();

trin-core/src/utp/trin_helpers.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ pub struct UtpAccept {
5757
pub enum UtpMessageId {
5858
FindContentStream,
5959
FindContentData(Content),
60-
OfferAcceptStream,
60+
OfferStream,
61+
AcceptStream(Vec<Vec<u8>>),
6162
}
6263

6364
#[cfg(test)]

trin-history/src/jsonrpc.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use serde_json::{json, Value};
55
use tokio::sync::mpsc;
66

77
use crate::network::HistoryNetwork;
8+
use trin_core::jsonrpc::types::OfferParams;
89
use trin_core::jsonrpc::{
910
endpoints::HistoryEndpoint,
1011
types::{
@@ -170,6 +171,26 @@ impl HistoryRequestHandler {
170171
};
171172
let _ = request.resp.send(response);
172173
}
174+
HistoryEndpoint::Offer => {
175+
let response = match OfferParams::try_from(request.params) {
176+
Ok(val) => {
177+
let content_keys =
178+
val.content_keys.iter().map(|key| key.to_vec()).collect();
179+
180+
match self
181+
.network
182+
.overlay
183+
.send_offer(content_keys, val.enr.into())
184+
.await
185+
{
186+
Ok(accept) => Ok(accept.into()),
187+
Err(msg) => Err(format!("Offer request timeout: {:?}", msg)),
188+
}
189+
}
190+
Err(msg) => Err(format!("Invalid Offer params: {:?}", msg)),
191+
};
192+
let _ = request.resp.send(response);
193+
}
173194
HistoryEndpoint::Ping => {
174195
let response = match PingParams::try_from(request.params) {
175196
Ok(val) => match self.network.overlay.send_ping(val.enr.into()).await {

0 commit comments

Comments
 (0)