From 33492be709bbfd0701ebaf00f38c864f15e6974f Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Mon, 24 Feb 2025 20:24:10 -0500 Subject: [PATCH 01/13] feat: set timestamp when metric is parsed --- dogstatsd/src/aggregator.rs | 20 ++++---------------- dogstatsd/src/metric.rs | 17 ++++++++++++++++- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index 196a9b761..c3a376773 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -117,19 +117,13 @@ impl Aggregator { #[must_use] pub fn distributions_to_protobuf(&self) -> SketchPayload { - let now = time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); let mut sketch_payload = SketchPayload::new(); sketch_payload.sketches = self .map .iter() .filter_map(|entry| match entry.value { - MetricValue::Distribution(_) => build_sketch(now, entry, self.tags.clone()), + MetricValue::Distribution(_) => build_sketch(entry, self.tags.clone()), _ => None, }) .collect(); @@ -138,12 +132,6 @@ impl Aggregator { #[must_use] pub fn consume_distributions(&mut self) -> Vec { - let now = time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); let mut batched_payloads = Vec::new(); let mut sketch_payload = SketchPayload::new(); let mut this_batch_size = 0u64; @@ -155,7 +143,7 @@ impl Aggregator { } false }) - .filter_map(|entry| build_sketch(now, &entry, self.tags.clone())) + .filter_map(|entry| build_sketch(&entry, self.tags.clone())) { let next_chunk_size = sketch.compute_size(); @@ -254,12 +242,12 @@ impl Aggregator { } } -fn build_sketch(now: i64, entry: &Metric, mut base_tag_vec: SortedTags) -> Option { +fn build_sketch(entry: &Metric, mut base_tag_vec: SortedTags) -> Option { let sketch = entry.value.get_sketch()?; let mut dogsketch = Dogsketch::default(); sketch.merge_to_dogsketch(&mut dogsketch); // TODO(Astuyve) allow users to specify timestamp - dogsketch.set_ts(now); + dogsketch.set_ts(entry.timestamp); let mut sketch = Sketch::default(); sketch.set_dogsketches(vec![dogsketch]); let name = entry.name.to_string(); diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index dcfb23c6c..c3a08bc07 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -169,16 +169,28 @@ pub struct Metric { /// ID given a name and tagset. pub id: u64, + // Timestamp + pub timestamp: i64, } impl Metric { - pub fn new(name: Ustr, value: MetricValue, tags: Option) -> Metric { + pub fn new(name: Ustr, value: MetricValue, tags: Option, timestamp: Option) -> Metric { let id = id(name, &tags); + let timestamp = timestamp.unwrap_or_else(|| { + std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default() + }); + Metric { name, value, tags, id, + timestamp, } } } @@ -222,13 +234,16 @@ pub fn parse(input: &str) -> Result { return Err(ParseError::Raw(format!("Invalid metric type: {t}"))); } }; + // TODO parse timestamp let name = Ustr::from(caps.name("name").unwrap().as_str()); let id = id(name, &tags); + let now = std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default(); return Ok(Metric { name, value: metric_value, tags, id, + timestamp: now, }); } Err(ParseError::Raw(format!("Invalid metric format {input}"))) From 3c034accbc4941fd5a323328f400ca1bf92b8d66 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Thu, 27 Feb 2025 11:02:48 -0500 Subject: [PATCH 02/13] feat: Ten second rolling buckets --- dogstatsd/src/aggregator.rs | 76 ++++++++++++++++++++++++++----------- dogstatsd/src/dogstatsd.rs | 38 ++++++++++++++----- dogstatsd/src/metric.rs | 72 ++++++++++++++++++++++++++++++----- 3 files changed, 144 insertions(+), 42 deletions(-) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index c3a376773..62c03293c 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -91,13 +91,14 @@ impl Aggregator { /// Function will return overflow error if more than /// `min(constants::MAX_CONTEXTS, CONTEXTS)` is exceeded. pub fn insert(&mut self, metric: Metric) -> Result<(), errors::Insert> { - let id = metric::id(metric.name, &metric.tags); + let id = metric::id(metric.name, &metric.tags, metric.timestamp); let len = self.map.len(); - match self - .map - .entry(id, |m| m.id == id, |m| metric::id(m.name, &m.tags)) - { + match self.map.entry( + id, + |m| m.id == id, + |m| metric::id(m.name, &m.tags, m.timestamp), + ) { hash_table::Entry::Vacant(entry) => { if len >= self.max_context { return Err(errors::Insert::Overflow); @@ -236,8 +237,13 @@ impl Aggregator { batched_payloads } - pub fn get_entry_by_id(&self, name: Ustr, tags: &Option) -> Option<&Metric> { - let id = metric::id(name, tags); + pub fn get_entry_by_id( + &self, + name: Ustr, + tags: &Option, + timestamp: i64, + ) -> Option<&Metric> { + let id = metric::id(name, tags, timestamp); self.map.find(id, |m| m.id == id) } } @@ -316,11 +322,14 @@ pub mod tests { metric_id: &str, value: f64, tags: &str, + timestamp: i64, ) { let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = - aggregator.get_entry_by_id(metric_id.into(), &Some(SortedTags::parse(tags).unwrap())) - { + if let Some(e) = aggregator.get_entry_by_id( + metric_id.into(), + &Some(SortedTags::parse(tags).unwrap()), + timestamp, + ) { let metric = e.value.get_value().unwrap(); assert!((metric - value).abs() < PRECISION); } else { @@ -328,9 +337,14 @@ pub mod tests { } } - pub fn assert_sketch(aggregator_mutex: &Mutex, metric_id: &str, value: f64) { + pub fn assert_sketch( + aggregator_mutex: &Mutex, + metric_id: &str, + value: f64, + timestamp: i64, + ) { let aggregator = aggregator_mutex.lock().unwrap(); - if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None) { + if let Some(e) = aggregator.get_entry_by_id(metric_id.into(), &None, timestamp) { let metric = e.value.get_sketch().unwrap(); assert!((metric.max().unwrap() - value).abs() < PRECISION); assert!((metric.min().unwrap() - value).abs() < PRECISION); @@ -375,14 +389,20 @@ pub mod tests { #[cfg_attr(miri, ignore)] fn overflow() { let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; let metric1 = parse("test:1|c|#k:v").expect("metric parse failed"); let metric2 = parse("foo:1|c|#k:v").expect("metric parse failed"); let metric3 = parse("bar:1|c|#k:v").expect("metric parse failed"); - let id1 = metric::id(metric1.name, &metric1.tags); - let id2 = metric::id(metric2.name, &metric2.tags); - let id3 = metric::id(metric3.name, &metric3.tags); + let id1 = metric::id(metric1.name, &metric1.tags, now); + let id2 = metric::id(metric2.name, &metric2.tags, now); + let id3 = metric::id(metric3.name, &metric3.tags, now); assert_ne!(id1, id2); assert_ne!(id1, id3); @@ -405,7 +425,13 @@ pub mod tests { #[cfg_attr(miri, ignore)] fn clear() { let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed"); let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed"); @@ -413,17 +439,21 @@ pub mod tests { assert!(aggregator.insert(metric2).is_ok()); assert_eq!(aggregator.map.len(), 2); - if let Some(v) = - aggregator.get_entry_by_id("foo".into(), &Some(SortedTags::parse("k2:v2").unwrap())) - { + if let Some(v) = aggregator.get_entry_by_id( + "foo".into(), + &Some(SortedTags::parse("k2:v2").unwrap()), + now, + ) { assert_eq!(v.value.get_value().unwrap(), 5f64); } else { panic!("failed to get value by id"); } - if let Some(v) = - aggregator.get_entry_by_id("test".into(), &Some(SortedTags::parse("k1:v1").unwrap())) - { + if let Some(v) = aggregator.get_entry_by_id( + "test".into(), + &Some(SortedTags::parse("k1:v1").unwrap()), + now, + ) { assert_eq!(v.value.get_value().unwrap(), 3f64); } else { panic!("failed to get value by id"); diff --git a/dogstatsd/src/dogstatsd.rs b/dogstatsd/src/dogstatsd.rs index 41456966e..236c82d07 100644 --- a/dogstatsd/src/dogstatsd.rs +++ b/dogstatsd/src/dogstatsd.rs @@ -136,9 +136,9 @@ mod tests { #[cfg_attr(miri, ignore)] async fn test_dogstatsd_multi_distribution() { let locked_aggregator = setup_dogstatsd( - "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d -single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d -single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d + "single_machine_performance.rouster.api.series_v2.payload_size_bytes:269942|d|T1656581409 +single_machine_performance.rouster.metrics_min_timestamp_latency:1426.90870216|d|T1656581409 +single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d|T1656581409 ", ) .await; @@ -154,24 +154,38 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d &locked_aggregator, "single_machine_performance.rouster.api.series_v2.payload_size_bytes", 269_942_f64, + 1656581400, ); assert_sketch( &locked_aggregator, "single_machine_performance.rouster.metrics_min_timestamp_latency", 1_426.908_702_16, + 1656581400, ); assert_sketch( &locked_aggregator, "single_machine_performance.rouster.metrics_max_timestamp_latency", 1_376.908_702_16, + 1656581400, ); } #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_dogstatsd_multi_metric() { + let mut now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + now = (now / 10) * 10; let locked_aggregator = setup_dogstatsd( - "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2\n", + format!( + "metric3:3|c|#tag3:val3,tag4:val4\nmetric1:1|c\nmetric2:2|c|#tag2:val2|T{:}\n", + now + ) + .as_str(), ) .await; let aggregator = locked_aggregator.lock().expect("lock poisoned"); @@ -182,15 +196,21 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric1", 1.0, ""); - assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2"); - assert_value(&locked_aggregator, "metric3", 3.0, "tag3:val3,tag4:val4"); + assert_value(&locked_aggregator, "metric1", 1.0, "", now); + assert_value(&locked_aggregator, "metric2", 2.0, "tag2:val2", now); + assert_value( + &locked_aggregator, + "metric3", + 3.0, + "tag3:val3,tag4:val4", + now, + ); } #[tokio::test] #[cfg_attr(miri, ignore)] async fn test_dogstatsd_single_metric() { - let locked_aggregator = setup_dogstatsd("metric123:99123|c").await; + let locked_aggregator = setup_dogstatsd("metric123:99123|c|T1656581409").await; let aggregator = locked_aggregator.lock().expect("lock poisoned"); let parsed_metrics = aggregator.to_series(); @@ -198,7 +218,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d assert_eq!(aggregator.distributions_to_protobuf().sketches.len(), 0); drop(aggregator); - assert_value(&locked_aggregator, "metric123", 99_123.0, ""); + assert_value(&locked_aggregator, "metric123", 99_123.0, "", 1656581400); } #[tokio::test] diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index c3a08bc07..af939f26a 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -15,7 +15,7 @@ pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; lazy_static! { static ref METRIC_REGEX: Regex = Regex::new( - r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?$", + r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?(?:\|T(?P[^|]+))?$", ).expect("Failed to create metric regex"); } @@ -174,9 +174,13 @@ pub struct Metric { } impl Metric { - pub fn new(name: Ustr, value: MetricValue, tags: Option, timestamp: Option) -> Metric { - let id = id(name, &tags); - let timestamp = timestamp.unwrap_or_else(|| { + pub fn new( + name: Ustr, + value: MetricValue, + tags: Option, + timestamp: Option, + ) -> Metric { + let mut timestamp = timestamp.unwrap_or_else(|| { std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -184,7 +188,9 @@ impl Metric { .try_into() .unwrap_or_default() }); - + timestamp = (timestamp / 10) * 10; + + let id = id(name, &tags, timestamp); Metric { name, value, @@ -219,6 +225,22 @@ pub fn parse(input: &str) -> Result { } let val = first_value(caps.name("values").unwrap().as_str())?; let t = caps.name("type").unwrap().as_str(); + let mut now: i64 = match caps.name("timestamp") { + Some(ts) => ts.as_str().parse().unwrap_or_else(|_| { + std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default() + }), + None => std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(), + }; let metric_value = match t { "c" => MetricValue::Count(val), "g" => MetricValue::Gauge(val), @@ -236,8 +258,8 @@ pub fn parse(input: &str) -> Result { }; // TODO parse timestamp let name = Ustr::from(caps.name("name").unwrap().as_str()); - let id = id(name, &tags); - let now = std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default(); + now = (now / 10) * 10; + let id = id(name, &tags, now); return Ok(Metric { name, value: metric_value, @@ -273,10 +295,11 @@ fn first_value(values: &str) -> Result { /// from the point of view of this function. #[inline] #[must_use] -pub fn id(name: Ustr, tags: &Option) -> u64 { +pub fn id(name: Ustr, tags: &Option, timestamp: i64) -> u64 { let mut hasher = FnvHasher::default(); name.hash(&mut hasher); + timestamp.hash(&mut hasher); if let Some(tags_present) = tags { for kv in tags_present.values.iter() { kv.0.as_bytes().hash(&mut hasher); @@ -462,6 +485,8 @@ mod tests { mut tags in metric_tags()) { let mut tagset1 = String::new(); let mut tagset2 = String::new(); + let mut now = std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default(); + now = (now / 10) * 10; for (k,v) in &tags { tagset1.push_str(k); @@ -481,8 +506,8 @@ mod tests { tagset2.pop(); } - let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap())); - let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap())); + let id1 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset1).unwrap()), now); + let id2 = id(Ustr::from(&name), &Some(SortedTags::parse(&tagset2).unwrap()), now); assert_eq!(id1, id2); } @@ -566,6 +591,33 @@ mod tests { } } + #[test] + fn parse_metric_timestamp() { + // Important to test that we round down to the nearest 10 seconds + // for our buckets + let input = "page.views:15|c|#env:dev|T1656581409"; + let metric = parse(input).unwrap(); + assert_eq!(metric.timestamp, 1656581400); + } + + #[test] + fn parse_metric_no_timestamp() { + // *wince* this could be a race condition + // we round the timestamp down to a 10s bucket and I want to test now + // but if the timestamp rolls over to the next bucket time and the test + // is somehow slower than 1s then the test will fail. + // come bug me if I wrecked your CI run + let input = "page.views:15|c|#env:dev"; + let metric = parse(input).unwrap(); + let now: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + assert_eq!(metric.timestamp, (now / 10) * 10); + } + #[test] fn sorting_tags() { let mut tags = SortedTags::parse("z:z0,b:b2,c:c3").unwrap(); From f24ea40ab5b59583210e5136cf9ba3663de2aec2 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Thu, 27 Feb 2025 13:49:34 -0500 Subject: [PATCH 03/13] debuggin --- dogstatsd/src/aggregator.rs | 5 +---- dogstatsd/src/metric.rs | 9 +++++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index 62c03293c..ea55eeeb3 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -280,10 +280,7 @@ fn build_metric(entry: &Metric, mut base_tag_vec: SortedTags) -> Option, timestamp: Option, ) -> Metric { - let mut timestamp = timestamp.unwrap_or_else(|| { + let mut parsed_timestamp = timestamp.unwrap_or_else(|| { + println!("can't parse timestamp, using default"); std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") @@ -188,15 +189,15 @@ impl Metric { .try_into() .unwrap_or_default() }); - timestamp = (timestamp / 10) * 10; + parsed_timestamp = (parsed_timestamp / 10) * 10; - let id = id(name, &tags, timestamp); + let id = id(name, &tags, parsed_timestamp); Metric { name, value, tags, id, - timestamp, + timestamp: parsed_timestamp, } } } From 1716f634011b3d600cd6e81fd67cdb85651d7461 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 08:55:31 -0500 Subject: [PATCH 04/13] remove debug line --- dogstatsd/src/metric.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index bb7f156ac..04c92997b 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -181,7 +181,6 @@ impl Metric { timestamp: Option, ) -> Metric { let mut parsed_timestamp = timestamp.unwrap_or_else(|| { - println!("can't parse timestamp, using default"); std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") From 2b88bca0d8a3c8dd954b12f4485b349d434958ee Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 08:59:09 -0500 Subject: [PATCH 05/13] fix: remove time from aggr --- dogstatsd/src/aggregator.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index ea55eeeb3..93dcf5d54 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -7,7 +7,6 @@ use crate::constants; use crate::datadog::{self, Metric as MetricToShip, Series}; use crate::errors; use crate::metric::{self, Metric, MetricValue, SortedTags}; -use std::time; use datadog_protos::metrics::{Dogsketch, Sketch, SketchPayload}; use ddsketch_agent::DDSketch; From 78ce0434cead615f23f672d1e61d2f85e0ee41eb Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 13:18:55 -0500 Subject: [PATCH 06/13] feat: Clean up parse code, refactor floored buckets to metrics --- dogstatsd/src/metric.rs | 47 ++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 04c92997b..d0be42fab 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -180,15 +180,14 @@ impl Metric { tags: Option, timestamp: Option, ) -> Metric { - let mut parsed_timestamp = timestamp.unwrap_or_else(|| { + let parsed_timestamp = timestamp_to_bucket(timestamp.unwrap_or_else(|| { std::time::UNIX_EPOCH .elapsed() .expect("unable to poll clock, unrecoverable") .as_secs() .try_into() .unwrap_or_default() - }); - parsed_timestamp = (parsed_timestamp / 10) * 10; + })); let id = id(name, &tags, parsed_timestamp); Metric { @@ -201,6 +200,12 @@ impl Metric { } } +// Round down to the nearest 10 seconds +// to form a bucket of metric contexts aggregated per 10s +pub fn timestamp_to_bucket(timestamp: i64) -> i64 { + (timestamp / 10) * 10 +} + /// Parse a metric from given input. /// /// This function parses a passed `&str` into a `Metric`. We assume that @@ -225,21 +230,15 @@ pub fn parse(input: &str) -> Result { } let val = first_value(caps.name("values").unwrap().as_str())?; let t = caps.name("type").unwrap().as_str(); - let mut now: i64 = match caps.name("timestamp") { - Some(ts) => ts.as_str().parse().unwrap_or_else(|_| { - std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default() - }), - None => std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(), + let now = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + let parsed_timestamp: i64 = match caps.name("timestamp") { + Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or_else(|_| now)), + None => timestamp_to_bucket(now), }; let metric_value = match t { "c" => MetricValue::Count(val), @@ -256,16 +255,14 @@ pub fn parse(input: &str) -> Result { return Err(ParseError::Raw(format!("Invalid metric type: {t}"))); } }; - // TODO parse timestamp let name = Ustr::from(caps.name("name").unwrap().as_str()); - now = (now / 10) * 10; let id = id(name, &tags, now); return Ok(Metric { name, value: metric_value, tags, id, - timestamp: now, + timestamp: parsed_timestamp, }); } Err(ParseError::Raw(format!("Invalid metric format {input}"))) @@ -323,7 +320,10 @@ mod tests { use proptest::{collection, option, strategy::Strategy, string::string_regex}; use ustr::Ustr; - use crate::metric::{id, parse, MetricValue, SortedTags}; + use crate::{ + datadog::Metric, + metric::{id, parse, timestamp_to_bucket, MetricValue, SortedTags}, + }; use super::ParseError; @@ -485,8 +485,7 @@ mod tests { mut tags in metric_tags()) { let mut tagset1 = String::new(); let mut tagset2 = String::new(); - let mut now = std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default(); - now = (now / 10) * 10; + let now = timestamp_to_bucket(std::time::UNIX_EPOCH.elapsed().expect("unable to poll clock, unrecoverable").as_secs().try_into().unwrap_or_default()); for (k,v) in &tags { tagset1.push_str(k); From 6d09005f47bf0fd17ba85cac71e9d400990ab49d Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 13:27:57 -0500 Subject: [PATCH 07/13] feat: comment docs --- dogstatsd/src/metric.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index d0be42fab..5d44db9bd 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -13,6 +13,7 @@ use ustr::Ustr; pub const EMPTY_TAGS: SortedTags = SortedTags { values: Vec::new() }; +// https://docs.datadoghq.com/developers/dogstatsd/datagram_shell?tab=metrics#dogstatsd-protocol-v13 lazy_static! { static ref METRIC_REGEX: Regex = Regex::new( r"^(?P[^:]+):(?P[^|]+)\|(?P[a-zA-Z]+)(?:\|@(?P[\d.]+))?(?:\|#(?P[^|]+))?(?:\|c:(?P[^|]+))?(?:\|T(?P[^|]+))?$", From 1521c4c8319bbd2e8ed8d84f849655af013dd5bb Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 14:20:05 -0500 Subject: [PATCH 08/13] feat: Cleanup impl --- dogstatsd/src/metric.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 5d44db9bd..9412aeb35 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -204,6 +204,15 @@ impl Metric { // Round down to the nearest 10 seconds // to form a bucket of metric contexts aggregated per 10s pub fn timestamp_to_bucket(timestamp: i64) -> i64 { + let now_seconds: i64 = std::time::UNIX_EPOCH + .elapsed() + .expect("unable to poll clock, unrecoverable") + .as_secs() + .try_into() + .unwrap_or_default(); + if timestamp > now_seconds { + return (now_seconds / 10) * 10; + } (timestamp / 10) * 10 } @@ -237,9 +246,10 @@ pub fn parse(input: &str) -> Result { .as_secs() .try_into() .unwrap_or_default(); + // let Metric::new() handle bucketing the timestamp let parsed_timestamp: i64 = match caps.name("timestamp") { - Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or_else(|_| now)), - None => timestamp_to_bucket(now), + Some(ts) => ts.as_str().parse().unwrap_or_else(|_| now), + None => now, }; let metric_value = match t { "c" => MetricValue::Count(val), From 778795e6739cfe7ebeae405e85a614c9d7eba8a7 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 14:26:04 -0500 Subject: [PATCH 09/13] fix: call method --- dogstatsd/src/metric.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 9412aeb35..c76839083 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -190,6 +190,7 @@ impl Metric { .unwrap_or_default() })); + println!("parsed_timestamp: {}", parsed_timestamp); let id = id(name, &tags, parsed_timestamp); Metric { name, @@ -248,8 +249,8 @@ pub fn parse(input: &str) -> Result { .unwrap_or_default(); // let Metric::new() handle bucketing the timestamp let parsed_timestamp: i64 = match caps.name("timestamp") { - Some(ts) => ts.as_str().parse().unwrap_or_else(|_| now), - None => now, + Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or_else(|_| now)), + None => timestamp_to_bucket(now), }; let metric_value = match t { "c" => MetricValue::Count(val), @@ -331,10 +332,7 @@ mod tests { use proptest::{collection, option, strategy::Strategy, string::string_regex}; use ustr::Ustr; - use crate::{ - datadog::Metric, - metric::{id, parse, timestamp_to_bucket, MetricValue, SortedTags}, - }; + use crate::metric::{id, parse, timestamp_to_bucket, MetricValue, SortedTags}; use super::ParseError; From 3d044cd3d7acb897e15db2ded156d7286657dd65 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 14:30:34 -0500 Subject: [PATCH 10/13] fix: thx clippy --- dogstatsd/src/metric.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index c76839083..65e3421a4 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -249,7 +249,7 @@ pub fn parse(input: &str) -> Result { .unwrap_or_default(); // let Metric::new() handle bucketing the timestamp let parsed_timestamp: i64 = match caps.name("timestamp") { - Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or_else(|_| now)), + Some(ts) => timestamp_to_bucket(ts.as_str().parse().unwrap_or(now)), None => timestamp_to_bucket(now), }; let metric_value = match t { From ac72881739d2be88ce7be7fdee764c37902dc9f2 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 14:41:44 -0500 Subject: [PATCH 11/13] fix: make clear() deterministic --- dogstatsd/src/aggregator.rs | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/dogstatsd/src/aggregator.rs b/dogstatsd/src/aggregator.rs index 93dcf5d54..f31a3b0fc 100644 --- a/dogstatsd/src/aggregator.rs +++ b/dogstatsd/src/aggregator.rs @@ -421,15 +421,10 @@ pub mod tests { #[cfg_attr(miri, ignore)] fn clear() { let mut aggregator = Aggregator::new(EMPTY_TAGS, 2).unwrap(); - let mut now = std::time::UNIX_EPOCH - .elapsed() - .expect("unable to poll clock, unrecoverable") - .as_secs() - .try_into() - .unwrap_or_default(); + let mut now = 1656581409; now = (now / 10) * 10; - let metric1 = parse("test:3|c|#k1:v1").expect("metric parse failed"); - let metric2 = parse("foo:5|c|#k2:v2").expect("metric parse failed"); + let metric1 = parse("test:3|c|#k1:v1|T1656581409").expect("metric parse failed"); + let metric2 = parse("foo:5|c|#k2:v2|T1656581409").expect("metric parse failed"); assert!(aggregator.insert(metric1).is_ok()); assert!(aggregator.insert(metric2).is_ok()); From c0ac5d005e69abba9547217d8abe92ccb87d8366 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 14:55:13 -0500 Subject: [PATCH 12/13] fix: used wrong variable --- dogstatsd/src/metric.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 65e3421a4..514bf9e75 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -268,7 +268,7 @@ pub fn parse(input: &str) -> Result { } }; let name = Ustr::from(caps.name("name").unwrap().as_str()); - let id = id(name, &tags, now); + let id = id(name, &tags, parsed_timestamp); return Ok(Metric { name, value: metric_value, From 9549d124f8cfa82995973eca3878e4225f945d32 Mon Sep 17 00:00:00 2001 From: AJ Stuyvenberg Date: Fri, 28 Feb 2025 15:00:22 -0500 Subject: [PATCH 13/13] println --- dogstatsd/src/metric.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/dogstatsd/src/metric.rs b/dogstatsd/src/metric.rs index 514bf9e75..f37943afd 100644 --- a/dogstatsd/src/metric.rs +++ b/dogstatsd/src/metric.rs @@ -190,7 +190,6 @@ impl Metric { .unwrap_or_default() })); - println!("parsed_timestamp: {}", parsed_timestamp); let id = id(name, &tags, parsed_timestamp); Metric { name, @@ -459,7 +458,6 @@ mod tests { let result = parse(&input); let verify = result.unwrap_err().to_string(); - println!("{}", verify); assert!(verify.starts_with("parse failure: Invalid metric format ")); }