Step functions #24

Merged · 22 commits · Oct 24, 2024

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -25,7 +25,7 @@
tokio = { version = "1.40.0", features = [
] } # TODO: "full" shouldn't be required
tracing = { version = "0.1.40", features = ["log"] }
ulid = { version = "1.1.3", features = ["uuid"] }
-uuid = { version = "1.10.0", features = ["v4"] }
+uuid = { version = "1.10.0", features = ["v4", "serde"] }
num_cpus = "1.16.0"

[dev-dependencies]
209 changes: 169 additions & 40 deletions README.md
@@ -3,7 +3,7 @@
</h1>

<p align="center">
-<strong>Underway</strong> is a PostgreSQL-backed job queue for reliable background task processing.
+Durable step functions via Postgres.
</p>

<div align="center">
@@ -20,37 +20,51 @@

## 🎨 Overview

-Underway provides a robust and efficient way to execute asynchronous tasks using PostgreSQL as the backend for task storage and coordination. It is designed to be simple, scalable, and resilient, handling job processing in a way that ensures safe concurrency and reliable task execution. Whether you're processing tasks on a single server or across multiple workers, Underway makes it easy to manage background jobs with confidence.
+**Underway** is a framework for building robust, asynchronous background
+jobs in Rust, leveraging PostgreSQL as its queuing backend. It provides a
+streamlined interface for defining jobs as a series of "steps," where each
+step's output becomes the input for the next. This design enables the
+construction of complex, durable, and resilient workflows with ease.

Key Features:

-- **PostgreSQL-Backed** Built on PostgreSQL for robust task storage and
-  coordination, ensuring consistency and safe concurrency across all
-  operations.
-- **Transactional Task Management** Supports enqueuing tasks within existing
-  database transactions, guaranteeing that tasks are only added if the
-  transaction commits successfully—perfect for operations like user
-  registration.
-- **Automatic Retries** Offers customizable retry strategies for failed
-  executions, ensuring tasks are reliably completed even after transient
-  failures.
-- **Cron-Like Scheduling** Supports scheduling recurring tasks with
-  cron-like expressions, enabling automated, time-based job execution.
-- **Scalable and Flexible** Scales from a single worker to multiple workers
-  with minimal configuration, allowing seamless background job processing.
+- **PostgreSQL-Backed** Leverages PostgreSQL with `FOR UPDATE SKIP LOCKED` for
+  reliable task storage and coordination, ensuring efficient, safe concurrency.
+- **Atomic Task Management** Enqueue tasks within your transactions and use
+  the worker's transaction within your tasks for atomic database
+  queries--ensuring consistency.
+- **Automatic Retries** Configurable retry strategies ensure tasks are
+  reliably completed, even after transient failures.
+- **Cron-Like Scheduling** Schedule recurring tasks with cron-like
+  expressions for automated, time-based job execution.
+- **Scalable and Flexible** Easily scales from a single worker to many,
+  enabling seamless background job processing with minimal setup.

## 🤸 Usage

Underway is suitable for many different use cases, ranging from simple
single-step jobs to more sophisticated multi-step jobs, where dependencies
are built up between steps.

### Welcome emails

A common use case is deferring work that can be processed later. For
instance, during user registration, we might want to send a welcome email to
new users. Rather than handling this within the registration process (e.g.,
form validation, database insertion), we can offload it to run "out-of-band"
using Underway. By defining a job for sending the welcome email, Underway
ensures it gets processed in the background, without slowing down the user
registration flow.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
-use underway::{Job, Queue};
+use underway::{Job, To};

-const QUEUE_NAME: &str = "email";

-#[derive(Clone, Deserialize, Serialize)]
+// This is the input we'll provide to the job when we enqueue it.
+#[derive(Deserialize, Serialize)]
struct WelcomeEmail {
user_id: i32,
email: String,
@@ -66,46 +80,161 @@
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Run migrations.
underway::MIGRATOR.run(&pool).await?;

-// Create the task queue.
-let queue = Queue::builder().name(QUEUE_NAME).pool(pool).build().await?;

// Build the job.
let job = Job::builder()
-.execute(
-|WelcomeEmail {
+.step(
+|_cx,
+WelcomeEmail {
user_id,
email,
name,
}| async move {
// Simulate sending an email.
println!("Sending welcome email to {name} <{email}> (user_id: {user_id})");
-Ok(())
+// Returning this indicates this is the final step.
+To::done()
},
)
-.queue(queue)
-.build();

-// Enqueue a job task.
-let task_id = job
-.enqueue(WelcomeEmail {
-user_id: 42,
-email: "[email protected]".to_string(),
-name: "Ferris".to_string(),
.name("welcome-email")
.pool(pool)
.build()
.await?;

// Here we enqueue a new job to be processed later.
job.enqueue(WelcomeEmail {
user_id: 42,
email: "[email protected]".to_string(),
name: "Ferris".to_string(),
})
.await?;

+// Start processing enqueued tasks.
+job.start().await??;

Ok(())
}
```
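
Because the queue is PostgreSQL-backed, enqueuing can also participate in an existing database transaction, so the welcome-email task is only created if the registration itself commits. A minimal sketch of that pattern, reusing `job`, `WelcomeEmail`, and a clone of the pool from the example above, and assuming an `enqueue_using`-style method that accepts a `sqlx` executor (the method name and the `users` table are assumptions, not taken from this diff):

```rust
// Sketch only: assumes a clone of the pool was kept around (PgPool is cheaply
// cloneable) and that the job exposes an `enqueue_using`-style method.
let mut tx = pool.begin().await?;

// Registration work happens inside the transaction; the `users` table, its
// columns, and the email address are placeholders for this sketch.
let user_id: i32 = sqlx::query_scalar(
    "insert into users (email, name) values ($1, $2) returning id",
)
.bind("ferris@example.com")
.bind("Ferris")
.fetch_one(&mut *tx)
.await?;

// Enqueue in the same transaction: if it rolls back, no task is enqueued.
job.enqueue_using(
    &mut *tx,
    WelcomeEmail {
        user_id,
        email: "ferris@example.com".to_string(),
        name: "Ferris".to_string(),
    },
)
.await?;

tx.commit().await?;
```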

### Order receipts

Another common use case is defining dependencies between discrete steps of a
job. For instance, we might generate PDF receipts for orders and then email
these to customers. With Underway, each step is handled separately, making
it easy to create a job that first generates the PDF and, once
completed, proceeds to send the email.

This separation provides significant value: if the email sending service
is temporarily unavailable, we can retry the email step without having to
regenerate the PDF, avoiding unnecessary repeated work.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};

#[derive(Deserialize, Serialize)]
struct GenerateReceipt {
// An order we want to generate a receipt for.
order_id: i32,
}

#[derive(Deserialize, Serialize)]
struct EmailReceipt {
// An object store key to our receipt PDF.
receipt_key: String,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;

// Build the job.
let job = Job::builder()
.step(|_cx, GenerateReceipt { order_id }| async move {
// Use the order ID to build a receipt PDF...
let receipt_key = format!("receipts_bucket/{order_id}-receipt.pdf");
// ...store the PDF in an object store.

// We proceed to the next step with the receipt_key as its input.
To::next(EmailReceipt { receipt_key })
})
.step(|_cx, EmailReceipt { receipt_key }| async move {
// Retrieve the PDF from the object store, and send the email.
println!("Emailing receipt for {receipt_key}");
To::done()
})
.name("order-receipt")
.pool(pool)
.build()
.await?;

println!("Enqueued task with ID: {task_id}");
// Enqueue the job for the given order.
job.enqueue(GenerateReceipt { order_id: 42 }).await?;

-// Start the worker to process tasks.
-job.run().await?;
// Start processing enqueued jobs.
job.start().await??;

Ok(())
}
```

-## 🦺 Safety
With this setup, if the email service is down, the `EmailReceipt` step can
be retried on its own, saving time and resources by not repeating the
expensive step of generating the PDF.
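
Because retries apply per step rather than to the whole job, the email step can be given its own retry policy while PDF generation stays untouched. A minimal sketch of that idea, building on the example above and assuming a `RetryPolicy` builder and a per-step `retry_policy` setter (these names are assumptions and may not match the crate's actual API):

```rust
// Assumed import; the actual module path may differ.
use underway::task::RetryPolicy;

let job = Job::builder()
    .step(|_cx, GenerateReceipt { order_id }| async move {
        let receipt_key = format!("receipts_bucket/{order_id}-receipt.pdf");
        To::next(EmailReceipt { receipt_key })
    })
    .step(|_cx, EmailReceipt { receipt_key }| async move {
        println!("Emailing receipt for {receipt_key}");
        To::done()
    })
    // Applies to the step configured immediately above (the email step):
    // a transient email failure re-runs only this step with its saved input.
    .retry_policy(RetryPolicy::builder().max_attempts(10).build())
    .name("order-receipt")
    .pool(pool)
    .build()
    .await?;
```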

-This crate uses `#![forbid(unsafe_code)]` to ensure everything is implemented in 100% safe Rust.
### Daily reports

Jobs may also be run on a schedule. This makes them useful for situations
where we want to do things on a regular cadence, such as creating a daily
business report.

```rust
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use underway::{Job, To};

#[derive(Deserialize, Serialize)]
struct DailyReport;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;

// Build the job.
let job = Job::builder()
.step(|_cx, _| async move {
// Here we would generate and store the report.
To::done()
})
.name("daily-report")
.pool(pool)
.build()
.await?;

// Set a daily schedule with the given input.
let daily = "@daily[America/Los_Angeles]".parse()?;
job.schedule(daily, DailyReport).await?;

// Start processing enqueued jobs.
job.start().await??;

Ok(())
}
```

## 🛟 Getting Help

18 changes: 9 additions & 9 deletions examples/basic/src/main.rs
@@ -2,7 +2,7 @@
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
-use underway::{Job, Queue};
+use underway::{Job, To};

const QUEUE_NAME: &str = "example-basic";

@@ -22,24 +22,24 @@
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Run migrations.
underway::MIGRATOR.run(&pool).await?;

-// Create the task queue.
-let queue = Queue::builder().name(QUEUE_NAME).pool(pool).build().await?;

// Build the job.
let job = Job::builder()
.execute(
|WelcomeEmail {
.step(
|_ctx,
WelcomeEmail {
user_id,
email,
name,
}| async move {
// Simulate sending an email.
println!("Sending welcome email to {name} <{email}> (user_id: {user_id})");
-Ok(())
+To::done()
},
)
-.queue(queue)
-.build();
+.name(QUEUE_NAME)
+.pool(pool)
+.build()
+.await?;

// Enqueue a job task.
let task_id = job
19 changes: 7 additions & 12 deletions examples/graceful_shutdown/src/main.rs
@@ -3,7 +3,7 @@
use std::env;
use sqlx::PgPool;
use tokio::signal;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
-use underway::{Job, Queue};
+use underway::{Job, To};

const QUEUE_NAME: &str = "graceful-shutdown";

@@ -50,27 +50,22 @@
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Run migrations.
underway::MIGRATOR.run(&pool).await?;

-// Create the task queue.
-let queue = Queue::builder()
-.name(QUEUE_NAME)
-.pool(pool.clone())
-.build()
-.await?;

// Build the job.
let job = Job::builder()
-.execute(|_| async move {
+.step(|_ctx, _input| async move {
let sleep_duration = std::time::Duration::from_secs(5);

tracing::info!(?sleep_duration, "Hello from a long-running task");

// Artificial delay to simulate a long-running job.
tokio::time::sleep(sleep_duration).await;

-Ok(())
+To::done()
})
-.queue(queue)
-.build();
+.name(QUEUE_NAME)
+.pool(pool.clone())
+.build()
+.await?;

let every_second = "* * * * * *[America/Los_Angeles]".parse()?;
job.schedule(every_second, ()).await?;