From 30be06a1ad119bd64f0e3f07b5ec2ad4e8b4a86e Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 24 Feb 2023 20:10:51 +0000 Subject: [PATCH] Do not gracefully drain the proxy on SIGTERM Per the discussion in linkerd/linkerd2#10379, the proxy's shutdown behavior is overly aggressive and does not properly honor a pod's `terminationGracePeriodSeconds` configuration: while an application container may continue to run with a server for some period after the pod's delete process begins, the proxy will not permit any new connections in or out of the pod. This may also interfere with administrative probes during the shutdown process. To address this, the proxy is updated to only change its readiness probe state when it receives a SIGTERM. The proxy continues to run-- without closing any connections or rejecting any work--until it is terminated (i.e., by kubelet) with a SIGKILL. It is expected that, in most cases, clients will voluntarily move traffic as they process discovery updates. We need not do anything to interfere with the application's graceful termination behavior. The proxy continues its former graceful shutdown behavior when shutdown is initiated via the admin server `/shutdown` endpoint (e.g., by linkerd-await). --- linkerd/app/admin/src/lib.rs | 2 +- linkerd/app/admin/src/server.rs | 44 +++++++++++------------ linkerd/app/admin/src/server/readiness.rs | 34 +++++------------- linkerd/app/admin/src/stack.rs | 9 ++--- linkerd/app/integration/src/proxy.rs | 2 +- linkerd/app/src/lib.rs | 9 ++--- linkerd/signal/src/lib.rs | 27 +++++--------- linkerd2-proxy/src/main.rs | 25 ++++++++----- 8 files changed, 66 insertions(+), 86 deletions(-) diff --git a/linkerd/app/admin/src/lib.rs b/linkerd/app/admin/src/lib.rs index 97c66dc513..c48f04d300 100644 --- a/linkerd/app/admin/src/lib.rs +++ b/linkerd/app/admin/src/lib.rs @@ -4,5 +4,5 @@ mod server; mod stack; -pub use self::server::{Admin, Latch, Readiness}; +pub use self::server::{Admin, Readiness}; pub use self::stack::{Config, Task}; diff --git a/linkerd/app/admin/src/server.rs b/linkerd/app/admin/src/server.rs index d70b53c5a8..41a4d92ee5 100644 --- a/linkerd/app/admin/src/server.rs +++ b/linkerd/app/admin/src/server.rs @@ -32,7 +32,7 @@ mod json; mod log; mod readiness; -pub use self::readiness::{Latch, Readiness}; +pub use self::readiness::Readiness; #[derive(Clone)] pub struct Admin { @@ -61,7 +61,7 @@ impl Admin { } fn ready_rsp(&self) -> Response { - if self.ready.is_ready() { + if self.ready.get() { Response::builder() .status(StatusCode::OK) .header(http::header::CONTENT_TYPE, "text/plain") @@ -269,32 +269,32 @@ mod tests { const TIMEOUT: Duration = Duration::from_secs(1); + macro_rules! call { + ($admin:expr) => {{ + let r = Request::builder() + .method(Method::GET) + .uri("http://0.0.0.0/ready") + .body(Body::empty()) + .unwrap(); + let f = $admin.clone().oneshot(r); + timeout(TIMEOUT, f).await.expect("timeout").expect("call") + }}; + } + #[tokio::test] - async fn ready_when_latches_dropped() { - let (r, l0) = Readiness::new(); - let l1 = l0.clone(); + async fn readiness() { + let readiness = Readiness::new(false); let (_, t) = trace::Settings::default().build(); let (s, _) = mpsc::unbounded_channel(); - let admin = Admin::new((), r, s, t); - macro_rules! call { - () => {{ - let r = Request::builder() - .method(Method::GET) - .uri("http://0.0.0.0/ready") - .body(Body::empty()) - .unwrap(); - let f = admin.clone().oneshot(r); - timeout(TIMEOUT, f).await.expect("timeout").expect("call") - }}; - } - assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE); + let admin = Admin::new((), readiness.clone(), s, t); + assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE); - drop(l0); - assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE); + readiness.set(true); + assert_eq!(call!(admin).status(), StatusCode::OK); - drop(l1); - assert_eq!(call!().status(), StatusCode::OK); + readiness.set(false); + assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE); } } diff --git a/linkerd/app/admin/src/server/readiness.rs b/linkerd/app/admin/src/server/readiness.rs index 917fecef19..ce7908a1dd 100644 --- a/linkerd/app/admin/src/server/readiness.rs +++ b/linkerd/app/admin/src/server/readiness.rs @@ -1,36 +1,18 @@ -use std::sync::{Arc, Weak}; +use std::sync::{atomic::AtomicBool, Arc}; -/// Tracks the processes's readiness to serve traffic. -/// -/// Once `is_ready()` returns true, it will never return false. #[derive(Clone, Debug)] -pub struct Readiness(Weak<()>); - -/// When all latches are dropped, the process is considered ready. -#[derive(Clone, Debug)] -pub struct Latch(Arc<()>); +pub struct Readiness(Arc); impl Readiness { - pub fn new() -> (Readiness, Latch) { - let r = Arc::new(()); - (Readiness(Arc::downgrade(&r)), Latch(r)) - } - - pub fn is_ready(&self) -> bool { - self.0.upgrade().is_none() + pub fn new(init: bool) -> Readiness { + Readiness(Arc::new(init.into())) } -} -/// ALways ready. -impl Default for Readiness { - fn default() -> Self { - Self::new().0 + pub fn get(&self) -> bool { + self.0.load(std::sync::atomic::Ordering::Acquire) } -} -impl Latch { - /// Releases this readiness latch. - pub fn release(self) { - drop(self); + pub fn set(&self, ready: bool) { + self.0.store(ready, std::sync::atomic::Ordering::Release) } } diff --git a/linkerd/app/admin/src/stack.rs b/linkerd/app/admin/src/stack.rs index 55f364b8a2..4527714d31 100644 --- a/linkerd/app/admin/src/stack.rs +++ b/linkerd/app/admin/src/stack.rs @@ -1,3 +1,4 @@ +use crate::Readiness; use linkerd_app_core::{ classify, config::ServerConfig, @@ -24,7 +25,7 @@ pub struct Config { pub struct Task { pub listen_addr: Local, - pub latch: crate::Latch, + pub ready: Readiness, pub serve: Pin + Send + 'static>>, } @@ -97,8 +98,8 @@ impl Config { .get_policy(inbound::policy::LookupAddr(listen_addr.into())) .await?; - let (ready, latch) = crate::server::Readiness::new(); - let admin = crate::server::Admin::new(report, ready, shutdown, trace); + let ready = crate::server::Readiness::new(false); + let admin = crate::server::Admin::new(report, ready.clone(), shutdown, trace); let admin = svc::stack(move |_| admin.clone()) .push(metrics.proxy.http_endpoint.to_layer::()) .push_map_target(|(permit, http)| Permitted { permit, http }) @@ -169,7 +170,7 @@ impl Config { let serve = Box::pin(serve::serve(listen, admin, drain.signaled())); Ok(Task { listen_addr, - latch, + ready, serve, }) } diff --git a/linkerd/app/integration/src/proxy.rs b/linkerd/app/integration/src/proxy.rs index a2444639f6..1a162254f6 100644 --- a/linkerd/app/integration/src/proxy.rs +++ b/linkerd/app/integration/src/proxy.rs @@ -442,7 +442,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening { Poll::Ready(()) }); - let drain = main.spawn(); + let (_, drain) = main.spawn(); tokio::select! { _ = on_shutdown => { diff --git a/linkerd/app/src/lib.rs b/linkerd/app/src/lib.rs index e8e97edf8a..ab48ab8586 100644 --- a/linkerd/app/src/lib.rs +++ b/linkerd/app/src/lib.rs @@ -302,7 +302,7 @@ impl App { } } - pub fn spawn(self) -> drain::Signal { + pub fn spawn(self) -> (admin::Readiness, drain::Signal) { let App { admin, drain, @@ -312,6 +312,7 @@ impl App { tap, .. } = self; + let readiness = admin.ready.clone(); // Run a daemon thread for all administrative tasks. // @@ -348,11 +349,11 @@ impl App { .instrument(info_span!("identity").or_current()), ); - let latch = admin.latch; + let readiness = admin.ready; tokio::spawn( ready .map(move |()| { - latch.release(); + readiness.set(true); info!(id = %local_id, "Certified identity"); }) .instrument(info_span!("identity").or_current()), @@ -387,6 +388,6 @@ impl App { tokio::spawn(start_proxy); - drain + (readiness, drain) } } diff --git a/linkerd/signal/src/lib.rs b/linkerd/signal/src/lib.rs index 44fa63f98c..4bb4a4dd39 100644 --- a/linkerd/signal/src/lib.rs +++ b/linkerd/signal/src/lib.rs @@ -4,37 +4,30 @@ #![forbid(unsafe_code)] /// Returns a `Future` that completes when the proxy should start to shutdown. -pub async fn shutdown() { +pub async fn shutdown() -> &'static str { imp::shutdown().await } #[cfg(unix)] mod imp { use tokio::signal::unix::{signal, SignalKind}; - use tracing::info; - pub(super) async fn shutdown() { + pub(super) async fn shutdown() -> &'static str { tokio::select! { // SIGINT - To allow Ctrl-c to emulate SIGTERM while developing. - () = sig(SignalKind::interrupt(), "SIGINT") => {} + () = sig(SignalKind::interrupt()) => "SIGINT", // SIGTERM - Kubernetes sends this to start a graceful shutdown. - () = sig(SignalKind::terminate(), "SIGTERM") => {} - }; + () = sig(SignalKind::terminate()) => "SIGTERM", + } } - async fn sig(kind: SignalKind, name: &'static str) { + async fn sig(kind: SignalKind) { // Create a Future that completes the first // time the process receives 'sig'. signal(kind) .expect("Failed to register signal handler") .recv() .await; - info!( - // use target to remove 'imp' from output - target: "linkerd_proxy::signal", - "received {}, starting shutdown", - name, - ); } } @@ -42,7 +35,7 @@ mod imp { mod imp { use tracing::info; - pub(super) async fn shutdown() { + pub(super) async fn shutdown() -> &'static str { // On Windows, we don't have all the signals, but Windows also // isn't our expected deployment target. This implementation allows // developers on Windows to simulate proxy graceful shutdown @@ -51,10 +44,6 @@ mod imp { .expect("Failed to register signal handler") .recv() .await; - info!( - // use target to remove 'imp' from output - target: "linkerd_proxy::signal", - "received Ctrl-C, starting shutdown", - ); + "Ctrl-C" } } diff --git a/linkerd2-proxy/src/main.rs b/linkerd2-proxy/src/main.rs index d106ab910d..d4b8dbafa3 100644 --- a/linkerd2-proxy/src/main.rs +++ b/linkerd2-proxy/src/main.rs @@ -111,15 +111,22 @@ fn main() { } } - let drain = app.spawn(); - tokio::select! { - _ = signal::shutdown() => { - info!("Received shutdown signal"); - } - _ = shutdown_rx.recv() => { - info!("Received shutdown via admin interface"); - } - } + let (readiness, drain) = app.spawn(); + + // When a shutdown signal is received, set the readiness to false so + // that probes can discover that the proxy should not receive more traffic. + // Then, we do NOTHING. We expect a SIGKILL to come along and finish off + // the process. + tokio::spawn(async move { + let signal = signal::shutdown().await; + readiness.set(false); + info!("Received {signal}. Waiting to be terminated forcefully."); + }); + + // If the admin's shutdown channel is used, we gracefully drain open + // connections or terminate after a timeout. + shutdown_rx.recv().await; + info!("Received shutdown via admin interface"); match time::timeout(shutdown_grace_period, drain.drain()).await { Ok(()) => debug!("Shutdown completed gracefully"), Err(_) => warn!(