workerpool.go (forked from gammazero/workerpool)

package workerpool

import (
	"github.com/gammazero/deque"
	"sync"
	"sync/atomic"
	"time"
)

const (
	// readyQueueSize is the size of the channel on which workers register
	// their availability with the dispatcher. There may be hundreds of
	// workers, but only a small channel is needed to register some of them.
	readyQueueSize = 16

	// If the worker pool receives no new work for this period of time, then
	// stop a worker goroutine.
	idleTimeoutSec = 5
)

// New creates and starts a pool of worker goroutines.
//
// The maxWorkers parameter specifies the maximum number of workers that will
// execute tasks concurrently. After each timeout period during which no new
// work arrives, an idle worker goroutine is stopped, until there are no
// remaining workers.
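//
// A minimal usage sketch (not part of the original source; the pool size and
// task body are illustrative only):
//
//	wp := New(2)
//	wp.Submit(func() { fmt.Println("task ran") })
//	wp.StopWait()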
func New(maxWorkers int) *WorkerPool {
	// There must be at least one worker.
	if maxWorkers < 1 {
		maxWorkers = 1
	}

	pool := &WorkerPool{
		taskQueue:    make(chan func(), 1),
		maxWorkers:   maxWorkers,
		readyWorkers: make(chan chan func(), readyQueueSize),
		timeout:      time.Second * idleTimeoutSec,
		stoppedChan:  make(chan struct{}),
	}

	// Start the task dispatcher.
	go pool.dispatch()

	return pool
}

// WorkerPool is a collection of goroutines, where the number of concurrent
// goroutines processing requests does not exceed the specified maximum.
type WorkerPool struct {
	maxWorkers   int
	timeout      time.Duration
	taskQueue    chan func()
	readyWorkers chan chan func()
	stoppedChan  chan struct{}
	waitingQueue deque.Deque
	stopOnce     sync.Once
	stopped      int32
	waiting      int32
}

// Stop stops the worker pool and waits for only currently running tasks to
// complete. Pending tasks that are not currently running are abandoned.
// Tasks must not be submitted to the worker pool after calling Stop.
//
// Since creating the worker pool starts at least one goroutine, for the
// dispatcher, Stop() or StopWait() should be called when the worker pool is
// no longer needed.
func (p *WorkerPool) Stop() {
	p.stop(false)
}

// StopWait stops the worker pool and waits for all queued tasks to complete.
// No additional tasks may be submitted, but all pending tasks are executed by
// workers before this function returns.
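//
// A usage sketch (not part of the original source; the tasks slice is
// hypothetical):
//
//	for _, task := range tasks {
//		wp.Submit(task)
//	}
//	wp.StopWait() // returns only after every queued task has run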
func (p *WorkerPool) StopWait() {
	p.stop(true)
}

// Stopped returns true if this worker pool has been stopped.
func (p *WorkerPool) Stopped() bool {
	return atomic.LoadInt32(&p.stopped) != 0
}

// Submit enqueues a function for a worker to execute.
//
// Any external values needed by the task function must be captured in a
// closure. Any return values should be returned over a channel that is
// captured in the task function closure.
//
// Submit will not block regardless of the number of tasks submitted. Each
// task is immediately given to an available worker or passed to a goroutine
// to be given to the next available worker. If there are no available
// workers, the dispatcher adds a worker, until the maximum number of workers
// is running.
//
// After the maximum number of workers are running, and no workers are
// available, incoming tasks are put onto a queue and will be executed as
// workers become available.
//
// When no new tasks have been submitted for a time period and a worker is
// available, the worker is shut down. As long as no new tasks arrive, one
// available worker is shut down each time period until there are no more
// idle workers. Since the time to start new goroutines is not significant,
// there is no need to retain idle workers.
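//
// For example (a sketch, not part of the original source; compute is a
// hypothetical function), a result can be returned over a captured channel:
//
//	result := make(chan int, 1)
//	wp.Submit(func() { result <- compute() })
//	v := <-result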
func (p *WorkerPool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}

// SubmitWait enqueues the given function and waits for it to be executed.
func (p *WorkerPool) SubmitWait(task func()) {
	if task == nil {
		return
	}
	doneChan := make(chan struct{})
	p.taskQueue <- func() {
		task()
		close(doneChan)
	}
	<-doneChan
}

// WaitingQueueSize returns the number of tasks in the waiting queue.
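//
// For example (a sketch, not part of the original source; the threshold,
// sleep interval, and task are illustrative only), callers can use this to
// apply simple backpressure before submitting more work:
//
//	for wp.WaitingQueueSize() > 64 {
//		time.Sleep(10 * time.Millisecond)
//	}
//	wp.Submit(task)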
func (p *WorkerPool) WaitingQueueSize() int {
	return int(atomic.LoadInt32(&p.waiting))
}

// dispatch sends the next queued task to an available worker.
func (p *WorkerPool) dispatch() {
	defer close(p.stoppedChan)
	timeout := time.NewTimer(p.timeout)
	var (
		workerCount    int
		task           func()
		ok, wait       bool
		workerTaskChan chan func()
	)
	startReady := make(chan chan func())
Loop:
	for {
		// As long as tasks are in the waiting queue, remove and execute these
		// tasks as workers become available, and place new incoming tasks on
		// the queue. Once the queue is empty, then go back to submitting
		// incoming tasks directly to available workers.
		if p.waitingQueue.Len() != 0 {
			select {
			case task, ok = <-p.taskQueue:
				if !ok {
					break Loop
				}
				if task == nil {
					wait = true
					break Loop
				}
				p.waitingQueue.PushBack(task)
			case workerTaskChan = <-p.readyWorkers:
				// A worker is ready, so give task to worker.
				workerTaskChan <- p.waitingQueue.PopFront().(func())
			}
			atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
			continue
		}
		timeout.Reset(p.timeout)
		select {
		case task, ok = <-p.taskQueue:
			if !ok || task == nil {
				break Loop
			}
			// Got a task to do.
			select {
			case workerTaskChan = <-p.readyWorkers:
				// A worker is ready, so give task to worker.
				workerTaskChan <- task
			default:
				// No workers ready.
				// Create a new worker, if not at max.
				if workerCount < p.maxWorkers {
					workerCount++
					go func(t func()) {
						startWorker(startReady, p.readyWorkers)
						// Give the task to the new worker once it is ready.
						taskChan := <-startReady
						taskChan <- t
					}(task)
				} else {
					// Enqueue task to be executed by next available worker.
					p.waitingQueue.PushBack(task)
					atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
				}
			}
		case <-timeout.C:
			// Timed out waiting for work to arrive. Kill a ready worker.
			if workerCount > 0 {
				select {
				case workerTaskChan = <-p.readyWorkers:
					// A worker is ready, so kill it.
					close(workerTaskChan)
					workerCount--
				default:
					// No work, but no ready workers. All workers are busy.
				}
			}
		}
	}

	// If instructed to wait for all queued tasks, then remove tasks from the
	// waiting queue and give them to workers until the queue is empty.
	if wait {
		for p.waitingQueue.Len() != 0 {
			workerTaskChan = <-p.readyWorkers
			// A worker is ready, so give task to worker.
			workerTaskChan <- p.waitingQueue.PopFront().(func())
			atomic.StoreInt32(&p.waiting, int32(p.waitingQueue.Len()))
		}
	}

	// Stop all remaining workers as they become ready.
	for workerCount > 0 {
		workerTaskChan = <-p.readyWorkers
		close(workerTaskChan)
		workerCount--
	}
}

// startWorker starts a goroutine that executes tasks given by the dispatcher.
//
// When a new worker starts, it registers its availability on the startReady
// channel. This ensures that the goroutine associated with starting the
// worker gets to use the worker to execute its task. Otherwise, the main
// dispatcher loop could steal the new worker and not know to start up another
// worker for the waiting goroutine. The task would then have to wait for
// another existing worker to become available, even though capacity is
// available to start additional workers.
//
// A worker registers that it is available to do work by putting its task
// channel on the readyWorkers channel. The dispatcher reads a worker's task
// channel from the readyWorkers channel, and writes a task to the worker over
// the worker's task channel. To stop a worker, the dispatcher closes the
// worker's task channel, instead of writing a task to it.
func startWorker(startReady, readyWorkers chan chan func()) {
	go func() {
		taskChan := make(chan func())
		var task func()
		var ok bool
		// Register availability on the startReady channel.
		startReady <- taskChan
		for {
			// Read task from dispatcher.
			task, ok = <-taskChan
			if !ok {
				// Dispatcher has told worker to stop.
				break
			}
			// Execute the task.
			task()
			// Register availability on the readyWorkers channel.
			readyWorkers <- taskChan
		}
	}()
}

// stop tells the dispatcher to exit, and whether or not to complete queued
// tasks.
func (p *WorkerPool) stop(wait bool) {
	p.stopOnce.Do(func() {
		atomic.StoreInt32(&p.stopped, 1)
		if wait {
			p.taskQueue <- nil
		}
		// Close the task queue and wait for currently running tasks to
		// finish.
		close(p.taskQueue)
		<-p.stoppedChan
	})
}