Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

On worker stop #1195

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions server/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub struct Builder {
pub(crate) enable_signal: bool,
pub(crate) shutdown_timeout: Duration,
pub(crate) on_worker_start: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
pub(crate) on_worker_stop: Box<dyn Fn() -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
backlog: u32,
}

Expand All @@ -40,6 +41,7 @@ impl Builder {
enable_signal: true,
shutdown_timeout: Duration::from_secs(30),
on_worker_start: Box::new(|| Box::pin(async {})),
on_worker_stop: Box::new(|| Box::pin(async {})),
backlog: 2048,
}
}
Expand Down Expand Up @@ -131,6 +133,25 @@ impl Builder {
self
}

#[doc(hidden)]
/// Async callback called when worker thread is being shut down.
///
/// *. This API is subject to change with no stable guarantee.
pub fn on_worker_stop<F, Fut>(mut self, on_stop: F) -> Self
where
F: Fn() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.on_worker_stop = Box::new(move || {
let fut = on_stop();
Box::pin(async {
fut.await;
})
});

self
}

pub fn listen<N, L, F, St>(mut self, name: N, listener: L, service: F) -> Self
where
N: AsRef<str>,
Expand Down
7 changes: 7 additions & 0 deletions server/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl Server {
factories,
shutdown_timeout,
on_worker_start,
on_worker_stop,
..
} = builder;

Expand All @@ -60,6 +61,7 @@ impl Server {
let is_graceful_shutdown = Arc::new(AtomicBool::new(false));

let on_start_fut = on_worker_start();
let on_stop_fut = on_worker_stop();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the closure executes eagerly on worker start meaning

|| {
// happens when worker start.
async {
    // happens when worker stop.
}
}

This can be a potential footgun.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup it shot me as well. Should we make this more similar to tokio's on_thread_start/on_thread_stop callbacks that just fake a non-async function?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My opinion is we make it accepting a future rather than a closure. Reason being worker itself runs in an async context already so there is no need to do async { closure().await } and we can just do async { future.await }.
on_worker_start and tokio's case are different because these APIs have access to non async context (or partially so )

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately I can't think of how that would work since the future would need to be awaited multiple times, once per worker task thread and that's not a power that future's have. I think that's why you used a generator pattern where the callback vends a new future each time and why Tokio doesn't have this problem since a normal Fn can be invoked multiple times.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind waiting on this PR for a bit we can switch to async closure when Rust 1.85 release later this month. xitca as a whole would switch to Rust 1.85 and edition 2024 at the same time. After which we can use on_worker_stop(async || {}) so there is no accidental eager execution on the closure.


let fut = async {
on_start_fut.await;
Expand All @@ -78,6 +80,8 @@ impl Server {

worker::wait_for_stop(handles, services, shutdown_timeout, &is_graceful_shutdown).await;

on_stop_fut.await;

Ok::<_, io::Error>(())
};

Expand All @@ -96,6 +100,7 @@ impl Server {
factories,
shutdown_timeout,
on_worker_start,
on_worker_stop,
..
} = builder;

Expand Down Expand Up @@ -153,6 +158,8 @@ impl Server {
}

worker::wait_for_stop(handles, services, shutdown_timeout, &is_graceful_shutdown).await;

on_worker_stop().await;
};

#[cfg(not(feature = "io-uring"))]
Expand Down
10 changes: 10 additions & 0 deletions web/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,16 @@ where
self
}

#[doc(hidden)]
pub fn on_worker_stop<FS, Fut>(mut self, on_stop: FS) -> Self
where
FS: Fn() -> Fut + Send + Sync + 'static,
Fut: Future + Send + 'static,
{
self.builder = self.builder.on_worker_stop(on_stop);
self
}

#[cfg(not(target_family = "wasm"))]
pub fn bind<A, ResB, BE>(mut self, addr: A) -> std::io::Result<Self>
where
Expand Down