-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathscheduler.go
175 lines (148 loc) · 5.19 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package queue
import (
"errors"
"log"
"time"
"github.com/hibiken/asynq"
"github.com/robfig/cron/v3"
)
var ErrInvalidCronSpec = errors.New("invalid cron spec")
// Scheduler manages job scheduling with Asynq.
type Scheduler struct {
taskManager *asynq.PeriodicTaskManager
configProvider ConfigProvider
options SchedulerOptions
}
// SchedulerOptions contains options for the Scheduler.
type SchedulerOptions struct {
SyncInterval time.Duration
Location *time.Location
ConfigProvider ConfigProvider
Logger Logger
PreEnqueueFunc func(job *Job) // Pre-enqueue hook
PostEnqueueFunc func(job *JobInfo, err error) // Post-enqueue hook
}
// SchedulerOption defines a function signature for configuring the Scheduler.
type SchedulerOption func(*SchedulerOptions)
// WithSyncInterval sets the sync interval for the Scheduler's task manager.
func WithSyncInterval(interval time.Duration) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.SyncInterval = interval
}
}
// WithSchedulerLocation sets the time location for the Scheduler.
func WithSchedulerLocation(loc *time.Location) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.Location = loc
}
}
// WithConfigProvider sets a custom config provider for the Scheduler.
func WithConfigProvider(provider ConfigProvider) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.ConfigProvider = provider
}
}
// WithSchedulerLogger sets a custom logger for the Scheduler.
func WithSchedulerLogger(logger Logger) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.Logger = logger
}
}
// WithPreEnqueueFunc sets a function to be called before enqueuing a job.
func WithPreEnqueueFunc(fn func(job *Job)) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.PreEnqueueFunc = fn
}
}
// WithPostEnqueueFunc sets a function to be called after enqueuing a job.
func WithPostEnqueueFunc(fn func(job *JobInfo, err error)) SchedulerOption {
return func(opts *SchedulerOptions) {
opts.PostEnqueueFunc = fn
}
}
// NewScheduler creates a new Scheduler instance with the provided Redis configuration and options.
func NewScheduler(redisConfig *RedisConfig, opts ...SchedulerOption) (*Scheduler, error) {
if redisConfig == nil {
return nil, ErrInvalidRedisConfig
}
asynqClientOpt := redisConfig.ToAsynqRedisOpt()
options := SchedulerOptions{
Location: time.UTC, // Default to UTC
SyncInterval: 60 * time.Second,
}
for _, opt := range opts {
opt(&options)
}
configProvider := options.ConfigProvider
if configProvider == nil {
configProvider = NewMemoryConfigProvider()
}
taskManager, err := asynq.NewPeriodicTaskManager(
asynq.PeriodicTaskManagerOpts{
RedisConnOpt: asynqClientOpt,
PeriodicTaskConfigProvider: configProvider,
SyncInterval: options.SyncInterval,
SchedulerOpts: &asynq.SchedulerOpts{
Location: options.Location,
Logger: options.Logger,
PreEnqueueFunc: func(task *asynq.Task, opts []asynq.Option) {
if options.PreEnqueueFunc != nil {
job, _ := NewJobFromAsynqTask(task)
options.PreEnqueueFunc(job)
}
},
PostEnqueueFunc: func(taskInfo *asynq.TaskInfo, err error) {
log.Printf("Enqueued task: %v, err: %v", taskInfo, err)
if options.PostEnqueueFunc != nil {
jobInfo := toJobInfo(taskInfo, nil)
options.PostEnqueueFunc(jobInfo, err)
}
},
},
})
if err != nil {
return nil, err
}
return &Scheduler{
taskManager: taskManager,
configProvider: configProvider,
options: options,
}, nil
}
// RegisterCron schedules a new cron job using the job type, payload, and options.
func (s *Scheduler) RegisterCron(spec string, jobType string, payload interface{}, opts ...JobOption) (string, error) {
job := NewJob(jobType, payload, opts...)
return s.RegisterCronJob(spec, job)
}
// RegisterCronJob schedules a new cron job using the job details.
func (s *Scheduler) RegisterCronJob(spec string, job *Job) (string, error) {
// Use cron/v3 to parse the spec and check if it's a valid cron expression.
_, err := cron.ParseStandard(spec)
if err != nil {
return "", ErrInvalidCronSpec
}
return s.configProvider.RegisterCronJob(spec, job)
}
// RegisterPeriodic schedules a new periodic job using the job type, payload, and options.
func (s *Scheduler) RegisterPeriodic(interval time.Duration, jobType string, payload interface{}, opts ...JobOption) (string, error) {
job := NewJob(jobType, payload, opts...)
return s.RegisterPeriodicJob(interval, job)
}
// RegisterPeriodicJob schedules a new periodic job using the job details and an interval.
func (s *Scheduler) RegisterPeriodicJob(interval time.Duration, job *Job) (string, error) {
spec := "@every " + interval.String()
return s.configProvider.RegisterCronJob(spec, job)
}
// UnregisterCronJob removes a scheduled cron job using its identifier.
func (s *Scheduler) UnregisterCronJob(identifier string) error {
return s.configProvider.UnregisterJob(identifier)
}
// Start begins the scheduler to enqueue tasks as per the schedule.
func (s *Scheduler) Start() error {
return s.taskManager.Run()
}
// Stop gracefully shuts down the scheduler.
func (s *Scheduler) Stop() error {
s.taskManager.Shutdown()
return nil
}