Skip to content

Commit 71baee5

Browse files
committed
notify mechanism fix for new task added
1 parent 832f9e2 commit 71baee5

File tree

1 file changed

+5
-2
lines changed

1 file changed

+5
-2
lines changed

queue.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type (
2626
routineGroup *routineGroup
2727
quit chan struct{}
2828
ready chan struct{}
29+
newTaskAdded chan struct{}
2930
worker core.Worker
3031
stopOnce sync.Once
3132
stopFlag int32
@@ -43,6 +44,7 @@ func NewQueue(opts ...Option) (*Queue, error) {
4344
routineGroup: newRoutineGroup(),
4445
quit: make(chan struct{}),
4546
ready: make(chan struct{}, 1),
47+
newTaskAdded: make(chan struct{}),
4648
workerCount: o.workerCount,
4749
logger: o.logger,
4850
worker: o.worker,
@@ -147,6 +149,7 @@ func (q *Queue) queue(m *job.Message) error {
147149
}
148150

149151
q.metric.IncSubmittedTask()
152+
q.newTaskAdded <- struct{}{}
150153

151154
return nil
152155
}
@@ -320,8 +323,8 @@ func (q *Queue) start() {
320323
close(tasks)
321324
return
322325
}
323-
case <-time.After(time.Second):
324-
// sleep 1 second to fetch new task
326+
case <-q.newTaskAdded:
327+
// New task added
325328
}
326329
}
327330
}

0 commit comments

Comments
 (0)