diff --git a/src/job.rs b/src/job.rs index 40400cc..b7c797e 100644 --- a/src/job.rs +++ b/src/job.rs @@ -449,8 +449,8 @@ where S: Clone + Send + Sync + 'static, { /// Create a new job builder. - pub fn builder() -> JobBuilder { - JobBuilder::default() + pub fn builder() -> Builder { + Builder::default() } /// Enqueue the job using a connection from the queue's pool. @@ -634,7 +634,7 @@ mod builder_states { /// Builder for [`Job`]. #[derive(Debug)] -pub struct JobBuilder +pub struct Builder where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, @@ -649,7 +649,7 @@ where _marker: PhantomData<(I, S)>, } -impl JobBuilder> +impl Builder> where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, @@ -789,14 +789,14 @@ where } } -impl JobBuilder +impl Builder where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, { /// Create a job builder. - pub fn new() -> JobBuilder { - JobBuilder:: { + pub fn new() -> Builder { + Builder:: { builder_state: Initial, retry_policy: RetryPolicy::default(), timeout: Span::new().minutes(15), @@ -837,8 +837,8 @@ where /// /// Job::<(), _>::builder().state(state); /// ``` - pub fn state(self, state: S) -> JobBuilder> { - JobBuilder { + pub fn state(self, state: S) -> Builder> { + Builder { builder_state: StateSet { state }, retry_policy: self.retry_policy, timeout: self.timeout, @@ -870,12 +870,12 @@ where /// Ok(()) /// }); /// ``` - pub fn execute(self, f: F) -> JobBuilder> + pub fn execute(self, f: F) -> Builder> where F: Fn(I) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { - JobBuilder { + Builder { builder_state: ExecutorSet { execute_fn: ExecuteFn::Simple(Arc::new(move |input: I| { let fut = f(input); @@ -894,7 +894,7 @@ where } } -impl Default for JobBuilder +impl Default for Builder where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, @@ -904,7 +904,7 @@ where } } -impl JobBuilder> +impl Builder> where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, @@ -943,12 +943,12 @@ where /// Ok(()) /// }); /// ``` - pub fn execute(self, f: F) -> JobBuilder> + pub fn execute(self, f: F) -> Builder> where F: Fn(I, S) -> Fut + Send + Sync + 'static, Fut: Future + Send + 'static, { - JobBuilder { + Builder { builder_state: ExecutorSet { execute_fn: ExecuteFn::Stateful(Arc::new(move |input: I, state: S| { let fut = f(input, state); @@ -967,7 +967,7 @@ where } } -impl JobBuilder> +impl Builder> where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, @@ -1003,8 +1003,8 @@ where /// # }); /// # } /// ``` - pub fn queue(self, queue: Queue>) -> JobBuilder> { - JobBuilder { + pub fn queue(self, queue: Queue>) -> Builder> { + Builder { builder_state: QueueSet { state: self.builder_state.state, execute_fn: self.builder_state.execute_fn, @@ -1021,7 +1021,7 @@ where } } -impl JobBuilder> +impl Builder> where I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, diff --git a/src/queue.rs b/src/queue.rs index cbfabd6..3998c00 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -290,8 +290,8 @@ impl Clone for Queue { impl Queue { /// Creates a builder for a new queue. - pub fn builder() -> QueueBuilder { - QueueBuilder::default() + pub fn builder() -> Builder { + Builder::default() } /// Enqueues a new task into the task queue, returning the task's unique ID. @@ -867,18 +867,18 @@ mod builder_states { /// A builder for [`Queue`]. #[derive(Debug)] -pub struct QueueBuilder { +pub struct Builder { state: S, _marker: PhantomData, } -impl Default for QueueBuilder { +impl Default for Builder { fn default() -> Self { - QueueBuilder::new() + Builder::new() } } -impl QueueBuilder { +impl Builder { /// Create a new queue builder. pub fn new() -> Self { Self { @@ -888,8 +888,8 @@ impl QueueBuilder { } /// Set the queue name. - pub fn name(self, name: impl Into) -> QueueBuilder { - QueueBuilder { + pub fn name(self, name: impl Into) -> Builder { + Builder { state: NameSet { name: name.into(), dlq_name: None, @@ -899,7 +899,7 @@ impl QueueBuilder { } } -impl QueueBuilder { +impl Builder { /// Set the dead-letter queue name. pub fn dead_letter_queue(mut self, dlq_name: impl Into) -> Self { self.state.dlq_name = Some(dlq_name.into()); @@ -907,8 +907,8 @@ impl QueueBuilder { } /// Set the database connection pool. - pub fn pool(self, pool: PgPool) -> QueueBuilder { - QueueBuilder { + pub fn pool(self, pool: PgPool) -> Builder { + Builder { state: PoolSet { name: self.state.name, dlq_name: self.state.dlq_name, @@ -919,7 +919,7 @@ impl QueueBuilder { } } -impl QueueBuilder { +impl Builder { /// Builds the queue. pub async fn build(self) -> Result> { let state = self.state; diff --git a/src/task.rs b/src/task.rs index 7d33538..ccc3f23 100644 --- a/src/task.rs +++ b/src/task.rs @@ -73,6 +73,11 @@ use serde::{de::DeserializeOwned, Serialize}; use sqlx::postgres::types::PgInterval; use uuid::Uuid; +pub(crate) use self::retry_policy::RetryCount; +pub use self::retry_policy::RetryPolicy; + +mod retry_policy; + /// A type alias for task identifiers. /// /// Task IDs are [ULID][ULID]s which are converted to UUIDv4 for storage. @@ -452,140 +457,6 @@ pub struct DequeuedTask { pub concurrency_key: Option, } -/// Configuration of a policy for retries in case of task failure. -/// -/// # Example -/// -/// ```rust -/// use underway::task::RetryPolicy; -/// -/// let retry_policy = RetryPolicy::builder() -/// .max_attempts(10) -/// .backoff_coefficient(4.0) -/// .build(); -/// ``` -#[derive(Debug, Clone, Copy, PartialEq)] -pub struct RetryPolicy { - pub(crate) max_attempts: i32, - pub(crate) initial_interval_ms: i32, - pub(crate) max_interval_ms: i32, - pub(crate) backoff_coefficient: f32, -} - -pub(crate) type RetryCount = i32; - -impl RetryPolicy { - /// Create a new builder. - pub fn builder() -> RetryPolicyBuilder { - RetryPolicyBuilder::default() - } - - /// Returns the delay relative to the given retry count. - pub fn calculate_delay(&self, retry_count: RetryCount) -> Span { - let base_delay = self.initial_interval_ms as f32; - let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1); - let delay = backoff_delay.min(self.max_interval_ms as f32) as i64; - delay.milliseconds() - } -} - -impl From for RetryPolicy { - fn from( - DequeuedTask { - max_attempts, - initial_interval_ms, - max_interval_ms, - backoff_coefficient, - .. - }: DequeuedTask, - ) -> Self { - Self { - max_attempts, - initial_interval_ms, - max_interval_ms, - backoff_coefficient, - } - } -} - -const DEFAULT_RETRY_POLICY: RetryPolicy = RetryPolicy { - max_attempts: 5, - initial_interval_ms: 1_000, - max_interval_ms: 60_000, - backoff_coefficient: 2.0, -}; - -impl Default for RetryPolicy { - fn default() -> Self { - DEFAULT_RETRY_POLICY - } -} - -/// A builder for constructing `RetryPolicy`. -/// -/// # Example -/// -/// ``` -/// use underway::task::RetryPolicyBuilder; -/// -/// let retry_policy = RetryPolicyBuilder::new() -/// .max_attempts(3) -/// .initial_interval_ms(500) -/// .max_interval_ms(5_000) -/// .backoff_coefficient(1.5) -/// .build(); -/// ``` -#[derive(Debug, Default)] -pub struct RetryPolicyBuilder { - inner: RetryPolicy, -} - -impl RetryPolicyBuilder { - /// Creates a new `RetryPolicyBuilder` with the default retry settings. - pub const fn new() -> Self { - Self { - inner: DEFAULT_RETRY_POLICY, - } - } - - /// Sets the maximum number of retry attempts. - /// - /// Default value is `5`. - pub const fn max_attempts(mut self, max_attempts: i32) -> Self { - self.inner.max_attempts = max_attempts; - self - } - - /// Sets the initial interval before the first retry (in milliseconds). - /// - /// Default value is `1_000`. - pub const fn initial_interval_ms(mut self, initial_interval_ms: i32) -> Self { - self.inner.initial_interval_ms = initial_interval_ms; - self - } - - /// Sets the maximum interval between retries (in milliseconds). - /// - /// Default value is `60_000`. - pub const fn max_interval_ms(mut self, max_interval_ms: i32) -> Self { - self.inner.max_interval_ms = max_interval_ms; - self - } - - /// Sets the backoff coefficient to apply after each retry. - /// - /// Default value is `2.0`. - pub const fn backoff_coefficient(mut self, backoff_coefficient: f32) -> Self { - self.inner.backoff_coefficient = backoff_coefficient; - self - } - - /// Builds the `RetryPolicy` with the configured parameters. - pub const fn build(self) -> RetryPolicy { - self.inner - } -} - /// Represents the possible states a task can be in. #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)] #[sqlx(type_name = "underway.task_state", rename_all = "snake_case")] @@ -664,7 +535,7 @@ mod tests { #[test] fn retry_policy_custom() { - let retry_policy = RetryPolicyBuilder::new() + let retry_policy = RetryPolicy::builder() .max_attempts(3) .initial_interval_ms(500) .max_interval_ms(5_000) diff --git a/src/task/retry_policy.rs b/src/task/retry_policy.rs new file mode 100644 index 0000000..6c1d2c0 --- /dev/null +++ b/src/task/retry_policy.rs @@ -0,0 +1,122 @@ +use jiff::{Span, ToSpan}; + +use crate::task::DequeuedTask; + +/// Configuration of a policy for retries in case of task failure. +/// +/// # Example +/// +/// ```rust +/// use underway::task::RetryPolicy; +/// +/// let retry_policy = RetryPolicy::builder() +/// .max_attempts(10) +/// .backoff_coefficient(4.0) +/// .build(); +/// ``` +#[derive(Debug, Clone, Copy, PartialEq)] +pub struct RetryPolicy { + pub(crate) max_attempts: i32, + pub(crate) initial_interval_ms: i32, + pub(crate) max_interval_ms: i32, + pub(crate) backoff_coefficient: f32, +} + +pub(crate) type RetryCount = i32; + +impl RetryPolicy { + /// Create a new builder. + pub fn builder() -> Builder { + Builder::default() + } + + pub(crate) fn calculate_delay(&self, retry_count: RetryCount) -> Span { + let base_delay = self.initial_interval_ms as f32; + let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1); + let delay = backoff_delay.min(self.max_interval_ms as f32) as i64; + delay.milliseconds() + } +} + +impl From for RetryPolicy { + fn from( + DequeuedTask { + max_attempts, + initial_interval_ms, + max_interval_ms, + backoff_coefficient, + .. + }: DequeuedTask, + ) -> Self { + Self { + max_attempts, + initial_interval_ms, + max_interval_ms, + backoff_coefficient, + } + } +} + +const DEFAULT_RETRY_POLICY: RetryPolicy = RetryPolicy { + max_attempts: 5, + initial_interval_ms: 1_000, + max_interval_ms: 60_000, + backoff_coefficient: 2.0, +}; + +impl Default for RetryPolicy { + fn default() -> Self { + DEFAULT_RETRY_POLICY + } +} + +#[derive(Debug, Default)] +pub struct Builder { + inner: RetryPolicy, +} + +impl Builder { + /// Creates a new `RetryPolicyBuilder` with the default retry settings. + pub const fn new() -> Self { + Self { + inner: DEFAULT_RETRY_POLICY, + } + } + + /// Sets the maximum number of retry attempts. + /// + /// Default value is `5`. + pub const fn max_attempts(mut self, max_attempts: i32) -> Self { + self.inner.max_attempts = max_attempts; + self + } + + /// Sets the initial interval before the first retry (in milliseconds). + /// + /// Default value is `1_000`. + pub const fn initial_interval_ms(mut self, initial_interval_ms: i32) -> Self { + self.inner.initial_interval_ms = initial_interval_ms; + self + } + + /// Sets the maximum interval between retries (in milliseconds). + /// + /// Default value is `60_000`. + pub const fn max_interval_ms(mut self, max_interval_ms: i32) -> Self { + self.inner.max_interval_ms = max_interval_ms; + self + } + + /// Sets the backoff coefficient to apply after each retry. + /// + /// Default value is `2.0`. + pub const fn backoff_coefficient(mut self, backoff_coefficient: f32) -> Self { + self.inner.backoff_coefficient = backoff_coefficient; + self + } + + /// Builds the `RetryPolicy` with the configured parameters. + pub const fn build(self) -> RetryPolicy { + self.inner + } +}