From 369a65eae837ee0cfda143a4435f00a3620386ba Mon Sep 17 00:00:00 2001 From: Oleg Zaytsev Date: Mon, 14 Oct 2024 10:49:31 +0200 Subject: [PATCH] Remove mutex from ReusableGoroutinesPool I find it unsettling that we need to acquire a mutex (a concurrency primitive) to send data to a channel (another concurrency primitive). I think this achieves the same effect (we can stop the worker goroutines in a safe way) without having to mix them and leaving the main code path clear (just try to send a job, or run a new goroutine). Signed-off-by: Oleg Zaytsev --- concurrency/worker.go | 33 +++++++++++---------------------- 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/concurrency/worker.go b/concurrency/worker.go index 72acc7dd0..10a59e600 100644 --- a/concurrency/worker.go +++ b/concurrency/worker.go @@ -1,20 +1,22 @@ package concurrency -import ( - "sync" -) - // NewReusableGoroutinesPool creates a new worker pool with the given size. // These workers will run the workloads passed through Go() calls. // If all workers are busy, Go() will spawn a new goroutine to run the workload. func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { p := &ReusableGoroutinesPool{ - jobs: make(chan func()), + jobs: make(chan func()), + closed: make(chan struct{}), } for i := 0; i < size; i++ { go func() { - for f := range p.jobs { - f() + for { + select { + case f := <-p.jobs: + f() + case <-p.closed: + return + } } }() } @@ -22,23 +24,13 @@ func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool { } type ReusableGoroutinesPool struct { - jobsMu sync.RWMutex - closed bool jobs chan func() + closed chan struct{} } // Go will run the given function in a worker of the pool. // If all workers are busy, Go() will spawn a new goroutine to run the workload. func (p *ReusableGoroutinesPool) Go(f func()) { - p.jobsMu.RLock() - defer p.jobsMu.RUnlock() - - // If the pool is closed, run the function in a new goroutine. - if p.closed { - go f() - return - } - select { case p.jobs <- f: default: @@ -51,8 +43,5 @@ func (p *ReusableGoroutinesPool) Go(f func()) { // Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads. // Close is intended to be used in tests to ensure that no goroutines are leaked. func (p *ReusableGoroutinesPool) Close() { - p.jobsMu.Lock() - defer p.jobsMu.Unlock() - p.closed = true - close(p.jobs) + close(p.closed) }