Skip to content

Commit

Permalink
more method examples
Browse files Browse the repository at this point in the history
  • Loading branch information
maxcountryman committed Oct 28, 2024
1 parent 09f61e6 commit 26364ef
Showing 1 changed file with 238 additions and 5 deletions.
243 changes: 238 additions & 5 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1724,6 +1724,15 @@ impl<I, S> Default for Builder<I, I, S, Initial> {

impl<I, S> Builder<I, I, S, Initial> {
/// Create a new builder.
///
/// # Example
///
/// ```rust
/// use underway::Job;
///
/// // Instantiate a new builder from the `Job` method.
/// let job_builder = Job::<(), ()>::builder();
/// ```
pub fn new() -> Builder<I, I, S, Initial> {
Builder::<I, I, S, _> {
builder_state: Initial,
Expand All @@ -1734,8 +1743,30 @@ impl<I, S> Builder<I, I, S, Initial> {

/// Provides a state shared amongst all steps.
///
/// The state type must be `Clone`.
///
/// State is useful for providing shared resources to steps. This could
/// include shared connections, clients, or other configuration that may be
/// used throughout step functions.
///
/// **Note:** State is not persisted and therefore should not be relied on
/// when durability is needed.
///
/// # Example
///
/// ```rust
/// use underway::Job;
///
/// #[derive(Clone)]
/// struct State {
/// data: String,
/// }
///
/// // Set state.
/// let job_builder = Job::<(), _>::builder().state(State {
/// data: "foo".to_string(),
/// });
/// ```
pub fn state(self, state: S) -> Builder<I, I, S, StateSet<S>> {
Builder {
builder_state: StateSet { state },
Expand All @@ -1746,8 +1777,23 @@ impl<I, S> Builder<I, I, S, Initial> {

/// Add a step to the job.
///
/// This method ensures that the input type of the new step matches the
/// output type of the previous step.
/// A step function should take the job context as its first argument
/// followed by some type that's `Serialize` and `Deserialize` as its
/// second argument.
///
/// It should also return one of the [`To`] variants. For convenience, `To`
/// provides methods that return the correct types. Most commonly these
/// will be [`To::next`], when going on to another step, or [`To::done`],
/// when there are no more steps.
///
/// # Example
///
/// ```rust
/// use underway::{Job, To};
///
/// // Set a step.
/// let job_builder = Job::<(), ()>::builder().step(|_cx, _| async move { To::done() });
/// ```
pub fn step<F, O, Fut>(mut self, func: F) -> Builder<I, O, S, StepSet<O, ()>>
where
I: DeserializeOwned + Serialize + Send + Sync + 'static,
Expand All @@ -1774,8 +1820,35 @@ impl<I, S> Builder<I, I, S, Initial> {
impl<I, S> Builder<I, I, S, StateSet<S>> {
/// Add a step to the job.
///
/// This method ensures that the input type of the new step matches the
/// output type of the previous step.
/// A step function should take the job context as its first argument
/// followed by some type that's `Serialize` and `Deserialize` as its
/// second argument.
///
/// It should also return one of the [`To`] variants. For convenience, `To`
/// provides methods that return the correct types. Most commonly these
/// will be [`To::next`], when going on to another step, or [`To::done`],
/// when there are no more steps.
///
/// # Example
///
/// ```rust
/// use underway::{Job, To};
///
/// #[derive(Clone)]
/// struct State {
/// data: String,
/// }
///
/// // Set a step with state.
/// let job_builder = Job::<(), _>::builder()
/// .state(State {
/// data: "foo".to_string(),
/// })
/// .step(|cx, _| async move {
/// println!("State data: {}", cx.state.data);
/// To::done()
/// });
/// ```
pub fn step<F, O, Fut>(mut self, func: F) -> Builder<I, O, S, StepSet<O, S>>
where
I: DeserializeOwned + Serialize + Send + Sync + 'static,
Expand Down Expand Up @@ -1804,6 +1877,23 @@ impl<I, Current, S> Builder<I, Current, S, StepSet<Current, S>> {
///
/// This method ensures that the input type of the new step matches the
/// output type of the previous step.
///
/// # Example
///
/// ```rust
/// use serde::{Deserialize, Serialize};
/// use underway::{Job, To};
///
/// #[derive(Deserialize, Serialize)]
/// struct Step2 {
/// n: usize,
/// }
///
/// // Set one step after another.
/// let job_builder = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::next(Step2 { n: 42 }) })
/// .step(|_cx, Step2 { n }| async move { To::done() });
/// ```
pub fn step<F, New, Fut>(mut self, func: F) -> Builder<I, New, S, StepSet<New, S>>
where
Current: DeserializeOwned + Serialize + Send + Sync + 'static,
Expand All @@ -1826,6 +1916,22 @@ impl<I, Current, S> Builder<I, Current, S, StepSet<Current, S>> {
}

/// Sets the retry policy of the previous step.
///
/// This policy applies to the step immediately before the method. That
/// means that a retry policy may be defined for each step and each
/// step's policy may differ from the others.
///
/// # Example
///
/// ```rust
/// use underway::{task::RetryPolicy, Job, To};
///
/// // Set a retry policy for the step.
/// let retry_policy = RetryPolicy::builder().max_attempts(15).build();
/// let job_builder = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .retry_policy(retry_policy);
/// ```
pub fn retry_policy(
mut self,
retry_policy: RetryPolicy,
Expand All @@ -1851,6 +1957,25 @@ where
S: Clone + Send + Sync + 'static,
{
/// Set the name of the job's queue.
///
/// This provides the name of the underlying queue that will be created for
/// this job.
///
/// **Note:** It's important that this name be unique amongst all tasks. If
/// it's not and other tasks define differing input types this will
/// cause runtime errors when mismatching types are deserialized from
/// the database.
///
/// # Example
///
/// ```rust
/// use underway::{Job, To};
///
/// // Set a name.
/// let job_builder = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .name("example");
/// ```
pub fn name(self, name: impl Into<String>) -> Builder<I, (), S, QueueNameSet<S>> {
Builder {
builder_state: QueueNameSet {
Expand All @@ -1868,7 +1993,34 @@ where
I: Send + Sync + 'static,
S: Clone + Send + Sync + 'static,
{
/// Set the name of the job's queue.
/// Set the pool of the job's queue.
///
/// This provides the connection pool to the database that the underlying
/// queue will use for this job.
///
/// # Example
///
/// ```rust,no_run
/// use std::env;
///
/// use sqlx::PgPool;
/// use underway::{Job, To};
/// # use tokio::runtime::Runtime;
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
///
/// let pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
///
/// // Set a pool.
/// let job_builder = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .name("example")
/// .pool(pool);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
/// ```
pub fn pool(self, pool: PgPool) -> Builder<I, (), S, PoolSet<S>> {
let QueueNameSet { queue_name, state } = self.builder_state;
Builder {
Expand All @@ -1889,6 +2041,32 @@ where
S: Clone + Send + Sync + 'static,
{
/// Finalize the builder into a `Job`.
///
/// # Example
///
/// ```rust,no_run
/// use std::env;
///
/// use sqlx::PgPool;
/// use underway::{Job, To};
///
/// # use tokio::runtime::Runtime;
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
/// let pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?;
///
/// // Build the job.
/// let job = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .name("example")
/// .pool(pool)
/// .build()
/// .await?;
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
/// ```
pub async fn build(self) -> Result<Job<I, S>> {
let PoolSet {
state,
Expand All @@ -1913,6 +2091,35 @@ where
S: Clone + Send + Sync + 'static,
{
/// Set the queue.
///
/// This allows providing a `Queue` directly, for situations where the queue
/// has been defined separately from the job.
///
/// # Example
///
/// ```rust,no_run
/// # use sqlx::PgPool;
/// use underway::{Job, Queue, To};
///
/// # use tokio::runtime::Runtime;
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
/// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
/// # /*
/// let pool = { /* A `PgPool`. */ };
/// # */
/// #
/// let queue = Queue::builder().name("example").pool(pool).build().await?;
///
/// // Set a queue.
/// let job = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .queue(queue);
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
/// ```
pub fn queue(self, queue: JobQueue<I, S>) -> Builder<I, (), S, QueueSet<I, S>> {
Builder {
builder_state: QueueSet {
Expand All @@ -1931,6 +2138,32 @@ where
S: Clone + Send + Sync + 'static,
{
/// Finalize the builder into a `Job`.
///
/// # Example
///
/// ```rust,no_run
/// # use sqlx::PgPool;
/// use underway::{Job, To, Queue};
///
/// # use tokio::runtime::Runtime;
/// # fn main() {
/// # let rt = Runtime::new().unwrap();
/// # rt.block_on(async {
/// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?;
/// # /*
/// let pool = { /* A `PgPool`. */ };
/// # */
/// #
/// let queue = Queue::builder().name("example").pool(pool).build().await?;
///
/// // Build the job.
/// let job = Job::<(), ()>::builder()
/// .step(|_cx, _| async move { To::done() })
/// .queue(queue)
/// .build();
/// # Ok::<(), Box<dyn std::error::Error>>(())
/// # });
/// # }
pub fn build(self) -> Job<I, S> {
let QueueSet { state, queue } = self.builder_state;
Job {
Expand Down

0 comments on commit 26364ef

Please sign in to comment.