diff --git a/quickwit/quickwit-cluster/src/grpc_service.rs b/quickwit/quickwit-cluster/src/grpc_service.rs index 5798a385e27..ae5144cb4f9 100644 --- a/quickwit/quickwit-cluster/src/grpc_service.rs +++ b/quickwit/quickwit-cluster/src/grpc_service.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; use bytesize::ByteSize; use itertools::Itertools; use once_cell::sync::Lazy; -use quickwit_common::tower::{ClientGrpcConfig, GrpcMetricsLayer, make_channel}; +use quickwit_common::tower::{ClientGrpcConfig, ServiceMetricsLayer, make_channel}; use quickwit_proto::cluster::cluster_service_grpc_server::ClusterServiceGrpcServer; use quickwit_proto::cluster::{ ChitchatId as ProtoChitchatId, ClusterError, ClusterResult, ClusterService, @@ -30,10 +30,10 @@ use crate::Cluster; const MAX_MESSAGE_SIZE: ByteSize = ByteSize::mib(64); -static CLUSTER_GRPC_CLIENT_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("cluster", "client")); -static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("cluster", "server")); +static CLUSTER_GRPC_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("cluster", "client")); +static CLUSTER_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("cluster", "server")); pub(crate) async fn cluster_grpc_client( socket_addr: SocketAddr, diff --git a/quickwit/quickwit-common/src/tower/metrics_grpc.rs b/quickwit/quickwit-common/src/tower/metrics_grpc.rs new file mode 100644 index 00000000000..8ea7265fd7a --- /dev/null +++ b/quickwit/quickwit-common/src/tower/metrics_grpc.rs @@ -0,0 +1,261 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; +use std::pin::Pin; +use std::task::{Context, Poll}; +use std::time::Instant; + +use futures::{Future, ready}; +use http::{Request, Response}; +use pin_project::{pin_project, pinned_drop}; +use prometheus::exponential_buckets; +use tonic::body::Body; +use tower::{Layer, Service}; + +use crate::metrics::{ + HistogramVec, IntCounterVec, IntGaugeVec, new_counter_vec, new_gauge_vec, new_histogram_vec, +}; + +fn extract_rpc_name_from_path(path: &str) -> &str { + // gRPC paths are typically: /package.Service/Method + if let Some(last_slash_pos) = path.rfind('/') { + &path[last_slash_pos + 1..] + } else { + "unknown" + } +} + +fn get_content_length(headers: &http::HeaderMap) -> u64 { + headers + .get(http::header::CONTENT_LENGTH) + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.parse().ok()) + .unwrap_or(0) +} + +#[derive(Clone)] +pub struct GrpcMetrics { + pub inner: S, + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, + request_bytes_total: IntCounterVec<1>, + response_bytes_total: IntCounterVec<1>, +} + +impl Debug for GrpcMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GrpcMetrics").finish_non_exhaustive() + } +} + +impl Service> for GrpcMetrics +where S: Service, Response = Response> +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, request: Request) -> Self::Future { + let start = Instant::now(); + + let rpc_name = extract_rpc_name_from_path(request.uri().path()).to_string(); + + let request_size = get_content_length(request.headers()); + if request_size > 0 { + self.request_bytes_total + .with_label_values([&rpc_name]) + .inc_by(request_size); + } + + let inner = self.inner.call(request); + + self.requests_in_flight.with_label_values([&rpc_name]).inc(); + + ResponseFuture { + inner, + start, + rpc_name, + status: "cancelled", + requests_total: self.requests_total.clone(), + requests_in_flight: self.requests_in_flight.clone(), + request_duration_seconds: self.request_duration_seconds.clone(), + response_bytes_total: self.response_bytes_total.clone(), + } + } +} + +#[derive(Clone)] +pub struct GrpcMetricsLayer { + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, + request_bytes_total: IntCounterVec<1>, + response_bytes_total: IntCounterVec<1>, +} + +impl GrpcMetricsLayer { + pub fn new(subsystem: &'static str) -> Self { + Self::new_inner(subsystem) + } + + fn new_inner(subsystem: &str) -> Self { + Self { + requests_total: new_counter_vec( + "grpc_requests_total", + "Total number of gRPC requests processed.", + subsystem, + &[("kind", "server")], + ["rpc", "status"], + ), + requests_in_flight: new_gauge_vec( + "grpc_requests_in_flight", + "Number of gRPC requests in-flight.", + subsystem, + &[("kind", "server")], + ["rpc"], + ), + request_duration_seconds: new_histogram_vec( + "grpc_request_duration_seconds", + "Duration of request in seconds.", + subsystem, + &[("kind", "server")], + ["rpc", "status"], + exponential_buckets(0.001, 2.0, 12).unwrap(), + ), + request_bytes_total: new_counter_vec( + "grpc_request_bytes_total", + "Total bytes sent in gRPC requests.", + subsystem, + &[("kind", "server")], + ["rpc"], + ), + response_bytes_total: new_counter_vec( + "grpc_response_bytes_total", + "Total bytes received in gRPC responses.", + subsystem, + &[("kind", "server")], + ["rpc"], + ), + } + } +} + +#[cfg(any(test, feature = "testsuite"))] +impl GrpcMetricsLayer { + pub fn for_test() -> Self { + use rand::Rng; + use rand::distributions::Alphanumeric; + + let slug: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(6) + .map(char::from) + .collect(); + Self::new_inner(&format!("test_grpc_{slug}")) + } + + pub fn requests_total(&self, rpc: &str, status: &str) -> u64 { + self.requests_total.with_label_values([rpc, status]).get() + } + + pub fn request_bytes_total(&self, rpc: &str) -> u64 { + self.request_bytes_total.with_label_values([rpc]).get() + } + + pub fn response_bytes_total(&self, rpc: &str) -> u64 { + self.response_bytes_total.with_label_values([rpc]).get() + } +} + +impl Layer for GrpcMetricsLayer { + type Service = GrpcMetrics; + + fn layer(&self, inner: S) -> Self::Service { + GrpcMetrics { + inner, + requests_total: self.requests_total.clone(), + requests_in_flight: self.requests_in_flight.clone(), + request_duration_seconds: self.request_duration_seconds.clone(), + request_bytes_total: self.request_bytes_total.clone(), + response_bytes_total: self.response_bytes_total.clone(), + } + } +} + +/// Response future for [`GrpcMetrics`]. +#[pin_project(PinnedDrop)] +pub struct ResponseFuture { + #[pin] + inner: F, + start: Instant, + rpc_name: String, + status: &'static str, + requests_total: IntCounterVec<2>, + requests_in_flight: IntGaugeVec<1>, + request_duration_seconds: HistogramVec<2>, + response_bytes_total: IntCounterVec<1>, +} + +#[pinned_drop] +impl PinnedDrop for ResponseFuture { + fn drop(self: Pin<&mut Self>) { + let elapsed = self.start.elapsed().as_secs_f64(); + let label_values = [&self.rpc_name, self.status]; + + self.requests_total.with_label_values(label_values).inc(); + self.request_duration_seconds + .with_label_values(label_values) + .observe(elapsed); + self.requests_in_flight + .with_label_values([&self.rpc_name]) + .dec(); + } +} + +impl Future for ResponseFuture +where F: Future, E>> +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response: Result, E> = ready!(this.inner.poll(cx)); + + match &response { + Ok(response) => { + *this.status = "success"; + + let response_size = get_content_length(response.headers()); + if response_size > 0 { + this.response_bytes_total + .with_label_values([&this.rpc_name]) + .inc_by(response_size); + } + } + Err(_) => { + *this.status = "error"; + } + } + + Poll::Ready(response) + } +} + +// TODO tests diff --git a/quickwit/quickwit-common/src/tower/metrics.rs b/quickwit/quickwit-common/src/tower/metrics_service.rs similarity index 93% rename from quickwit/quickwit-common/src/tower/metrics.rs rename to quickwit/quickwit-common/src/tower/metrics_service.rs index b2d093adbe3..f938127679a 100644 --- a/quickwit/quickwit-common/src/tower/metrics.rs +++ b/quickwit/quickwit-common/src/tower/metrics_service.rs @@ -30,14 +30,14 @@ pub trait RpcName { } #[derive(Clone)] -pub struct GrpcMetrics { +pub struct ServiceMetrics { inner: S, requests_total: IntCounterVec<2>, requests_in_flight: IntGaugeVec<1>, request_duration_seconds: HistogramVec<2>, } -impl Service for GrpcMetrics +impl Service for ServiceMetrics where S: Service, R: RpcName, @@ -70,31 +70,31 @@ where } #[derive(Clone)] -pub struct GrpcMetricsLayer { +pub struct ServiceMetricsLayer { requests_total: IntCounterVec<2>, requests_in_flight: IntGaugeVec<1>, request_duration_seconds: HistogramVec<2>, } -impl GrpcMetricsLayer { +impl ServiceMetricsLayer { pub fn new(subsystem: &'static str, kind: &'static str) -> Self { Self { requests_total: new_counter_vec( - "grpc_requests_total", + "service_requests_total", "Total number of gRPC requests processed.", subsystem, &[("kind", kind)], ["rpc", "status"], ), requests_in_flight: new_gauge_vec( - "grpc_requests_in_flight", + "service_requests_in_flight", "Number of gRPC requests in-flight.", subsystem, &[("kind", kind)], ["rpc"], ), request_duration_seconds: new_histogram_vec( - "grpc_request_duration_seconds", + "service_request_duration_seconds", "Duration of request in seconds.", subsystem, &[("kind", kind)], @@ -105,11 +105,11 @@ impl GrpcMetricsLayer { } } -impl Layer for GrpcMetricsLayer { - type Service = GrpcMetrics; +impl Layer for ServiceMetricsLayer { + type Service = ServiceMetrics; fn layer(&self, inner: S) -> Self::Service { - GrpcMetrics { + ServiceMetrics { inner, requests_total: self.requests_total.clone(), requests_in_flight: self.requests_in_flight.clone(), @@ -182,7 +182,7 @@ mod tests { #[tokio::test] async fn test_grpc_metrics() { - let layer = GrpcMetricsLayer::new("quickwit_test", "server"); + let layer = ServiceMetricsLayer::new("quickwit_test", "server"); let mut hello_service = layer diff --git a/quickwit/quickwit-common/src/tower/mod.rs b/quickwit/quickwit-common/src/tower/mod.rs index 1d62433e520..15c17fc307c 100644 --- a/quickwit/quickwit-common/src/tower/mod.rs +++ b/quickwit/quickwit-common/src/tower/mod.rs @@ -21,7 +21,8 @@ mod delay; mod estimate_rate; mod event_listener; mod load_shed; -mod metrics; +mod metrics_grpc; +mod metrics_service; mod one_task_per_call_layer; mod pool; mod rate; @@ -44,7 +45,8 @@ pub use estimate_rate::{EstimateRate, EstimateRateLayer}; pub use event_listener::{EventListener, EventListenerLayer}; use futures::Future; pub use load_shed::{LoadShed, LoadShedLayer, MakeLoadShedError}; -pub use metrics::{GrpcMetrics, GrpcMetricsLayer, RpcName}; +pub use metrics_grpc::GrpcMetricsLayer; +pub use metrics_service::{RpcName, ServiceMetrics, ServiceMetricsLayer}; pub use one_task_per_call_layer::{OneTaskPerCallLayer, TaskCancelled}; pub use pool::Pool; pub use rate::{ConstantRate, Rate}; diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index c136d8436a8..f8cdca7faf0 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -16,8 +16,9 @@ use std::collections::BTreeSet; use std::sync::Arc; use anyhow::Context; +use once_cell::sync::Lazy; use quickwit_cluster::cluster_grpc_server; -use quickwit_common::tower::BoxFutureInfaillible; +use quickwit_common::tower::{BoxFutureInfaillible, GrpcMetricsLayer}; use quickwit_config::GrpcConfig; use quickwit_config::service::QuickwitService; use quickwit_proto::developer::DeveloperServiceClient; @@ -27,6 +28,7 @@ use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_serv use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer; use quickwit_proto::search::search_service_server::SearchServiceServer; use quickwit_proto::tonic::codegen::CompressionEncoding; +use quickwit_proto::tonic::service::LayerExt; use quickwit_proto::tonic::transport::server::TcpIncoming; use quickwit_proto::tonic::transport::{Certificate, Identity, Server, ServerTlsConfig}; use tokio::net::TcpListener; @@ -36,9 +38,21 @@ use tonic_reflection::pb::v1::FILE_DESCRIPTOR_SET as REFLECTION_FILE_DESCRIPTOR_ use tonic_reflection::server::v1::{ServerReflection, ServerReflectionServer}; use tracing::*; +use crate::QuickwitServices; use crate::developer_api::DeveloperApiServer; use crate::search_api::GrpcSearchAdapter; -use crate::{INDEXING_GRPC_SERVER_METRICS_LAYER, QuickwitServices}; + +static CP_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("control_plane")); + +static INDEXING_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("indexing")); + +static INGEST_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("ingest")); + +static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy = + Lazy::new(|| GrpcMetricsLayer::new("metastore")); /// Starts and binds gRPC services to `grpc_listen_addr`. pub(crate) async fn start_grpc_server( @@ -80,7 +94,12 @@ pub(crate) async fn start_grpc_server( enabled_grpc_services.insert("metastore"); file_descriptor_sets.push(quickwit_proto::metastore::METASTORE_FILE_DESCRIPTOR_SET); - Some(metastore_server.as_grpc_service(grpc_config.max_message_size)) + let metastore_service = metastore_server.as_grpc_service(grpc_config.max_message_size); + Some( + METASTORE_GRPC_SERVER_METRICS_LAYER + .clone() + .named_layer(metastore_service), + ) } else { None }; @@ -93,10 +112,13 @@ pub(crate) async fn start_grpc_server( enabled_grpc_services.insert("indexing"); file_descriptor_sets.push(quickwit_proto::indexing::INDEXING_FILE_DESCRIPTOR_SET); - let indexing_service = IndexingServiceClient::tower() - .stack_layer(INDEXING_GRPC_SERVER_METRICS_LAYER.clone()) - .build_from_mailbox(indexing_service); - Some(indexing_service.as_grpc_service(grpc_config.max_message_size)) + let indexing_service = + IndexingServiceClient::tower().build_from_mailbox(indexing_service); + Some( + INDEXING_GRPC_SERVER_METRICS_LAYER + .clone() + .named_layer(indexing_service.as_grpc_service(grpc_config.max_message_size)), + ) } else { None } @@ -126,7 +148,11 @@ pub(crate) async fn start_grpc_server( let ingest_router_service = services .ingest_router_service .as_grpc_service(grpc_config.max_message_size); - Some(ingest_router_service) + Some( + INGEST_GRPC_SERVER_METRICS_LAYER + .clone() + .named_layer(ingest_router_service), + ) } else { None }; @@ -135,7 +161,11 @@ pub(crate) async fn start_grpc_server( enabled_grpc_services.insert("ingester"); file_descriptor_sets.push(quickwit_proto::ingest::INGEST_FILE_DESCRIPTOR_SET); let ingester_grpc_service = ingester_service.as_grpc_service(grpc_config.max_message_size); - Some(ingester_grpc_service) + Some( + INDEXING_GRPC_SERVER_METRICS_LAYER + .clone() + .named_layer(ingester_grpc_service), + ) } else { None }; @@ -147,11 +177,13 @@ pub(crate) async fn start_grpc_server( { enabled_grpc_services.insert("control-plane"); file_descriptor_sets.push(quickwit_proto::control_plane::CONTROL_PLANE_FILE_DESCRIPTOR_SET); - + let control_plane_service = services + .control_plane_client + .as_grpc_service(grpc_config.max_message_size); Some( - services - .control_plane_client - .as_grpc_service(grpc_config.max_message_size), + CP_GRPC_SERVER_METRICS_LAYER + .clone() + .named_layer(control_plane_service), ) } else { None diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 511fe905488..42161f9cb1e 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -67,8 +67,8 @@ use quickwit_common::retry::RetryParams; use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::tower::{ BalanceChannel, BoxFutureInfaillible, BufferLayer, Change, CircuitBreakerEvaluator, - ConstantRate, EstimateRateLayer, EventListenerLayer, GrpcMetricsLayer, LoadShedLayer, - RateLimitLayer, RetryLayer, RetryPolicy, SmaRateEstimator, TimeoutLayer, + ConstantRate, EstimateRateLayer, EventListenerLayer, LoadShedLayer, RateLimitLayer, RetryLayer, + RetryPolicy, ServiceMetricsLayer, SmaRateEstimator, TimeoutLayer, }; use quickwit_common::uri::Uri; use quickwit_common::{get_bool_from_env, spawn_named_task}; @@ -151,25 +151,29 @@ fn get_metastore_client_max_concurrency() -> usize { ) } -static CP_GRPC_CLIENT_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("control_plane", "client")); -static CP_GRPC_SERVER_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("control_plane", "server")); +static CP_LOCAL_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("control_plane", "local")); -static INDEXING_GRPC_CLIENT_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("indexing", "client")); -pub(crate) static INDEXING_GRPC_SERVER_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("indexing", "server")); +static CP_REMOTE_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("control_plane", "remote")); -static INGEST_GRPC_CLIENT_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("ingest", "client")); -static INGEST_GRPC_SERVER_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("ingest", "server")); +static INDEXING_LOCAL_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("indexing", "local")); -static METASTORE_GRPC_CLIENT_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("metastore", "client")); -static METASTORE_GRPC_SERVER_METRICS_LAYER: Lazy = - Lazy::new(|| GrpcMetricsLayer::new("metastore", "server")); +static INDEXING_REMOTE_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("indexing", "remote")); + +static INGEST_LOCAL_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("ingest", "local")); + +static INGEST_REMOTE_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("ingest", "remote")); + +static METASTORE_LOCAL_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("metastore", "local")); + +static METASTORE_REMOTE_SERVICE_CLIENT_METRICS_LAYER: Lazy = + Lazy::new(|| ServiceMetricsLayer::new("metastore", "remote")); static GRPC_INGESTER_SERVICE_TIMEOUT: Duration = Duration::from_secs(30); static GRPC_INDEXING_SERVICE_TIMEOUT: Duration = Duration::from_secs(30); @@ -283,7 +287,7 @@ async fn start_ingest_client_if_needed( let memory_capacity = ingest_api_service.ask(GetMemoryCapacity).await?; let min_rate = ConstantRate::new(ByteSize::mib(1).as_u64(), Duration::from_millis(100)); let rate_modulator = RateModulator::new(rate_estimator.clone(), memory_capacity, min_rate); - let ingest_service = IngestServiceClient::tower() + let ingest_service: IngestServiceClient = IngestServiceClient::tower() .stack_ingest_layer( ServiceBuilder::new() .layer(EstimateRateLayer::::new(rate_estimator)) @@ -338,7 +342,7 @@ async fn start_control_plane_if_needed( let control_plane_server_opt = Some(control_plane_mailbox.clone()); let control_plane_client = ControlPlaneServiceClient::tower() - .stack_layer(CP_GRPC_SERVER_METRICS_LAYER.clone()) + .stack_layer(CP_LOCAL_SERVICE_CLIENT_METRICS_LAYER.clone()) .stack_layer(LoadShedLayer::new(100)) .build_from_mailbox(control_plane_mailbox); Ok((control_plane_server_opt, control_plane_client)) @@ -364,7 +368,7 @@ async fn start_control_plane_if_needed( } let control_plane_server_opt = None; let control_plane_client = ControlPlaneServiceClient::tower() - .stack_layer(CP_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(CP_REMOTE_SERVICE_CLIENT_METRICS_LAYER.clone()) .build_from_balance_channel( balance_channel, node_config.grpc_config.max_message_size, @@ -438,7 +442,6 @@ pub async fn serve_quickwit( }; // These layers apply to all the RPCs of the metastore. let shared_layer = ServiceBuilder::new() - .layer(METASTORE_GRPC_SERVER_METRICS_LAYER.clone()) .layer(LoadShedLayer::new(max_in_flight_requests)) .into_inner(); let broker_layer = EventListenerLayer::new(event_broker.clone()); @@ -457,7 +460,9 @@ pub async fn serve_quickwit( // Instantiate a metastore client, either local if available or remote otherwise. let metastore_client: MetastoreServiceClient = if let Some(metastore_server) = &metastore_server_opt { - metastore_server.clone() + MetastoreServiceClient::tower() + .stack_layer(METASTORE_LOCAL_SERVICE_CLIENT_METRICS_LAYER.clone()) + .build(metastore_server.clone()) } else { info!("connecting to metastore"); @@ -475,7 +480,7 @@ pub async fn serve_quickwit( MetastoreServiceClient::tower() .stack_layer(RetryLayer::new(RetryPolicy::from(RetryParams::standard()))) .stack_layer(TimeoutLayer::new(GRPC_METASTORE_SERVICE_TIMEOUT)) - .stack_layer(METASTORE_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(METASTORE_REMOTE_SERVICE_CLIENT_METRICS_LAYER.clone()) .stack_layer(tower::limit::GlobalConcurrencyLimitLayer::new( get_metastore_client_max_concurrency(), )) @@ -842,7 +847,6 @@ fn ingester_service_layer_stack( layer_stack: IngesterServiceTowerLayerStack, ) -> IngesterServiceTowerLayerStack { layer_stack - .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) .stack_persist_layer(quickwit_common::tower::OneTaskPerCallLayer) .stack_persist_layer( // "3" may seem a little bit low, but we only consider error caused by a full WAL. @@ -887,9 +891,7 @@ async fn setup_ingest_v2( ); ingest_router.subscribe(); - let ingest_router_service = IngestRouterServiceClient::tower() - .stack_layer(INGEST_GRPC_SERVER_METRICS_LAYER.clone()) - .build(ingest_router.clone()); + let ingest_router_service = IngestRouterServiceClient::tower().build(ingest_router.clone()); let rate_limit = ConstantRate::bytes_per_sec(node_config.ingest_api_config.shard_throughput_limit); @@ -953,13 +955,13 @@ async fn setup_ingest_v2( .expect("ingester service should be initialized"); let ingester_service = ingester_service_layer_stack( IngesterServiceClient::tower() - .stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()), + .stack_layer(INGEST_LOCAL_SERVICE_CLIENT_METRICS_LAYER.clone()), ) .build(ingester); Some(Change::Insert(node_id, ingester_service)) } else { let ingester_service = IngesterServiceClient::tower() - .stack_layer(INGEST_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(INGEST_REMOTE_SERVICE_CLIENT_METRICS_LAYER.clone()) .stack_layer(TimeoutLayer::new(GRPC_INGESTER_SERVICE_TIMEOUT)) .build_from_channel( node.grpc_advertise_addr(), @@ -1143,8 +1145,7 @@ fn setup_indexer_pool( .expect("indexing service should be initialized"); // These layers apply to all the RPCs of the indexing service. let shared_layers = ServiceBuilder::new() - .layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone()) - .layer(INDEXING_GRPC_SERVER_METRICS_LAYER.clone()) + .layer(INDEXING_LOCAL_SERVICE_CLIENT_METRICS_LAYER.clone()) .into_inner(); let client = IndexingServiceClient::tower() .stack_layer(shared_layers) @@ -1162,7 +1163,7 @@ fn setup_indexer_pool( Some(change) } else { let client = IndexingServiceClient::tower() - .stack_layer(INDEXING_GRPC_CLIENT_METRICS_LAYER.clone()) + .stack_layer(INDEXING_REMOTE_SERVICE_CLIENT_METRICS_LAYER.clone()) .stack_layer(TimeoutLayer::new(GRPC_INDEXING_SERVICE_TIMEOUT)) .build_from_channel( node.grpc_advertise_addr(),