Skip to content

Commit

Permalink
set "periodic": true in periodic job metadata
Browse files Browse the repository at this point in the history
This attribute makes periodic jobs easier to identify over regular jobs.
  • Loading branch information
bgentry committed Jan 23, 2025
1 parent f945c3f commit f536c6e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Periodic jobs now have a `"periodic": true` attribute set in their metadata to make them more easily distinguishable from other types of jobs. [PR #728](https://github.com/riverqueue/river/pull/728).

## [0.15.0] - 2024-12-26

### Added
Expand Down
7 changes: 7 additions & 0 deletions internal/maintenance/periodic_job_enqueuer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

"github.com/tidwall/sjson"

"github.com/riverqueue/river/riverdriver"
"github.com/riverqueue/river/rivershared/baseservice"
"github.com/riverqueue/river/rivershared/startstop"
Expand Down Expand Up @@ -371,6 +373,11 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c
insertParams.ScheduledAt = &scheduledAt
}

if insertParams.Metadata, err = sjson.SetBytes(insertParams.Metadata, "periodic", true); err != nil {
s.Logger.ErrorContext(ctx, s.Name+": Error setting periodic metadata", "error", err.Error())
return nil, false
}

return insertParams, true
}

Expand Down
38 changes: 38 additions & 0 deletions internal/maintenance/periodic_job_enqueuer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,44 @@ func TestPeriodicJobEnqueuer(t *testing.T) {
requireNJobs(t, bundle.exec, "periodic_job_1500ms", 1)
})

t.Run("SetsPeriodicMetadataAttribute", func(t *testing.T) {
t.Parallel()

svc, bundle := setup(t)

jobConstructorWithMetadata := func(name string, metadata []byte) func() (*rivertype.JobInsertParams, error) {
return func() (*rivertype.JobInsertParams, error) {
params, err := jobConstructorFunc(name, false)()
if err != nil {
return nil, err
}
params.Metadata = metadata
return params, nil
}
}

svc.AddMany([]*PeriodicJob{
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_nil", nil)},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_string", []byte(""))},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_obj", []byte("{}"))},
{ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_existing", []byte(`{"key": "value"}`))},
})

startService(t, svc)

svc.TestSignals.InsertedJobs.WaitOrTimeout()

assertMetadata := func(name string, expected string) {
job := requireNJobs(t, bundle.exec, name, 1)[0]
require.JSONEq(t, expected, string(job.Metadata))
}

assertMetadata("p_md_nil", `{"periodic": true}`)
assertMetadata("p_md_empty_string", `{"periodic": true}`)
assertMetadata("p_md_empty_obj", `{"periodic": true}`)
assertMetadata("p_md_existing", `{"key": "value", "periodic": true}`)
})

t.Run("SetsScheduledAtAccordingToExpectedNextRunAt", func(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit f536c6e

Please sign in to comment.