@@ -10,56 +10,69 @@ import (
10
10
11
11
var _ core.Worker = (* Ring )(nil )
12
12
13
- // Ring for simple queue using buffer channel
13
+ // Ring represents a simple queue using a buffer channel.
14
14
type Ring struct {
15
15
sync.Mutex
16
- taskQueue []core.TaskMessage
17
- runFunc func (context.Context , core.TaskMessage ) error
18
- capacity int
19
- count int
20
- head int
21
- tail int
22
- exit chan struct {}
23
- logger Logger
24
- stopOnce sync.Once
25
- stopFlag int32
16
+ taskQueue []core.TaskMessage // taskQueue holds the tasks in the ring buffer.
17
+ runFunc func (context.Context , core.TaskMessage ) error // runFunc is the function responsible for processing tasks.
18
+ capacity int // capacity is the maximum number of tasks the queue can hold.
19
+ count int // count is the current number of tasks in the queue.
20
+ head int // head is the index of the first task in the queue.
21
+ tail int // tail is the index where the next task will be added.
22
+ exit chan struct {} // exit is used to signal when the queue is shutting down.
23
+ logger Logger // logger is used for logging messages.
24
+ stopOnce sync.Once // stopOnce ensures the shutdown process only runs once.
25
+ stopFlag int32 // stopFlag indicates whether the queue is shutting down.
26
26
}
27
27
28
- // Run to execute new task
28
+ // Run executes a new task using the provided context and task message.
29
+ // It calls the runFunc function, which is responsible for processing the task.
30
+ // The context allows for cancellation and timeout control of the task execution.
29
31
func (s * Ring ) Run (ctx context.Context , task core.TaskMessage ) error {
30
32
return s .runFunc (ctx , task )
31
33
}
32
34
33
- // Shutdown the worker
35
+ // Shutdown gracefully shuts down the worker.
36
+ // It sets the stopFlag to indicate that the queue is shutting down and prevents new tasks from being added.
37
+ // If the queue is already shut down, it returns ErrQueueShutdown.
38
+ // It waits for all tasks to be processed before completing the shutdown.
34
39
func (s * Ring ) Shutdown () error {
40
+ // Attempt to set the stopFlag from 0 to 1. If it fails, the queue is already shut down.
35
41
if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
36
42
return ErrQueueShutdown
37
43
}
38
44
45
+ // Ensure the shutdown process only runs once.
39
46
s .stopOnce .Do (func () {
40
47
s .Lock ()
41
48
count := s .count
42
49
s .Unlock ()
50
+ // If there are tasks in the queue, wait for them to be processed.
43
51
if count > 0 {
44
52
<- s .exit
45
53
}
46
54
})
47
55
return nil
48
56
}
49
57
50
- // Queue send task to the buffer channel
58
+ // Queue adds a task to the ring buffer queue.
59
+ // It returns an error if the queue is shut down or has reached its maximum capacity.
51
60
func (s * Ring ) Queue (task core.TaskMessage ) error { //nolint:stylecheck
61
+ // Check if the queue is shut down
52
62
if atomic .LoadInt32 (& s .stopFlag ) == 1 {
53
63
return ErrQueueShutdown
54
64
}
65
+ // Check if the queue has reached its maximum capacity
55
66
if s .capacity > 0 && s .count >= s .capacity {
56
67
return ErrMaxCapacity
57
68
}
58
69
59
70
s .Lock ()
71
+ // Resize the queue if necessary
60
72
if s .count == len (s .taskQueue ) {
61
73
s .resize (s .count * 2 )
62
74
}
75
+ // Add the task to the queue
63
76
s .taskQueue [s .tail ] = task
64
77
s .tail = (s .tail + 1 ) % len (s .taskQueue )
65
78
s .count ++
@@ -68,7 +81,15 @@ func (s *Ring) Queue(task core.TaskMessage) error { //nolint:stylecheck
68
81
return nil
69
82
}
70
83
71
- // Request a new task from channel
84
+ // Request retrieves the next task message from the ring queue.
85
+ // If the queue has been stopped and is empty, it signals the exit channel
86
+ // and returns an error indicating the queue has been closed.
87
+ // If the queue is empty but not stopped, it returns an error indicating
88
+ // there are no tasks in the queue.
89
+ // If a task is successfully retrieved, it is removed from the queue,
90
+ // and the queue may be resized if it is less than half full.
91
+ // Returns the task message and nil on success, or an error if the queue
92
+ // is empty or has been closed.
72
93
func (s * Ring ) Request () (core.TaskMessage , error ) {
73
94
if atomic .LoadInt32 (& s .stopFlag ) == 1 && s .count == 0 {
74
95
select {
@@ -95,6 +116,15 @@ func (s *Ring) Request() (core.TaskMessage, error) {
95
116
return data , nil
96
117
}
97
118
119
+ // resize adjusts the size of the ring buffer to the specified capacity n.
120
+ // It reallocates the underlying slice to the new size and copies the existing
121
+ // elements to the new slice in the correct order. The head and tail pointers
122
+ // are updated accordingly to maintain the correct order of elements in the
123
+ // resized buffer.
124
+ //
125
+ // Parameters:
126
+ //
127
+ // n - the new capacity of the ring buffer.
98
128
func (q * Ring ) resize (n int ) {
99
129
nodes := make ([]core.TaskMessage , n )
100
130
if q .head < q .tail {
@@ -109,7 +139,18 @@ func (q *Ring) resize(n int) {
109
139
q .taskQueue = nodes
110
140
}
111
141
112
- // NewRing for create new Ring instance
142
+ // NewRing creates a new Ring instance with the provided options.
143
+ // It initializes the task queue with a default size of 2, sets the capacity
144
+ // based on the provided options, and configures the logger and run function.
145
+ // The function returns a pointer to the newly created Ring instance.
146
+ //
147
+ // Parameters:
148
+ //
149
+ // opts - A variadic list of Option functions to configure the Ring instance.
150
+ //
151
+ // Returns:
152
+ //
153
+ // *Ring - A pointer to the newly created Ring instance.
113
154
func NewRing (opts ... Option ) * Ring {
114
155
o := NewOptions (opts ... )
115
156
w := & Ring {
0 commit comments