Skip to content

Commit 3d9885d

Browse files
committed
now ip chooser & subnet chooser would always returns results
1 parent 71ee70c commit 3d9885d

12 files changed

+327
-262
lines changed

storage/bucket.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"strings"
1515
"time"
1616

17-
"github.com/alex-ant/gomath/rational"
1817
"github.com/qiniu/go-sdk/v7/internal/clientv2"
1918
"github.com/qiniu/go-sdk/v7/storagev2/apis"
2019
"github.com/qiniu/go-sdk/v7/storagev2/apis/batch_ops"
@@ -1066,7 +1065,7 @@ func (m *BucketManager) chooser() chooser.Chooser {
10661065
if m.options.Chooser != nil {
10671066
return m.options.Chooser
10681067
}
1069-
return chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
1068+
return chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
10701069
}
10711070

10721071
// 构建op的方法,导出的方法支持在Batch操作中使用

storage/region_uc_v2.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"golang.org/x/sync/singleflight"
1414

15-
"github.com/alex-ant/gomath/rational"
1615
"github.com/qiniu/go-sdk/v7/internal/clientv2"
1716
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
1817
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
@@ -262,7 +261,7 @@ func getRegionByV2(ak, bucket string, options UCApiOptions) (*Region, error) {
262261
}
263262
}
264263
if options.Chooser == nil {
265-
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
264+
options.Chooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
266265
}
267266

268267
var ret UcQueryRet

storage/region_uc_v4.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212

1313
"golang.org/x/sync/singleflight"
1414

15-
"github.com/alex-ant/gomath/rational"
1615
"github.com/qiniu/go-sdk/v7/internal/clientv2"
1716
"github.com/qiniu/go-sdk/v7/storagev2/chooser"
1817
"github.com/qiniu/go-sdk/v7/storagev2/resolver"
@@ -160,7 +159,7 @@ func getRegionByV4(ak, bucket string, options UCApiOptions) (*RegionGroup, error
160159
}
161160
}
162161
if options.Chooser == nil {
163-
options.Chooser = chooser.NewNeverEmptyHandedChooser(chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil)), rational.New(1, 2))
162+
options.Chooser = chooser.NewShuffleChooser(chooser.NewSmartIPChooser(nil))
164163
}
165164

166165
var ret ucQueryV4Ret

storagev2/chooser/chooser.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package chooser
33
import (
44
"context"
55
"net"
6-
"time"
76
)
87

98
type (
@@ -22,10 +21,6 @@ type (
2221
FeedbackGood(context.Context, *FeedbackOptions)
2322
FeedbackBad(context.Context, *FeedbackOptions)
2423
}
25-
26-
blacklistItem struct {
27-
expiredAt time.Time
28-
}
2924
)
3025

