Skip to content

Commit

Permalink
additional asserts
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcountryman committed Oct 26, 2024
1 parent e2af32b commit 27a8f83
Showing 1 changed file with 156 additions and 38 deletions.
194 changes: 156 additions & 38 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,18 +1414,36 @@ mod tests {
message: String,
}

let queue = Queue::builder()
.name("one_step")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, Input { message }| async move {
println!("Executing job with message: {message}");
To::done()
})
.name("one_step")
.pool(pool)
.build()
.await?;
.queue(queue.clone())
.build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Input {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;

let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 0);
assert_eq!(job_state.step_input, serde_json::to_value(&input)?);

Ok(())
}

Expand All @@ -1441,15 +1459,30 @@ mod tests {
To::done()
}

let job = Job::builder()
.step(step)
let queue = Queue::builder()
.name("one_step_named")
.pool(pool)
.pool(pool.clone())
.build()
.await?;

let job = Job::builder().step(step).queue(queue.clone()).build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Input {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;

let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 0);
assert_eq!(job_state.step_input, serde_json::to_value(&input)?);

Ok(())
}

Expand All @@ -1465,6 +1498,12 @@ mod tests {
message: String,
}

let queue = Queue::builder()
.name("one_step_with_state")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.state(State {
data: "data".to_string(),
Expand All @@ -1476,13 +1515,25 @@ mod tests {
);
To::done()
})
.name("one_step_with_state")
.pool(pool)
.build()
.await?;
.queue(queue.clone())
.build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Input {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;

let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 0);
assert_eq!(job_state.step_input, serde_json::to_value(&input)?);

Ok(())
}

Expand Down Expand Up @@ -1552,16 +1603,34 @@ mod tests {
data: "data".to_string(),
};

let job = Job::builder()
.state(state)
.step(step)
let queue = Queue::builder()
.name("one_step_named")
.pool(pool)
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.state(state)
.step(step)
.queue(queue.clone())
.build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Input {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;

let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 0);
assert_eq!(job_state.step_input, serde_json::to_value(&input)?);

Ok(())
}

Expand Down Expand Up @@ -1589,23 +1658,16 @@ mod tests {
let input = Input {
message: "Hello, world!".to_string(),
};
let task_id = job.enqueue(&input).await?;

let Some(dequeued_task) = queue.dequeue(&pool).await? else {
panic!("Task should exist");
};
job.enqueue(&input).await?;

assert_eq!(task_id, dequeued_task.id);
let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(dequeued_task.input).unwrap();
assert_eq!(
JobState {
step_index: 0,
step_input: serde_json::to_value(input).unwrap(),
job_id: job_state.job_id
},
job_state
);
let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 0);
assert_eq!(job_state.step_input, serde_json::to_value(&input)?);

Ok(())
}
Expand Down Expand Up @@ -1655,6 +1717,12 @@ mod tests {
data: Vec<u8>,
}

let queue = Queue::builder()
.name("multi_step")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.step(|_cx, Step1 { message }| async move {
println!("Executing job with message: {message}");
Expand All @@ -1666,13 +1734,35 @@ mod tests {
println!("Executing job with data: {data:?}");
To::done()
})
.name("multi_step")
.pool(pool)
.build()
.await?;
.queue(queue.clone())
.build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Step1 {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;
let worker = Worker::new(queue.clone(), job.clone());

// Process the first task.
worker.process_next_task().await?;

// Inspect the second task.
let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 1);
assert_eq!(
job_state.step_input,
serde_json::to_value(&Step2 {
data: "Hello, world!".as_bytes().to_vec()
})?
);

Ok(())
}

Expand Down Expand Up @@ -1774,6 +1864,12 @@ mod tests {
data: Vec<u8>,
}

let queue = Queue::builder()
.name("multi_step_with_state")
.pool(pool.clone())
.build()
.await?;

let job = Job::builder()
.state(State {
data: "data".to_string(),
Expand All @@ -1794,13 +1890,35 @@ mod tests {
);
To::done()
})
.name("multi_step_with_state")
.pool(pool)
.build()
.await?;
.queue(queue.clone())
.build();

assert_eq!(job.retry_policy(), RetryPolicy::default());

let input = Step1 {
message: "Hello, world!".to_string(),
};
job.enqueue(&input).await?;
let worker = Worker::new(queue.clone(), job.clone());

// Process the first task.
worker.process_next_task().await?;

// Inspect the second task.
let pending_task = queue
.dequeue(&pool)
.await?
.expect("There should be an enqueued task");

let job_state: JobState = serde_json::from_value(pending_task.input)?;
assert_eq!(job_state.step_index, 1);
assert_eq!(
job_state.step_input,
serde_json::to_value(&Step2 {
data: "Hello, world!".as_bytes().to_vec()
})?
);

Ok(())
}

Expand Down

0 comments on commit 27a8f83

Please sign in to comment.