diff --git a/.sqlx/query-3aabbeae862994ed4bf90f24d6c7c5992c022ab53683b6a43d86d4ffd294fc79.json b/.sqlx/query-3aabbeae862994ed4bf90f24d6c7c5992c022ab53683b6a43d86d4ffd294fc79.json new file mode 100644 index 0000000..d7f73a9 --- /dev/null +++ b/.sqlx/query-3aabbeae862994ed4bf90f24d6c7c5992c022ab53683b6a43d86d4ffd294fc79.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n select delay from underway.task\n where id = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "delay", + "type_info": "Interval" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "3aabbeae862994ed4bf90f24d6c7c5992c022ab53683b6a43d86d4ffd294fc79" +} diff --git a/src/job.rs b/src/job.rs index 903493d..3d538a1 100644 --- a/src/job.rs +++ b/src/job.rs @@ -333,6 +333,49 @@ where Ok(id) } + /// Enqueue the job after the given delay using a connection from the + /// queue's pool + /// + /// The given delay is added to the task's configured delay, if one is set. + pub async fn enqueue_after<'a, E>( + &self, + executor: E, + input: JobInput, + delay: Span, + ) -> Result + where + E: PgExecutor<'a>, + { + self.enqueue_after_using(executor, input, delay).await + } + + /// Enqueue the job using the provided executor after the given delay. + /// + /// The given delay is added to the task's configured delay, if one is set. + /// + /// This allows jobs to be enqueued using the same transaction as an + /// application may already be using in a given context. + /// + /// **Note:** If you pass a transactional executor and the transaction is + /// rolled back, the returned task ID will not correspond to any persisted + /// task. + pub async fn enqueue_after_using<'a, E>( + &self, + executor: E, + input: JobInput, + delay: Span, + ) -> Result + where + E: PgExecutor<'a>, + { + let id = self + .queue + .enqueue_after(executor, self, input, delay) + .await?; + + Ok(id) + } + /// Schedule the job using a connection from the queue's pool. pub async fn schedule(&self, zoned_schedule: ZonedSchedule, input: JobInput) -> Result { let mut conn = self.queue.pool.acquire().await?; diff --git a/src/queue.rs b/src/queue.rs index 84051e7..21ab344 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -326,6 +326,44 @@ impl Queue { /// [ULID]: https://github.com/ulid/spec?tab=readme-ov-file#specification #[instrument(skip(self, executor, task, input), fields(task.id = tracing::field::Empty), err)] pub async fn enqueue<'a, E>(&self, executor: E, task: &T, input: T::Input) -> Result + where + E: PgExecutor<'a>, + { + self.enqueue_with_delay(executor, task, input, task.delay()) + .await + } + + /// Same as [`enqueue`](Queue::enqueue), but the task doesn't become + /// available until after the specified delay. + /// + /// **Note:** The provided delay is added to the task's configured delay. + /// This means that if you provide a five-minute delay and the task is + /// already configured with a thirty-second delay the task will not be + /// dequeued for at least five and half minutes. + pub async fn enqueue_after<'a, E>( + &self, + executor: E, + task: &T, + input: T::Input, + delay: Span, + ) -> Result + where + E: PgExecutor<'a>, + { + let calculated_delay = task.delay().checked_add(delay)?; + self.enqueue_with_delay(executor, task, input, calculated_delay) + .await + } + + // Explicitly provide for a delay so that we can also facilitate calculated + // retries, i.e. `enqueue_after`. + async fn enqueue_with_delay<'a, E>( + &self, + executor: E, + task: &T, + input: T::Input, + delay: Span, + ) -> Result where E: PgExecutor<'a>, { @@ -336,7 +374,6 @@ impl Queue { let retry_policy = task.retry_policy(); let timeout = task.timeout(); let ttl = task.ttl(); - let delay = task.delay(); let concurrency_key = task.concurrency_key(); let priority = task.priority(); @@ -927,7 +964,7 @@ mod tests { use std::collections::HashSet; use super::*; - use crate::task::Result as TaskResult; + use crate::{task::Result as TaskResult, worker::pg_interval_to_span}; struct TestTask; @@ -1021,6 +1058,40 @@ mod tests { Ok(()) } + #[sqlx::test] + async fn enqueue_after(pool: PgPool) -> sqlx::Result<(), Error> { + let queue = Queue::builder() + .name("test_enqueue_after") + .pool(pool.clone()) + .build() + .await?; + + let input = serde_json::json!({ "key": "value" }); + let task = TestTask; + + let task_id = queue + .enqueue_after(&pool, &task, input.clone(), 5.minutes()) + .await?; + + // Check the delay + let dequeued_task = sqlx::query!( + r#" + select delay from underway.task + where id = $1 + "#, + task_id + ) + .fetch_one(&pool) + .await?; + + assert_eq!( + pg_interval_to_span(&dequeued_task.delay).compare(5.minutes())?, + std::cmp::Ordering::Equal + ); + + Ok(()) + } + #[sqlx::test] async fn dequeue_task(pool: PgPool) -> sqlx::Result<(), Error> { let queue = Queue::builder() diff --git a/src/worker.rs b/src/worker.rs index 15d74f8..71e1b05 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -342,7 +342,7 @@ impl Worker { } } -fn pg_interval_to_span( +pub(crate) fn pg_interval_to_span( PgInterval { months, days,