diff --git a/linkerd/app/outbound/src/http/logical/policy/route.rs b/linkerd/app/outbound/src/http/logical/policy/route.rs index 46d2c698c3..cc1033e4d3 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route.rs @@ -14,7 +14,6 @@ pub(crate) mod retry; pub(crate) use self::backend::{Backend, MatchedBackend}; pub use self::filters::errors; -use self::metrics::labels::Route as RouteLabels; pub use self::metrics::{GrpcRouteMetrics, HttpRouteMetrics}; @@ -117,16 +116,35 @@ where .push(MatchedBackend::layer(metrics.backend.clone())) .lift_new_with_target() .push(NewDistribute::layer()) + .check_new::() + .check_new_service::>() // The router does not take the backend's availability into // consideration, so we must eagerly fail requests to prevent // leaking tasks onto the runtime. .push_on_service(svc::LoadShed::layer()) .push(filters::NewApplyFilters::::layer()) - .push(retry::NewHttpRetry::layer(metrics.retry.clone())) + .push({ + // TODO(kate): extracting route labels like this should ideally live somewhere + // else, like e.g. the `SetExtensions` middleware. + let mk_extract = |rt: &Self| { + let Route { + parent_ref, + route_ref, + .. + } = &rt.params; + retry::RetryLabelExtract(parent_ref.clone(), route_ref.clone()) + }; + let metrics = metrics.retry.clone(); + retry::NewHttpRetry::layer_via_mk(mk_extract, metrics) + }) + .check_new::() + .check_new_service::>() // Set request extensions based on the route configuration // AND/OR headers .push(extensions::NewSetExtensions::layer()) .push(metrics::layer(&metrics.requests)) + .check_new::() + .check_new_service::>() // Configure a classifier to use in the endpoint stack. // TODO(ver) move this into NewSetExtensions? .push(classify::NewClassify::layer()) @@ -149,15 +167,6 @@ impl svc::Param> for MatchedRoute svc::Param for MatchedRoute { - fn param(&self) -> RouteLabels { - RouteLabels( - self.params.parent_ref.clone(), - self.params.route_ref.clone(), - ) - } -} - // === impl Http === impl filters::Apply for Http { @@ -177,12 +186,14 @@ impl metrics::MkStreamLabel for Http { type DurationLabels = metrics::labels::Route; type StreamLabel = metrics::LabelHttpRouteRsp; - fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + fn mk_stream_labeler(&self, req: &::http::Request) -> Option { let parent = self.params.parent_ref.clone(); let route = self.params.route_ref.clone(); - Some(metrics::LabelHttpRsp::from(metrics::labels::Route::from(( - parent, route, - )))) + Some(metrics::LabelHttpRsp::from(metrics::labels::Route::new( + parent, + route, + req.uri(), + ))) } } @@ -231,12 +242,14 @@ impl metrics::MkStreamLabel for Grpc { type DurationLabels = metrics::labels::Route; type StreamLabel = metrics::LabelGrpcRouteRsp; - fn mk_stream_labeler(&self, _: &::http::Request) -> Option { + fn mk_stream_labeler(&self, req: &::http::Request) -> Option { let parent = self.params.parent_ref.clone(); let route = self.params.route_ref.clone(); - Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::from(( - parent, route, - )))) + Some(metrics::LabelGrpcRsp::from(metrics::labels::Route::new( + parent, + route, + req.uri(), + ))) } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs index b852c8b02d..fa355c2751 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/labels.rs @@ -1,11 +1,17 @@ //! Prometheus label types. -use linkerd_app_core::{errors, metrics::prom::EncodeLabelSetMut, proxy::http, Error as BoxError}; +use linkerd_app_core::{ + dns, errors, metrics::prom::EncodeLabelSetMut, proxy::http, Error as BoxError, +}; use prometheus_client::encoding::*; use crate::{BackendRef, ParentRef, RouteRef}; #[derive(Clone, Debug, Hash, PartialEq, Eq)] -pub struct Route(pub ParentRef, pub RouteRef); +pub struct Route { + parent: ParentRef, + route: RouteRef, + hostname: Option, +} #[derive(Clone, Debug, Hash, PartialEq, Eq)] pub struct RouteBackend(pub ParentRef, pub RouteRef, pub BackendRef); @@ -52,17 +58,47 @@ pub enum Error { // === impl Route === -impl From<(ParentRef, RouteRef)> for Route { - fn from((parent, route): (ParentRef, RouteRef)) -> Self { - Self(parent, route) +impl Route { + pub fn new(parent: ParentRef, route: RouteRef, uri: &http::uri::Uri) -> Self { + let hostname = uri + .host() + .map(str::as_bytes) + .map(dns::Name::try_from_ascii) + .and_then(Result::ok); + + Self { + parent, + route, + hostname, + } + } + + #[cfg(test)] + pub(super) fn new_with_name( + parent: ParentRef, + route: RouteRef, + hostname: Option, + ) -> Self { + Self { + parent, + route, + hostname, + } } } impl EncodeLabelSetMut for Route { fn encode_label_set(&self, enc: &mut LabelSetEncoder<'_>) -> std::fmt::Result { - let Self(parent, route) = self; + let Self { + parent, + route, + hostname, + } = self; + parent.encode_label_set(enc)?; route.encode_label_set(enc)?; + ("hostname", hostname.as_deref()).encode(enc.encode_label())?; + Ok(()) } } diff --git a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs index ad73a69cde..6c825796ee 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/metrics/tests.rs @@ -4,7 +4,14 @@ use super::{ test_util::*, LabelGrpcRouteRsp, LabelHttpRouteRsp, RequestMetrics, }; -use linkerd_app_core::svc::{self, http::BoxBody, Layer, NewService}; +use linkerd_app_core::{ + dns, + svc::{ + self, + http::{uri::Uri, BoxBody}, + Layer, NewService, + }, +}; use linkerd_proxy_client_policy as policy; #[tokio::test(flavor = "current_thread", start_paused = true)] @@ -18,7 +25,7 @@ async fn http_request_statuses() { // Send one request and ensure it's counted. let ok = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::OK), error: None, @@ -37,7 +44,7 @@ async fn http_request_statuses() { // Send another request and ensure it's counted with a different response // status. let no_content = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::NO_CONTENT), error: None, @@ -61,7 +68,7 @@ async fn http_request_statuses() { // Emit a response with an error and ensure it's counted. let unknown = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new(parent_ref.clone(), route_ref.clone(), &Uri::default()), labels::HttpRsp { status: None, error: Some(labels::Error::Unknown), @@ -75,7 +82,7 @@ async fn http_request_statuses() { // Emit a successful response with a body that fails and ensure that both // the status and error are recorded. let mixed = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref, route_ref), + labels::Route::new(parent_ref, route_ref, &Uri::default()), labels::HttpRsp { status: Some(http::StatusCode::OK), error: Some(labels::Error::Unknown), @@ -99,6 +106,131 @@ async fn http_request_statuses() { assert_eq!(mixed.get(), 1); } +#[tokio::test(flavor = "current_thread", start_paused = true)] +async fn http_request_hostnames() { + const HOST_1: &str = "great.website"; + const URI_1_1: &str = "https://great.website/path/to/index.html#fragment"; + const URI_1_2: &str = "https://great.website/another/index.html"; + const HOST_2: &str = "different.website"; + const URI_2: &str = "https://different.website/index.html"; + const URI_3: &str = "https://[3fff::]/index.html"; + + let _trace = linkerd_tracing::test::trace_init(); + + let metrics = super::HttpRouteMetrics::default().requests; + let parent_ref = crate::ParentRef(policy::Meta::new_default("parent")); + let route_ref = crate::RouteRef(policy::Meta::new_default("route")); + let (mut svc, mut handle) = mock_http_route_metrics(&metrics, &parent_ref, &route_ref); + + let get_counter = |host: Option<&'static str>, status: Option| { + metrics.get_statuses(&labels::Rsp( + labels::Route::new_with_name( + parent_ref.clone(), + route_ref.clone(), + host.map(str::parse::).map(Result::unwrap), + ), + labels::HttpRsp { + status, + error: None, + }, + )) + }; + + let host1_ok = get_counter(Some(HOST_1), Some(http::StatusCode::OK)); + let host1_teapot = get_counter(Some(HOST_1), Some(http::StatusCode::IM_A_TEAPOT)); + let host2_ok = get_counter(Some(HOST_2), Some(http::StatusCode::OK)); + let unlabeled_ok = get_counter(None, Some(http::StatusCode::OK)); + + // Send one request and ensure it's counted. + send_assert_incremented( + &host1_ok, + &mut handle, + &mut svc, + http::Request::builder() + .uri(URI_1_1) + .body(BoxBody::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; + assert_eq!(host1_ok.get(), 1); + assert_eq!(host1_teapot.get(), 0); + assert_eq!(host2_ok.get(), 0); + + // Send another request to a different path on the same host. + send_assert_incremented( + &host1_teapot, + &mut handle, + &mut svc, + http::Request::builder() + .uri(URI_1_2) + .body(BoxBody::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .status(418) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; + assert_eq!(host1_ok.get(), 1); + assert_eq!(host1_teapot.get(), 1); + assert_eq!(host2_ok.get(), 0); + + // Send a request to a different host. + send_assert_incremented( + &host2_ok, + &mut handle, + &mut svc, + http::Request::builder() + .uri(URI_2) + .body(BoxBody::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; + assert_eq!(host1_ok.get(), 1); + assert_eq!(host1_teapot.get(), 1); + assert_eq!(host2_ok.get(), 1); + + // Send a request to a url with an ip address host component, show that it is not labeled. + send_assert_incremented( + &unlabeled_ok, + &mut handle, + &mut svc, + http::Request::builder() + .uri(URI_3) + .body(BoxBody::default()) + .unwrap(), + |tx| { + tx.send_response( + http::Response::builder() + .status(200) + .body(BoxBody::default()) + .unwrap(), + ) + }, + ) + .await; +} + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn grpc_request_statuses_ok() { let _trace = linkerd_tracing::test::trace_init(); @@ -110,7 +242,11 @@ async fn grpc_request_statuses_ok() { // Send one request and ensure it's counted. let ok = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new( + parent_ref.clone(), + route_ref.clone(), + &Uri::from_static(MOCK_GRPC_REQ_URI), + ), labels::GrpcRsp { status: Some(tonic::Code::Ok), error: None, @@ -152,7 +288,11 @@ async fn grpc_request_statuses_not_found() { // Send another request and ensure it's counted with a different response // status. let not_found = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new( + parent_ref.clone(), + route_ref.clone(), + &Uri::from_static(MOCK_GRPC_REQ_URI), + ), labels::GrpcRsp { status: Some(tonic::Code::NotFound), error: None, @@ -192,7 +332,11 @@ async fn grpc_request_statuses_error_response() { let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); let unknown = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new( + parent_ref.clone(), + route_ref.clone(), + &Uri::from_static(MOCK_GRPC_REQ_URI), + ), labels::GrpcRsp { status: None, error: Some(labels::Error::Unknown), @@ -222,7 +366,11 @@ async fn grpc_request_statuses_error_body() { let (mut svc, mut handle) = mock_grpc_route_metrics(&metrics, &parent_ref, &route_ref); let unknown = metrics.get_statuses(&labels::Rsp( - labels::Route(parent_ref.clone(), route_ref.clone()), + labels::Route::new( + parent_ref.clone(), + route_ref.clone(), + &Uri::from_static(MOCK_GRPC_REQ_URI), + ), labels::GrpcRsp { status: None, error: Some(labels::Error::Unknown), @@ -252,6 +400,8 @@ async fn grpc_request_statuses_error_body() { // === Utils === +const MOCK_GRPC_REQ_URI: &str = "http://host/svc/method"; + pub fn mock_http_route_metrics( metrics: &RequestMetrics, parent_ref: &crate::ParentRef, @@ -301,7 +451,7 @@ pub fn mock_grpc_route_metrics( ) -> (svc::BoxHttp, Handle) { let req = http::Request::builder() .method("POST") - .uri("http://host/svc/method") + .uri(MOCK_GRPC_REQ_URI) .body(()) .unwrap(); let (r#match, _) = policy::route::find( diff --git a/linkerd/app/outbound/src/http/logical/policy/route/retry.rs b/linkerd/app/outbound/src/http/logical/policy/route/retry.rs index 76fe65b4c2..a6d856918e 100644 --- a/linkerd/app/outbound/src/http/logical/policy/route/retry.rs +++ b/linkerd/app/outbound/src/http/logical/policy/route/retry.rs @@ -1,11 +1,12 @@ use super::{extensions, metrics::labels::Route as RouteLabels}; +use crate::{ParentRef, RouteRef}; use futures::future::{Either, Ready}; use linkerd_app_core::{ cause_ref, classify, exp_backoff::ExponentialBackoff, is_caused_by, proxy::http::{self, stream_timeouts::ResponseTimeoutError}, - svc::{self, http::h2}, + svc::{self, http::h2, ExtractParam}, Error, Result, }; use linkerd_http_retry::{self as retry, peek_trailers::PeekTrailersBody}; @@ -16,7 +17,8 @@ use tokio::time; #[derive(Copy, Clone, Debug)] pub struct IsRetry(()); -pub type NewHttpRetry = retry::NewHttpRetry; +pub type NewHttpRetry = + retry::NewHttpRetry; #[derive(Clone, Debug)] pub struct RetryPolicy { @@ -29,6 +31,9 @@ pub struct RetryPolicy { pub backoff: Option, } +#[derive(Clone, Debug)] +pub struct RetryLabelExtract(pub ParentRef, pub RouteRef); + pub type RouteRetryMetrics = retry::MetricFamilies; // === impl RetryPolicy === @@ -158,3 +163,14 @@ impl RetryPolicy { false } } + +// === impl RetryLabelExtract === + +impl ExtractParam> for RetryLabelExtract { + fn extract_param(&self, t: &http::Request) -> RouteLabels { + let Self(parent, route) = self; + let uri = t.uri(); + + RouteLabels::new(parent.clone(), route.clone(), uri) + } +} diff --git a/linkerd/http/prom/src/record_response/request.rs b/linkerd/http/prom/src/record_response/request.rs index 6daa6b2cfb..44bff77fe8 100644 --- a/linkerd/http/prom/src/record_response/request.rs +++ b/linkerd/http/prom/src/record_response/request.rs @@ -72,6 +72,9 @@ where pub fn get_statuses(&self, labels: &StatL) -> Counter { (*self.statuses.get_or_create(labels)).clone() } + + // TODO(kate): it'd be nice if we could avoid creating a time series if it does not exist, + // so that tests can confirm that certain label sets do not exist within the family. } impl Default for RequestMetrics diff --git a/linkerd/http/retry/src/lib.rs b/linkerd/http/retry/src/lib.rs index 5c47cc2c64..5aedecf8bf 100644 --- a/linkerd/http/retry/src/lib.rs +++ b/linkerd/http/retry/src/lib.rs @@ -12,7 +12,7 @@ use linkerd_error::{Error, Result}; use linkerd_exp_backoff::ExponentialBackoff; use linkerd_http_box::BoxBody; use linkerd_metrics::prom; -use linkerd_stack::{layer, NewService, Param, Service}; +use linkerd_stack::{layer, ExtractParam, NewService, Param, Service}; use std::{ future::Future, hash::Hash, @@ -47,19 +47,21 @@ pub struct Params { } #[derive(Clone, Debug)] -pub struct NewHttpRetry { +pub struct NewHttpRetry { inner: N, metrics: MetricFamilies, - _marker: PhantomData P>, + extract: X, + _marker: PhantomData (ReqX, P)>, } /// A Retry middleware that attempts to extract a `P` typed request extension to /// instrument retries. When the request extension is not set, requests are not /// retried. #[derive(Clone, Debug)] -pub struct HttpRetry { +pub struct HttpRetry { inner: S, - metrics: Metrics, + metrics: MetricFamilies, + extract: ReqX, _marker: PhantomData P>, } @@ -81,31 +83,45 @@ struct Metrics { // === impl NewHttpRetry === -impl NewHttpRetry { - pub fn layer(metrics: MetricFamilies) -> impl layer::Layer + Clone { +impl NewHttpRetry { + pub fn layer_via_mk( + extract: X, + metrics: MetricFamilies, + ) -> impl tower::layer::Layer + Clone { layer::mk(move |inner| Self { inner, + extract: extract.clone(), metrics: metrics.clone(), _marker: PhantomData, }) } } -impl NewService for NewHttpRetry +impl NewService for NewHttpRetry where - T: Param, P: Policy, L: Clone + std::fmt::Debug + Hash + Eq + Send + Sync + prom::encoding::EncodeLabelSet + 'static, + X: Clone + ExtractParam, N: NewService, { - type Service = HttpRetry; + type Service = HttpRetry; fn new_service(&self, target: T) -> Self::Service { - let labels: L = target.param(); - let metrics = self.metrics.metrics(&labels); + let Self { + inner, + metrics, + extract, + _marker, + } = self; + + let metrics = metrics.clone(); + let extract = extract.extract_param(&target); + let svc = inner.new_service(target); + HttpRetry { - inner: self.inner.new_service(target), + inner: svc, metrics, + extract, _marker: PhantomData, } } @@ -179,21 +195,13 @@ where // === impl HttpRetry === -impl HttpRetry { - pub fn layer() -> impl layer::Layer + Copy { - layer::mk(|inner| Self { - inner, - metrics: Metrics::default(), - _marker: PhantomData, - }) - } -} - -impl Service> for HttpRetry +impl Service> for HttpRetry where P: Policy, P: Param, P: Clone + Send + Sync + std::fmt::Debug + 'static, + L: Clone + std::fmt::Debug + Hash + Eq + Send + Sync + prom::encoding::EncodeLabelSet + 'static, + ReqX: ExtractParam>, S: Service, Response = http::Response, Error = Error> + Clone + Send @@ -222,7 +230,11 @@ where return future::Either::Left(self.inner.call(req)); }; + // TODO(kate): extract the params, metrics, and labels. in the future, we would like to + // avoid this middleware needing to know about Prometheus labels. let params = policy.param(); + let labels = self.extract.extract_param(&req); + let metrics = self.metrics.metrics(&labels); // Since this request is retryable, we need to setup the request body to // be buffered/cloneable. If the request body is too large to be cloned, @@ -248,7 +260,7 @@ where // can rely on the fact that we've taken the ready inner service handle. let pending = self.inner.clone(); let svc = std::mem::replace(&mut self.inner, pending); - let call = send_req_with_retries(svc, req, policy, self.metrics.clone(), params); + let call = send_req_with_retries(svc, req, policy, metrics, params); future::Either::Right(Box::pin(call)) } }