3126
type directChooser struct {
@@ -46,3 +41,11 @@ func (chooser *directChooser) FeedbackGood(_ context.Context, _ *FeedbackOptions
4641
func (chooser *directChooser) FeedbackBad(_ context.Context, _ *FeedbackOptions) {
4742
// do nothing
4843
}
44+
45+
func (options *ChooseOptions) makeSet(makeKey func(string, net.IP) string) map[string]struct{} {
46+
m := make(map[string]struct{}, len(options.IPs))
47+
for _, ip := range options.IPs {
48+
m[makeKey(options.Domain, ip)] = struct{}{}
49+
}
50+
return m
51+
}

storagev2/chooser/ip_chooser.go

+179-69
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,38 @@
11
package chooser
22

33
import (
4+
"bytes"
5+
"container/heap"
46
"context"
57
"fmt"
68
"net"
79
"sync"
810
"time"
911
)
1012

11-
type ipChooser struct {
12-
blacklist map[string]blacklistItem
13-
blacklistMutex sync.Mutex
14-
freezeDuration time.Duration
15-
shrinkInterval time.Duration
16-
shrunkAt time.Time
17-
}
13+
type (
14+
blackItem struct {
15+
index int
16+
domain string
17+
ip net.IP
18+
expiredAt time.Time
19+
}
1820

19-
type IPChooserOptions struct {
20-
FreezeDuration time.Duration
21-
ShrinkInterval time.Duration
22-
}
21+
blackheap struct {
22+
m map[string]*blackItem
23+
items []*blackItem
24+
}
25+
26+
ipChooser struct {
27+
blackheap blackheap
28+
blackheapMutex sync.Mutex
29+
freezeDuration time.Duration
30+
}
31+
32+
IPChooserOptions struct {
33+
FreezeDuration time.Duration
34+
}
35+
)
2336

2437
// NewIPChooser 创建 IP 选择器
2538
func NewIPChooser(options *IPChooserOptions) Chooser {
@@ -29,94 +42,191 @@ func NewIPChooser(options *IPChooserOptions) Chooser {
2942
if options.FreezeDuration == 0 {
3043
options.FreezeDuration = 10 * time.Minute
3144
}
32-
if options.ShrinkInterval == 0 {
33-
options.ShrinkInterval = 10 * time.Minute
34-
}
3545
return &ipChooser{
36-
blacklist: make(map[string]blacklistItem),
46+
blackheap: blackheap{
47+
m: make(map[string]*blackItem, 1024),
48+
items: make([]*blackItem, 0, 1024),
49+
},
3750
freezeDuration: options.FreezeDuration,
38-
shrinkInterval: options.ShrinkInterval,
39-
shrunkAt: time.Now(),
4051
}
4152
}
4253

4354
func (chooser *ipChooser) Choose(_ context.Context, options *ChooseOptions) []net.IP {
44-
return chooser.isInBlacklist(options.Domain, options.IPs)
55+
if len(options.IPs) == 0 {
56+
return nil
57+
}
58+
59+
chooser.blackheapMutex.Lock()
60+
defer chooser.blackheapMutex.Unlock()
61+
62+
chosenIPs := make([]net.IP, 0, chooser.blackheap.Len())
63+
for _, ip := range options.IPs {
64+
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item == nil {
65+
chosenIPs = append(chosenIPs, ip)
66+
}
67+
}
68+
if len(chosenIPs) > 0 {
69+
return chosenIPs
70+
}
71+
72+
var chosenExpiredAt time.Time
73+
toFind := options.makeSet(makeMapKey)
74+
backups := make([]*blackItem, 0, chooser.blackheap.Len())
75+
for chooser.blackheap.Len() > 0 {
76+
firstChosen := heap.Pop(&chooser.blackheap).(*blackItem)
77+
backups = append(backups, firstChosen)
78+
key := makeMapKey(firstChosen.domain, firstChosen.ip)
79+
if _, ok := toFind[key]; ok {
80+
chosenExpiredAt = firstChosen.expiredAt
81+
chosenIPs = append(chosenIPs, firstChosen.ip)
82+
delete(toFind, key)
83+
break
84+
}
85+
}
86+
if chosenExpiredAt.IsZero() {
87+
panic("chosenExpiredAt should not be empty")
88+
}
89+
for chooser.blackheap.Len() > 0 {
90+
item := heap.Pop(&chooser.blackheap).(*blackItem)
91+
backups = append(backups, item)
92+
if chosenExpiredAt.Equal(item.expiredAt) {
93+
key := makeMapKey(item.domain, item.ip)
94+
if _, ok := toFind[key]; ok {
95+
chosenIPs = append(chosenIPs, item.ip)
96+
delete(toFind, key)
97+
}
98+
} else {
99+
break
100+
}
101+
}
102+
for _, backup := range backups {
103+
if backup.expiredAt.After(time.Now()) {
104+
heap.Push(&chooser.blackheap, backup)
105+
}
106+
}
107+
return chosenIPs
45108
}
46109

47110
func (chooser *ipChooser) FeedbackGood(_ context.Context, options *FeedbackOptions) {
48-
chooser.deleteFromBlacklist(options.Domain, options.IPs)
49-
}
111+
if len(options.IPs) == 0 {
112+
return
113+
}
50114

51-
func (chooser *ipChooser) FeedbackBad(_ context.Context, options *FeedbackOptions) {
52-
chooser.putIntoBlacklist(options.Domain, options.IPs)
115+
chooser.blackheapMutex.Lock()
116+
defer chooser.blackheapMutex.Unlock()
117+
118+
for _, ip := range options.IPs {
119+
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item != nil {
120+
heap.Remove(&chooser.blackheap, item.index)
121+
}
122+
}
53123
}
54124

55-
func (chooser *ipChooser) isInBlacklist(domain string, ips []net.IP) []net.IP {
56-
chooser.blacklistMutex.Lock()
57-
defer chooser.blacklistMutex.Unlock()
125+
func (chooser *ipChooser) FeedbackBad(_ context.Context, options *FeedbackOptions) {
126+
if len(options.IPs) == 0 {
127+
return
128+
}
58129

59-
filtered := make([]net.IP, 0, len(ips))
130+
chooser.blackheapMutex.Lock()
131+
defer chooser.blackheapMutex.Unlock()
60132

61-
for _, ip := range ips {
62-
blocklistKey := chooser.makeBlocklistKey(domain, ip)
63-
if blacklistItem, ok := chooser.blacklist[blocklistKey]; ok {
64-
if time.Now().After(blacklistItem.expiredAt) {
65-
delete(chooser.blacklist, blocklistKey)
66-
filtered = append(filtered, ip)
67-
}
133+
newExpiredAt := time.Now().Add(chooser.freezeDuration)
134+
for _, ip := range options.IPs {
135+
if item := chooser.blackheap.FindByDomainAndIp(options.Domain, ip); item != nil {
136+
chooser.blackheap.items[item.index].expiredAt = newExpiredAt
137+
heap.Fix(&chooser.blackheap, item.index)
68138
} else {
69-
filtered = append(filtered, ip)
139+
heap.Push(&chooser.blackheap, &blackItem{
140+
domain: options.Domain,
141+
ip: ip,
142+
expiredAt: newExpiredAt,
143+
})
70144
}
71145
}
146+
}
72147

73-
go chooser.shrink()
74-
75-
return filtered
148+
func (h *blackheap) Len() int {
149+
return len(h.items)
76150
}
77151

78-
func (chooser *ipChooser) putIntoBlacklist(domain string, ips []net.IP) {
79-
chooser.blacklistMutex.Lock()
80-
defer chooser.blacklistMutex.Unlock()
152+
func (h *blackheap) Less(i, j int) bool {
153+
return h.items[i].expiredAt.Before(h.items[j].expiredAt)
154+
}
81155

82-
for _, ip := range ips {
83-
blocklistKey := chooser.makeBlocklistKey(domain, ip)
84-
chooser.blacklist[blocklistKey] = blacklistItem{expiredAt: time.Now().Add(chooser.freezeDuration)}
156+
func (h *blackheap) Swap(i, j int) {
157+
if i == j {
158+
return
85159
}
160+
h.items[i].domain, h.items[j].domain = h.items[j].domain, h.items[i].domain
161+
h.items[i].ip, h.items[j].ip = h.items[j].ip, h.items[i].ip
162+
h.items[i].expiredAt, h.items[j].expiredAt = h.items[j].expiredAt, h.items[i].expiredAt
163+
h.m[makeMapKey(h.items[i].domain, h.items[i].ip)] = h.items[i]
164+
h.m[makeMapKey(h.items[j].domain, h.items[j].ip)] = h.items[j]
165+
}
166+
167+
func (h *blackheap) Push(x interface{}) {
168+
item := x.(*blackItem)
169+
item.index = len(h.items)
170+
h.items = append(h.items, item)
171+
h.m[makeMapKey(item.domain, item.ip)] = item
172+
}
86173

87-
go chooser.shrink()
174+
func (h *blackheap) Pop() interface{} {
175+
n := len(h.items)
176+
last := h.items[n-1]
177+
h.items = h.items[0 : n-1]
178+
delete(h.m, makeMapKey(last.domain, last.ip))
179+
return last
88180
}
89181

90-
func (chooser *ipChooser) deleteFromBlacklist(domain string, ips []net.IP) {
91-
chooser.blacklistMutex.Lock()
92-
defer chooser.blacklistMutex.Unlock()
182+
func (h *blackheap) FindByDomainAndIp(domain string, ip net.IP) *blackItem {
183+
key := makeMapKey(domain, ip)
184+
if item, ok := h.m[key]; ok {
185+
if item.expiredAt.Before(time.Now()) {
186+
heap.Remove(h, item.index)
187+
return nil
188+
}
189+
return item
190+
}
191+
return nil
192+
}
93193

94-
for _, ip := range ips {
95-
blocklistKey := chooser.makeBlocklistKey(domain, ip)
96-
delete(chooser.blacklist, blocklistKey)
194+
func (h *blackheap) String() string {
195+
var buf bytes.Buffer
196+
197+
buf.WriteString("&blackheap{ items: [")
198+
for _, item := range h.items {
199+
buf.WriteString(item.String())
200+
buf.WriteString(", ")
201+
}
202+
buf.WriteString("], m: {")
203+
for k, v := range h.m {
204+
buf.WriteString(k)
205+
buf.WriteString(": ")
206+
buf.WriteString(v.String())
207+
buf.WriteString(", ")
97208
}
209+
buf.WriteString("}")
98210

99-
go chooser.shrink()
211+
return buf.String()
100212
}
101213

102-
func (chooser *ipChooser) makeBlocklistKey(domain string, ip net.IP) string {
103-
return fmt.Sprintf("%s_%s", ip, domain)
104-
}
214+
func (item *blackItem) String() string {
215+
var buf bytes.Buffer
105216

106-
func (chooser *ipChooser) shrink() {
107-
chooser.blacklistMutex.Lock()
108-
defer chooser.blacklistMutex.Unlock()
217+
buf.WriteString("&blackItem{ index: ")
218+
buf.WriteString(fmt.Sprintf("%d", item.index))
219+
buf.WriteString(", domain: ")
220+
buf.WriteString(item.domain)
221+
buf.WriteString(", ip: ")
222+
buf.WriteString(item.ip.String())
223+
buf.WriteString(", expiredAt: ")
224+
buf.WriteString(item.expiredAt.String())
225+
buf.WriteString("}")
109226

110-
if time.Now().After(chooser.shrunkAt.Add(chooser.shrinkInterval)) {
111-
shrinkKeys := make([]string, 0, len(chooser.blacklist))
112-
for key, blacklistItem := range chooser.blacklist {
113-
if time.Now().After(blacklistItem.expiredAt) {
114-
shrinkKeys = append(shrinkKeys, key)
115-
}
116-
}
117-
for _, key := range shrinkKeys {
118-
delete(chooser.blacklist, key)
119-
}
120-
chooser.shrunkAt = time.Now()
121-
}
227+
return buf.String()
228+
}
229+
230+
func makeMapKey(domain string, ip net.IP) string {
231+
return ip.String() + "|" + domain
122232
}

0 commit comments

Comments
 (0)