Skip to content

Commit

Permalink
Merge branch 'feature/resolve-overlay-peers'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Mar 23, 2024
2 parents 961fcd4 + f5fce7f commit c645ec3
Show file tree
Hide file tree
Showing 36 changed files with 4,032 additions and 772 deletions.
280 changes: 151 additions & 129 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ ref_option_ref = "warn"
rest_pat_in_fully_bound_structs = "warn"
same_functions_in_if_condition = "warn"
semicolon_if_nothing_returned = "warn"
single_match_else = "warn"
string_add_assign = "warn"
string_add = "warn"
string_lit_as_bytes = "warn"
Expand Down
4 changes: 3 additions & 1 deletion network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ path = "examples/network_node.rs"
# crates.io deps
ahash = "0.8"
anyhow = "1.0"
base64 = "0.22"
arc-swap = "1.6"
base64 = "0.21"
bytes = { version = "1.0", features = ["serde"] }
castaway = "0.2"
dashmap = "5.4"
ed25519 = { version = "2.0", features = ["alloc", "pkcs8"] }
everscale-crypto = { version = "0.2", features = ["tl-proto"] }
exponential-backoff = "1"
futures-util = { version = "0.3", features = ["sink"] }
hex = "0.4"
moka = { version = "0.12", features = ["sync"] }
Expand Down
7 changes: 4 additions & 3 deletions network/examples/network_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,19 +224,20 @@ impl Node {
fn new(key: ed25519::SecretKey, address: Address, config: NodeConfig) -> Result<Self> {
let keypair = everscale_crypto::ed25519::KeyPair::from(&key);

let (dht_client, dht) = DhtService::builder(keypair.public_key.into())
let (dht_tasks, dht_service) = DhtService::builder(keypair.public_key.into())
.with_config(config.dht)
.build();

let router = Router::builder().route(dht).build();
let router = Router::builder().route(dht_service.clone()).build();

let network = Network::builder()
.with_config(config.network)
.with_private_key(key.to_bytes())
.with_service_name("test-service")
.build(address, router)?;

let dht = dht_client.build(network.clone());
dht_tasks.spawn(&network);
let dht = dht_service.make_client(network.clone());

Ok(Self { network, dht })
}
Expand Down
149 changes: 76 additions & 73 deletions network/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,65 +12,46 @@ use tokio::task::JoinHandle;
use tycho_util::realloc_box_enum;
use tycho_util::time::{now_sec, shifted_interval};

use self::query::{Query, StoreValue};
use self::routing::{RoutingTable, RoutingTableSource};
use self::query::{Query, QueryCache, StoreValue};
use self::routing::HandlesRoutingTable;
use self::storage::Storage;
use crate::network::{Network, WeakNetwork};
use crate::proto::dht::{
rpc, NodeInfoResponse, NodeResponse, PeerValue, PeerValueKey, PeerValueKeyName,
PeerValueKeyRef, PeerValueRef, Value, ValueRef, ValueResponseRaw,
};
use crate::types::{
Address, PeerAffinity, PeerId, PeerInfo, Request, Response, Service, ServiceRequest,
};
use crate::types::{PeerId, PeerInfo, Request, Response, Service, ServiceRequest};
use crate::util::{NetworkExt, Routable};

pub use self::config::DhtConfig;
pub use self::peer_resolver::{PeerResolver, PeerResolverBuilder, PeerResolverHandle};
pub use self::storage::{OverlayValueMerger, StorageError};

mod config;
mod peer_resolver;
mod query;
mod routing;
mod storage;

pub struct DhtClientBuilder {
inner: Arc<DhtInner>,
disable_background_tasks: bool,
}

impl DhtClientBuilder {
pub fn disable_background_tasks(mut self) -> Self {
self.disable_background_tasks = true;
self
}

pub fn build(self, network: Network) -> DhtClient {
if !self.disable_background_tasks {
self.inner
.start_background_tasks(Network::downgrade(&network));
}

DhtClient {
inner: self.inner,
network,
}
}
}

#[derive(Clone)]
pub struct DhtClient {
inner: Arc<DhtInner>,
network: Network,
}

impl DhtClient {
#[inline]
pub fn network(&self) -> &Network {
&self.network
}

#[inline]
pub fn service(&self) -> &DhtService {
DhtService::wrap(&self.inner)
}

pub fn add_peer(&self, peer: Arc<PeerInfo>) -> Result<bool> {
self.inner
.add_peer_info(&self.network, peer, RoutingTableSource::Trusted)
self.inner.add_peer_info(&self.network, peer)
}

pub async fn get_node_info(&self, peer_id: &PeerId) -> Result<PeerInfo> {
Expand Down Expand Up @@ -243,6 +224,17 @@ impl<'a> std::ops::DerefMut for DhtQueryWithDataBuilder<'a> {
}
}

pub struct DhtServiceBackgroundTasks {
inner: Arc<DhtInner>,
}

impl DhtServiceBackgroundTasks {
pub fn spawn(self, network: &Network) {
self.inner
.start_background_tasks(Network::downgrade(network));
}
}

pub struct DhtServiceBuilder {
local_id: PeerId,
config: Option<DhtConfig>,
Expand All @@ -260,7 +252,7 @@ impl DhtServiceBuilder {
self
}

pub fn build(self) -> (DhtClientBuilder, DhtService) {
pub fn build(self) -> (DhtServiceBackgroundTasks, DhtService) {
let config = self.config.unwrap_or_default();

let storage = {
Expand All @@ -283,33 +275,51 @@ impl DhtServiceBuilder {

let inner = Arc::new(DhtInner {
local_id: self.local_id,
routing_table: Mutex::new(RoutingTable::new(self.local_id)),
routing_table: Mutex::new(HandlesRoutingTable::new(self.local_id)),
storage,
local_peer_info: Mutex::new(None),
config,
announced_peers,
find_value_queries: Default::default(),
});

let client_builder = DhtClientBuilder {
let background_tasks = DhtServiceBackgroundTasks {
inner: inner.clone(),
disable_background_tasks: false,
};

(client_builder, DhtService(inner))
(background_tasks, DhtService(inner))
}
}

#[derive(Clone)]
#[repr(transparent)]
pub struct DhtService(Arc<DhtInner>);

impl DhtService {
#[inline]
fn wrap(inner: &Arc<DhtInner>) -> &Self {
// SAFETY: `DhtService` has the same memory layout as `Arc<DhtInner>`.
unsafe { &*(inner as *const Arc<DhtInner>).cast::<Self>() }
}

pub fn builder(local_id: PeerId) -> DhtServiceBuilder {
DhtServiceBuilder {
local_id,
config: None,
overlay_merger: None,
}
}

pub fn make_client(&self, network: Network) -> DhtClient {
DhtClient {
inner: self.0.clone(),
network,
}
}

pub fn make_peer_resolver(&self) -> PeerResolverBuilder {
PeerResolver::builder(self.clone())
}
}

impl Service<ServiceRequest> for DhtService {
Expand Down Expand Up @@ -415,11 +425,12 @@ impl Routable for DhtService {

struct DhtInner {
local_id: PeerId,
routing_table: Mutex<RoutingTable>,
routing_table: Mutex<HandlesRoutingTable>,
storage: Storage,
local_peer_info: Mutex<Option<PeerInfo>>,
config: DhtConfig,
announced_peers: broadcast::Sender<Arc<PeerInfo>>,
find_value_queries: QueryCache<Option<Box<Value>>>,
}

impl DhtInner {
Expand Down Expand Up @@ -469,8 +480,7 @@ impl DhtInner {
this.refresh_local_peer_info(&network);
}
Action::AnnounceLocalPeerInfo => {
// Always refresh peer info before announcing
this.refresh_local_peer_info(&network);
// Peer info is always refreshed before announcing
refresh_peer_info_interval.reset();

if let Err(e) = this.announce_local_peer_info(&network).await {
Expand All @@ -492,9 +502,7 @@ impl DhtInner {
}
Action::AddPeer(peer_info) => {
tracing::info!(peer_id = %peer_info.id, "received peer info");
if let Err(e) =
this.add_peer_info(&network, peer_info, RoutingTableSource::Untrusted)
{
if let Err(e) = this.add_peer_info(&network, peer_info) {
tracing::error!("failed to add peer to the routing table: {e:?}");
}
}
Expand All @@ -511,12 +519,18 @@ impl DhtInner {

#[tracing::instrument(level = "debug", skip_all, fields(local_id = %self.local_id))]
async fn announce_local_peer_info(&self, network: &Network) -> Result<()> {
let data = tl_proto::serialize(&[network.local_addr().into()] as &[Address]);
let now = now_sec();
let data = {
let peer_info = self.make_local_peer_info(network, now);
let data = tl_proto::serialize(&peer_info);
*self.local_peer_info.lock().unwrap() = Some(peer_info);
data
};

let mut value = self.make_unsigned_peer_value(
PeerValueKeyName::NodeInfo,
&data,
now_sec() + self.config.max_peer_info_ttl.as_secs() as u32,
now + self.config.max_peer_info_ttl.as_secs() as u32,
);
let signature = network.sign_tl(&value);
value.signature = &signature;
Expand Down Expand Up @@ -603,28 +617,28 @@ impl DhtInner {
peer.clone(),
self.config.max_k,
&self.config.max_peer_info_ttl,
RoutingTableSource::Trusted,
|peer_info| network.known_peers().insert(peer_info, false).ok(),
);
if is_new {
network.known_peers().insert(peer, PeerAffinity::Allowed);
count += 1;
}
count += is_new as usize;
}

tracing::debug!(count, "found new peers");
}

async fn find_value(&self, network: &Network, key_hash: &[u8; 32]) -> Option<Box<Value>> {
// TODO: deduplicate shared futures
let query = Query::new(
network.clone(),
&self.routing_table.lock().unwrap(),
key_hash,
self.config.max_k,
);
self.find_value_queries
.run(key_hash, || {
let query = Query::new(
network.clone(),
&self.routing_table.lock().unwrap(),
key_hash,
self.config.max_k,
);

// NOTE: expression is intentionally split to drop the routing table guard
query.find_value().await
// NOTE: expression is intentionally split to drop the routing table guard
Box::pin(query.find_value())
})
.await
}

async fn store_value(
Expand Down Expand Up @@ -659,31 +673,20 @@ impl DhtInner {
Ok(())
}

fn add_peer_info(
&self,
network: &Network,
peer_info: Arc<PeerInfo>,
source: RoutingTableSource,
) -> Result<bool> {
fn add_peer_info(&self, network: &Network, peer_info: Arc<PeerInfo>) -> Result<bool> {
anyhow::ensure!(peer_info.is_valid(now_sec()), "invalid peer info");

if peer_info.id == self.local_id {
return Ok(false);
}

let mut routing_table = self.routing_table.lock().unwrap();
let is_new = routing_table.add(
Ok(routing_table.add(
peer_info.clone(),
self.config.max_k,
&self.config.max_peer_info_ttl,
source,
);
if is_new {
network
.known_peers()
.insert(peer_info, PeerAffinity::Allowed);
}
Ok(is_new)
|peer_info| network.known_peers().insert(peer_info, false).ok(),
))
}

fn make_unsigned_peer_value<'a>(
Expand Down
Loading

0 comments on commit c645ec3

Please sign in to comment.