Skip to content

Commit

Permalink
fix(network): fix public overlay entry_count tracking
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Apr 11, 2024
1 parent 0e43c70 commit c79494d
Showing 1 changed file with 48 additions and 5 deletions.
53 changes: 48 additions & 5 deletions network/src/overlay/public_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use bytes::{Bytes, BytesMut};
use parking_lot::{RwLock, RwLockReadGuard};
use rand::seq::SliceRandom;
use rand::Rng;
use tokio::sync::Notify;
use tycho_util::futures::BoxFutureOrNoop;
use tycho_util::{FastDashSet, FastHashMap};

Expand Down Expand Up @@ -88,6 +89,8 @@ impl PublicOverlayBuilder {
min_capacity: self.min_capacity,
entry_ttl_sec,
entries: RwLock::new(entries),
entries_added: Notify::new(),
entries_changed: Notify::new(),
entry_count: AtomicUsize::new(0),
banned_peer_ids: self.banned_peer_ids,
service: service.boxed(),
Expand Down Expand Up @@ -159,6 +162,16 @@ impl PublicOverlay {
}
}

/// Notifies when new entries are added to the overlay.
pub fn entires_added(&self) -> &Notify {
&self.inner.entries_added
}

/// Notifies when entries are updated in the overlay (added or updated).
pub fn entries_changed(&self) -> &Notify {
&self.inner.entries_changed
}

pub(crate) fn handle_query(&self, req: ServiceRequest) -> BoxFutureOrNoop<Option<Response>> {
if !self.inner.banned_peer_ids.contains(&req.metadata.peer_id) {
// TODO: add peer from metadata to the overlay
Expand Down Expand Up @@ -251,14 +264,18 @@ impl PublicOverlay {
// signature verification can be expensive and we want to avoid
// holding the lock for too long.
let mut added = 0;
let mut changed = false;
if has_valid {
let mut stored = this.entries.write();
for (entry, is_valid) in std::iter::zip(entries, is_valid) {
if !is_valid {
continue;
}

added += stored.insert(entry) as usize;
let status = stored.insert(entry);
changed |= status.is_changed();
added += status.is_added() as usize;

if added >= to_add {
break;
}
Expand All @@ -270,6 +287,13 @@ impl PublicOverlay {
this.entry_count
.fetch_sub(to_add - added, Ordering::Release);
}

if added > 0 {
this.entries_added.notify_waiters();
}
if changed {
this.entries_changed.notify_waiters();
}
}

/// Removes all expired and banned entries from the overlay.
Expand Down Expand Up @@ -308,6 +332,8 @@ struct Inner {
entry_ttl_sec: u32,
entries: RwLock<PublicOverlayEntries>,
entry_count: AtomicUsize,
entries_added: Notify,
entries_changed: Notify,
banned_peer_ids: FastDashSet<PeerId>,
service: BoxService<ServiceRequest, Response>,
request_prefix: Box<[u8]>,
Expand Down Expand Up @@ -376,7 +402,7 @@ impl PublicOverlayEntries {
self.data.choose_multiple(rng, self.data.len())
}

fn insert(&mut self, item: &PublicEntry) -> bool {
fn insert(&mut self, item: &PublicEntry) -> UpdateStatus {
match self.peer_id_to_index.entry(item.peer_id) {
// No entry for the peer_id, insert a new one
hash_map::Entry::Vacant(entry) => {
Expand All @@ -392,22 +418,22 @@ impl PublicOverlayEntries {
resolver_handle,
});

true
UpdateStatus::Added
}
// Entry for the peer_id exists, update it if the new item is newer
hash_map::Entry::Occupied(entry) => {
let index = *entry.get();
let existing = &mut self.data[index];
if existing.entry.created_at >= item.created_at {
return false;
return UpdateStatus::Skipped;
}

// Try to reuse the existing Arc if possible
match Arc::get_mut(&mut existing.entry) {
Some(existing) => existing.clone_from(item),
None => self.data[index].entry = Arc::new(item.clone()),
}
true
UpdateStatus::Updated
}
}
}
Expand Down Expand Up @@ -445,6 +471,23 @@ impl std::ops::Deref for PublicOverlayEntriesReadGuard<'_> {
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum UpdateStatus {
Skipped,
Updated,
Added,
}

impl UpdateStatus {
fn is_changed(self) -> bool {
matches!(self, Self::Updated | Self::Added)
}

fn is_added(self) -> bool {
matches!(self, Self::Added)
}
}

#[cfg(test)]
mod tests {
use everscale_crypto::ed25519;
Expand Down

0 comments on commit c79494d

Please sign in to comment.