diff --git a/Cargo.lock b/Cargo.lock index be80f1e42..ff90e7a39 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1995,6 +1995,7 @@ dependencies = [ "futures-util", "hex", "moka", + "parking_lot", "pin-project-lite", "pkcs8", "quinn", diff --git a/network/Cargo.toml b/network/Cargo.toml index 9fcc5dd54..344c63195 100644 --- a/network/Cargo.toml +++ b/network/Cargo.toml @@ -22,6 +22,7 @@ everscale-crypto = { version = "0.2", features = ["tl-proto"] } futures-util = { version = "0.3", features = ["sink"] } hex = "0.4" moka = { version = "0.12", features = ["sync"] } +parking_lot = "0.12" pin-project-lite = "0.2" pkcs8 = "0.10" quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls"] } diff --git a/network/src/dht/mod.rs b/network/src/dht/mod.rs index bcafb37c3..ceba799bb 100644 --- a/network/src/dht/mod.rs +++ b/network/src/dht/mod.rs @@ -509,7 +509,7 @@ impl DhtInner { *self.local_peer_info.lock().unwrap() = Some(peer_info); } - #[tracing::instrument(level = "debug", skip_all, fields(local_id = % self.local_id))] + #[tracing::instrument(level = "debug", skip_all, fields(local_id = %self.local_id))] async fn announce_local_peer_info(&self, network: &Network) -> Result<()> { let data = tl_proto::serialize(&[network.local_addr().into()] as &[Address]); @@ -524,7 +524,7 @@ impl DhtInner { self.store_value(network, ValueRef::Peer(value), true).await } - #[tracing::instrument(level = "debug", skip_all, fields(local_id = % self.local_id))] + #[tracing::instrument(level = "debug", skip_all, fields(local_id = %self.local_id))] async fn refresh_routing_table(&self, network: &Network) { const PARALLEL_QUERIES: usize = 3; const MAX_DISTANCE: usize = 15; diff --git a/network/src/dht/routing.rs b/network/src/dht/routing.rs index 3a054011e..9278b6db5 100644 --- a/network/src/dht/routing.rs +++ b/network/src/dht/routing.rs @@ -203,9 +203,9 @@ mod tests { #[test] fn buckets_are_sets() { - let mut table = RoutingTable::new(PeerId::random()); + let mut table = RoutingTable::new(rand::random()); - let peer = PeerId::random(); + let peer = rand::random(); assert!(table.add( make_node(peer), MAX_K, @@ -223,7 +223,7 @@ mod tests { #[test] fn sould_not_add_seld() { - let local_id = PeerId::random(); + let local_id = rand::random(); let mut table = RoutingTable::new(local_id); assert!(!table.add( @@ -243,14 +243,14 @@ mod tests { for _ in 0..k { assert!(bucket.insert( - make_node(PeerId::random()), + make_node(rand::random()), k, &timeout, RoutingTableSource::Trusted )); } assert!(!bucket.insert( - make_node(PeerId::random()), + make_node(rand::random()), k, &timeout, RoutingTableSource::Trusted diff --git a/network/src/lib.rs b/network/src/lib.rs index fe4689e40..2f01ec87a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -1,3 +1,9 @@ +pub use self::overlay::{ + OverlayConfig, OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder, + PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesReadGuard, + PrivateOverlayEntriesWriteGuard, PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, + PublicOverlayEntriesReadGuard, +}; pub use self::util::{check_peer_signature, NetworkExt, Routable, Router, RouterBuilder}; pub use dht::{ xor_distance, DhtClient, DhtClientBuilder, DhtConfig, DhtQueryBuilder, DhtQueryWithDataBuilder, @@ -18,11 +24,13 @@ pub use quinn; mod dht; mod network; +mod overlay; mod types; mod util; pub mod proto { pub mod dht; + pub mod overlay; } #[doc(hidden)] @@ -39,10 +47,22 @@ mod tests { #[tokio::test] async fn init_works() { let keypair = everscale_crypto::ed25519::KeyPair::generate(&mut rand::thread_rng()); + let peer_id: PeerId = keypair.public_key.into(); + + let private_overlay = PrivateOverlay::builder(rand::random()) + .build(service_message_fn(|_| futures_util::future::ready(()))); + + let public_overlay = PublicOverlay::builder(rand::random()) + .build(service_message_fn(|_| futures_util::future::ready(()))); + + let (overlay_tasks, overlay_service) = OverlayService::builder(peer_id) + .with_private_overlay(&private_overlay) + .with_public_overlay(&public_overlay) + .build(); - let (dht_client, dht) = DhtService::builder(keypair.public_key.into()).build(); + let (dht_client, dht) = DhtService::builder(peer_id).build(); - let router = Router::builder().route(dht).build(); + let router = Router::builder().route(dht).route(overlay_service).build(); let network = Network::builder() .with_random_private_key() @@ -50,6 +70,7 @@ mod tests { .build((Ipv4Addr::LOCALHOST, 0), router) .unwrap(); - let _dht_client = dht_client.build(network); + let _dht_client = dht_client.build(network.clone()); + overlay_tasks.spawn(network); } } diff --git a/network/src/overlay/config.rs b/network/src/overlay/config.rs new file mode 100644 index 000000000..ad5f7b58a --- /dev/null +++ b/network/src/overlay/config.rs @@ -0,0 +1,41 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; +use tycho_util::serde_helpers; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +pub struct OverlayConfig { + /// Maximum time to live for public overlay peer entries. + /// + /// Default: 1 hour. + #[serde(with = "serde_helpers::humantime")] + pub max_public_entry_tll: Duration, + + /// A period of exchanging public overlay peers. + /// + /// Default: 3 minutes. + #[serde(with = "serde_helpers::humantime")] + pub public_overlay_peer_exchange_period: Duration, + + /// A maximum value of a random jitter for the peer exchange period. + /// + /// Default: 30 seconds. + pub public_overlay_peer_exchange_max_jitter: Duration, + + /// Number of peers to send during entries exchange request. + /// + /// Default: 20. + pub exchange_public_entries_batch: usize, +} + +impl Default for OverlayConfig { + fn default() -> Self { + Self { + max_public_entry_tll: Duration::from_secs(3600), + public_overlay_peer_exchange_period: Duration::from_secs(3 * 60), + public_overlay_peer_exchange_max_jitter: Duration::from_secs(30), + exchange_public_entries_batch: 20, + } + } +} diff --git a/network/src/overlay/mod.rs b/network/src/overlay/mod.rs new file mode 100644 index 000000000..03d09d9f5 --- /dev/null +++ b/network/src/overlay/mod.rs @@ -0,0 +1,597 @@ +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context, Poll, Waker}; + +use anyhow::Result; +use bytes::Buf; +use futures_util::{Stream, StreamExt}; +use tl_proto::{TlError, TlRead}; +use tokio::sync::Notify; +use tokio::task::JoinHandle; +use tycho_util::futures::BoxFutureOrNoop; +use tycho_util::time::{now_sec, shifted_interval}; +use tycho_util::{FastDashMap, FastHashMap, FastHashSet}; + +use crate::network::{Network, WeakNetwork}; +use crate::proto::overlay::{rpc, PublicEntriesResponse, PublicEntry, PublicEntryToSign}; +use crate::types::{PeerId, Request, Response, Service, ServiceRequest}; +use crate::util::{NetworkExt, Routable}; + +pub use self::config::OverlayConfig; +pub use self::overlay_id::OverlayId; +pub use self::private_overlay::{ + PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesReadGuard, + PrivateOverlayEntriesWriteGuard, +}; +pub use self::public_overlay::{ + PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard, +}; + +mod config; +mod overlay_id; +mod private_overlay; +mod public_overlay; + +pub struct OverlayServiceBackgroundTasks { + inner: Arc, +} + +impl OverlayServiceBackgroundTasks { + pub fn spawn(self, network: Network) { + self.inner + .start_background_tasks(Network::downgrade(&network)); + } +} + +pub struct OverlayServiceBuilder { + local_id: PeerId, + config: Option, + private_overlays: FastDashMap, + public_overlays: FastDashMap, +} + +impl OverlayServiceBuilder { + pub fn with_config(mut self, config: OverlayConfig) -> Self { + self.config = Some(config); + self + } + + pub fn with_private_overlay(self, overlay: &PrivateOverlay) -> Self { + assert!( + !self.public_overlays.contains_key(overlay.overlay_id()), + "public overlay with id {} already exists", + overlay.overlay_id() + ); + + let prev = self + .private_overlays + .insert(*overlay.overlay_id(), overlay.clone()); + if let Some(prev) = prev { + panic!( + "private overlay with id {} already exists", + prev.overlay_id() + ); + } + self + } + + pub fn with_public_overlay(self, overlay: &PublicOverlay) -> Self { + assert!( + !self.private_overlays.contains_key(overlay.overlay_id()), + "private overlay with id {} already exists", + overlay.overlay_id() + ); + + let prev = self + .public_overlays + .insert(*overlay.overlay_id(), overlay.clone()); + if let Some(prev) = prev { + panic!( + "public overlay with id {} already exists", + prev.overlay_id() + ); + } + self + } + + pub fn build(self) -> (OverlayServiceBackgroundTasks, OverlayService) { + let config = self.config.unwrap_or_default(); + + let inner = Arc::new(OverlayServiceInner { + local_id: self.local_id, + config, + private_overlays: self.private_overlays, + public_overlays: self.public_overlays, + public_overlays_changed: Arc::new(Notify::new()), + }); + + let background_tasks = OverlayServiceBackgroundTasks { + inner: inner.clone(), + }; + + (background_tasks, OverlayService(inner)) + } +} + +#[derive(Clone)] +pub struct OverlayService(Arc); + +impl OverlayService { + pub fn builder(local_id: PeerId) -> OverlayServiceBuilder { + OverlayServiceBuilder { + local_id, + config: None, + private_overlays: Default::default(), + public_overlays: Default::default(), + } + } + + pub fn try_add_private_overlay(&self, overlay: &PrivateOverlay) -> bool { + self.0.try_add_private_overlay(overlay) + } + + pub fn try_add_public_overlay(&self, overlay: &PublicOverlay) -> bool { + self.0.try_add_public_overlay(overlay) + } +} + +impl Service for OverlayService { + type QueryResponse = Response; + type OnQueryFuture = BoxFutureOrNoop>; + type OnMessageFuture = BoxFutureOrNoop<()>; + type OnDatagramFuture = futures_util::future::Ready<()>; + + #[tracing::instrument( + level = "debug", + name = "on_overlay_query", + skip_all, + fields(peer_id = %req.metadata.peer_id, addr = %req.metadata.remote_address) + )] + fn on_query(&self, mut req: ServiceRequest) -> Self::OnQueryFuture { + let e = 'req: { + if req.body.len() < 4 { + break 'req TlError::UnexpectedEof; + } + + // NOTE: `req.body` is untouched while reading the constructor + // and `as_ref` here is exactly for that. + let mut offset = 0; + let overlay_id = match req.body.as_ref().get_u32_le() { + rpc::Prefix::TL_ID => match rpc::Prefix::read_from(&req.body, &mut offset) { + Ok(rpc::Prefix { overlay_id }) => overlay_id, + Err(e) => break 'req e, + }, + rpc::ExchangeRandomPublicEntries::TL_ID => { + let req = match tl_proto::deserialize::( + &req.body, + ) { + Ok(req) => req, + Err(e) => break 'req e, + }; + tracing::debug!("exchange_random_public_entries"); + + let res = self.0.handle_exchange_public_entries(&req); + return BoxFutureOrNoop::future(futures_util::future::ready(Some( + Response::from_tl(res), + ))); + } + _ => break 'req TlError::UnknownConstructor, + }; + + if req.body.len() < offset + 4 { + // Definitely an invalid request (not enough bytes for the constructor) + break 'req TlError::UnexpectedEof; + } + + if let Some(private_overlay) = self.0.private_overlays.get(overlay_id) { + req.body.advance(offset); + return private_overlay.handle_query(req); + } else if let Some(public_overlay) = self.0.public_overlays.get(overlay_id) { + req.body.advance(offset); + return public_overlay.handle_query(req); + } + + tracing::debug!( + overlay_id = %OverlayId::wrap(overlay_id), + "unknown overlay id" + ); + return BoxFutureOrNoop::Noop; + }; + + tracing::debug!("failed to deserialize query: {e:?}"); + BoxFutureOrNoop::Noop + } + + #[tracing::instrument( + level = "debug", + name = "on_overlay_message", + skip_all, + fields(peer_id = %req.metadata.peer_id, addr = %req.metadata.remote_address) + )] + fn on_message(&self, mut req: ServiceRequest) -> Self::OnMessageFuture { + // TODO: somehow refactor with one method for both query and message + + let e = 'req: { + if req.body.len() < 4 { + break 'req TlError::UnexpectedEof; + } + + // NOTE: `req.body` is untouched while reading the constructor + // and `as_ref` here is exactly for that. + let mut offset = 0; + let overlay_id = match req.body.as_ref().get_u32_le() { + rpc::Prefix::TL_ID => match rpc::Prefix::read_from(&req.body, &mut offset) { + Ok(rpc::Prefix { overlay_id }) => overlay_id, + Err(e) => break 'req e, + }, + _ => break 'req TlError::UnknownConstructor, + }; + + if req.body.len() < offset + 4 { + // Definitely an invalid request (not enough bytes for the constructor) + break 'req TlError::UnexpectedEof; + } + + if let Some(private_overlay) = self.0.private_overlays.get(overlay_id) { + req.body.advance(offset); + return private_overlay.handle_message(req); + } else if let Some(public_overlay) = self.0.public_overlays.get(overlay_id) { + req.body.advance(offset); + return public_overlay.handle_message(req); + } + + tracing::debug!( + overlay_id = %OverlayId::wrap(overlay_id), + "unknown overlay id" + ); + return BoxFutureOrNoop::Noop; + }; + + tracing::debug!("failed to deserialize message: {e:?}"); + BoxFutureOrNoop::Noop + } + + #[inline] + fn on_datagram(&self, _req: ServiceRequest) -> Self::OnDatagramFuture { + futures_util::future::ready(()) + } +} + +impl Routable for OverlayService { + fn query_ids(&self) -> impl IntoIterator { + [rpc::ExchangeRandomPublicEntries::TL_ID, rpc::Prefix::TL_ID] + } + + fn message_ids(&self) -> impl IntoIterator { + [rpc::Prefix::TL_ID] + } +} + +struct OverlayServiceInner { + local_id: PeerId, + config: OverlayConfig, + public_overlays: FastDashMap, + private_overlays: FastDashMap, + public_overlays_changed: Arc, +} + +impl OverlayServiceInner { + fn start_background_tasks(self: &Arc, network: WeakNetwork) { + enum Action<'a> { + UpdatePublicOverlaysList { + exchange_state: &'a mut ExchangeState, + }, + ExchangePublicEntries { + exchange_state: &'a mut ExchangeState, + overlay_id: OverlayId, + }, + } + + #[derive(Default)] + struct ExchangeState { + stream: PublicOverlayActionsStream, + futures: FastHashMap>>, + } + + let public_overlays_notify = self.public_overlays_changed.clone(); + + let this = Arc::downgrade(self); + tokio::spawn(async move { + tracing::debug!("background overlay loop started"); + + let mut exchange_state = None::; + let mut public_overlays_changed = Box::pin(public_overlays_notify.notified()); + + loop { + let action = match &mut exchange_state { + // Initial update + None => Action::UpdatePublicOverlaysList { + exchange_state: exchange_state.get_or_insert_with(Default::default), + }, + // Default actions + Some(exchange_state) => { + tokio::select! { + _ = &mut public_overlays_changed => { + public_overlays_changed = Box::pin(public_overlays_notify.notified()); + Action::UpdatePublicOverlaysList { + exchange_state + } + }, + overlay_id = exchange_state.stream.next() => match overlay_id { + Some(id) => Action::ExchangePublicEntries { + exchange_state, + overlay_id: id + }, + None => continue, + } + } + } + }; + + let (Some(this), Some(network)) = (this.upgrade(), network.upgrade()) else { + break; + }; + + match action { + Action::UpdatePublicOverlaysList { exchange_state } => exchange_state.stream.rebuild( + this.public_overlays.iter().map(|item| *item.key()), + |_| { + shifted_interval( + this.config.public_overlay_peer_exchange_period, + this.config.public_overlay_peer_exchange_max_jitter, + ) + }, + |overlay_id| { + if let Some(fut) = exchange_state.futures.remove(overlay_id).flatten() { + tracing::debug!(%overlay_id, "cancelling exchange public entries task"); + fut.abort(); + } + }, + ), + Action::ExchangePublicEntries { exchange_state, overlay_id } => { + let fut_entry = exchange_state.futures.entry(overlay_id).or_default(); + + // Wait for the previous exchange to finish. + if let Some(fut) = fut_entry.take() { + if let Err(e) = fut.await { + if e.is_panic() { + std::panic::resume_unwind(e.into_panic()); + } + } + } + + // Spawn a new exchange + *fut_entry = Some(tokio::spawn(async move { + let res = this.exchange_public_entries(&network, &overlay_id).await; + if let Err(e) = res { + tracing::error!(%overlay_id, "failed to exchange public entries: {e:?}"); + }; + })); + } + } + } + + tracing::debug!("background overlay loop stopped"); + }); + } + + #[tracing::instrument( + level = "debug", + skip_all, + fields(local_id = %self.local_id, overlay_id = %overlay_id), + )] + async fn exchange_public_entries( + &self, + network: &Network, + overlay_id: &OverlayId, + ) -> Result<()> { + let Some(overlay) = self.public_overlays.get(overlay_id) else { + anyhow::bail!("overlay not found"); + }; + + let n = std::cmp::max(self.config.exchange_public_entries_batch, 1); + let mut entries = Vec::with_capacity(n); + + // Always include us in the response + entries.push(Arc::new(self.make_local_public_overlay_entry( + network, + overlay_id, + now_sec(), + ))); + + // Choose a random target to send the request and additional random entries + let peer_id = { + let rng = &mut rand::thread_rng(); + + let all_entries = overlay.read_entries(); + let mut iter = all_entries.choose_multiple(rng, n); + + // TODO: search for target in known peers. This is a stub which will not work. + let peer_id = match iter.next() { + Some(entry) => entry.peer_id, + None => anyhow::bail!("empty overlay, no peers to exchange entries with"), + }; + + // Add additional random entries to the response + entries.extend(iter.cloned()); + + // Use this peer id for the request + peer_id + }; + + // Send request + let response = network + .query( + &peer_id, + Request::from_tl(rpc::ExchangeRandomPublicEntries { + overlay_id: overlay_id.to_bytes(), + entries, + }), + ) + .await? + .parse_tl::()?; + + // Populate the overlay with the response + match response { + PublicEntriesResponse::PublicEntries(entries) => { + tracing::debug!( + %peer_id, + count = entries.len(), + "received public entries" + ); + overlay.add_untrusted_entries(&entries); + } + PublicEntriesResponse::OverlayNotFound => { + tracing::debug!(%peer_id, "overlay not found"); + } + } + + // Done + Ok(()) + } + + fn make_local_public_overlay_entry( + &self, + network: &Network, + overlay_id: &OverlayId, + now: u32, + ) -> PublicEntry { + let signature = Box::new(network.sign_tl(PublicEntryToSign { + overlay_id: overlay_id.as_bytes(), + peer_id: &self.local_id, + created_at: now, + })); + PublicEntry { + peer_id: self.local_id, + created_at: now, + signature, + } + } + + pub fn try_add_private_overlay(&self, overlay: &PrivateOverlay) -> bool { + use dashmap::mapref::entry::Entry; + + if self.public_overlays.contains_key(overlay.overlay_id()) { + return false; + } + match self.private_overlays.entry(*overlay.overlay_id()) { + Entry::Vacant(entry) => { + entry.insert(overlay.clone()); + true + } + Entry::Occupied(_) => false, + } + } + + pub fn try_add_public_overlay(&self, overlay: &PublicOverlay) -> bool { + use dashmap::mapref::entry::Entry; + + if self.private_overlays.contains_key(overlay.overlay_id()) { + return false; + } + match self.public_overlays.entry(*overlay.overlay_id()) { + Entry::Vacant(entry) => { + entry.insert(overlay.clone()); + self.public_overlays_changed.notify_waiters(); + true + } + Entry::Occupied(_) => false, + } + } + + fn handle_exchange_public_entries( + &self, + req: &rpc::ExchangeRandomPublicEntries, + ) -> PublicEntriesResponse { + // NOTE: validation is done in the TL parser. + debug_assert!(req.entries.len() <= 20); + + // Find the overlay + let overlay = match self.public_overlays.get(&req.overlay_id) { + Some(overlay) => overlay, + None => return PublicEntriesResponse::OverlayNotFound, + }; + + // Add proposed entries to the overlay + overlay.add_untrusted_entries(&req.entries); + + // Collect proposed entries to exclude from the response + let requested_ids = req + .entries + .iter() + .map(|id| id.peer_id) + .collect::>(); + + let entries = { + let entries = overlay.read_entries(); + + // Choose additional random entries to ensure we have enough new entries to send back + let n = self.config.exchange_public_entries_batch; + entries + .choose_multiple(&mut rand::thread_rng(), n + requested_ids.len()) + .filter_map(|entry| { + let is_new = !requested_ids.contains(&entry.peer_id); + is_new.then(|| entry.clone()) + }) + .take(n) + .collect::>() + }; + + PublicEntriesResponse::PublicEntries(entries) + } +} + +#[derive(Default)] +struct PublicOverlayActionsStream { + intervals: Vec<(tokio::time::Interval, OverlayId)>, + waker: Option, +} + +impl PublicOverlayActionsStream { + fn rebuild, A, R>( + &mut self, + iter: I, + mut on_add: A, + mut on_remove: R, + ) where + for<'a> A: FnMut(&'a OverlayId) -> tokio::time::Interval, + for<'a> R: FnMut(&'a OverlayId), + { + let mut new_overlays = iter.collect::>(); + self.intervals.retain(|(_, id)| { + let retain = new_overlays.remove(id); + if !retain { + on_remove(id); + } + retain + }); + + for id in new_overlays { + self.intervals.push((on_add(&id), id)); + } + + if let Some(waker) = &self.waker { + waker.wake_by_ref(); + } + } +} + +impl Stream for PublicOverlayActionsStream { + type Item = OverlayId; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // Always register the waker to resume the stream even if there were + // changes in the intervals. + if !matches!(&self.waker, Some(waker) if cx.waker().will_wake(waker)) { + self.waker = Some(cx.waker().clone()); + } + + for (interval, data) in self.intervals.iter_mut() { + if interval.poll_tick(cx).is_ready() { + return Poll::Ready(Some(*data)); + } + } + + Poll::Pending + } +} diff --git a/network/src/overlay/overlay_id.rs b/network/src/overlay/overlay_id.rs new file mode 100644 index 000000000..f2da897d8 --- /dev/null +++ b/network/src/overlay/overlay_id.rs @@ -0,0 +1,118 @@ +use std::borrow::Borrow; +use std::str::FromStr; + +use rand::Rng; +use tl_proto::{TlRead, TlWrite}; + +#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord, TlRead, TlWrite)] +#[repr(transparent)] +pub struct OverlayId([u8; 32]); + +impl OverlayId { + pub fn wrap(bytes: &[u8; 32]) -> &Self { + // SAFETY: `[u8; 32]` has the same layout as `OverlayId`. + unsafe { &*(bytes as *const [u8; 32]).cast::() } + } + + #[inline] + pub fn as_bytes(&self) -> &[u8; 32] { + &self.0 + } + + #[inline] + pub fn to_bytes(self) -> [u8; 32] { + self.0 + } +} + +impl Borrow<[u8; 32]> for OverlayId { + #[inline] + fn borrow(&self) -> &[u8; 32] { + &self.0 + } +} + +impl<'a> TlRead<'a> for &'a OverlayId { + type Repr = tl_proto::Boxed; + + #[inline] + fn read_from(packet: &'a [u8], offset: &mut usize) -> tl_proto::TlResult { + <_>::read_from(packet, offset).map(OverlayId::wrap) + } +} + +impl rand::distributions::Distribution for rand::distributions::Standard { + #[inline] + fn sample(&self, rng: &mut R) -> OverlayId { + OverlayId(rand::distributions::Standard.sample(rng)) + } +} + +impl std::fmt::Display for OverlayId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let len = f.precision().unwrap_or(32); + for byte in self.0.iter().take(len) { + write!(f, "{byte:02x}")?; + } + Ok(()) + } +} + +impl std::fmt::Debug for OverlayId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "OverlayId({self})") + } +} + +impl FromStr for OverlayId { + type Err = hex::FromHexError; + + fn from_str(s: &str) -> Result { + let mut overlay_id = OverlayId([0; 32]); + hex::decode_to_slice(s, &mut overlay_id.0).map(|_| overlay_id) + } +} + +impl serde::Serialize for OverlayId { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if serializer.is_human_readable() { + serializer.collect_str(self) + } else { + self.0.serialize(serializer) + } + } +} + +impl<'de> serde::Deserialize<'de> for OverlayId { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + if deserializer.is_human_readable() { + deserializer.deserialize_str(tycho_util::serde_helpers::StrVisitor::new()) + } else { + <[u8; 32]>::deserialize(deserializer).map(Self) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn serde() { + const SOME_ID: &str = "5d09fe251943525a30f471791d5b4fea1298613f52ad2ad6d985fed05eb00533"; + + let from_json: OverlayId = serde_json::from_str(&format!("\"{SOME_ID}\"")).unwrap(); + let from_str = OverlayId::from_str(SOME_ID).unwrap(); + assert_eq!(from_json, from_str); + + let to_json = serde_json::to_string(&from_json).unwrap(); + let from_json: OverlayId = serde_json::from_str(&to_json).unwrap(); + assert_eq!(from_json, from_str); + } +} diff --git a/network/src/overlay/private_overlay.rs b/network/src/overlay/private_overlay.rs new file mode 100644 index 000000000..25d4040e8 --- /dev/null +++ b/network/src/overlay/private_overlay.rs @@ -0,0 +1,254 @@ +use std::borrow::Borrow; +use std::collections::hash_map; +use std::sync::Arc; + +use anyhow::Result; +use bytes::{Bytes, BytesMut}; +use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use rand::seq::SliceRandom; +use rand::Rng; +use tycho_util::futures::BoxFutureOrNoop; +use tycho_util::{FastHashMap, FastHashSet}; + +use crate::network::Network; +use crate::overlay::OverlayId; +use crate::proto::overlay::rpc; +use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest}; +use crate::util::NetworkExt; + +pub struct PrivateOverlayBuilder { + overlay_id: OverlayId, + entries: FastHashSet, +} + +impl PrivateOverlayBuilder { + pub fn with_entries(mut self, allowed_peers: I) -> Self + where + I: IntoIterator, + I::Item: Borrow, + { + self.entries + .extend(allowed_peers.into_iter().map(|p| *p.borrow())); + self + } + + pub fn build(self, service: S) -> PrivateOverlay + where + S: Send + Sync + 'static, + S: Service, + { + let request_prefix = tl_proto::serialize(rpc::Prefix { + overlay_id: self.overlay_id.as_bytes(), + }); + + let mut entries = PrivateOverlayEntries { + peer_id_to_index: Default::default(), + data: Default::default(), + }; + for peer_id in self.entries { + entries.insert(&peer_id); + } + + PrivateOverlay { + inner: Arc::new(Inner { + overlay_id: self.overlay_id, + entries: RwLock::new(entries), + service: service.boxed(), + request_prefix: request_prefix.into_boxed_slice(), + }), + } + } +} + +#[derive(Clone)] +pub struct PrivateOverlay { + inner: Arc, +} + +impl PrivateOverlay { + pub fn builder(overlay_id: OverlayId) -> PrivateOverlayBuilder { + PrivateOverlayBuilder { + overlay_id, + entries: Default::default(), + } + } + + #[inline] + pub fn overlay_id(&self) -> &OverlayId { + &self.inner.overlay_id + } + + pub async fn query( + &self, + network: &Network, + peer_id: &PeerId, + mut request: Request, + ) -> Result { + self.prepend_prefix_to_body(&mut request.body); + network.query(peer_id, request).await + } + + pub async fn send( + &self, + network: &Network, + peer_id: &PeerId, + mut request: Request, + ) -> Result<()> { + self.prepend_prefix_to_body(&mut request.body); + network.send(peer_id, request).await + } + + pub fn write_entries(&self) -> PrivateOverlayEntriesWriteGuard<'_> { + PrivateOverlayEntriesWriteGuard { + entries: self.inner.entries.write(), + } + } + + pub fn read_entries(&self) -> PrivateOverlayEntriesReadGuard<'_> { + PrivateOverlayEntriesReadGuard { + entries: self.inner.entries.read(), + } + } + + pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop> { + if self.inner.entries.read().contains(&req.metadata.peer_id) { + BoxFutureOrNoop::future(self.inner.service.on_query(req)) + } else { + BoxFutureOrNoop::Noop + } + } + + pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> { + if self.inner.entries.read().contains(&req.metadata.peer_id) { + BoxFutureOrNoop::future(self.inner.service.on_message(req)) + } else { + BoxFutureOrNoop::Noop + } + } + + fn prepend_prefix_to_body(&self, body: &mut Bytes) { + // TODO: reduce allocations + let mut res = BytesMut::with_capacity(self.inner.request_prefix.len() + body.len()); + res.extend_from_slice(&self.inner.request_prefix); + res.extend_from_slice(body); + *body = res.freeze(); + } +} + +impl std::fmt::Debug for PrivateOverlay { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PrivateOverlay") + .field("overlay_id", &self.inner.overlay_id) + .finish() + } +} + +struct Inner { + overlay_id: OverlayId, + entries: RwLock, + service: BoxService, + request_prefix: Box<[u8]>, +} + +pub struct PrivateOverlayEntries { + peer_id_to_index: FastHashMap, + data: Vec, +} + +impl PrivateOverlayEntries { + /// Returns one random peer, or `None` if set is empty. + pub fn choose(&self, rng: &mut R) -> Option<&PeerId> + where + R: Rng + ?Sized, + { + self.data.choose(rng) + } + + /// Chooses `n` entries from the set, without repetition, + /// and in random order. + pub fn choose_multiple( + &self, + rng: &mut R, + n: usize, + ) -> rand::seq::SliceChooseIter<'_, [PeerId], PeerId> + where + R: Rng + ?Sized, + { + self.data.choose_multiple(rng, n) + } + + /// Returns true if the set contains the specified peer id. + pub fn contains(&self, peer_id: &PeerId) -> bool { + self.peer_id_to_index.contains_key(peer_id) + } + + /// Adds a peer id to the set. + /// + /// Returns whether the value was newly inserted. + pub fn insert(&mut self, peer_id: &PeerId) -> bool { + match self.peer_id_to_index.entry(*peer_id) { + // No entry for the peer_id, insert a new one + hash_map::Entry::Vacant(entry) => { + entry.insert(self.data.len()); + self.data.push(*peer_id); + true + } + // Entry for the peer_id exists, do nothing + hash_map::Entry::Occupied(_) => false, + } + } + + /// Removes a value from the set. + /// + /// Returns whether the value was present in the set. + pub fn remove(&mut self, peer_id: &PeerId) -> bool { + let Some(index) = self.peer_id_to_index.remove(peer_id) else { + return false; + }; + + // Remove the entry from the data vector + self.data.swap_remove(index); + + // Update the swapped entry's index + let entry = self + .peer_id_to_index + .get_mut(&self.data[index]) + .expect("inconsistent state"); + *entry = index; + + true + } +} + +pub struct PrivateOverlayEntriesWriteGuard<'a> { + entries: RwLockWriteGuard<'a, PrivateOverlayEntries>, +} + +impl std::ops::Deref for PrivateOverlayEntriesWriteGuard<'_> { + type Target = PrivateOverlayEntries; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.entries + } +} + +impl std::ops::DerefMut for PrivateOverlayEntriesWriteGuard<'_> { + #[inline] + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.entries + } +} + +pub struct PrivateOverlayEntriesReadGuard<'a> { + entries: RwLockReadGuard<'a, PrivateOverlayEntries>, +} + +impl std::ops::Deref for PrivateOverlayEntriesReadGuard<'_> { + type Target = PrivateOverlayEntries; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.entries + } +} diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs new file mode 100644 index 000000000..0ead8edd7 --- /dev/null +++ b/network/src/overlay/public_overlay.rs @@ -0,0 +1,515 @@ +use std::borrow::Borrow; +use std::collections::hash_map; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use anyhow::Result; +use bytes::{Bytes, BytesMut}; +use parking_lot::{RwLock, RwLockReadGuard}; +use rand::seq::SliceRandom; +use rand::Rng; +use tycho_util::futures::BoxFutureOrNoop; +use tycho_util::{FastDashSet, FastHashMap}; + +use crate::network::Network; +use crate::overlay::OverlayId; +use crate::proto::overlay::{rpc, PublicEntry, PublicEntryToSign}; +use crate::types::{BoxService, PeerId, Request, Response, Service, ServiceExt, ServiceRequest}; +use crate::util::NetworkExt; + +pub struct PublicOverlayBuilder { + overlay_id: OverlayId, + min_capacity: usize, + banned_peer_ids: FastDashSet, +} + +impl PublicOverlayBuilder { + /// Minimum capacity for public overlay. + /// Public overlay will use suggested peers from untrusted sources to fill the overlay + /// until it reaches this capacity. + /// + /// Default: 100. + pub fn with_min_capacity(mut self, min_capacity: usize) -> Self { + self.min_capacity = min_capacity; + self + } + + pub fn with_banned_peers(mut self, banned_peers: I) -> Self + where + I: IntoIterator, + I::Item: Borrow, + { + self.banned_peer_ids + .extend(banned_peers.into_iter().map(|id| *id.borrow())); + self + } + + pub fn build(self, service: S) -> PublicOverlay + where + S: Send + Sync + 'static, + S: Service, + { + let request_prefix = tl_proto::serialize(rpc::Prefix { + overlay_id: self.overlay_id.as_bytes(), + }); + + PublicOverlay { + inner: Arc::new(Inner { + overlay_id: self.overlay_id, + min_capacity: self.min_capacity, + entries: RwLock::new(Default::default()), + entry_count: AtomicUsize::new(0), + banned_peer_ids: self.banned_peer_ids, + service: service.boxed(), + request_prefix: request_prefix.into_boxed_slice(), + }), + } + } +} + +#[derive(Clone)] +#[repr(transparent)] +pub struct PublicOverlay { + inner: Arc, +} + +impl PublicOverlay { + pub fn builder(overlay_id: OverlayId) -> PublicOverlayBuilder { + PublicOverlayBuilder { + overlay_id, + min_capacity: 100, + banned_peer_ids: Default::default(), + } + } + + #[inline] + pub fn overlay_id(&self) -> &OverlayId { + &self.inner.overlay_id + } + + pub async fn query( + &self, + network: &Network, + peer_id: &PeerId, + mut request: Request, + ) -> Result { + self.prepend_prefix_to_body(&mut request.body); + network.query(peer_id, request).await + } + + pub async fn send( + &self, + network: &Network, + peer_id: &PeerId, + mut request: Request, + ) -> Result<()> { + self.prepend_prefix_to_body(&mut request.body); + network.send(peer_id, request).await + } + + /// Bans the given peer from the overlay. + /// + /// Returns `true` if the peer was not already banned. + pub fn ban_peer(&self, peer_id: PeerId) -> bool { + self.inner.banned_peer_ids.insert(peer_id) + } + + /// Unbans the given peer from the overlay. + /// + /// Returns `true` if the peer was banned. + pub fn unban_peer(&self, peer_id: &PeerId) -> bool { + self.inner.banned_peer_ids.remove(peer_id).is_some() + } + + pub fn read_entries(&self) -> PublicOverlayEntriesReadGuard<'_> { + PublicOverlayEntriesReadGuard { + entries: self.inner.entries.read(), + } + } + + pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop> { + if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) { + // TODO: add peer from metadata to the overlay + BoxFutureOrNoop::future(self.inner.service.on_query(req)) + } else { + BoxFutureOrNoop::Noop + } + } + + pub(crate) fn handle_message(&self, req: ServiceRequest) -> BoxFutureOrNoop<()> { + if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) { + // TODO: add peer from metadata to the overlay + BoxFutureOrNoop::future(self.inner.service.on_message(req)) + } else { + BoxFutureOrNoop::Noop + } + } + + /// Adds the given entries to the overlay. + /// + /// NOTE: Will deadlock if called while `PublicOverlayEntriesReadGuard` is held. + pub(crate) fn add_untrusted_entries(&self, entries: &[Arc]) { + if entries.is_empty() { + return; + } + + let this = self.inner.as_ref(); + + // Check if we can add more entries to the overlay and optimistically + // increase the entry count. (if no other thread has already done so). + let to_add = entries.len(); + let mut entry_count = this.entry_count.load(Ordering::Acquire); + let to_add = loop { + let to_add = match this.min_capacity.checked_sub(entry_count) { + Some(capacity) if capacity > 0 => std::cmp::min(to_add, capacity), + _ => return, + }; + + let res = this.entry_count.compare_exchange_weak( + entry_count, + entry_count + to_add, + Ordering::Release, + Ordering::Acquire, + ); + match res { + Ok(_) => break to_add, + Err(n) => entry_count = n, + } + }; + + // Prepare validation state + let mut is_valid = vec![false; entries.len()]; + let mut valid_count = 0; + + // First pass: verify all entries + for (entry, is_valid) in std::iter::zip(entries, is_valid.iter_mut()) { + let Some(pubkey) = entry.peer_id.as_public_key() else { + continue; + }; + + if !pubkey.verify( + PublicEntryToSign { + overlay_id: this.overlay_id.as_bytes(), + peer_id: &entry.peer_id, + created_at: entry.created_at, + }, + &entry.signature, + ) { + continue; + } + + *is_valid = true; + valid_count += 1; + + if valid_count >= to_add { + break; + } + } + + // Second pass: insert all valid entries (if any) + // + // NOTE: two passes are necessary because public key parsing and + // signature verification can be expensive and we want to avoid + // holding the lock for too long. + if valid_count > 0 { + let mut stored = this.entries.write(); + for (entry, is_valid) in std::iter::zip(entries, is_valid) { + if is_valid { + stored.insert(entry); + } + } + } + + // Rollback entries that were not valid and not inserted + if valid_count < to_add { + this.entry_count + .fetch_sub(to_add - valid_count, Ordering::Release); + } + } + + fn prepend_prefix_to_body(&self, body: &mut Bytes) { + let this = self.inner.as_ref(); + + // TODO: reduce allocations + let mut res = BytesMut::with_capacity(this.request_prefix.len() + body.len()); + res.extend_from_slice(&this.request_prefix); + res.extend_from_slice(body); + *body = res.freeze(); + } +} + +impl std::fmt::Debug for PublicOverlay { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PublicOverlay") + .field("overlay_id", &self.inner.overlay_id) + .finish() + } +} + +struct Inner { + overlay_id: OverlayId, + min_capacity: usize, + entries: RwLock, + entry_count: AtomicUsize, + banned_peer_ids: FastDashSet, + service: BoxService, + request_prefix: Box<[u8]>, +} + +#[derive(Default)] +pub struct PublicOverlayEntries { + peer_id_to_index: FastHashMap, + data: Vec>, +} + +impl PublicOverlayEntries { + /// Returns a reference to one random element of the slice, + /// or `None` if the slice is empty. + pub fn choose(&self, rng: &mut R) -> Option<&Arc> + where + R: Rng + ?Sized, + { + self.data.choose(rng) + } + + /// Chooses `n` entries from the set, without repetition, + /// and in random order. + pub fn choose_multiple( + &self, + rng: &mut R, + n: usize, + ) -> rand::seq::SliceChooseIter<'_, [Arc], Arc> + where + R: Rng + ?Sized, + { + self.data.choose_multiple(rng, n) + } + + fn insert(&mut self, item: &PublicEntry) -> bool { + match self.peer_id_to_index.entry(item.peer_id) { + // No entry for the peer_id, insert a new one + hash_map::Entry::Vacant(entry) => { + entry.insert(self.data.len()); + self.data.push(Arc::new(item.clone())); + true + } + // Entry for the peer_id exists, update it if the new item is newer + hash_map::Entry::Occupied(entry) => { + let index = *entry.get(); + let existing = &mut self.data[index]; + if existing.created_at >= item.created_at { + return false; + } + + // Try to reuse the existing Arc if possible + match Arc::get_mut(existing) { + Some(existing) => existing.clone_from(item), + None => self.data[index] = Arc::new(item.clone()), + } + true + } + } + } + + fn remove(&mut self, peer_id: &PeerId) -> bool { + let Some(index) = self.peer_id_to_index.remove(peer_id) else { + return false; + }; + + // Remove the entry from the data vector + self.data.swap_remove(index); + + // Update the swapped entry's index + let entry = self + .peer_id_to_index + .get_mut(&self.data[index].peer_id) + .expect("inconsistent state"); + *entry = index; + + true + } +} + +pub struct PublicOverlayEntriesReadGuard<'a> { + entries: RwLockReadGuard<'a, PublicOverlayEntries>, +} + +impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> { + type Target = PublicOverlayEntries; + + #[inline] + fn deref(&self) -> &Self::Target { + &self.entries + } +} + +#[cfg(test)] +mod tests { + use everscale_crypto::ed25519; + use tycho_util::time::now_sec; + + use super::*; + + fn generate_public_entry(overlay: &PublicOverlay, now: u32) -> Arc { + let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng()); + let peer_id: PeerId = keypair.public_key.into(); + let signature = keypair.sign(crate::proto::overlay::PublicEntryToSign { + overlay_id: overlay.overlay_id().as_bytes(), + peer_id: &peer_id, + created_at: now, + }); + Arc::new(PublicEntry { + peer_id, + created_at: now, + signature: Box::new(signature), + }) + } + + fn generate_invalid_public_entry(now: u32) -> Arc { + let keypair = ed25519::KeyPair::generate(&mut rand::thread_rng()); + let peer_id: PeerId = keypair.public_key.into(); + Arc::new(PublicEntry { + peer_id, + created_at: now, + signature: Box::new([0; 64]), + }) + } + + fn generate_public_entries( + overlay: &PublicOverlay, + now: u32, + n: usize, + ) -> Vec> { + (0..n) + .map(|_| generate_public_entry(overlay, now)) + .collect() + } + + fn count_entries(overlay: &PublicOverlay) -> usize { + let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire); + let guard = overlay.read_entries(); + assert_eq!( + guard.entries.data.len(), + guard.entries.peer_id_to_index.len(), + ); + assert_eq!(guard.entries.data.len(), tracked_count); + tracked_count + } + + fn make_overlay_with_min_capacity(min_capacity: usize) -> PublicOverlay { + PublicOverlay::builder(rand::random()) + .with_min_capacity(min_capacity) + .build(crate::service_query_fn(|_| { + futures_util::future::ready(None) + })) + } + + #[test] + fn min_capacity_works_with_single_thread() { + let now = now_sec(); + + // Add with small portions + { + let overlay = make_overlay_with_min_capacity(10); + let entries = generate_public_entries(&overlay, now, 10); + + overlay.add_untrusted_entries(&entries[..5]); + assert_eq!(count_entries(&overlay), 5); + + overlay.add_untrusted_entries(&entries[5..]); + assert_eq!(count_entries(&overlay), 10); + } + + // Add exact + { + let overlay = make_overlay_with_min_capacity(10); + let entries = generate_public_entries(&overlay, now, 10); + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 10); + } + + // Add once but too much + { + let overlay = make_overlay_with_min_capacity(10); + let entries = generate_public_entries(&overlay, now, 20); + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 10); + } + + // Add once but zero capacity + { + let overlay = make_overlay_with_min_capacity(0); + let entries = generate_public_entries(&overlay, now, 10); + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 0); + } + + // Add all invalid entries + { + let overlay = make_overlay_with_min_capacity(10); + let entries = (0..10) + .map(|_| generate_invalid_public_entry(now)) + .collect::>(); + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 0); + } + + // Add mixed invalid entries + { + let overlay = make_overlay_with_min_capacity(10); + let entries = [ + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + ]; + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 5); + } + + // Add mixed invalid entries on edge + { + let overlay = make_overlay_with_min_capacity(3); + let entries = [ + generate_invalid_public_entry(now), + generate_invalid_public_entry(now), + generate_invalid_public_entry(now), + generate_invalid_public_entry(now), + generate_invalid_public_entry(now), + generate_public_entry(&overlay, now), + generate_public_entry(&overlay, now), + generate_public_entry(&overlay, now), + generate_public_entry(&overlay, now), + generate_public_entry(&overlay, now), + ]; + overlay.add_untrusted_entries(&entries); + assert_eq!(count_entries(&overlay), 3); + } + } + + #[test] + fn min_capacity_works_with_multi_thread() { + let now = now_sec(); + + let overlay = make_overlay_with_min_capacity(201); + let entries = generate_public_entries(&overlay, now, 7 * 3 * 10); + + std::thread::scope(|s| { + for entries in entries.chunks_exact(7 * 3) { + s.spawn(|| { + for entries in entries.chunks_exact(7) { + overlay.add_untrusted_entries(entries); + } + }); + } + }); + + assert_eq!(count_entries(&overlay), 201); + } +} diff --git a/network/src/proto.tl b/network/src/proto.tl index 21be43d69..60ad4f88e 100644 --- a/network/src/proto.tl +++ b/network/src/proto.tl @@ -151,3 +151,62 @@ dht.findValue key:int256 k:int = dht.ValueResponse; * Requests a signed node info */ dht.getNodeInfo = dht.NodeInfoResponse; + +// Overlay +//////////////////////////////////////////////////////////////////////////////// + +---types--- + +/** +* A data to sign for `overlay.publicEntry`. +* +* @param overlay_id public overlay id +* @param peer_id node public key +* @param created_at unix timestamp when the info was generated +*/ +overlay.publicEntryToSign + overlay_id:int256 + peer_id:transport.PeerId + created_at:int + = overlay.PublicEntryToSign; + +/** +* A public overlay entry. +* +* @param peer_id node public key +* @param created_at unix timestamp when the info was generated +* @param signature a signature of the `overlay.PublicEntryToSign` struct (as boxed) +*/ +overlay.publicEntry + peer_id:transport.PeerId + created_at:int + signature:bytes + = overlay.PublicEntry; + +/** +* @param entries list of public overlay entries. +*/ +overlay.publicEntries entries:(vector overlay.publicEntry) = overlay.PublicEntriesResponse; +overlay.overlayNotFound = overlay.PublicEntriesResponse; + +// TODO: add broadcast + +---functions--- + +/** +* Exchanges random entries of the specified public overlay. +* +* @param overlay_id public overlay id +* @param entries list of public overlay entries +*/ +overlay.exchangeRandomPublicEntries + overlay_id:int256 + entries:(vector overlay.publicEntry) + = overlay.PublicEntries; + +/** +* Overlay query/message prefix with an overlay id. +* +* @param overlay_id overlay id +*/ +overlay.prefix overlay_id:int256 = True; diff --git a/network/src/proto/dht.rs b/network/src/proto/dht.rs index 53c9dfbff..312379c90 100644 --- a/network/src/proto/dht.rs +++ b/network/src/proto/dht.rs @@ -310,6 +310,7 @@ impl<'a> TlRead<'a> for ValueRef<'a> { #[tl(boxed, id = "dht.nodesFound", scheme = "proto.tl")] pub struct NodeResponse { /// List of nodes closest to the key. + #[tl(with = "tl::VecWithMaxLen::<20>")] pub nodes: Vec>, } @@ -322,7 +323,7 @@ pub enum ValueResponse { Found(Box), /// List of nodes closest to the key. #[tl(id = "dht.valueNotFound")] - NotFound(Vec>), + NotFound(#[tl(with = "tl::VecWithMaxLen::<20>")] Vec>), } /// A response for the [`rpc::FindValue`] query. diff --git a/network/src/proto/overlay.rs b/network/src/proto/overlay.rs new file mode 100644 index 000000000..2ba6a52ab --- /dev/null +++ b/network/src/proto/overlay.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use tl_proto::{TlRead, TlWrite}; + +use crate::types::PeerId; +use crate::util::tl; + +/// A data to sign for [`PublicEntry`]. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "overlay.publicEntryToSign", scheme = "proto.tl")] +pub struct PublicEntryToSign<'tl> { + /// Public overlay id. + pub overlay_id: &'tl [u8; 32], + /// Node public key. + pub peer_id: &'tl PeerId, + /// Unix timestamp when the info was generated. + pub created_at: u32, +} + +/// A public overlay entry. +#[derive(Debug, Clone, Hash, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "overlay.publicEntry", scheme = "proto.tl")] +pub struct PublicEntry { + /// Node public key. + pub peer_id: PeerId, + /// Unix timestamp when the info was generated. + pub created_at: u32, + /// A signature of the [`PublicEntryToSign`] (as boxed). + #[tl(signature, with = "tl::signature_owned")] + pub signature: Box<[u8; 64]>, +} + +/// A list of public overlay entries. +#[derive(Debug, Clone, Hash, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, scheme = "proto.tl")] +pub enum PublicEntriesResponse { + #[tl(id = "overlay.publicEntries")] + PublicEntries(#[tl(with = "tl::VecWithMaxLen::<20>")] Vec>), + #[tl(id = "overlay.overlayNotFound")] + OverlayNotFound, +} + +/// Overlay RPC models. +pub mod rpc { + use super::*; + + /// Exchanges random entries of the specified public overlay. + #[derive(Debug, Clone, TlRead, TlWrite)] + #[tl(boxed, id = "overlay.exchangeRandomPublicEntries", scheme = "proto.tl")] + pub struct ExchangeRandomPublicEntries { + /// Public overlay id. + pub overlay_id: [u8; 32], + /// A list of public overlay entries. + #[tl(with = "tl::VecWithMaxLen::<20>")] + pub entries: Vec>, + } + + /// Overlay query/message prefix with an overlay id. + #[derive(Debug, Clone, TlRead, TlWrite)] + #[tl(boxed, id = "overlay.prefix", scheme = "proto.tl")] + pub struct Prefix<'tl> { + pub overlay_id: &'tl [u8; 32], + } +} diff --git a/network/src/types/peer_id.rs b/network/src/types/peer_id.rs index 031e45289..cfad5d472 100644 --- a/network/src/types/peer_id.rs +++ b/network/src/types/peer_id.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use everscale_crypto::ed25519; +use rand::Rng; use tl_proto::{TlRead, TlWrite}; #[derive(Clone, Copy, TlRead, TlWrite, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -27,10 +28,6 @@ impl PeerId { pub fn as_public_key(&self) -> Option { ed25519::PublicKey::from_bytes(self.0) } - - pub fn random() -> Self { - Self(rand::random()) - } } impl<'a> TlRead<'a> for &'a PeerId { @@ -96,6 +93,13 @@ impl<'de> serde::Deserialize<'de> for PeerId { } } +impl rand::distributions::Distribution for rand::distributions::Standard { + #[inline] + fn sample(&self, rng: &mut R) -> PeerId { + PeerId(rand::distributions::Standard.sample(rng)) + } +} + impl From for PeerId { #[inline] fn from(public_key: ed25519::PublicKey) -> Self { diff --git a/network/src/util/tl.rs b/network/src/util/tl.rs index ee01d9ea4..744af6cf9 100644 --- a/network/src/util/tl.rs +++ b/network/src/util/tl.rs @@ -41,3 +41,34 @@ pub mod signature_owned { }) } } + +pub struct VecWithMaxLen; + +impl VecWithMaxLen { + #[inline] + pub fn size_hint(value: &[T]) -> usize { + value.max_size_hint() + } + + #[inline] + pub fn write(value: &[T], packet: &mut P) { + value.write_to(packet); + } + + pub fn read<'tl, T>(packet: &'tl [u8], offset: &mut usize) -> TlResult> + where + T: tl_proto::TlRead<'tl>, + { + let len = u32::read_from(packet, offset)? as usize; + if len > N { + return Err(TlError::InvalidData); + } + + let mut items = Vec::with_capacity(len); + for _ in 0..len { + items.push(T::read_from(packet, offset)?); + } + + Ok(items) + } +}