From 5f5903495ee8348791f1e16c9d7cd5626e81624b Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 25 Mar 2024 20:06:56 +0100 Subject: [PATCH] test(network): add tests for peer resolver --- network/examples/network_node.rs | 2 +- network/src/dht/mod.rs | 14 +--- network/src/dht/peer_resolver.rs | 16 ++++- network/src/network/mod.rs | 15 ++++- network/tests/dht.rs | 43 ++++-------- network/tests/overlay.rs | 111 ++++++++++++++++++------------- 6 files changed, 109 insertions(+), 92 deletions(-) diff --git a/network/examples/network_node.rs b/network/examples/network_node.rs index ee6c6c51c..d668f2fad 100644 --- a/network/examples/network_node.rs +++ b/network/examples/network_node.rs @@ -237,7 +237,7 @@ impl Node { .build(address, router)?; dht_tasks.spawn(&network); - let dht = dht_service.make_client(network.clone()); + let dht = dht_service.make_client(&network); Ok(Self { network, dht }) } diff --git a/network/src/dht/mod.rs b/network/src/dht/mod.rs index 56d2396cf..95d63551a 100644 --- a/network/src/dht/mod.rs +++ b/network/src/dht/mod.rs @@ -310,10 +310,10 @@ impl DhtService { } } - pub fn make_client(&self, network: Network) -> DhtClient { + pub fn make_client(&self, network: &Network) -> DhtClient { DhtClient { inner: self.0.clone(), - network, + network: network.clone(), } } @@ -707,15 +707,7 @@ impl DhtInner { } fn make_local_peer_info(&self, network: &Network, now: u32) -> PeerInfo { - let mut peer_info = PeerInfo { - id: self.local_id, - address_list: vec![network.local_addr().into()].into_boxed_slice(), - created_at: now, - expires_at: now + self.config.max_peer_info_ttl.as_secs() as u32, - signature: Box::new([0; 64]), - }; - *peer_info.signature = network.sign_tl(&peer_info); - peer_info + network.sign_peer_info(now, self.config.max_peer_info_ttl.as_secs() as u32) } fn try_handle_prefix<'a>(&self, req: &'a ServiceRequest) -> Result<(u32, &'a [u8])> { diff --git a/network/src/dht/peer_resolver.rs b/network/src/dht/peer_resolver.rs index d7d927dc5..0f9068628 100644 --- a/network/src/dht/peer_resolver.rs +++ b/network/src/dht/peer_resolver.rs @@ -287,7 +287,17 @@ impl PeerResolverInner { // NOTE: Acquire network ref only during the operation. { let network = self.weak_network.upgrade()?; - let dht_client = self.dht_service.make_client(network.clone()); + if let Some(peer_info) = network.known_peers().get(&data.peer_id) { + tracing::trace!( + peer_id = %data.peer_id, + attempts, + is_stale, + "peer info exists", + ); + return Some((network, peer_info)); + } + + let dht_client = self.dht_service.make_client(&network); let res = { let _permit = self.semaphore.acquire().await.unwrap(); @@ -369,6 +379,10 @@ impl PeerResolverHandle { } } + pub fn peer_id(&self) -> &PeerId { + &self.inner.data.peer_id + } + pub fn load_handle(&self) -> Option { self.inner.data.handle.lock().unwrap().clone() } diff --git a/network/src/network/mod.rs b/network/src/network/mod.rs index eb89b5af5..5393bdc9d 100644 --- a/network/src/network/mod.rs +++ b/network/src/network/mod.rs @@ -9,7 +9,8 @@ use self::config::EndpointConfig; use self::connection_manager::{ConnectionManager, ConnectionManagerRequest}; use self::endpoint::Endpoint; use crate::types::{ - Address, DisconnectReason, PeerEvent, PeerId, Response, Service, ServiceExt, ServiceRequest, + Address, DisconnectReason, PeerEvent, PeerId, PeerInfo, Response, Service, ServiceExt, + ServiceRequest, }; pub use self::config::{NetworkConfig, QuicConfig}; @@ -228,6 +229,18 @@ impl Network { self.0.keypair.sign_raw(data) } + pub fn sign_peer_info(&self, now: u32, ttl: u32) -> PeerInfo { + let mut res = PeerInfo { + id: *self.0.peer_id(), + address_list: vec![self.local_addr().into()].into_boxed_slice(), + created_at: now, + expires_at: now.saturating_add(ttl), + signature: Box::new([0; 64]), + }; + *res.signature = self.sign_tl(&res); + res + } + pub fn downgrade(this: &Self) -> WeakNetwork { WeakNetwork(Arc::downgrade(&this.0)) } diff --git a/network/tests/dht.rs b/network/tests/dht.rs index 01c61fa46..b6485b97c 100644 --- a/network/tests/dht.rs +++ b/network/tests/dht.rs @@ -9,9 +9,7 @@ use std::sync::Arc; use anyhow::Result; use everscale_crypto::ed25519; use tl_proto::{TlRead, TlWrite}; -use tycho_network::{ - proto, Address, DhtClient, DhtService, FindValueError, Network, PeerId, PeerInfo, Router, -}; +use tycho_network::{proto, DhtClient, DhtService, FindValueError, Network, PeerInfo, Router}; use tycho_util::time::now_sec; struct Node { @@ -20,10 +18,11 @@ struct Node { } impl Node { - fn new(key: &ed25519::SecretKey) -> Self { - let keypair = ed25519::KeyPair::from(key); + fn with_random_key() -> Self { + let key = ed25519::SecretKey::generate(&mut rand::thread_rng()); + let local_id = ed25519::PublicKey::from(&key).into(); - let (dht_tasks, dht_service) = DhtService::builder(keypair.public_key.into()).build(); + let (dht_tasks, dht_service) = DhtService::builder(local_id).build(); let router = Router::builder().route(dht_service.clone()).build(); @@ -35,38 +34,22 @@ impl Node { dht_tasks.spawn(&network); - let dht = dht_service.make_client(network.clone()); + let dht = dht_service.make_client(&network); Self { network, dht } } - - fn make_peer_info(key: &ed25519::SecretKey, address: Address) -> PeerInfo { - let keypair = ed25519::KeyPair::from(key); - let peer_id = PeerId::from(keypair.public_key); - - let now = now_sec(); - let mut node_info = PeerInfo { - id: peer_id, - address_list: vec![address].into_boxed_slice(), - created_at: now, - expires_at: u32::MAX, - signature: Box::new([0; 64]), - }; - *node_info.signature = keypair.sign(&node_info); - node_info - } } fn make_network(node_count: usize) -> (Vec, Vec>) { - let keys = (0..node_count) - .map(|_| ed25519::SecretKey::generate(&mut rand::thread_rng())) + let nodes = (0..node_count) + .map(|_| Node::with_random_key()) .collect::>(); - let nodes = keys.iter().map(Node::new).collect::>(); - - let bootstrap_info = std::iter::zip(&keys, &nodes) - .map(|(key, node)| Arc::new(Node::make_peer_info(key, node.network.local_addr().into()))) + let bootstrap_info = nodes + .iter() + .map(|node| Arc::new(node.network.sign_peer_info(0, u32::MAX))) .collect::>(); + for node in &nodes { for info in &bootstrap_info { node.dht.add_peer(info.clone()).unwrap(); @@ -149,7 +132,7 @@ async fn connect_new_node_to_bootstrap() -> Result<()> { let (bootstrap_nodes, global_config) = make_network(5); - let node = Node::new(&ed25519::SecretKey::generate(&mut rand::thread_rng())); + let node = Node::with_random_key(); for peer_info in &global_config { node.dht.add_peer(peer_info.clone())?; } diff --git a/network/tests/overlay.rs b/network/tests/overlay.rs index f604f5296..3a887ed5e 100644 --- a/network/tests/overlay.rs +++ b/network/tests/overlay.rs @@ -3,31 +3,49 @@ //! RUST_LOG=info,tycho_network=trace //! ``` -use anyhow::Result; -use everscale_crypto::ed25519; use std::net::Ipv4Addr; use std::sync::Arc; +use std::time::Duration; + +use anyhow::Result; +use everscale_crypto::ed25519; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; use tl_proto::{TlRead, TlWrite}; use tycho_network::{ - Address, KnownPeerHandle, Network, OverlayId, OverlayService, PeerId, PeerInfo, PrivateOverlay, + DhtClient, DhtConfig, DhtService, Network, OverlayId, OverlayService, PeerId, PrivateOverlay, Request, Response, Router, Service, ServiceRequest, }; -use tycho_util::time::now_sec; struct Node { network: Network, private_overlay: PrivateOverlay, - known_peer_handles: Vec, + dht_client: DhtClient, } impl Node { - fn new(key: &ed25519::SecretKey) -> Self { - let keypair = ed25519::KeyPair::from(key); - let local_id = PeerId::from(keypair.public_key); - - let (overlay_tasks, overlay_service) = OverlayService::builder(local_id).build(); - - let router = Router::builder().route(overlay_service.clone()).build(); + fn with_random_key() -> Self { + let key = ed25519::SecretKey::generate(&mut rand::thread_rng()); + let local_id = ed25519::PublicKey::from(&key).into(); + + let (dht_tasks, dht_service) = DhtService::builder(local_id) + .with_config(DhtConfig { + local_info_announce_period: Duration::from_secs(1), + max_local_info_announce_period_jitter: Duration::from_secs(1), + routing_table_refresh_period: Duration::from_secs(1), + max_routing_table_refresh_period_jitter: Duration::from_secs(1), + ..Default::default() + }) + .build(); + + let (overlay_tasks, overlay_service) = OverlayService::builder(local_id) + .with_dht_service(dht_service.clone()) + .build(); + + let router = Router::builder() + .route(dht_service.clone()) + .route(overlay_service.clone()) + .build(); let network = Network::builder() .with_private_key(key.to_bytes()) @@ -35,34 +53,24 @@ impl Node { .build((Ipv4Addr::LOCALHOST, 0), router) .unwrap(); + dht_tasks.spawn(&network); overlay_tasks.spawn(&network); - let private_overlay = PrivateOverlay::builder(PRIVATE_OVERLAY_ID).build(PingPongService); + let dht_client = dht_service.make_client(&network); + let peer_resolver = dht_service.make_peer_resolver().build(&network); + + let private_overlay = PrivateOverlay::builder(PRIVATE_OVERLAY_ID) + .with_peer_resolver(peer_resolver) + .build(PingPongService); overlay_service.add_private_overlay(&private_overlay); Self { network, + dht_client, private_overlay, - known_peer_handles: Vec::new(), } } - fn make_peer_info(key: &ed25519::SecretKey, address: Address) -> PeerInfo { - let keypair = ed25519::KeyPair::from(key); - let peer_id = PeerId::from(keypair.public_key); - - let now = now_sec(); - let mut node_info = PeerInfo { - id: peer_id, - address_list: vec![address].into_boxed_slice(), - created_at: now, - expires_at: u32::MAX, - signature: Box::new([0; 64]), - }; - *node_info.signature = keypair.sign(&node_info); - node_info - } - async fn private_overlay_query(&self, peer_id: &PeerId, req: Q) -> Result where Q: tl_proto::TlWrite, @@ -77,32 +85,24 @@ impl Node { } fn make_network(node_count: usize) -> Vec { - let keys = (0..node_count) - .map(|_| ed25519::SecretKey::generate(&mut rand::thread_rng())) + let nodes = (0..node_count) + .map(|_| Node::with_random_key()) .collect::>(); - let mut nodes = keys.iter().map(Node::new).collect::>(); + let common_peer_info = nodes.first().unwrap().network.sign_peer_info(0, u32::MAX); - let bootstrap_info = std::iter::zip(&keys, &nodes) - .map(|(key, node)| Arc::new(Node::make_peer_info(key, node.network.local_addr().into()))) - .collect::>(); + for node in &nodes { + node.dht_client + .add_peer(Arc::new(common_peer_info.clone())) + .unwrap(); - for node in &mut nodes { let mut private_overlay_entries = node.private_overlay.write_entries(); - for info in &bootstrap_info { - if info.id == node.network.peer_id() { + for peer_id in nodes.iter().map(|node| node.network.peer_id()) { + if peer_id == node.network.peer_id() { continue; } - - node.known_peer_handles.push( - node.network - .known_peers() - .insert(info.clone(), false) - .unwrap(), - ); - - private_overlay_entries.insert(&info.id); + private_overlay_entries.insert(peer_id); } } @@ -116,6 +116,21 @@ async fn private_overlays_accessible() -> Result<()> { let nodes = make_network(5); + for node in &nodes { + let resolved = FuturesUnordered::new(); + for entry in node.private_overlay.read_entries().iter() { + let handle = entry.resolver_handle.clone(); + resolved.push(async move { handle.wait_resolved().await }); + } + + // Ensure all entries are resolved. + resolved.collect::>().await; + tracing::info!( + peer_id = %node.network.peer_id(), + "all entries resolved", + ); + } + for i in 0..nodes.len() { for j in 0..nodes.len() { if i == j {