Skip to content

Commit

Permalink
decrement attempt on snooze, unaltered max_attempts
Browse files Browse the repository at this point in the history
When snoozing was implemented, we needed to ensure that snoozes didn't
count against a job's attempts so that it could keep snoozing repeatedly
if desired. The design chosen for this was to increment `max_attempts`
since `attempt` is automatically incremented when a job is fetched.

This choice turns out to have some downsides:

* It leads to situations where `max_attempts` could accidentally
  overflow the `smallint` range, even if the job wasn't erroring and was
  just meant to snooze indefinitely.

* The implementation of a `RetryPolicy` is a little trickier because one
  can't merely rely on the `attempt` since it gets inflated by snoozes.
  Instead, you'd need to use `len(job.errors)` to determine retry
  backoffs.

To resolve both these situations, change the implementation so that
snoozing causes `attempt` to be _decremented_ rather than _incrementing_
`max_attempts`. In order to make this easier to detect, set a `snoozes`
attribute in the job's metadata and increment it each time a snooze
occurs.

The implementation of the builtin `RetryPolicy` implementations is not
changed, so this change should not cause any user-facing breakage unless
somebody is relying on `attempt - len(errors)` for some reason.

Fixes #720.
  • Loading branch information
bgentry committed Jan 25, 2025
1 parent e296461 commit 3f43b90
Show file tree
Hide file tree
Showing 16 changed files with 324 additions and 210 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- The reindexer maintenance process has been enabled. As of now, it will reindex only the `river_job_args_index` and `river_jobs_metadata_index` `GIN` indexes, which are more prone to bloat than b-tree indexes. By default it runs daily at midnight UTC, but can be customized on the `river.Config` type via `ReindexerSchedule`. Most installations will benefit from this process, but it can be disabled altogether using `NeverSchedule`. [PR #718](https://github.com/riverqueue/river/pull/718).
- Periodic jobs now have a `"periodic": true` attribute set in their metadata to make them more easily distinguishable from other types of jobs. [PR #728](https://github.com/riverqueue/river/pull/728).
- Snoozing a job now causes its `attempt` to be _decremented_, whereas previously the `max_attempts` would be incremented. In either case, this avoids allowing a snooze to exhaust a job's retries; however the new behavior also avoids potential issues with wrapping the `max_attempts` value, and makes it simpler to implement a `RetryPolicy` based on either `attempt` or `max_attempts`. The number of snoozes is also tracked in the job's metadata as `snoozes` for debugging purposes.

The implementation of the builtin `RetryPolicy` implementations is not changed, so this change should not cause any user-facing breakage unless you're relying on `attempt - len(errors)` for some reason. [PR #730](https://github.com/riverqueue/river/pull/730).

## [0.15.0] - 2024-12-26

Expand Down
1 change: 1 addition & 0 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func Test_Client(t *testing.T) {

updatedJob, err := client.JobGet(ctx, insertRes.Job.ID)
require.NoError(t, err)
require.Equal(t, 0, updatedJob.Attempt)
require.Equal(t, rivertype.JobStateScheduled, updatedJob.State)
require.WithinDuration(t, time.Now().Add(15*time.Minute), updatedJob.ScheduledAt, 2*time.Second)
})
Expand Down
4 changes: 2 additions & 2 deletions example_job_snooze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (w *SnoozingWorker) Work(ctx context.Context, job *river.Job[SnoozingArgs])

// Example_jobSnooze demonstrates how to snooze a job from within Work using
// JobSnooze. The job will be run again after 5 minutes and the snooze attempt
// will increment the job's max attempts, ensuring that one can snooze as many
// times as desired.
// will decrement the job's attempt count, ensuring that one can snooze as many
// times as desired without being impacted by the max attempts.
func Example_jobSnooze() { //nolint:dupl
ctx := context.Background()

Expand Down
29 changes: 16 additions & 13 deletions internal/jobcompleter/job_completer.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,20 +377,22 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
// it's done this way to allocate as few new slices as necessary.
mapBatch := func(setStateBatch map[int64]*batchCompleterSetState) *riverdriver.JobSetStateIfRunningManyParams {
params := &riverdriver.JobSetStateIfRunningManyParams{
ID: make([]int64, len(setStateBatch)),
ErrData: make([][]byte, len(setStateBatch)),
FinalizedAt: make([]*time.Time, len(setStateBatch)),
MaxAttempts: make([]*int, len(setStateBatch)),
ScheduledAt: make([]*time.Time, len(setStateBatch)),
State: make([]rivertype.JobState, len(setStateBatch)),
ID: make([]int64, len(setStateBatch)),
Attempt: make([]*int, len(setStateBatch)),
ErrData: make([][]byte, len(setStateBatch)),
FinalizedAt: make([]*time.Time, len(setStateBatch)),
ScheduledAt: make([]*time.Time, len(setStateBatch)),
SnoozeDoIncrement: make([]bool, len(setStateBatch)),
State: make([]rivertype.JobState, len(setStateBatch)),
}
var i int
for _, setState := range setStateBatch {
params.ID[i] = setState.Params.ID
params.Attempt[i] = setState.Params.Attempt
params.ErrData[i] = setState.Params.ErrData
params.FinalizedAt[i] = setState.Params.FinalizedAt
params.MaxAttempts[i] = setState.Params.MaxAttempts
params.ScheduledAt[i] = setState.Params.ScheduledAt
params.SnoozeDoIncrement[i] = setState.Params.SnoozeDoIncrement
params.State[i] = setState.Params.State
i++
}
Expand All @@ -412,12 +414,13 @@ func (c *BatchCompleter) handleBatch(ctx context.Context) error {
for i := 0; i < len(setStateBatch); i += c.completionMaxSize {
endIndex := min(i+c.completionMaxSize, len(params.ID)) // beginning of next sub-batch or end of slice
subBatch := &riverdriver.JobSetStateIfRunningManyParams{
ID: params.ID[i:endIndex],
ErrData: params.ErrData[i:endIndex],
FinalizedAt: params.FinalizedAt[i:endIndex],
MaxAttempts: params.MaxAttempts[i:endIndex],
ScheduledAt: params.ScheduledAt[i:endIndex],
State: params.State[i:endIndex],
ID: params.ID[i:endIndex],
Attempt: params.Attempt[i:endIndex],
ErrData: params.ErrData[i:endIndex],
FinalizedAt: params.FinalizedAt[i:endIndex],
ScheduledAt: params.ScheduledAt[i:endIndex],
SnoozeDoIncrement: params.SnoozeDoIncrement[i:endIndex],
State: params.State[i:endIndex],
}
jobRowsSubBatch, err := completeSubBatch(subBatch)
if err != nil {
Expand Down
87 changes: 75 additions & 12 deletions internal/riverinternaltest/riverdrivertest/riverdrivertest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,38 +1979,32 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,

setStateManyParams := func(params ...*riverdriver.JobSetStateIfRunningParams) *riverdriver.JobSetStateIfRunningManyParams {
batchParams := &riverdriver.JobSetStateIfRunningManyParams{}
// ID: make([]int64, len(params)),
// ErrData: make([]byte, len(params)),
// FinalizedAt: make([]*time.Time, len(params)),
// MaxAttempts: []*int{maxAttempts},
// ScheduledAt: []*time.Time{scheduledAt},
// State: []rivertype.JobState{params.State},
// }
for _, param := range params {
var (
attempt *int
errData []byte
finalizedAt *time.Time
maxAttempts *int
scheduledAt *time.Time
)
if param.Attempt != nil {
attempt = param.Attempt
}
if param.ErrData != nil {
errData = param.ErrData
}
if param.FinalizedAt != nil {
finalizedAt = param.FinalizedAt
}
if param.MaxAttempts != nil {
maxAttempts = param.MaxAttempts
}
if param.ScheduledAt != nil {
scheduledAt = param.ScheduledAt
}

batchParams.ID = append(batchParams.ID, param.ID)
batchParams.Attempt = append(batchParams.Attempt, attempt)
batchParams.ErrData = append(batchParams.ErrData, errData)
batchParams.FinalizedAt = append(batchParams.FinalizedAt, finalizedAt)
batchParams.MaxAttempts = append(batchParams.MaxAttempts, maxAttempts)
batchParams.ScheduledAt = append(batchParams.ScheduledAt, scheduledAt)
batchParams.SnoozeDoIncrement = append(batchParams.SnoozeDoIncrement, param.SnoozeDoIncrement)
batchParams.State = append(batchParams.State, param.State)
}

Expand Down Expand Up @@ -2227,6 +2221,75 @@ func Exercise[TTx any](ctx context.Context, t *testing.T,
})
})

t.Run("JobSetStateIfRunningMany_JobSetStateSnoozed", func(t *testing.T) {
t.Parallel()

t.Run("SnoozesARunningJob_WithNoPreexistingMetadata", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

now := time.Now().UTC()
snoozeUntil := now.Add(1 * time.Minute)

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
Attempt: ptrutil.Ptr(5),
State: ptrutil.Ptr(rivertype.JobStateRunning),
UniqueKey: []byte("unique-key"),
})

jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateSnoozed(job.ID, snoozeUntil, 4)))
require.NoError(t, err)
jobAfter := jobsAfter[0]
require.Equal(t, 4, jobAfter.Attempt)
require.Equal(t, job.MaxAttempts, jobAfter.MaxAttempts)
require.JSONEq(t, `{"snoozes": 1}`, string(jobAfter.Metadata))
require.Equal(t, rivertype.JobStateScheduled, jobAfter.State)
require.WithinDuration(t, snoozeUntil, jobAfter.ScheduledAt, time.Microsecond)

jobUpdated, err := exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, 4, jobUpdated.Attempt)
require.Equal(t, job.MaxAttempts, jobUpdated.MaxAttempts)
require.JSONEq(t, `{"snoozes": 1}`, string(jobUpdated.Metadata))
require.Equal(t, rivertype.JobStateScheduled, jobUpdated.State)
require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
})

