-
Notifications
You must be signed in to change notification settings - Fork 180
/
Copy pathrate_limiter_test.go
185 lines (143 loc) · 5.3 KB
/
rate_limiter_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package ingest_test
import (
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/time/rate"
"github.com/onflow/flow-go/access"
"github.com/onflow/flow-go/engine/collection/ingest"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/utils/unittest"
)
// Compile-time assertion that *ingest.AddressRateLimiter implements access.RateLimiter.
var _ access.RateLimiter = (*ingest.AddressRateLimiter)(nil)
// TestLimiterAddRemoveAddress checks that only addresses registered with
// AddAddress are rate limited, that GetAddresses reports every registered
// address, and that RemoveAddress lifts the limit for the removed address only.
func TestLimiterAddRemoveAddress(t *testing.T) {
	t.Parallel()

	good1 := unittest.RandomAddressFixture()
	limited1 := unittest.RandomAddressFixture()
	limited2 := unittest.RandomAddressFixture()

	numPerSec := rate.Limit(1)
	burst := 1
	l := ingest.NewAddressRateLimiter(numPerSec, burst)

	// an address that was never added is not limited, however often it is checked
	require.False(t, l.IsRateLimited(good1))
	require.False(t, l.IsRateLimited(good1))

	l.AddAddress(limited1)
	require.Equal(t, []flow.Address{limited1}, l.GetAddresses())

	require.False(t, l.IsRateLimited(limited1)) // address 1 is not limited on the first call
	require.True(t, l.IsRateLimited(limited1))  // limited on the second call immediately
	require.True(t, l.IsRateLimited(limited1))  // still limited on the third call

	// adding an address does not affect addresses that were never added
	require.False(t, l.IsRateLimited(good1))
	require.False(t, l.IsRateLimited(good1))

	l.AddAddress(limited2)
	list := l.GetAddresses()
	require.Len(t, list, 2)
	// Each address needs its own Contains call: any argument after the
	// expected element is treated as the failure message, not as another
	// element to look for.
	require.Contains(t, list, limited1)
	require.Contains(t, list, limited2)

	require.False(t, l.IsRateLimited(limited2)) // address 2 is not limited on the first call
	require.True(t, l.IsRateLimited(limited2))  // limited on the second call immediately
	require.True(t, l.IsRateLimited(limited2))  // still limited on the third call

	// once removed, address 1 is no longer limited
	l.RemoveAddress(limited1)
	require.False(t, l.IsRateLimited(limited1))
	require.False(t, l.IsRateLimited(limited1))

	// but address 2 is still limited
	require.True(t, l.IsRateLimited(limited2))
}
// TestLimiterBurst checks that a registered address may make as many
// back-to-back calls as the configured burst before it is reported as limited.
func TestLimiterBurst(t *testing.T) {
	t.Parallel()

	const maxBurst = 3
	addr := unittest.RandomAddressFixture()

	limiter := ingest.NewAddressRateLimiter(rate.Limit(1), maxBurst)
	limiter.AddAddress(addr)

	// the first maxBurst calls fit within the burst allowance
	for call := 0; call < maxBurst; call++ {
		limited := limiter.IsRateLimited(addr)
		require.False(t, limited, fmt.Sprintf("%v-nth call", call))
	}

	// the allowance is used up, so every further immediate call is limited
	require.True(t, limiter.IsRateLimited(addr))
	require.True(t, limiter.IsRateLimited(addr))
}
// TestLimiterWaitLongEnough checks that an address which has been rate
// limited is allowed again once enough time has passed, and that the cycle
// repeats.
func TestLimiterWaitLongEnough(t *testing.T) {
	t.Parallel()

	// A limit of 10 allows 10 messages per second; combined with a burst of 1
	// (one message at a time) that is 1 message per 100 milliseconds.
	// Note that rate.Limit(0.1) would NOT mean 1 message per 100 milliseconds,
	// but 1 message per 10 seconds.
	const (
		waitFor = 110 * time.Millisecond // slightly longer than one 100 ms refill interval
		tick    = 10 * time.Millisecond  // polling interval
	)

	addr := unittest.RandomAddressFixture()
	limiter := ingest.NewAddressRateLimiter(rate.Limit(10), 1)
	limiter.AddAddress(addr)

	// polled by require.Eventually until the limiter lets the address through
	tryAllow := func() bool { return limiter.Allow(addr) }

	// the first call passes, the immediate second call is limited
	require.False(t, limiter.IsRateLimited(addr))
	require.True(t, limiter.IsRateLimited(addr))

	// polling every 10 ms, the address must be allowed again within 110 ms
	require.Eventually(t, tryAllow, waitFor, tick)

	// that call used the quota, so the address is limited again right away
	require.True(t, limiter.IsRateLimited(addr))

	// and it is allowed once more after another interval
	require.Eventually(t, tryAllow, waitFor, tick)
}
// TestLimiterConcurrentSafe calls the limiter from two goroutines at once.
// With a burst of 1, exactly one of the two concurrent calls for the
// registered address is expected to be limited, while an address that was
// never added is expected to stay unlimited.
func TestLimiterConcurrentSafe(t *testing.T) {
	t.Parallel()

	good1 := unittest.RandomAddressFixture()
	limited1 := unittest.RandomAddressFixture()

	numPerSec := rate.Limit(1)
	burst := 1
	l := ingest.NewAddressRateLimiter(numPerSec, burst)

	l.AddAddress(limited1)

	const workers = 2
	wg := sync.WaitGroup{}
	wg.Add(workers)

	// counts how many of the concurrent calls for limited1 were rate limited
	limitedCalls := atomic.NewUint64(0)

	for i := 0; i < workers; i++ {
		go func() {
			defer wg.Done()

			if l.IsRateLimited(limited1) {
				limitedCalls.Add(1)
			}

			// Report with t.Errorf instead of require: require stops the test
			// via t.FailNow, which must only be called from the goroutine
			// running the test function, whereas t.Errorf may be called from
			// any goroutine.
			if l.IsRateLimited(good1) {
				t.Errorf("address %v was never added but is rate limited", good1)
			}
		}()
	}

	wg.Wait()

	// one call consumes the single burst token, the other one is limited
	require.Equal(t, uint64(1), limitedCalls.Load())
}
func TestLimiterGetSetConfig(t *testing.T) {
t.Parallel()
addr1 := unittest.RandomAddressFixture()
// with limit set to 10, it means we allow 10 messages per second,
// and with burst set to 1, it means we only allow 1 message at a time,
// so the limit is 1 message per 100 milliseconds.
// Note rate.Limit(0.1) is not to set 1 message per 100 milliseconds, but
// 1 message per 10 seconds.
numPerSec := rate.Limit(10)
burst := 1
l := ingest.NewAddressRateLimiter(numPerSec, burst)
l.AddAddress(addr1)
require.False(t, l.IsRateLimited(addr1))
require.True(t, l.IsRateLimited(addr1))
limitConfig, burstConfig := l.GetLimitConfig()
require.Equal(t, numPerSec, limitConfig)
require.Equal(t, burst, burstConfig)
// change from 1 message per 100 ms to 4 messages per 200 ms
l.SetLimitConfig(rate.Limit(20), 4)
// verify the quota is reset, and the new limit is applied
for i := 0; i < 4; i++ {
require.False(t, l.IsRateLimited(addr1), fmt.Sprintf("fail at %v-th call", i))
}
require.True(t, l.IsRateLimited(addr1))
// check every 10 Millisecond then after 100 Millisecond it should be allowed
require.Eventually(t, func() bool {
return l.Allow(addr1)
}, 210*time.Millisecond, 10*time.Millisecond)
}