Skip to content

Commit

Permalink
fix: preserve time zone information in Task Scheduler (#25112) (#25113)
Browse files Browse the repository at this point in the history
Avoid converting times to int64 in the Task Scheduler
to preserve time zone information. This corrects a
failure after fall back time changes which halts
every-type tasks

closes #25110

(cherry picked from commit ebb597d)

closes #25111
  • Loading branch information
davidby-influx authored Jun 28, 2024
1 parent 26db97e commit 0b7cd24
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 18 deletions.
98 changes: 98 additions & 0 deletions task/backend/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,96 @@ func TestSchedule_Next(t *testing.T) {
})
}

func TestTreeScheduler_TimeChange(t *testing.T) {
loc := mustParseLocation("America/Los_Angeles")
tests := []struct {
name string // also used as the cron time string
start time.Time
timeElapsed time.Duration
}{
{
// Daylight Savings time boundary autumn (fall back)
start: time.Date(2023, 11, 05, 01, 58, 00, 0, loc),
name: "@every 1m",
timeElapsed: time.Minute,
},
{
// Daylight Savings time boundary autumn (fall back)
start: time.Date(2023, 11, 05, 00, 00, 00, 0, loc),
name: "@every 1h",
timeElapsed: time.Hour,
},
{
// Daylight Savings time boundary (spring forward)
start: time.Date(2023, 3, 12, 00, 00, 00, 0, loc),
name: "@every 1h",
timeElapsed: time.Hour,
},
{
// Daylight Savings time boundary (spring forward)
start: time.Date(2023, 3, 12, 01, 58, 00, 0, loc),
name: "@every 1m",
timeElapsed: time.Minute,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
}
}}
mockTime := clock.NewMock()
mockTime.Set(tt.start)
sch, _, err := NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
return nil
}},
WithTime(mockTime),
WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, ts, err := NewSchedule(tt.name, mockTime.Now().UTC())
if err != nil {
t.Fatal(err)
}

err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: ts})
if err != nil {
t.Fatal(err)
}
go func() {
sch.mu.Lock()
mockTime.Set(mockTime.Now().UTC().Add(17 * tt.timeElapsed))
sch.mu.Unlock()
}()

after := time.After(6 * time.Second)
oldCheckC := ts
for i := 0; i < 16; i++ {
select {
case checkC := <-c:
if checkC.Sub(oldCheckC) != tt.timeElapsed {
t.Fatalf("task didn't fire on correct interval fired on %s interval", checkC.Sub(oldCheckC))
}
if !checkC.Truncate(tt.timeElapsed).Equal(checkC) {
t.Fatalf("task didn't fire at the correct time boundary")
}
oldCheckC = checkC
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}
}
})
}
}

func TestTreeScheduler_Stop(t *testing.T) {
now := time.Now().Add(-20 * time.Second)
mockTime := clock.NewMock()
Expand Down Expand Up @@ -641,3 +731,11 @@ func TestNewSchedule(t *testing.T) {
})
}
}

func mustParseLocation(tzname string) *time.Location {
loc, err := time.LoadLocation(tzname)
if err != nil {
panic(err)
}
return loc
}
36 changes: 18 additions & 18 deletions task/backend/scheduler/treescheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ const (
type TreeScheduler struct {
mu sync.RWMutex
priorityQueue *btree.BTree
nextTime map[ID]int64 // we need this index so we can delete items from the scheduled
nextTime map[ID]time.Time // we need this index so we can delete items from the scheduled
when time.Time
executor Executor
onErr ErrorFunc
Expand Down Expand Up @@ -114,7 +114,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
s := &TreeScheduler{
executor: executor,
priorityQueue: btree.New(degreeBtreeScheduled),
nextTime: map[ID]int64{},
nextTime: map[ID]time.Time{},
onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {},
time: clock.New(),
done: make(chan struct{}, 1),
Expand Down Expand Up @@ -173,7 +173,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
continue schedulerLoop
}
it := min.(Item)
if ts := s.time.Now().UTC(); it.When().After(ts) {
if ts := s.time.Now(); it.When().After(ts) {
s.timer.Reset(ts.Sub(it.When()))
s.mu.Unlock()
continue schedulerLoop
Expand Down Expand Up @@ -245,7 +245,7 @@ func (s *TreeScheduler) iterator(ts time.Time) btree.ItemIterator {
return false
}
it := i.(Item) // we want it to panic if things other than Items are populating the scheduler, as it is something we can't recover from.
if time.Unix(it.next+it.Offset, 0).After(ts) {
if it.next.Add(it.Offset).After(ts) {
return false
}
// distribute to the right worker.
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) {
s.wg.Done()
}()
for it = range ch {
t := time.Unix(it.next, 0)
t := it.next
err := func() (err error) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -342,7 +342,7 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
it := Item{
cron: sch.Schedule(),
id: sch.ID(),
Offset: int64(sch.Offset().Seconds()),
Offset: sch.Offset(),
//last: sch.LastScheduled().Unix(),
}
nt, err := it.cron.Next(sch.LastScheduled())
Expand All @@ -351,8 +351,8 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
s.onErr(context.Background(), it.id, time.Time{}, err)
return err
}
it.next = nt.UTC().Unix()
it.when = it.next + it.Offset
it.next = nt
it.when = it.next.Add(it.Offset)

s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -377,7 +377,7 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
id: it.id,
})
}
s.nextTime[it.id] = it.next + it.Offset
s.nextTime[it.id] = it.next.Add(it.Offset)

// insert the new task run time
s.priorityQueue.ReplaceOrInsert(it)
Expand All @@ -386,33 +386,33 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {

// Item is a task in the scheduler.
type Item struct {
when int64
when time.Time
id ID
cron Schedule
next int64
Offset int64
next time.Time
Offset time.Duration
}

func (it Item) Next() time.Time {
return time.Unix(it.next, 0)
return it.next
}

func (it Item) When() time.Time {
return time.Unix(it.when, 0)
return it.when
}

// Less tells us if one Item is less than another
func (it Item) Less(bItem btree.Item) bool {
it2 := bItem.(Item)
return it.when < it2.when || ((it.when == it2.when) && it.id < it2.id)
return it2.when.After(it.when) || (it.when.Equal(it2.when) && it.id < it2.id)
}

func (it *Item) updateNext() error {
newNext, err := it.cron.Next(time.Unix(it.next, 0))
newNext, err := it.cron.Next(it.next)
if err != nil {
return err
}
it.next = newNext.UTC().Unix()
it.when = it.next + it.Offset
it.next = newNext
it.when = it.next.Add(it.Offset)
return nil
}

0 comments on commit 0b7cd24

Please sign in to comment.