Skip to content

Latest commit

 

History

History
390 lines (318 loc) · 9.08 KB

64fef4899b9ea0c00617ded60f7f459f.md

File metadata and controls

390 lines (318 loc) · 9.08 KB

实现一个Go并发处理队列

在日常任务处理中,多任务并发执行的需求很常见。很多时候,我们简单可以封装一个并发处理的任务调度框架,以便在多个项目中复用。

这里我们的并发处理模型中有三个角色,分别是job,worker和queue。

  • job是我们要执行的任务,通常由一个函数及传入函数的参数组成,按照返回分为同步任务和异步任务,同步任务只需要在异步任务的回调后将结果写入channel,接收端等待channel的信息即可。 高效

  • worker是任务执行者,这里为了方便多worker的重复使用,我们构造一个worker池,并构造一些worker放入其中,每个worker执行完任务后,重新放入worker池。

  • queue是任务执行的缓冲区,维护了一个缓冲队列,每提交一个任务就插入到队列中,同时queue会按顺序抽取队列中的任务分发给worker池中的worker。

job实现

job对象需要实现一个Job方法,用来执行某个任务

type Jober interface {
	Job()
}

具体实现如下,用一个参数和一个函数标识一个job

type job struct {
	v        interface{}
	callback func(interface{})
}

// NewJob create an asynchronous task
func NewJob(v interface{}, fn func(interface{})) Jober {
	return &job{
		v:        v,
		callback: fn,
	}
}

func (j *job) Job() {
	j.callback(j.v)
}

同步job需要将执行结果和错误信息等通知调用者

type SyncJober interface {
	Jober
	Wait() <-chan interface{}
	Error() error
}

具体实现如下:

type syncJob struct {
	err      error
	result   chan interface{}
	v        interface{}
	callback func(interface{}) (interface{}, error)
}

// NewSyncJob create a synchronization task
func NewSyncJob(v interface{}, fn func(interface{}) (interface{}, error)) SyncJober {
	return &syncJob{
		result:   make(chan interface{}, 1),
		v:        v,
		callback: fn,
	}
}

func (j *syncJob) Job() {
	result, err := j.callback(j.v)
	if err != nil {
		j.err = err
		close(j.result)
		return
	}

	j.result <- result

	close(j.result)
}

func (j *syncJob) Wait() <-chan interface{} {
	return j.result
}

func (j *syncJob) Error() error {
	return j.err
}

worker实现

worker是任务的具体执行者,同时需要维护worker池,worker被实例化后需要放入worker池,执行完任务后需要放回worker池,以用来重复使用,同时需要实现退出方法,防止goroutine泄漏。

func newWorker(pool chan chan Jober, wg *sync.WaitGroup) *worker {
	return &worker{
		pool:    pool,
		wg:      wg,
		jobChan: make(chan Jober),
		quit:    make(chan struct{}),
	}
}

// worker thread
type worker struct {
	pool    chan chan Jober
	wg      *sync.WaitGroup
	jobChan chan Jober
	quit    chan struct{}
}

// start the worker
func (w *worker) Start() {
	w.pool <- w.jobChan
	go w.dispatcher()
}

func (w *worker) dispatcher() {
	for {
		select {
		case j := <-w.jobChan:
			j.Job()
			w.pool <- w.jobChan
			w.wg.Done()
		case <-w.quit:
			<-w.pool
			close(w.jobChan)
			return
		}
	}
}

// stop the worker
func (w *worker) Stop() {
	close(w.quit)
}

queue实现

queue需要实现三个基本功能,初始化worker并监听缓冲区,提交任务接口和关闭实例功能。

type Queuer interface {
	Run()
	Push(job Jober)
	Terminate()
}

queue的内部结构可以是channel也可以是list,无论使用哪种方式实现,需要一些共同的功能:

  • 任务缓冲区,用来获取任务列表,并从worker池中取出一个worker交由执行。

  • 缓冲池大小,worker的最大数量

  • 缓冲池,保存各个woker工作通信channel

  • workers,保存各个worker实例

channel实现的queue

unc NewQueue(maxCapacity, maxThread int) *Queue {
	return &Queue{
		jobQueue:   make(chan Jober, maxCapacity),
		maxWorkers: maxThread,
		workerPool: make(chan chan Jober, maxThread),
		workers:    make([]*worker, maxThread),
		wg:         new(sync.WaitGroup),
	}
}

// Queue a task queue for mitigating server pressure in high concurrency situations
// and improving task processing
type Queue struct {
	maxWorkers int
	jobQueue   chan Jober
	workerPool chan chan Jober
	workers    []*worker
	running    uint32
	wg         *sync.WaitGroup
}

// Run start running queues
func (q *Queue) Run() {
	if atomic.LoadUint32(&q.running) == 1 {
		return
	}

	atomic.StoreUint32(&q.running, 1)
	for i := 0; i < q.maxWorkers; i++ {
		q.workers[i] = newWorker(q.workerPool, q.wg)
		q.workers[i].Start()
	}

	go q.dispatcher()
}

func (q *Queue) dispatcher() {
	for job := range q.jobQueue {
		worker := <-q.workerPool
		worker <- job
	}
}

