diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs index 4c9f85880..65e32a43d 100644 --- a/network/src/overlay/public_overlay.rs +++ b/network/src/overlay/public_overlay.rs @@ -9,6 +9,7 @@ use bytes::{Bytes, BytesMut}; use parking_lot::{RwLock, RwLockReadGuard}; use rand::seq::SliceRandom; use rand::Rng; +use tokio::sync::Notify; use tycho_util::futures::BoxFutureOrNoop; use tycho_util::{FastDashSet, FastHashMap}; @@ -88,6 +89,8 @@ impl PublicOverlayBuilder { min_capacity: self.min_capacity, entry_ttl_sec, entries: RwLock::new(entries), + entries_added: Notify::new(), + entries_changed: Notify::new(), entry_count: AtomicUsize::new(0), banned_peer_ids: self.banned_peer_ids, service: service.boxed(), @@ -159,6 +162,16 @@ impl PublicOverlay { } } + /// Notifies when new entries are added to the overlay. + pub fn entires_added(&self) -> &Notify { + &self.inner.entries_added + } + + /// Notifies when entries are updated in the overlay (added or updated). + pub fn entries_changed(&self) -> &Notify { + &self.inner.entries_changed + } + pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop> { if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) { // TODO: add peer from metadata to the overlay @@ -251,6 +264,7 @@ impl PublicOverlay { // signature verification can be expensive and we want to avoid // holding the lock for too long. let mut added = 0; + let mut changed = false; if has_valid { let mut stored = this.entries.write(); for (entry, is_valid) in std::iter::zip(entries, is_valid) { @@ -258,7 +272,10 @@ impl PublicOverlay { continue; } - added += stored.insert(entry) as usize; + let status = stored.insert(entry); + changed |= status.is_changed(); + added += status.is_added() as usize; + if added >= to_add { break; } @@ -270,6 +287,13 @@ impl PublicOverlay { this.entry_count .fetch_sub(to_add - added, Ordering::Release); } + + if added > 0 { + this.entries_added.notify_waiters(); + } + if changed { + this.entries_changed.notify_waiters(); + } } /// Removes all expired and banned entries from the overlay. @@ -308,6 +332,8 @@ struct Inner { entry_ttl_sec: u32, entries: RwLock, entry_count: AtomicUsize, + entries_added: Notify, + entries_changed: Notify, banned_peer_ids: FastDashSet, service: BoxService, request_prefix: Box<[u8]>, @@ -376,7 +402,7 @@ impl PublicOverlayEntries { self.data.choose_multiple(rng, self.data.len()) } - fn insert(&mut self, item: &PublicEntry) -> bool { + fn insert(&mut self, item: &PublicEntry) -> UpdateStatus { match self.peer_id_to_index.entry(item.peer_id) { // No entry for the peer_id, insert a new one hash_map::Entry::Vacant(entry) => { @@ -392,14 +418,14 @@ impl PublicOverlayEntries { resolver_handle, }); - true + UpdateStatus::Added } // Entry for the peer_id exists, update it if the new item is newer hash_map::Entry::Occupied(entry) => { let index = *entry.get(); let existing = &mut self.data[index]; if existing.entry.created_at >= item.created_at { - return false; + return UpdateStatus::Skipped; } // Try to reuse the existing Arc if possible @@ -407,7 +433,7 @@ impl PublicOverlayEntries { Some(existing) => existing.clone_from(item), None => self.data[index].entry = Arc::new(item.clone()), } - true + UpdateStatus::Updated } } } @@ -445,6 +471,23 @@ impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum UpdateStatus { + Skipped, + Updated, + Added, +} + +impl UpdateStatus { + fn is_changed(self) -> bool { + matches!(self, Self::Updated | Self::Added) + } + + fn is_added(self) -> bool { + matches!(self, Self::Added) + } +} + #[cfg(test)] mod tests { use everscale_crypto::ed25519;