-
Notifications
You must be signed in to change notification settings - Fork 0
/
scheduler.go
151 lines (125 loc) · 3.66 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
//go:generate go run github.com/golang/mock/mockgen -destination=mock/$GOFILE -package=mock_$GOPACKAGE -source=$GOFILE
package scheduler
import (
"context"
"errors"
"fmt"
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
"github.com/googleapis/gax-go/v2"
taskspb "google.golang.org/genproto/googleapis/cloud/tasks/v2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Sentinel errors exported by this package; compare with errors.Is.
var (
// ErrTaskValidation signals that a task failed validation.
// NOTE(review): nothing in this file returns it directly; presumably it is
// produced by Task.Validate — confirm against that implementation.
ErrTaskValidation = errors.New("task validation error")
// ErrTaskAlreadyExists is returned by Scheduler.Create when the CreateTask
// call fails with gRPC status code AlreadyExists.
ErrTaskAlreadyExists = errors.New("task already exists")
)
// CloudTasksClient is the subset of the Cloud Tasks client that this package
// uses: listing, creating and deleting tasks. Declaring it as an interface
// lets a substitute implementation (for example a generated mock) be passed
// to New.
type CloudTasksClient interface {
// ListTasks returns an iterator over the tasks selected by req.
ListTasks(ctx context.Context, req *taskspb.ListTasksRequest, opts ...gax.CallOption) *cloudtasks.TaskIterator
// CreateTask creates the task described by req.
CreateTask(ctx context.Context, req *taskspb.CreateTaskRequest, opts ...gax.CallOption) (*taskspb.Task, error)
// DeleteTask deletes the task named in req.
DeleteTask(ctx context.Context, req *taskspb.DeleteTaskRequest, opts ...gax.CallOption) error
}
// Compile-time check that *cloudtasks.Client satisfies CloudTasksClient.
var _ CloudTasksClient = (*cloudtasks.Client)(nil)
// Scheduler lists, creates, deletes and synchronizes Cloud Tasks tasks
// through a CloudTasksClient. Construct it with New so that the iterator
// factory is populated.
type Scheduler struct {
// client is the Cloud Tasks API used by Create and Delete, and by the
// iterator factory that New installs.
client CloudTasksClient
// queuePath is the queue resource name built by QueuePath in New.
queuePath string
// prefix is forwarded by New to NewIterator.
// NOTE(review): presumably a task-name prefix used to filter listed tasks — confirm.
prefix string
// iterator returns an Iterator for the given call options; List delegates to it.
iterator func(...gax.CallOption) *Iterator
}
// QueuePath builds the Cloud Tasks queue resource name
// "projects/<projectID>/locations/<location>/queues/<queue>".
// The arguments are inserted verbatim; no validation or escaping is done.
func QueuePath(projectID, location, queue string) string {
	path := "projects/" + projectID
	path += "/locations/" + location
	path += "/queues/" + queue
	return path
}
// New returns a Scheduler that talks to Cloud Tasks through client.
//
// projectID, location and queue are combined by QueuePath into the queue
// resource name. That name, together with prefix, is handed to NewIterator
// every time the scheduler is asked to list tasks.
func New(client CloudTasksClient, projectID, location, queue, prefix string) *Scheduler {
	path := QueuePath(projectID, location, queue)

	// The factory is a closure so that each listing gets its own Iterator
	// bound to the same client, queue path and prefix.
	newIterator := func(opts ...gax.CallOption) *Iterator {
		return NewIterator(TaskListerFunc(client.ListTasks), path, prefix, opts...)
	}

	return &Scheduler{
		client:    client,
		queuePath: path,
		prefix:    prefix,
		iterator:  newIterator,
	}
}
// Sync reconciles the remote tasks returned by List with the desired tasks.
//
// Remote and desired tasks are matched by comparisonID:
//   - a remote task whose match reports Compare == true is left alone and the
//     desired task is not re-created;
//   - a remote task whose match differs is deleted, and the desired task's
//     Version is raised to at least remote.Version+1 before it is created;
//   - a remote task with no match is deleted.
//
// All deletions are issued first, then the remaining desired tasks are
// created in the order they appear in tasks. The first error from listing,
// deleting or creating aborts the sync and is returned; work already done is
// not rolled back. Note that Sync mutates the Version of the passed-in tasks.
//
// NOTE(review): if tasks contains several entries with the same comparisonID,
// each of them is passed to Create — confirm whether callers guarantee
// uniqueness.
func (s *Scheduler) Sync(ctx context.Context, tasks []*Task, opts ...gax.CallOption) error {
	// Desired tasks that still have to be created, keyed by comparison ID.
	pending := make(map[string]*Task, len(tasks))
	for _, task := range tasks {
		pending[task.comparisonID()] = task
	}

	var staleNames []string
	remotes := s.List(opts...)
	for {
		remote, err := remotes.Next(ctx)
		if err != nil {
			if !errors.Is(err, Done) {
				return fmt.Errorf("failed to iterate remoteTasks: %w", err)
			}
			break
		}

		local, matched := pending[remote.comparisonID()]
		if matched && local.Compare(remote) {
			// Already up to date: nothing to delete, nothing to create.
			delete(pending, local.comparisonID())
			continue
		}

		// Either unknown remotely-only task, or an outdated copy that must be
		// removed before its replacement is created.
		staleNames = append(staleNames, remote.TaskName())
		if matched && local.Version <= remote.Version {
			// The replacement needs a version newer than the one being deleted.
			local.Version = remote.Version + 1
		}
	}

	for _, taskName := range staleNames {
		if err := s.Delete(ctx, taskName, opts...); err != nil {
			return err
		}
	}

	for _, task := range tasks {
		if _, needed := pending[task.comparisonID()]; needed {
			if err := s.Create(ctx, task, opts...); err != nil {
				return err
			}
		}
	}
	return nil
}
// List returns an Iterator produced by the scheduler's iterator factory,
// forwarding opts to it. No request is made by List itself in this file;
// any fetching is up to the returned Iterator.
func (s *Scheduler) List(opts ...gax.CallOption) *Iterator {
	it := s.iterator(opts...)
	return it
}
// Create submits task to Cloud Tasks.
//
// A zero task.Version is set to 1 on the caller's task before validation.
// Errors from task.Validate and TaskToPbTask are returned unchanged. The
// request uses task.QueuePath as its parent and asks for the BASIC response
// view; the created task returned by the API is discarded.
//
// If CreateTask fails with gRPC status code AlreadyExists, Create returns
// ErrTaskAlreadyExists; any other failure is wrapped with
// "failed to create task".
func (s *Scheduler) Create(ctx context.Context, task *Task, opts ...gax.CallOption) error {
	if task.Version == 0 {
		task.Version = 1
	}

	if err := task.Validate(); err != nil {
		return err
	}

	pbTask, err := TaskToPbTask(task)
	if err != nil {
		return err
	}

	_, err = s.client.CreateTask(ctx, &taskspb.CreateTaskRequest{
		Parent:       task.QueuePath,
		Task:         pbTask,
		ResponseView: taskspb.Task_BASIC,
	}, opts...)
	if err == nil {
		return nil
	}

	// Surface the duplicate case as a sentinel so callers can test for it.
	if status.Code(err) == codes.AlreadyExists {
		return ErrTaskAlreadyExists
	}
	return fmt.Errorf("failed to create task: %w", err)
}
func (s *Scheduler) Delete(ctx context.Context, taskName string, opts ...gax.CallOption) error {
req := &taskspb.DeleteTaskRequest{
Name: taskName,
}
if err := s.client.DeleteTask(ctx, req, opts...); err != nil {
return fmt.Errorf("failed to delete task: %w", err)
}
return nil
}