Skip to content

Commit

Permalink
chore(overlay-client): add subscription to overlay peer remove event
Browse files Browse the repository at this point in the history
  • Loading branch information
MrWad3r committed Apr 15, 2024
1 parent 5d10890 commit a4ea204
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 12 deletions.
12 changes: 10 additions & 2 deletions core/src/overlay_client/neighbours.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,8 @@ impl Neighbours {
}

pub async fn update_selection_index(&self) {
let now = tycho_util::time::now_sec();
let mut guard = self.entries.lock().await;
guard.retain(|x| x.is_reliable() && x.expires_at_secs() > now);
guard.retain(|x| x.is_reliable());
let mut lock = self.selection_index.lock().await;
lock.update(guard.as_slice());
}
Expand Down Expand Up @@ -104,6 +103,15 @@ impl Neighbours {
drop(guard);
self.update_selection_index().await;
}

pub async fn remove_outdated_neighbours(&self) {
let now = tycho_util::time::now_sec();
let mut guard = self.entries.lock().await;
//remove unreliable and expired neighbours
guard.retain(|x| x.expires_at_secs() > now);
drop(guard);
self.update_selection_index().await;
}
}

struct SelectionIndex {
Expand Down
11 changes: 9 additions & 2 deletions core/src/overlay_client/neighbours_actualizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::overlay_client::public_overlay_client::PublicOverlayClient;
use std::time::Duration;

async fn start_neighbours_ping(client: PublicOverlayClient) {
let mut interval = tokio::time::interval(Duration::from_millis(client.update_interval()));
let mut interval = tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms()));

loop {
interval.tick().await;
Expand All @@ -13,11 +13,18 @@ async fn start_neighbours_ping(client: PublicOverlayClient) {
}

async fn start_neighbours_update(client: PublicOverlayClient) {
let mut interval = tokio::time::interval(Duration::from_millis(client.update_interval()));
let mut interval = tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms()));
loop {
interval.tick().await;
client.update_neighbours().await;
}
}

async fn wait_update_neighbours(client: PublicOverlayClient) {
loop {
client.entries_removed().await;
client.remove_outdated_neighbours().await;
}
}


22 changes: 14 additions & 8 deletions core/src/overlay_client/public_overlay_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ impl PublicOverlayClient {
&self.0.neighbours.0
}

pub async fn entries_removed(&self) {
self.0.overlay.entries_removed().notified().await
}
pub fn neighbour_update_interval_ms(&self) -> u64 {
self.0.settings.neighbours_update_interval
}
pub fn neighbour_ping_interval_ms(&self) -> u64 {
self.0.settings.neighbours_ping_interval
}

pub async fn update_neighbours(&self) {
let active_neighbours = self.neighbours().get_active_neighbours().await.len();
let max_neighbours = self.neighbours().options().max_neighbours;
Expand Down Expand Up @@ -94,6 +104,10 @@ impl PublicOverlayClient {
self.neighbours().update(neighbours).await;
}

pub async fn remove_outdated_neighbours(&self) {
self.neighbours().remove_outdated_neighbours().await;
}

pub async fn ping_random_neighbour(&self) -> Result<()> {
let Some(neighbour) = self.0.neighbours.0.choose().await else {
tracing::error!("No neighbours found to ping");
Expand Down Expand Up @@ -135,14 +149,6 @@ impl PublicOverlayClient {

Ok(())
}

pub fn update_interval(&self) -> u64 {
self.0.settings.neighbours_update_interval
}

pub fn ping_interval(&self) -> u64 {
self.0.settings.neighbours_ping_interval
}
}

impl OverlayClient for PublicOverlayClient {
Expand Down
8 changes: 8 additions & 0 deletions network/src/overlay/public_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl PublicOverlayBuilder {
entries: RwLock::new(entries),
entries_added: Notify::new(),
entries_changed: Notify::new(),
entries_removed: Notify::new(),
entry_count: AtomicUsize::new(0),
banned_peer_ids: self.banned_peer_ids,
service: service.boxed(),
Expand Down Expand Up @@ -176,6 +177,10 @@ impl PublicOverlay {
&self.inner.entries_changed
}

pub fn entries_removed(&self) -> &Notify {
&self.inner.entries_removed
}

pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
// TODO: add peer from metadata to the overlay
Expand Down Expand Up @@ -309,6 +314,8 @@ impl PublicOverlay {
!item.entry.is_expired(now, this.entry_ttl_sec)
&& !this.banned_peer_ids.contains(&item.entry.peer_id)
});

self.inner.entries_removed.notify_waiters()
}

fn prepend_prefix_to_body(&self, body: &mut Bytes) {
Expand Down Expand Up @@ -338,6 +345,7 @@ struct Inner {
entry_count: AtomicUsize,
entries_added: Notify,
entries_changed: Notify,
entries_removed: Notify,
banned_peer_ids: FastDashSet<PeerId>,
service: BoxService<ServiceRequest, Response>,
request_prefix: Box<[u8]>,
Expand Down

0 comments on commit a4ea204

Please sign in to comment.