Skip to content

Commit

Permalink
enable the reindexer by default, add NeverSchedule (#718)
Browse files Browse the repository at this point in the history
  • Loading branch information
bgentry authored Jan 24, 2025
1 parent 0e97c89 commit e296461
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 11 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- `NeverSchedule` returns a `PeriodicSchedule` that never runs. This can be used to effectively disable the reindexer or any other maintenance service. [PR #718](https://github.com/riverqueue/river/pull/718).

### Changed

- The reindexer maintenance process has been enabled. As of now, it will reindex only the `river_job_args_index` and `river_jobs_metadata_index` `GIN` indexes, which are more prone to bloat than b-tree indexes. By default it runs daily at midnight UTC, but can be customized on the `river.Config` type via `ReindexerSchedule`. Most installations will benefit from this process, but it can be disabled altogether using `NeverSchedule`. [PR #718](https://github.com/riverqueue/river/pull/718).
- Periodic jobs now have a `"periodic": true` attribute set in their metadata to make them more easily distinguishable from other types of jobs. [PR #728](https://github.com/riverqueue/river/pull/728).

## [0.15.0] - 2024-12-26
Expand Down
27 changes: 25 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -3640,10 +3641,9 @@ func Test_Client_Maintenance(t *testing.T) {

t.Run("Reindexer", func(t *testing.T) {
t.Parallel()
t.Skip("Reindexer is disabled for further development")

config := newTestConfig(t, nil)
config.ReindexerSchedule = cron.Every(time.Second)
config.ReindexerSchedule = &runOnceSchedule{}

client, _ := setup(t, config)

Expand All @@ -3656,6 +3656,18 @@ func Test_Client_Maintenance(t *testing.T) {
})
}

type runOnceSchedule struct {
ran atomic.Bool
}

func (s *runOnceSchedule) Next(time.Time) time.Time {
if !s.ran.Swap(true) {
return time.Now()
}
// Return the maximum future time so that the schedule doesn't run again.
return time.Unix(1<<63-1, 0)
}

func Test_Client_QueueGet(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -4935,6 +4947,12 @@ func Test_NewClient_Defaults(t *testing.T) {
require.Zero(t, enqueuer.Config.AdvisoryLockPrefix)
require.False(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
require.Equal(t, []string{"river_job_args_index", "river_job_metadata_index"}, reindexer.Config.IndexNames)
now := time.Now().UTC()
nextMidnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC).AddDate(0, 0, 1)
require.Equal(t, nextMidnight, reindexer.Config.ScheduleFunc(now))

require.Nil(t, client.config.ErrorHandler)
require.Equal(t, FetchCooldownDefault, client.config.FetchCooldown)
require.Equal(t, FetchPollIntervalDefault, client.config.FetchPollInterval)
Expand Down Expand Up @@ -4979,6 +4997,7 @@ func Test_NewClient_Overrides(t *testing.T) {
Logger: logger,
MaxAttempts: 5,
Queues: map[string]QueueConfig{QueueDefault: {MaxWorkers: 1}},
ReindexerSchedule: &periodicIntervalSchedule{interval: time.Hour},
RetryPolicy: retryPolicy,
TestOnly: true, // disables staggered start in maintenance services
Workers: workers,
Expand All @@ -4998,6 +5017,10 @@ func Test_NewClient_Overrides(t *testing.T) {
require.Equal(t, int32(123_456), enqueuer.Config.AdvisoryLockPrefix)
require.True(t, enqueuer.StaggerStartupIsDisabled())

reindexer := maintenance.GetService[*maintenance.Reindexer](client.queueMaintainer)
now := time.Now().UTC()
require.Equal(t, now.Add(time.Hour), reindexer.Config.ScheduleFunc(now))

require.Equal(t, errorHandler, client.config.ErrorHandler)
require.Equal(t, 123*time.Millisecond, client.config.FetchCooldown)
require.Equal(t, 124*time.Millisecond, client.config.FetchPollInterval)
Expand Down
10 changes: 5 additions & 5 deletions internal/maintenance/reindexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const (
ReindexerTimeoutDefault = 15 * time.Second
)

var defaultIndexNames = []string{} //nolint:gochecknoglobals
var defaultIndexNames = []string{"river_job_args_index", "river_job_metadata_index"} //nolint:gochecknoglobals

// Test-only properties.
type ReindexerTestSignals struct {
Expand Down Expand Up @@ -74,7 +74,7 @@ func NewReindexer(archetype *baseservice.Archetype, config *ReindexerConfig, exe

scheduleFunc := config.ScheduleFunc
if scheduleFunc == nil {
scheduleFunc = (&defaultReindexerSchedule{}).Next
scheduleFunc = (&DefaultReindexerSchedule{}).Next
}

return baseservice.Init(archetype, &Reindexer{
Expand Down Expand Up @@ -165,11 +165,11 @@ func (s *Reindexer) reindexOne(ctx context.Context, indexName string) error {
return nil
}

// defaultReindexerSchedule is a default schedule for the reindexer job which
// DefaultReindexerSchedule is a default schedule for the reindexer job which
// runs at midnight UTC daily.
type defaultReindexerSchedule struct{}
type DefaultReindexerSchedule struct{}

// Next returns the next scheduled time for the reindexer job.
func (s *defaultReindexerSchedule) Next(t time.Time) time.Time {
func (s *DefaultReindexerSchedule) Next(t time.Time) time.Time {
return t.Add(24 * time.Hour).Truncate(24 * time.Hour)
}
8 changes: 4 additions & 4 deletions internal/maintenance/reindexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestReindexer(t *testing.T) {

require.Equal(t, defaultIndexNames, svc.Config.IndexNames)
require.Equal(t, ReindexerTimeoutDefault, svc.Config.Timeout)
require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&defaultReindexerSchedule{}).Next(bundle.now))
require.Equal(t, svc.Config.ScheduleFunc(bundle.now), (&DefaultReindexerSchedule{}).Next(bundle.now))
})
}

Expand All @@ -138,23 +138,23 @@ func TestDefaultReindexerSchedule(t *testing.T) {
t.Run("WithMidnightInputReturnsMidnight24HoursLater", func(t *testing.T) {
t.Parallel()

schedule := &defaultReindexerSchedule{}
schedule := &DefaultReindexerSchedule{}
result := schedule.Next(time.Date(2023, 8, 31, 0, 0, 0, 0, time.UTC))
require.Equal(t, time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), result)
})

t.Run("WithMidnightInputReturnsMidnight24HoursLater", func(t *testing.T) {
t.Parallel()

schedule := &defaultReindexerSchedule{}
schedule := &DefaultReindexerSchedule{}
result := schedule.Next(time.Date(2023, 8, 31, 0, 0, 0, 0, time.UTC))
require.Equal(t, time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), result)
})

t.Run("With1NanosecondBeforeMidnightItReturnsUpcomingMidnight", func(t *testing.T) {
t.Parallel()

schedule := &defaultReindexerSchedule{}
schedule := &DefaultReindexerSchedule{}
result := schedule.Next(time.Date(2023, 8, 31, 23, 59, 59, 999999999, time.UTC))
require.Equal(t, time.Date(2023, 9, 1, 0, 0, 0, 0, time.UTC), result)
})
Expand Down
12 changes: 12 additions & 0 deletions periodic_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,18 @@ func NewPeriodicJob(scheduleFunc PeriodicSchedule, constructorFunc PeriodicJobCo
}
}

type neverSchedule struct{}

func (s *neverSchedule) Next(t time.Time) time.Time {
// Return the maximum future time so that the schedule never runs.
return time.Unix(1<<63-1, 0)
}

// NeverSchedule returns a PeriodicSchedule that never runs.
func NeverSchedule() PeriodicSchedule {
return &neverSchedule{}
}

type periodicIntervalSchedule struct {
interval time.Duration
}
Expand Down
11 changes: 11 additions & 0 deletions periodic_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import (
"github.com/riverqueue/river/rivershared/riversharedtest"
)

func TestNeverSchedule(t *testing.T) {
t.Parallel()

t.Run("NextReturnsMaximumTime", func(t *testing.T) {
t.Parallel()

schedule := NeverSchedule()
require.Equal(t, time.Unix(1<<63-1, 0), schedule.Next(time.Now()))
})
}

func TestPeriodicJobBundle(t *testing.T) {
t.Parallel()

Expand Down

0 comments on commit e296461

Please sign in to comment.