在日常任务处理中,多任务并发执行的需求很常见。很多时候,我们简单可以封装一个并发处理的任务调度框架,以便在多个项目中复用。
这里我们的并发处理模型中有三个角色,分别是job,worker和queue。
-
job是我们要执行的任务,通常由一个函数及传入函数的参数组成,按照返回分为同步任务和异步任务,同步任务只需要在异步任务的回调后将结果写入channel,接收端等待channel的信息即可。 高效
-
worker是任务执行者,这里为了方便多worker的重复使用,我们构造一个worker池,并构造一些worker放入其中,每个worker执行完任务后,重新放入worker池。
-
queue是任务执行的缓冲区,维护了一个缓冲队列,每提交一个任务就插入到队列中,同时queue会按顺序抽取队列中的任务分发给worker池中的worker。
job对象需要实现一个Job方法,用来执行某个任务
type Jober interface {
Job()
}
具体实现如下,用一个参数和一个函数标识一个job
type job struct {
v interface{}
callback func(interface{})
}
// NewJob create an asynchronous task
func NewJob(v interface{}, fn func(interface{})) Jober {
return &job{
v: v,
callback: fn,
}
}
func (j *job) Job() {
j.callback(j.v)
}
同步job需要将执行结果和错误信息等通知调用者
type SyncJober interface {
Jober
Wait() <-chan interface{}
Error() error
}
具体实现如下:
type syncJob struct {
err error
result chan interface{}
v interface{}
callback func(interface{}) (interface{}, error)
}
// NewSyncJob create a synchronization task
func NewSyncJob(v interface{}, fn func(interface{}) (interface{}, error)) SyncJober {
return &syncJob{
result: make(chan interface{}, 1),
v: v,
callback: fn,
}
}
func (j *syncJob) Job() {
result, err := j.callback(j.v)
if err != nil {
j.err = err
close(j.result)
return
}
j.result <- result
close(j.result)
}
func (j *syncJob) Wait() <-chan interface{} {
return j.result
}
func (j *syncJob) Error() error {
return j.err
}
worker是任务的具体执行者,同时需要维护worker池,worker被实例化后需要放入worker池,执行完任务后需要放回worker池,以用来重复使用,同时需要实现退出方法,防止goroutine泄漏。
func newWorker(pool chan chan Jober, wg *sync.WaitGroup) *worker {
return &worker{
pool: pool,
wg: wg,
jobChan: make(chan Jober),
quit: make(chan struct{}),
}
}
// worker thread
type worker struct {
pool chan chan Jober
wg *sync.WaitGroup
jobChan chan Jober
quit chan struct{}
}
// start the worker
func (w *worker) Start() {
w.pool <- w.jobChan
go w.dispatcher()
}
func (w *worker) dispatcher() {
for {
select {
case j := <-w.jobChan:
j.Job()
w.pool <- w.jobChan
w.wg.Done()
case <-w.quit:
<-w.pool
close(w.jobChan)
return
}
}
}
// stop the worker
func (w *worker) Stop() {
close(w.quit)
}
queue需要实现三个基本功能,初始化worker并监听缓冲区,提交任务接口和关闭实例功能。
type Queuer interface {
Run()
Push(job Jober)
Terminate()
}
queue的内部结构可以是channel也可以是list,无论使用哪种方式实现,需要一些共同的功能:
-
任务缓冲区,用来获取任务列表,并从worker池中取出一个worker交由执行。
-
缓冲池大小,worker的最大数量
-
缓冲池,保存各个woker工作通信channel
-
workers,保存各个worker实例
unc NewQueue(maxCapacity, maxThread int) *Queue {
return &Queue{
jobQueue: make(chan Jober, maxCapacity),
maxWorkers: maxThread,
workerPool: make(chan chan Jober, maxThread),
workers: make([]*worker, maxThread),
wg: new(sync.WaitGroup),
}
}
// Queue a task queue for mitigating server pressure in high concurrency situations
// and improving task processing
type Queue struct {
maxWorkers int
jobQueue chan Jober
workerPool chan chan Jober
workers []*worker
running uint32
wg *sync.WaitGroup
}
// Run start running queues
func (q *Queue) Run() {
if atomic.LoadUint32(&q.running) == 1 {
return
}
atomic.StoreUint32(&q.running, 1)
for i := 0; i < q.maxWorkers; i++ {
q.workers[i] = newWorker(q.workerPool, q.wg)
q.workers[i].Start()
}
go q.dispatcher()
}
func (q *Queue) dispatcher() {
for job := range q.jobQueue {
worker := <-q.workerPool
worker <- job
}
}
// Terminate terminate the queue to receive the task and release the resource
func (q *Queue) Terminate() {
if atomic.LoadUint32(&q.running) != 1 {
return
}
atomic.StoreUint32(&q.running, 0)
q.wg.Wait()
close(q.jobQueue)
for i := 0; i < q.maxWorkers; i++ {
q.workers[i].Stop()
}
close(q.workerPool)
}
// Push put the executable task into the queue
func (q *Queue) Push(job Jober) {
if atomic.LoadUint32(&q.running) != 1 {
return
}
q.wg.Add(1)
q.jobQueue <- job
}
在多个线程中,只能有一个queue实例,这里通过running值的原子性来得以保证,同时为了保证优雅退出,这里使用sync.WaitGroup保证收到退出信号时将还在执行的任务执行完。
// NewListQueueWithMaxLen create a list queue that specifies the number of worker threads
// and the maximum number of elements
func NewListQueueWithMaxLen(maxThread, maxLen int) *ListQueue {
return &ListQueue{
maxLen: maxLen,
maxWorker: maxThread,
workers: make([]*worker, maxThread),
workerPool: make(chan chan Jober, maxThread),
list: list.New(),
lock: new(sync.RWMutex),
wg: new(sync.WaitGroup),
}
}
// ListQueue a list task queue for mitigating server pressure in high concurrency situations
// and improving task processing
type ListQueue struct {
maxLen int
maxWorker int
workers []*worker
workerPool chan chan Jober
list *list.List
lock *sync.RWMutex
wg *sync.WaitGroup
running uint32
}
// Run start running queues
func (q *ListQueue) Run() {
if atomic.LoadUint32(&q.running) == 1 {
return
}
atomic.StoreUint32(&q.running, 1)
for i := 0; i < q.maxWorker; i++ {
q.workers[i] = newWorker(q.workerPool, q.wg)
q.workers[i].Start()
}
go q.dispatcher()
}
func (q *ListQueue) dispatcher() {
for {
q.lock.RLock()
if atomic.LoadUint32(&q.running) != 1 && q.list.Len() == 0 {
q.lock.RUnlock()
break
}
ele := q.list.Front()
q.lock.RUnlock()
if ele == nil {
time.Sleep(time.Millisecond * 10)
continue
}
worker := <-q.workerPool
worker <- ele.Value.(Jober)
q.lock.Lock()
q.list.Remove(ele)
q.lock.Unlock()
}
}
// Push put the executable task into the queue
func (q *ListQueue) Push(job Jober) {
if atomic.LoadUint32(&q.running) != 1 {
return
}
if q.maxLen > 0 {
q.lock.RLock()
if q.list.Len() > q.maxLen {
q.lock.RUnlock()
return
}
q.lock.RUnlock()
}
q.wg.Add(1)
q.lock.Lock()
q.list.PushBack(job)
q.lock.Unlock()
}
// Terminate terminate the queue to receive the task and release the resource
func (q *ListQueue) Terminate() {
if atomic.LoadUint32(&q.running) != 1 {
return
}
atomic.StoreUint32(&q.running, 0)
q.wg.Wait()
for i := 0; i < q.maxWorker; i++ {
q.workers[i].Stop()
}
close(q.workerPool)
}
这里的实现基本上和channel一致,区别是,由于list是非线程安全的,所以这里多了一把读写锁,读取的时候加读锁,写入的时候加写锁,来保证list的线程安全。同时需要人为判断list的长度与缓冲的最大长度,以免溢出。
这里的一点重要区别是,使用channel的方式实现,如果缓冲区满了,新加入的job会阻塞等待,直到缓冲区有空位时再插入。而使用list的方式,如果缓冲区满了,直接返回,新的job将会直接丢弃。
一般在项目中都会内置一个默认对象,并提供一些方法,这些方法都是对默认的内置对象进行操作。这里我们也设置一个默认的内置对象,并提供一些默认操作内置对象的方法
// Run start running queues,
// specify the number of buffers, and the number of worker threads
func Run(maxCapacity, maxThread int) {
if internalQueue == nil {
internalQueue = NewQueue(maxCapacity, maxThread)
}
internalQueue.Run()
}
// RunListQueue start running list queues
// ,specify the number of worker threads
func RunListQueue(maxThread int) {
if internalQueue == nil {
internalQueue = NewListQueue(maxThread)
}
internalQueue.Run()
}
// Push put the executable task into the queue
func Push(job Jober) {
if internalQueue == nil {
return
}
internalQueue.Push(job)
}
// Terminate terminate the queue to receive the task and release the resource
func Terminate() {
if internalQueue == nil {
return
}
internalQueue.Terminate()
}
这里封装了多任务并发处理的模型,抽象出了job,worker和queue三个部件,在多任务并发执行的场景中,可以引入这个包,进行比较方便高效的任务处理。