t.Run("SnoozesARunningJob_WithPreexistingMetadata", func(t *testing.T) {
t.Parallel()

exec, _ := setup(ctx, t)

now := time.Now().UTC()
snoozeUntil := now.Add(1 * time.Minute)

job := testfactory.Job(ctx, t, exec, &testfactory.JobOpts{
Attempt: ptrutil.Ptr(5),
State: ptrutil.Ptr(rivertype.JobStateRunning),
UniqueKey: []byte("unique-key"),
Metadata: []byte(`{"foo": "bar", "snoozes": 5}`),
})

jobsAfter, err := exec.JobSetStateIfRunningMany(ctx, setStateManyParams(riverdriver.JobSetStateSnoozed(job.ID, snoozeUntil, 4)))
require.NoError(t, err)
jobAfter := jobsAfter[0]
require.Equal(t, 4, jobAfter.Attempt)
require.Equal(t, job.MaxAttempts, jobAfter.MaxAttempts)
require.JSONEq(t, `{"foo": "bar", "snoozes": 6}`, string(jobAfter.Metadata))
require.Equal(t, rivertype.JobStateScheduled, jobAfter.State)
require.WithinDuration(t, snoozeUntil, jobAfter.ScheduledAt, time.Microsecond)

jobUpdated, err := exec.JobGetByID(ctx, job.ID)
require.NoError(t, err)
require.Equal(t, 4, jobUpdated.Attempt)
require.Equal(t, job.MaxAttempts, jobUpdated.MaxAttempts)
require.JSONEq(t, `{"foo": "bar", "snoozes": 6}`, string(jobUpdated.Metadata))
require.Equal(t, rivertype.JobStateScheduled, jobUpdated.State)
require.Equal(t, "unique-key", string(jobUpdated.UniqueKey))
})
})

