diff --git a/core/src/layers/prometheus.rs b/core/src/layers/prometheus.rs index 8eedb002750..adbc436ceab 100644 --- a/core/src/layers/prometheus.rs +++ b/core/src/layers/prometheus.rs @@ -15,13 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; +use std::time::Duration; -use bytes::Buf; -use futures::TryFutureExt; -use log::debug; use prometheus::core::AtomicU64; use prometheus::core::GenericCounterVec; use prometheus::exponential_buckets; @@ -31,6 +27,7 @@ use prometheus::register_int_counter_vec_with_registry; use prometheus::HistogramVec; use prometheus::Registry; +use crate::layers::observe; use crate::raw::Access; use crate::raw::*; use crate::*; @@ -75,7 +72,7 @@ use crate::*; /// /// let op = Operator::new(builder) /// .expect("must init") -/// .layer(PrometheusLayer::with_registry(registry.clone())) +/// .layer(PrometheusLayer::new(registry.clone())) /// .finish(); /// debug!("operator: {op:?}"); /// @@ -83,7 +80,7 @@ use crate::*; /// op.write("test", "Hello, World!").await?; /// // Read data from object. /// let bs = op.read("test").await?; -/// info!("content: {}", String::from_utf8_lossy(&bs)); +/// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes())); /// /// // Get object metadata. /// let meta = op.stat("test").await?; @@ -98,44 +95,54 @@ use crate::*; /// Ok(()) /// } /// ``` -#[derive(Default, Debug, Clone)] +#[derive(Clone, Debug)] pub struct PrometheusLayer { registry: Registry, - requests_duration_seconds_buckets: Vec, - bytes_total_buckets: Vec, + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, path_label_level: usize, } impl PrometheusLayer { - /// create PrometheusLayer by incoming registry. - pub fn with_registry(registry: Registry) -> Self { + /// Create a [`PrometheusLayer`] while registering itself to this registry. + pub fn new(registry: Registry) -> Self { Self { registry, - requests_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), - bytes_total_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + operation_duration_seconds_buckets: exponential_buckets(0.01, 2.0, 16).unwrap(), + operation_bytes_buckets: exponential_buckets(1.0, 2.0, 16).unwrap(), path_label_level: 0, } } - /// set buckets for requests_duration_seconds - pub fn requests_duration_seconds_buckets(mut self, buckets: Vec) -> Self { + /// Set buckets for `operation_duration_seconds` histogram. + /// + /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) + /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) + /// to generate the buckets. + pub fn operation_duration_seconds_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.requests_duration_seconds_buckets = buckets; + self.operation_duration_seconds_buckets = buckets; } self } - /// set buckets for bytes_total - pub fn bytes_total_buckets(mut self, buckets: Vec) -> Self { + /// Set buckets for `operation_bytes` histogram. + /// + /// You could call the [`linear_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.linear_buckets.html) + /// or [`exponential_buckets`](https://docs.rs/prometheus/latest/prometheus/fn.exponential_buckets.html) + /// to generate the buckets. + pub fn operation_bytes_buckets(mut self, buckets: Vec) -> Self { if !buckets.is_empty() { - self.bytes_total_buckets = buckets; + self.operation_bytes_buckets = buckets; } self } - /// set path label level - /// 0: no path label, the path label will be the "" - /// >0: the path label will be the path split by "/" and get the last n level, like "/abc/def/ghi", if n=1, the path label will be "/abc" + /// Set the level of path label. + /// + /// - level = 0: we will ignore the path label. + /// - level > 0: the path label will be the path split by "/" and get the last n level, + /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. pub fn enable_path_label(mut self, level: usize) -> Self { self.path_label_level = level; self @@ -143,739 +150,202 @@ impl PrometheusLayer { } impl Layer for PrometheusLayer { - type LayeredAccess = PrometheusAccessor; + type LayeredAccess = observe::MetricsAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - let meta = inner.info(); - let scheme = meta.scheme(); - - PrometheusAccessor { - inner, - stats: Arc::new(PrometheusMetrics::new( - self.registry.clone(), - self.requests_duration_seconds_buckets.clone(), - self.bytes_total_buckets.clone(), - self.path_label_level, - )), - scheme, - } + let interceptor = PrometheusInterceptor::register( + self.registry.clone(), + self.operation_duration_seconds_buckets.clone(), + self.operation_bytes_buckets.clone(), + self.path_label_level, + ); + observe::MetricsLayer::new(interceptor).layer(inner) } } -/// [`PrometheusMetrics`] provide the performance and IO metrics. -#[derive(Debug)] -pub struct PrometheusMetrics { - /// Total times of the specific operation be called. - pub requests_total: GenericCounterVec, - /// Latency of the specific operation be called. - pub requests_duration_seconds: HistogramVec, - /// Size of the specific metrics. - pub bytes_total: HistogramVec, - /// The Path Level we will keep in the path label. - pub path_label_level: usize, +#[derive(Clone, Debug)] +pub struct PrometheusInterceptor { + operation_duration_seconds: HistogramVec, + operation_bytes: HistogramVec, + operation_errors_total: GenericCounterVec, + path_label_level: usize, } -impl PrometheusMetrics { - /// new with prometheus register. - pub fn new( +impl PrometheusInterceptor { + fn register( registry: Registry, - requests_duration_seconds_buckets: Vec, - bytes_total_buckets: Vec, + operation_duration_seconds_buckets: Vec, + operation_bytes_buckets: Vec, path_label_level: usize, ) -> Self { - let labels = if path_label_level > 0 { - vec!["scheme", "operation", "path"] - } else { - vec!["scheme", "operation"] - }; - let requests_total = register_int_counter_vec_with_registry!( - "requests_total", - "Total times of create be called", + let labels = OperationLabels::names(false, path_label_level); + let operation_duration_seconds = register_histogram_vec_with_registry!( + histogram_opts!( + observe::METRIC_OPERATION_DURATION_SECONDS.name(), + observe::METRIC_OPERATION_DURATION_SECONDS.help(), + operation_duration_seconds_buckets + ), + &labels, + registry + ) + .unwrap(); + let operation_bytes = register_histogram_vec_with_registry!( + histogram_opts!( + observe::METRIC_OPERATION_BYTES.name(), + observe::METRIC_OPERATION_BYTES.help(), + operation_bytes_buckets + ), &labels, registry ) .unwrap(); - let opts = histogram_opts!( - "requests_duration_seconds", - "Histogram of the time spent on specific operation", - requests_duration_seconds_buckets - ); - - let requests_duration_seconds = - register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); - let opts = histogram_opts!("bytes_total", "Total size of ", bytes_total_buckets); - let bytes_total = register_histogram_vec_with_registry!(opts, &labels, registry).unwrap(); + let labels = OperationLabels::names(true, path_label_level); + let operation_errors_total = register_int_counter_vec_with_registry!( + observe::METRIC_OPERATION_ERRORS_TOTAL.name(), + observe::METRIC_OPERATION_ERRORS_TOTAL.help(), + &labels, + registry + ) + .unwrap(); Self { - requests_total, - requests_duration_seconds, - bytes_total, + operation_duration_seconds, + operation_bytes, + operation_errors_total, path_label_level, } } - - /// error handling is the cold path, so we will not init error counters - /// in advance. - #[inline] - fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - debug!( - "Prometheus statistics metrics error, operation {} error {}", - op.into_static(), - kind.into_static() - ); - } - - /// generate metric label - pub fn generate_metric_label<'a>( - &self, - scheme: &'a str, - operation: &'a str, - path_label: &'a str, - ) -> Vec<&'a str> { - match self.path_label_level { - 0 => { - vec![scheme, operation] - } - n if n > 0 => { - let path_value = get_path_label(path_label, self.path_label_level); - vec![scheme, operation, path_value] - } - _ => { - vec![scheme, operation] - } - } - } -} - -#[derive(Clone)] -pub struct PrometheusAccessor { - inner: A, - stats: Arc, - scheme: Scheme, -} - -impl Debug for PrometheusAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PrometheusAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() - } } -impl LayeredAccess for PrometheusAccessor { - type Inner = A; - type Reader = PrometheusMetricWrapper; - type BlockingReader = PrometheusMetricWrapper; - type Writer = PrometheusMetricWrapper; - type BlockingWriter = PrometheusMetricWrapper; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::CreateDir.into_static(), - path, - ); - - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let create_res = self.inner.create_dir(path, args).await; - - timer.observe_duration(); - create_res.map_err(|e| { - self.stats - .increment_errors_total(Operation::CreateDir, e.kind()); - e - }) - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Read.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read(path, args).await; - timer.observe_duration(); - - match res { - Ok((rp, r)) => Ok(( - rp, - PrometheusMetricWrapper::new( - r, - Operation::Read, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - )), - Err(err) => { - self.stats - .increment_errors_total(Operation::Read, err.kind()); - Err(err) - } - } - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Write.into_static(), +impl observe::MetricsIntercept for PrometheusInterceptor { + fn observe_operation_duration_seconds( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + duration: Duration, + ) { + let labels = OperationLabels { + scheme, + namespace: &namespace, + root: &root, + op, + error: None, path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(path, args).await; - timer.observe_duration(); - - match res { - Ok((rp, w)) => Ok(( - rp, - PrometheusMetricWrapper::new( - w, - Operation::Write, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - )), - Err(err) => { - self.stats - .increment_errors_total(Operation::Write, err.kind()); - Err(err) - } } - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Stat.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let stat_res = self - .inner - .stat(path, args) - .inspect_err(|e| { - self.stats.increment_errors_total(Operation::Stat, e.kind()); - }) - .await; - timer.observe_duration(); - stat_res.map_err(|e| { - self.stats.increment_errors_total(Operation::Stat, e.kind()); - e - }) - } - - async fn delete(&self, path: &str, args: OpDelete) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Delete.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let delete_res = self.inner.delete(path, args).await; - timer.observe_duration(); - delete_res.map_err(|e| { - self.stats - .increment_errors_total(Operation::Delete, e.kind()); - e - }) - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::List.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - - let list_res = self.inner.list(path, args).await; - - timer.observe_duration(); - list_res.map_err(|e| { - self.stats.increment_errors_total(Operation::List, e.kind()); - e - }) - } - - async fn batch(&self, args: OpBatch) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Batch.into_static(), - "", - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.batch(args).await; - - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::Batch, e.kind()); - e - }) - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::Presign.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.presign(path, args).await; - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::Presign, e.kind()); - e - }) - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingCreateDir.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_create_dir(path, args); - - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingCreateDir, e.kind()); - e - }) - } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingRead.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingRead, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingRead, e.kind()); - e - }) - } - - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingWrite.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_write(path, args).map(|(rp, r)| { - ( - rp, - PrometheusMetricWrapper::new( - r, - Operation::BlockingWrite, - self.stats.clone(), - self.scheme, - &path.to_string(), - ), - ) - }); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingWrite, e.kind()); - e - }) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingStat.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_stat(path, args); - timer.observe_duration(); - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingStat, e.kind()); - e - }) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingDelete.into_static(), - path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); + .into_values(self.path_label_level); - let timer = self - .stats - .requests_duration_seconds + self.operation_duration_seconds .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_delete(path, args); - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingDelete, e.kind()); - e - }) + .observe(duration.as_secs_f64()) } - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingList.into_static(), + fn observe_operation_bytes( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + bytes: usize, + ) { + let labels = OperationLabels { + scheme, + namespace: &namespace, + root: &root, + op, + error: None, path, - ); - self.stats.requests_total.with_label_values(&labels).inc(); + } + .into_values(self.path_label_level); - let timer = self - .stats - .requests_duration_seconds + self.operation_bytes .with_label_values(&labels) - .start_timer(); - let result = self.inner.blocking_list(path, args); - timer.observe_duration(); - - result.map_err(|e| { - self.stats - .increment_errors_total(Operation::BlockingList, e.kind()); - e - }) + .observe(bytes as f64); } -} - -pub struct PrometheusMetricWrapper { - inner: R, - - op: Operation, - stats: Arc, - scheme: Scheme, - path: String, -} -impl PrometheusMetricWrapper { - fn new( - inner: R, - op: Operation, - stats: Arc, + fn observe_operation_errors_total( + &self, scheme: Scheme, - path: &String, - ) -> Self { - Self { - inner, - op, - stats, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + error: ErrorKind, + ) { + let labels = OperationLabels { scheme, - path: path.to_string(), + namespace: &namespace, + root: &root, + op, + error: Some(error), + path, } - } -} + .into_values(self.path_label_level); -impl oio::Read for PrometheusMetricWrapper { - async fn read(&mut self) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::ReaderRead.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read().await; - timer.observe_duration(); - - match res { - Ok(bytes) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(bytes.remaining() as f64); - Ok(bytes) - } - Err(e) => { - self.stats.increment_errors_total(self.op, e.kind()); - Err(e) - } - } + self.operation_errors_total.with_label_values(&labels).inc(); } } -impl oio::BlockingRead for PrometheusMetricWrapper { - fn read(&mut self) -> Result { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingReaderRead.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.read(); - timer.observe_duration(); - - match res { - Ok(bs) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(bs.remaining() as f64); - Ok(bs) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } - } +struct OperationLabels<'a> { + scheme: Scheme, + namespace: &'a str, + root: &'a str, + op: Operation, + path: &'a str, + error: Option, } -impl oio::Write for PrometheusMetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result<()> { - let size = bs.len(); +impl<'a> OperationLabels<'a> { + fn names(error: bool, path_label_level: usize) -> Vec<&'a str> { + let mut names = Vec::with_capacity(6); - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterWrite.into_static(), - &self.path, - ); + names.extend([ + observe::LABEL_SCHEME, + observe::LABEL_NAMESPACE, + observe::LABEL_ROOT, + observe::LABEL_OPERATION, + ]); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(bs).await; - timer.observe_duration(); - - match res { - Ok(_) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(size as f64); - Ok(()) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } + if path_label_level > 0 { + names.push(observe::LABEL_PATH); } - } - - async fn abort(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterAbort.into_static(), - &self.path, - ); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.abort().await; - timer.observe_duration(); - - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } + if error { + names.push(observe::LABEL_ERROR); } - } - - async fn close(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::WriterClose.into_static(), - &self.path, - ); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.close().await; - timer.observe_duration(); - - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } - } + names } -} -impl oio::BlockingWrite for PrometheusMetricWrapper { - fn write(&mut self, bs: Buffer) -> Result<()> { - let size = bs.len(); + /// labels: + /// + /// 1. `["scheme", "namespace", "root", "operation"]` + /// 2. `["scheme", "namespace", "root", "operation", "path"]` + /// 3. `["scheme", "namespace", "root", "operation", "error"]` + /// 4. `["scheme", "namespace", "root", "operation", "path", "error"]` + fn into_values(self, path_label_level: usize) -> Vec<&'a str> { + let mut labels = Vec::with_capacity(6); - let labels = self.stats.generate_metric_label( + labels.extend([ self.scheme.into_static(), - Operation::BlockingWrite.into_static(), - &self.path, - ); + self.namespace, + self.root, + self.op.into_static(), + ]); - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.write(bs); - timer.observe_duration(); - - match res { - Ok(_) => { - self.stats - .bytes_total - .with_label_values(&labels) - .observe(size as f64); - Ok(()) - } - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } + if path_label_level > 0 { + labels.push(get_path_label(self.path, path_label_level)); } - } - fn close(&mut self) -> Result<()> { - let labels = self.stats.generate_metric_label( - self.scheme.into_static(), - Operation::BlockingWriterClose.into_static(), - &self.path, - ); - - let timer = self - .stats - .requests_duration_seconds - .with_label_values(&labels) - .start_timer(); - let res = self.inner.close(); - timer.observe_duration(); - - match res { - Ok(()) => Ok(()), - Err(err) => { - self.stats.increment_errors_total(self.op, err.kind()); - Err(err) - } + if let Some(error) = self.error { + labels.push(error.into_static()); } + + labels } }