ref: Separate cadence from metrics feature
jan-auer committed Dec 11, 2023
1 parent dc5ea74 commit f2c6410
Showing 4 changed files with 101 additions and 97 deletions.
3 changes: 2 additions & 1 deletion sentry-core/Cargo.toml
@@ -26,7 +26,8 @@ client = ["rand"]
# and macros actually expand features (and extern crate) where they are used!
debug-logs = ["dep:log"]
test = ["client"]
UNSTABLE_metrics = ["dep:cadence", "sentry-types/UNSTABLE_metrics"]
UNSTABLE_metrics = ["sentry-types/UNSTABLE_metrics"]
UNSTABLE_cadence = ["dep:cadence", "UNSTABLE_metrics"]

[dependencies]
cadence = { version = "0.29.0", optional = true }
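
The net effect of this Cargo.toml change is that the `cadence` dependency is only compiled when the new `UNSTABLE_cadence` feature is requested, and that feature transitively enables `UNSTABLE_metrics`. A minimal sketch of how a downstream crate might opt in, assuming a direct dependency on `sentry-core` (the version below is a placeholder; this snippet is illustrative and not part of the commit):

# Hypothetical downstream Cargo.toml (illustrative only; version is a placeholder).
[dependencies]
# `UNSTABLE_cadence` pulls in `UNSTABLE_metrics`; the `client` feature is still
# needed for the sink to actually capture and send anything.
sentry-core = { version = "*", features = ["client", "UNSTABLE_cadence"] }
cadence = "0.29"
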
90 changes: 90 additions & 0 deletions sentry-core/src/cadence.rs
@@ -0,0 +1,90 @@
use std::sync::Arc;

use cadence::MetricSink;

use crate::{Client, Hub};

/// A [`cadence`] compatible [`MetricSink`].
///
/// This will ingest all the emitted metrics to Sentry as well as forward them
/// to the inner [`MetricSink`].
#[derive(Debug)]
pub struct SentryMetricSink<S> {
client: Arc<Client>,
sink: S,
}

impl<S> SentryMetricSink<S> {
/// Creates a new [`SentryMetricSink`], wrapping the given [`MetricSink`].
pub fn try_new(sink: S) -> Result<Self, S> {
let hub = Hub::current();
let Some(client) = hub.client() else {
return Err(sink);
};

Ok(Self { client, sink })
}
}

impl<S> MetricSink for SentryMetricSink<S>
where
S: MetricSink,
{
fn emit(&self, metric: &str) -> std::io::Result<usize> {
self.client.add_metric(metric);
self.sink.emit(metric)
}

fn flush(&self) -> std::io::Result<()> {
if self.client.flush(None) {
self.sink.flush()
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Flushing Client failed",
))
}
}
}

#[cfg(test)]
mod tests {
use cadence::{Counted, Distributed};
use sentry_types::protocol::latest::EnvelopeItem;

use crate::test::with_captured_envelopes;

use super::*;

#[test]
fn test_basic_metrics() {
let envelopes = with_captured_envelopes(|| {
let sink = SentryMetricSink::try_new(cadence::NopMetricSink).unwrap();

let client = cadence::StatsdClient::from_sink("sentry.test", sink);
client.count("some.count", 1).unwrap();
client.count("some.count", 10).unwrap();
client
.count_with_tags("count.with.tags", 1)
.with_tag("foo", "bar")
.send();
client.distribution("some.distr", 1).unwrap();
client.distribution("some.distr", 2).unwrap();
client.distribution("some.distr", 3).unwrap();
});
assert_eq!(envelopes.len(), 1);

let mut items = envelopes[0].items();
let Some(EnvelopeItem::Metrics(metrics)) = items.next() else {
panic!("expected metrics");
};
let metrics = std::str::from_utf8(metrics).unwrap();

println!("{metrics}");

assert!(metrics.contains("sentry.test.count.with.tags:1|c|#foo:bar|T"));
assert!(metrics.contains("sentry.test.some.count:11|c|T"));
assert!(metrics.contains("sentry.test.some.distr:1:2:3|d|T"));
assert_eq!(items.next(), None);
}
}
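
For context, here is a sketch of how the relocated sink might be wired up from application code. It is illustrative only and not part of the commit; it assumes a Sentry client has already been bound to the current Hub (for example via `sentry::init` in the top-level crate) and that the `client` and `UNSTABLE_cadence` features are enabled.

use cadence::{Counted, StatsdClient};
use sentry_core::SentryMetricSink;

fn install_statsd() {
    // `try_new` hands the sink back as the error when no client is bound to
    // the current Hub, so the caller can fall back to plain cadence.
    let sink = match SentryMetricSink::try_new(cadence::NopMetricSink) {
        Ok(sink) => sink,
        Err(_inner_sink) => return, // no Sentry client configured
    };

    // Metrics emitted through this client are captured by Sentry and also
    // forwarded to the wrapped sink (a no-op sink in this sketch).
    let statsd = StatsdClient::from_sink("my.service", sink);
    let _ = statsd.count("requests.handled", 1);
}
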
6 changes: 4 additions & 2 deletions sentry-core/src/lib.rs
@@ -136,6 +136,8 @@ pub use crate::performance::*;
pub use crate::scope::{Scope, ScopeGuard};
pub use crate::transport::{Transport, TransportFactory};

#[cfg(all(feature = "client", feature = "UNSTABLE_cadence"))]
mod cadence;
// client feature
#[cfg(feature = "client")]
mod client;
@@ -145,10 +147,10 @@ mod hub_impl;
mod metrics;
#[cfg(feature = "client")]
mod session;
#[cfg(all(feature = "client", feature = "UNSTABLE_cadence"))]
pub use crate::cadence::SentryMetricSink;
#[cfg(feature = "client")]
pub use crate::client::Client;
#[cfg(all(feature = "client", feature = "UNSTABLE_metrics"))]
pub use crate::metrics::SentryMetricSink;

// test utilities
#[cfg(feature = "test")]
99 changes: 5 additions & 94 deletions sentry-core/src/metrics.rs
@@ -1,59 +1,13 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::sync::mpsc::{sync_channel, Receiver, RecvTimeoutError, SyncSender};
use std::sync::Arc;
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use cadence::MetricSink;
use sentry_types::protocol::latest::{Envelope, EnvelopeItem};

use crate::client::TransportArc;
use crate::{Client, Hub};

/// A [`cadence`] compatible [`MetricSink`].
///
/// This will ingest all the emitted metrics to Sentry as well as forward them
/// to the inner [`MetricSink`].
#[derive(Debug)]
pub struct SentryMetricSink<S> {
client: Arc<Client>,
sink: S,
}

impl<S> SentryMetricSink<S> {
/// Creates a new [`SentryMetricSink`], wrapping the given [`MetricSink`].
pub fn try_new(sink: S) -> Result<Self, S> {
let hub = Hub::current();
let Some(client) = hub.client() else {
return Err(sink);
};

Ok(Self { client, sink })
}
}

impl<S> MetricSink for SentryMetricSink<S>
where
S: MetricSink,
{
fn emit(&self, metric: &str) -> std::io::Result<usize> {
self.client.add_metric(metric);
self.sink.emit(metric)
}

fn flush(&self) -> std::io::Result<()> {
if self.client.flush(None) {
self.sink.flush()
} else {
Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Flushing Client failed",
))
}
}
}

#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum MetricType {
@@ -166,7 +120,7 @@ enum Task {
}

pub struct MetricAggregator {
sender: SyncSender<Task>,
sender: mpsc::SyncSender<Task>,
handle: Option<JoinHandle<()>>,
}

@@ -175,7 +129,7 @@ const FLUSH_INTERVAL: Duration = Duration::from_secs(10);

impl MetricAggregator {
pub fn new(transport: TransportArc) -> Self {
let (sender, receiver) = sync_channel(30);
let (sender, receiver) = mpsc::sync_channel(30);
let handle = thread::Builder::new()
.name("sentry-metrics".into())
.spawn(move || Self::worker_thread(receiver, transport))
@@ -240,15 +194,15 @@ impl MetricAggregator {
let _ = self.sender.send(Task::Flush);
}

fn worker_thread(receiver: Receiver<Task>, transport: TransportArc) {
fn worker_thread(receiver: mpsc::Receiver<Task>, transport: TransportArc) {
let mut buckets = AggregateMetrics::new();
let mut last_flush = Instant::now();

loop {
let timeout = FLUSH_INTERVAL.saturating_sub(last_flush.elapsed());

match receiver.recv_timeout(timeout) {
Err(RecvTimeoutError::Timeout) | Ok(Task::Flush) => {
Err(_) | Ok(Task::Flush) => {
// flush
Self::flush_buckets(std::mem::take(&mut buckets), &transport);
last_flush = Instant::now();
@@ -343,46 +297,3 @@ impl Drop for MetricAggregator {
}
}
}

#[cfg(test)]
mod tests {
use std::str::from_utf8;

use cadence::{Counted, Distributed};

use crate::test::with_captured_envelopes;

use super::*;

#[test]
fn test_basic_metrics() {
let envelopes = with_captured_envelopes(|| {
let sink = SentryMetricSink::try_new(cadence::NopMetricSink).unwrap();

let client = cadence::StatsdClient::from_sink("sentry.test", sink);
client.count("some.count", 1).unwrap();
client.count("some.count", 10).unwrap();
client
.count_with_tags("count.with.tags", 1)
.with_tag("foo", "bar")
.send();
client.distribution("some.distr", 1).unwrap();
client.distribution("some.distr", 2).unwrap();
client.distribution("some.distr", 3).unwrap();
});
assert_eq!(envelopes.len(), 1);

let mut items = envelopes[0].items();
let Some(EnvelopeItem::Metrics(metrics)) = items.next() else {
panic!("expected metrics");
};
let metrics = from_utf8(metrics).unwrap();

println!("{metrics}");

assert!(metrics.contains("sentry.test.count.with.tags:1|c|#foo:bar|T"));
assert!(metrics.contains("sentry.test.some.count:11|c|T"));
assert!(metrics.contains("sentry.test.some.distr:1:2:3|d|T"));
assert_eq!(items.next(), None);
}
}
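
As a side note on the code that remains in this file, the aggregator keeps the same shape after the refactor: a bounded `mpsc` channel feeds a dedicated worker thread that flushes either when a `Task::Flush` arrives or when the ten-second interval elapses. Below is a standalone, simplified sketch of that pattern; the `Task` variants, buffer, and `println!` output are stand-ins for illustration, not the real aggregator types.

use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

const FLUSH_INTERVAL: Duration = Duration::from_secs(10);

enum Task {
    Record(String),
    Flush,
    Shutdown,
}

fn spawn_worker() -> (mpsc::SyncSender<Task>, JoinHandle<()>) {
    // Bounded channel so a slow worker applies backpressure to emitters.
    let (sender, receiver) = mpsc::sync_channel::<Task>(30);
    let handle = thread::spawn(move || {
        let mut buffer: Vec<String> = Vec::new();
        let mut last_flush = Instant::now();
        loop {
            let timeout = FLUSH_INTERVAL.saturating_sub(last_flush.elapsed());
            match receiver.recv_timeout(timeout) {
                Ok(Task::Record(metric)) => buffer.push(metric),
                // Flush on explicit request or when the interval elapses.
                Ok(Task::Flush) | Err(mpsc::RecvTimeoutError::Timeout) => {
                    for metric in buffer.drain(..) {
                        println!("flushing {metric}");
                    }
                    last_flush = Instant::now();
                }
                // Stop when asked to, or when every sender is gone.
                Ok(Task::Shutdown) | Err(mpsc::RecvTimeoutError::Disconnected) => break,
            }
        }
    });
    (sender, handle)
}

fn main() {
    let (sender, handle) = spawn_worker();
    let _ = sender.send(Task::Record("requests:1|c".into()));
    let _ = sender.send(Task::Flush);
    let _ = sender.send(Task::Shutdown);
    let _ = handle.join();
}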
