diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs
index 196a9b761..f31a3b0fc 100644
--- a/dogstatsd/src/aggregator.rs
+++ b/dogstatsd/src/aggregator.rs
@@ -7,7 +7,6 @@ use crate::constants;
 use crate::datadog::{self, Metric as MetricToShip, Series};
 use crate::errors;
 use crate::metric::{self, Metric, MetricValue, SortedTags};
-use std::time;

 use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload};
 use ddsketch_agent::DDSketch;
@@ -91,13 +90,14 @@ impl Aggregator {
     /// Function will return overflow error if more than
     /// `min(constants::MAX_CONTEXTS, CONTEXTS)` is exceeded.
     pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> {
-        let id = metric::id(metric.name, &metric.tags);
+        let id = metric::id(metric.name, &metric.tags, metric.timestamp);
         let len = self.map.len();
-        match self
-            .map
-            .entry(id, |m| m.id == id, |m| metric::id(m.name, &m.tags))
-        {
+        match self.map.entry(
+            id,
+            |m| m.id == id,
+            |m| metric::id(m.name, &m.tags, m.timestamp),
+        ) {
             hash_table::Entry::Vacant(entry) => {
                 if len >= self.max_context {
                     return Err(errors::Insert::Overflow);
                 }
@@ -117,19 +117,13 @@ impl Aggregator {
     #[must_use]
     pub fn distributions_to_protobuf(&self) -> SketchPayload {
-        let now = time::UNIX_EPOCH
-            .elapsed()
-            .expect("unable to poll clock, unrecoverable")
-            .as_secs()
-            .try_into()
-            .unwrap_or_default();
         let mut sketch_payload = SketchPayload::new();

         sketch_payload.sketches = self
             .map
             .iter()
             .filter_map(|entry| match entry.value {
-                MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()),
+                MetricValue::Distribution(_) => build_sketch(entry, self.tags.clone()),
                 _ => None,
             })
             .collect();
@@ -138,12 +132,6 @@ impl Aggregator {
     #[must_use]
     pub fn consume_distributions(&mut self) -> Vec<SketchPayload> {
-        let now = time::UNIX_EPOCH
-            .elapsed()
-            .expect("unable to poll clock, unrecoverable")
-            .as_secs()
-            .try_into()
-            .unwrap_or_default();
         let mut batched_payloads = Vec::new();
         let mut sketch_payload = SketchPayload::new();
         let mut this_batch_size = 0u64;
@@ -155,7 +143,7 @@ impl Aggregator {
                 }
                 false
             })
-            .filter_map(|entry| build_sketch(now, &entry, self.tags.clone()))
+            .filter_map(|entry| build_sketch(&entry, self.tags.clone()))
         {
             let next_chunk_size = sketch.compute_size();
@@ -248,18 +236,23 @@ impl Aggregator {
         batched_payloads
     }

-    pub fn get_entry_by_id(&self, name: Ustr, tags: &Option<SortedTags>) -> Option<&Metric> {
-        let id = metric::id(name, tags);
+    pub fn get_entry_by_id(
+        &self,
+        name: Ustr,
+        tags: &Option<SortedTags>,
+        timestamp: i64,
+    ) -> Option<&Metric> {
+        let id = metric::id(name, tags, timestamp);
         self.map.find(id, |m| m.id == id)
     }
 }

-fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
+fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<Sketch> {
     let sketch = entry.value.get_sketch()?;
     let mut dogsketch = Dogsketch::default();
     sketch.merge_to_dogsketch(&mut dogsketch);
     // TODO(Astuyve) allow users to specify timestamp
-    dogsketch.set_ts(now);
+    dogsketch.set_ts(entry.timestamp);
     let mut sketch = Sketch::default();
     sketch.set_dogsketches(vec![dogsketch]);
     let name = entry.name.to_string();
@@ -286,10 +279,7 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option<MetricToShip> {
-    pub fn assert_sketch(aggregator_mutex: &Mutex<Aggregator>, metric_id: &str, value: f64) {
+    pub fn assert_sketch(
+        aggregator_mutex: &Mutex<Aggregator>,
+        metric_id: &str,
+        value: f64,
+        timestamp: i64,
+    ) {
         let aggregator = aggregator_mutex.lock().unwrap();
-        if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) {
+        if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, timestamp) {
             let metric = e.value.get_sketch().unwrap();
             assert!((metric.max().unwrap() - value).abs() < PRECISION);
             assert!((metric.min().unwrap() - value).abs() < PRECISION);
@@ -387,14 +385,20 @@ pub mod tests {
     #[cfg_attr(miri, ignore)]
     fn overflow() {
         let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();
-
+        let mut now = std::time::UNIX_EPOCH
+            .elapsed()
+            .expect("unable to poll clock, unrecoverable")
+            .as_secs()
+            .try_into()
+            .unwrap_or_default();
+        now = (now / 10) * 10;
         let metric1 = parse("test:1|c|#k:v").expect("metric parse failed");
         let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed");
         let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed");

-        let id1 = metric::id(metric1.name, &metric1.tags);
-        let id2 = metric::id(metric2.name, &metric2.tags);
-        let id3 = metric::id(metric3.name, &metric3.tags);
+        let id1 = metric::id(metric1.name, &metric1.tags, now);
+        let id2 = metric::id(metric2.name, &metric2.tags, now);
+        let id3 = metric::id(metric3.name, &metric3.tags, now);

         assert_ne!(id1, id2);
         assert_ne!(id1, id3);
@@ -417,25 +421,30 @@ pub mod tests {
     #[cfg_attr(miri, ignore)]
     fn clear() {
         let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap();
-
-        let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed");
-        let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed");
+        let mut now = 1656581409;
+        now = (now / 10) * 10;
+        let metric1 = parse("test:3|c|#k1:v1|T1656581409").expect("metric parse failed");
+        let metric2 = parse("foo:5|c|#k2:v2|T1656581409").expect("metric parse failed");

         assert!(aggregator.insert(metric1).is_ok());
         assert!(aggregator.insert(metric2).is_ok());

         assert_eq!(aggregator.map.len(), 2);

-        if let Some(v) =
-            aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap()))
-        {
+        if let Some(v) = aggregator.get_entry_by_id(
+            "foo".into(),
+            &Some(SortedTags::parse("k2:v2").unwrap()),
+            now,
+        ) {
             assert_eq!(v.value.get_value().unwrap(), 5f64);
         } else {
             panic!("failed to get value by id");
         }
-        if let Some(v) =
-            aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap()))
-        {
+        if let Some(v) = aggregator.get_entry_by_id(
+            "test".into(),
+            &Some(SortedTags::parse("k1:v1").unwrap()),
+            now,
+        ) {
             assert_eq!(v.value.get_value().unwrap(), 3f64);
         } else {
             panic!("failed to get value by id");
diff --git a/dogstatsd/src/dogstatsd.rs b/dogstatsd/src/dogstatsd.rs
index 41456966e..236c82d07 100644
--- a/dogstatsd/src/dogstatsd.rs
+++ b/dogstatsd/src/dogstatsd.rs
@@ -136,9 +136,9 @@ mod tests {
     #[cfg_attr(miri, ignore)]
     async fn test_dogstatsd_multi_distribution() {
         let locked_aggregator = setup_dogstatsd(
-            "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d
-single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d
-single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
+            "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409
+single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409
+single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409
 ",
         )
         .await;
@@ -154,24 +154,38 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
             &locked_aggregator,
             "single_machine_performance.rouster.api.series_v2.payload_size_bytes",
             269_942_f64,
+            1656581400,
         );
         assert_sketch(
             &locked_aggregator,
"single_machine_performance.rouster.metrics_min_timestamp_latency", 1_426.908_702_16, + 1656581400, ); assert_sketch( &locked_aggregator, "single_machine_performance.rouster.metrics_max_timestamp_latency", 1_376.908_702_16, + 1656581400, ); } #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_dogstatsd_multi_metric() { + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; let locked_aggregator = setup_dogstatsd( - "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2\n", + format!( + "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", + now + ) + .as_str(), ) .await; let aggregator = locked_aggregator.lock().expect("lock poisoned"); @@ -182,15 +196,21 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric1", 1.0, ""); - assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2"); - assert_value(&locked_aggregator, "metric3", 3.0, "tag3:val3,tag4:val4"); + assert_value(&locked_aggregator, "metric1", 1.0, "", now); + assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now); + assert_value( + &locked_aggregator, + "metric3", + 3.0, + "tag3:val3,tag4:val4", + now, + ); } #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_dogstatsd_single_metric() { - let locked_aggregator = setup_dogstatsd("metric123:99123|c").await; + let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await; let aggregator = locked_aggregator.lock().expect("lock poisoned"); let parsed_metrics = aggregator.to_series(); @@ -198,7 +218,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric123", 99_123.0, ""); + assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400); } #[tokio::test] diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index dcfb23c6c..f37943afd 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -13,9 +13,10 @@ use ustr::Ustr; pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; +// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#dogstatsd-protocol-v13 lazy_static! { static ref METRIC_REGEX: Regex = Regex::new( - r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?$", + r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?(?:\|T(?P[^|]+))?$", ).expect("Failed to create metric regex"); } @@ -169,20 +170,52 @@ pub struct Metric { /// ID given a name and tagset. 
     pub id: u64,
+    // Timestamp of the metric, rounded down to a 10 second bucket
+    pub timestamp: i64,
 }

 impl Metric {
-    pub fn new(name: Ustr, value: MetricValue, tags: Option<SortedTags>) -> Metric {
-        let id = id(name, &tags);
+    pub fn new(
+        name: Ustr,
+        value: MetricValue,
+        tags: Option<SortedTags>,
+        timestamp: Option<i64>,
+    ) -> Metric {
+        let parsed_timestamp = timestamp_to_bucket(timestamp.unwrap_or_else(|| {
+            std::time::UNIX_EPOCH
+                .elapsed()
+                .expect("unable to poll clock, unrecoverable")
+                .as_secs()
+                .try_into()
+                .unwrap_or_default()
+        }));
+
+        let id = id(name, &tags, parsed_timestamp);
         Metric {
             name,
             value,
             tags,
             id,
+            timestamp: parsed_timestamp,
         }
     }
 }

+// Round down to the nearest 10 seconds
+// to form a bucket of metric contexts aggregated per 10s
+pub fn timestamp_to_bucket(timestamp: i64) -> i64 {
+    let now_seconds: i64 = std::time::UNIX_EPOCH
+        .elapsed()
+        .expect("unable to poll clock, unrecoverable")
+        .as_secs()
+        .try_into()
+        .unwrap_or_default();
+    if timestamp > now_seconds {
+        return (now_seconds / 10) * 10;
+    }
+    (timestamp / 10) * 10
+}
+
 /// Parse a metric from given input.
 ///
 /// This function parses a passed `&str` into a `Metric`. We assume that
@@ -207,6 +240,17 @@ pub fn parse(input: &str) -> Result<Metric, ParseError> {
         }
         let val = first_value(caps.name("values").unwrap().as_str())?;
         let t = caps.name("type").unwrap().as_str();
+        let now = std::time::UNIX_EPOCH
+            .elapsed()
+            .expect("unable to poll clock, unrecoverable")
+            .as_secs()
+            .try_into()
+            .unwrap_or_default();
+        // Bucket the timestamp, falling back to the current time when the datagram has none or it fails to parse
+        let parsed_timestamp: i64 = match caps.name("timestamp") {
+            Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or(now)),
+            None => timestamp_to_bucket(now),
+        };
         let metric_value = match t {
             "c" => MetricValue::Count(val),
             "g" => MetricValue::Gauge(val),
@@ -223,12 +267,13 @@ pub fn parse(input: &str) -> Result<Metric, ParseError> {
             }
         };
         let name = Ustr::from(caps.name("name").unwrap().as_str());
-        let id = id(name, &tags);
+        let id = id(name, &tags, parsed_timestamp);
         return Ok(Metric {
             name,
             value: metric_value,
             tags,
             id,
+            timestamp: parsed_timestamp,
         });
     }
     Err(ParseError::Raw(format!("Invalid metric format {input}")))
@@ -258,10 +303,11 @@ fn first_value(values: &str) -> Result<f64, ParseError> {
 /// from the point of view of this function.
 #[inline]
 #[must_use]
-pub fn id(name: Ustr, tags: &Option<SortedTags>) -> u64 {
+pub fn id(name: Ustr, tags: &Option<SortedTags>, timestamp: i64) -> u64 {
     let mut hasher = FnvHasher::default();

     name.hash(&mut hasher);
+    timestamp.hash(&mut hasher);
     if let Some(tags_present) = tags {
         for kv in tags_present.values.iter() {
             kv.0.as_bytes().hash(&mut hasher);
@@ -285,7 +331,7 @@ mod tests {
     use proptest::{collection, option, strategy::Strategy, string::string_regex};
     use ustr::Ustr;

-    use crate::metric::{id, parse, MetricValue, SortedTags};
+    use crate::metric::{id, parse, timestamp_to_bucket, MetricValue, SortedTags};

     use super::ParseError;
@@ -412,7 +458,6 @@ mod tests {
         let result = parse(&input);

         let verify = result.unwrap_err().to_string();
-        println!("{}", verify);
         assert!(verify.starts_with("parse failure: Invalid metric format "));
     }

@@ -447,6 +492,7 @@ mod tests {
             mut tags in metric_tags()) {
             let mut tagset1 = String::new();
             let mut tagset2 = String::new();
+            let now = timestamp_to_bucket(std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default());

             for (k,v) in &tags {
                 tagset1.push_str(k);
@@ -466,8 +512,8 @@ mod tests {
                 tagset2.pop();
             }

-            let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()));
-            let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()));
+            let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()), now);
+            let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()), now);

             assert_eq!(id1, id2);
         }
@@ -551,6 +597,33 @@ mod tests {
         }
     }

+    #[test]
+    fn parse_metric_timestamp() {
+        // Important to test that we round down to the nearest 10 seconds
+        // for our buckets
+        let input = "page.views:15|c|#env:dev|T1656581409";
+        let metric = parse(input).unwrap();
+        assert_eq!(metric.timestamp, 1656581400);
+    }
+
+    #[test]
+    fn parse_metric_no_timestamp() {
+        // *wince* this could be a race condition
+        // we round the timestamp down to a 10s bucket and I want to test now
+        // but if the timestamp rolls over to the next bucket time and the test
+        // is somehow slower than 1s then the test will fail.
+        // come bug me if I wrecked your CI run
+        let input = "page.views:15|c|#env:dev";
+        let metric = parse(input).unwrap();
+        let now: i64 = std::time::UNIX_EPOCH
+            .elapsed()
+            .expect("unable to poll clock, unrecoverable")
+            .as_secs()
+            .try_into()
+            .unwrap_or_default();
+        assert_eq!(metric.timestamp, (now / 10) * 10);
+    }
+
     #[test]
     fn sorting_tags() {
         let mut tags = SortedTags::parse("z:z0,b:b2,c:c3").unwrap();
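
The change above keys everything on 10-second buckets: timestamp_to_bucket rounds a datagram's |T timestamp (or the current time when none is supplied) down to a multiple of 10, and id hashes that bucket together with the metric name and tags, so samples from the same window collapse into one aggregation context. The standalone sketch below is illustration only and is not part of the patch; the bucket helper and main are hypothetical stand-ins that mirror just the rounding step of timestamp_to_bucket, leaving out its clamping of future timestamps to the current time.

// Illustration only: `bucket` is a hypothetical stand-in for the patch's
// timestamp_to_bucket, showing the rounding the tests above rely on.
fn bucket(timestamp: i64) -> i64 {
    // Integer division discards the remainder, so this rounds down to the
    // start of the enclosing 10-second window.
    (timestamp / 10) * 10
}

fn main() {
    // A datagram carrying |T1656581409 lands in the 1656581400 bucket,
    // which is why the tests assert 1656581400 rather than the raw timestamp.
    assert_eq!(bucket(1656581409), 1656581400);
    // Another sample from the same window maps to the same bucket, and so
    // (hashed with the same name and tags) to the same context ID.
    assert_eq!(bucket(1656581403), 1656581400);
    // Two seconds later the window rolls over.
    assert_eq!(bucket(1656581411), 1656581410);
}

This rounding is also the race that the parse_metric_no_timestamp comment worries about: with no |T field the bucket is derived from the current clock, so a test that reads the clock again slightly later can land in the next window.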