forked from slackhq/nebula
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimeout.go
225 lines (184 loc) · 5.6 KB
/
timeout.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package nebula
import (
"sync"
"time"
)
// How many timer objects should be cached
const timerCacheMax = 50000
type TimerWheel[T any] struct {
// Current tick
current int
// Cheat on finding the length of the wheel
wheelLen int
// Last time we ticked, since we are lazy ticking
lastTick *time.Time
// Durations of a tick and the entire wheel
tickDuration time.Duration
wheelDuration time.Duration
// The actual wheel which is just a set of singly linked lists, head/tail pointers
wheel []*TimeoutList[T]
// Singly linked list of items that have timed out of the wheel
expired *TimeoutList[T]
// Item cache to avoid garbage collect
itemCache *TimeoutItem[T]
itemsCached int
}
type LockingTimerWheel[T any] struct {
m sync.Mutex
t *TimerWheel[T]
}
// TimeoutList Represents a tick in the wheel
type TimeoutList[T any] struct {
Head *TimeoutItem[T]
Tail *TimeoutItem[T]
}
// TimeoutItem Represents an item within a tick
type TimeoutItem[T any] struct {
Item T
Next *TimeoutItem[T]
}
// NewTimerWheel Builds a timer wheel and identifies the tick duration and wheel duration from the provided values
// Purge must be called once per entry to actually remove anything
// The TimerWheel does not handle concurrency on its own.
// Locks around access to it must be used if multiple routines are manipulating it.
func NewTimerWheel[T any](min, max time.Duration) *TimerWheel[T] {
//TODO provide an error
//if min >= max {
// return nil
//}
// Round down and add 2 so we can have the smallest # of ticks in the wheel and still account for a full
// max duration, even if our current tick is at the maximum position and the next item to be added is at maximum
// timeout
wLen := int((max / min) + 2)
tw := TimerWheel[T]{
wheelLen: wLen,
wheel: make([]*TimeoutList[T], wLen),
tickDuration: min,
wheelDuration: max,
expired: &TimeoutList[T]{},
}
for i := range tw.wheel {
tw.wheel[i] = &TimeoutList[T]{}
}
return &tw
}
// NewLockingTimerWheel is version of TimerWheel that is safe for concurrent use with a small performance penalty
func NewLockingTimerWheel[T any](min, max time.Duration) *LockingTimerWheel[T] {
return &LockingTimerWheel[T]{
t: NewTimerWheel[T](min, max),
}
}
// Add will add an item to the wheel in its proper timeout.
// Caller should Advance the wheel prior to ensure the proper slot is used.
func (tw *TimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
i := tw.findWheel(timeout)
// Try to fetch off the cache
ti := tw.itemCache
if ti != nil {
tw.itemCache = ti.Next
tw.itemsCached--
ti.Next = nil
} else {
ti = &TimeoutItem[T]{}
}
// Relink and return
ti.Item = v
if tw.wheel[i].Tail == nil {
tw.wheel[i].Head = ti
tw.wheel[i].Tail = ti
} else {
tw.wheel[i].Tail.Next = ti
tw.wheel[i].Tail = ti
}
return ti
}
// Purge removes and returns the first available expired item from the wheel and the 2nd argument is true.
// If no item is available then an empty T is returned and the 2nd argument is false.
func (tw *TimerWheel[T]) Purge() (T, bool) {
if tw.expired.Head == nil {
var na T
return na, false
}
ti := tw.expired.Head
tw.expired.Head = ti.Next
if tw.expired.Head == nil {
tw.expired.Tail = nil
}
// Clear out the items references
ti.Next = nil
// Maybe cache it for later
if tw.itemsCached < timerCacheMax {
ti.Next = tw.itemCache
tw.itemCache = ti
tw.itemsCached++
}
return ti.Item, true
}
// findWheel find the next position in the wheel for the provided timeout given the current tick
func (tw *TimerWheel[T]) findWheel(timeout time.Duration) (i int) {
if timeout < tw.tickDuration {
// Can't track anything below the set resolution
timeout = tw.tickDuration
} else if timeout > tw.wheelDuration {
// We aren't handling timeouts greater than the wheels duration
timeout = tw.wheelDuration
}
// Find the next highest, rounding up
tick := int(((timeout - 1) / tw.tickDuration) + 1)
// Add another tick since the current tick may almost be over then map it to the wheel from our
// current position
tick += tw.current + 1
if tick >= tw.wheelLen {
tick -= tw.wheelLen
}
return tick
}
// Advance will move the wheel forward by the appropriate number of ticks for the provided time and all items
// passed over will be moved to the expired list. Calling Purge is necessary to remove them entirely.
func (tw *TimerWheel[T]) Advance(now time.Time) {
if tw.lastTick == nil {
tw.lastTick = &now
}
// We want to round down
ticks := int(now.Sub(*tw.lastTick) / tw.tickDuration)
adv := ticks
if ticks > tw.wheelLen {
ticks = tw.wheelLen
}
for i := 0; i < ticks; i++ {
tw.current++
if tw.current >= tw.wheelLen {
tw.current = 0
}
if tw.wheel[tw.current].Head != nil {
// We need to append the expired items as to not starve evicting the oldest ones
if tw.expired.Tail == nil {
tw.expired.Head = tw.wheel[tw.current].Head
tw.expired.Tail = tw.wheel[tw.current].Tail
} else {
tw.expired.Tail.Next = tw.wheel[tw.current].Head
tw.expired.Tail = tw.wheel[tw.current].Tail
}
tw.wheel[tw.current].Head = nil
tw.wheel[tw.current].Tail = nil
}
}
// Advance the tick based on duration to avoid losing some accuracy
newTick := tw.lastTick.Add(tw.tickDuration * time.Duration(adv))
tw.lastTick = &newTick
}
func (lw *LockingTimerWheel[T]) Add(v T, timeout time.Duration) *TimeoutItem[T] {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Add(v, timeout)
}
func (lw *LockingTimerWheel[T]) Purge() (T, bool) {
lw.m.Lock()
defer lw.m.Unlock()
return lw.t.Purge()
}
func (lw *LockingTimerWheel[T]) Advance(now time.Time) {
lw.m.Lock()
defer lw.m.Unlock()
lw.t.Advance(now)
}