-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathqueueInt64autogrow.go
197 lines (178 loc) · 5.53 KB
/
queueInt64autogrow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright 2020 rateLimit Author(https://github.com/yudeguang/ratelimit). All Rights Reserved.
//
// q Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with q file,
// You can obtain one at https://github.com/yudeguang/ratelimit.
package ratelimit
import (
"errors"
"sync"
"time"
)
/*
用环形队列做为底层数据结构来存储用户访问数据,并能实现自动增长以及收缩
*/
//使用切片实现的队列
type autoGrowCircleQueueInt64 struct {
key interface{}
//注意,maxSize比实际存储长度大1
maxSize int
//maxSizeTemp与visitorRecord长度相同,visitorRecord长度设计根据实际情况成自动增长
maxSizeTemp int
visitorRecord []int64
head int //头
tail int //尾
//存盘时临时用到的虚拟队列的头和尾
headForCopy int
tailForCopy int
locker *sync.Mutex
}
//初始化环形队列,长度超过1023的队列暂时只分配1023的空间
func newAutoGrowCircleQueueInt64(size int) *autoGrowCircleQueueInt64 {
var c autoGrowCircleQueueInt64
c.maxSize = size + 1
if c.maxSize > 1024 {
c.maxSizeTemp = 1024
} else {
c.maxSizeTemp = c.maxSize
}
c.visitorRecord = make([]int64, c.maxSizeTemp)
c.locker = new(sync.Mutex)
return &c
}
//队列无人使用时,对于队列实际使用空间长度大于1023的需要对此队列做收缩操作以节省空间
func (q *autoGrowCircleQueueInt64) reSet() {
q.locker.Lock()
defer q.locker.Unlock()
if q.maxSize > 1024 && q.maxSizeTemp > 1024 {
newVisitorRecord := make([]int64, 1024)
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = 1024
q.head = 0
q.tail = 0
}
}
//队列是否需要扩容
func (q *autoGrowCircleQueueInt64) needGrow() bool {
if q.maxSizeTemp == q.maxSize {
return false
}
if q.tempQueueIsFull() {
return true
}
return false
}
//对队列进行扩容操作
func (q *autoGrowCircleQueueInt64) grow() {
newVisitorRecordLen := len(q.visitorRecord) * 2
if newVisitorRecordLen > q.maxSize {
newVisitorRecordLen = q.maxSize
}
newVisitorRecord := make([]int64, newVisitorRecordLen)
//复制数据
oldQueueLen := q.tempQueueLen()
for i := 0; i < oldQueueLen; i++ {
newVisitorRecord[i] = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
}
//新旧数据替换
q.visitorRecord = newVisitorRecord
q.maxSizeTemp = newVisitorRecordLen
q.head = 0
q.tail = oldQueueLen
}
//访问时间入对列,只用于从本地备份文件加载历史访问数据,本身是线性访问,无并发安全问题
func (q *autoGrowCircleQueueInt64) push(val int64) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = val
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
//访问时间入对列,并发安全,由于不同协程在高并发的时候,极端情况下,也即前后两次访问的时间差,与两协程的系统切换时间非常接近的情况下
//由调用者自己生成时间容易出现紊乱的情况,所以访问时间只能到这个地方来统一生成,也即有极小的概率,先访问的时间比后访问的时间大
func (q *autoGrowCircleQueueInt64) pushWithConcurrencysafety(defaultExpiration time.Duration) (err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.needGrow() {
q.grow()
}
if q.tempQueueIsFull() {
return errors.New("queue is full")
}
q.visitorRecord[q.tail] = time.Now().Add(defaultExpiration).UnixNano()
q.tail = (q.tail + 1) % q.maxSizeTemp
return
}
//出对列
func (q *autoGrowCircleQueueInt64) pop() (val int64, err error) {
q.locker.Lock()
defer q.locker.Unlock()
if q.tempQueueIsEmpty() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.head]
q.head = (q.head + 1) % q.maxSizeTemp
return
}
//用于备份数据的时候,虚拟队列的出队列操作,但实际未进行出队列操作
func (q *autoGrowCircleQueueInt64) tempQueuePopForCopy() (val int64, err error) {
if q.tempQueueIsEmptyForCopy() {
return 0, errors.New("queue is empty")
}
val = q.visitorRecord[q.headForCopy]
q.headForCopy = (q.headForCopy + 1) % q.maxSizeTemp
return
}
//用于备份数据的时候,判断虚拟队列是否已满
func (q *autoGrowCircleQueueInt64) tempQueueIsFull() bool {
return (q.tail+1)%q.maxSizeTemp == q.head
}
//判断队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmpty() bool {
return q.tail == q.head
}
//用于备份数据的时候,判断虚拟队列是否为空
func (q *autoGrowCircleQueueInt64) tempQueueIsEmptyForCopy() bool {
return q.tailForCopy == q.headForCopy
}
//判断队列已使用多少个元素
func (q *autoGrowCircleQueueInt64) usedSize() int {
return (q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp
}
//判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) tempQueueUnUsedSize() int {
return q.maxSizeTemp - 1 - q.usedSize()
}
//判断队列中还有多少空间未使用
func (q *autoGrowCircleQueueInt64) unUsedSize() int {
return q.maxSize - 1 - ((q.tail + q.maxSizeTemp - q.head) % q.maxSizeTemp)
}
//队列总的可用空间长度
func (q *autoGrowCircleQueueInt64) tempQueueLen() int {
return q.maxSizeTemp - 1
}
//删除过期数据
func (q *autoGrowCircleQueueInt64) deleteExpired(key interface{}) {
q.locker.Lock()
defer q.locker.Unlock()
now := time.Now().UnixNano()
size := q.usedSize()
if size == 0 {
return
}
//依次删除过期数据
for i := 0; i < size; i++ {
if now > q.visitorRecord[q.head] {
q.head = (q.head + 1) % q.maxSizeTemp
} else {
return
}
}
}