Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): implement Clone for GracefulShutdown #136

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

tottoto
Copy link
Contributor

@tottoto tottoto commented Jul 10, 2024

Implements Clone for GracefulShutdown. This allows users to use GracefulShutdown in other functions.

@seanmonstar
Copy link
Member

seanmonstar commented Jul 12, 2024

in other functions.

Not that I'm against adding this, but knowing the use cases would help.

There's a part of me that worries if having multiple clones could be confusing when shutdown() consumes self...

@tottoto
Copy link
Contributor Author

tottoto commented Jul 12, 2024

hyperium/tonic#1788 is a use case of this change.

@tottoto tottoto force-pushed the implement-clone-for-gracefulshutdown branch from 32d1c48 to eb6167a Compare July 16, 2024 10:57
@tottoto tottoto force-pushed the implement-clone-for-gracefulshutdown branch 2 times, most recently from 3e306ec to d3b844d Compare July 26, 2024 16:40
@tottoto tottoto force-pushed the implement-clone-for-gracefulshutdown branch from d3b844d to 670aec7 Compare August 7, 2024 10:16
@zacknewman
Copy link

zacknewman commented Dec 26, 2024

@seanmonstar, when you need to move GracefulShutdown into a separate task before calling watch, you can't since it doesn't implement Clone. A trivial example is handling TLS connections. The time it takes to accept a TLS connection can take too long to do outside of a dedicated task; thus it should be done in a separate task.

Personally the amount of code it takes to do your own graceful shutdown is very little, so I will likely keep using Sender<()> and Receiver<()> directly since I get to avoid a clone on Sender<()> altogether since I can call subscribe before accepting the TLS connection; however seeing how GracefulShutdown exists, others may be willing to pay the price of an extra clone.

async fn main_loop<S>(
    listener: tokio::net::TcpListener,
    acceptor: tokio_rustls::TlsAcceptor,
    app: S,
    server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
    sender: &hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
    S: hyper::service::Service<
            axum::extract::Request<hyper::body::Incoming>,
            Response = axum::response::Response<axum::body::Body>,
            Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
            Error = core::convert::Infallible,
        > + core::clone::Clone
        + core::marker::Send
        + 'static,
{
    loop {
        if let Ok(val) = listener.accept().await {
            // This can take too long; thus should be done inside `tokio::spawn`;
            // however we can't since that `move`s `sender`.
            if let Ok(io) = acceptor.accept(val.0).await {
                let con = sender.watch(server.serve_connection(hyper_util::rt::TokioIo::new(io), app.clone()));
                tokio::spawn(async move {
                    let _r: Result<(), hyper::Error> = con.await;
                });
            }
        }
    }
}

If GracefulShutdown implemented Clone, however, then above could be transformed into below:

async fn main_loop<S>(
    listener: tokio::net::TcpListener,
    acceptor: tokio_rustls::TlsAcceptor,
    app: S,
    server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
    sender: hyper_util::server::graceful::GracefulShutdown,
) -> core::convert::Infallible
where
    S: hyper::service::Service<
            axum::extract::Request<hyper::body::Incoming>,
            Response = axum::response::Response<axum::body::Body>,
            Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
            Error = core::convert::Infallible,
        > + core::clone::Clone
        + core::marker::Send
        + 'static,
{
    loop {
        if let Ok(val) = listener.accept().await {
            let tls = acceptor.clone();
            let http = app.clone();
            let builder = server.clone();
            let shutdown = sender.clone();
            tokio::spawn(async move {
                if let Ok(io) = tls.accept(val.0).await {
                    let _r: Result<(), hyper::Error> = shutdown.watch(builder.serve_connection(hyper_util::rt::TokioIo::new(io), http)).await;
                }
            });
        }
    }
}

"Hand-rolled" graceful shutdown:

struct ShutdownConnection<S, F>
where
    S: hyper::service::Service<
        axum::extract::Request<hyper::body::Incoming>,
        Response = axum::response::Response<axum::body::Body>,
        Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
        Error = core::convert::Infallible,
    >,
{
    con: hyper::server::conn::http2::Connection<
        hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
        S,
        hyper_util::rt::TokioExecutor,
    >,
    signal: F,
    shutting_down: bool,
}
impl<S, F> ShutdownConnection<S, F>
where
    S: hyper::service::Service<
        axum::extract::Request<hyper::body::Incoming>,
        Response = axum::response::Response<axum::body::Body>,
        Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
        Error = core::convert::Infallible,
    >,
{
    fn new(
        con: hyper::server::conn::http2::Connection<
            hyper_util::rt::TokioIo<tokio_rustls::server::TlsStream<tokio::net::TcpStream>>,
            S,
            hyper_util::rt::TokioExecutor,
        >,
        signal: F,
    ) -> Self {
        Self {
            con,
            signal,
            shutting_down: false,
        }
    }
}
impl<S, F> core::future::Future for ShutdownConnection<S, F>
where
    S: hyper::service::Service<
        axum::extract::Request<hyper::body::Incoming>,
        Response = axum::response::Response<axum::body::Body>,
        Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
        Error = core::convert::Infallible,
    >,
    F: core::future::Future<Output = Result<(), tokio::sync::watch::error::RecvError>>
        + core::marker::Unpin,
{
    type Output = Result<(), hyper::Error>;
    fn poll(
        mut self: core::pin::Pin<&mut Self>,
        cx: &mut core::task::Context<'_>,
    ) -> core::task::Poll<Self::Output> {
        if !self.shutting_down && core::pin::Pin::new(&mut self.signal).poll(cx).is_ready() {
            self.shutting_down = true;
            core::pin::Pin::new(&mut self.con).graceful_shutdown();
        }
        core::pin::Pin::new(&mut self.con).poll(cx)
    }
}
async fn main_loop<S>(
    listener: tokio::net::TcpListener,
    acceptor: tokio_rustls::TlsAcceptor,
    app: S,
    server: hyper::server::conn::http2::Builder<hyper_util::rt::TokioExecutor>,
    sender: &tokio::sync::watch::Sender<()>,
) -> core::convert::Infallible
where
    S: hyper::service::Service<
            axum::extract::Request<hyper::body::Incoming>,
            Response = axum::response::Response<axum::body::Body>,
            Future = axum::routing::future::RouteFuture<core::convert::Infallible>,
            Error = core::convert::Infallible,
        > + core::clone::Clone
        + core::marker::Send
        + 'static,
{
    loop {
        if let Ok(val) = listener.accept().await {
            let tls = acceptor.clone();
            let http = app.clone();
            let builder = server.clone();
            let mut rx = sender.subscribe();
            tokio::spawn(async move {
                if let Ok(io) = tls.accept(val.0).await {
                    let _r: Result<(), hyper::Error> = ShutdownConnection::new(
                        builder.serve_connection(hyper_util::rt::TokioIo::new(io), http),
                        core::pin::pin!(rx.changed()),
                    )
                    .await;
                }
            });
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants