-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathscheduler_balancer_replica.go
113 lines (91 loc) · 3.11 KB
/
scheduler_balancer_replica.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package prophet
type balanceReplicaScheduler struct {
limit uint64
freezeCache *resourceFreezeCache
selector Selector
cfg *Cfg
}
func newBalanceReplicaScheduler(cfg *Cfg) Scheduler {
freezeCache := newResourceFreezeCache(cfg.MaxFreezeScheduleInterval, 4*cfg.MaxFreezeScheduleInterval)
var filters []Filter
filters = append(filters, NewCacheFilter(freezeCache))
filters = append(filters, NewStateFilter(cfg))
filters = append(filters, NewHealthFilter(cfg))
filters = append(filters, NewStorageThresholdFilter(cfg))
filters = append(filters, NewSnapshotCountFilter(cfg))
freezeCache.startGC()
return &balanceReplicaScheduler{
cfg: cfg,
freezeCache: freezeCache,
limit: 1,
selector: newBalanceSelector(ReplicaKind, filters),
}
}
func (s *balanceReplicaScheduler) Name() string {
return "balance-replica-scheduler"
}
func (s *balanceReplicaScheduler) ResourceKind() ResourceKind {
return ReplicaKind
}
func (s *balanceReplicaScheduler) ResourceLimit() uint64 {
return minUint64(s.limit, s.cfg.MaxRebalanceReplica)
}
func (s *balanceReplicaScheduler) Prepare(rt *Runtime) error { return nil }
func (s *balanceReplicaScheduler) Cleanup(rt *Runtime) {}
func (s *balanceReplicaScheduler) Schedule(rt *Runtime) Operator {
// Select a peer from the container with most resources.
res, oldPeer := scheduleRemovePeer(rt, s.selector)
if res == nil {
return nil
}
// We don't schedule resource with abnormal number of replicas.
if len(res.meta.Peers()) != int(s.cfg.CountResourceReplicas) {
return nil
}
op := s.transferPeer(rt, res, oldPeer)
if op == nil {
// We can't transfer peer from this container now, so we add it to the cache
// and skip it for a while.
s.freezeCache.set(oldPeer.ContainerID, nil)
}
return op
}
func (s *balanceReplicaScheduler) transferPeer(rt *Runtime, res *ResourceRuntime, oldPeer *Peer) Operator {
// scoreGuard guarantees that the distinct score will not decrease.
containers := rt.ResourceContainers(res)
source := rt.Container(oldPeer.ContainerID)
scoreGuard := NewDistinctScoreFilter(s.cfg, containers, source)
checker := newReplicaChecker(s.cfg, rt)
newPeer, _ := checker.selectBestPeer(res, false, scoreGuard)
if newPeer == nil {
return nil
}
target := rt.Container(newPeer.ContainerID)
if !shouldBalance(source, target, s.ResourceKind()) {
return nil
}
id, err := checker.rt.p.store.AllocID()
if err != nil {
log.Errorf("allocate peer failure, %+v", err)
return nil
}
newPeer.ID = id
s.limit = adjustBalanceLimit(rt, s.ResourceKind())
return newTransferPeerAggregationOp(s.cfg, res, oldPeer, newPeer)
}
// scheduleRemovePeer schedules a resource to remove the peer.
func scheduleRemovePeer(rt *Runtime, s Selector, filters ...Filter) (*ResourceRuntime, *Peer) {
containers := rt.Containers()
source := s.SelectSource(containers, filters...)
if source == nil {
return nil, nil
}
target := rt.RandFollowerResource(source.meta.ID(), ReplicaKind)
if target == nil {
target = rt.RandLeaderResource(source.meta.ID(), ReplicaKind)
}
if target == nil {
return nil, nil
}
return target, target.GetContainerPeer(source.meta.ID())
}