From 407782f8b9c91f106d4bf7b0b8ec7ccef2fd9964 Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 5 Sep 2024 17:06:36 +0800 Subject: [PATCH 1/3] refactor(layers/metrics): rewrite metrics layer using observe layer --- core/src/layers/metrics.rs | 879 ++++++------------------------------- 1 file changed, 125 insertions(+), 754 deletions(-) diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index f051df08e79..67fdfd93ffe 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -15,63 +15,22 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; -use std::time::Instant; +use std::time::Duration; -use bytes::Buf; -use futures::FutureExt; -use futures::TryFutureExt; use metrics::counter; use metrics::histogram; -use metrics::Counter; -use metrics::Histogram; +use metrics::Label; +use crate::layers::observe; use crate::raw::*; use crate::*; -/// requests_total records all successful requests sent via operator. -static METRIC_REQUESTS_TOTAL: &str = "opendal_requests_total"; -/// requests_duration_seconds records the duration seconds of successful -/// requests. -/// -/// # NOTE -/// -/// this metric will track the whole lifetime of this request: -/// -/// - Building request -/// - Sending request -/// - Receiving response -/// - Consuming response -static METRIC_REQUESTS_DURATION_SECONDS: &str = "opendal_requests_duration_seconds"; -static METRICS_ERRORS_TOTAL: &str = "opendal_errors_total"; -/// bytes_total records all bytes processed by operator. -static METRIC_BYTES_TOTAL: &str = "opendal_bytes_total"; - -/// The scheme of the service. -static LABEL_SERVICE: &str = "service"; -/// The operation of this request. -static LABEL_OPERATION: &str = "operation"; -/// The error kind of this failed request. -static LABEL_ERROR: &str = "error"; - /// Add [metrics](https://docs.rs/metrics/) for every operation. /// /// # Metrics /// -/// - `opendal_requests_total`: Total request numbers. -/// - `opendal_requests_duration_seconds`: Request duration seconds. -/// - `opendal_errors_total`: Total error numbers. -/// - `opendal_bytes_total`: bytes read/write from/to underlying storage. -/// -/// # Labels -/// -/// metrics will carry the following labels -/// -/// - `service`: Service name from [`Scheme`] -/// - `operation`: Operation name from [`Operation`] -/// - `error`: [`ErrorKind`] received by requests +/// We provide several metrics, please see the documentation of [`observe`] module. /// /// # Notes /// @@ -82,15 +41,17 @@ static LABEL_ERROR: &str = "error"; /// # Examples /// /// ```no_run -/// use anyhow::Result; -/// use opendal::layers::MetricsLayer; -/// use opendal::services; -/// use opendal::Operator; -/// -/// let _ = Operator::new(services::Memory::default()) -/// .expect("must init") -/// .layer(MetricsLayer) -/// .finish(); +/// # use anyhow::Result; +/// # use opendal::layers::MetricsLayer; +/// # use opendal::services; +/// # use opendal::Operator; +/// +/// # fn main() -> Result<()> { +/// let _ = Operator::new(services::Memory::default())? +/// .layer(MetricsLayer::default()) +/// .finish(); +/// Ok(()) +/// # } /// ``` /// /// # Output @@ -108,735 +69,145 @@ static LABEL_ERROR: &str = "error"; /// let (recorder, exporter) = builder.build().expect("failed to build recorder/exporter"); /// let recorder = builder.build_recorder().expect("failed to build recorder"); /// ``` -#[derive(Debug, Copy, Clone)] -pub struct MetricsLayer; - -impl Layer for MetricsLayer { - type LayeredAccess = MetricsAccessor; - - fn layer(&self, inner: A) -> Self::LayeredAccess { - let meta = inner.info(); - - MetricsAccessor { - inner, - handle: Arc::new(MetricsHandler::new(meta.scheme().into_static())), - } - } +#[derive(Clone, Debug)] +pub struct MetricsLayer { + interceptor: MetricsInterceptor, } -/// metrics will hold all metrics handlers in a `RwLock`. -/// -/// By holding all metrics handlers we needed, we can reduce the lock -/// cost on fetching them. All metrics update will be atomic operations. -struct MetricsHandler { - service: &'static str, - - requests_total_metadata: Counter, - requests_duration_seconds_metadata: Histogram, - - requests_total_create: Counter, - requests_duration_seconds_create: Histogram, - - requests_total_read: Counter, - requests_duration_seconds_read: Histogram, - bytes_total_read: Counter, - - requests_total_write: Counter, - requests_duration_seconds_write: Histogram, - bytes_total_write: Counter, - - requests_total_stat: Counter, - requests_duration_seconds_stat: Histogram, - - requests_total_delete: Counter, - requests_duration_seconds_delete: Histogram, - - requests_total_list: Counter, - requests_duration_seconds_list: Histogram, - - requests_total_presign: Counter, - requests_duration_seconds_presign: Histogram, - - requests_total_batch: Counter, - requests_duration_seconds_batch: Histogram, - - requests_total_blocking_create: Counter, - requests_duration_seconds_blocking_create: Histogram, - - requests_total_blocking_read: Counter, - requests_duration_seconds_blocking_read: Histogram, - bytes_total_blocking_read: Counter, - - requests_total_blocking_write: Counter, - requests_duration_seconds_blocking_write: Histogram, - #[allow(dead_code)] - bytes_total_blocking_write: Counter, - - requests_total_blocking_stat: Counter, - requests_duration_seconds_blocking_stat: Histogram, - - requests_total_blocking_delete: Counter, - requests_duration_seconds_blocking_delete: Histogram, - - requests_total_blocking_list: Counter, - requests_duration_seconds_blocking_list: Histogram, -} - -impl MetricsHandler { - fn new(service: &'static str) -> Self { +impl Default for MetricsLayer { + fn default() -> Self { Self { - service, - - requests_total_metadata: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Info.into_static(), - ), - requests_duration_seconds_metadata: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Info.into_static(), - ), - - requests_total_create: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::CreateDir.into_static(), - ), - requests_duration_seconds_create: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::CreateDir.into_static(), - ), - - requests_total_read: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Read.into_static(), - ), - requests_duration_seconds_read: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Read.into_static(), - ), - bytes_total_read: counter!( - METRIC_BYTES_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Read.into_static(), - ), - - requests_total_write: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Write.into_static(), - ), - requests_duration_seconds_write: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Write.into_static(), - ), - bytes_total_write: counter!( - METRIC_BYTES_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Write.into_static(), - ), - - requests_total_stat: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Stat.into_static(), - ), - requests_duration_seconds_stat: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Stat.into_static(), - ), - - requests_total_delete: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Delete.into_static(), - ), - requests_duration_seconds_delete: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Delete.into_static(), - ), - - requests_total_list: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::List.into_static(), - ), - requests_duration_seconds_list: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::List.into_static(), - ), - - requests_total_presign: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Presign.into_static(), - ), - requests_duration_seconds_presign: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Presign.into_static(), - ), - - requests_total_batch: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Batch.into_static(), - ), - requests_duration_seconds_batch: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::Batch.into_static(), - ), - - requests_total_blocking_create: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingCreateDir.into_static(), - ), - requests_duration_seconds_blocking_create: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingCreateDir.into_static(), - ), - - requests_total_blocking_read: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingRead.into_static(), - ), - requests_duration_seconds_blocking_read: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingRead.into_static(), - ), - bytes_total_blocking_read: counter!( - METRIC_BYTES_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingRead.into_static(), - ), - - requests_total_blocking_write: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingWrite.into_static(), - ), - requests_duration_seconds_blocking_write: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingWrite.into_static(), - ), - bytes_total_blocking_write: counter!( - METRIC_BYTES_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingWrite.into_static(), - ), - - requests_total_blocking_stat: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingStat.into_static(), - ), - requests_duration_seconds_blocking_stat: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingStat.into_static(), - ), - - requests_total_blocking_delete: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingDelete.into_static(), - ), - requests_duration_seconds_blocking_delete: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingDelete.into_static(), - ), - - requests_total_blocking_list: counter!( - METRIC_REQUESTS_TOTAL, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingList.into_static(), - ), - requests_duration_seconds_blocking_list: histogram!( - METRIC_REQUESTS_DURATION_SECONDS, - LABEL_SERVICE => service, - LABEL_OPERATION => Operation::BlockingList.into_static(), - ), + interceptor: MetricsInterceptor { + path_label_level: 0, + }, } } - - /// error handling is the cold path, so we will not init error counters - /// in advance. - #[inline] - fn increment_errors_total(&self, op: Operation, kind: ErrorKind) { - counter!(METRICS_ERRORS_TOTAL, - LABEL_SERVICE => self.service, - LABEL_OPERATION => op.into_static(), - LABEL_ERROR => kind.into_static(), - ) - .increment(1) - } } -#[derive(Clone)] -pub struct MetricsAccessor { - inner: A, - handle: Arc, -} - -impl Debug for MetricsAccessor { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MetricsAccessor") - .field("inner", &self.inner) - .finish_non_exhaustive() +impl MetricsLayer { + /// Set the level of path label. + /// + /// - level = 0: we will ignore the path label. + /// - level > 0: the path label will be the path split by "/" and get the last n level, + /// if n=1 and input path is "abc/def/ghi", and then we will get "abc/" as the path label. + pub fn path_label(mut self, level: usize) -> Self { + self.interceptor.path_label_level = level; + self } } -impl LayeredAccess for MetricsAccessor { - type Inner = A; - type Reader = MetricWrapper; - type BlockingReader = MetricWrapper; - type Writer = MetricWrapper; - type BlockingWriter = MetricWrapper; - type Lister = A::Lister; - type BlockingLister = A::BlockingLister; - - fn inner(&self) -> &Self::Inner { - &self.inner - } - - fn metadata(&self) -> Arc { - self.handle.requests_total_metadata.increment(1); - - let start = Instant::now(); - let result = self.inner.info(); - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_metadata.record(dur); - - result - } - - async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.handle.requests_total_create.increment(1); - - let start = Instant::now(); - - self.inner - .create_dir(path, args) - .map(|v| { - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_create.record(dur); - - v.map_err(|e| { - self.handle - .increment_errors_total(Operation::CreateDir, e.kind()); - e - }) - }) - .await - } - - async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.handle.requests_total_read.increment(1); - - let _start = Instant::now(); - - self.inner - .read(path, args) - .map(|v| { - v.map(|(rp, r)| { - ( - rp, - MetricWrapper::new( - r, - Operation::Read, - self.handle.clone(), - self.handle.bytes_total_read.clone(), - self.handle.requests_duration_seconds_read.clone(), - ), - ) - }) - .map_err(|err| { - self.handle - .increment_errors_total(Operation::Read, err.kind()); - err - }) - }) - .await - } - - async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - self.handle.requests_total_write.increment(1); - - let _start = Instant::now(); - - self.inner - .write(path, args) - .map_ok(|(rp, w)| { - ( - rp, - MetricWrapper::new( - w, - Operation::Write, - self.handle.clone(), - self.handle.bytes_total_write.clone(), - self.handle.requests_duration_seconds_write.clone(), - ), - ) - }) - .inspect_err(|e| { - self.handle - .increment_errors_total(Operation::Write, e.kind()); - }) - .await - } - - async fn stat(&self, path: &str, args: OpStat) -> Result { - self.handle.requests_total_stat.increment(1); - - let start = Instant::now(); - - self.inner - .stat(path, args) - .inspect_ok(|_| { - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_stat.record(dur); - }) - .inspect_err(|e| { - self.handle - .increment_errors_total(Operation::Stat, e.kind()); - }) - .await - } - - async fn delete(&self, path: &str, args: OpDelete) -> Result { - self.handle.requests_total_delete.increment(1); - - let start = Instant::now(); - - self.inner - .delete(path, args) - .inspect_ok(|_| { - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_delete.record(dur); - }) - .inspect_err(|e| { - self.handle - .increment_errors_total(Operation::Delete, e.kind()); - }) - .await - } - - async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { - self.handle.requests_total_list.increment(1); - - let start = Instant::now(); - - self.inner - .list(path, args) - .inspect_ok(|_| { - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_list.record(dur); - }) - .inspect_err(|e| { - self.handle - .increment_errors_total(Operation::List, e.kind()); - }) - .await - } - - async fn batch(&self, args: OpBatch) -> Result { - self.handle.requests_total_batch.increment(1); - - let start = Instant::now(); - let result = self.inner.batch(args).await; - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_batch.record(dur); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::Batch, e.kind()); - e - }) - } - - async fn presign(&self, path: &str, args: OpPresign) -> Result { - self.handle.requests_total_presign.increment(1); - - let start = Instant::now(); - let result = self.inner.presign(path, args).await; - let dur = start.elapsed().as_secs_f64(); - - self.handle.requests_duration_seconds_presign.record(dur); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::Presign, e.kind()); - e - }) - } - - fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.handle.requests_total_blocking_create.increment(1); - - let start = Instant::now(); - let result = self.inner.blocking_create_dir(path, args); - let dur = start.elapsed().as_secs_f64(); - - self.handle - .requests_duration_seconds_blocking_create - .record(dur); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingCreateDir, e.kind()); - e - }) - } - - fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.handle.requests_total_blocking_read.increment(1); - - let _start = Instant::now(); - let result = self.inner.blocking_read(path, args).map(|(rp, r)| { - ( - rp, - MetricWrapper::new( - r, - Operation::BlockingRead, - self.handle.clone(), - self.handle.bytes_total_blocking_read.clone(), - self.handle.requests_duration_seconds_blocking_read.clone(), - ), - ) - }); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingRead, e.kind()); - e - }) - } - - fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.handle.requests_total_blocking_write.increment(1); - - let start = Instant::now(); - let result = self.inner.blocking_write(path, args); - let dur = start.elapsed().as_secs_f64(); - - self.handle - .requests_duration_seconds_blocking_write - .record(dur); - - result - .map(|(rp, w)| { - ( - rp, - MetricWrapper::new( - w, - Operation::BlockingWrite, - self.handle.clone(), - self.handle.bytes_total_write.clone(), - self.handle.requests_duration_seconds_write.clone(), - ), - ) - }) - .map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingWrite, e.kind()); - e - }) - } - - fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.handle.requests_total_blocking_stat.increment(1); - - let start = Instant::now(); - let result = self.inner.blocking_stat(path, args); - let dur = start.elapsed().as_secs_f64(); - - self.handle - .requests_duration_seconds_blocking_stat - .record(dur); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingStat, e.kind()); - e - }) - } - - fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.handle.requests_total_blocking_delete.increment(1); - - let start = Instant::now(); - let result = self.inner.blocking_delete(path, args); - let dur = start.elapsed().as_secs_f64(); - - self.handle - .requests_duration_seconds_blocking_delete - .record(dur); - - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingDelete, e.kind()); - e - }) - } - - fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.handle.requests_total_blocking_list.increment(1); - - let start = Instant::now(); - let result = self.inner.blocking_list(path, args); - let dur = start.elapsed().as_secs_f64(); - - self.handle - .requests_duration_seconds_blocking_list - .record(dur); +impl Layer for MetricsLayer { + type LayeredAccess = observe::MetricsAccessor; - result.map_err(|e| { - self.handle - .increment_errors_total(Operation::BlockingList, e.kind()); - e - }) + fn layer(&self, inner: A) -> Self::LayeredAccess { + observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) } } -pub struct MetricWrapper { - inner: R, - - op: Operation, - bytes_counter: Counter, - requests_duration_seconds: Histogram, - handle: Arc, +#[derive(Clone, Debug)] +pub struct MetricsInterceptor { + path_label_level: usize, } -impl MetricWrapper { - fn new( - inner: R, +impl observe::MetricsIntercept for MetricsInterceptor { + fn observe_operation_duration_seconds( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, op: Operation, - handle: Arc, - bytes_counter: Counter, - requests_duration_seconds: Histogram, - ) -> Self { - Self { - inner, - op, - handle, - bytes_counter, - requests_duration_seconds, + duration: Duration, + ) { + let labels = OperationLabels { + scheme, + namespace, + root, + path, + operation: op, + error: None, } + .into_labels(self.path_label_level); + histogram!(observe::METRIC_OPERATION_DURATION_SECONDS.name(), labels).record(duration) } -} -impl oio::Read for MetricWrapper { - async fn read(&mut self) -> Result { - let start = Instant::now(); - - match self.inner.read().await { - Ok(bs) => { - self.bytes_counter.increment(bs.remaining() as u64); - self.requests_duration_seconds - .record(start.elapsed().as_secs_f64()); - Ok(bs) - } - Err(e) => { - self.handle.increment_errors_total(self.op, e.kind()); - Err(e) - } + fn observe_operation_bytes( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + bytes: usize, + ) { + let labels = OperationLabels { + scheme, + namespace, + root, + path, + operation: op, + error: None, } + .into_labels(self.path_label_level); + histogram!(observe::METRIC_OPERATION_BYTES.name(), labels).record(bytes as f64) } -} - -impl oio::BlockingRead for MetricWrapper { - fn read(&mut self) -> Result { - let start = Instant::now(); - self.inner - .read() - .map(|bs| { - self.bytes_counter.increment(bs.remaining() as u64); - self.requests_duration_seconds - .record(start.elapsed().as_secs_f64()); - bs - }) - .map_err(|e| { - self.handle.increment_errors_total(self.op, e.kind()); - e - }) + fn observe_operation_errors_total( + &self, + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &str, + op: Operation, + error: ErrorKind, + ) { + let labels = OperationLabels { + scheme, + namespace, + root, + path, + operation: op, + error: Some(error), + } + .into_labels(self.path_label_level); + counter!(observe::METRIC_OPERATION_ERRORS_TOTAL.name(), labels).increment(1) } } -impl oio::Write for MetricWrapper { - async fn write(&mut self, bs: Buffer) -> Result<()> { - let start = Instant::now(); - let size = bs.len(); - - self.inner - .write(bs) - .await - .map(|_| { - self.bytes_counter.increment(size as u64); - self.requests_duration_seconds - .record(start.elapsed().as_secs_f64()); - }) - .map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } - - async fn abort(&mut self) -> Result<()> { - self.inner.abort().await.map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } - - async fn close(&mut self) -> Result<()> { - self.inner.close().await.map_err(|err| { - self.handle.increment_errors_total(self.op, err.kind()); - err - }) - } +struct OperationLabels<'a> { + scheme: Scheme, + namespace: Arc, + root: Arc, + path: &'a str, + operation: Operation, + error: Option, } -impl oio::BlockingWrite for MetricWrapper { - fn write(&mut self, bs: Buffer) -> Result<()> { - let size = bs.len(); +impl<'a> OperationLabels<'a> { + /// labels: + /// + /// 1. `["scheme", "namespace", "root", "operation"]` + /// 2. `["scheme", "namespace", "root", "operation", "path"]` + /// 3. `["scheme", "namespace", "root", "operation", "error"]` + /// 4. `["scheme", "namespace", "root", "operation", "path", "error"]` + fn into_labels(self, path_label_level: usize) -> Vec for MetricsLayer { type LayeredAccess = observe::MetricsAccessor; fn layer(&self, inner: A) -> Self::LayeredAccess { - observe::MetricsLayer::new(self.interceptor.clone()).layer(inner) + let interceptor = MetricsInterceptor { + path_label_level: self.path_label_level, + }; + observe::MetricsLayer::new(interceptor).layer(inner) } } From 0c349c99c71dd3cfca690f3257b4265bfe0be54e Mon Sep 17 00:00:00 2001 From: koushiro Date: Thu, 5 Sep 2024 17:33:17 +0800 Subject: [PATCH 3/3] make clippy happy --- core/src/layers/metrics.rs | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/core/src/layers/metrics.rs b/core/src/layers/metrics.rs index b1e2c39303d..c7fc75fa1f1 100644 --- a/core/src/layers/metrics.rs +++ b/core/src/layers/metrics.rs @@ -69,19 +69,11 @@ use crate::*; /// let (recorder, exporter) = builder.build().expect("failed to build recorder/exporter"); /// let recorder = builder.build_recorder().expect("failed to build recorder"); /// ``` -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Default)] pub struct MetricsLayer { path_label_level: usize, } -impl Default for MetricsLayer { - fn default() -> Self { - Self { - path_label_level: 0, - } - } -} - impl MetricsLayer { /// Set the level of path label. ///