From cac3b1a6aa7b522a244ca00f27db1730205df4bc Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Sun, 27 Oct 2024 10:21:31 -0700 Subject: [PATCH] expand docs with additional examples (#41) --- src/job.rs | 584 ++++++++++++++++++++++++++++++++++++++++++++++- src/queue.rs | 283 ++++++++++++++++++++++- src/scheduler.rs | 180 ++++++++++++++- src/worker.rs | 316 +++++++++++++++++++++++++ 4 files changed, 1356 insertions(+), 7 deletions(-) diff --git a/src/job.rs b/src/job.rs index 80982aa..3bcec51 100644 --- a/src/job.rs +++ b/src/job.rs @@ -791,12 +791,71 @@ where I: Serialize + Sync + Send + 'static, S: Clone + Send + Sync + 'static, { - /// Create a new job builder. + /// Creates a new job builder. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// use underway::{Job, To}; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// + /// let job = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .name("example") + /// .pool(pool) + /// .build() + /// .await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn builder() -> Builder { Builder::new() } /// Enqueue the job using a connection from the queue's pool. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::enqueue`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Enqueue a new job with the given input. + /// job.enqueue(&()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn enqueue(&self, input: &I) -> Result { let mut conn = self.queue.pool.acquire().await?; self.enqueue_using(&mut *conn, input).await @@ -810,6 +869,46 @@ where /// **Note:** If you pass a transactional executor and the transaction is /// rolled back, the returned task ID will not correspond to any persisted /// task. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::enqueue`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// + /// // Our own transaction. + /// let mut tx = pool.begin().await?; + /// + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Enqueue using the transaction we already have. + /// job.enqueue_using(&mut *tx, &()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn enqueue_using<'a, E>(&self, executor: E, input: &I) -> Result where E: PgExecutor<'a>, @@ -821,10 +920,41 @@ where /// queue's pool /// /// The given delay is added to the task's configured delay, if one is set. - pub async fn enqueue_after<'a, E>(&self, input: &I, delay: Span) -> Result - where - E: PgExecutor<'a>, - { + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::enqueue_after`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// use jiff::ToSpan; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Enqueue after an hour. + /// job.enqueue_after(&(), 1.hour()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` + pub async fn enqueue_after(&self, input: &I, delay: Span) -> Result { let mut conn = self.queue.pool.acquire().await?; self.enqueue_after_using(&mut *conn, input, delay).await } @@ -839,6 +969,48 @@ where /// **Note:** If you pass a transactional executor and the transaction is /// rolled back, the returned task ID will not correspond to any persisted /// task. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::enqueue_after`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// use jiff::ToSpan; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// + /// // Our own transaction. + /// let mut tx = pool.begin().await?; + /// + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Enqueue after two days using the transaction we already have. + /// job.enqueue_after_using(&mut *tx, &(), 2.days()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn enqueue_after_using<'a, E>( &self, executor: E, @@ -858,6 +1030,38 @@ where } /// Schedule the job using a connection from the queue's pool. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::schedule`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// let every_minute = "0 * * * *[America/Los_Angeles]".parse()?; + /// job.schedule(&every_minute, &()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn schedule(&self, zoned_schedule: &ZonedSchedule, input: &I) -> Result { let mut conn = self.queue.pool.acquire().await?; self.schedule_using(&mut *conn, zoned_schedule, input).await @@ -867,6 +1071,47 @@ where /// /// This allows jobs to be scheduled using the same transaction as an /// application may already be using in a given context. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::schedule`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// + /// // Our own transaction. + /// let mut tx = pool.acquire().await?; + /// + /// # let job = Job::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Schedule weekly using the transaction we already have. + /// let weekly = "@weekly[America/Los_Angeles]".parse()?; + /// job.schedule_using(&mut *tx, &weekly, &()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn schedule_using<'a, E>( &self, executor: E, @@ -884,7 +1129,127 @@ where Ok(()) } + /// Removes the job's schedule using a connection from the queue's pool. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::unschedule`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Remove the schedule if one is set. + /// job.unschedule().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + pub async fn unschedule(&self) -> Result { + let mut conn = self.queue.pool.acquire().await?; + self.unschedule_using(&mut *conn).await + } + + /// Removes the job's schedule using the provided executor. + /// + /// This allows jobs to be unscheduled using the same transaction as an + /// application may already be using in a given context. + /// + /// # Errors + /// + /// This has the same error conditions as [`Queue::unschedule`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// + /// let mut tx = pool.acquire().await?; + /// + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// // Remove the schedule using a transaction we provide. + /// job.unschedule_using(&mut *tx).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` + pub async fn unschedule_using<'a, E>(&self, executor: E) -> Result + where + E: PgExecutor<'a>, + { + self.queue.unschedule(executor).await?; + + Ok(()) + } + /// Constructs a worker which then immediately runs task processing. + /// + /// # Errors + /// + /// This has the same error conditions as [`Worker::run`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// job.run_worker().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run_worker(self) -> WorkerResult { let queue = self.queue.clone(); let job = self.clone(); @@ -893,6 +1258,37 @@ where } /// Contructs a worker which then immediately runs schedule processing. + /// + /// # Errors + /// + /// This has the same error conditions as [`Scheduler::run`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// job.run_scheduler().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run_scheduler(self) -> SchedulerResult { let queue = self.queue.clone(); let job = self.clone(); @@ -901,6 +1297,39 @@ where } /// Runs both a worker and scheduler for the job. + /// + /// # Errors + /// + /// This has the same error conditions as [`Worker::run`] and + /// [`Scheduler::run`]. It will also return an error if either of the + /// spawned worker or scheduler cannot be joined. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// job.run().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run(self) -> Result { let queue = self.queue.clone(); let job = self.clone(); @@ -924,6 +1353,42 @@ where } /// Starts both a worker and scheduler for the job and returns a handle. + /// + /// The returned handle may be used to gracefully shutdown the worker and + /// scheduler. + /// + /// # Errors + /// + /// This has the same error conditions as [`Worker::run`] and + /// [`Scheduler::run`]. It will also return an error if either of the + /// spawned worker or scheduler cannot be joined. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// # use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let job = Job::<(), _>::builder() + /// # .step(|_cx, _| async move { To::done() }) + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # /* + /// let job = { /* A `Job`. */ }; + /// # */ + /// # + /// + /// job.start().await??; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn start(self) -> JobHandle { let shutdown_token = CancellationToken::new(); let mut workers = JoinSet::new(); @@ -2088,4 +2553,113 @@ mod tests { Ok(()) } + + #[sqlx::test] + async fn schedule(pool: PgPool) -> sqlx::Result<(), Error> { + #[derive(Serialize, Deserialize)] + struct Input { + message: String, + } + + let queue = Queue::builder() + .name("schedule") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, Input { message }| async move { + println!("Executing job with message: {message}"); + To::done() + }) + .queue(queue.clone()) + .build(); + + assert_eq!(job.retry_policy(), RetryPolicy::default()); + + let daily = "@daily[America/Los_Angeles]" + .parse() + .expect("Schedule should parse"); + let input = Input { + message: "Hello, world!".to_string(), + }; + job.schedule(&daily, &input).await?; + + let (zoned_schedule, schedule_input) = queue + .task_schedule(&pool) + .await? + .expect("Schedule should be set"); + + assert_eq!(zoned_schedule, daily); + assert_eq!(schedule_input.step_index, 0); + assert_eq!(schedule_input.step_input, serde_json::to_value(input)?); + + Ok(()) + } + + #[sqlx::test] + async fn unschedule(pool: PgPool) -> sqlx::Result<(), Error> { + #[derive(Serialize, Deserialize)] + struct Input { + message: String, + } + + let queue = Queue::builder() + .name("unschedule") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, Input { message }| async move { + println!("Executing job with message: {message}"); + To::done() + }) + .queue(queue.clone()) + .build(); + + assert_eq!(job.retry_policy(), RetryPolicy::default()); + + let daily = "@daily[America/Los_Angeles]" + .parse() + .expect("Schedule should parse"); + let input = Input { + message: "Hello, world!".to_string(), + }; + job.schedule(&daily, &input).await?; + job.unschedule().await?; + + assert!(queue.task_schedule(&pool).await?.is_none()); + + Ok(()) + } + + #[sqlx::test] + async fn unschedule_without_schedule(pool: PgPool) -> sqlx::Result<(), Error> { + #[derive(Serialize, Deserialize)] + struct Input { + message: String, + } + + let queue = Queue::builder() + .name("unschedule_without_schedule") + .pool(pool.clone()) + .build() + .await?; + + let job = Job::builder() + .step(|_cx, Input { message }| async move { + println!("Executing job with message: {message}"); + To::done() + }) + .queue(queue.clone()) + .build(); + + assert_eq!(job.retry_policy(), RetryPolicy::default()); + + assert!(job.unschedule().await.is_ok()); + assert!(queue.task_schedule(&pool).await?.is_none()); + + Ok(()) + } } diff --git a/src/queue.rs b/src/queue.rs index 29863ab..520d820 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -294,6 +294,44 @@ impl Clone for Queue { impl Queue { /// Creates a builder for a new queue. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// use underway::Queue; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// let queue: Queue = Queue::builder() + /// .name("example") + /// .dead_letter_queue("example_dlq") + /// .pool(pool) + /// .build() + /// .await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn builder() -> Builder { Builder::default() } @@ -310,7 +348,7 @@ impl Queue { /// type. /// /// An ID, which is a [`ULID`][ULID] converted to `UUIDv4`, is also assigned - /// to the task and returned upon successfully enqueue. + /// to the task and returned upon successful enqueue. /// /// **Note:** If you pass a transactional executor and the transaction is /// rolled back, the returned tasl ID will not correspond to any persisted @@ -326,6 +364,54 @@ impl Queue { /// `std::time::Duration`. /// /// [ULID]: https://github.com/ulid/spec?tab=readme-ov-file#specification + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// # use underway::Queue; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # /* + /// let task = { /* An implementer of `Task`. */ }; + /// # */ + /// # + /// + /// // Enqueue a new task with input. + /// let task_id = queue.enqueue(&pool, &task, &()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn enqueue<'a, E>(&self, executor: E, task: &T, input: &T::Input) -> Result where E: PgExecutor<'a>, @@ -341,6 +427,55 @@ impl Queue { /// This means that if you provide a five-minute delay and the task is /// already configured with a thirty-second delay the task will not be /// dequeued for at least five and half minutes. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// # use underway::Queue; + /// use jiff::ToSpan; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # /* + /// let task = { /* An implementer of `Task`. */ }; + /// # */ + /// # + /// + /// // Enqueue a new task with input after five minutes. + /// let task_id = queue.enqueue_after(&pool, &task, &(), 5.minutes()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn enqueue_after<'a, E>( &self, executor: E, @@ -449,6 +584,59 @@ impl Queue { /// /// - The database operation fails during select. /// - The database operation fails during update. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// # use underway::Queue; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # /* + /// let task = { /* An implementer of `Task`. */ }; + /// # */ + /// # + /// + /// // Enqueue a new task. + /// queue.enqueue(&pool, &task, &()).await?; + /// + /// // Dequeue the enqueued task. + /// let pending_task = queue + /// .dequeue(&pool) + /// .await? + /// .expect("There should be a pending task."); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` #[instrument( skip(self, conn), fields(queue.name = self.name, task.id = tracing::field::Empty), @@ -504,12 +692,61 @@ impl Queue { /// Schedules are useful when a task should be run periodically, according /// to a crontab definition. /// + /// **Note:** After a schedule has been set, [a scheduler + /// instance](crate::Scheduler) must be run in order for schedules to + /// fire. + /// /// # Errors /// /// This function will return an error if: /// /// - The input value cannot be serialized. /// - The database operation fails during insert. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// # use underway::Queue; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # queue.enqueue(&pool, &task, &()).await?; + /// + /// // Set a schedule on the queue with the given input. + /// let daily = "@daily[America/Los_Angeles]".parse()?; + /// queue.schedule(&pool, &daily, &()).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` #[instrument( skip(self, executor, zoned_schedule, input), fields(queue.name = self.name), @@ -558,6 +795,50 @@ impl Queue { /// This function will return an error if: /// /// - The database operation fails during insert. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult}; + /// # use underway::Queue; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # queue.enqueue(&pool, &task, &()).await?; + /// + /// // Unset the schedule if one was set. + /// queue.unschedule(&pool).await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` #[instrument( skip_all, fields(queue.name = self.name), diff --git a/src/scheduler.rs b/src/scheduler.rs index 29a914a..aaccb4c 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -52,7 +52,51 @@ pub struct Scheduler { } impl Scheduler { - /// Creates a new scheduler. + /// Creates a new scheduler with the given queue and task. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue}; + /// use underway::Scheduler; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # /* + /// let task = { /* An implementer of `Task`. */ }; + /// # */ + /// # + /// + /// Scheduler::new(queue, task); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn new(queue: Queue, task: T) -> Self { let queue_lock = queue_scheduler_lock(&queue.name); Self { @@ -64,18 +108,152 @@ impl Scheduler { } /// Sets the shutdown token. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler}; + /// use tokio_util::sync::CancellationToken; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let scheduler = Scheduler::new(queue, task); + /// # /* + /// let scheduler = { /* A `Scheduler`. */ }; + /// # */ + /// # + /// + /// // Set a custom cancellation token. + /// let token = CancellationToken::new(); + /// scheduler.shutdown_token(token); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn shutdown_token(mut self, shutdown_token: CancellationToken) -> Self { self.shutdown_token = shutdown_token; self } /// Cancels the shutdown token causing the scheduler to exit. + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler}; + /// use tokio_util::sync::CancellationToken; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let scheduler = Scheduler::new(queue, task); + /// # /* + /// let scheduler = { /* A `Scheduler`. */ }; + /// # */ + /// # + /// + /// // Stop the scheduler. + /// scheduler.shutdown(); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn shutdown(&self) { self.shutdown_token.cancel(); } /// Loops over the configured schedule, enqueuing tasks as the duration /// arrives. + /// + /// # Errors + /// + /// This function returns an error if: + /// + /// - It cannot acquire a new connection from the queue's pool. + /// - It fails to listen on either the shutdown channel. + /// - The cron expression or timezone IANA name are malformed. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Scheduler}; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let scheduler = Scheduler::new(queue, task); + /// # /* + /// let scheduler = { /* A `Scheduler`. */ }; + /// # */ + /// # + /// + /// // Run the scheduler in separate task. + /// tokio::spawn(async move { scheduler.run().await }); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run(&self) -> Result { let conn = self.queue.pool.acquire().await?; let Some(_guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else { diff --git a/src/worker.rs b/src/worker.rs index e2e5414..4209a80 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -186,6 +186,50 @@ impl Clone for Worker { impl Worker { /// Creates a new worker with the given queue and task. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue}; + /// use underway::Worker; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # /* + /// let queue = { /* A `Queue`. */ }; + /// # */ + /// # let task = ExampleTask; + /// # /* + /// let task = { /* An implementer of `Task`. */ }; + /// # */ + /// # + /// + /// Worker::new(queue, task); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn new(queue: Queue, task: T) -> Self { Self { queue, @@ -198,12 +242,97 @@ impl Worker { /// Sets the concurrency limit for this worker. /// /// Defaults to CPU count as per [`num_cpus::get`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Set a fixed concurrency limit. + /// worker.concurrency_limit(32); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn concurrency_limit(mut self, concurrency_limit: usize) -> Self { self.concurrency_limit = concurrency_limit; self } /// Sets the shutdown token. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// use tokio_util::sync::CancellationToken; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Set a custom cancellation token. + /// let token = CancellationToken::new(); + /// worker.shutdown_token(token); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn shutdown_token(mut self, shutdown_token: CancellationToken) -> Self { self.shutdown_token = shutdown_token; self @@ -214,6 +343,47 @@ impl Worker { /// /// Note that tasks are given until their configured timeout to complete /// before the worker will exit. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Begin graceful shutdown. + /// worker.shutdown(); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn shutdown(&self) { self.shutdown_token.cancel(); } @@ -222,12 +392,110 @@ impl Worker { /// /// Tasks are processed via a subscription to a Postgres channel and polling /// in a loop. A one-minute sleep occurs between polls. + /// + /// # Errors + /// + /// This function will return an error if: + /// - It fails to listen on either the shutdown channel or the task change + /// channel. + /// - Task timeouts fails to be converted to std::time::Duration. + /// + /// It also has the same error conditions as [`Queue::dequeue`], as this is + /// used internally. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Run the worker in a separate task. + /// tokio::spawn(async move { worker.run().await }); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run(&self) -> Result { self.run_every(1.minute()).await } /// Same as `run` but allows for the configuration of the delay between /// polls. + /// + /// # Errors + /// + /// This has the same error conditions as [`Worker::run`]. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// use jiff::ToSpan; + /// + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// // Increase the polling interval to every ten seconds. + /// tokio::spawn(async move { worker.run_every(10.seconds()).await }); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn run_every(&self, period: Span) -> Result { let mut polling_interval = tokio::time::interval(period.try_into()?); @@ -384,6 +652,54 @@ impl Worker { /// is [explicitly fatal](crate::task::Error::Fatal) or no more retries are /// left, then the task will be re-queued in a /// [`Pending`](crate::task::State::Pending) state. + /// + /// # Errors + /// + /// This function will return an error if a new transaction cannot be + /// acquired from the queue pool. + /// + /// It also has the same error conditions as [`Queue::dequeue`] as this is + /// used internally. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::{PgPool, Transaction, Postgres}; + /// # use underway::{Task, task::Result as TaskResult, Queue, Worker}; + /// # struct ExampleTask; + /// # impl Task for ExampleTask { + /// # type Input = (); + /// # type Output = (); + /// # async fn execute( + /// # &self, + /// # _: Transaction<'_, Postgres>, + /// # _: Self::Input, + /// # ) -> TaskResult { + /// # Ok(()) + /// # } + /// # } + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # let queue = Queue::builder() + /// # .name("example") + /// # .pool(pool.clone()) + /// # .build() + /// # .await?; + /// # let task = ExampleTask; + /// # let worker = Worker::new(queue, task); + /// # /* + /// let worker = { /* A `Worker`. */ }; + /// # */ + /// # + /// + /// worker.process_next_task().await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` #[instrument( skip(self), fields(