-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathwait.go
140 lines (119 loc) · 3.64 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/*
Waiting for Jobs to Complete
The amboy package proves a number of generic methods that, using the
Queue.Stats() method, block until all jobs are complete. They provide different
semantics, which may be useful in different circumstances. All of the Wait*
functions wait until the total number of jobs submitted to the queue is equal to
the number of completed jobs, and as a result these methods don't prevent other
threads from adding jobs to the queue after beginning to wait. As a special
case, retryable queues will also wait until there are no retrying jobs
remaining.
Additionally, there are a set of methods, WaitJob*, that allow callers to wait
for a specific job to complete.
*/
package amboy
import (
"context"
"time"
)
// Wait takes a queue and blocks until all job are completed or the context is
// canceled. This operation runs in a tight-loop, which means that the Wait will
// return *as soon* as all jobs are complete. Conversely, it's also possible
// that frequent repeated calls to Stats() may contend with resources needed for
// dispatching jobs or marking them complete. Retrying jobs are not considered
// complete.
func Wait(ctx context.Context, q Queue) bool {
for {
if ctx.Err() != nil {
return false
}
stat := q.Stats(ctx)
if stat.IsComplete() {
return true
}
}
}
// WaitInterval provides the Wait operation and accepts a context for
// cancellation while also waiting for an interval between stats calls. The
// return value reports if the operation was canceled or if all jobs are
// complete. Retrying jobs are not considered complete.
func WaitInterval(ctx context.Context, q Queue, interval time.Duration) bool {
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
if q.Stats(ctx).IsComplete() {
return true
}
timer.Reset(interval)
}
}
}
// WaitIntervalNum waits for a certain number of jobs to complete. Retrying jobs
// are not considered complete.
func WaitIntervalNum(ctx context.Context, q Queue, interval time.Duration, num int) bool {
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
if q.Stats(ctx).Completed-q.Stats(ctx).Retrying >= num {
return true
}
}
}
}
// WaitJob blocks until the job, based on its ID, is marked complete in the
// queue, or the context is canceled. The return value is false if the job does
// not exist (or is removed) and true when the job completes. A retrying job is
// not considered complete.
func WaitJob(ctx context.Context, j Job, q Queue) bool {
var ok bool
for {
if ctx.Err() != nil {
return false
}
j, ok = q.Get(ctx, j.ID())
if !ok {
return false
}
if ctx.Err() != nil {
return false
}
completed := j.Status().Completed && j.RetryInfo().ShouldRetry()
if completed {
return true
}
}
}
// WaitJobInterval takes a job and queue object and waits for the job to be
// marked complete. The interval parameter controls how long the operation waits
// between checks, and can be used to limit the impact of waiting on a busy
// queue. The operation returns false if the job is not registered in the queue,
// and true when the job completes. A retrying job is not considered complete.
func WaitJobInterval(ctx context.Context, j Job, q Queue, interval time.Duration) bool {
var ok bool
timer := time.NewTimer(0)
defer timer.Stop()
for {
select {
case <-ctx.Done():
return false
case <-timer.C:
j, ok = q.Get(ctx, j.ID())
if !ok {
return false
}
completed := j.Status().Completed && !j.RetryInfo().ShouldRetry()
if completed {
return true
}
timer.Reset(interval)
}
}
}