-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstep.go
160 lines (134 loc) · 6.47 KB
/
step.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
package choreograph
import (
"context"
"fmt"
"reflect"
"strings"
"github.com/pkg/errors"
)
var (
// ErrJobIsNotAFunction implies that a value which is not a function was passed as a job.
ErrJobIsNotAFunction = errors.New("job field is not a function")
// ErrJobContextAsFirstParameter implies that the first parameter of the job function is not of type context.Context.
ErrJobContextAsFirstParameter = errors.New("job must have first parameter of type context.Context")
// ErrJobErrorOnOutputRequired implies that the job function does not return anything, while an error is required.
ErrJobErrorOnOutputRequired = errors.New("job must have at least one return parameter of type error")
// ErrJobFuncIsRequired implies that the job is required and cannot be nil.
ErrJobFuncIsRequired = errors.New("job cannot be nil")
// ErrJobTooManyOutputParameters implies that the job function returns more than two values.
ErrJobTooManyOutputParameters = errors.New("job must have maximum two return parameter")
// ErrJobErrorAsLastParameterRequired implies that the last value returned by the job function is not an error.
ErrJobErrorAsLastParameterRequired = errors.New("job's last output parameter must be of type error")
// ErrJobTooManyInputParameters implies that the job function has more than two input parameters.
ErrJobTooManyInputParameters = errors.New("job has too many input parameters, two total parameters allowed")
// ErrJobContextParameterRequired implies that the job function takes no parameters at all,
// while a context.Context parameter is required.
ErrJobContextParameterRequired = errors.New("job must have at least one parameter of type context.Context")
// ErrPreCheckIsNotAFunction implies that a value which is not a function was passed as a pre-check.
ErrPreCheckIsNotAFunction = errors.New("preCheck field is not a function")
// ErrPreCheckContextAsFirstParameter implies that the first parameter of the pre-check function
// is not of type context.Context.
ErrPreCheckContextAsFirstParameter = errors.New("preCheck must have first parameter of type context.Context")
// ErrPreCheckErrorOnOutputRequired implies that the pre-check function does not return anything,
// while an error is required.
ErrPreCheckErrorOnOutputRequired = errors.New("preCheck must have at least one return parameter of type error")
// ErrPreCheckFuncIsRequired implies that the pre-check is required and cannot be nil.
ErrPreCheckFuncIsRequired = errors.New("preCheck cannot be nil")
// ErrPreCheckTooManyOutputParameters implies that the pre-check function returns more than two values.
ErrPreCheckTooManyOutputParameters = errors.New("preCheck must have maximum two return parameter")
// ErrPreCheckLastParamTypeErrorRequired implies that the last value returned by the pre-check function
// is not an error.
ErrPreCheckLastParamTypeErrorRequired = errors.New("preCheck's last output parameter must be of type error")
// ErrPreCheckTooManyInputParameters implies that the pre-check function has more than two input parameters.
ErrPreCheckTooManyInputParameters = errors.New("preCheck has too many input parameters, two total parameters allowed")
// ErrPreCheckContextParameterRequired implies that the pre-check function takes no parameters at all,
// while a context.Context parameter is required.
ErrPreCheckContextParameterRequired = errors.New("preCheck must have at least one parameter of type context.Context")
)
// Step represents a single chunk of a process which should be run.
//
// Both Job and PreCheck are required and are validated by checkStep.
type Step struct {
// Name is used to store returned data from all steps. Name is also used for better logging experience.
Name string
// Job is a function which will be run during the step.
//
// Its first parameter must be of type context.Context, which will be provided from a coordinator;
// one additional input parameter is allowed.
// NOTE(review): where the optional second argument comes from is not visible in this file —
// presumably data produced earlier in the process; confirm against the coordinator.
// The function must return one or two values, and the last one must be an error.
// If for some reason you want to stop execution of any further step, return ErrExecutionCanceled or wrap it.
// Examples of acceptable functions:
// - func(ctx context.Context) error
// - func(ctx context.Context) (int, error)
// - func(ctx context.Context) (string, error)
// - func(ctx context.Context, input int) (float64, error)
Job interface{}
// PreCheck is a function which will be run before Job, it should ensure that Job can be run.
//
// Its first parameter must be of type context.Context, which will be provided from a coordinator;
// one additional input parameter is allowed.
// The function must return one or two values, and the last one must be an error.
// If for some reason you want to stop execution of any further step, return ErrExecutionCanceled or wrap it.
// Examples of acceptable functions:
// - func(ctx context.Context) error
// - func(ctx context.Context) (int, error)
// - func(ctx context.Context) (string, error)
// - func(ctx context.Context, input int) (float64, error)
PreCheck interface{}
}
// stepCallbackErrors groups the sentinel errors reported for one kind of
// step callback (Job or PreCheck), so both can share a single validator.
type stepCallbackErrors struct {
	required        error // callback is missing (nil interface or nil function value)
	notAFunction    error // callback is not of kind reflect.Func
	contextRequired error // callback takes no parameters
	tooManyInputs   error // callback takes more parameters than allowed
	contextFirst    error // first parameter is not context.Context
	errorRequired   error // callback returns nothing
	tooManyOutputs  error // callback returns more values than allowed
	errorLast       error // last returned value does not implement error
}

