Skip to content

Commit

Permalink
test(network): add tests for peer resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Mar 25, 2024
1 parent c645ec3 commit 5f59034
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 92 deletions.
2 changes: 1 addition & 1 deletion network/examples/network_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ impl Node {
.build(address, router)?;

dht_tasks.spawn(&network);
let dht = dht_service.make_client(network.clone());
let dht = dht_service.make_client(&network);

Ok(Self { network, dht })
}
Expand Down
14 changes: 3 additions & 11 deletions network/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,10 @@ impl DhtService {
}
}

pub fn make_client(&self, network: Network) -> DhtClient {
pub fn make_client(&self, network: &Network) -> DhtClient {
DhtClient {
inner: self.0.clone(),
network,
network: network.clone(),
}
}

Expand Down Expand Up @@ -707,15 +707,7 @@ impl DhtInner {
}

fn make_local_peer_info(&self, network: &Network, now: u32) -> PeerInfo {
let mut peer_info = PeerInfo {
id: self.local_id,
address_list: vec![network.local_addr().into()].into_boxed_slice(),
created_at: now,
expires_at: now + self.config.max_peer_info_ttl.as_secs() as u32,
signature: Box::new([0; 64]),
};
*peer_info.signature = network.sign_tl(&peer_info);
peer_info
network.sign_peer_info(now, self.config.max_peer_info_ttl.as_secs() as u32)
}

fn try_handle_prefix<'a>(&self, req: &'a ServiceRequest) -> Result<(u32, &'a [u8])> {
Expand Down
16 changes: 15 additions & 1 deletion network/src/dht/peer_resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,17 @@ impl PeerResolverInner {
// NOTE: Acquire network ref only during the operation.
{
let network = self.weak_network.upgrade()?;
let dht_client = self.dht_service.make_client(network.clone());
if let Some(peer_info) = network.known_peers().get(&data.peer_id) {
tracing::trace!(
peer_id = %data.peer_id,
attempts,
is_stale,
"peer info exists",
);
return Some((network, peer_info));
}

let dht_client = self.dht_service.make_client(&network);

let res = {
let _permit = self.semaphore.acquire().await.unwrap();
Expand Down Expand Up @@ -369,6 +379,10 @@ impl PeerResolverHandle {
}
}

pub fn peer_id(&self) -> &PeerId {
&self.inner.data.peer_id
}

pub fn load_handle(&self) -> Option<KnownPeerHandle> {
self.inner.data.handle.lock().unwrap().clone()
}
Expand Down
15 changes: 14 additions & 1 deletion network/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use self::config::EndpointConfig;
use self::connection_manager::{ConnectionManager, ConnectionManagerRequest};
use self::endpoint::Endpoint;
use crate::types::{
Address, DisconnectReason, PeerEvent, PeerId, Response, Service, ServiceExt, ServiceRequest,
Address, DisconnectReason, PeerEvent, PeerId, PeerInfo, Response, Service, ServiceExt,
ServiceRequest,
};

pub use self::config::{NetworkConfig, QuicConfig};
Expand Down Expand Up @@ -228,6 +229,18 @@ impl Network {
self.0.keypair.sign_raw(data)
}

pub fn sign_peer_info(&self, now: u32, ttl: u32) -> PeerInfo {
let mut res = PeerInfo {
id: *self.0.peer_id(),
address_list: vec![self.local_addr().into()].into_boxed_slice(),
created_at: now,
expires_at: now.saturating_add(ttl),
signature: Box::new([0; 64]),
};
*res.signature = self.sign_tl(&res);
res
}

pub fn downgrade(this: &Self) -> WeakNetwork {
WeakNetwork(Arc::downgrade(&this.0))
}
Expand Down
43 changes: 13 additions & 30 deletions network/tests/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use std::sync::Arc;
use anyhow::Result;
use everscale_crypto::ed25519;
use tl_proto::{TlRead, TlWrite};
use tycho_network::{
proto, Address, DhtClient, DhtService, FindValueError, Network, PeerId, PeerInfo, Router,
};
use tycho_network::{proto, DhtClient, DhtService, FindValueError, Network, PeerInfo, Router};
use tycho_util::time::now_sec;

struct Node {
Expand All @@ -20,10 +18,11 @@ struct Node {
}

impl Node {
fn new(key: &ed25519::SecretKey) -> Self {
let keypair = ed25519::KeyPair::from(key);
fn with_random_key() -> Self {
let key = ed25519::SecretKey::generate(&mut rand::thread_rng());
let local_id = ed25519::PublicKey::from(&key).into();

let (dht_tasks, dht_service) = DhtService::builder(keypair.public_key.into()).build();
let (dht_tasks, dht_service) = DhtService::builder(local_id).build();

let router = Router::builder().route(dht_service.clone()).build();

Expand All @@ -35,38 +34,22 @@ impl Node {

dht_tasks.spawn(&network);

let dht = dht_service.make_client(network.clone());
let dht = dht_service.make_client(&network);

Self { network, dht }
}

fn make_peer_info(key: &ed25519::SecretKey, address: Address) -> PeerInfo {
let keypair = ed25519::KeyPair::from(key);
let peer_id = PeerId::from(keypair.public_key);

let now = now_sec();
let mut node_info = PeerInfo {
id: peer_id,
address_list: vec![address].into_boxed_slice(),
created_at: now,
expires_at: u32::MAX,
signature: Box::new([0; 64]),
};
*node_info.signature = keypair.sign(&node_info);
node_info
}
}

fn make_network(node_count: usize) -> (Vec<Node>, Vec<Arc<PeerInfo>>) {
let keys = (0..node_count)
.map(|_| ed25519::SecretKey::generate(&mut rand::thread_rng()))
let nodes = (0..node_count)
.map(|_| Node::with_random_key())
.collect::<Vec<_>>();

let nodes = keys.iter().map(Node::new).collect::<Vec<_>>();

let bootstrap_info = std::iter::zip(&keys, &nodes)
.map(|(key, node)| Arc::new(Node::make_peer_info(key, node.network.local_addr().into())))
let bootstrap_info = nodes
.iter()
.map(|node| Arc::new(node.network.sign_peer_info(0, u32::MAX)))
.collect::<Vec<_>>();

for node in &nodes {
for info in &bootstrap_info {
node.dht.add_peer(info.clone()).unwrap();
Expand Down Expand Up @@ -149,7 +132,7 @@ async fn connect_new_node_to_bootstrap() -> Result<()> {

let (bootstrap_nodes, global_config) = make_network(5);

let node = Node::new(&ed25519::SecretKey::generate(&mut rand::thread_rng()));
let node = Node::with_random_key();
for peer_info in &global_config {
node.dht.add_peer(peer_info.clone())?;
}
Expand Down
111 changes: 63 additions & 48 deletions network/tests/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,66 +3,74 @@
//! RUST_LOG=info,tycho_network=trace
//! ```
use anyhow::Result;
use everscale_crypto::ed25519;
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use everscale_crypto::ed25519;
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use tl_proto::{TlRead, TlWrite};
use tycho_network::{
Address, KnownPeerHandle, Network, OverlayId, OverlayService, PeerId, PeerInfo, PrivateOverlay,
DhtClient, DhtConfig, DhtService, Network, OverlayId, OverlayService, PeerId, PrivateOverlay,
Request, Response, Router, Service, ServiceRequest,
};
use tycho_util::time::now_sec;

struct Node {
network: Network,
private_overlay: PrivateOverlay,
known_peer_handles: Vec<KnownPeerHandle>,
dht_client: DhtClient,
}

impl Node {
fn new(key: &ed25519::SecretKey) -> Self {
let keypair = ed25519::KeyPair::from(key);
let local_id = PeerId::from(keypair.public_key);

let (overlay_tasks, overlay_service) = OverlayService::builder(local_id).build();

let router = Router::builder().route(overlay_service.clone()).build();
fn with_random_key() -> Self {
let key = ed25519::SecretKey::generate(&mut rand::thread_rng());
let local_id = ed25519::PublicKey::from(&key).into();

let (dht_tasks, dht_service) = DhtService::builder(local_id)
.with_config(DhtConfig {
local_info_announce_period: Duration::from_secs(1),
max_local_info_announce_period_jitter: Duration::from_secs(1),
routing_table_refresh_period: Duration::from_secs(1),
max_routing_table_refresh_period_jitter: Duration::from_secs(1),
..Default::default()
})
.build();

let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
.with_dht_service(dht_service.clone())
.build();

let router = Router::builder()
.route(dht_service.clone())
.route(overlay_service.clone())
.build();

let network = Network::builder()
.with_private_key(key.to_bytes())
.with_service_name("test-service")
.build((Ipv4Addr::LOCALHOST, 0), router)
.unwrap();

dht_tasks.spawn(&network);
overlay_tasks.spawn(&network);

let private_overlay = PrivateOverlay::builder(PRIVATE_OVERLAY_ID).build(PingPongService);
let dht_client = dht_service.make_client(&network);
let peer_resolver = dht_service.make_peer_resolver().build(&network);

let private_overlay = PrivateOverlay::builder(PRIVATE_OVERLAY_ID)
.with_peer_resolver(peer_resolver)
.build(PingPongService);
overlay_service.add_private_overlay(&private_overlay);

Self {
network,
dht_client,
private_overlay,
known_peer_handles: Vec::new(),
}
}

fn make_peer_info(key: &ed25519::SecretKey, address: Address) -> PeerInfo {
let keypair = ed25519::KeyPair::from(key);
let peer_id = PeerId::from(keypair.public_key);

let now = now_sec();
let mut node_info = PeerInfo {
id: peer_id,
address_list: vec![address].into_boxed_slice(),
created_at: now,
expires_at: u32::MAX,
signature: Box::new([0; 64]),
};
*node_info.signature = keypair.sign(&node_info);
node_info
}

async fn private_overlay_query<Q, A>(&self, peer_id: &PeerId, req: Q) -> Result<A>
where
Q: tl_proto::TlWrite<Repr = tl_proto::Boxed>,
Expand All @@ -77,32 +85,24 @@ impl Node {
}

fn make_network(node_count: usize) -> Vec<Node> {
let keys = (0..node_count)
.map(|_| ed25519::SecretKey::generate(&mut rand::thread_rng()))
let nodes = (0..node_count)
.map(|_| Node::with_random_key())
.collect::<Vec<_>>();

let mut nodes = keys.iter().map(Node::new).collect::<Vec<_>>();
let common_peer_info = nodes.first().unwrap().network.sign_peer_info(0, u32::MAX);

let bootstrap_info = std::iter::zip(&keys, &nodes)
.map(|(key, node)| Arc::new(Node::make_peer_info(key, node.network.local_addr().into())))
.collect::<Vec<_>>();
for node in &nodes {
node.dht_client
.add_peer(Arc::new(common_peer_info.clone()))
.unwrap();

for node in &mut nodes {
let mut private_overlay_entries = node.private_overlay.write_entries();

for info in &bootstrap_info {
if info.id == node.network.peer_id() {
for peer_id in nodes.iter().map(|node| node.network.peer_id()) {
if peer_id == node.network.peer_id() {
continue;
}

node.known_peer_handles.push(
node.network
.known_peers()
.insert(info.clone(), false)
.unwrap(),
);

private_overlay_entries.insert(&info.id);
private_overlay_entries.insert(peer_id);
}
}

Expand All @@ -116,6 +116,21 @@ async fn private_overlays_accessible() -> Result<()> {

let nodes = make_network(5);

for node in &nodes {
let resolved = FuturesUnordered::new();
for entry in node.private_overlay.read_entries().iter() {
let handle = entry.resolver_handle.clone();
resolved.push(async move { handle.wait_resolved().await });
}

// Ensure all entries are resolved.
resolved.collect::<Vec<_>>().await;
tracing::info!(
peer_id = %node.network.peer_id(),
"all entries resolved",
);
}

for i in 0..nodes.len() {
for j in 0..nodes.len() {
if i == j {
Expand Down

0 comments on commit 5f59034

Please sign in to comment.