Skip to content

Commit

Permalink
fix: create histogram buckets in a range (#2144)
Browse files Browse the repository at this point in the history
Signed-off-by: Sidhant Kohli <[email protected]>
  • Loading branch information
kohlisid authored Oct 15, 2024
1 parent b268639 commit 8bf9679
Showing 1 changed file with 78 additions and 6 deletions.
84 changes: 78 additions & 6 deletions rust/numaflow-core/src/monovertex/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::BTreeMap;
use std::iter;
use std::net::SocketAddr;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
Expand All @@ -13,7 +14,7 @@ use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;
use rcgen::{generate_simple_self_signed, CertifiedKey};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -126,6 +127,27 @@ pub struct MonoVtxMetrics {
pub sink_time: Family<Vec<(String, String)>, Histogram>,
}

/// Exponential bucket distribution with range.
/// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`.
/// The final +Inf bucket is not counted and not included in the returned iterator.
/// The function panics if `length` is 0 or negative, or if `min` is 0 or negative.
fn exponential_buckets_range(min: f64, max: f64, length: u16) -> impl Iterator<Item = f64> {
if length < 1 {
panic!("ExponentialBucketsRange length needs a positive length");
}
if min <= 0.0 {
panic!("ExponentialBucketsRange min needs to be greater than 0");
}

// We know max/min and highest bucket. Solve for growth_factor.
let growth_factor = (max / min).powf(1.0 / (length as f64 - 1.0));

iter::repeat(())
.enumerate()
.map(move |(i, _)| min * growth_factor.powf(i as f64))
.take(length.into())
}

/// impl the MonoVtxMetrics struct and create a new object
impl MonoVtxMetrics {
fn new() -> Self {
Expand All @@ -139,20 +161,21 @@ impl MonoVtxMetrics {
// gauge
source_pending: Family::<Vec<(String, String)>, Gauge>::default(),
// timers
// exponential buckets in the range 100 microseconds to 15 minutes
e2e_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(100.0, 60000000.0 * 15.0, 10))
Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10))
}),
read_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(100.0, 60000000.0 * 15.0, 10))
Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10))
}),
transform_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(
|| Histogram::new(exponential_buckets(100.0, 60000000.0 * 15.0, 10)),
|| Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10)),
),
ack_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(100.0, 60000000.0 * 15.0, 10))
Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10))
}),
sink_time: Family::<Vec<(String, String)>, Histogram>::new_with_constructor(|| {
Histogram::new(exponential_buckets(100.0, 60000000.0 * 15.0, 10))
Histogram::new(exponential_buckets_range(100.0, 60000000.0 * 15.0, 10))
}),
};

Expand Down Expand Up @@ -752,4 +775,53 @@ mod tests {
}
assert_eq!(stored_values, [15, 20, 18, 18]);
}
#[test]
fn test_exponential_buckets_range_basic() {
let min = 1.0;
let max = 32.0;
let length = 6;
let buckets: Vec<f64> = exponential_buckets_range(min, max, length).collect();
let expected = vec![1.0, 2.0, 4.0, 8.0, 16.0, 32.0];
assert_eq!(buckets, expected);
}

#[test]
fn test_exponential_buckets_range_mico_to_seconds_minutes() {
let min = 100.0;
let max = 60000000.0 * 15.0;
let length = 10;
let buckets: Vec<f64> = exponential_buckets_range(min, max, length).collect();
let expected: Vec<f64> = vec![
100.0,
592.5071727239734,
3510.6474972935644,
20800.838230519028,
123246.45850253566,
730244.1067557991,
4.32674871092222e+06,
2.5636296457956206e+07,
1.5189689533417246e+08,
8.999999999999983e+08,
];
for (i, bucket) in buckets.iter().enumerate() {
assert!((bucket - expected[i]).abs() < 1e-2);
}
}
#[test]
#[should_panic(expected = "ExponentialBucketsRange length needs a positive length")]
fn test_exponential_buckets_range_zero_length() {
let _ = exponential_buckets_range(1.0, 100.0, 0).collect::<Vec<f64>>();
}

#[test]
#[should_panic(expected = "ExponentialBucketsRange min needs to be greater than 0")]
fn test_exponential_buckets_range_zero_min() {
let _ = exponential_buckets_range(0.0, 100.0, 10).collect::<Vec<f64>>();
}

#[test]
#[should_panic(expected = "ExponentialBucketsRange min needs to be greater than 0")]
fn test_exponential_buckets_range_negative_min() {
let _ = exponential_buckets_range(-1.0, 100.0, 10).collect::<Vec<f64>>();
}
}

0 comments on commit 8bf9679

Please sign in to comment.