Skip to content

Commit

Permalink
Merge pull request kubernetes#7145 from abdelrahman882/proactive-scaleup
Browse files Browse the repository at this point in the history
Add proactive scaleup
  • Loading branch information
k8s-ci-robot authored Aug 26, 2024
2 parents 527de12 + 01e9433 commit 9226cf6
Show file tree
Hide file tree
Showing 14 changed files with 1,558 additions and 0 deletions.
19 changes: 19 additions & 0 deletions cluster-autoscaler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ import (
ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodeinfosprovider"
"k8s.io/autoscaler/cluster-autoscaler/processors/podinjection"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/processors/pods"
"k8s.io/autoscaler/cluster-autoscaler/processors/provreq"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates"
"k8s.io/autoscaler/cluster-autoscaler/processors/scaledowncandidates/emptycandidates"
Expand Down Expand Up @@ -266,6 +269,8 @@ var (
provisioningRequestsEnabled = flag.Bool("enable-provisioning-requests", false, "Whether the clusterautoscaler will be handling the ProvisioningRequest CRs.")
frequentLoopsEnabled = flag.Bool("frequent-loops-enabled", false, "Whether clusterautoscaler triggers new iterations more frequently when it's needed")
asyncNodeGroupsEnabled = flag.Bool("async-node-groups", false, "Whether clusterautoscaler creates and deletes node groups asynchronously. Experimental: requires cloud provider supporting async node group operations, enable at your own risk.")
proactiveScaleupEnabled = flag.Bool("enable-proactive-scaleup", false, "Whether to enable/disable proactive scale-ups, defaults to false")
podInjectionLimit = flag.Int("pod-injection-limit", 5000, "Limits total number of pods while injecting fake pods. If unschedulable pods already exceeds the limit, pod injection is disabled but pods are not truncated.")
)

func isFlagPassed(name string) bool {
Expand Down Expand Up @@ -527,6 +532,20 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter
podListProcessor.AddProcessor(injector)
podListProcessor.AddProcessor(provreqProcesor)
}

if *proactiveScaleupEnabled {
podInjectionBackoffRegistry := podinjectionbackoff.NewFakePodControllerRegistry()

podInjectionPodListProcessor := podinjection.NewPodInjectionPodListProcessor(podInjectionBackoffRegistry)
enforceInjectedPodsLimitProcessor := podinjection.NewEnforceInjectedPodsLimitProcessor(*podInjectionLimit)

podListProcessor = pods.NewCombinedPodListProcessor([]pods.PodListProcessor{podInjectionPodListProcessor, podListProcessor, enforceInjectedPodsLimitProcessor})

// FakePodsScaleUpStatusProcessor needs to be the first processor in the ScaleUpStatusProcessor chain, as it filters fake pods out of
// the scale-up status so that we don't emit events for them.
opts.Processors.ScaleUpStatusProcessor = podinjection.NewFakePodsScaleUpStatusProcessor(podInjectionBackoffRegistry)
}

opts.Processors.PodListProcessor = podListProcessor
scaleDownCandidatesComparers := []scaledowncandidates.CandidatesComparer{}
if autoscalingOptions.ParallelDrain {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"time"

"github.com/cenkalti/backoff/v4"
"k8s.io/apimachinery/pkg/types"
)

const (
// baseBackoff is used as the InitialInterval of every backoff created by
// newExponentialBackOff, i.e. the length of the first backoff period.
baseBackoff = 5 * time.Minute
// backoffThreshold is used as the MaxInterval of every backoff created by
// newExponentialBackOff, i.e. the cap on the length of a single backoff period.
backoffThreshold = 30 * time.Minute
)

// controllerEntry describes a backed off controller.
type controllerEntry struct {
// until is the instant at which the most recently computed backoff period ends.
until time.Time
// backoff is the per-controller exponential backoff state. It is stored by
// value, so an entry read from a map must be written back after NextBackOff
// has been called on it for the grown interval to be retained.
backoff backoff.ExponentialBackOff
}

// ControllerRegistry contains backed off controllers to be used in time-based backing off of controllers considered in fake pod injection.
//
// NOTE(review): the map is accessed without any locking in this file; this
// presumably relies on the registry being used from a single goroutine — confirm against callers.
type ControllerRegistry struct {
// backedOffControllers maps a controller's UID to its backoff entry.
// Entries are added/overwritten by BackoffController; nothing in this file removes them.
backedOffControllers map[types.UID]controllerEntry
}

// NewFakePodControllerRegistry creates and returns an empty ControllerRegistry,
// ready to have controllers backed off via BackoffController.
func NewFakePodControllerRegistry() *ControllerRegistry {
	registry := new(ControllerRegistry)
	registry.backedOffControllers = map[types.UID]controllerEntry{}
	return registry
}

// newExponentialBackOff returns an ExponentialBackOff configured with this
// package's non-default values: it starts at baseBackoff, grows by
// backoff.DefaultMultiplier and is capped at backoffThreshold. The given clock
// is installed as the backoff's Clock. Reset is called before returning so the
// value is ready for its first NextBackOff call.
func newExponentialBackOff(clock backoff.Clock) backoff.ExponentialBackOff {
	var expBackoff backoff.ExponentialBackOff

	expBackoff.InitialInterval = baseBackoff
	expBackoff.MaxInterval = backoffThreshold
	expBackoff.Multiplier = backoff.DefaultMultiplier
	// No randomization: keeps the intervals predictable and easy to test.
	expBackoff.RandomizationFactor = 0
	// Zero disables stopping once the threshold is reached.
	expBackoff.MaxElapsedTime = 0
	expBackoff.Stop = backoff.Stop
	expBackoff.Clock = clock

	expBackoff.Reset()
	return expBackoff
}

// BackoffController puts the controller identified by ownerUID into backoff,
// measured from now.
//
//   - If the controller is currently in backoff (now is not after its stored
//     deadline), its existing backoff state is advanced, so the new period is
//     exponentially longer than the previous one.
//   - If the controller has an entry whose deadline has already passed, or has
//     no entry at all, a fresh backoff state is created and the first
//     (base) period is applied.
//
// An empty ownerUID is ignored.
func (r *ControllerRegistry) BackoffController(ownerUID types.UID, now time.Time) {
	if ownerUID == "" {
		return
	}

	entry, tracked := r.backedOffControllers[ownerUID]

	stillBackedOff := tracked && !now.After(entry.until)
	if !stillBackedOff {
		entry = controllerEntry{
			backoff: newExponentialBackOff(backoff.SystemClock),
		}
	}

	// NextBackOff both yields the current interval and grows the one that
	// will be returned by the following call.
	delay := entry.backoff.NextBackOff()
	entry.until = now.Add(delay)

	// entry is a copy of the map value, so it has to be stored back.
	r.backedOffControllers[ownerUID] = entry
}

// BackOffUntil returns the instant until which the controller with id `uid`
// is backed off, as last recorded by BackoffController. The zero time.Time is
// returned when the registry has no entry for uid.
//
// The now argument is currently not consulted: the stored deadline is returned
// even if it already lies in the past, so callers have to compare it against
// the current time themselves.
func (r *ControllerRegistry) BackOffUntil(uid types.UID, now time.Time) time.Time {
	if entry, tracked := r.backedOffControllers[uid]; tracked {
		return entry.until
	}
	return time.Time{}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjectionbackoff

import (
"testing"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/types"
)

// TestBackoffControllerOfPod checks the deadlines recorded by
// ControllerRegistry.BackoffController.
//
// Each case optionally backs controllers off at the zero time
// (initialBackoffCounts), then advances the test time by spendTime, then backs
// controllers off again (backoffCounts) and compares the stored `until`
// deadlines with the expected ones.
func TestBackoffControllerOfPod(t *testing.T) {
	c1 := types.UID("c1")
	c2 := types.UID("c2")
	clock := &clock{}

	testCases := map[string]struct {
		// initialBackoffCounts is the number of BackoffController calls made
		// per controller before spendTime elapses.
		initialBackoffCounts map[types.UID]int
		// spendTime is how far the test time is advanced before backoffCounts is applied.
		spendTime time.Duration
		// backoffCounts is the number of BackoffController calls made per
		// controller after spendTime has elapsed.
		backoffCounts                map[types.UID]int
		expectedBackedoffControllers map[types.UID]controllerEntry
	}{
		"backing-off a controller adds its controller UID in backoff correctly": {
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(baseBackoff),
				},
			},
		},
		"backing-off an already backed-off controller exponentially increases backoff duration": {
			backoffCounts: map[types.UID]int{
				c1: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a controller doesn't affect other controllers": {
			backoffCounts: map[types.UID]int{
				c1: 1,
				c2: 2,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(baseBackoff),
				},
				c2: {
					until: clock.now.Add(time.Duration(float64(baseBackoff) * backoff.DefaultMultiplier)),
				},
			},
		},
		"backing-off a past backed-off controller resets backoff": {
			// Two initial back-offs leave c1 backed off until
			// baseBackoff * DefaultMultiplier, which is before spendTime, so the
			// later call must start over from baseBackoff instead of growing further.
			initialBackoffCounts: map[types.UID]int{
				c1: 2,
			},
			spendTime: baseBackoff * 2,
			backoffCounts: map[types.UID]int{
				c1: 1,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(baseBackoff * 2).Add(baseBackoff),
				},
			},
		},
		"back-off duration doesn't exceed backoffThreshold": {
			backoffCounts: map[types.UID]int{
				c1: 15,
			},
			expectedBackedoffControllers: map[types.UID]controllerEntry{
				c1: {
					until: clock.now.Add(backoffThreshold),
				},
			},
		},
	}

	for name, tc := range testCases {
		t.Run(name, func(t *testing.T) {
			// Reset time between test cases
			clock.now = time.Time{}

			registry := NewFakePodControllerRegistry()

			for uid, backoffCount := range tc.initialBackoffCounts {
				for i := 0; i < backoffCount; i++ {
					registry.BackoffController(uid, clock.now)
				}
			}

			clock.now = clock.now.Add(tc.spendTime)

			for uid, backoffCount := range tc.backoffCounts {
				for i := 0; i < backoffCount; i++ {
					registry.BackoffController(uid, clock.now)
				}
			}

			assert.Len(t, registry.backedOffControllers, len(tc.expectedBackedoffControllers))
			for uid, expected := range tc.expectedBackedoffControllers {
				// The map holds struct values, so presence has to be checked with
				// the two-value lookup; a nil check on the value can never fail.
				actual, found := registry.backedOffControllers[uid]
				if !assert.True(t, found, "controller %q should be registered as backed off", uid) {
					continue
				}
				assert.Equal(t, expected.until, actual.until)
			}
		})
	}
}

