Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aj/honor metric timestamps #904

Merged
merged 15 commits into from
Feb 28, 2025
101 changes: 55 additions & 46 deletions dogstatsd/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use crate::constants;
use crate::datadog::{self, Metric as MetricToShip, Series};
use crate::errors;
use crate::metric::{self, Metric, MetricValue, SortedTags};
use std::time;

use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
use ddsketch_agent::DDSketch;
Expand Down Expand Up @@ -91,13 +90,14 @@ impl Aggregator {
/// Returns an overflow error if inserting `metric` would exceed
/// `min(constants::MAX_CONTEXTS, CONTEXTS)` contexts.
pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> {
let id = metric::id(metric.name, &metric.tags);
let id = metric::id(metric.name, &metric.tags, metric.timestamp);
let len = self.map.len();

match self
.map
.entry(id, |m| m.id == id, |m| metric::id(m.name, &m.tags))
{
match self.map.entry(
id,
|m| m.id == id,
|m| metric::id(m.name, &m.tags, m.timestamp),
) {
hash_table::Entry::Vacant(entry) => {
if len >= self.max_context {
return Err(errors::Insert::Overflow);
Expand All @@ -117,19 +117,13 @@ impl Aggregator {

#[must_use]
pub fn distributions_to_protobuf(&self) -> SketchPayload {
let now = time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
let mut sketch_payload = SketchPayload::new();

sketch_payload.sketches = self
.map
.iter()
.filter_map(|entry| match entry.value {
MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()),
MetricValue::Distribution(_) => build_sketch(entry, self.tags.clone()),
_ => None,
})
.collect();
Expand All @@ -138,12 +132,6 @@ impl Aggregator {

#[must_use]
pub fn consume_distributions(&mut self) -> Vec<SketchPayload> {
let now = time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
let mut batched_payloads = Vec::new();
let mut sketch_payload = SketchPayload::new();
let mut this_batch_size = 0u64;
Expand All @@ -155,7 +143,7 @@ impl Aggregator {
}
false
})
.filter_map(|entry| build_sketch(now, &entry, self.tags.clone()))
.filter_map(|entry| build_sketch(&entry, self.tags.clone()))
{
let next_chunk_size = sketch.compute_size();

Expand Down Expand Up @@ -248,18 +236,23 @@ impl Aggregator {
batched_payloads
}

pub fn get_entry_by_id(&self, name: Ustr, tags: &Option<SortedTags>) -> Option<&Metric> {
let id = metric::id(name, tags);
pub fn get_entry_by_id(
&self,
name: Ustr,
tags: &Option<SortedTags>,
timestamp: i64,
) -> Option<&Metric> {
let id = metric::id(name, tags, timestamp);
self.map.find(id, |m| m.id == id)
}
}

fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
let sketch = entry.value.get_sketch()?;
let mut dogsketch = Dogsketch::default();
sketch.merge_to_dogsketch(&mut dogsketch);
// The sketch timestamp is taken from the metric entry (`entry.timestamp`) so user-supplied timestamps are honored.
dogsketch.set_ts(now);
dogsketch.set_ts(entry.timestamp);
let mut sketch = Sketch::default();
sketch.set_dogsketches(vec![dogsketch]);
let name = entry.name.to_string();
Expand All @@ -286,10 +279,7 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricTo
let point = datadog::Point {
value: entry.value.get_value()?,
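// The point timestamp is taken from the metric entry so user-supplied timestamps are honored.
// NOTE(review): `as u64` wraps if `entry.timestamp` is negative; confirm it is validated as non-negative upstream.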
timestamp: time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs(),
timestamp: entry.timestamp as u64,
};

if let Some(tags) = entry.tags.clone() {
Expand Down Expand Up @@ -328,21 +318,29 @@ pub mod tests {
metric_id: &str,
value: f64,
tags: &str,
timestamp: i64,
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) =
aggregator.get_entry_by_id(metric_id.into(), &Some(SortedTags::parse(tags).unwrap()))
{
if let Some(e) = aggregator.get_entry_by_id(
metric_id.into(),
&Some(SortedTags::parse(tags).unwrap()),
timestamp,
) {
let metric = e.value.get_value().unwrap();
assert!((metric - value).abs() < PRECISION);
} else {
panic!("{}", format!("{metric_id} not found"));
}
}

pub fn assert_sketch(aggregator_mutex: &Mutex<Aggregator>, metric_id: &str, value: f64) {
pub fn assert_sketch(
aggregator_mutex: &Mutex<Aggregator>,
metric_id: &str,
value: f64,
timestamp: i64,
) {
let aggregator = aggregator_mutex.lock().unwrap();
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) {
if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, timestamp) {
let metric = e.value.get_sketch().unwrap();
assert!((metric.max().unwrap() - value).abs() < PRECISION);
assert!((metric.min().unwrap() - value).abs() < PRECISION);
Expand Down Expand Up @@ -387,14 +385,20 @@ pub mod tests {
#[cfg_attr(miri, ignore)]
fn overflow() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();

let mut now = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
now = (now / 10) * 10;
let metric1 = parse("test:1|c|#k:v").expect("metric parse failed");
let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed");
let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed");

let id1 = metric::id(metric1.name, &metric1.tags);
let id2 = metric::id(metric2.name, &metric2.tags);
let id3 = metric::id(metric3.name, &metric3.tags);
let id1 = metric::id(metric1.name, &metric1.tags, now);
let id2 = metric::id(metric2.name, &metric2.tags, now);
let id3 = metric::id(metric3.name, &metric3.tags, now);

assert_ne!(id1, id2);
assert_ne!(id1, id3);
Expand All @@ -417,25 +421,30 @@ pub mod tests {
#[cfg_attr(miri, ignore)]
fn clear() {
let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();

let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed");
let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed");
let mut now = 1656581409;
now = (now / 10) * 10;
let metric1 = parse("test:3|c|#k1:v1|T1656581409").expect("metric parse failed");
let metric2 = parse("foo:5|c|#k2:v2|T1656581409").expect("metric parse failed");

assert!(aggregator.insert(metric1).is_ok());
assert!(aggregator.insert(metric2).is_ok());

assert_eq!(aggregator.map.len(), 2);
if let Some(v) =
aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap()))
{
if let Some(v) = aggregator.get_entry_by_id(
"foo".into(),
&Some(SortedTags::parse("k2:v2").unwrap()),
now,
) {
assert_eq!(v.value.get_value().unwrap(), 5f64);
} else {
panic!("failed to get value by id");
}

if let Some(v) =
aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap()))
{
if let Some(v) = aggregator.get_entry_by_id(
"test".into(),
&Some(SortedTags::parse("k1:v1").unwrap()),
now,
) {
assert_eq!(v.value.get_value().unwrap(), 3f64);
} else {
panic!("failed to get value by id");
Expand Down
38 changes: 29 additions & 9 deletions dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,9 @@ mod tests {
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_multi_distribution() {
let locked_aggregator = setup_dogstatsd(
"single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d
single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d
single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
"single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409
single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409
single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409
",
)
.await;
Expand All @@ -154,24 +154,38 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
&locked_aggregator,
"single_machine_performance.rouster.api.series_v2.payload_size_bytes",
269_942_f64,
1656581400,
);
assert_sketch(
&locked_aggregator,
"single_machine_performance.rouster.metrics_min_timestamp_latency",
1_426.908_702_16,
1656581400,
);
assert_sketch(
&locked_aggregator,
"single_machine_performance.rouster.metrics_max_timestamp_latency",
1_376.908_702_16,
1656581400,
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_multi_metric() {
let mut now = std::time::UNIX_EPOCH
.elapsed()
.expect("unable to poll clock, unrecoverable")
.as_secs()
.try_into()
.unwrap_or_default();
now = (now / 10) * 10;
let locked_aggregator = setup_dogstatsd(
"metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2\n",
format!(
"metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n",
now
)
.as_str(),
)
.await;
let aggregator = locked_aggregator.lock().expect("lock poisoned");
Expand All @@ -182,23 +196,29 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0);
drop(aggregator);

assert_value(&locked_aggregator, "metric1", 1.0, "");
assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2");
assert_value(&locked_aggregator, "metric3", 3.0, "tag3:val3,tag4:val4");
assert_value(&locked_aggregator, "metric1", 1.0, "", now);
assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now);
assert_value(
&locked_aggregator,
"metric3",
3.0,
"tag3:val3,tag4:val4",
now,
);
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_dogstatsd_single_metric() {
let locked_aggregator = setup_dogstatsd("metric123:99123|c").await;
let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await;
let aggregator = locked_aggregator.lock().expect("lock poisoned");
let parsed_metrics = aggregator.to_series();

assert_eq!(parsed_metrics.len(), 1);
assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0);
drop(aggregator);

assert_value(&locked_aggregator, "metric123", 99_123.0, "");
assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400);
}

#[tokio::test]
Expand Down
Loading
Loading