From 0f545ecd18315544d474b38144dd3a2e8a0c7c18 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Thu, 1 Feb 2024 10:39:17 -0800 Subject: [PATCH 1/2] demonstrate deletion task cleanup This extends the examples which use a deletion task for cleanup such that the task is aborted as part of a graceful shutdown. --- mongodb-store/README.md | 41 +++++++++++++- mongodb-store/examples/mongodb.rs | 41 +++++++++++++- sqlx-store/README.md | 93 ++++++++++++++++++++++++++++++- sqlx-store/examples/mysql.rs | 31 ++++++++++- sqlx-store/examples/postgres.rs | 31 ++++++++++- sqlx-store/examples/sqlite.rs | 31 ++++++++++- 6 files changed, 260 insertions(+), 8 deletions(-) diff --git a/mongodb-store/README.md b/mongodb-store/README.md index 4afef84..463e7ff 100644 --- a/mongodb-store/README.md +++ b/mongodb-store/README.md @@ -14,7 +14,9 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{Expiry, Session, SessionManagerLayer}; +use tower_sessions_core::ExpiredDeletion; use tower_sessions_mongodb_store::{mongodb::Client, MongoDBStore}; const COUNTER_KEY: &str = "counter"; @@ -33,6 +35,13 @@ async fn main() -> Result<(), Box> { let database_url = std::option_env!("DATABASE_URL").expect("Missing DATABASE_URL."); let client = Client::with_uri_str(database_url).await?; let session_store = MongoDBStore::new(client, "tower-sessions".to_string()); + + let deletion_task = tokio::task::spawn( + session_store + .clone() + .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + ); + let session_layer = SessionManagerLayer::new(session_store) .with_secure(false) .with_expiry(Expiry::OnInactivity(Duration::seconds(10))); @@ -41,8 +50,38 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; + + deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} ``` diff --git a/mongodb-store/examples/mongodb.rs b/mongodb-store/examples/mongodb.rs index 1f6e018..c72d285 100644 --- a/mongodb-store/examples/mongodb.rs +++ b/mongodb-store/examples/mongodb.rs @@ -3,7 +3,9 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{Expiry, Session, SessionManagerLayer}; +use tower_sessions_core::ExpiredDeletion; use tower_sessions_mongodb_store::{mongodb::Client, MongoDBStore}; const COUNTER_KEY: &str = "counter"; @@ -22,6 +24,13 @@ async fn main() -> Result<(), Box> { let database_url = std::option_env!("DATABASE_URL").expect("Missing DATABASE_URL."); let client = Client::with_uri_str(database_url).await?; let session_store = MongoDBStore::new(client, "tower-sessions".to_string()); + + let deletion_task = tokio::task::spawn( + session_store + .clone() + .continuously_delete_expired(tokio::time::Duration::from_secs(60)), + ); + let session_layer = SessionManagerLayer::new(session_store) .with_secure(false) .with_expiry(Expiry::OnInactivity(Duration::seconds(10))); @@ -30,7 +39,37 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; + + deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} diff --git a/sqlx-store/README.md b/sqlx-store/README.md index 075f852..87ede2c 100644 --- a/sqlx-store/README.md +++ b/sqlx-store/README.md @@ -16,6 +16,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::MySqlPool, MySqlStore}; @@ -51,12 +52,40 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} ``` ### Postgres example @@ -67,6 +96,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::PgPool, PostgresStore}; @@ -102,12 +132,40 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} ``` ### SQLite example @@ -118,6 +176,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::SqlitePool, SqliteStore}; @@ -152,10 +211,38 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} ``` diff --git a/sqlx-store/examples/mysql.rs b/sqlx-store/examples/mysql.rs index e8d6cdc..5d0803e 100644 --- a/sqlx-store/examples/mysql.rs +++ b/sqlx-store/examples/mysql.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::MySqlPool, MySqlStore}; @@ -38,9 +39,37 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} diff --git a/sqlx-store/examples/postgres.rs b/sqlx-store/examples/postgres.rs index 52bbee5..9a4f666 100644 --- a/sqlx-store/examples/postgres.rs +++ b/sqlx-store/examples/postgres.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::PgPool, PostgresStore}; @@ -38,9 +39,37 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} diff --git a/sqlx-store/examples/sqlite.rs b/sqlx-store/examples/sqlite.rs index 4c550eb..d7533bf 100644 --- a/sqlx-store/examples/sqlite.rs +++ b/sqlx-store/examples/sqlite.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use axum::{response::IntoResponse, routing::get, Router}; use serde::{Deserialize, Serialize}; use time::Duration; +use tokio::{signal, task::AbortHandle}; use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer}; use tower_sessions_sqlx_store::{sqlx::SqlitePool, SqliteStore}; @@ -37,9 +38,37 @@ async fn main() -> Result<(), Box> { let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); let listener = tokio::net::TcpListener::bind(&addr).await?; - axum::serve(listener, app.into_make_service()).await?; + + // Ensure we use a shutdown signal to abort the deletion task. + axum::serve(listener, app.into_make_service()) + .with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle())) + .await?; deletion_task.await??; Ok(()) } + +async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { deletion_task_abort_handle.abort() }, + _ = terminate => { deletion_task_abort_handle.abort() }, + } +} From 291c6f7af10a8b846300c6fbaf55b0a47c8164e1 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Thu, 1 Feb 2024 10:55:59 -0800 Subject: [PATCH 2/2] align test suite with upstream semantic --- tests/common/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4afd9c4..1fd22bf 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -178,9 +178,7 @@ macro_rules! route_tests { assert_eq!(session_cookie.name(), "id"); assert_eq!(session_cookie.http_only(), Some(true)); assert_eq!(session_cookie.same_site(), Some(SameSite::Strict)); - assert!(session_cookie - .max_age() - .is_some_and(|d| d <= Duration::weeks(2))); + assert!(session_cookie.max_age().is_none()); assert_eq!(session_cookie.secure(), Some(true)); assert_eq!(session_cookie.path(), Some("/")); } @@ -272,7 +270,6 @@ macro_rules! route_tests { .header(header::COOKIE, second_session_cookie.encoded().to_string()) .body(Body::empty()) .unwrap(); - dbg!("foo"); let res = dbg!(app.oneshot(req).await).unwrap(); assert_ne!(first_session_cookie.value(), second_session_cookie.value());