Skip to content

Commit

Permalink
provide enqueue_after method
Browse files Browse the repository at this point in the history
This allows tasks to be enqueued in a way that only makes them available
after the specified delay. While tasks may already specify a delay, that
configuration is static whereas this method allows for dynamic delays in
accordance with runtime requirements.
  • Loading branch information
maxcountryman committed Oct 11, 2024
1 parent 79891ac commit 8f9e049
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 3 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,49 @@ where
Ok(id)
}

/// Enqueue the job after the given delay using a connection from the
/// queue's pool
///
/// The given delay is added to the task's configured delay, if one is set.
pub async fn enqueue_after<'a, E>(
&self,
executor: E,
input: JobInput<I>,
delay: Span,
) -> Result<TaskId>
where
E: PgExecutor<'a>,
{
self.enqueue_after_using(executor, input, delay).await
}

/// Enqueue the job using the provided executor after the given delay.
///
/// The given delay is added to the task's configured delay, if one is set.
///
/// This allows jobs to be enqueued using the same transaction as an
/// application may already be using in a given context.
///
/// **Note:** If you pass a transactional executor and the transaction is
/// rolled back, the returned task ID will not correspond to any persisted
/// task.
pub async fn enqueue_after_using<'a, E>(
&self,
executor: E,
input: JobInput<I>,
delay: Span,
) -> Result<TaskId>
where
E: PgExecutor<'a>,
{
let id = self
.queue
.enqueue_after(executor, self, input, delay)
.await?;

Ok(id)
}

/// Schedule the job using a connection from the queue's pool.
pub async fn schedule(&self, zoned_schedule: ZonedSchedule, input: JobInput<I>) -> Result {
let mut conn = self.queue.pool.acquire().await?;
Expand Down
75 changes: 73 additions & 2 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,44 @@ impl<T: Task> Queue<T> {
/// [ULID]: https://github.com/ulid/spec?tab=readme-ov-file#specification
#[instrument(skip(self, executor, task, input), fields(task.id = tracing::field::Empty), err)]
pub async fn enqueue<'a, E>(&self, executor: E, task: &T, input: T::Input) -> Result<TaskId>
where
E: PgExecutor<'a>,
{
self.enqueue_with_delay(executor, task, input, task.delay())
.await
}

/// Same as [`enqueue`](Queue::enqueue), but the task doesn't become
/// available until after the specified delay.
///
/// **Note:** The provided delay is added to the task's configured delay.
/// This means that if you provide a five-minute delay and the task is
/// already configured with a thirty-second delay the task will not be
/// dequeued for at least five and half minutes.
pub async fn enqueue_after<'a, E>(
&self,
executor: E,
task: &T,
input: T::Input,
delay: Span,
) -> Result<TaskId>
where
E: PgExecutor<'a>,
{
let calculated_delay = task.delay().checked_add(delay)?;
self.enqueue_with_delay(executor, task, input, calculated_delay)
.await
}

// Explicitly provide for a delay so that we can also facilitate calculated
// retries, i.e. `enqueue_after`.
async fn enqueue_with_delay<'a, E>(
&self,
executor: E,
task: &T,
input: T::Input,
delay: Span,
) -> Result<TaskId>
where
E: PgExecutor<'a>,
{
Expand All @@ -336,7 +374,6 @@ impl<T: Task> Queue<T> {
let retry_policy = task.retry_policy();
let timeout = task.timeout();
let ttl = task.ttl();
let delay = task.delay();
let concurrency_key = task.concurrency_key();
let priority = task.priority();

Expand Down Expand Up @@ -927,7 +964,7 @@ mod tests {
use std::collections::HashSet;

use super::*;
use crate::task::Result as TaskResult;
use crate::{task::Result as TaskResult, worker::pg_interval_to_span};

struct TestTask;

Expand Down Expand Up @@ -1021,6 +1058,40 @@ mod tests {
Ok(())
}

#[sqlx::test]
async fn enqueue_after(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
.name("test_enqueue_after")
.pool(pool.clone())
.build()
.await?;

let input = serde_json::json!({ "key": "value" });
let task = TestTask;

let task_id = queue
.enqueue_after(&pool, &task, input.clone(), 5.minutes())
.await?;

// Check the delay
let dequeued_task = sqlx::query!(
r#"
select delay from underway.task
where id = $1
"#,
task_id
)
.fetch_one(&pool)
.await?;

assert_eq!(
pg_interval_to_span(&dequeued_task.delay).compare(5.minutes())?,
std::cmp::Ordering::Equal
);

Ok(())
}

#[sqlx::test]
async fn dequeue_task(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::builder()
Expand Down
2 changes: 1 addition & 1 deletion src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ impl<T: Task> Worker<T> {
}
}

fn pg_interval_to_span(
pub(crate) fn pg_interval_to_span(
PgInterval {
months,
days,
Expand Down

0 comments on commit 8f9e049

Please sign in to comment.