Skip to content

Commit

Permalink
feat: add missed captures counter
Browse files Browse the repository at this point in the history
  • Loading branch information
mhatzl committed Jun 27, 2023
1 parent e15a16e commit 9e754e3
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions src/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::{
collections::{HashMap, HashSet},
hash::Hash,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, SyncSender},
atomic::{AtomicBool, AtomicUsize, Ordering},
mpsc::{self, SyncSender, TrySendError},
Arc, RwLock,
},
thread,
Expand Down Expand Up @@ -66,6 +66,7 @@ where
capture_blocking: Arc<AtomicBool>,
capture_channel_bound: usize,
subscription_channel_bound: usize,
missed_captures: Arc<AtomicUsize>,
}

impl<K, T, F> EvidentPublisher<K, T, F>
Expand Down Expand Up @@ -139,6 +140,7 @@ where
capture_blocking: mode,
capture_channel_bound,
subscription_channel_bound,
missed_captures: Arc::new(AtomicUsize::new(0)),
}
}

Expand Down Expand Up @@ -193,9 +195,21 @@ where
if self.capture_blocking.load(Ordering::Acquire) {
let _ = self.capturer.send(Event::new(interm_event.take_entry()));
} else {
let _ = self
let res = self
.capturer
.try_send(Event::new(interm_event.take_entry()));

if let Err(TrySendError::Full(_)) = res {
// Note: If another thread has missed captures at the same moment, the count may be inaccurate, because there is no lock.
// This should still be fine, since
// - highly unlikely to happen during production with reasonable channel bounds and number of logs captured
// - count is still increased, and any increase in missed captures is bad (+/- one or two is irrelevant)
let missed_captures = self.missed_captures.load(Ordering::Relaxed);
if missed_captures < usize::MAX {
self.missed_captures
.store(missed_captures + 1, Ordering::Relaxed);
}
}
}
}

Expand All @@ -214,6 +228,14 @@ where
}
}

pub fn get_missed_captures(&self) -> usize {
self.missed_captures.load(Ordering::Relaxed)
}

pub fn reset_missed_captures(&self) {
self.missed_captures.store(0, Ordering::Relaxed);
}

pub fn subscribe(&self, id: K) -> Result<Subscription<K, T, F>, SubscriptionError<K>> {
self.subscribe_to_many(vec![id])
}
Expand Down

0 comments on commit 9e754e3

Please sign in to comment.