Skip to content

Commit

Permalink
fix: Add cron trigger (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
codelite7 authored Oct 21, 2022
1 parent cff12cd commit f8ff530
Show file tree
Hide file tree
Showing 12 changed files with 255 additions and 103 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ Planned Backends:

Implemented Triggers:
* ExecuteOnce - Executes once at the specified time, respects retries and expiration
* Cron - Executes on a cron schedule, respects retries and expiration

Planned Triggers:
* CRON - Execute on a cron schedule
* Natural language - Execute every other tuesday at 9:34am

FAQ:
Expand Down Expand Up @@ -165,7 +165,6 @@ There are no prerequisites, it's an importable go library. If you're using a bac
<!-- ROADMAP -->
## Roadmap

- [ ] Add CRON trigger
- [ ] Add Cockroachdb backend support
- [ ] Add Natural language trigger

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ module github.com/catalystsquad/go-scheduler
go 1.18

require (
github.com/allegro/bigcache/v3 v3.0.2
github.com/brianvoe/gofakeit/v6 v6.19.0
github.com/catalystsquad/app-utils-go v1.0.5
github.com/emirpasic/gods v1.18.1
github.com/google/uuid v1.3.0
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
github.com/joomcode/errorx v1.1.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
github.com/allegro/bigcache/v3 v3.0.2 h1:AKZCw+5eAaVyNTBmI2fgyPVJhHkdWder3O9IrprcQfI=
github.com/allegro/bigcache/v3 v3.0.2/go.mod h1:aPyh7jEvrog9zAwx5N7+JUQX5dZTSGpxF1LAR4dr35I=
github.com/brianvoe/gofakeit/v6 v6.19.0 h1:g+yJ+meWVEsAmR+bV4mNM/eXI0N+0pZ3D+Mi+G5+YQo=
github.com/brianvoe/gofakeit/v6 v6.19.0/go.mod h1:Ow6qC71xtwm79anlwKRlWZW6zVq9D2XHE4QSSMP/rU8=
github.com/catalystsquad/app-utils-go v1.0.5 h1:4zlrrCzDwZ144+whL8PFd/UHK/Oo1/hriYzqDsob5dU=
Expand All @@ -13,6 +11,8 @@ github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FM
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75 h1:f0n1xnMSmBLzVfsMMvriDyA75NB/oBgILX2GcHXIQzY=
github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75/go.mod h1:g2644b03hfBX9Ov0ZBDgXXens4rxSxmqFBbhvKv2yVA=
github.com/joomcode/errorx v1.1.0 h1:dizuSG6yHzlvXOOGHW00gwsmM4Sb9x/yWEfdtPztqcs=
github.com/joomcode/errorx v1.1.0/go.mod h1:eQzdtdlNyN7etw6YCS4W4+lu442waxZYw5yvz0ULrRo=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down
34 changes: 34 additions & 0 deletions pkg/cron_trigger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package pkg

import (
"github.com/gorhill/cronexpr"
"time"
)

type CronTrigger struct {
expression *cronexpr.Expression
}

func (t CronTrigger) GetNextFireTime(task Task) *time.Time {
var nextFireTime time.Time
nextFireTimes := t.expression.NextN(time.Now(), 2)
if task.LastFireTime != nil && task.LastFireTime.Equal(nextFireTimes[0]) {
// this can happen if the task executes faster than the tick rate, in which case, schedule the task for the next scheduled time
nextFireTime = nextFireTimes[1]
} else {
nextFireTime = nextFireTimes[0]
}
return &nextFireTime
}

func (t CronTrigger) IsRecurring() bool {
return true
}

func NewCronTrigger(cronExpression string) (*CronTrigger, error) {
expression, err := cronexpr.Parse(cronExpression)
if err != nil {
return nil, err
}
return &CronTrigger{expression: expression}, nil
}
15 changes: 9 additions & 6 deletions pkg/trigger.go → pkg/execute_once_trigger.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,21 @@
package pkg

import "time"
import (
"time"
)

type TriggerInterface interface {
GetNextFireTime() *time.Time
}
type ExecuteOnceTrigger struct {
FireAt time.Time
FireAt time.Time `json:"fire_at"`
}

func (t ExecuteOnceTrigger) GetNextFireTime() *time.Time {
func (t ExecuteOnceTrigger) GetNextFireTime(task Task) *time.Time {
return &t.FireAt
}

func (t ExecuteOnceTrigger) IsRecurring() bool {
return false
}

func NewExecuteOnceTrigger(fireAt time.Time) *ExecuteOnceTrigger {
return &ExecuteOnceTrigger{FireAt: fireAt}
}
25 changes: 21 additions & 4 deletions pkg/memory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ func (m *MemoryStore) Initialize() error {
func (m *MemoryStore) ScheduleTask(task Task) error {
m.lock.Lock()
defer m.lock.Unlock()
m.ScheduleTaskInScheduleTree(task)
m.taskTree.Put(task.IdString(), task)
return nil
}

func (m *MemoryStore) ScheduleTaskInScheduleTree(task Task) {
scheduleKey := GetScheduleTreeKey(task)
taskId := task.IdString()
m.scheduleTree.Put(scheduleKey, taskId) // only store the ID for less memory usage
m.taskTree.Put(taskId, task)
return nil
}

func (m *MemoryStore) UpdateTask(task Task) error {
Expand Down Expand Up @@ -60,16 +64,29 @@ func (m *MemoryStore) GetUpcomingTasks(limit time.Time) ([]Task, error) {
}
id := value.(string)
taskValue, found := m.taskTree.Get(id)
if found {
if shouldReturnTask(found, timestamp, taskValue) {
tasks = append(tasks, taskValue.(Task))
} else {
// task doesn't exist, remove it from the schedule tree
// task doesn't exist or the next fire time has been updated by the scheduler, remove it from the schedule tree
m.scheduleTree.Remove(key)
if found {
// task still exists but has a different fire time, add the task to the schedule tree with the new fire time
m.ScheduleTaskInScheduleTree(taskValue.(Task))
}
}
}
return tasks, nil
}

func shouldReturnTask(found bool, keyTimestamp time.Time, taskValue interface{}) bool {
if !found {
return false
}
task := taskValue.(Task)
taskNextFireTime := task.NextFireTime
return keyTimestamp.Format(time.RFC3339Nano) == taskNextFireTime.Format(time.RFC3339Nano)
}

func NewMemoryStore() StoreInterface {
store := &MemoryStore{}
return store
Expand Down
64 changes: 36 additions & 28 deletions pkg/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func (s *Scheduler) ScheduleTask(task Task) error {
if task.ExpireAfter == nil {
task.ExpireAfter = s.Window
}
task.NextFireTime = task.GetTrigger().GetNextFireTime(task)
return s.store.ScheduleTask(task)
}

Expand Down Expand Up @@ -93,45 +94,53 @@ func (s *Scheduler) scheduleJobs(firedAt time.Time) {
}
}

func (s *Scheduler) updateTaskInProgressTimestamp(task Task) error {
inProgressAt := time.Now().UTC()
task.InProgressAt = &inProgressAt
return s.store.UpdateTask(task)
}

func (s *Scheduler) shouldHandleTask(task Task) bool {
if task.InProgressAt == nil {
if !task.InProgress {
// not in progress, so we should handle it
return true
}
// check the expiration time against, if it's expired then that means it's run before but wasn't completed for some reason
// check the expiration time, if it's expired then that means it's run before but wasn't completed for some reason
// so we should run it again
expirationTime := task.InProgressAt.Add(*task.ExpireAfter)
expirationTime := task.LastFireTime.Add(*task.ExpireAfter)
return time.Now().After(expirationTime)
}

func (s *Scheduler) handleTask(task Task) {
// execute the handler
err := s.Handler(task)
if err == nil {
// no error, update the task's next fire time if it's a cron, or delete it if it's a single task
if err == nil || !task.RetryOnError {
// either no error, or there is an error but task shouldn't be retried, so update the next fire time so it doesn't get
// picked up again in the next window
s.updateNextFireTime(task)
} else {
// log the error
// just log the error, task will get picked up again in the next execution window since the fire time wasn't updated
logging.Log.WithError(err).WithFields(logrus.Fields{"id": task.Id}).Error("error handling task")
if !task.RetryOnError {
// task shouldn't be retried, so update the next fire time. Only do this for tasks that shouldn't be retried because
// if the next fire time isn't updated, then it will get picked up in the next window
s.updateNextFireTime(task)
}
}
}

func (s *Scheduler) markTaskInProgress(task *Task) error {
task.InProgress = true
task.LastFireTime = task.NextFireTime
logging.Log.Debug("setting task in progress")
return s.store.UpdateTask(*task)
}

func (s *Scheduler) updateNextFireTime(task Task) {
// only current trigger is execute once, so we just delete the task
err := s.store.DeleteTask(task.Id)
var err error
nextFireTime := time.Time{}
trigger := task.GetTrigger()
if trigger.IsRecurring() {
task.InProgress = false
nextFireTime = *task.GetTrigger().GetNextFireTime(task)
task.NextFireTime = &nextFireTime
logging.Log.WithFields(logrus.Fields{"current_time": time.Now().UTC().Format(time.RFC3339Nano), "next_fire_time": nextFireTime.Format(time.RFC3339Nano)}).Debug("scheduler setting next fire time for recurring trigger")
err = s.store.UpdateTask(task)
} else {
logging.Log.Debug("scheduler deleting task because trigger is non-recurring")
err = s.store.DeleteTask(task.Id)
}
if err != nil {
logging.Log.WithError(err).WithFields(logrus.Fields{"task_id": task.Id}).Error("error updating next fire time")
logging.Log.WithError(err).WithFields(logrus.Fields{"task_id": task.Id.String()}).Error("error updating or deleting task when updating next fire time")
}
}

Expand Down Expand Up @@ -167,15 +176,14 @@ func (s *Scheduler) consumeTasks() {
run = false // no more tasks left to handle
} else {
if s.shouldHandleTask(*task) {
inProgressAt := time.Now().UTC()
task.InProgressAt = &inProgressAt
err := s.store.UpdateTask(*task)
// mark task in progress
err := s.markTaskInProgress(task)
if err == nil {
// task set as in progress handle the task
go s.handleTask(*task)
} else {
if err != nil {
logging.Log.WithError(err).WithFields(logrus.Fields{"task_id": task.Id}).Error("error marking task in progress")
}
// failed to mark the task in progress, don't call the handler, will try again in the next window execution
logging.Log.WithError(err).WithFields(logrus.Fields{"task_id": task.Id}).Error("error marking task in progress")
}
}
}
Expand All @@ -202,10 +210,10 @@ func ScheduleTreeComparator(a, b interface{}) int {
}

func GetScheduleTreeKey(task Task) string {
return fmt.Sprintf("%s_%s", task.GetTrigger().GetNextFireTime().Format(time.RFC3339), task.Id.String())
return fmt.Sprintf("%s_%s", task.GetTrigger().GetNextFireTime(task).Format(time.RFC3339Nano), task.Id.String())
}

func getTimestampFromScheduleKey(key string) (time.Time, error) {
timestampString := strings.Split(key, "_")[0]
return time.Parse(time.RFC3339, timestampString)
return time.Parse(time.RFC3339Nano, timestampString)
}
20 changes: 14 additions & 6 deletions pkg/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ import (

type Task struct {
Id *uuid.UUID
Metadata interface{}
RetryOnError bool
ExpireAfter *time.Duration
InProgressAt *time.Time
ExecuteOnceTrigger *ExecuteOnceTrigger
Metadata interface{} `json:"metadata"`
RetryOnError bool `json:"retry_on_error"`
ExpireAfter *time.Duration `json:"expire_after"`
InProgress bool `json:"in_progress_at"`
LastFireTime *time.Time `json:"last_fire_time"`
NextFireTime *time.Time `json:"next_fire_time"`
ExecuteOnceTrigger *ExecuteOnceTrigger `json:"execute_once_trigger"`
CronTrigger *CronTrigger `json:"cron_trigger"`
}

func (t Task) GetIdBytes() []byte {
Expand All @@ -30,7 +33,12 @@ func TaskFromBytes(bytes []byte) (Task, error) {
}

func (t Task) GetTrigger() TriggerInterface {
return t.ExecuteOnceTrigger
if t.CronTrigger != nil {
return t.CronTrigger
} else if t.ExecuteOnceTrigger != nil {
return t.ExecuteOnceTrigger
}
return nil
}

func (t Task) IdString() string {
Expand Down
10 changes: 10 additions & 0 deletions pkg/trigger_interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package pkg

import (
"time"
)

type TriggerInterface interface {
GetNextFireTime(task Task) *time.Time
IsRecurring() bool
}
2 changes: 1 addition & 1 deletion pkg/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package pkg
import "time"

func TimeToString(theTime time.Time) string {
return theTime.Format(time.RFC3339)
return theTime.Format(time.RFC3339Nano)
}
Loading

0 comments on commit f8ff530

Please sign in to comment.