From 26364ef44dcdac4d1f925e3569c16ec46ba4e30d Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Mon, 28 Oct 2024 09:14:03 -0700 Subject: [PATCH] more method examples --- src/job.rs | 243 +++++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 238 insertions(+), 5 deletions(-) diff --git a/src/job.rs b/src/job.rs index 4d359ba..c644cfd 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1724,6 +1724,15 @@ impl Default for Builder { impl Builder { /// Create a new builder. + /// + /// # Example + /// + /// ```rust + /// use underway::Job; + /// + /// // Instantiate a new builder from the `Job` method. + /// let job_builder = Job::<(), ()>::builder(); + /// ``` pub fn new() -> Builder { Builder:: { builder_state: Initial, @@ -1734,8 +1743,30 @@ impl Builder { /// Provides a state shared amongst all steps. /// + /// The state type must be `Clone`. + /// + /// State is useful for providing shared resources to steps. This could + /// include shared connections, clients, or other configuration that may be + /// used throughout step functions. + /// /// **Note:** State is not persisted and therefore should not be relied on /// when durability is needed. + /// + /// # Example + /// + /// ```rust + /// use underway::Job; + /// + /// #[derive(Clone)] + /// struct State { + /// data: String, + /// } + /// + /// // Set state. + /// let job_builder = Job::<(), _>::builder().state(State { + /// data: "foo".to_string(), + /// }); + /// ``` pub fn state(self, state: S) -> Builder> { Builder { builder_state: StateSet { state }, @@ -1746,8 +1777,23 @@ impl Builder { /// Add a step to the job. /// - /// This method ensures that the input type of the new step matches the - /// output type of the previous step. + /// A step function should take the job context as its first argument + /// followed by some type that's `Serialize` and `Deserialize` as its + /// second argument. + /// + /// It should also return one of the [`To`] variants. For convenience, `To` + /// provides methods that return the correct types. Most commonly these + /// will be [`To::next`], when going on to another step, or [`To::done`], + /// when there are no more steps. + /// + /// # Example + /// + /// ```rust + /// use underway::{Job, To}; + /// + /// // Set a step. + /// let job_builder = Job::<(), ()>::builder().step(|_cx, _| async move { To::done() }); + /// ``` pub fn step(mut self, func: F) -> Builder> where I: DeserializeOwned + Serialize + Send + Sync + 'static, @@ -1774,8 +1820,35 @@ impl Builder { impl Builder> { /// Add a step to the job. /// - /// This method ensures that the input type of the new step matches the - /// output type of the previous step. + /// A step function should take the job context as its first argument + /// followed by some type that's `Serialize` and `Deserialize` as its + /// second argument. + /// + /// It should also return one of the [`To`] variants. For convenience, `To` + /// provides methods that return the correct types. Most commonly these + /// will be [`To::next`], when going on to another step, or [`To::done`], + /// when there are no more steps. + /// + /// # Example + /// + /// ```rust + /// use underway::{Job, To}; + /// + /// #[derive(Clone)] + /// struct State { + /// data: String, + /// } + /// + /// // Set a step with state. + /// let job_builder = Job::<(), _>::builder() + /// .state(State { + /// data: "foo".to_string(), + /// }) + /// .step(|cx, _| async move { + /// println!("State data: {}", cx.state.data); + /// To::done() + /// }); + /// ``` pub fn step(mut self, func: F) -> Builder> where I: DeserializeOwned + Serialize + Send + Sync + 'static, @@ -1804,6 +1877,23 @@ impl Builder> { /// /// This method ensures that the input type of the new step matches the /// output type of the previous step. + /// + /// # Example + /// + /// ```rust + /// use serde::{Deserialize, Serialize}; + /// use underway::{Job, To}; + /// + /// #[derive(Deserialize, Serialize)] + /// struct Step2 { + /// n: usize, + /// } + /// + /// // Set one step after another. + /// let job_builder = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::next(Step2 { n: 42 }) }) + /// .step(|_cx, Step2 { n }| async move { To::done() }); + /// ``` pub fn step(mut self, func: F) -> Builder> where Current: DeserializeOwned + Serialize + Send + Sync + 'static, @@ -1826,6 +1916,22 @@ impl Builder> { } /// Sets the retry policy of the previous step. + /// + /// This policy applies to the step immediately before the method. That + /// means that a retry policy may be defined for each step and each + /// step's policy may differ from the others. + /// + /// # Example + /// + /// ```rust + /// use underway::{task::RetryPolicy, Job, To}; + /// + /// // Set a retry policy for the step. + /// let retry_policy = RetryPolicy::builder().max_attempts(15).build(); + /// let job_builder = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .retry_policy(retry_policy); + /// ``` pub fn retry_policy( mut self, retry_policy: RetryPolicy, @@ -1851,6 +1957,25 @@ where S: Clone + Send + Sync + 'static, { /// Set the name of the job's queue. + /// + /// This provides the name of the underlying queue that will be created for + /// this job. + /// + /// **Note:** It's important that this name be unique amongst all tasks. If + /// it's not and other tasks define differing input types this will + /// cause runtime errors when mismatching types are deserialized from + /// the database. + /// + /// # Example + /// + /// ```rust + /// use underway::{Job, To}; + /// + /// // Set a name. + /// let job_builder = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .name("example"); + /// ``` pub fn name(self, name: impl Into) -> Builder> { Builder { builder_state: QueueNameSet { @@ -1868,7 +1993,34 @@ where I: Send + Sync + 'static, S: Clone + Send + Sync + 'static, { - /// Set the name of the job's queue. + /// Set the pool of the job's queue. + /// + /// This provides the connection pool to the database that the underlying + /// queue will use for this job. + /// + /// # Example + /// + /// ```rust,no_run + /// use std::env; + /// + /// use sqlx::PgPool; + /// use underway::{Job, To}; + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// + /// let pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?; + /// + /// // Set a pool. + /// let job_builder = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .name("example") + /// .pool(pool); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn pool(self, pool: PgPool) -> Builder> { let QueueNameSet { queue_name, state } = self.builder_state; Builder { @@ -1889,6 +2041,32 @@ where S: Clone + Send + Sync + 'static, { /// Finalize the builder into a `Job`. + /// + /// # Example + /// + /// ```rust,no_run + /// use std::env; + /// + /// use sqlx::PgPool; + /// use underway::{Job, To}; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// let pool = PgPool::connect(&env::var("DATABASE_URL").unwrap()).await?; + /// + /// // Build the job. + /// let job = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .name("example") + /// .pool(pool) + /// .build() + /// .await?; + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub async fn build(self) -> Result> { let PoolSet { state, @@ -1913,6 +2091,35 @@ where S: Clone + Send + Sync + 'static, { /// Set the queue. + /// + /// This allows providing a `Queue` directly, for situations where the queue + /// has been defined separately from the job. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// use underway::{Job, Queue, To}; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// let queue = Queue::builder().name("example").pool(pool).build().await?; + /// + /// // Set a queue. + /// let job = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .queue(queue); + /// # Ok::<(), Box>(()) + /// # }); + /// # } + /// ``` pub fn queue(self, queue: JobQueue) -> Builder> { Builder { builder_state: QueueSet { @@ -1931,6 +2138,32 @@ where S: Clone + Send + Sync + 'static, { /// Finalize the builder into a `Job`. + /// + /// # Example + /// + /// ```rust,no_run + /// # use sqlx::PgPool; + /// use underway::{Job, To, Queue}; + /// + /// # use tokio::runtime::Runtime; + /// # fn main() { + /// # let rt = Runtime::new().unwrap(); + /// # rt.block_on(async { + /// # let pool = PgPool::connect(&std::env::var("DATABASE_URL")?).await?; + /// # /* + /// let pool = { /* A `PgPool`. */ }; + /// # */ + /// # + /// let queue = Queue::builder().name("example").pool(pool).build().await?; + /// + /// // Build the job. + /// let job = Job::<(), ()>::builder() + /// .step(|_cx, _| async move { To::done() }) + /// .queue(queue) + /// .build(); + /// # Ok::<(), Box>(()) + /// # }); + /// # } pub fn build(self) -> Job { let QueueSet { state, queue } = self.builder_state; Job {