From a4ea204ef48a821d2c2ca6063321dab925d8e1d7 Mon Sep 17 00:00:00 2001 From: MrWad3r Date: Mon, 15 Apr 2024 12:18:07 +0200 Subject: [PATCH] chore(overlay-client): add subscription to overlay peer remove event --- core/src/overlay_client/neighbours.rs | 12 ++++++++-- .../overlay_client/neighbours_actualizer.rs | 11 ++++++++-- .../overlay_client/public_overlay_client.rs | 22 ++++++++++++------- network/src/overlay/public_overlay.rs | 8 +++++++ 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/core/src/overlay_client/neighbours.rs b/core/src/overlay_client/neighbours.rs index d66c71125..4b5f381ac 100644 --- a/core/src/overlay_client/neighbours.rs +++ b/core/src/overlay_client/neighbours.rs @@ -53,9 +53,8 @@ impl Neighbours { } pub async fn update_selection_index(&self) { - let now = tycho_util::time::now_sec(); let mut guard = self.entries.lock().await; - guard.retain(|x| x.is_reliable() && x.expires_at_secs() > now); + guard.retain(|x| x.is_reliable()); let mut lock = self.selection_index.lock().await; lock.update(guard.as_slice()); } @@ -104,6 +103,15 @@ impl Neighbours { drop(guard); self.update_selection_index().await; } + + pub async fn remove_outdated_neighbours(&self) { + let now = tycho_util::time::now_sec(); + let mut guard = self.entries.lock().await; + //remove unreliable and expired neighbours + guard.retain(|x| x.expires_at_secs() > now); + drop(guard); + self.update_selection_index().await; + } } struct SelectionIndex { diff --git a/core/src/overlay_client/neighbours_actualizer.rs b/core/src/overlay_client/neighbours_actualizer.rs index 036adb313..6c20450de 100644 --- a/core/src/overlay_client/neighbours_actualizer.rs +++ b/core/src/overlay_client/neighbours_actualizer.rs @@ -2,7 +2,7 @@ use crate::overlay_client::public_overlay_client::PublicOverlayClient; use std::time::Duration; async fn start_neighbours_ping(client: PublicOverlayClient) { - let mut interval = tokio::time::interval(Duration::from_millis(client.update_interval())); + let mut interval = tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); loop { interval.tick().await; @@ -13,11 +13,18 @@ async fn start_neighbours_ping(client: PublicOverlayClient) { } async fn start_neighbours_update(client: PublicOverlayClient) { - let mut interval = tokio::time::interval(Duration::from_millis(client.update_interval())); + let mut interval = tokio::time::interval(Duration::from_millis(client.neighbour_update_interval_ms())); loop { interval.tick().await; client.update_neighbours().await; } } +async fn wait_update_neighbours(client: PublicOverlayClient) { + loop { + client.entries_removed().await; + client.remove_outdated_neighbours().await; + } +} + diff --git a/core/src/overlay_client/public_overlay_client.rs b/core/src/overlay_client/public_overlay_client.rs index 2e822de6f..ebd323164 100644 --- a/core/src/overlay_client/public_overlay_client.rs +++ b/core/src/overlay_client/public_overlay_client.rs @@ -67,6 +67,16 @@ impl PublicOverlayClient { &self.0.neighbours.0 } + pub async fn entries_removed(&self) { + self.0.overlay.entries_removed().notified().await + } + pub fn neighbour_update_interval_ms(&self) -> u64 { + self.0.settings.neighbours_update_interval + } + pub fn neighbour_ping_interval_ms(&self) -> u64 { + self.0.settings.neighbours_ping_interval + } + pub async fn update_neighbours(&self) { let active_neighbours = self.neighbours().get_active_neighbours().await.len(); let max_neighbours = self.neighbours().options().max_neighbours; @@ -94,6 +104,10 @@ impl PublicOverlayClient { self.neighbours().update(neighbours).await; } + pub async fn remove_outdated_neighbours(&self) { + self.neighbours().remove_outdated_neighbours().await; + } + pub async fn ping_random_neighbour(&self) -> Result<()> { let Some(neighbour) = self.0.neighbours.0.choose().await else { tracing::error!("No neighbours found to ping"); @@ -135,14 +149,6 @@ impl PublicOverlayClient { Ok(()) } - - pub fn update_interval(&self) -> u64 { - self.0.settings.neighbours_update_interval - } - - pub fn ping_interval(&self) -> u64 { - self.0.settings.neighbours_ping_interval - } } impl OverlayClient for PublicOverlayClient { diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs index 72b2d02a7..a43df9726 100644 --- a/network/src/overlay/public_overlay.rs +++ b/network/src/overlay/public_overlay.rs @@ -91,6 +91,7 @@ impl PublicOverlayBuilder { entries: RwLock::new(entries), entries_added: Notify::new(), entries_changed: Notify::new(), + entries_removed: Notify::new(), entry_count: AtomicUsize::new(0), banned_peer_ids: self.banned_peer_ids, service: service.boxed(), @@ -176,6 +177,10 @@ impl PublicOverlay { &self.inner.entries_changed } + pub fn entries_removed(&self) -> &Notify { + &self.inner.entries_removed + } + pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop> { if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) { // TODO: add peer from metadata to the overlay @@ -309,6 +314,8 @@ impl PublicOverlay { !item.entry.is_expired(now, this.entry_ttl_sec) && !this.banned_peer_ids.contains(&item.entry.peer_id) }); + + self.inner.entries_removed.notify_waiters() } fn prepend_prefix_to_body(&self, body: &mut Bytes) { @@ -338,6 +345,7 @@ struct Inner { entry_count: AtomicUsize, entries_added: Notify, entries_changed: Notify, + entries_removed: Notify, banned_peer_ids: FastDashSet, service: BoxService, request_prefix: Box<[u8]>,