-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsubtask.go
174 lines (153 loc) · 5.33 KB
/
subtask.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package pluginsubtask
import (
"context"
"fmt"
"strings"
"github.com/juju/errors"
"github.com/loopfz/gadgeto/zesty"
"github.com/ovh/utask"
"github.com/ovh/utask/models/task"
"github.com/ovh/utask/models/tasktemplate"
"github.com/ovh/utask/pkg/auth"
"github.com/ovh/utask/pkg/constants"
"github.com/ovh/utask/pkg/plugins/taskplugin"
"github.com/ovh/utask/pkg/taskutils"
"github.com/ovh/utask/pkg/templateimport"
"github.com/ovh/utask/pkg/utils"
)
// the subtask plugin spawns a new µTask task, given a template and inputs
// an extra parameter is accepted, not available on API
// resolver usernames can be dynamically set for the task
var (
Plugin = taskplugin.New("subtask", "0.1", exec,
taskplugin.WithConfig(validConfig, SubtaskConfig{}),
taskplugin.WithContextFunc(ctx),
)
)
// SubtaskConfig is the necessary configuration to spawn a new task
type SubtaskConfig struct {
Template string `json:"template"`
Input map[string]interface{} `json:"input"`
ResolverUsernames string `json:"resolver_usernames"`
WatcherUsernames string `json:"watcher_usernames"`
Delay *string `json:"delay"`
Tags map[string]string `json:"tags"`
}
// SubtaskContext is the metadata inherited from the "parent" task"
type SubtaskContext struct {
ParentTaskID string `json:"parent_task_id"`
TaskID string `json:"task_id"`
RequesterUsername string `json:"requester_username"`
}
func ctx(stepName string) interface{} {
return &SubtaskContext{
ParentTaskID: "{{ .task.task_id }}",
TaskID: fmt.Sprintf("{{ if (index .step `%s` ) }}{{ if (index .step `%s` `output`) }}{{ index .step `%s` `output` `id` }}{{ end }}{{ end }}", stepName, stepName, stepName),
RequesterUsername: "{{.task.requester_username}}",
}
}
func validConfig(config interface{}) error {
cfg := config.(*SubtaskConfig)
if err := utils.ValidateTags(cfg.Tags); err != nil {
return err
}
dbp, err := zesty.NewDBProvider(utask.DBName)
if err != nil {
return fmt.Errorf("can't retrieve connexion to DB: %s", err)
}
_, err = tasktemplate.LoadFromName(dbp, cfg.Template)
if err == nil {
return nil
}
if !errors.IsNotFound(err) {
return fmt.Errorf("can't load template from name: %s", err)
}
// searching into currently imported templates
templates := templateimport.GetTemplates()
for _, template := range templates {
if template == cfg.Template {
return nil
}
}
return errors.NotFoundf("sub-task template %q", cfg.Template)
}
func exec(stepName string, config interface{}, ctx interface{}) (interface{}, interface{}, error) {
dbp, err := zesty.NewDBProvider(utask.DBName)
if err != nil {
return nil, nil, err
}
cfg := config.(*SubtaskConfig)
stepContext := ctx.(*SubtaskContext)
var t *task.Task
if stepContext.TaskID != "" {
// subtask was already launched, retrieve its current state and exit
t, err = task.LoadFromPublicID(dbp, stepContext.TaskID)
if err != nil {
return nil, nil, err
}
} else {
// spawn new subtask
tt, err := tasktemplate.LoadFromName(dbp, cfg.Template)
if err != nil {
return nil, nil, err
}
if err := dbp.Tx(); err != nil {
return nil, nil, err
}
var resolverUsernames, watcherUsernames []string
if cfg.ResolverUsernames != "" {
resolverUsernames, err = utils.ConvertJSONRowToSlice(cfg.ResolverUsernames)
if err != nil {
return nil, nil, fmt.Errorf("can't convert JSON to row slice: %s", err)
}
}
if cfg.WatcherUsernames != "" {
watcherUsernames, err = utils.ConvertJSONRowToSlice(cfg.WatcherUsernames)
if err != nil {
return nil, nil, fmt.Errorf("can't convert JSON to row slice: %s", err)
}
}
// TODO inherit watchers from parent task
ctx := auth.WithIdentity(context.Background(), stepContext.RequesterUsername)
if cfg.Tags == nil {
cfg.Tags = map[string]string{}
}
cfg.Tags[constants.SubtaskTagParentTaskID] = stepContext.ParentTaskID
t, err = taskutils.CreateTask(ctx, dbp, tt, watcherUsernames, resolverUsernames, cfg.Input, nil, "Auto created subtask, parent task "+stepContext.ParentTaskID, cfg.Delay, cfg.Tags)
if err != nil {
dbp.Rollback()
return nil, nil, err
}
if err := dbp.Commit(); err != nil {
dbp.Rollback()
return nil, nil, err
}
}
var stepError error
switch t.State {
case task.StateDone:
stepError = nil
case task.StateCancelled, task.StateWontfix, task.StateBlocked:
// Stop retrying the subtask.
stepError = errors.BadRequestf("Task '%s' changed state: %s", t.PublicID, strings.ToLower(t.State))
case task.StateTODO:
if t.Resolution == nil {
stepError = errors.NewNotAssigned(fmt.Errorf("Task %q is waiting for human validation", t.PublicID), "")
} else {
stepError = errors.NewNotAssigned(fmt.Errorf("Task %q will start shortly", t.PublicID), "")
}
case task.StateRunning:
stepError = errors.NewNotProvisioned(fmt.Errorf("Task %q is currently RUNNING", t.PublicID), "")
default:
// keep step running while subtask is not done
// FIXME, use proper error type
stepError = fmt.Errorf("Task %q not done yet (current state is %s)", t.PublicID, t.State)
}
return map[string]interface{}{
"id": t.PublicID,
"state": t.State,
"result": t.Result,
"resolver_username": t.ResolverUsername,
"requester_username": t.RequesterUsername,
}, nil, stepError
}