Skip to content

Commit 38c4c6c

Browse files
committed
feature: add weight_picker
1 parent 9bb9a47 commit 38c4c6c

File tree

5 files changed

+339
-0
lines changed

5 files changed

+339
-0
lines changed

balancer/types.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// Copyright 2023 ecodeclub
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package balancer

balancer/wrr/weight_picker.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package wrr
2+
3+
import (
4+
"google.golang.org/grpc/balancer"
5+
"google.golang.org/grpc/balancer/base"
6+
"math"
7+
"sync"
8+
)
9+
10+
var (
11+
_ balancer.Picker = (*Picker)(nil)
12+
_ base.PickerBuilder = (*PickerBuilder)(nil)
13+
)
14+
15+
type GetWeightFunc func(ci base.SubConnInfo) uint32
16+
17+
type Option func(b *PickerBuilder)
18+
19+
func WithGetWeightFunc(fn GetWeightFunc) Option {
20+
return func(b *PickerBuilder) {
21+
b.getWeightFunc = fn
22+
}
23+
}
24+
25+
func defaultGetWeight(ci base.SubConnInfo) uint32 {
26+
md, ok := ci.Address.Metadata.(map[string]any)
27+
if !ok {
28+
return 10
29+
}
30+
weightVal := md["weight"]
31+
weight, _ := weightVal.(uint32)
32+
return weight
33+
}
34+
35+
type PickerBuilder struct {
36+
getWeightFunc GetWeightFunc
37+
}
38+
39+
func NewPickerBuilder(opts ...Option) *PickerBuilder {
40+
res := &PickerBuilder{
41+
getWeightFunc: defaultGetWeight,
42+
}
43+
for _, opt := range opts {
44+
opt(res)
45+
}
46+
return res
47+
}
48+
49+
func (b *PickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
50+
connections := make([]*conn, 0, len(info.ReadySCs))
51+
for con, conInfo := range info.ReadySCs {
52+
weight := b.getWeightFunc(conInfo)
53+
connections = append(connections, &conn{
54+
SubConn: con,
55+
connInfo: conInfo,
56+
weight: weight,
57+
currentWeight: weight,
58+
efficientWeight: weight,
59+
name: conInfo.Address.Addr,
60+
})
61+
}
62+
return &Picker{
63+
connections: connections,
64+
}
65+
}
66+
67+
type Picker struct {
68+
connections []*conn
69+
mutex sync.Mutex
70+
}
71+
72+
func (p *Picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
73+
if len(p.connections) == 0 {
74+
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
75+
}
76+
var totalWeight uint32
77+
var maxConn *conn
78+
for _, con := range p.connections {
79+
con.mutex.Lock()
80+
totalWeight += con.efficientWeight
81+
con.currentWeight += con.efficientWeight
82+
if maxConn == nil || con.currentWeight > maxConn.currentWeight {
83+
maxConn = con
84+
}
85+
con.mutex.Unlock()
86+
}
87+
if maxConn == nil {
88+
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
89+
}
90+
maxConn.mutex.Lock()
91+
maxConn.currentWeight -= totalWeight
92+
maxConn.mutex.Unlock()
93+
return balancer.PickResult{
94+
SubConn: maxConn,
95+
Done: func(info balancer.DoneInfo) {
96+
maxConn.mutex.Lock()
97+
defer maxConn.mutex.Unlock()
98+
if info.Err != nil && maxConn.weight == 0 {
99+
return
100+
}
101+
if info.Err == nil && maxConn.efficientWeight == math.MaxUint32 {
102+
return
103+
}
104+
if info.Err != nil {
105+
maxConn.efficientWeight--
106+
} else {
107+
maxConn.efficientWeight++
108+
}
109+
},
110+
}, nil
111+
}
112+
113+
type conn struct {
114+
balancer.SubConn
115+
mutex sync.Mutex
116+
connInfo base.SubConnInfo
117+
name string
118+
weight uint32
119+
efficientWeight uint32
120+
currentWeight uint32
121+
}

