-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbatch.go
71 lines (61 loc) · 1.92 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package piper
import (
"context"
)
// BatchExecFn is a method signature which defines the expectations of a BatchExecutable Execute function
type BatchExecFn func(context.Context, []DataIF) (map[string]error, error)
// BatchExecutable is an interface which exposes the Execute method, which is the user-defined batch execution call
type BatchExecutable interface {
Execute(context.Context, []DataIF) (map[string]error, error)
}
// batch is a struct which wraps all the data for a batch job along with the batch job meta information
type batch struct {
datum []DataIF // slice of data
jobsMap map[string]*job // map of Job IDs to Job
successMap map[string]*bool // map of Job IDs to boolean indicating success/failure of Job
}
// newBatch creates a pointer to a batch
func newBatch(size int) *batch {
return &batch{
datum: make([]DataIF, 0),
jobsMap: make(map[string]*job, size),
successMap: make(map[string]*bool, size),
}
}
// size returns the number of jobs in the batch
func (b *batch) size() int {
return len(b.jobsMap)
}
// add appends jobs to the batch
func (b *batch) add(jobs ...*job) {
for _, j := range jobs {
func(jobPtr *job) {
b.jobsMap[jobPtr.id] = jobPtr
b.datum = append(b.datum, jobPtr.data)
}(j)
}
}
// execute invokes the user-defined batch executable callback function and updates metadata about the batch job
func (b *batch) execute(ctx context.Context, fn BatchExecFn) error {
size := b.size()
if size > 0 {
// Execute the batch function call and update batch metadata
errorMap, err := fn(ctx, b.datum)
if err != nil {
return err
}
for k, v := range errorMap {
if v == nil {
b.updateSuccess(k, true)
} else {
b.updateSuccess(k, false)
b.jobsMap[k].addError(v)
}
}
}
return nil
}
// updateSuccess updates the status of the successMap for a given job ID
func (b *batch) updateSuccess(id string, success bool) {
b.successMap[id] = &success
}