t.Run("JobSetStateIfRunningMany_MultipleJobsAtOnce", func(t *testing.T) {
t.Parallel()

Expand Down
13 changes: 7 additions & 6 deletions job_complete_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ func JobCompleteTx[TDriver riverdriver.Driver[TTx], TTx any, TArgs JobArgs](ctx
execTx := driver.UnwrapExecutor(tx)
params := riverdriver.JobSetStateCompleted(job.ID, time.Now())
rows, err := pilot.JobSetStateIfRunningMany(ctx, execTx, &riverdriver.JobSetStateIfRunningManyParams{
ID: []int64{params.ID},
ErrData: [][]byte{params.ErrData},
FinalizedAt: []*time.Time{params.FinalizedAt},
MaxAttempts: []*int{params.MaxAttempts},
ScheduledAt: []*time.Time{params.ScheduledAt},
State: []rivertype.JobState{params.State},
ID: []int64{params.ID},
Attempt: []*int{params.Attempt},
ErrData: [][]byte{params.ErrData},
FinalizedAt: []*time.Time{params.FinalizedAt},
ScheduledAt: []*time.Time{params.ScheduledAt},
SnoozeDoIncrement: []bool{params.SnoozeDoIncrement},
State: []rivertype.JobState{params.State},
})
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions job_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,9 +293,9 @@ func (e *jobExecutor) reportResult(ctx context.Context, res *jobExecutorResult)
// smaller than the scheduler's run interval.
var params *riverdriver.JobSetStateIfRunningParams
if nextAttemptScheduledAt.Sub(e.Time.NowUTC()) <= e.SchedulerInterval {
params = riverdriver.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
params = riverdriver.JobSetStateSnoozedAvailable(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.Attempt-1)
} else {
params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.MaxAttempts+1)
params = riverdriver.JobSetStateSnoozed(e.JobRow.ID, nextAttemptScheduledAt, e.JobRow.Attempt-1)
}
if err := e.Completer.JobSetStateIfRunning(ctx, e.stats, params); err != nil {
e.Logger.ErrorContext(ctx, e.Name+": Error snoozing job",
Expand Down
12 changes: 6 additions & 6 deletions job_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,11 +365,11 @@ func TestJobExecutor_Execute(t *testing.T) {
require.Equal(t, "", job.Errors[0].Trace)
})

t.Run("JobSnoozeErrorReschedulesJobAndIncrementsMaxAttempts", func(t *testing.T) {
t.Run("JobSnoozeErrorReschedulesJobAndDecrementsAttempt", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)
maxAttemptsBefore := bundle.jobRow.MaxAttempts
attemptBefore := bundle.jobRow.Attempt

cancelErr := JobSnooze(30 * time.Minute)
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)
Expand All @@ -381,15 +381,15 @@ func TestJobExecutor_Execute(t *testing.T) {
require.NoError(t, err)
require.Equal(t, rivertype.JobStateScheduled, job.State)
require.WithinDuration(t, time.Now().Add(30*time.Minute), job.ScheduledAt, 2*time.Second)
require.Equal(t, maxAttemptsBefore+1, job.MaxAttempts)
require.Equal(t, attemptBefore-1, job.Attempt)
require.Empty(t, job.Errors)
})

t.Run("JobSnoozeErrorInNearFutureMakesJobAvailableAndIncrementsMaxAttempts", func(t *testing.T) {
t.Run("JobSnoozeErrorInNearFutureMakesJobAvailableAndDecrementsAttempt", func(t *testing.T) {
t.Parallel()

executor, bundle := setup(t)
maxAttemptsBefore := bundle.jobRow.MaxAttempts
attemptBefore := bundle.jobRow.Attempt

cancelErr := JobSnooze(time.Millisecond)
executor.WorkUnit = newWorkUnitFactoryWithCustomRetry(func() error { return cancelErr }, nil).MakeUnit(bundle.jobRow)
Expand All @@ -401,7 +401,7 @@ func TestJobExecutor_Execute(t *testing.T) {
require.NoError(t, err)
require.Equal(t, rivertype.JobStateAvailable, job.State)
require.WithinDuration(t, time.Now(), job.ScheduledAt, 2*time.Second)
require.Equal(t, maxAttemptsBefore+1, job.MaxAttempts)
require.Equal(t, attemptBefore-1, job.Attempt)
require.Empty(t, job.Errors)
})

Expand Down
10 changes: 6 additions & 4 deletions retry_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ type DefaultClientRetryPolicy struct {
// try will be scheduled in 1 seconds, 16 seconds after the second, 1 minute and
// 21 seconds after the third, etc.
//
// In order to avoid penalizing jobs that are snoozed, the number of errors is
// used instead of the attempt count. This means that snoozing a job (even
// repeatedly) will not lead to a future error having a longer than expected
// retry delay.
// Snoozes do not count as attempts and do not influence retry behavior.
// Earlier versions of River would allow the attempt to increment each time a
// job was snoozed. Although this has been changed and snoozes now decrement the
// attempt count, we can maintain the same retry schedule even for pre-existing
// jobs by using the number of errors instead of the attempt count. This ensures
// consistent behavior across River versions.
//
// At degenerately high retry counts (>= 310) the policy starts adding the
// equivalent of the maximum of time.Duration to each retry, about 292 years.
Expand Down
34 changes: 18 additions & 16 deletions riverdriver/river_driver_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,12 +320,13 @@ type JobSetCompleteIfRunningManyParams struct {
// running job. Use one of the constructors below to ensure a correct
// combination of parameters.
type JobSetStateIfRunningParams struct {
ID int64
ErrData []byte
FinalizedAt *time.Time
MaxAttempts *int
ScheduledAt *time.Time
State rivertype.JobState
ID int64
Attempt *int
ErrData []byte
FinalizedAt *time.Time
ScheduledAt *time.Time
SnoozeDoIncrement bool
State rivertype.JobState
}

func JobSetStateCancelled(id int64, finalizedAt time.Time, errData []byte) *JobSetStateIfRunningParams {
Expand All @@ -348,24 +349,25 @@ func JobSetStateErrorRetryable(id int64, scheduledAt time.Time, errData []byte)
return &JobSetStateIfRunningParams{ID: id, ErrData: errData, ScheduledAt: &scheduledAt, State: rivertype.JobStateRetryable}
}

func JobSetStateSnoozed(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, MaxAttempts: &maxAttempts, ScheduledAt: &scheduledAt, State: rivertype.JobStateScheduled}
func JobSetStateSnoozed(id int64, scheduledAt time.Time, attempt int) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, Attempt: &attempt, ScheduledAt: &scheduledAt, SnoozeDoIncrement: true, State: rivertype.JobStateScheduled}
}

func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, maxAttempts int) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, MaxAttempts: &maxAttempts, ScheduledAt: &scheduledAt, State: rivertype.JobStateAvailable}
func JobSetStateSnoozedAvailable(id int64, scheduledAt time.Time, attempt int) *JobSetStateIfRunningParams {
return &JobSetStateIfRunningParams{ID: id, Attempt: &attempt, ScheduledAt: &scheduledAt, SnoozeDoIncrement: true, State: rivertype.JobStateAvailable}
}

// JobSetStateIfRunningManyParams are parameters to update the state of
// currently running jobs. Use one of the constructors below to ensure a correct
// combination of parameters.
type JobSetStateIfRunningManyParams struct {
ID []int64
ErrData [][]byte
FinalizedAt []*time.Time
MaxAttempts []*int
ScheduledAt []*time.Time
State []rivertype.JobState
ID []int64
Attempt []*int
ErrData [][]byte
FinalizedAt []*time.Time
ScheduledAt []*time.Time
SnoozeDoIncrement []bool
State []rivertype.JobState
}

type JobUpdateParams struct {
Expand Down
Loading

0 comments on commit 3f43b90

Please sign in to comment.