// clock is a manually controlled time source for tests: it reports whatever
// instant has been stored in now.
// NOTE(review): presumably shaped to satisfy backoff.Clock, but within this
// file only the now field is read directly — confirm whether Now is still needed.
type clock struct {
// now is the instant reported by Now.
now time.Time
}

// Now returns the instant currently stored in the clock.
func (c *clock) Now() time.Time {
return c.now
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package podinjection

import (
apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/context"
)

// EnforceInjectedPodsLimitProcessor is a PodListProcessor used to limit the number of injected fake pods.
type EnforceInjectedPodsLimitProcessor struct {
// podLimit is the target maximum length of the pod list; Process drops fake
// pods (and only fake pods) while the list is longer than this.
podLimit int
}

// NewEnforceInjectedPodsLimitProcessor returns an EnforceInjectedPodsLimitProcessor
// that uses podLimit as the pod-count limit enforced by Process.
func NewEnforceInjectedPodsLimitProcessor(podLimit int) *EnforceInjectedPodsLimitProcessor {
	processor := new(EnforceInjectedPodsLimitProcessor)
	processor.podLimit = podLimit
	return processor
}

// Process enforces the pod limit on unschedulablePods by dropping fake pods.
//
// When the list is longer than the configured limit, fake pods (as reported by
// IsFake) are removed in list order until either the excess is gone or no fake
// pods remain. Non-fake pods are never removed, so the result can still exceed
// the limit. The result is a newly built slice (nil when no pod is kept); the
// returned error is always nil.
func (p *EnforceInjectedPodsLimitProcessor) Process(ctx *context.AutoscalingContext, unschedulablePods []*apiv1.Pod) ([]*apiv1.Pod, error) {
	// Number of pods the list is over the limit by; non-positive means
	// nothing has to be dropped.
	excess := len(unschedulablePods) - p.podLimit

	var kept []*apiv1.Pod
	for _, candidate := range unschedulablePods {
		if !IsFake(candidate) || excess <= 0 {
			kept = append(kept, candidate)
			continue
		}
		// A fake pod while still over the limit: drop it.
		excess--
	}

	return kept, nil
}

// CleanUp is called at CA termination. This processor holds nothing that
// needs releasing, so it is a no-op.
func (p *EnforceInjectedPodsLimitProcessor) CleanUp() {}
Loading

0 comments on commit 9226cf6

Please sign in to comment.