// checkStep validates that the Job and PreCheck callbacks of step are present
// and have a supported signature.
//
// A supported callback is a non-nil function with one or two input parameters,
// the first being exactly context.Context, and one or two return values, the
// last of which implements error.
//
// It returns nil when both callbacks are valid, otherwise the sentinel error
// describing the first violation found. Presence of both callbacks is verified
// first, then the Job signature, then the PreCheck signature. A nil step is
// reported as ErrJobFuncIsRequired, because it carries no job at all.
func checkStep(step *Step) error {
	// Guard the dereference below: a nil entry must not crash validation.
	if step == nil {
		return ErrJobFuncIsRequired
	}

	// Both presence checks run before any signature check so that a missing
	// PreCheck is reported ahead of a malformed Job.
	if step.Job == nil {
		return ErrJobFuncIsRequired
	}

	if step.PreCheck == nil {
		return ErrPreCheckFuncIsRequired
	}

	jobErrs := stepCallbackErrors{
		required:        ErrJobFuncIsRequired,
		notAFunction:    ErrJobIsNotAFunction,
		contextRequired: ErrJobContextParameterRequired,
		tooManyInputs:   ErrJobTooManyInputParameters,
		contextFirst:    ErrJobContextAsFirstParameter,
		errorRequired:   ErrJobErrorOnOutputRequired,
		tooManyOutputs:  ErrJobTooManyOutputParameters,
		errorLast:       ErrJobErrorAsLastParameterRequired,
	}
	if err := checkStepCallback(step.Job, jobErrs); err != nil {
		return err
	}

	preCheckErrs := stepCallbackErrors{
		required:        ErrPreCheckFuncIsRequired,
		notAFunction:    ErrPreCheckIsNotAFunction,
		contextRequired: ErrPreCheckContextParameterRequired,
		tooManyInputs:   ErrPreCheckTooManyInputParameters,
		contextFirst:    ErrPreCheckContextAsFirstParameter,
		errorRequired:   ErrPreCheckErrorOnOutputRequired,
		tooManyOutputs:  ErrPreCheckTooManyOutputParameters,
		errorLast:       ErrPreCheckLastParamTypeErrorRequired,
	}

	return checkStepCallback(step.PreCheck, preCheckErrs)
}

// checkStepCallback verifies a single callback against the signature rules
// shared by Job and PreCheck, returning the matching entry of errs for the
// first rule that is violated, or nil when the callback is acceptable.
func checkStepCallback(callback interface{}, errs stepCallbackErrors) error {
	const (
		maxInputParametersCount = 2
		maxReturnsCount         = 2
	)

	// reflect.TypeOf(nil) yields a nil Type, so this must be ruled out first.
	if callback == nil {
		return errs.required
	}

	var (
		contextType = reflect.TypeOf((*context.Context)(nil)).Elem()
		errorType   = reflect.TypeOf((*error)(nil)).Elem()
	)

	callbackType := reflect.TypeOf(callback)
	if callbackType.Kind() != reflect.Func {
		return errs.notAFunction
	}

	// A typed nil function (e.g. an unassigned func variable) is a non-nil
	// interface value with a valid signature; calling it would panic, so it
	// is treated the same as a missing callback.
	if reflect.ValueOf(callback).IsNil() {
		return errs.required
	}

	switch numIn := callbackType.NumIn(); {
	case numIn == 0:
		return errs.contextRequired
	case numIn > maxInputParametersCount:
		return errs.tooManyInputs
	case callbackType.In(0) != contextType:
		return errs.contextFirst
	}

	switch numOut := callbackType.NumOut(); {
	case numOut == 0:
		return errs.errorRequired
	case numOut > maxReturnsCount:
		return errs.tooManyOutputs
	case !callbackType.Out(numOut - 1).Implements(errorType):
		return errs.errorLast
	}

	return nil
}
// Steps is a slice of Step.
type Steps []*Step

// StepName returns a label for the step at index i.
//
// The step's Name is returned unchanged when it contains at least one
// non-whitespace character. Otherwise, and whenever i is out of range or the
// entry at i is nil, the positional placeholder "#<i>" is returned, so the
// method never panics.
func (s Steps) StepName(i int) string {
	// Negative indexes and nil entries would panic on s[i] / s[i].Name.
	if i < 0 || i >= len(s) || s[i] == nil {
		return fmt.Sprintf("#%d", i)
	}

	if name := s[i].Name; strings.TrimSpace(name) != "" {
		return name
	}

	return fmt.Sprintf("#%d", i)
}