From 1cbf6a5d469cea85f4a65862f5bc3150bad13b7f Mon Sep 17 00:00:00 2001
From: Aaron Friel
Date: Mon, 3 Jan 2022 14:09:59 -0800
Subject: [PATCH] app: Implement a separate health check server

A separate server provides /live and /ready routes that behave
identically to those on the admin server. The existing admin server's
routes are not removed.

Background: On some Kubernetes distributions, requests from the control
plane may not come from a private-range IP address, or even from a
consistent IP address. This poses a problem, because the admin server
used in a multicluster mesh needs to simultaneously serve /live and
/ready routes to:

* The Kubernetes control plane, for liveness and readiness probes
  respectively
* Remote clusters, as part of probing the remote gateway

To avoid exposing the other admin routes, the multicluster gateway uses
an authorization policy that forbids unauthorized and out-of-cluster
requests. This causes the gateway to fail its readiness and liveness
probes.

Resolution: Implement a separate server in the proxy app that can
securely serve the /live and /ready routes. The port this server
listens on can be used internally for health check probes without an
authorization policy.

See: https://github.com/linkerd/linkerd2/pull/7548
---
 Cargo.lock                                    |  16 +
 linkerd/app/Cargo.toml                        |   1 +
 linkerd/app/health/Cargo.toml                 |  31 ++
 linkerd/app/health/src/lib.rs                 |   8 +
 linkerd/app/health/src/server.rs              | 127 ++++++++
 linkerd/app/health/src/server/readiness.rs    |  36 +++
 linkerd/app/health/src/stack.rs               | 296 ++++++++++++++++++
 linkerd/app/integration/src/proxy.rs          |   8 +-
 linkerd/app/integration/src/tests/identity.rs |   2 +-
 linkerd/app/integration/src/tests/tap.rs      |   4 +-
 linkerd/app/src/env.rs                        |  19 +-
 linkerd/app/src/lib.rs                        |  37 ++-
 linkerd2-proxy/src/main.rs                    |   3 +-
 13 files changed, 578 insertions(+), 10 deletions(-)
 create mode 100644 linkerd/app/health/Cargo.toml
 create mode 100644 linkerd/app/health/src/lib.rs
 create mode 100644 linkerd/app/health/src/server.rs
 create mode 100644 linkerd/app/health/src/server/readiness.rs
 create mode 100644 linkerd/app/health/src/stack.rs

diff --git a/Cargo.lock b/Cargo.lock
index dec33ff054..c5ada9f50d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -747,6 +747,7 @@ dependencies = [
  "linkerd-app-admin",
  "linkerd-app-core",
  "linkerd-app-gateway",
+ "linkerd-app-health",
  "linkerd-app-inbound",
  "linkerd-app-outbound",
  "linkerd-error",
@@ -855,6 +856,21 @@ dependencies = [
  "tracing",
 ]
 
+[[package]]
+name = "linkerd-app-health"
+version = "0.1.0"
+dependencies = [
+ "futures",
+ "http",
+ "hyper",
+ "linkerd-app-core",
+ "linkerd-app-inbound",
+ "thiserror",
+ "tokio",
+ "tower",
+ "tracing",
+]
+
 [[package]]
 name = "linkerd-app-inbound"
 version = "0.1.0"
diff --git a/linkerd/app/Cargo.toml b/linkerd/app/Cargo.toml
index 3c806e7b39..eb6d63db47 100644
--- a/linkerd/app/Cargo.toml
+++ b/linkerd/app/Cargo.toml
@@ -17,6 +17,7 @@ allow-loopback = ["linkerd-app-outbound/allow-loopback"]
 [dependencies]
 futures = { version = "0.3", default-features = false }
 linkerd-app-admin = { path = "./admin" }
+linkerd-app-health = { path = "./health" }
 linkerd-app-core = { path = "./core" }
 linkerd-app-gateway = { path = "./gateway" }
 linkerd-app-inbound = { path = "./inbound" }
diff --git a/linkerd/app/health/Cargo.toml b/linkerd/app/health/Cargo.toml
new file mode 100644
index 0000000000..c7746b710b
--- /dev/null
+++ b/linkerd/app/health/Cargo.toml
@@ -0,0 +1,31 @@
+[package]
+name = "linkerd-app-health"
+version = "0.1.0"
+authors = ["Linkerd Developers <cncf-linkerd-dev@lists.cncf.io>"]
+license = "Apache-2.0"
+edition = "2021"
"2021" +publish = false +description = """ +The linkerd proxy's health check server. +""" + +[dependencies] +http = "0.2" +hyper = { version = "0.14", features = ["http1", "http2"] } +futures = { version = "0.3", default-features = false } +linkerd-app-core = { path = "../core" } +linkerd-app-inbound = { path = "../inbound" } +thiserror = "1" +tokio = { version = "1", features = ["macros", "sync", "parking_lot"]} +tracing = "0.1" + +[dependencies.tower] +version = "0.4" +default-features = false +features = [ + "buffer", + "make", + "spawn-ready", + "timeout", + "util", +] diff --git a/linkerd/app/health/src/lib.rs b/linkerd/app/health/src/lib.rs new file mode 100644 index 0000000000..cdb6e65e71 --- /dev/null +++ b/linkerd/app/health/src/lib.rs @@ -0,0 +1,8 @@ +#![deny(warnings, rust_2018_idioms)] +#![forbid(unsafe_code)] + +mod server; +mod stack; + +pub use self::server::{Health, Latch, Readiness}; +pub use self::stack::{Config, Task}; diff --git a/linkerd/app/health/src/server.rs b/linkerd/app/health/src/server.rs new file mode 100644 index 0000000000..537be84090 --- /dev/null +++ b/linkerd/app/health/src/server.rs @@ -0,0 +1,127 @@ +//! Serves an HTTP health server. +//! +//! * `GET /ready` -- returns 200 when the proxy is ready to participate in meshed +//! traffic. +//! * `GET /live` -- returns 200 when the proxy is live. + +use futures::future; +use http::StatusCode; +use hyper::{ + body::{Body, HttpBody}, + Request, Response, +}; +use linkerd_app_core::Error; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +mod readiness; + +pub use self::readiness::{Latch, Readiness}; + +#[derive(Clone)] +pub struct Health { + ready: Readiness, +} + +pub type ResponseFuture = + Pin, Error>> + Send + 'static>>; + +impl Health { + pub fn new(ready: Readiness) -> Self { + Self { ready } + } + + fn ready_rsp(&self) -> Response { + if self.ready.is_ready() { + Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "text/plain") + .body("ready\n".into()) + .expect("builder with known status code must not fail") + } else { + Response::builder() + .status(StatusCode::SERVICE_UNAVAILABLE) + .body("not ready\n".into()) + .expect("builder with known status code must not fail") + } + } + + fn live_rsp() -> Response { + Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, "text/plain") + .body("live\n".into()) + .expect("builder with known status code must not fail") + } + + fn not_found() -> Response { + Response::builder() + .status(http::StatusCode::NOT_FOUND) + .body(Body::empty()) + .expect("builder with known status code must not fail") + } +} + +impl tower::Service> for Health +where + B: HttpBody + Send + Sync + 'static, + B::Error: Into, + B::Data: Send, +{ + type Response = http::Response; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + match req.uri().path() { + "/live" => Box::pin(future::ok(Self::live_rsp())), + "/ready" => Box::pin(future::ok(self.ready_rsp())), + _ => Box::pin(future::ok(Self::not_found())), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use http::method::Method; + use std::time::Duration; + use tokio::time::timeout; + use tower::util::ServiceExt; + + const TIMEOUT: Duration = Duration::from_secs(1); + + #[tokio::test] + async fn ready_when_latches_dropped() { + let (r, l0) = Readiness::new(); + let l1 = l0.clone(); + + let health = 
+        macro_rules! call {
+            () => {{
+                let r = Request::builder()
+                    .method(Method::GET)
+                    .uri("http://0.0.0.0/ready")
+                    .body(Body::empty())
+                    .unwrap();
+                let f = health.clone().oneshot(r);
+                timeout(TIMEOUT, f).await.expect("timeout").expect("call")
+            }};
+        }
+
+        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
+
+        drop(l0);
+        assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
+
+        drop(l1);
+        assert_eq!(call!().status(), StatusCode::OK);
+    }
+}
diff --git a/linkerd/app/health/src/server/readiness.rs b/linkerd/app/health/src/server/readiness.rs
new file mode 100644
index 0000000000..917fecef19
--- /dev/null
+++ b/linkerd/app/health/src/server/readiness.rs
@@ -0,0 +1,36 @@
+use std::sync::{Arc, Weak};
+
+/// Tracks the process's readiness to serve traffic.
+///
+/// Once `is_ready()` returns true, it will never return false.
+#[derive(Clone, Debug)]
+pub struct Readiness(Weak<()>);
+
+/// When all latches are dropped, the process is considered ready.
+#[derive(Clone, Debug)]
+pub struct Latch(Arc<()>);
+
+impl Readiness {
+    pub fn new() -> (Readiness, Latch) {
+        let r = Arc::new(());
+        (Readiness(Arc::downgrade(&r)), Latch(r))
+    }
+
+    pub fn is_ready(&self) -> bool {
+        self.0.upgrade().is_none()
+    }
+}
+
+/// Always ready.
+impl Default for Readiness {
+    fn default() -> Self {
+        Self::new().0
+    }
+}
+
+impl Latch {
+    /// Releases this readiness latch.
+    pub fn release(self) {
+        drop(self);
+    }
+}
diff --git a/linkerd/app/health/src/stack.rs b/linkerd/app/health/src/stack.rs
new file mode 100644
index 0000000000..c3ae5a2c75
--- /dev/null
+++ b/linkerd/app/health/src/stack.rs
@@ -0,0 +1,296 @@
+use linkerd_app_core::{
+    classify,
+    config::ServerConfig,
+    detect, drain, errors, identity, metrics,
+    proxy::http,
+    serve,
+    svc::{self, ExtractParam, InsertParam, Param},
+    tls,
+    transport::{self, listen::Bind, ClientAddr, Local, OrigDstAddr, Remote, ServerAddr},
+    Error, Result,
+};
+use linkerd_app_inbound as inbound;
+use std::{pin::Pin, time::Duration};
+use thiserror::Error;
+use tracing::debug;
+
+#[derive(Clone, Debug)]
+pub struct Config {
+    pub server: ServerConfig,
+    pub metrics_retain_idle: Duration,
+}
+
+pub struct Task {
+    pub listen_addr: Local<ServerAddr>,
+    pub latch: crate::Latch,
+    pub serve: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
+}
+
+#[derive(Debug, Error)]
+#[error("non-HTTP connection from {}", self.0)]
+struct NonHttpClient(Remote<ClientAddr>);
+
+#[derive(Debug, Error)]
+#[error("Unexpected TLS connection to {} from {}", self.0, self.1)]
+struct UnexpectedSni(tls::ServerId, Remote<ClientAddr>);
+
+#[derive(Clone, Debug)]
+struct Tcp {
+    policy: inbound::policy::AllowPolicy,
+    addr: Local<ServerAddr>,
+    client: Remote<ClientAddr>,
+    tls: tls::ConditionalServerTls,
+}
+
+#[derive(Clone, Debug)]
+struct Http {
+    tcp: Tcp,
+    version: http::Version,
+}
+
+#[derive(Clone, Debug)]
+struct Permitted {
+    permit: inbound::policy::Permit,
+    http: Http,
+}
+
+#[derive(Clone)]
+struct TlsParams {
+    identity: identity::Server,
+}
+
+const DETECT_TIMEOUT: Duration = Duration::from_secs(1);
+
+#[derive(Copy, Clone, Debug)]
+struct Rescue;
+
+// === impl Config ===
+
+impl Config {
+    #[allow(clippy::too_many_arguments)]
+    pub fn build<B>(
+        self,
+        bind: B,
+        policy: impl inbound::policy::CheckPolicy,
+        identity: identity::Server,
+        metrics: inbound::Metrics,
+        drain: drain::Watch,
+    ) -> Result<Task>
+    where
+        B: Bind<ServerConfig>,
+        B::Addrs: svc::Param<Remote<ClientAddr>> + svc::Param<Local<ServerAddr>>,
+    {
+        let (listen_addr, listen) = bind.bind(&self.server)?;
+
+        // Get the policy for the health server.
+        let policy = policy.check_policy(OrigDstAddr(listen_addr.into()))?;
+
+        let (ready, latch) = crate::server::Readiness::new();
+        let health = crate::server::Health::new(ready);
+        let health = svc::stack(move |_| health.clone())
+            .push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
+            .push_map_target(|(permit, http)| Permitted { permit, http })
+            .push(inbound::policy::NewAuthorizeHttp::layer(metrics.http_authz.clone()))
+            .push(Rescue::layer())
+            .push_on_service(http::BoxResponse::layer())
+            .push(http::NewServeHttp::layer(Default::default(), drain.clone()))
+            .push_request_filter(
+                |(http, tcp): (
+                    Result<Option<http::Version>, detect::DetectTimeoutError<_>>,
+                    Tcp,
+                )| {
+                    match http {
+                        Ok(Some(version)) => Ok(Http { version, tcp }),
+                        // If detection timed out, we can make an educated guess at the proper
+                        // behavior:
+                        // - If the connection was meshed, it was most likely transported over
+                        //   HTTP/2.
+                        // - If the connection was unmeshed, it was most likely HTTP/1.
+                        // - If we received some unexpected SNI, the client is most likely
+                        //   confused/stale.
+                        Err(_timeout) => {
+                            let version = match tcp.tls.clone() {
+                                tls::ConditionalServerTls::None(_) => http::Version::Http1,
+                                tls::ConditionalServerTls::Some(tls::ServerTls::Established {
+                                    ..
+                                }) => http::Version::H2,
+                                tls::ConditionalServerTls::Some(tls::ServerTls::Passthru {
+                                    sni,
+                                }) => {
+                                    debug_assert!(false, "If we know the stream is non-mesh TLS, we should be able to prove it's not HTTP.");
+                                    return Err(Error::from(UnexpectedSni(sni, tcp.client)));
+                                }
+                            };
+                            debug!(%version, "HTTP detection timed out; assuming HTTP");
+                            Ok(Http { version, tcp })
+                        }
+                        // If the connection failed HTTP detection, check if we detected TLS for
+                        // another target. This might indicate that the client is confused/stale.
+                        Ok(None) => match tcp.tls {
+                            tls::ConditionalServerTls::Some(tls::ServerTls::Passthru { sni }) => {
+                                Err(UnexpectedSni(sni, tcp.client).into())
+                            }
+                            _ => Err(NonHttpClient(tcp.client).into()),
+                        },
+                    }
+                },
+            )
+            .push(svc::ArcNewService::layer())
+            .push(detect::NewDetectService::layer(svc::stack::CloneParam::from(
+                detect::Config::<http::DetectHttp>::from_timeout(DETECT_TIMEOUT),
+            )))
+            .push(transport::metrics::NewServer::layer(metrics.proxy.transport))
+            .push_map_target(move |(tls, addrs): (tls::ConditionalServerTls, B::Addrs)| {
+                Tcp {
+                    tls,
+                    client: addrs.param(),
+                    addr: addrs.param(),
+                    policy: policy.clone(),
+                }
+            })
+            .push(svc::ArcNewService::layer())
+            .push(tls::NewDetectTls::<identity::Server, _, _>::layer(TlsParams {
+                identity,
+            }))
+            .into_inner();
+
+        let serve = Box::pin(serve::serve(listen, health, drain.signaled()));
+        Ok(Task {
+            listen_addr,
+            latch,
+            serve,
+        })
+    }
+}
+
+// === impl Tcp ===
+
+impl Param<transport::labels::Key> for Tcp {
+    fn param(&self) -> transport::labels::Key {
+        transport::labels::Key::inbound_server(
+            self.tls.clone(),
+            self.addr.into(),
+            self.policy.server_label(),
+        )
+    }
+}
+
+// === impl Http ===
+
+impl Param<http::Version> for Http {
+    fn param(&self) -> http::Version {
+        self.version
+    }
+}
+
+impl Param<OrigDstAddr> for Http {
+    fn param(&self) -> OrigDstAddr {
+        OrigDstAddr(self.tcp.addr.into())
+    }
+}
+
+impl Param<Remote<ClientAddr>> for Http {
+    fn param(&self) -> Remote<ClientAddr> {
+        self.tcp.client
+    }
+}
+
+impl Param<tls::ConditionalServerTls> for Http {
+    fn param(&self) -> tls::ConditionalServerTls {
+        self.tcp.tls.clone()
+    }
+}
+
+impl Param<inbound::policy::AllowPolicy> for Http {
+    fn param(&self) -> inbound::policy::AllowPolicy {
+        self.tcp.policy.clone()
+    }
+}
+
+impl Param<metrics::ServerLabel> for Http {
+    fn param(&self) -> metrics::ServerLabel {
+        self.tcp.policy.server_label()
+    }
+}
+
+// === impl Permitted ===
+
+impl Param<metrics::EndpointLabels> for Permitted {
+    fn param(&self) -> metrics::EndpointLabels {
+        metrics::InboundEndpointLabels {
+            tls: self.http.tcp.tls.clone(),
+            authority: None,
+            target_addr: self.http.tcp.addr.into(),
+            policy: self.permit.labels.clone(),
+        }
+        .into()
+    }
+}
+
+// === TlsParams ===
+
+impl<T> ExtractParam<tls::server::Timeout, T> for TlsParams {
+    #[inline]
+    fn extract_param(&self, _: &T) -> tls::server::Timeout {
+        tls::server::Timeout(DETECT_TIMEOUT)
+    }
+}
+
+impl<T> ExtractParam<identity::Server, T> for TlsParams {
+    #[inline]
+    fn extract_param(&self, _: &T) -> identity::Server {
+        self.identity.clone()
+    }
+}
+
+impl<T> InsertParam<tls::ConditionalServerTls, T> for TlsParams {
+    type Target = (tls::ConditionalServerTls, T);
+
+    #[inline]
+    fn insert_param(&self, tls: tls::ConditionalServerTls, target: T) -> Self::Target {
+        (tls, target)
+    }
+}
+
+// === impl Rescue ===
+
+impl Rescue {
+    /// Synthesizes responses for HTTP requests that encounter errors.
+    fn layer<N>(
+    ) -> impl svc::layer::Layer<N, Service = errors::NewRespondService<Self, Self, N>> + Clone {
+        errors::respond::layer(Self)
+    }
+}
+
+impl<T> ExtractParam<Self, T> for Rescue {
+    #[inline]
+    fn extract_param(&self, _: &T) -> Self {
+        Self
+    }
+}
+
+impl<T: Param<tls::ConditionalServerTls>> ExtractParam<errors::respond::EmitHeaders, T> for Rescue {
+    #[inline]
+    fn extract_param(&self, t: &T) -> errors::respond::EmitHeaders {
+        // Only emit informational headers to meshed peers.
+        let emit = t
+            .param()
+            .value()
+            .map(|tls| match tls {
+                tls::ServerTls::Established { client_id, .. } => client_id.is_some(),
+                _ => false,
+            })
+            .unwrap_or(false);
+        errors::respond::EmitHeaders(emit)
+    }
+}
+
+impl errors::HttpRescue<Error> for Rescue {
+    fn rescue(&self, error: Error) -> Result<errors::SyntheticHttpResponse> {
+        let cause = errors::root_cause(&*error);
+        if cause.is::<inbound::policy::DeniedUnauthorized>() {
+            return Ok(errors::SyntheticHttpResponse::permission_denied(error));
+        }
+
+        tracing::warn!(%error, "Unexpected error");
+        Ok(errors::SyntheticHttpResponse::unexpected_error())
+    }
+}
diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs
index 727a6e20c1..71c9d0e191 100644
--- a/linkerd/app/integration/src/proxy.rs
+++ b/linkerd/app/integration/src/proxy.rs
@@ -38,6 +38,7 @@ pub struct Listening {
     pub inbound: SocketAddr,
     pub outbound: SocketAddr,
     pub metrics: SocketAddr,
+    pub health: SocketAddr,
 
     pub outbound_server: Option<server::Listening>,
     pub inbound_server: Option<server::Listening>,
@@ -365,9 +366,10 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
     let bind_in = inbound;
     let bind_out = outbound;
     let bind_adm = listen::BindTcp::default();
+    let bind_health = listen::BindTcp::default();
     let (shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel();
     let main = config
-        .build(bind_in, bind_out, bind_adm, shutdown_tx, trace_handle)
+        .build(bind_in, bind_out, bind_adm, bind_health, shutdown_tx, trace_handle)
         .await
         .expect("config");
@@ -380,6 +382,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
             main.inbound_addr(),
             main.outbound_addr(),
             main.admin_addr(),
+            main.health_addr(),
         );
         let mut running = Some((running_tx, addrs));
         let on_shutdown = futures::future::poll_fn::<(), _>(move |cx| {
@@ -411,7 +414,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
         })
         .expect("spawn");
 
-    let (tap_addr, identity_addr, inbound_addr, outbound_addr, metrics_addr) =
+    let (tap_addr, identity_addr, inbound_addr, outbound_addr, metrics_addr, health_addr) =
         running_rx.await.unwrap();
 
     tracing::info!(
@@ -429,6 +432,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
         inbound: inbound_addr.into(),
         outbound: outbound_addr.into(),
         metrics: metrics_addr.into(),
+        health: health_addr.into(),
 
         outbound_server: proxy.outbound_server,
         inbound_server: proxy.inbound_server,
diff --git a/linkerd/app/integration/src/tests/identity.rs b/linkerd/app/integration/src/tests/identity.rs
index f8cb3e9c63..1350e5d684 100644
--- a/linkerd/app/integration/src/tests/identity.rs
+++ b/linkerd/app/integration/src/tests/identity.rs
@@ -171,7 +171,7 @@ async fn ready() {
 
     let proxy = proxy::new().identity(id_svc).run_with_test_env(env).await;
 
-    let client = client::http1(proxy.metrics, "localhost");
+    let client = client::http1(proxy.health, "localhost");
 
     let ready = || async {
         client
diff --git a/linkerd/app/integration/src/tests/tap.rs b/linkerd/app/integration/src/tests/tap.rs
index 5c772ead8d..6e3b1ac47e 100644
--- a/linkerd/app/integration/src/tests/tap.rs
+++ b/linkerd/app/integration/src/tests/tap.rs
@@ -123,7 +123,7 @@ async fn inbound_http1() {
         .await;
 
     // Wait for the server proxy to become ready
-    let client = client::http1(srv_proxy.metrics, "localhost");
+    let client = client::http1(srv_proxy.health, "localhost");
     let ready = || async {
         client
             .request(client.request_builder("/ready").method("GET"))
@@ -214,7 +214,7 @@ async fn grpc_headers_end() {
         .await;
 
     // Wait for the server proxy to become ready
-    let client = client::http2(srv_proxy.metrics, "localhost");
+    let client = client::http2(srv_proxy.health, "localhost");
     let ready = || async {
         client
             .request(client.request_builder("/ready").method("GET"))
diff --git a/linkerd/app/src/env.rs b/linkerd/app/src/env.rs
index 00069a6038..09863a10b3 100644
--- a/linkerd/app/src/env.rs
+++ b/linkerd/app/src/env.rs
@@ -91,6 +91,7 @@ pub const ENV_OUTBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_OUTBOUND_LISTEN_ADDR";
 pub const ENV_INBOUND_LISTEN_ADDR: &str = "LINKERD2_PROXY_INBOUND_LISTEN_ADDR";
 pub const ENV_CONTROL_LISTEN_ADDR: &str = "LINKERD2_PROXY_CONTROL_LISTEN_ADDR";
 pub const ENV_ADMIN_LISTEN_ADDR: &str = "LINKERD2_PROXY_ADMIN_LISTEN_ADDR";
+pub const ENV_HEALTH_LISTEN_ADDR: &str = "LINKERD2_PROXY_HEALTH_LISTEN_ADDR";
 
 pub const ENV_METRICS_RETAIN_IDLE: &str = "LINKERD2_PROXY_METRICS_RETAIN_IDLE";
 
@@ -223,6 +224,7 @@ const DEFAULT_OUTBOUND_LISTEN_ADDR: &str = "127.0.0.1:4140";
 pub const DEFAULT_INBOUND_LISTEN_ADDR: &str = "0.0.0.0:4143";
 pub const DEFAULT_CONTROL_LISTEN_ADDR: &str = "0.0.0.0:4190";
 const DEFAULT_ADMIN_LISTEN_ADDR: &str = "127.0.0.1:4191";
+const DEFAULT_HEALTH_LISTEN_ADDR: &str = "127.0.0.1:4192";
 const DEFAULT_METRICS_RETAIN_IDLE: Duration = Duration::from_secs(10 * 60);
 const DEFAULT_INBOUND_DISPATCH_TIMEOUT: Duration = Duration::from_secs(1);
 const DEFAULT_INBOUND_DETECT_TIMEOUT: Duration = Duration::from_secs(10);
@@ -298,6 +300,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError> {
     let outbound_listener_addr = parse(strings, ENV_OUTBOUND_LISTEN_ADDR, parse_socket_addr);
     let inbound_listener_addr = parse(strings, ENV_INBOUND_LISTEN_ADDR, parse_socket_addr);
     let admin_listener_addr = parse(strings, ENV_ADMIN_LISTEN_ADDR, parse_socket_addr);
+    let health_listener_addr = parse(strings, ENV_HEALTH_LISTEN_ADDR, parse_socket_addr);
 
     let inbound_detect_timeout = parse(strings, ENV_INBOUND_DETECT_TIMEOUT, parse_duration);
     let inbound_dispatch_timeout = parse(strings, ENV_INBOUND_DISPATCH_TIMEOUT, parse_duration);
@@ -480,6 +483,10 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError> {
     let admin_listener_addr = admin_listener_addr?
         .unwrap_or_else(|| parse_socket_addr(DEFAULT_ADMIN_LISTEN_ADDR).unwrap());
 
+    let health_listener_addr = health_listener_addr?
+        .unwrap_or_else(|| parse_socket_addr(DEFAULT_HEALTH_LISTEN_ADDR).unwrap());
+
     let inbound = {
         let addr = ListenAddr(
             inbound_listener_addr?
@@ -717,7 +724,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError> {
     };
 
     let admin = super::admin::Config {
-        metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
+        metrics_retain_idle: metrics_retain_idle.clone()?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
         server: ServerConfig {
             addr: ListenAddr(admin_listener_addr),
             keepalive: inbound.proxy.server.keepalive,
@@ -725,6 +732,15 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError> {
         },
     };
 
+    let health = super::health::Config {
+        metrics_retain_idle: metrics_retain_idle?.unwrap_or(DEFAULT_METRICS_RETAIN_IDLE),
+        server: ServerConfig {
+            addr: ListenAddr(health_listener_addr),
+            keepalive: inbound.proxy.server.keepalive,
+            h2_settings,
+        },
+    };
+
     let dns = dns::Config {
         min_ttl: dns_min_ttl?,
         max_ttl: dns_max_ttl?,
@@ -793,6 +809,7 @@ pub fn parse_config<S: Strings>(strings: &S) -> Result<super::Config, EnvError> {
     Ok(super::Config {
         admin,
+        health,
         dns,
         dst,
         tap,
diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs
index f20267b6fe..69dca3d997 100644
--- a/linkerd/app/src/lib.rs
+++ b/linkerd/app/src/lib.rs
@@ -12,6 +12,7 @@ pub mod tap;
 pub use self::metrics::Metrics;
 use futures::{future, Future, FutureExt};
 use linkerd_app_admin as admin;
+use linkerd_app_health as health;
 pub use linkerd_app_core::{self as core, metrics, trace};
 use linkerd_app_core::{
     config::ServerConfig,
@@ -54,12 +55,14 @@ pub struct Config {
     pub identity: identity::Config,
     pub dst: dst::Config,
     pub admin: admin::Config,
+    pub health: health::Config,
     pub tap: tap::Config,
     pub oc_collector: oc_collector::Config,
 }
 
 pub struct App {
     admin: admin::Task,
+    health: health::Task,
     drain: drain::Signal,
     dst: ControlAddr,
     identity: identity::Identity,
@@ -81,11 +84,12 @@ impl Config {
     ///
     /// It is currently required that this be run on a Tokio runtime, since some
     /// services are created eagerly and must spawn tasks to do so.
-    pub async fn build<BIn, BOut, BAdmin>(
+    pub async fn build<BIn, BOut, BAdmin, BHealth>(
         self,
         bind_in: BIn,
         bind_out: BOut,
         bind_admin: BAdmin,
+        bind_health: BHealth,
         shutdown_tx: mpsc::UnboundedSender<()>,
         log_level: trace::Handle,
     ) -> Result<App, Error>
@@ -96,9 +100,12 @@ impl Config {
         BOut::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>> + Param<OrigDstAddr>,
         BAdmin: Bind<ServerConfig> + Clone + 'static,
         BAdmin::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
+        BHealth: Bind<ServerConfig> + Clone + 'static,
+        BHealth::Addrs: Param<Remote<ClientAddr>> + Param<Local<ServerAddr>>,
     {
         let Config {
             admin,
+            health,
             dns,
             dst,
             identity,
@@ -166,6 +173,7 @@ impl Config {
                 .metrics()
                 .and_report(outbound.metrics())
                 .and_report(report);
+            let drain_rx = drain_rx.clone();
             info_span!("admin").in_scope(move || {
                 admin.build(
                     bind_admin,
@@ -180,6 +188,21 @@ impl Config {
             })?
         };
 
+        let health = {
+            let identity = identity.receiver().server();
+            let metrics = inbound.metrics();
+            let policy = inbound_policies.clone();
+            info_span!("health").in_scope(move || {
+                health.build(
+                    bind_health,
+                    policy,
+                    identity,
+                    metrics,
+                    drain_rx,
+                )
+            })?
+        };
+
         let dst_addr = dst.addr.clone();
         let gateway_stack = gateway::stack(
             gateway,
@@ -231,6 +254,7 @@ impl Config {
         Ok(App {
             admin,
+            health,
             dst: dst_addr,
             drain: drain_tx,
             identity,
@@ -263,6 +287,10 @@ impl App {
         self.admin.listen_addr
     }
 
+    pub fn health_addr(&self) -> Local<ServerAddr> {
+        self.health.listen_addr
+    }
+
     pub fn inbound_addr(&self) -> Local<ServerAddr> {
         self.inbound_addr
     }
@@ -300,6 +328,7 @@ impl App {
     pub fn spawn(self) -> drain::Signal {
         let App {
             admin,
+            health,
             drain,
             identity,
             oc_collector,
@@ -343,11 +372,13 @@ impl App {
                 .instrument(info_span!("identity").or_current()),
             );
 
-            let latch = admin.latch;
+            let admin_latch = admin.latch;
+            let health_latch = health.latch;
             tokio::spawn(
                 ready
                     .map(move |()| {
-                        latch.release();
+                        admin_latch.release();
+                        health_latch.release();
                         info!(id = %local_id, "Certified identity");
                     })
                     .instrument(info_span!("identity").or_current()),
diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs
index 10b10cec2e..c690752af5 100644
--- a/linkerd2-proxy/src/main.rs
+++ b/linkerd2-proxy/src/main.rs
@@ -48,7 +48,7 @@ fn main() {
         let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
         let bind = BindTcp::with_orig_dst();
         let app = match config
-            .build(bind, bind, BindTcp::default(), shutdown_tx, trace)
+            .build(bind, bind, BindTcp::default(), BindTcp::default(), shutdown_tx, trace)
            .await
        {
            Ok(app) => app,
@@ -59,6 +59,7 @@ fn main() {
        };

        info!("Admin interface on {}", app.admin_addr());
+        info!("Health interface on {}", app.health_addr());
        info!("Inbound interface on {}", app.inbound_addr());
        info!("Outbound interface on {}", app.outbound_addr());
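-- 

Editor's note (illustrative, not part of the patch): the /ready route above is driven
by the Readiness/Latch pair in linkerd/app/health/src/server/readiness.rs. Readiness
holds a Weak reference while each Latch holds a strong Arc, so the process reports
ready exactly once every latch has been dropped, and can never become un-ready again.
A minimal self-contained sketch of that mechanism, assuming nothing beyond std (the
type names mirror the patch; the main harness is hypothetical):

use std::sync::{Arc, Weak};

// Mirror of the patch's types: `Readiness` observes, `Latch` holds readiness back.
struct Readiness(Weak<()>);
struct Latch(Arc<()>);

impl Readiness {
    fn new() -> (Readiness, Latch) {
        let r = Arc::new(());
        (Readiness(Arc::downgrade(&r)), Latch(r))
    }

    // Ready once no strong references remain; `Weak::upgrade` then returns
    // `None` forever, so readiness is never revoked.
    fn is_ready(&self) -> bool {
        self.0.upgrade().is_none()
    }
}

fn main() {
    let (ready, l0) = Readiness::new();
    let l1 = Latch(l0.0.clone()); // a second holder, e.g. the identity task
    assert!(!ready.is_ready()); // two latches outstanding
    drop(l0);
    assert!(!ready.is_ready()); // one latch still outstanding
    drop(l1);
    assert!(ready.is_ready()); // all latches released: ready, permanently
}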