Skip to content

Commit

Permalink
provide unschedule method
Browse files Browse the repository at this point in the history
Closes #16
  • Loading branch information
maxcountryman committed Oct 27, 2024
1 parent 2614a46 commit cfd8d06
Show file tree
Hide file tree
Showing 5 changed files with 292 additions and 0 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

188 changes: 188 additions & 0 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,35 @@ impl<T: Task> Queue<T> {
Ok(())
}

/// Removes the configured schedule for the queue, if one exsists..
///
/// # Errors
///
/// This function will return an error if:
///
/// - The database operation fails during insert.
#[instrument(
skip_all,
fields(queue.name = self.name),
err
)]
pub async fn unschedule<'a, E>(&self, executor: E) -> Result
where
E: PgExecutor<'a>,
{
sqlx::query!(
r#"
delete from underway.task_schedule
where name = $1
"#,
self.name,
)
.execute(executor)
.await?;

Ok(())
}

#[instrument(skip(self, executor), err)]
pub(crate) async fn task_schedule<'a, E>(
&self,
Expand Down Expand Up @@ -1560,4 +1589,163 @@ mod tests {

Ok(())
}

#[sqlx::test]
async fn schedule(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("schedule")
.pool(pool.clone())
.build()
.await?;

// Set the schedule
let input = serde_json::json!(());
let daily = "@daily[America/Los_Angeles]"
.parse()
.expect("Zoned schedule should parse");
queue.schedule(&pool, &daily, &input).await?;

// Check the schedule was actually set
let schedule_row = sqlx::query!(
r#"
select schedule, timezone, input from underway.task_schedule where name = $1
"#,
"schedule"
)
.fetch_optional(&pool)
.await?;

let schedule = schedule_row.expect("Schedule should be set");
assert_eq!(schedule.schedule, "@daily");
assert_eq!(schedule.timezone, "America/Los_Angeles");
assert_eq!(schedule.input, input);

Ok(())
}

#[sqlx::test]
async fn schedule_twice(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("schedule_twice")
.pool(pool.clone())
.build()
.await?;

// Set the first schedule
let input = serde_json::json!(());
let daily = "@daily[America/Los_Angeles]"
.parse()
.expect("Zoned schedule should parse");
queue.schedule(&pool, &daily, &input).await?;

// Set the second schedule, overwriting the first
let input = serde_json::json!(());
let monthly = "@monthly[America/Los_Angeles]"
.parse()
.expect("Zoned schedule should parse");
queue.schedule(&pool, &monthly, &input).await?;

// Check the schedule was actually set
let schedule_row = sqlx::query!(
r#"
select schedule, timezone, input
from underway.task_schedule
where name = $1
"#,
queue.name
)
.fetch_optional(&pool)
.await?;

let schedule = schedule_row.expect("Schedule should be set");
assert_eq!(schedule.schedule, "@monthly");
assert_eq!(schedule.timezone, "America/Los_Angeles");
assert_eq!(schedule.input, input);

Ok(())
}

#[sqlx::test]
async fn task_schedule(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("task_schedule")
.pool(pool.clone())
.build()
.await?;

// Set the schedule
let input = serde_json::json!(());
let daily = "@daily[America/Los_Angeles]"
.parse()
.expect("Zoned schedule should parse");
queue.schedule(&pool, &daily, &input).await?;

// Check the task schedule for the queue
let (zoned_schedule, schedule_input) = queue
.task_schedule(&pool)
.await?
.expect("Schedule should be set");

assert_eq!(zoned_schedule, daily);
assert_eq!(schedule_input, input);

Ok(())
}

#[sqlx::test]
async fn task_schedule_without_schedule(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("task_schedule")
.pool(pool.clone())
.build()
.await?;

assert!(queue.task_schedule(&pool).await?.is_none());

Ok(())
}

#[sqlx::test]
async fn unschedule(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("unschedule")
.pool(pool.clone())
.build()
.await?;

// Set the schedule
let input = serde_json::json!(());
let daily = "@daily[America/Los_Angeles]"
.parse()
.expect("Zoned schedule should parse");
queue.schedule(&pool, &daily, &input).await?;
queue.unschedule(&pool).await?;

// Check the schedule was actually set
let schedule_row = sqlx::query!(
r#"
select schedule from underway.task_schedule where name = $1
"#,
"schedule"
)
.fetch_optional(&pool)
.await?;

assert!(schedule_row.is_none());

Ok(())
}

#[sqlx::test]
async fn unschedule_without_schedule(pool: PgPool) -> sqlx::Result<(), Error> {
let queue = Queue::<TestTask>::builder()
.name("unschedule_without_schedule")
.pool(pool.clone())
.build()
.await?;

assert!(queue.unschedule(&pool).await.is_ok());

Ok(())
}
}

0 comments on commit cfd8d06

Please sign in to comment.