Skip to content

Commit 043417e

Browse files
authored
refactor(goroutine): improve concurrency safety and testing (#122)
- Enable race condition detection in Go tests by adding `-race` flag - Refactor `Queue` to use a local variable for `workerCount` with proper locking - Refactor `Ring` to use a local variable for `count` with proper locking and defer unlocking - Replace direct access to `busyWorkers` metric with `BusyWorkers()` method in tests Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 841b61d commit 043417e

File tree

4 files changed

+14
-6
lines changed

4 files changed

+14
-6
lines changed

.github/workflows/go.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ jobs:
6666
${{ runner.os }}-go-
6767
- name: Run Tests
6868
run: |
69-
go test -v -covermode=atomic -coverprofile=coverage.out
69+
go test -race -v -covermode=atomic -coverprofile=coverage.out
7070
7171
- name: Run Benchmark
7272
run: |

queue.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,10 @@ func NewQueue(opts ...Option) (*Queue, error) {
5757

5858
// Start to enable all worker
5959
func (q *Queue) Start() {
60-
if q.workerCount == 0 {
60+
q.Lock()
61+
count := q.workerCount
62+
q.Unlock()
63+
if count == 0 {
6164
return
6265
}
6366
q.routineGroup.Run(func() {
@@ -262,7 +265,9 @@ func (q *Queue) handle(m *job.Message) error {
262265

263266
// UpdateWorkerCount to update worker number dynamically.
264267
func (q *Queue) UpdateWorkerCount(num int) {
268+
q.Lock()
265269
q.workerCount = num
270+
q.Unlock()
266271
q.schedule()
267272
}
268273

ring.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ func (s *Ring) Shutdown() error {
3737
}
3838

3939
s.stopOnce.Do(func() {
40-
if s.count > 0 {
40+
s.Lock()
41+
count := s.count
42+
s.Unlock()
43+
if count > 0 {
4144
<-s.exit
4245
}
4346
})
@@ -75,10 +78,11 @@ func (s *Ring) Request() (core.QueuedMessage, error) {
7578
return nil, ErrQueueHasBeenClosed
7679
}
7780

81+
s.Lock()
82+
defer s.Unlock()
7883
if s.count == 0 {
7984
return nil, ErrNoTaskInQueue
8085
}
81-
s.Lock()
8286
data := s.taskQueue[s.head]
8387
s.taskQueue[s.head] = nil
8488
s.head = (s.head + 1) % len(s.taskQueue)
@@ -87,7 +91,6 @@ func (s *Ring) Request() (core.QueuedMessage, error) {
8791
if n := len(s.taskQueue) / 2; n > 2 && s.count <= n {
8892
s.resize(n)
8993
}
90-
s.Unlock()
9194

9295
return data, nil
9396
}

ring_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
142142
assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)}))
143143
q.Start()
144144
time.Sleep(10 * time.Millisecond)
145-
assert.Equal(t, 2, int(q.metric.busyWorkers))
145+
assert.Equal(t, 2, q.BusyWorkers())
146146
q.Release()
147147
}
148148

0 commit comments

Comments
 (0)