From edfc8fa52c1473f8bb56d2bbea23b7eb6fca4ef2 Mon Sep 17 00:00:00 2001 From: Joseph Date: Fri, 13 Jan 2023 14:59:22 +0800 Subject: [PATCH] koord-descheduler: limits frequently migrated workloads (#950) Signed-off-by: Joseph Signed-off-by: Siyu Wang --- go.mod | 2 + go.sum | 11 +- .../apis/config/types_pluginargs.go | 28 +++- .../apis/config/v1alpha2/defaults.go | 12 ++ .../apis/config/v1alpha2/types_pluginargs.go | 24 +++ .../v1alpha2/zz_generated.conversion.go | 40 ++++- .../config/v1alpha2/zz_generated.deepcopy.go | 51 +++++++ .../apis/config/zz_generated.deepcopy.go | 51 +++++++ .../controllers/migration/controller.go | 29 ++++ .../controllers/migration/controller_test.go | 139 ++++++++++++++++++ .../controllerfinder/controller_finder.go | 1 + .../controllers/migration/filter.go | 69 +++++++++ .../controllers/migration/util/util.go | 2 +- 13 files changed, 445 insertions(+), 14 deletions(-) diff --git a/go.mod b/go.mod index b1e5427a0..421373872 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/onsi/ginkgo v1.16.4 github.com/onsi/gomega v1.15.0 github.com/openkruise/kruise-api v1.3.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prashantv/gostub v1.1.0 github.com/prometheus/client_golang v1.13.0 github.com/spf13/cobra v1.6.1 @@ -223,6 +224,7 @@ require ( replace ( github.com/google/cadvisor => github.com/koordinator-sh/cadvisor v0.0.0-20220919031936-833eb74e858e + golang.org/x/time => golang.org/x/time v0.3.0 k8s.io/api => k8s.io/api v0.22.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.22.6 k8s.io/apimachinery => k8s.io/apimachinery v0.22.6 diff --git a/go.sum b/go.sum index b419ba755..ec0e4f9c8 100644 --- a/go.sum +++ b/go.sum @@ -806,6 +806,7 @@ github.com/openkruise/kruise-api v1.3.0/go.mod h1:9ZX+ycdHKNzcA5ezAf35xOa2Mwfa2B github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.1/go.mod 
h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= github.com/paypal/load-watcher v0.2.1/go.mod h1:MMCDf8aXF5k+K2q6AtMOBZItCvZ3oFAk+zNO4OAtp0w= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= @@ -1347,14 +1348,8 @@ golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs= -golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= +golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod 
h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/pkg/descheduler/apis/config/types_pluginargs.go b/pkg/descheduler/apis/config/types_pluginargs.go index 5d3b8e0ec..2ed2f11fa 100644 --- a/pkg/descheduler/apis/config/types_pluginargs.go +++ b/pkg/descheduler/apis/config/types_pluginargs.go @@ -91,6 +91,9 @@ type MigrationControllerArgs struct { // Default is false DryRun bool + // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. + MaxConcurrentReconciles int32 + // EvictFailedBarePods allows pods without ownerReferences and in failed phase to be evicted. EvictFailedBarePods bool @@ -125,8 +128,11 @@ type MigrationControllerArgs struct { // Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%). MaxUnavailablePerWorkload *intstr.IntOrString - // MaxConcurrentReconciles is the maximum number of concurrent Reconciles which can be run. Defaults to 1. - MaxConcurrentReconciles int32 + // ObjectLimiters control the frequency of migration/eviction to make it smoother, + // and also protect Pods of the same class from being evicted frequently. + // e.g. limiting the frequency of Pods of the same workload being evicted. + // The default is to set the MigrationLimitObjectWorkload limiter. 
+ ObjectLimiters ObjectLimiterMap // DefaultJobMode represents the default operating mode of the PodMigrationJob // Default is PodMigrationJobModeReservationFirst @@ -145,3 +151,21 @@ type MigrationControllerArgs struct { // DefaultDeleteOptions defines options when deleting migrated pods and preempted pods through the method specified by EvictionPolicy DefaultDeleteOptions *metav1.DeleteOptions } + +type MigrationLimitObjectType string + +const ( + MigrationLimitObjectWorkload MigrationLimitObjectType = "workload" +) + +type ObjectLimiterMap map[MigrationLimitObjectType]MigrationObjectLimiter + +// MigrationObjectLimiter means that if the specified dimension has multiple migrations within the configured time period +// and exceeds the configured threshold, it will be limited. +type MigrationObjectLimiter struct { + // Duration indicates the time window of the desired limit. + Duration metav1.Duration + // MaxMigrating indicates the maximum number of migrations/evictions allowed within the window time. + // If configured as nil or 0, the maximum number will be calculated according to MaxMigratingPerWorkload. 
+ MaxMigrating *intstr.IntOrString +} diff --git a/pkg/descheduler/apis/config/v1alpha2/defaults.go b/pkg/descheduler/apis/config/v1alpha2/defaults.go index 9d425f1bd..41b4512ba 100644 --- a/pkg/descheduler/apis/config/v1alpha2/defaults.go +++ b/pkg/descheduler/apis/config/v1alpha2/defaults.go @@ -40,6 +40,14 @@ const ( defaultMigrationJobEvictionPolicy = migrationevictor.NativeEvictorName ) +var ( + defaultObjectLimiters = map[MigrationLimitObjectType]MigrationObjectLimiter{ + MigrationLimitObjectWorkload: { + Duration: metav1.Duration{Duration: 5 * time.Minute}, + }, + } +) + func addDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } @@ -215,6 +223,10 @@ func SetDefaults_MigrationControllerArgs(obj *MigrationControllerArgs) { if obj.EvictionPolicy == "" { obj.EvictionPolicy = defaultMigrationJobEvictionPolicy } + + if len(obj.ObjectLimiters) == 0 { + obj.ObjectLimiters = defaultObjectLimiters + } } func SetDefaults_LowNodeLoadArgs(obj *LowNodeLoadArgs) { diff --git a/pkg/descheduler/apis/config/v1alpha2/types_pluginargs.go b/pkg/descheduler/apis/config/v1alpha2/types_pluginargs.go index 52ecd777e..25e28263e 100644 --- a/pkg/descheduler/apis/config/v1alpha2/types_pluginargs.go +++ b/pkg/descheduler/apis/config/v1alpha2/types_pluginargs.go @@ -128,6 +128,12 @@ type MigrationControllerArgs struct { // Value can be an absolute number (ex: 5) or a percentage of desired pods (ex: 10%). MaxUnavailablePerWorkload *intstr.IntOrString `json:"maxUnavailablePerWorkload,omitempty"` + // ObjectLimiters control the frequency of migration/eviction to make it smoother, + // and also protect Pods of the same class from being evicted frequently. + // e.g. limiting the frequency of Pods of the same workload being evicted. + // The default is to set the MigrationLimitObjectWorkload limiter. 
+ ObjectLimiters ObjectLimiterMap `json:"objectLimiters,omitempty"` + // DefaultJobMode represents the default operating mode of the PodMigrationJob // Default is PodMigrationJobModeReservationFirst DefaultJobMode string `json:"defaultJobMode,omitempty"` @@ -145,3 +151,21 @@ type MigrationControllerArgs struct { // DefaultDeleteOptions defines options when deleting migrated pods and preempted pods through the method specified by EvictionPolicy DefaultDeleteOptions *metav1.DeleteOptions `json:"defaultDeleteOptions,omitempty"` } + +type MigrationLimitObjectType string + +const ( + MigrationLimitObjectWorkload MigrationLimitObjectType = "workload" +) + +type ObjectLimiterMap map[MigrationLimitObjectType]MigrationObjectLimiter + +// MigrationObjectLimiter means that if the specified dimension has multiple migrations within the configured time period +// and exceeds the configured threshold, it will be limited. +type MigrationObjectLimiter struct { + // Duration indicates the time window of the desired limit. + Duration metav1.Duration `json:"duration,omitempty"` + // MaxMigrating indicates the maximum number of migrations/evictions allowed within the window time. + // If configured as nil or 0, the maximum number will be calculated according to MaxMigratingPerWorkload. 
+ MaxMigrating *intstr.IntOrString `json:"maxMigrating,omitempty"` +} diff --git a/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go b/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go index a9ad31a58..7220e1003 100644 --- a/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go +++ b/pkg/descheduler/apis/config/v1alpha2/zz_generated.conversion.go @@ -89,6 +89,16 @@ func RegisterConversions(s *runtime.Scheme) error { }); err != nil { return err } + if err := s.AddGeneratedConversionFunc((*MigrationObjectLimiter)(nil), (*config.MigrationObjectLimiter)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_v1alpha2_MigrationObjectLimiter_To_config_MigrationObjectLimiter(a.(*MigrationObjectLimiter), b.(*config.MigrationObjectLimiter), scope) + }); err != nil { + return err + } + if err := s.AddGeneratedConversionFunc((*config.MigrationObjectLimiter)(nil), (*MigrationObjectLimiter)(nil), func(a, b interface{}, scope conversion.Scope) error { + return Convert_config_MigrationObjectLimiter_To_v1alpha2_MigrationObjectLimiter(a.(*config.MigrationObjectLimiter), b.(*MigrationObjectLimiter), scope) + }); err != nil { + return err + } if err := s.AddGeneratedConversionFunc((*Namespaces)(nil), (*config.Namespaces)(nil), func(a, b interface{}, scope conversion.Scope) error { return Convert_v1alpha2_Namespaces_To_config_Namespaces(a.(*Namespaces), b.(*config.Namespaces), scope) }); err != nil { @@ -419,6 +429,7 @@ func autoConvert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerA out.MaxMigratingPerNamespace = (*int32)(unsafe.Pointer(in.MaxMigratingPerNamespace)) out.MaxMigratingPerWorkload = (*intstr.IntOrString)(unsafe.Pointer(in.MaxMigratingPerWorkload)) out.MaxUnavailablePerWorkload = (*intstr.IntOrString)(unsafe.Pointer(in.MaxUnavailablePerWorkload)) + out.ObjectLimiters = *(*config.ObjectLimiterMap)(unsafe.Pointer(&in.ObjectLimiters)) out.DefaultJobMode = in.DefaultJobMode if err := 
v1.Convert_Pointer_v1_Duration_To_v1_Duration(&in.DefaultJobTTL, &out.DefaultJobTTL, s); err != nil { return err @@ -437,6 +448,9 @@ func Convert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerArgs( func autoConvert_config_MigrationControllerArgs_To_v1alpha2_MigrationControllerArgs(in *config.MigrationControllerArgs, out *MigrationControllerArgs, s conversion.Scope) error { out.DryRun = in.DryRun + if err := v1.Convert_int32_To_Pointer_int32(&in.MaxConcurrentReconciles, &out.MaxConcurrentReconciles, s); err != nil { + return err + } out.EvictFailedBarePods = in.EvictFailedBarePods out.EvictLocalStoragePods = in.EvictLocalStoragePods out.EvictSystemCriticalPods = in.EvictSystemCriticalPods @@ -447,9 +461,7 @@ func autoConvert_config_MigrationControllerArgs_To_v1alpha2_MigrationControllerA out.MaxMigratingPerNamespace = (*int32)(unsafe.Pointer(in.MaxMigratingPerNamespace)) out.MaxMigratingPerWorkload = (*intstr.IntOrString)(unsafe.Pointer(in.MaxMigratingPerWorkload)) out.MaxUnavailablePerWorkload = (*intstr.IntOrString)(unsafe.Pointer(in.MaxUnavailablePerWorkload)) - if err := v1.Convert_int32_To_Pointer_int32(&in.MaxConcurrentReconciles, &out.MaxConcurrentReconciles, s); err != nil { - return err - } + out.ObjectLimiters = *(*ObjectLimiterMap)(unsafe.Pointer(&in.ObjectLimiters)) out.DefaultJobMode = in.DefaultJobMode if err := v1.Convert_v1_Duration_To_Pointer_v1_Duration(&in.DefaultJobTTL, &out.DefaultJobTTL, s); err != nil { return err @@ -466,6 +478,28 @@ func Convert_config_MigrationControllerArgs_To_v1alpha2_MigrationControllerArgs( return autoConvert_config_MigrationControllerArgs_To_v1alpha2_MigrationControllerArgs(in, out, s) } +func autoConvert_v1alpha2_MigrationObjectLimiter_To_config_MigrationObjectLimiter(in *MigrationObjectLimiter, out *config.MigrationObjectLimiter, s conversion.Scope) error { + out.Duration = in.Duration + out.MaxMigrating = (*intstr.IntOrString)(unsafe.Pointer(in.MaxMigrating)) + return nil +} + +// 
Convert_v1alpha2_MigrationObjectLimiter_To_config_MigrationObjectLimiter is an autogenerated conversion function. +func Convert_v1alpha2_MigrationObjectLimiter_To_config_MigrationObjectLimiter(in *MigrationObjectLimiter, out *config.MigrationObjectLimiter, s conversion.Scope) error { + return autoConvert_v1alpha2_MigrationObjectLimiter_To_config_MigrationObjectLimiter(in, out, s) +} + +func autoConvert_config_MigrationObjectLimiter_To_v1alpha2_MigrationObjectLimiter(in *config.MigrationObjectLimiter, out *MigrationObjectLimiter, s conversion.Scope) error { + out.Duration = in.Duration + out.MaxMigrating = (*intstr.IntOrString)(unsafe.Pointer(in.MaxMigrating)) + return nil +} + +// Convert_config_MigrationObjectLimiter_To_v1alpha2_MigrationObjectLimiter is an autogenerated conversion function. +func Convert_config_MigrationObjectLimiter_To_v1alpha2_MigrationObjectLimiter(in *config.MigrationObjectLimiter, out *MigrationObjectLimiter, s conversion.Scope) error { + return autoConvert_config_MigrationObjectLimiter_To_v1alpha2_MigrationObjectLimiter(in, out, s) +} + func autoConvert_v1alpha2_Namespaces_To_config_Namespaces(in *Namespaces, out *config.Namespaces, s conversion.Scope) error { out.Include = *(*[]string)(unsafe.Pointer(&in.Include)) out.Exclude = *(*[]string)(unsafe.Pointer(&in.Exclude)) diff --git a/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go b/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go index 86333e256..16fb367f0 100644 --- a/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go +++ b/pkg/descheduler/apis/config/v1alpha2/zz_generated.deepcopy.go @@ -297,6 +297,13 @@ func (in *MigrationControllerArgs) DeepCopyInto(out *MigrationControllerArgs) { *out = new(intstr.IntOrString) **out = **in } + if in.ObjectLimiters != nil { + in, out := &in.ObjectLimiters, &out.ObjectLimiters + *out = make(ObjectLimiterMap, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } if in.DefaultJobTTL != nil { 
in, out := &in.DefaultJobTTL, &out.DefaultJobTTL *out = new(v1.Duration) @@ -328,6 +335,28 @@ func (in *MigrationControllerArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MigrationObjectLimiter) DeepCopyInto(out *MigrationObjectLimiter) { + *out = *in + out.Duration = in.Duration + if in.MaxMigrating != nil { + in, out := &in.MaxMigrating, &out.MaxMigrating + *out = new(intstr.IntOrString) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MigrationObjectLimiter. +func (in *MigrationObjectLimiter) DeepCopy() *MigrationObjectLimiter { + if in == nil { + return nil + } + out := new(MigrationObjectLimiter) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Namespaces) DeepCopyInto(out *Namespaces) { *out = *in @@ -354,6 +383,28 @@ func (in *Namespaces) DeepCopy() *Namespaces { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in ObjectLimiterMap) DeepCopyInto(out *ObjectLimiterMap) { + { + in := &in + *out = make(ObjectLimiterMap, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ObjectLimiterMap. +func (in ObjectLimiterMap) DeepCopy() ObjectLimiterMap { + if in == nil { + return nil + } + out := new(ObjectLimiterMap) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *Plugin) DeepCopyInto(out *Plugin) { *out = *in diff --git a/pkg/descheduler/apis/config/zz_generated.deepcopy.go b/pkg/descheduler/apis/config/zz_generated.deepcopy.go index 3236d7a9f..3156a31bb 100644 --- a/pkg/descheduler/apis/config/zz_generated.deepcopy.go +++ b/pkg/descheduler/apis/config/zz_generated.deepcopy.go @@ -252,6 +252,13 @@ func (in *MigrationControllerArgs) DeepCopyInto(out *MigrationControllerArgs) { *out = new(intstr.IntOrString) **out = **in } + if in.ObjectLimiters != nil { + in, out := &in.ObjectLimiters, &out.ObjectLimiters + *out = make(ObjectLimiterMap, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } out.DefaultJobTTL = in.DefaultJobTTL if in.DefaultDeleteOptions != nil { in, out := &in.DefaultDeleteOptions, &out.DefaultDeleteOptions @@ -279,6 +286,28 @@ func (in *MigrationControllerArgs) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MigrationObjectLimiter) DeepCopyInto(out *MigrationObjectLimiter) { + *out = *in + out.Duration = in.Duration + if in.MaxMigrating != nil { + in, out := &in.MaxMigrating, &out.MaxMigrating + *out = new(intstr.IntOrString) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MigrationObjectLimiter. +func (in *MigrationObjectLimiter) DeepCopy() *MigrationObjectLimiter { + if in == nil { + return nil + } + out := new(MigrationObjectLimiter) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Namespaces) DeepCopyInto(out *Namespaces) { *out = *in @@ -305,6 +334,28 @@ func (in *Namespaces) DeepCopy() *Namespaces { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in ObjectLimiterMap) DeepCopyInto(out *ObjectLimiterMap) { + { + in := &in + *out = make(ObjectLimiterMap, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ObjectLimiterMap. +func (in ObjectLimiterMap) DeepCopy() ObjectLimiterMap { + if in == nil { + return nil + } + out := new(ObjectLimiterMap) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Plugin) DeepCopyInto(out *Plugin) { *out = *in diff --git a/pkg/descheduler/controllers/migration/controller.go b/pkg/descheduler/controllers/migration/controller.go index 2f097c34d..8f0fb4c41 100644 --- a/pkg/descheduler/controllers/migration/controller.go +++ b/pkg/descheduler/controllers/migration/controller.go @@ -20,8 +20,11 @@ import ( "context" "fmt" "strconv" + "sync" "time" + gocache "github.com/patrickmn/go-cache" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -80,6 +83,10 @@ type Reconciler struct { retriablePodFilter framework.FilterFunc assumedCache *assumedCache clock clock.Clock + + lock sync.Mutex + objectLimiters map[types.UID]*rate.Limiter + limiterCache *gocache.Cache } func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { @@ -183,8 +190,10 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame assumedCache: newAssumedCache(), clock: clock.RealClock{}, } + r.initObjectLimiters() retriablePodFilters := podutil.WrapFilterFuncs( + r.filterLimitedObject, r.filterMaxMigratingPerNode, r.filterMaxMigratingPerNamespace, r.filterMaxMigratingOrUnavailablePerWorkload, @@ -200,6 +209,25 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame return r, nil } +func (r *Reconciler) 
initObjectLimiters() { + var trackExpiration time.Duration + for _, v := range r.args.ObjectLimiters { + if v.Duration.Duration > trackExpiration { + trackExpiration = v.Duration.Duration + } + } + if trackExpiration > 0 { + r.objectLimiters = make(map[types.UID]*rate.Limiter) + limiterExpiration := trackExpiration + trackExpiration/2 + r.limiterCache = gocache.New(limiterExpiration, limiterExpiration) + r.limiterCache.OnEvicted(func(s string, _ interface{}) { + r.lock.Lock() + defer r.lock.Unlock() + delete(r.objectLimiters, types.UID(s)) + }) + } +} + func (r *Reconciler) Name() string { return Name } @@ -825,6 +853,7 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, sev1alpha1.PodMigrationJobReasonEvicting, "Migrating", "Failed evict Pod %q caused by %v", podNamespacedName, err) return false, reconcile.Result{}, err } + r.trackEvictedPod(pod) _, reason := evictor.GetEvictionTriggerAndReason(job.Annotations) cond = &sev1alpha1.PodMigrationJobCondition{ diff --git a/pkg/descheduler/controllers/migration/controller_test.go b/pkg/descheduler/controllers/migration/controller_test.go index c4fec86f9..1ca48b4e2 100644 --- a/pkg/descheduler/controllers/migration/controller_test.go +++ b/pkg/descheduler/controllers/migration/controller_test.go @@ -102,6 +102,10 @@ func (f *fakeControllerFinder) GetPodsForRef(apiVersion, kind, name, ns string, return f.pods, f.replicas, f.err } +func (f *fakeControllerFinder) GetExpectedScaleForPods(pods []*corev1.Pod) (int32, error) { + return f.replicas, f.err +} + func newTestReconciler() *Reconciler { scheme := runtime.NewScheme() _ = sev1alpha1.AddToScheme(scheme) @@ -2316,3 +2320,138 @@ func TestFilterMaxUnavailablePerWorkload(t *testing.T) { }) } } + +func TestFilterObjectLimiter(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: 
"test-1", + UID: uuid.NewUUID(), + }, + } + otherOwnerReferences := metav1.OwnerReference{ + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + } + testObjectLimiters := deschedulerconfig.ObjectLimiterMap{ + deschedulerconfig.MigrationLimitObjectWorkload: { + Duration: metav1.Duration{Duration: 1 * time.Second}, + MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, + }, + } + + tests := []struct { + name string + objectLimiters deschedulerconfig.ObjectLimiterMap + totalReplicas int32 + sleepDuration time.Duration + pod *corev1.Pod + evictedPodsCount int + evictedWorkload *metav1.OwnerReference + want bool + }{ + { + name: "less than default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 6, + want: true, + }, + { + name: "exceeded default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + want: false, + }, + { + name: "other than workload", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + evictedWorkload: &otherOwnerReferences, + want: true, + }, + { + name: "disable objectLimiters", + totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + objectLimiters: deschedulerconfig.ObjectLimiterMap{ + deschedulerconfig.MigrationLimitObjectWorkload: deschedulerconfig.MigrationObjectLimiter{ + Duration: metav1.Duration{Duration: 0}, + }, + }, + want: true, + }, + { + name: "default limiter", + 
totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 1, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + reconciler := newTestReconciler() + controllerFinder := &fakeControllerFinder{} + if tt.objectLimiters != nil { + reconciler.args.ObjectLimiters = tt.objectLimiters + } + + reconciler.initObjectLimiters() + if tt.totalReplicas > 0 { + controllerFinder.replicas = tt.totalReplicas + } + reconciler.controllerFinder = controllerFinder + if tt.evictedPodsCount > 0 { + for i := 0; i < tt.evictedPodsCount; i++ { + pod := tt.pod.DeepCopy() + if tt.evictedWorkload != nil { + pod.OwnerReferences = []metav1.OwnerReference{ + *tt.evictedWorkload, + } + } + reconciler.trackEvictedPod(pod) + if tt.sleepDuration > 0 { + time.Sleep(tt.sleepDuration) + } + } + } + got := reconciler.filterLimitedObject(tt.pod) + assert.Equal(t, tt.want, got) + }) + } +} diff --git a/pkg/descheduler/controllers/migration/controllerfinder/controller_finder.go b/pkg/descheduler/controllers/migration/controllerfinder/controller_finder.go index 63ba5fee4..c0940e2c9 100644 --- a/pkg/descheduler/controllers/migration/controllerfinder/controller_finder.go +++ b/pkg/descheduler/controllers/migration/controllerfinder/controller_finder.go @@ -70,6 +70,7 @@ type PodControllerFinder func(ref ControllerReference, namespace string) (*Scale type Interface interface { GetPodsForRef(apiVersion, kind, name, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) + GetExpectedScaleForPods(pods []*corev1.Pod) (int32, error) } type ControllerFinder struct { diff --git a/pkg/descheduler/controllers/migration/filter.go b/pkg/descheduler/controllers/migration/filter.go index ee9e1af78..76b3f416a 100644 --- a/pkg/descheduler/controllers/migration/filter.go +++ b/pkg/descheduler/controllers/migration/filter.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + 
gocache "github.com/patrickmn/go-cache" + "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -30,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" sev1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util" "github.com/koordinator-sh/koordinator/pkg/descheduler/fieldindex" utilclient "github.com/koordinator-sh/koordinator/pkg/util/client" @@ -228,3 +231,69 @@ func mergeUnavailableAndMigratingPods(unavailablePods, migratingPods map[types.N unavailablePods[k] = v } } + +func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) { + if r.objectLimiters == nil || r.limiterCache == nil { + return + } + ownerRef := metav1.GetControllerOf(pod) + if ownerRef == nil { + return + } + + objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] + if !ok || objectLimiterArgs.Duration.Seconds() == 0 { + return + } + + var maxMigratingReplicas int + if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPods([]*corev1.Pod{pod}); err == nil { + maxMigrating := objectLimiterArgs.MaxMigrating + if maxMigrating == nil { + maxMigrating = r.args.MaxMigratingPerWorkload + } + maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating) + } + if maxMigratingReplicas == 0 { + return + } + + r.lock.Lock() + defer r.lock.Unlock() + + uid := ownerRef.UID + limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds()) + limiter := r.objectLimiters[uid] + if limiter == nil { + limiter = rate.NewLimiter(limit, 1) + r.objectLimiters[uid] = limiter + } else if limiter.Limit() != limit { + limiter.SetLimit(limit) + } + + if !limiter.AllowN(r.clock.Now(), 1) { + klog.Infof("The workload %s/%s/%s has been frequently 
descheduled recently and needs to be limited for a period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion) + } + r.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration) +} + +func (r *Reconciler) filterLimitedObject(pod *corev1.Pod) bool { + if r.objectLimiters == nil || r.limiterCache == nil { + return true + } + objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] + if !ok || objectLimiterArgs.Duration.Duration == 0 { + return true + } + if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil { + r.lock.Lock() + defer r.lock.Unlock() + if limiter := r.objectLimiters[ownerRef.UID]; limiter != nil { + if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 { + klog.Infof("Pod %q is filtered by workload %s/%s/%s is limited", klog.KObj(pod), ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion) + return false + } + } + } + return true +} diff --git a/pkg/descheduler/controllers/migration/util/util.go b/pkg/descheduler/controllers/migration/util/util.go index e4ccb084e..303cf872d 100644 --- a/pkg/descheduler/controllers/migration/util/util.go +++ b/pkg/descheduler/controllers/migration/util/util.go @@ -78,7 +78,7 @@ func IsMigratePendingPod(reservationObj reservation.Object) bool { } func GetMaxUnavailable(replicas int, intOrPercent *intstr.IntOrString) (int, error) { - if intOrPercent == nil { + if intOrPercent == nil || intOrPercent.IntValue() == 0 { if replicas > 10 { s := intstr.FromString("10%") intOrPercent = &s