-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathbalancer_res.go
109 lines (95 loc) · 3.61 KB
/
balancer_res.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package metafora
import (
"fmt"
"time"
)
// ResourceReporter supplies a ResourceBalancer with the figures for the single
// resource it balances on. The balancer turns those figures into a usage
// percentage (used / total * 100).
type ResourceReporter interface {
	// Used reports how much of the resource is currently consumed together
	// with the total amount of that resource.
	Used() (used, total uint64)

	// String names the unit the values from Used are expressed in.
	String() string
}
// ResourceBalancer is a balancer implementation which uses two thresholds to
// limit claiming and rebalance work based upon a resource reported by a
// ResourceReporter. Both thresholds are percentages of the reporter's total.
//
// When the claim threshold is reached, claiming is meant to stop; until #93 is
// fixed, CanClaim still returns true but first sleeps longer than more lightly
// loaded nodes would. When the release threshold is reached, Balance releases
// one task per call until usage drops below that threshold. The claim
// threshold must be less than the release threshold (otherwise claims would
// continue just to have the work rebalanced).
//
// Even below the claim limit, claims are delayed by the percent of resources
// used (in milliseconds) to give less loaded nodes a claim advantage.
//
// The balancer releases the oldest tasks first (skipping those that are
// already stopping) to try to prevent rebalancing the same tasks repeatedly
// within a cluster.
type ResourceBalancer struct {
	ctx          BalancerContext  // set by Init; Balance reads running tasks from it
	reporter     ResourceReporter // source of the used/total resource figures
	claimLimit   int              // usage percent at or above which claims are delayed further
	releaseLimit int              // usage percent at or above which Balance releases a task
}
// NewResourceBalancer creates a new ResourceBalancer or returns an error if
// its arguments are invalid.
//
// src must be non-nil; it is queried on every CanClaim and Balance call.
// claimLimit and releaseLimit are percentages of src's total, expressed as
// integers between 1 and 100 inclusive, and claimLimit must be strictly less
// than releaseLimit.
func NewResourceBalancer(src ResourceReporter, claimLimit, releaseLimit int) (*ResourceBalancer, error) {
	// Reject a nil reporter up front; otherwise the first CanClaim or Balance
	// call would panic with a nil dereference far from the real mistake.
	if src == nil {
		return nil, fmt.Errorf("ResourceReporter must not be nil")
	}
	if claimLimit < 1 || claimLimit > 100 || releaseLimit < 1 || releaseLimit > 100 {
		return nil, fmt.Errorf("Limits must be between 1 and 100. claim=%d release=%d", claimLimit, releaseLimit)
	}
	if claimLimit >= releaseLimit {
		return nil, fmt.Errorf("Claim threshold must be < release threshold. claim=%d >= release=%d", claimLimit, releaseLimit)
	}
	return &ResourceBalancer{
		reporter:     src,
		claimLimit:   claimLimit,
		releaseLimit: releaseLimit,
	}, nil
}
// Init records the BalancerContext on the balancer. Balance later enumerates
// running tasks through it, so Init presumably has to run before the first
// Balance call (a nil context would be dereferenced there).
func (b *ResourceBalancer) Init(ctx BalancerContext) {
	b.ctx = ctx
}
// CanClaim reports whether this node may claim a task; the string argument
// (presumably a task ID) is ignored. It always returns true, but first sleeps
// for as many milliseconds as the percentage of the reporter's resource in
// use, so that less loaded nodes tend to win claims. At or above the claim
// limit, it instead logs the overload and sleeps 100ms plus one millisecond
// per percentage point over the limit.
//
// A reporter total of 0 is treated as 0% usage. Without that guard,
// used/total would be +Inf or NaN, and converting either to int is
// implementation-dependent in Go.
func (b *ResourceBalancer) CanClaim(string) bool {
	used, total := b.reporter.Used()

	// Percentage of the resource in use; guarded against a zero total.
	pct := 0
	if total > 0 {
		pct = int(float32(used) / float32(total) * 100)
	}

	if pct >= b.claimLimit {
		//FIXME Until #93 is fixed returning false is very dangerous as it could
		// cause a tight loop with the coordinator. Sleep longer than more
		// lightly loaded nodes.
		dur := time.Duration(100+(pct-b.claimLimit)) * time.Millisecond
		Infof("%d is over the claim limit of %d. Used %d of %d %s. Sleeping %s before claiming.",
			pct, b.claimLimit, used, total, b.reporter, dur)
		time.Sleep(dur)
		return true
	}

	// Always sleep based on resource usage to give less loaded nodes an advantage
	time.Sleep(time.Duration(pct) * time.Millisecond)
	return true
}
// Balance returns the IDs of tasks this node should release. When the
// percentage of the reporter's resource in use is below the release limit, it
// returns nil. Otherwise it returns the single oldest task (earliest Started)
// that is not already stopping. It returns nil if no such task exists.
//
// A reporter total of 0 is treated as 0% usage, so nothing is released.
// Without that guard, used/total would be +Inf or NaN, and converting either
// to int is implementation-dependent in Go.
func (b *ResourceBalancer) Balance() []string {
	used, total := b.reporter.Used()

	// Percentage of the resource in use; guarded against a zero total.
	pct := 0
	if total > 0 {
		pct = int(float32(used) / float32(total) * 100)
	}
	if pct < b.releaseLimit {
		// We're below the limit! Don't release anything.
		return nil
	}

	// Release the oldest task that isn't already stopping (a zero Stopped
	// time means the task has not begun stopping).
	var oldest RunningTask
	for _, t := range b.ctx.Tasks() {
		if t.Stopped().IsZero() && (oldest == nil || oldest.Started().After(t.Started())) {
			oldest = t
		}
	}

	// No tasks or all tasks are stopping, don't bother rebalancing
	if oldest == nil {
		return nil
	}

	Infof("Releasing task %s (started %s) because %d >= %d (%d of %d %s used)",
		oldest.Task().ID(), oldest.Started(), pct, b.releaseLimit, used, total, b.reporter)
	return []string{oldest.Task().ID()}
}