// Terminate terminate the queue to receive the task and release the resource
func (q *Queue) Terminate() {
	if atomic.LoadUint32(&q.running) != 1 {
		return
	}

	atomic.StoreUint32(&q.running, 0)
	q.wg.Wait()

	close(q.jobQueue)
	for i := 0; i < q.maxWorkers; i++ {
		q.workers[i].Stop()
	}
	close(q.workerPool)
}

// Push put the executable task into the queue
func (q *Queue) Push(job Jober) {
	if atomic.LoadUint32(&q.running) != 1 {
		return
	}

	q.wg.Add(1)
	q.jobQueue <- job
}

在多个线程中,只能有一个queue实例,这里通过running值的原子性来得以保证,同时为了保证优雅退出,这里使用sync.WaitGroup保证收到退出信号时将还在执行的任务执行完。

list实现的queue

// NewListQueueWithMaxLen create a list queue that specifies the number of worker threads
// and the maximum number of elements
func NewListQueueWithMaxLen(maxThread, maxLen int) *ListQueue {
	return &ListQueue{
		maxLen:     maxLen,
		maxWorker:  maxThread,
		workers:    make([]*worker, maxThread),
		workerPool: make(chan chan Jober, maxThread),
		list:       list.New(),
		lock:       new(sync.RWMutex),
		wg:         new(sync.WaitGroup),
	}
}

// ListQueue a list task queue for mitigating server pressure in high concurrency situations
// and improving task processing
type ListQueue struct {
	maxLen     int
	maxWorker  int
	workers    []*worker
	workerPool chan chan Jober
	list       *list.List
	lock       *sync.RWMutex
	wg         *sync.WaitGroup
	running    uint32
}

// Run start running queues
func (q *ListQueue) Run() {
	if atomic.LoadUint32(&q.running) == 1 {
		return
	}
	atomic.StoreUint32(&q.running, 1)

	for i := 0; i < q.maxWorker; i++ {
		q.workers[i] = newWorker(q.workerPool, q.wg)
		q.workers[i].Start()
	}

	go q.dispatcher()
}

func (q *ListQueue) dispatcher() {
	for {
		q.lock.RLock()
		if atomic.LoadUint32(&q.running) != 1 && q.list.Len() == 0 {
			q.lock.RUnlock()
			break
		}
		ele := q.list.Front()
		q.lock.RUnlock()

		if ele == nil {
			time.Sleep(time.Millisecond * 10)
			continue
		}

		worker := <-q.workerPool
		worker <- ele.Value.(Jober)

		q.lock.Lock()
		q.list.Remove(ele)
		q.lock.Unlock()
	}
}

// Push put the executable task into the queue
func (q *ListQueue) Push(job Jober) {
	if atomic.LoadUint32(&q.running) != 1 {
		return
	}

	if q.maxLen > 0 {
		q.lock.RLock()
		if q.list.Len() > q.maxLen {
			q.lock.RUnlock()
			return
		}
		q.lock.RUnlock()
	}

	q.wg.Add(1)
	q.lock.Lock()
	q.list.PushBack(job)
	q.lock.Unlock()
}

// Terminate terminate the queue to receive the task and release the resource
func (q *ListQueue) Terminate() {
	if atomic.LoadUint32(&q.running) != 1 {
		return
	}

	atomic.StoreUint32(&q.running, 0)
	q.wg.Wait()

	for i := 0; i < q.maxWorker; i++ {
		q.workers[i].Stop()
	}
	close(q.workerPool)
}

这里的实现基本上和channel一致,区别是,由于list是非线程安全的,所以这里多了一把读写锁,读取的时候加读锁,写入的时候加写锁,来保证list的线程安全。同时需要人为判断list的长度与缓冲的最大长度,以免溢出。

这里的一点重要区别是,使用channel的方式实现,如果缓冲区满了,新加入的job会阻塞等待,直到缓冲区有空位时再插入。而使用list的方式,如果缓冲区满了,直接返回,新的job将会直接丢弃。

接口导出

一般在项目中都会内置一个默认对象,并提供一些方法,这些方法都是对默认的内置对象进行操作。这里我们也设置一个默认的内置对象,并提供一些默认操作内置对象的方法

// Run start running queues,
// specify the number of buffers, and the number of worker threads
func Run(maxCapacity, maxThread int) {
	if internalQueue == nil {
		internalQueue = NewQueue(maxCapacity, maxThread)
	}
	internalQueue.Run()
}

// RunListQueue start running list queues
// ,specify the number of worker threads
func RunListQueue(maxThread int) {
	if internalQueue == nil {
		internalQueue = NewListQueue(maxThread)
	}
	internalQueue.Run()
}

// Push put the executable task into the queue
func Push(job Jober) {
	if internalQueue == nil {
		return
	}
	internalQueue.Push(job)
}

// Terminate terminate the queue to receive the task and release the resource
func Terminate() {
	if internalQueue == nil {
		return
	}
	internalQueue.Terminate()
}

总结

这里封装了多任务并发处理的模型,抽象出了job,worker和queue三个部件,在多任务并发执行的场景中,可以引入这个包,进行比较方便高效的任务处理。