Skip to content

Commit

Permalink
rename all builders as "builder" (#15)
Browse files Browse the repository at this point in the history
This is technically a breaking change, but cleans up the conventions and
provides improved organization.
  • Loading branch information
maxcountryman committed Oct 18, 2024
1 parent 2eeedc2 commit 5134931
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 166 deletions.
38 changes: 19 additions & 19 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ where
S: Clone + Send + Sync + 'static,
{
/// Create a new job builder.
pub fn builder() -> JobBuilder<I, S, Initial> {
JobBuilder::default()
pub fn builder() -> Builder<I, S, Initial> {
Builder::default()
}

/// Enqueue the job using a connection from the queue's pool.
Expand Down Expand Up @@ -634,7 +634,7 @@ mod builder_states {

/// Builder for [`Job`].
#[derive(Debug)]
pub struct JobBuilder<I, S, B = Initial>
pub struct Builder<I, S, B = Initial>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand All @@ -649,7 +649,7 @@ where
_marker: PhantomData<(I, S)>,
}

impl<I, S> JobBuilder<I, S, ExecutorSet<I, S>>
impl<I, S> Builder<I, S, ExecutorSet<I, S>>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -789,14 +789,14 @@ where
}
}

impl<I, S> JobBuilder<I, S, Initial>
impl<I, S> Builder<I, S, Initial>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
{
/// Create a job builder.
pub fn new() -> JobBuilder<I, S, Initial> {
JobBuilder::<I, S, _> {
pub fn new() -> Builder<I, S, Initial> {
Builder::<I, S, _> {
builder_state: Initial,
retry_policy: RetryPolicy::default(),
timeout: Span::new().minutes(15),
Expand Down Expand Up @@ -837,8 +837,8 @@ where
///
/// Job::<(), _>::builder().state(state);
/// ```
pub fn state(self, state: S) -> JobBuilder<I, S, StateSet<S>> {
JobBuilder {
pub fn state(self, state: S) -> Builder<I, S, StateSet<S>> {
Builder {
builder_state: StateSet { state },
retry_policy: self.retry_policy,
timeout: self.timeout,
Expand Down Expand Up @@ -870,12 +870,12 @@ where
/// Ok(())
/// });
/// ```
pub fn execute<F, Fut>(self, f: F) -> JobBuilder<I, S, ExecutorSet<I, ()>>
pub fn execute<F, Fut>(self, f: F) -> Builder<I, S, ExecutorSet<I, ()>>
where
F: Fn(I) -> Fut + Send + Sync + 'static,
Fut: Future<Output = TaskResult> + Send + 'static,
{
JobBuilder {
Builder {
builder_state: ExecutorSet {
execute_fn: ExecuteFn::Simple(Arc::new(move |input: I| {
let fut = f(input);
Expand All @@ -894,7 +894,7 @@ where
}
}

impl<I, S> Default for JobBuilder<I, S, Initial>
impl<I, S> Default for Builder<I, S, Initial>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand All @@ -904,7 +904,7 @@ where
}
}

impl<I, S> JobBuilder<I, S, StateSet<S>>
impl<I, S> Builder<I, S, StateSet<S>>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -943,12 +943,12 @@ where
/// Ok(())
/// });
/// ```
pub fn execute<F, Fut>(self, f: F) -> JobBuilder<I, S, ExecutorSet<I, S>>
pub fn execute<F, Fut>(self, f: F) -> Builder<I, S, ExecutorSet<I, S>>
where
F: Fn(I, S) -> Fut + Send + Sync + 'static,
Fut: Future<Output = TaskResult> + Send + 'static,
{
JobBuilder {
Builder {
builder_state: ExecutorSet {
execute_fn: ExecuteFn::Stateful(Arc::new(move |input: I, state: S| {
let fut = f(input, state);
Expand All @@ -967,7 +967,7 @@ where
}
}

impl<I, S> JobBuilder<I, S, ExecutorSet<I, S>>
impl<I, S> Builder<I, S, ExecutorSet<I, S>>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -1003,8 +1003,8 @@ where
/// # });
/// # }
/// ```
pub fn queue(self, queue: Queue<Job<I, S>>) -> JobBuilder<I, S, QueueSet<I, S>> {
JobBuilder {
pub fn queue(self, queue: Queue<Job<I, S>>) -> Builder<I, S, QueueSet<I, S>> {
Builder {
builder_state: QueueSet {
state: self.builder_state.state,
execute_fn: self.builder_state.execute_fn,
Expand All @@ -1021,7 +1021,7 @@ where
}
}

impl<I, S> JobBuilder<I, S, QueueSet<I, S>>
impl<I, S> Builder<I, S, QueueSet<I, S>>
where
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
Expand Down
24 changes: 12 additions & 12 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,8 @@ impl<T: Task> Clone for Queue<T> {

impl<T: Task> Queue<T> {
/// Creates a builder for a new queue.
pub fn builder() -> QueueBuilder<T, Initial> {
QueueBuilder::default()
pub fn builder() -> Builder<T, Initial> {
Builder::default()
}

/// Enqueues a new task into the task queue, returning the task's unique ID.
Expand Down Expand Up @@ -867,18 +867,18 @@ mod builder_states {

/// A builder for [`Queue`].
#[derive(Debug)]
pub struct QueueBuilder<T: Task, S> {
pub struct Builder<T: Task, S> {
state: S,
_marker: PhantomData<T>,
}

impl<T: Task> Default for QueueBuilder<T, Initial> {
impl<T: Task> Default for Builder<T, Initial> {
fn default() -> Self {
QueueBuilder::new()
Builder::new()
}
}

impl<T: Task> QueueBuilder<T, Initial> {
impl<T: Task> Builder<T, Initial> {
/// Create a new queue builder.
pub fn new() -> Self {
Self {
Expand All @@ -888,8 +888,8 @@ impl<T: Task> QueueBuilder<T, Initial> {
}

/// Set the queue name.
pub fn name(self, name: impl Into<String>) -> QueueBuilder<T, NameSet> {
QueueBuilder {
pub fn name(self, name: impl Into<String>) -> Builder<T, NameSet> {
Builder {
state: NameSet {
name: name.into(),
dlq_name: None,
Expand All @@ -899,16 +899,16 @@ impl<T: Task> QueueBuilder<T, Initial> {
}
}

impl<T: Task> QueueBuilder<T, NameSet> {
impl<T: Task> Builder<T, NameSet> {
/// Set the dead-letter queue name.
pub fn dead_letter_queue(mut self, dlq_name: impl Into<String>) -> Self {
self.state.dlq_name = Some(dlq_name.into());
self
}

/// Set the database connection pool.
pub fn pool(self, pool: PgPool) -> QueueBuilder<T, PoolSet> {
QueueBuilder {
pub fn pool(self, pool: PgPool) -> Builder<T, PoolSet> {
Builder {
state: PoolSet {
name: self.state.name,
dlq_name: self.state.dlq_name,
Expand All @@ -919,7 +919,7 @@ impl<T: Task> QueueBuilder<T, NameSet> {
}
}

impl<T: Task> QueueBuilder<T, PoolSet> {
impl<T: Task> Builder<T, PoolSet> {
/// Builds the queue.
pub async fn build(self) -> Result<Queue<T>> {
let state = self.state;
Expand Down
141 changes: 6 additions & 135 deletions src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ use serde::{de::DeserializeOwned, Serialize};
use sqlx::postgres::types::PgInterval;
use uuid::Uuid;

pub(crate) use self::retry_policy::RetryCount;
pub use self::retry_policy::RetryPolicy;

mod retry_policy;

/// A type alias for task identifiers.
///
/// Task IDs are [ULID][ULID]s which are converted to UUIDv4 for storage.
Expand Down Expand Up @@ -452,140 +457,6 @@ pub struct DequeuedTask {
pub concurrency_key: Option<String>,
}

/// Configuration of a policy for retries in case of task failure.
///
/// # Example
///
/// ```rust
/// use underway::task::RetryPolicy;
///
/// let retry_policy = RetryPolicy::builder()
/// .max_attempts(10)
/// .backoff_coefficient(4.0)
/// .build();
/// ```
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RetryPolicy {
pub(crate) max_attempts: i32,
pub(crate) initial_interval_ms: i32,
pub(crate) max_interval_ms: i32,
pub(crate) backoff_coefficient: f32,
}

pub(crate) type RetryCount = i32;

impl RetryPolicy {
/// Create a new builder.
pub fn builder() -> RetryPolicyBuilder {
RetryPolicyBuilder::default()
}

/// Returns the delay relative to the given retry count.
pub fn calculate_delay(&self, retry_count: RetryCount) -> Span {
let base_delay = self.initial_interval_ms as f32;
let backoff_delay = base_delay * self.backoff_coefficient.powi(retry_count - 1);
let delay = backoff_delay.min(self.max_interval_ms as f32) as i64;
delay.milliseconds()
}
}

impl From<DequeuedTask> for RetryPolicy {
fn from(
DequeuedTask {
max_attempts,
initial_interval_ms,
max_interval_ms,
backoff_coefficient,
..
}: DequeuedTask,
) -> Self {
Self {
max_attempts,
initial_interval_ms,
max_interval_ms,
backoff_coefficient,
}
}
}

const DEFAULT_RETRY_POLICY: RetryPolicy = RetryPolicy {
max_attempts: 5,
initial_interval_ms: 1_000,
max_interval_ms: 60_000,
backoff_coefficient: 2.0,
};

impl Default for RetryPolicy {
fn default() -> Self {
DEFAULT_RETRY_POLICY
}
}

/// A builder for constructing `RetryPolicy`.
///
/// # Example
///
/// ```
/// use underway::task::RetryPolicyBuilder;
///
/// let retry_policy = RetryPolicyBuilder::new()
/// .max_attempts(3)
/// .initial_interval_ms(500)
/// .max_interval_ms(5_000)
/// .backoff_coefficient(1.5)
/// .build();
/// ```
#[derive(Debug, Default)]
pub struct RetryPolicyBuilder {
inner: RetryPolicy,
}

impl RetryPolicyBuilder {
/// Creates a new `RetryPolicyBuilder` with the default retry settings.
pub const fn new() -> Self {
Self {
inner: DEFAULT_RETRY_POLICY,
}
}

/// Sets the maximum number of retry attempts.
///
/// Default value is `5`.
pub const fn max_attempts(mut self, max_attempts: i32) -> Self {
self.inner.max_attempts = max_attempts;
self
}

/// Sets the initial interval before the first retry (in milliseconds).
///
/// Default value is `1_000`.
pub const fn initial_interval_ms(mut self, initial_interval_ms: i32) -> Self {
self.inner.initial_interval_ms = initial_interval_ms;
self
}

/// Sets the maximum interval between retries (in milliseconds).
///
/// Default value is `60_000`.
pub const fn max_interval_ms(mut self, max_interval_ms: i32) -> Self {
self.inner.max_interval_ms = max_interval_ms;
self
}

/// Sets the backoff coefficient to apply after each retry.
///
/// Default value is `2.0`.
pub const fn backoff_coefficient(mut self, backoff_coefficient: f32) -> Self {
self.inner.backoff_coefficient = backoff_coefficient;
self
}

/// Builds the `RetryPolicy` with the configured parameters.
pub const fn build(self) -> RetryPolicy {
self.inner
}
}

/// Represents the possible states a task can be in.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, sqlx::Type)]
#[sqlx(type_name = "underway.task_state", rename_all = "snake_case")]
Expand Down Expand Up @@ -664,7 +535,7 @@ mod tests {

#[test]
fn retry_policy_custom() {
let retry_policy = RetryPolicyBuilder::new()
let retry_policy = RetryPolicy::builder()
.max_attempts(3)
.initial_interval_ms(500)
.max_interval_ms(5_000)
Expand Down
Loading

0 comments on commit 5134931

Please sign in to comment.