Skip to content

Commit

Permalink
Allow by period uniqueness to be based off scheduled time (#734)
Browse files Browse the repository at this point in the history
This one attempts to resolve #715. Currently, by period uniqueness
always bases the period off the current time, but there's a good
argument that if the job has been scheduled for a particular time in the
future, it should be based off that time instead.

This is one that could nominally be considered a small breaking change
in a Hyrum's Law sort of way, even though it's really patching what
could be considered a bug. Even though it was sort of broken before,
some apps may have come to depend on the broken behavior of the unique
code ignoring `ScheduledAt`. I'm not sure that it's a big enough problem
to be worth calling out though, so I didn't.

Fixes #715.
  • Loading branch information
brandur authored Jan 25, 2025
1 parent 4cea55a commit ee4cbd4
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Snoozing a job now causes its `attempt` to be _decremented_, whereas previously the `max_attempts` would be incremented. In either case, this avoids allowing a snooze to exhaust a job's retries; however the new behavior also avoids potential issues with wrapping the `max_attempts` value, and makes it simpler to implement a `RetryPolicy` based on either `attempt` or `max_attempts`. The number of snoozes is also tracked in the job's metadata as `snoozes` for debugging purposes.

The implementation of the builtin `RetryPolicy` implementations is not changed, so this change should not cause any user-facing breakage unless you're relying on `attempt - len(errors)` for some reason. [PR #730](https://github.com/riverqueue/river/pull/730).
- `ByPeriod` uniqueness is now based off a job's `ScheduledAt` instead of the current time if it has a value. [PR #734](https://github.com/riverqueue/river/pull/734).

## [0.15.0] - 2024-12-26

Expand Down
3 changes: 2 additions & 1 deletion internal/dbunique/db_unique.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/tidwall/sjson"

"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivershared/util/sliceutil"
"github.com/riverqueue/river/rivertype"
)
Expand Down Expand Up @@ -123,7 +124,7 @@ func buildUniqueKeyString(timeGen baseservice.TimeGenerator, uniqueOpts *UniqueO
}

if uniqueOpts.ByPeriod != time.Duration(0) {
lowerPeriodBound := timeGen.NowUTC().Truncate(uniqueOpts.ByPeriod)
lowerPeriodBound := ptrutil.ValOrDefaultFunc(params.ScheduledAt, timeGen.NowUTC).Truncate(uniqueOpts.ByPeriod)
sb.WriteString("&period=" + lowerPeriodBound.Format(time.RFC3339))
}

Expand Down
36 changes: 29 additions & 7 deletions internal/dbunique/db_unique_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/ptrutil"
"github.com/riverqueue/river/rivertype"
)

Expand All @@ -30,10 +31,11 @@ func TestUniqueKey(t *testing.T) {
stubSvc.StubNowUTC(now)

tests := []struct {
name string
argsFunc func() rivertype.JobArgs
uniqueOpts UniqueOpts
expectedJSON string
name string
argsFunc func() rivertype.JobArgs
modifyInsertParamsFunc func(insertParams *rivertype.JobInsertParams)
uniqueOpts UniqueOpts
expectedJSON string
}{
{
name: "ByArgsWithMultipleUniqueStructTagsAndDefaultStates",
Expand Down Expand Up @@ -165,6 +167,22 @@ func TestUniqueKey(t *testing.T) {
uniqueOpts: UniqueOpts{ByPeriod: time.Hour, ByState: []rivertype.JobState{rivertype.JobStateCompleted}},
expectedJSON: "&kind=worker_4&period=" + now.Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "PeriodFromScheduledAt",
argsFunc: func() rivertype.JobArgs {
type TaskJobArgs struct {
JobArgsStaticKind
}
return TaskJobArgs{
JobArgsStaticKind: JobArgsStaticKind{kind: "worker_4"},
}
},
modifyInsertParamsFunc: func(insertParams *rivertype.JobInsertParams) {
insertParams.ScheduledAt = ptrutil.Ptr(now.Add(time.Hour))
},
uniqueOpts: UniqueOpts{ByPeriod: time.Hour},
expectedJSON: "&kind=worker_4&period=" + now.Add(time.Hour).Truncate(time.Hour).Format(time.RFC3339),
},
{
name: "ExcludeKindByArgs",
argsFunc: func() rivertype.JobArgs {
Expand Down Expand Up @@ -228,7 +246,7 @@ func TestUniqueKey(t *testing.T) {
states = tt.uniqueOpts.ByState
}

jobParams := &rivertype.JobInsertParams{
insertParams := &rivertype.JobInsertParams{
Args: args,
CreatedAt: &now,
EncodedArgs: encodedArgs,
Expand All @@ -241,12 +259,16 @@ func TestUniqueKey(t *testing.T) {
UniqueStates: UniqueStatesToBitmask(states),
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, jobParams)
if tt.modifyInsertParamsFunc != nil {
tt.modifyInsertParamsFunc(insertParams)
}

uniqueKeyPreHash, err := buildUniqueKeyString(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.Equal(t, tt.expectedJSON, uniqueKeyPreHash)
expectedHash := sha256.Sum256([]byte(tt.expectedJSON))

uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, jobParams)
uniqueKey, err := UniqueKey(stubSvc, &tt.uniqueOpts, insertParams)
require.NoError(t, err)
require.NotNil(t, uniqueKey)

Expand Down

0 comments on commit ee4cbd4

Please sign in to comment.