balancer/wrr/weight_picker_test.go

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package wrr
2+
3+
import (
4+
"github.com/stretchr/testify/assert"
5+
"github.com/stretchr/testify/require"
6+
"google.golang.org/grpc/attributes"
7+
"google.golang.org/grpc/balancer"
8+
"google.golang.org/grpc/balancer/base"
9+
"google.golang.org/grpc/resolver"
10+
"testing"
11+
)
12+
13+
func TestWeightBalancer_PickerBuilder(t *testing.T) {
14+
testCases := []struct {
15+
name string
16+
b *PickerBuilder
17+
buildInfoFn func() base.PickerBuildInfo
18+
wantWeights map[string]uint32
19+
}{
20+
{
21+
name: "default",
22+
buildInfoFn: func() base.PickerBuildInfo {
23+
readySCs := make(map[balancer.SubConn]base.SubConnInfo, 3)
24+
k1 := &mockSubConn{id: 1}
25+
k2 := &mockSubConn{id: 2}
26+
k3 := &mockSubConn{id: 3}
27+
readySCs[k1] = base.SubConnInfo{
28+
Address: resolver.Address{
29+
Addr: "weight-1",
30+
Metadata: map[string]any{"weight": uint32(15)},
31+
},
32+
}
33+
readySCs[k2] = base.SubConnInfo{
34+
Address: resolver.Address{
35+
Addr: "weight-2",
36+
Metadata: map[string]any{"weight": uint32(20)},
37+
},
38+
}
39+
readySCs[k3] = base.SubConnInfo{
40+
Address: resolver.Address{
41+
Addr: "weight-3",
42+
Metadata: map[string]any{"weight": uint32(25)},
43+
},
44+
}
45+
return base.PickerBuildInfo{
46+
ReadySCs: readySCs,
47+
}
48+
},
49+
b: NewPickerBuilder(),
50+
wantWeights: map[string]uint32{"weight-1": 15, "weight-2": 20, "weight-3": 25},
51+
},
52+
{
53+
name: "address attributes",
54+
buildInfoFn: func() base.PickerBuildInfo {
55+
readySCs := make(map[balancer.SubConn]base.SubConnInfo, 3)
56+
k1 := &mockSubConn{id: 1}
57+
k2 := &mockSubConn{id: 2}
58+
k3 := &mockSubConn{id: 3}
59+
readySCs[k1] = base.SubConnInfo{
60+
Address: resolver.Address{
61+
Addr: "weight-1",
62+
Attributes: attributes.New("weight", uint32(15)),
63+
},
64+
}
65+
readySCs[k2] = base.SubConnInfo{
66+
Address: resolver.Address{
67+
Addr: "weight-2",
68+
Attributes: attributes.New("weight", uint32(20)),
69+
},
70+
}
71+
readySCs[k3] = base.SubConnInfo{
72+
Address: resolver.Address{
73+
Addr: "weight-3",
74+
Attributes: attributes.New("weight", uint32(25)),
75+
},
76+
}
77+
return base.PickerBuildInfo{
78+
ReadySCs: readySCs,
79+
}
80+
},
81+
b: NewPickerBuilder(WithGetWeightFunc(func(ci base.SubConnInfo) uint32 {
82+
weight := ci.Address.Attributes.Value("weight").(uint32)
83+
return weight
84+
})),
85+
wantWeights: map[string]uint32{"weight-1": 15, "weight-2": 20, "weight-3": 25},
86+
},
87+
}
88+
for _, tc := range testCases {
89+
tt := tc
90+
t.Run(tt.name, func(t *testing.T) {
91+
p := tt.b.Build(tt.buildInfoFn()).(*Picker)
92+
targetWeights := make(map[string]uint32, len(p.connections))
93+
for _, c := range p.connections {
94+
targetWeights[c.name] = c.weight
95+
}
96+
assert.Equal(t, targetWeights, tt.wantWeights)
97+
})
98+
}
99+
}
100+
101+
func TestWeightBalancer_Pick(t *testing.T) {
102+
b := &Picker{
103+
connections: []*conn{
104+
{
105+
name: "weight-5",
106+
weight: 5,
107+
efficientWeight: 5,
108+
currentWeight: 5,
109+
},
110+
{
111+
name: "weight-4",
112+
weight: 4,
113+
efficientWeight: 4,
114+
currentWeight: 4,
115+
},
116+
{
117+
name: "weight-3",
118+
weight: 3,
119+
efficientWeight: 3,
120+
currentWeight: 3,
121+
},
122+
},
123+
}
124+
pickRes, err := b.Pick(balancer.PickInfo{})
125+
require.NoError(t, err)
126+
assert.Equal(t, "weight-5", pickRes.SubConn.(*conn).name)
127+
128+
pickRes, err = b.Pick(balancer.PickInfo{})
129+
require.NoError(t, err)
130+
assert.Equal(t, "weight-4", pickRes.SubConn.(*conn).name)
131+
132+
pickRes, err = b.Pick(balancer.PickInfo{})
133+
require.NoError(t, err)
134+
assert.Equal(t, "weight-3", pickRes.SubConn.(*conn).name)
135+
136+
pickRes, err = b.Pick(balancer.PickInfo{})
137+
require.NoError(t, err)
138+
assert.Equal(t, "weight-5", pickRes.SubConn.(*conn).name)
139+
140+
pickRes, err = b.Pick(balancer.PickInfo{})
141+
require.NoError(t, err)
142+
assert.Equal(t, "weight-4", pickRes.SubConn.(*conn).name)
143+
144+
pickRes.Done(balancer.DoneInfo{})
145+
// 断言这里面 efficient weight 是变化了的
146+
}
147+
148+
var _ balancer.SubConn = (*mockSubConn)(nil)
149+
150+
type mockSubConn struct{ id int }
151+
152+
func (m *mockSubConn) UpdateAddresses(addresses []resolver.Address) {
153+
return
154+
}
155+
156+
func (m *mockSubConn) Connect() {
157+
return
158+
}
159+
160+
func (m *mockSubConn) GetOrBuildProducer(builder balancer.ProducerBuilder) (p balancer.Producer, close func()) {
161+
return
162+
}
163+
164+
func (m *mockSubConn) Shutdown() {
165+
return
166+
}

go.mod

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,16 @@
11
module github.com/ecodeclub/grpcx
22

33
go 1.21
4+
5+
require (
6+
github.com/stretchr/testify v1.9.0
7+
google.golang.org/grpc v1.67.0
8+
)
9+
10+
require (
11+
github.com/davecgh/go-spew v1.1.1 // indirect
12+
github.com/pmezard/go-difflib v1.0.0 // indirect
13+
golang.org/x/sys v0.24.0 // indirect
14+
google.golang.org/protobuf v1.34.2 // indirect
15+
gopkg.in/yaml.v3 v3.0.1 // indirect
16+
)

go.sum

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
2+
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
4+
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
5+
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
6+
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
7+
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
8+
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
9+
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
10+
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
11+
golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
12+
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
13+
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
14+
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
15+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 h1:e7S5W7MGGLaSu8j3YjdezkZ+m1/Nm0uRVRMEMGk26Xs=
16+
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
17+
google.golang.org/grpc v1.67.0 h1:IdH9y6PF5MPSdAntIcpjQ+tXO41pcQsfZV2RxtQgVcw=
18+
google.golang.org/grpc v1.67.0/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA=
19+
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
20+
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
21+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
22+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
23+
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
24+
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 commit comments

Comments
 (0)