-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler_balancer_leader.go
107 lines (86 loc) · 2.89 KB
/
scheduler_balancer_leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package prophet
import (
"math"
)
// balanceResourceLeaderScheduler is the scheduler that moves resource
// leadership between containers to even out leader load.
type balanceResourceLeaderScheduler struct {
	// cfg is the configuration handed to the constructor; ResourceLimit reads
	// MaxRebalanceLeader from it.
	cfg *Cfg

	// limit is the scheduler's own cap on concurrent work. It starts at 1 and
	// is refreshed by Schedule through adjustBalanceLimit.
	limit uint64

	// selector picks the source and target containers for a leader transfer.
	selector Selector
}
// newBalanceResourceLeaderScheduler builds the leader-balancing scheduler for
// cfg. Its selector works on LeaderKind and is given the block, state and
// health filters, in that order. The initial limit is 1.
func newBalanceResourceLeaderScheduler(cfg *Cfg) Scheduler {
	filters := []Filter{
		NewBlockFilter(),
		NewStateFilter(cfg),
		NewHealthFilter(cfg),
	}

	scheduler := &balanceResourceLeaderScheduler{
		cfg:   cfg,
		limit: 1,
	}
	scheduler.selector = newBalanceSelector(LeaderKind, filters)
	return scheduler
}
// Name returns the fixed identifier of this scheduler.
func (l *balanceResourceLeaderScheduler) Name() string {
	const name = "scheduler-rebalance-leader"
	return name
}
// ResourceKind reports the kind of resource this scheduler operates on, which
// is always LeaderKind.
func (l *balanceResourceLeaderScheduler) ResourceKind() ResourceKind {
	return LeaderKind
}
// ResourceLimit combines the scheduler's current limit with the configured
// MaxRebalanceLeader through minUint64 (presumably yielding the smaller of the
// two) and returns the result.
func (l *balanceResourceLeaderScheduler) ResourceLimit() uint64 {
	current := l.limit
	configured := l.cfg.MaxRebalanceLeader
	return minUint64(current, configured)
}
// Prepare does no work for this scheduler and always returns a nil error.
func (l *balanceResourceLeaderScheduler) Prepare(rt *Runtime) error {
	return nil
}
// Cleanup is a no-op for this scheduler.
func (l *balanceResourceLeaderScheduler) Cleanup(rt *Runtime) {
}
// Schedule tries to produce one leader-transfer operator for rt.
//
// It asks scheduleTransferLeader for a resource and the peer that should
// become its leader, looks up the containers of the current leader and of the
// new leader, and returns nil unless shouldBalance approves the pair. When a
// transfer is approved, the scheduler's limit is refreshed through
// adjustBalanceLimit before the operator is built.
//
// A nil return means no transfer should be scheduled this round.
func (l *balanceResourceLeaderScheduler) Schedule(rt *Runtime) Operator {
	res, newLeader := scheduleTransferLeader(rt, l.selector)
	// newLeader is dereferenced below, so it has to be checked as well as res:
	// it is the result of a peer lookup on the resource and may be nil if that
	// lookup finds nothing.
	if res == nil || newLeader == nil {
		return nil
	}

	source := rt.Container(res.leaderPeer.ContainerID)
	target := rt.Container(newLeader.ContainerID)
	if !shouldBalance(source, target, l.ResourceKind()) {
		return nil
	}

	l.limit = adjustBalanceLimit(rt, l.ResourceKind())
	return newTransferLeaderAggregationOp(l.cfg, res, newLeader)
}
// scheduleTransferLeader chooses a resource and the peer that should take over
// as its leader.
//
// The selector picks a source and a target container from all containers in
// rt, honouring filters. Each pick is rated by how far its leader score lies
// from the average leader score; a pick that is nil rates as zero. If both
// ratings are zero nothing is scheduled. If the source is further from the
// average, a random resource led by the source is moved to one of its follower
// containers; otherwise a random resource that has a follower on the target is
// moved onto the target.
//
// It returns (nil, nil) whenever no suitable resource or target is found.
func scheduleTransferLeader(rt *Runtime, s Selector, filters ...Filter) (*ResourceRuntime, *Peer) {
	all := rt.Containers()
	count := len(all)
	if count == 0 {
		return nil, nil
	}

	// Average leader score, accumulated as one share per container.
	avg := 0.0
	for _, c := range all {
		avg += c.LeaderScore() / float64(count)
	}

	src := s.SelectSource(all, filters...)
	dst := s.SelectTarget(all, filters...)

	srcGap, dstGap := 0.0, 0.0
	if src != nil {
		srcGap = math.Abs(src.LeaderScore() - avg)
	}
	if dst != nil {
		dstGap = math.Abs(dst.LeaderScore() - avg)
	}

	switch {
	case srcGap == 0 && dstGap == 0:
		return nil, nil

	case srcGap > dstGap:
		// Transfer a leader out of the source container.
		res := rt.RandLeaderResource(src.meta.ID(), LeaderKind)
		if res == nil {
			return nil, nil
		}
		// The follower pick is made without the caller's filters.
		follower := s.SelectTarget(rt.ResourceFollowerContainers(res))
		if follower == nil {
			return nil, nil
		}
		return res, res.GetContainerPeer(follower.meta.ID())
	}

	// Transfer a leader into the target container.
	dstID := dst.meta.ID()
	res := rt.RandFollowerResource(dstID, LeaderKind)
	if res == nil {
		return nil, nil
	}
	return res, res.GetContainerPeer(dst.meta.ID())
}