diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ffe894b..0076cbb6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Periodic jobs now have a `"periodic": true` attribute set in their metadata to make them more easily distinguishable from other types of jobs. [PR #728](https://github.com/riverqueue/river/pull/728). + ## [0.15.0] - 2024-12-26 ### Added diff --git a/internal/maintenance/periodic_job_enqueuer.go b/internal/maintenance/periodic_job_enqueuer.go index 68b4a987..232d3c6a 100644 --- a/internal/maintenance/periodic_job_enqueuer.go +++ b/internal/maintenance/periodic_job_enqueuer.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/tidwall/sjson" + "github.com/riverqueue/river/riverdriver" "github.com/riverqueue/river/rivershared/baseservice" "github.com/riverqueue/river/rivershared/startstop" @@ -371,6 +373,11 @@ func (s *PeriodicJobEnqueuer) insertParamsFromConstructor(ctx context.Context, c insertParams.ScheduledAt = &scheduledAt } + if insertParams.Metadata, err = sjson.SetBytes(insertParams.Metadata, "periodic", true); err != nil { + s.Logger.ErrorContext(ctx, s.Name+": Error setting periodic metadata", "error", err.Error()) + return nil, false + } + return insertParams, true } diff --git a/internal/maintenance/periodic_job_enqueuer_test.go b/internal/maintenance/periodic_job_enqueuer_test.go index a10ead11..ca79ccc4 100644 --- a/internal/maintenance/periodic_job_enqueuer_test.go +++ b/internal/maintenance/periodic_job_enqueuer_test.go @@ -173,6 +173,44 @@ func TestPeriodicJobEnqueuer(t *testing.T) { requireNJobs(t, bundle.exec, "periodic_job_1500ms", 1) }) + t.Run("SetsPeriodicMetadataAttribute", func(t *testing.T) { + t.Parallel() + + svc, bundle := setup(t) + + jobConstructorWithMetadata := func(name string, metadata []byte) func() (*rivertype.JobInsertParams, error) { + return func() (*rivertype.JobInsertParams, error) { + params, err := jobConstructorFunc(name, false)() + if err != nil { + return nil, err + } + params.Metadata = metadata + return params, nil + } + } + + svc.AddMany([]*PeriodicJob{ + {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_nil", nil)}, + {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_string", []byte(""))}, + {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_empty_obj", []byte("{}"))}, + {ScheduleFunc: periodicIntervalSchedule(500 * time.Millisecond), ConstructorFunc: jobConstructorWithMetadata("p_md_existing", []byte(`{"key": "value"}`))}, + }) + + startService(t, svc) + + svc.TestSignals.InsertedJobs.WaitOrTimeout() + + assertMetadata := func(name string, expected string) { + job := requireNJobs(t, bundle.exec, name, 1)[0] + require.JSONEq(t, expected, string(job.Metadata)) + } + + assertMetadata("p_md_nil", `{"periodic": true}`) + assertMetadata("p_md_empty_string", `{"periodic": true}`) + assertMetadata("p_md_empty_obj", `{"periodic": true}`) + assertMetadata("p_md_existing", `{"key": "value", "periodic": true}`) + }) + t.Run("SetsScheduledAtAccordingToExpectedNextRunAt", func(t *testing.T) { t.Parallel()