Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not gracefully drain the proxy on SIGTERM #2266

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion linkerd/app/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
mod server;
mod stack;

pub use self::server::{Admin, Latch, Readiness};
pub use self::server::{Admin, Readiness};
pub use self::stack::{Config, Task};
44 changes: 22 additions & 22 deletions linkerd/app/admin/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod json;
mod log;
mod readiness;

pub use self::readiness::{Latch, Readiness};
pub use self::readiness::Readiness;

#[derive(Clone)]
pub struct Admin<M> {
Expand Down Expand Up @@ -61,7 +61,7 @@ impl<M> Admin<M> {
}

fn ready_rsp(&self) -> Response<Body> {
if self.ready.is_ready() {
if self.ready.get() {
Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, "text/plain")
Expand Down Expand Up @@ -269,32 +269,32 @@ mod tests {

const TIMEOUT: Duration = Duration::from_secs(1);

macro_rules! call {
($admin:expr) => {{
let r = Request::builder()
.method(Method::GET)
.uri("http://0.0.0.0/ready")
.body(Body::empty())
.unwrap();
let f = $admin.clone().oneshot(r);
timeout(TIMEOUT, f).await.expect("timeout").expect("call")
}};
}

#[tokio::test]
async fn ready_when_latches_dropped() {
let (r, l0) = Readiness::new();
let l1 = l0.clone();
async fn readiness() {
let readiness = Readiness::new(false);

let (_, t) = trace::Settings::default().build();
let (s, _) = mpsc::unbounded_channel();
let admin = Admin::new((), r, s, t);
macro_rules! call {
() => {{
let r = Request::builder()
.method(Method::GET)
.uri("http://0.0.0.0/ready")
.body(Body::empty())
.unwrap();
let f = admin.clone().oneshot(r);
timeout(TIMEOUT, f).await.expect("timeout").expect("call")
}};
}

assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
let admin = Admin::new((), readiness.clone(), s, t);
assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE);

drop(l0);
assert_eq!(call!().status(), StatusCode::SERVICE_UNAVAILABLE);
readiness.set(true);
assert_eq!(call!(admin).status(), StatusCode::OK);

drop(l1);
assert_eq!(call!().status(), StatusCode::OK);
readiness.set(false);
assert_eq!(call!(admin).status(), StatusCode::SERVICE_UNAVAILABLE);
}
}
34 changes: 8 additions & 26 deletions linkerd/app/admin/src/server/readiness.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,18 @@
use std::sync::{Arc, Weak};
use std::sync::{atomic::AtomicBool, Arc};

/// Tracks the process's readiness to serve traffic.
///
/// Once `is_ready()` returns true, it will never return false.
#[derive(Clone, Debug)]
pub struct Readiness(Weak<()>);

/// When all latches are dropped, the process is considered ready.
#[derive(Clone, Debug)]
pub struct Latch(Arc<()>);
pub struct Readiness(Arc<AtomicBool>);

impl Readiness {
pub fn new() -> (Readiness, Latch) {
let r = Arc::new(());
(Readiness(Arc::downgrade(&r)), Latch(r))
}

pub fn is_ready(&self) -> bool {
self.0.upgrade().is_none()
pub fn new(init: bool) -> Readiness {
Readiness(Arc::new(init.into()))
}
}

/// Always ready.
impl Default for Readiness {
fn default() -> Self {
Self::new().0
pub fn get(&self) -> bool {
self.0.load(std::sync::atomic::Ordering::Acquire)
}
}

impl Latch {
/// Releases this readiness latch.
pub fn release(self) {
drop(self);
pub fn set(&self, ready: bool) {
self.0.store(ready, std::sync::atomic::Ordering::Release)
}
}
9 changes: 5 additions & 4 deletions linkerd/app/admin/src/stack.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::Readiness;
use linkerd_app_core::{
classify,
config::ServerConfig,
Expand All @@ -24,7 +25,7 @@ pub struct Config {

pub struct Task {
pub listen_addr: Local<ServerAddr>,
pub latch: crate::Latch,
pub ready: Readiness,
pub serve: Pin<Box<dyn std::future::Future<Output = ()> + Send + 'static>>,
}

Expand Down Expand Up @@ -97,8 +98,8 @@ impl Config {
.get_policy(inbound::policy::LookupAddr(listen_addr.into()))
.await?;

let (ready, latch) = crate::server::Readiness::new();
let admin = crate::server::Admin::new(report, ready, shutdown, trace);
let ready = crate::server::Readiness::new(false);
let admin = crate::server::Admin::new(report, ready.clone(), shutdown, trace);
let admin = svc::stack(move |_| admin.clone())
.push(metrics.proxy.http_endpoint.to_layer::<classify::Response, _, Permitted>())
.push_map_target(|(permit, http)| Permitted { permit, http })
Expand Down Expand Up @@ -169,7 +170,7 @@ impl Config {
let serve = Box::pin(serve::serve(listen, admin, drain.signaled()));
Ok(Task {
listen_addr,
latch,
ready,
serve,
})
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/app/integration/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ async fn run(proxy: Proxy, mut env: TestEnv, random_ports: bool) -> Listening {
Poll::Ready(())
});

let drain = main.spawn();
let (_, drain) = main.spawn();

tokio::select! {
_ = on_shutdown => {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/app/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ impl App {
}
}

pub fn spawn(self) -> drain::Signal {
pub fn spawn(self) -> (admin::Readiness, drain::Signal) {
let App {
admin,
drain,
Expand All @@ -312,6 +312,7 @@ impl App {
tap,
..
} = self;
let readiness = admin.ready.clone();

// Run a daemon thread for all administrative tasks.
//
Expand Down Expand Up @@ -348,11 +349,11 @@ impl App {
.instrument(info_span!("identity").or_current()),
);

let latch = admin.latch;
let readiness = admin.ready;
tokio::spawn(
ready
.map(move |()| {
latch.release();
readiness.set(true);
info!(id = %local_id, "Certified identity");
})
.instrument(info_span!("identity").or_current()),
Expand Down Expand Up @@ -387,6 +388,6 @@ impl App {

tokio::spawn(start_proxy);

drain
(readiness, drain)
}
}
27 changes: 8 additions & 19 deletions linkerd/signal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,38 @@
#![forbid(unsafe_code)]

/// Returns a `Future` that completes when the proxy should start to shutdown.
pub async fn shutdown() {
pub async fn shutdown() -> &'static str {
imp::shutdown().await
}

#[cfg(unix)]
mod imp {
use tokio::signal::unix::{signal, SignalKind};
use tracing::info;

pub(super) async fn shutdown() {
pub(super) async fn shutdown() -> &'static str {
tokio::select! {
// SIGINT - To allow Ctrl-c to emulate SIGTERM while developing.
() = sig(SignalKind::interrupt(), "SIGINT") => {}
() = sig(SignalKind::interrupt()) => "SIGINT",
// SIGTERM - Kubernetes sends this to start a graceful shutdown.
() = sig(SignalKind::terminate(), "SIGTERM") => {}
};
() = sig(SignalKind::terminate()) => "SIGTERM",
}
}

async fn sig(kind: SignalKind, name: &'static str) {
async fn sig(kind: SignalKind) {
// Create a Future that completes the first
// time the process receives 'sig'.
signal(kind)
.expect("Failed to register signal handler")
.recv()
.await;
info!(
// use target to remove 'imp' from output
target: "linkerd_proxy::signal",
"received {}, starting shutdown",
name,
);
}
}

#[cfg(not(unix))]
mod imp {
use tracing::info;

pub(super) async fn shutdown() {
pub(super) async fn shutdown() -> &'static str {
// On Windows, we don't have all the signals, but Windows also
// isn't our expected deployment target. This implementation allows
// developers on Windows to simulate proxy graceful shutdown
Expand All @@ -51,10 +44,6 @@ mod imp {
.expect("Failed to register signal handler")
.recv()
.await;
info!(
// use target to remove 'imp' from output
target: "linkerd_proxy::signal",
"received Ctrl-C, starting shutdown",
);
"Ctrl-C"
}
}
25 changes: 16 additions & 9 deletions linkerd2-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,22 @@ fn main() {
}
}

let drain = app.spawn();
tokio::select! {
_ = signal::shutdown() => {
info!("Received shutdown signal");
}
_ = shutdown_rx.recv() => {
info!("Received shutdown via admin interface");
}
}
let (readiness, drain) = app.spawn();

// When a shutdown signal is received, set the readiness to false so
// that probes can discover that the proxy should not receive more traffic.
// Then, we do NOTHING. We expect a SIGKILL to come along and finish off
// the process.
tokio::spawn(async move {
let signal = signal::shutdown().await;
readiness.set(false);
info!("Received {signal}. Waiting to be terminated forcefully.");
});

// If the admin's shutdown channel is used, we gracefully drain open
// connections or terminate after a timeout.
shutdown_rx.recv().await;
info!("Received shutdown via admin interface");
match time::timeout(shutdown_grace_period, drain.drain()).await {
Ok(()) => debug!("Shutdown completed gracefully"),
Err(_) => warn!(
Expand Down