Skip to content

Commit

Permalink
demonstrate deletion task cleanup (#5)
Browse files Browse the repository at this point in the history
This extends the examples which use a deletion task for cleanup such
that the task is aborted as part of a graceful shutdown.
  • Loading branch information
maxcountryman authored Feb 1, 2024
1 parent 6ebc561 commit 51cc758
Show file tree
Hide file tree
Showing 7 changed files with 261 additions and 12 deletions.
41 changes: 40 additions & 1 deletion mongodb-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{Expiry, Session, SessionManagerLayer};
use tower_sessions_core::ExpiredDeletion;
use tower_sessions_mongodb_store::{mongodb::Client, MongoDBStore};

const COUNTER_KEY: &str = "counter";
Expand All @@ -33,6 +35,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = std::option_env!("DATABASE_URL").expect("Missing DATABASE_URL.");
let client = Client::with_uri_str(database_url).await?;
let session_store = MongoDBStore::new(client, "tower-sessions".to_string());

let deletion_task = tokio::task::spawn(
session_store
.clone()
.continuously_delete_expired(tokio::time::Duration::from_secs(60)),
);

let session_layer = SessionManagerLayer::new(session_store)
.with_secure(false)
.with_expiry(Expiry::OnInactivity(Duration::seconds(10)));
Expand All @@ -41,8 +50,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
```
41 changes: 40 additions & 1 deletion mongodb-store/examples/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{Expiry, Session, SessionManagerLayer};
use tower_sessions_core::ExpiredDeletion;
use tower_sessions_mongodb_store::{mongodb::Client, MongoDBStore};

const COUNTER_KEY: &str = "counter";
Expand All @@ -22,6 +24,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let database_url = std::option_env!("DATABASE_URL").expect("Missing DATABASE_URL.");
let client = Client::with_uri_str(database_url).await?;
let session_store = MongoDBStore::new(client, "tower-sessions".to_string());

let deletion_task = tokio::task::spawn(
session_store
.clone()
.continuously_delete_expired(tokio::time::Duration::from_secs(60)),
);

let session_layer = SessionManagerLayer::new(session_store)
.with_secure(false)
.with_expiry(Expiry::OnInactivity(Duration::seconds(10)));
Expand All @@ -30,7 +39,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
93 changes: 90 additions & 3 deletions sqlx-store/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer};
use tower_sessions_sqlx_store::{sqlx::MySqlPool, MySqlStore};

Expand Down Expand Up @@ -51,12 +52,40 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
```

### Postgres example
Expand All @@ -67,6 +96,7 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer};
use tower_sessions_sqlx_store::{sqlx::PgPool, PostgresStore};

Expand Down Expand Up @@ -102,12 +132,40 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
```

### SQLite example
Expand All @@ -118,6 +176,7 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer};
use tower_sessions_sqlx_store::{sqlx::SqlitePool, SqliteStore};

Expand Down Expand Up @@ -152,10 +211,38 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
```
31 changes: 30 additions & 1 deletion sqlx-store/examples/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer};
use tower_sessions_sqlx_store::{sqlx::MySqlPool, MySqlStore};

Expand Down Expand Up @@ -38,9 +39,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
31 changes: 30 additions & 1 deletion sqlx-store/examples/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::net::SocketAddr;
use axum::{response::IntoResponse, routing::get, Router};
use serde::{Deserialize, Serialize};
use time::Duration;
use tokio::{signal, task::AbortHandle};
use tower_sessions::{session_store::ExpiredDeletion, Expiry, Session, SessionManagerLayer};
use tower_sessions_sqlx_store::{sqlx::PgPool, PostgresStore};

Expand Down Expand Up @@ -38,9 +39,37 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app.into_make_service()).await?;

// Ensure we use a shutdown signal to abort the deletion task.
axum::serve(listener, app.into_make_service())
.with_graceful_shutdown(shutdown_signal(deletion_task.abort_handle()))
.await?;

deletion_task.await??;

Ok(())
}

async fn shutdown_signal(deletion_task_abort_handle: AbortHandle) {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => { deletion_task_abort_handle.abort() },
_ = terminate => { deletion_task_abort_handle.abort() },
}
}
Loading

0 comments on commit 51cc758

Please sign in to comment.