Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

provide multitask example #14

Merged
merged 1 commit into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions examples/multitask/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "example-multitask"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1"
sqlx = { version = "0.8.2", features = ["postgres", "runtime-tokio-rustls"] }
tokio = { version = "1.34.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
underway = { path = "../../" }
200 changes: 200 additions & 0 deletions examples/multitask/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
use std::env;

use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use underway::{
queue::Error as QueueError,
task::{Id as TaskId, Result as TaskResult},
Queue, Task, Worker,
};

const QUEUE_NAME: &str = "example-multitask";

#[derive(Debug, Clone, Deserialize, Serialize)]
struct WelcomeEmail {
user_id: i32,
email: String,
name: String,
}

struct WelcomeEmailTask;

impl WelcomeEmailTask {
async fn enqueue(
&self,
pool: &PgPool,
queue: &Queue<Multitask>,
input: WelcomeEmail,
) -> Result<TaskId, QueueError> {
// This ensures our task-specific configuration is applied.
let welcome_email_task = self.into();
queue
.enqueue(pool, &welcome_email_task, TaskInput::WelcomeEmail(input))
.await
}
}

impl Task for WelcomeEmailTask {
type Input = WelcomeEmail;

async fn execute(&self, input: Self::Input) -> TaskResult {
tracing::info!(?input, "Simulate sending a welcome email");
Ok(())
}
}

#[derive(Debug, Clone, Deserialize, Serialize)]
struct Order {
user_id: i32,
sku: String,
}

struct OrderTask;

impl OrderTask {
async fn enqueue(
&self,
pool: &PgPool,
queue: &Queue<Multitask>,
input: Order,
) -> Result<TaskId, QueueError> {
// This ensures our task-specific configuration is applied.
let order_task = self.into();
queue
.enqueue(pool, &order_task, TaskInput::Order(input))
.await
}
}

impl Task for OrderTask {
type Input = Order;

async fn execute(&self, input: Self::Input) -> TaskResult {
tracing::info!(?input, "Simulate order processing");
Ok(())
}

fn priority(&self) -> i32 {
10 // We'll make Order tasks higher priority.
}
}

#[derive(Clone, Deserialize, Serialize)]
enum TaskInput {
WelcomeEmail(WelcomeEmail),
Order(Order),
}

struct Multitask {
welcome_email: WelcomeEmailTask,
order: OrderTask,
priority: i32,
}

impl Multitask {
fn new() -> Self {
Self {
welcome_email: WelcomeEmailTask,
order: OrderTask,
priority: 0, // This is set when we convert from one of our tasks.
}
}
}

impl From<&WelcomeEmailTask> for Multitask {
fn from(welcome_email_task: &WelcomeEmailTask) -> Self {
Self {
welcome_email: WelcomeEmailTask,
order: OrderTask,
priority: welcome_email_task.priority(), // Proxy task-specific configuration.
}
}
}

impl From<&OrderTask> for Multitask {
fn from(order_task: &OrderTask) -> Self {
Self {
welcome_email: WelcomeEmailTask,
order: OrderTask,
priority: order_task.priority(), // Proxy task-specific configuration.
}
}
}

impl Task for Multitask {
type Input = TaskInput;

async fn execute(&self, input: Self::Input) -> TaskResult {
match input {
TaskInput::WelcomeEmail(input) => self.welcome_email.execute(input).await,
TaskInput::Order(input) => self.order.execute(input).await,
}
}

fn priority(&self) -> i32 {
self.priority
}
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize the tracing subscriber.
tracing_subscriber::registry()
.with(EnvFilter::new(
env::var("RUST_LOG").unwrap_or_else(|_| "debug,underway=info,sqlx=warn".into()),
))
.with(tracing_subscriber::fmt::layer())
.try_init()?;

// Set up the database connection pool.
let database_url = &env::var("DATABASE_URL").expect("DATABASE_URL should be set");
let pool = PgPool::connect(database_url).await?;

// Run migrations.
underway::MIGRATOR.run(&pool).await?;

// Create the task queue.
let queue = Queue::builder()
.name(QUEUE_NAME)
.pool(pool.clone())
.build()
.await?;

// Enqueue a welcome email task.
let welcome_email_task = WelcomeEmailTask;
let task_id = welcome_email_task
.enqueue(
&pool,
&queue,
WelcomeEmail {
user_id: 42,
email: "[email protected]".to_string(),
name: "Ferris".to_string(),
},
)
.await?;

tracing::info!(task.id = %task_id.as_hyphenated(), "Enqueued welcome email task");

// Enqueue an order task.
let order_task = OrderTask;
let task_id = order_task
.enqueue(
&pool,
&queue,
Order {
user_id: 42,
sku: "SKU0-0042".to_string(),
},
)
.await?;

tracing::info!(task.id = %task_id.as_hyphenated(), "Enqueued order task");

// Run a worker that processes all tasks.
let multitask = Multitask::new();
Worker::new(queue, multitask).run().await?;

Ok(())
}