From 70a5d7e6e0dbe75a0783b85f702d5c5e1b3d61aa Mon Sep 17 00:00:00 2001 From: Qinxuan Chen Date: Wed, 4 Sep 2024 01:29:04 +0800 Subject: [PATCH] refactor(layers/prometheus): provide builder APIs (#5072) * refactor(layers/prometheus): provide consistent APIs * fix fmt * add feature attr for observe mod * add some comments * update * fix fmt * update doc * apply review suggestions * adjust imports * fix doc * improve prometheus error handling * move parse_prometheus_error * fix doc example * move register_default and align path_label * update doc --- core/src/layers/mod.rs | 2 +- core/src/layers/observe/mod.rs | 51 ++++ core/src/layers/prometheus.rs | 425 +++++++++++++++++++++++---------- 3 files changed, 345 insertions(+), 133 deletions(-) diff --git a/core/src/layers/mod.rs b/core/src/layers/mod.rs index 11a4f399357..cb76475bc3b 100644 --- a/core/src/layers/mod.rs +++ b/core/src/layers/mod.rs @@ -62,7 +62,7 @@ pub use self::mime_guess::MimeGuessLayer; #[cfg(feature = "layers-prometheus")] mod prometheus; #[cfg(feature = "layers-prometheus")] -pub use self::prometheus::PrometheusLayer; +pub use self::prometheus::{PrometheusLayer, PrometheusLayerBuilder}; #[cfg(feature = "layers-prometheus-client")] mod prometheus_client; diff --git a/core/src/layers/observe/mod.rs b/core/src/layers/observe/mod.rs index 57e26f3c4b5..c7a9c2dd288 100644 --- a/core/src/layers/observe/mod.rs +++ b/core/src/layers/observe/mod.rs @@ -18,8 +18,20 @@ //! OpenDAL Observability Layer //! //! This module offers essential components to facilitate the implementation of observability in OpenDAL. +//! +//! # Prometheus Metrics +//! +//! These metrics are essential for understanding the behavior and performance of our applications. +//! +//! | Metric Name | Type | Description | Labels | +//! |------------------------------|-----------|--------------------------------------------------------------|-------------------------------------------------| +//! 
| operation_duration_seconds | Histogram | Histogram of time spent during opendal operations | scheme, namespace, root, operation, path | +//! | operation_bytes | Histogram | Histogram of the bytes transferred during opendal operations | scheme, namespace, root, operation, path | +//! | operation_errors_total | Counter | Error counter during opendal operations | scheme, namespace, root, operation, path, error | +//! mod metrics; + pub use metrics::MetricMetadata; pub use metrics::MetricsAccessor; pub use metrics::MetricsIntercept; @@ -33,3 +45,42 @@ pub use metrics::LABEL_SCHEME; pub use metrics::METRIC_OPERATION_BYTES; pub use metrics::METRIC_OPERATION_DURATION_SECONDS; pub use metrics::METRIC_OPERATION_ERRORS_TOTAL; + +/// Return the path label value according to the given `path` and `level`. +/// +/// - level = 0: return `None`, which means we ignore the path label. +/// - level > 0: the path label will be the path split by "/", keeping the first n levels, +/// if n=1 and input path is "abc/def/ghi", and then we'll use "abc" as the path label. 
+pub fn path_label_value(path: &str, level: usize) -> Option<&str> { + if path.is_empty() { + return None; + } + + if level > 0 { + let label_value = path + .char_indices() + .filter(|&(_, c)| c == '/') + .nth(level - 1) + .map_or(path, |(i, _)| &path[..i]); + Some(label_value) + } else { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_path_label_value() { + let path = "abc/def/ghi"; + assert_eq!(path_label_value(path, 0), None); + assert_eq!(path_label_value(path, 1), Some("abc")); + assert_eq!(path_label_value(path, 2), Some("abc/def")); + assert_eq!(path_label_value(path, 3), Some("abc/def/ghi")); + assert_eq!(path_label_value(path, usize::MAX), Some("abc/def/ghi")); + + assert_eq!(path_label_value("", 1), None); + } +} diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index adbc436ceab..458afd36b96 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -22,9 +22,8 @@ use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; use prometheus::exponential_buckets; use prometheus::histogram_opts; -use prometheus::register_histogram_vec_with_registry; -use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; +use prometheus::Opts; use prometheus::Registry; use crate::layers::observe; @@ -36,43 +35,28 @@ use crate::*; /// /// # Prometheus Metrics /// -/// In this section, we will introduce three metrics that are currently being exported by our project. These metrics are essential for understanding the behavior and performance of our applications. 
-/// -/// -/// | Metric Name | Type | Description | Labels | -/// |-------------------------|----------|---------------------------------------------------|---------------------| -/// | requests_total | Counter | Total times of 'create' operation being called | scheme, operation | -/// | requests_duration_seconds | Histogram | Histogram of the time spent on specific operation | scheme, operation | -/// | bytes_total | Histogram | Total size | scheme, operation | -/// +/// We provide several metrics, please see the documentation of [`observe`] module. /// For a more detailed explanation of these metrics and how they are used, please refer to the [Prometheus documentation](https://prometheus.io/docs/introduction/overview/). /// -/// # Histogram Configuration -/// -/// The metric buckets for these histograms are automatically generated based on the `exponential_buckets(0.01, 2.0, 16)` configuration. -/// /// # Examples /// -/// ```no_build -/// use log::debug; -/// use log::info; -/// use opendal::layers::PrometheusLayer; -/// use opendal::services; -/// use opendal::Operator; -/// use opendal::Result; -/// use prometheus::Encoder; +/// ```no_run +/// # use log::debug; +/// # use log::info; +/// # use opendal::layers::PrometheusLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// # use opendal::Result; +/// # use prometheus::Encoder; /// -/// /// Visit [`opendal::services`] for more service related config. -/// /// Visit [`opendal::Operator`] for more operator level APIs. -/// #[tokio::main] -/// async fn main() -> Result<()> { +/// # #[tokio::main] +/// # async fn main() -> Result<()> { /// // Pick a builder and configure it. /// let builder = services::Memory::default(); /// let registry = prometheus::default_registry(); /// -/// let op = Operator::new(builder) -/// .expect("must init") -/// .layer(PrometheusLayer::new(registry.clone())) +/// let op = Operator::new(builder)? 
+/// .layer(PrometheusLayer::builder().register(registry).expect("register metrics successfully")) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -93,32 +77,125 @@ use crate::*; /// println!("## Prometheus Metrics"); /// println!("{}", String::from_utf8(buffer.clone()).unwrap()); /// Ok(()) -/// } +/// # } /// ``` #[derive(Clone, Debug)] pub struct PrometheusLayer { - registry: Registry, + interceptor: PrometheusInterceptor, +} + +impl PrometheusLayer { + /// Create a [`PrometheusLayerBuilder`] to set the configuration of metrics. + /// + /// # Default Configuration + /// + /// - `operation_duration_seconds_buckets`: `exponential_buckets(0.01, 2.0, 16)` + /// - `operation_bytes_buckets`: `exponential_buckets(1.0, 2.0, 16)` + /// - `path_label`: `0` + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let duration_seconds_buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let bytes_buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(duration_seconds_buckets) + /// .operation_bytes_buckets(bytes_buckets) + /// .path_label(0) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn builder() -> PrometheusLayerBuilder { + let operation_duration_seconds_buckets = exponential_buckets(0.01, 2.0, 16).unwrap(); + let operation_bytes_buckets = exponential_buckets(1.0, 2.0, 16).unwrap(); + let path_label_level = 0; + PrometheusLayerBuilder::new( + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, + ) + } +} + +impl Layer for PrometheusLayer { + type LayeredAccess = observe::MetricsAccessor; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + } +} + +/// [`PrometheusLayerBuilder`] is a config builder to build a [`PrometheusLayer`]. +pub struct PrometheusLayerBuilder { operation_duration_seconds_buckets: Vec, operation_bytes_buckets: Vec, path_label_level: usize, } -impl PrometheusLayer { - /// Create a [`PrometheusLayer`] while registering itself to this registry. - pub fn new(registry: Registry) -> Self { +impl PrometheusLayerBuilder { + fn new( + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, + path_label_level: usize, + ) -> Self { Self { - registry, - operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), - path_label_level: 0, + operation_duration_seconds_buckets, + operation_bytes_buckets, + path_label_level, } } /// Set buckets for `operation_duration_seconds` histogram. 
/// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(0.01, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_duration_seconds_buckets(buckets) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_duration_seconds_buckets = buckets; @@ -128,9 +205,35 @@ impl PrometheusLayer { /// Set buckets for `operation_bytes` histogram. /// - /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) - /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) - /// to generate the buckets. + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. 
+ /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let buckets = prometheus::exponential_buckets(1.0, 2.0, 16).unwrap(); + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .operation_bytes_buckets(buckets) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { self.operation_bytes_buckets = buckets; @@ -143,81 +246,167 @@ impl PrometheusLayer { /// - level = 0: we will ignore the path label. /// - level > 0: the path label will be the path split by "/" and get the last n level, /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. - pub fn enable_path_label(mut self, level: usize) -> Self { + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusLayer::builder() + /// .path_label(1) + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn path_label(mut self, level: usize) -> Self { self.path_label_level = level; self } -} - -impl Layer for PrometheusLayer { - type LayeredAccess = observe::MetricsAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - let interceptor = PrometheusInterceptor::register( - self.registry.clone(), - self.operation_duration_seconds_buckets.clone(), - self.operation_bytes_buckets.clone(), - self.path_label_level, - ); - observe::MetricsLayer::new(interceptor).layer(inner) - } -} - -#[derive(Clone, Debug)] -pub struct PrometheusInterceptor { - operation_duration_seconds: HistogramVec, - operation_bytes: HistogramVec, - operation_errors_total: GenericCounterVec, - path_label_level: usize, -} -impl PrometheusInterceptor { - fn register( - registry: Registry, - operation_duration_seconds_buckets: Vec, - operation_bytes_buckets: Vec, - path_label_level: usize, - ) -> Self { - let labels = OperationLabels::names(false, path_label_level); - let operation_duration_seconds = register_histogram_vec_with_registry!( + /// Register the metrics into the given registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// let registry = prometheus::default_registry(); + /// + /// let op = Operator::new(builder)? 
+ /// .layer( + /// PrometheusLayer::builder() + /// .register(registry) + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register(self, registry: &Registry) -> Result { + let labels = OperationLabels::names(false, self.path_label_level); + let operation_duration_seconds = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_DURATION_SECONDS.name(), observe::METRIC_OPERATION_DURATION_SECONDS.help(), - operation_duration_seconds_buckets + self.operation_duration_seconds_buckets ), &labels, - registry ) - .unwrap(); - let operation_bytes = register_histogram_vec_with_registry!( + .map_err(parse_prometheus_error)?; + let operation_bytes = HistogramVec::new( histogram_opts!( observe::METRIC_OPERATION_BYTES.name(), observe::METRIC_OPERATION_BYTES.help(), - operation_bytes_buckets + self.operation_bytes_buckets ), &labels, - registry ) - .unwrap(); + .map_err(parse_prometheus_error)?; - let labels = OperationLabels::names(true, path_label_level); - let operation_errors_total = register_int_counter_vec_with_registry!( - observe::METRIC_OPERATION_ERRORS_TOTAL.name(), - observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + let labels = OperationLabels::names(true, self.path_label_level); + let operation_errors_total = GenericCounterVec::new( + Opts::new( + observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + ), &labels, - registry ) - .unwrap(); + .map_err(parse_prometheus_error)?; + + registry + .register(Box::new(operation_duration_seconds.clone())) + .map_err(parse_prometheus_error)?; + registry + .register(Box::new(operation_bytes.clone())) + .map_err(parse_prometheus_error)?; + registry + .register(Box::new(operation_errors_total.clone())) + .map_err(parse_prometheus_error)?; + + Ok(PrometheusLayer { + interceptor: PrometheusInterceptor { + operation_duration_seconds, + operation_bytes, + operation_errors_total, + 
path_label_level: self.path_label_level, + }, + }) + } - Self { - operation_duration_seconds, - operation_bytes, - operation_errors_total, - path_label_level, - } + /// Register the metrics into the default registry and return a [`PrometheusLayer`]. + /// + /// # Example + /// + /// ```no_run + /// # use log::debug; + /// # use opendal::layers::PrometheusLayer; + /// # use opendal::services; + /// # use opendal::Operator; + /// # use opendal::Result; + /// # + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// // Pick a builder and configure it. + /// let builder = services::Memory::default(); + /// + /// let op = Operator::new(builder)? + /// .layer( + /// PrometheusLayer::builder() + /// .register_default() + /// .expect("register metrics successfully") + /// ) + /// .finish(); + /// debug!("operator: {op:?}"); + /// + /// Ok(()) + /// # } + /// ``` + pub fn register_default(self) -> Result { + let registry = prometheus::default_registry(); + self.register(registry) } } +/// Convert the [`prometheus::Error`] to [`Error`]. 
+fn parse_prometheus_error(err: prometheus::Error) -> Error { + Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err) +} + +#[derive(Clone, Debug)] +pub struct PrometheusInterceptor { + operation_duration_seconds: HistogramVec, + operation_bytes: HistogramVec, + operation_errors_total: GenericCounterVec, + path_label_level: usize, +} + impl observe::MetricsIntercept for PrometheusInterceptor { fn observe_operation_duration_seconds( &self, @@ -232,7 +421,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } @@ -256,7 +445,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: None, path, } @@ -280,7 +469,7 @@ impl observe::MetricsIntercept for PrometheusInterceptor { scheme, namespace: &namespace, root: &root, - op, + operation: op, error: Some(error), path, } @@ -294,7 +483,7 @@ struct OperationLabels<'a> { scheme: Scheme, namespace: &'a str, root: &'a str, - op: Operation, + operation: Operation, path: &'a str, error: Option, } @@ -334,11 +523,11 @@ impl<'a> OperationLabels<'a> { self.scheme.into_static(), self.namespace, self.root, - self.op.into_static(), + self.operation.into_static(), ]); - if path_label_level > 0 { - labels.push(get_path_label(self.path, path_label_level)); + if let Some(path) = observe::path_label_value(self.path, path_label_level) { + labels.push(path); } if let Some(error) = self.error { @@ -348,31 +537,3 @@ impl<'a> OperationLabels<'a> { labels } } - -fn get_path_label(path: &str, path_level: usize) -> &str { - if path_level > 0 { - return path - .char_indices() - .filter(|&(_, c)| c == '/') - .nth(path_level - 1) - .map_or(path, |(i, _)| &path[..i]); - } - "" -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_get_path_label() { - let path = "abc/def/ghi"; - assert_eq!(get_path_label(path, 0), ""); - 
assert_eq!(get_path_label(path, 1), "abc"); - assert_eq!(get_path_label(path, 2), "abc/def"); - assert_eq!(get_path_label(path, 3), "abc/def/ghi"); - assert_eq!(get_path_label(path, usize::MAX), "abc/def/ghi"); - - assert_eq!(get_path_label("", 0), ""); - } -}