-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsafequeue.go
143 lines (122 loc) · 3.15 KB
/
safequeue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package safequeue
import (
"sync"
)
// SafeQueue is a FIFO queue guarded by an embedded mutex.
//
// Items are stored in fixed-size slices ("shards"). The queue grows by adding
// a shard at the tail (Push) or at the front (PushHead) and shrinks by
// dropping the front shard once every slot in it has been popped.
//
// Methods without the "Dirty" prefix (and other than HeadItem) lock the
// embedded mutex themselves. DirtyPop, DirtyLength, DirtyPurge and HeadItem do
// no locking: callers must provide their own synchronization, for example by
// holding the queue's embedded mutex (Lock/Unlock) around them.
type SafeQueue struct {
	sync.Mutex
	shards    [][]interface{} // shards[0] holds the head, shards[tailIdx] holds the tail
	shardSize int             // number of slots in every shard
	tailIdx   int             // index of the tail shard inside shards
	tail      []interface{}   // shortcut for shards[tailIdx]
	tailPos   int             // next free slot in tail; always < shardSize
	headIdx   int             // index of the head shard inside shards; always 0
	head      []interface{}   // shortcut for shards[headIdx]
	headPos   int             // slot of the oldest item inside head
	length    uint64          // number of items currently stored
}

// NewSafeQueue returns an empty queue whose shards hold shardSize items each.
//
// A shardSize below 1 cannot hold any item (and would make the first Push
// panic), so it is raised to 1.
func NewSafeQueue(shardSize int) *SafeQueue {
	if shardSize < 1 {
		shardSize = 1
	}
	queue := &SafeQueue{shardSize: shardSize}
	queue.DirtyPurge()
	return queue
}

// Push appends item to the tail of the queue.
func (queue *SafeQueue) Push(item interface{}) {
	queue.Lock()
	defer queue.Unlock()

	queue.tail[queue.tailPos] = item
	queue.tailPos++
	queue.length++

	if queue.tailPos == queue.shardSize {
		// The tail shard is full: open a fresh one right away so that
		// tailPos always points at a writable slot.
		queue.shards = append(queue.shards, make([]interface{}, queue.shardSize))
		queue.tailIdx = len(queue.shards) - 1
		queue.tail = queue.shards[queue.tailIdx]
		queue.tailPos = 0
	}
}

// PushHead inserts item at the head of the queue, making it the next item
// returned by Pop.
func (queue *SafeQueue) PushHead(item interface{}) {
	queue.Lock()
	defer queue.Unlock()

	if queue.headPos == 0 {
		// No free slot in front of the current head: prepend an empty shard
		// and fill it from its last slot backwards.
		grown := make([][]interface{}, len(queue.shards)+1)
		copy(grown[1:], queue.shards)
		grown[queue.headIdx] = make([]interface{}, queue.shardSize)
		queue.shards = grown
		queue.tailIdx++ // every existing shard moved one position to the right
		queue.headPos = queue.shardSize
		queue.tail = queue.shards[queue.tailIdx]
		queue.head = queue.shards[queue.headIdx]
	}

	queue.length++
	queue.headPos--
	queue.head[queue.headPos] = item
}

// Pop removes and returns the item at the head of the queue.
// It returns nil when the queue is empty.
func (queue *SafeQueue) Pop() (item interface{}) {
	queue.Lock()
	defer queue.Unlock()
	return queue.DirtyPop()
}

// DirtyPop removes and returns the item at the head of the queue.
// It returns nil when the queue is empty.
//
// DirtyPop is not thread-safe.
func (queue *SafeQueue) DirtyPop() (item interface{}) {
	// Emptiness is decided by the item counter, not by the slot content, so a
	// stored nil value is popped like any other item instead of blocking the
	// queue forever.
	if queue.length == 0 {
		return nil
	}

	item = queue.head[queue.headPos]
	queue.head[queue.headPos] = nil // drop the queue's reference to the item
	queue.headPos++
	queue.length--

	if queue.headPos == queue.shardSize {
		// The head shard is fully consumed. Push keeps tailPos < shardSize,
		// so this shard is never the tail shard and at least one more shard
		// follows it.
		queue.shards[queue.headIdx] = nil // let the drained shard be collected
		queue.shards = queue.shards[queue.headIdx+1:]
		queue.headPos = 0
		queue.tailIdx--
		queue.head = queue.shards[queue.headIdx]
	}
	return item
}

// Length returns the number of items in the queue.
func (queue *SafeQueue) Length() uint64 {
	queue.Lock()
	defer queue.Unlock()
	return queue.length
}

// DirtyLength returns the number of items in the queue.
//
// DirtyLength is not thread-safe.
func (queue *SafeQueue) DirtyLength() uint64 {
	return queue.length
}

// HeadItem returns the item at the head of the queue without removing it.
// The head slot of an empty queue holds nil, so nil is returned in that case.
//
// HeadItem is not thread-safe.
func (queue *SafeQueue) HeadItem() (res interface{}) {
	return queue.head[queue.headPos]
}

// DirtyPurge discards every item and returns the queue to its initial state:
// a single empty shard with head and tail both at slot 0.
//
// DirtyPurge is not thread-safe.
func (queue *SafeQueue) DirtyPurge() {
	queue.shards = [][]interface{}{make([]interface{}, queue.shardSize)}
	queue.tailIdx = 0
	queue.tail = queue.shards[queue.tailIdx]
	queue.tailPos = 0 // stale offsets would desynchronize Push and Pop
	queue.headIdx = 0
	queue.head = queue.shards[queue.headIdx]
	queue.headPos = 0
	queue.length = 0
}

// Purge discards every item in the queue.
func (queue *SafeQueue) Purge() {
	queue.Lock()
	defer queue.Unlock()
	queue.DirtyPurge()
}