From 7fa537d22fe313a12f6a683b951558b7b80e63cd Mon Sep 17 00:00:00 2001 From: songtao98 Date: Fri, 7 Jun 2024 16:51:09 +0800 Subject: [PATCH] fix object limiter not working due to arbitration Signed-off-by: songtao98 --- .../migration/arbitrator/arbitrator.go | 5 - .../migration/arbitrator/filter.go | 94 ------- .../migration/arbitrator/filter_test.go | 150 ---------- .../controllers/migration/controller.go | 137 +++++++++- .../controllers/migration/controller_test.go | 256 +++++++++++++++++- .../controllers/migration/evict.go | 5 + 6 files changed, 379 insertions(+), 268 deletions(-) diff --git a/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go b/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go index 003843e9e..e42e28c99 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go +++ b/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go @@ -46,7 +46,6 @@ var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitrator type MigrationFilter interface { Filter(pod *corev1.Pod) bool PreEvictionFilter(pod *corev1.Pod) bool - TrackEvictedPod(pod *corev1.Pod) } type Arbitrator interface { @@ -146,10 +145,6 @@ func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool { return a.filter.defaultFilterPlugin.PreEvictionFilter(pod) } -func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) { - a.filter.trackEvictedPod(pod) -} - // sort stably sorts jobs, outputs the sorted results and corresponding ranking map. func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { for _, sortFn := range a.sorts { diff --git a/pkg/descheduler/controllers/migration/arbitrator/filter.go b/pkg/descheduler/controllers/migration/arbitrator/filter.go index 5b5000724..2f99417da 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/filter.go +++ b/pkg/descheduler/controllers/migration/arbitrator/filter.go @@ -20,10 +20,7 @@ import ( "context" "fmt" "sync" - "time" - gocache "github.com/patrickmn/go-cache" - "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -60,9 +57,6 @@ type filter struct { args *deschedulerconfig.MigrationControllerArgs controllerFinder controllerfinder.Interface - objectLimiters map[types.UID]*rate.Limiter - limiterCache *gocache.Cache - limiterLock sync.Mutex arbitratedPodMigrationJobs map[types.UID]bool arbitratedMapLock sync.Mutex @@ -83,7 +77,6 @@ func newFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework if err := f.initFilters(args, handle); err != nil { return nil, err } - f.initObjectLimiters() return f, nil } @@ -129,7 +122,6 @@ func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, ha return err } retriablePodFilters := podutil.WrapFilterFuncs( - f.filterLimitedObject, f.filterMaxMigratingPerNode, f.filterMaxMigratingPerNamespace, f.filterMaxMigratingOrUnavailablePerWorkload, @@ -417,92 +409,6 @@ func mergeUnavailableAndMigratingPods(unavailablePods, migratingPods map[types.N } } -func (f *filter) trackEvictedPod(pod *corev1.Pod) { - if f.objectLimiters == nil || f.limiterCache == nil { - return - } - ownerRef := metav1.GetControllerOf(pod) - if ownerRef == nil { - return - } - - objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] - if !ok || objectLimiterArgs.Duration.Seconds() == 0 { - return - } - - var maxMigratingReplicas int - if 
expectedReplicas, err := f.controllerFinder.GetExpectedScaleForPod(pod); err == nil { - maxMigrating := objectLimiterArgs.MaxMigrating - if maxMigrating == nil { - maxMigrating = f.args.MaxMigratingPerWorkload - } - maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating) - } - if maxMigratingReplicas == 0 { - return - } - - f.limiterLock.Lock() - defer f.limiterLock.Unlock() - - uid := ownerRef.UID - limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds()) - limiter := f.objectLimiters[uid] - if limiter == nil { - limiter = rate.NewLimiter(limit, 1) - f.objectLimiters[uid] = limiter - } else if limiter.Limit() != limit { - limiter.SetLimit(limit) - } - - if !limiter.AllowN(f.clock.Now(), 1) { - klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for f period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion) - } - f.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration) -} - -func (f *filter) filterLimitedObject(pod *corev1.Pod) bool { - if f.objectLimiters == nil || f.limiterCache == nil { - return true - } - objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] - if !ok || objectLimiterArgs.Duration.Duration == 0 { - return true - } - if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil { - f.limiterLock.Lock() - defer f.limiterLock.Unlock() - if limiter := f.objectLimiters[ownerRef.UID]; limiter != nil { - if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 { - klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject", - "owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)) - return false - } - } - } - return true -} - -func (f *filter) initObjectLimiters() { - var trackExpiration time.Duration - for _, v := range f.args.ObjectLimiters { - if v.Duration.Duration > trackExpiration { - trackExpiration = v.Duration.Duration - } - } - if trackExpiration > 0 { - f.objectLimiters = make(map[types.UID]*rate.Limiter) - limiterExpiration := trackExpiration + trackExpiration/2 - f.limiterCache = gocache.New(limiterExpiration, limiterExpiration) - f.limiterCache.OnEvicted(func(s string, _ interface{}) { - f.limiterLock.Lock() - defer f.limiterLock.Unlock() - delete(f.objectLimiters, types.UID(s)) - }) - } -} - func (f *filter) checkJobPassedArbitration(uid types.UID) bool { f.arbitratedMapLock.Lock() defer f.arbitratedMapLock.Unlock() diff --git a/pkg/descheduler/controllers/migration/arbitrator/filter_test.go b/pkg/descheduler/controllers/migration/arbitrator/filter_test.go index 3e097b2d0..00bb23a1d 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/filter_test.go +++ b/pkg/descheduler/controllers/migration/arbitrator/filter_test.go @@ -30,14 +30,12 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/utils/clock" "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" - "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/v1alpha2" ) func TestFilterExistingMigrationJob(t *testing.T) { @@ -985,154 +983,6 @@ func TestFilterExpectedReplicas(t *testing.T) { } } -func TestFilterObjectLimiter(t *testing.T) { - ownerReferences1 := 
[]metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-1", - UID: uuid.NewUUID(), - }, - } - otherOwnerReferences := metav1.OwnerReference{ - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-2", - UID: uuid.NewUUID(), - } - testObjectLimiters := config.ObjectLimiterMap{ - config.MigrationLimitObjectWorkload: { - Duration: metav1.Duration{Duration: 1 * time.Second}, - MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, - }, - } - - tests := []struct { - name string - objectLimiters config.ObjectLimiterMap - totalReplicas int32 - sleepDuration time.Duration - pod *corev1.Pod - evictedPodsCount int - evictedWorkload *metav1.OwnerReference - want bool - }{ - { - name: "less than default maxMigrating", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - sleepDuration: 100 * time.Millisecond, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 6, - want: true, - }, - { - name: "exceeded default maxMigrating", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 11, - want: false, - }, - { - name: "other than workload", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - sleepDuration: 100 * time.Millisecond, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 11, - evictedWorkload: &otherOwnerReferences, - want: true, - }, - { - name: "disable objectLimiters", - totalReplicas: 100, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 11, - objectLimiters: config.ObjectLimiterMap{ - config.MigrationLimitObjectWorkload: config.MigrationObjectLimiter{ - Duration: metav1.Duration{Duration: 0}, - }, - }, - want: true, - }, - { - name: "default limiter", - totalReplicas: 100, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 1, - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - scheme := runtime.NewScheme() - _ = v1alpha1.AddToScheme(scheme) - _ = clientgoscheme.AddToScheme(scheme) - fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() - - var v1beta2args v1alpha2.MigrationControllerArgs - v1alpha2.SetDefaults_MigrationControllerArgs(&v1beta2args) - var args config.MigrationControllerArgs - err := v1alpha2.Convert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerArgs(&v1beta2args, &args, nil) - if err != nil { - panic(err) - } - a := filter{client: fakeClient, args: &args, clock: clock.RealClock{}} - - controllerFinder := &fakeControllerFinder{} - if tt.objectLimiters != nil { - a.args.ObjectLimiters = tt.objectLimiters - } - - a.initObjectLimiters() - if tt.totalReplicas > 0 { - controllerFinder.replicas = tt.totalReplicas - } - a.controllerFinder = controllerFinder - if tt.evictedPodsCount > 0 { - for i := 0; i < tt.evictedPodsCount; i++ { - pod := tt.pod.DeepCopy() - if tt.evictedWorkload != nil { - pod.OwnerReferences = []metav1.OwnerReference{ - *tt.evictedWorkload, - } - } - a.trackEvictedPod(pod) - if tt.sleepDuration > 0 { - time.Sleep(tt.sleepDuration) - } - } - } - got := a.filterLimitedObject(tt.pod) - assert.Equal(t, tt.want, got) - }) - } -} - func TestArbitratedMap(t *testing.T) { f 
:= filter{
 		arbitratedPodMigrationJobs: map[types.UID]bool{},
diff --git a/pkg/descheduler/controllers/migration/controller.go b/pkg/descheduler/controllers/migration/controller.go
index fd141925b..d747f15b0 100644
--- a/pkg/descheduler/controllers/migration/controller.go
+++ b/pkg/descheduler/controllers/migration/controller.go
@@ -20,10 +20,14 @@ import (
 	"context"
 	"fmt"
 	"strconv"
+	"sync"
 	"time"
 
+	gocache "github.com/patrickmn/go-cache"
+	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
@@ -45,11 +49,13 @@ import (
 	deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/validation"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/arbitrator"
+	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/controllerfinder"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/evictor"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/reservation"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/names"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/options"
+	evictionsutil "github.com/koordinator-sh/koordinator/pkg/descheduler/evictions"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/framework"
 	utilclient "github.com/koordinator-sh/koordinator/pkg/util/client"
 )
@@ -72,10 +78,14 @@ type Reconciler struct {
 	eventRecorder          events.EventRecorder
 	reservationInterpreter reservation.Interpreter
 	evictorInterpreter     evictor.Interpreter
+	controllerFinder       controllerfinder.Interface
 	assumedCache           *assumedCache
 	clock                  clock.Clock
 
-	arbitrator arbitrator.Arbitrator
+	arbitrator     arbitrator.Arbitrator
+	objectLimiters map[types.UID]*rate.Limiter
+	limiterCache   *gocache.Cache
+	limiterLock    sync.Mutex
 }
 
 func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
@@ -135,7 +145,7 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame
 	if err != nil {
 		return nil, err
 	}
-
+	controllerFinder, err := controllerfinder.New(manager)
 	if err != nil {
 		return nil, err
 	}
@@ -146,10 +156,11 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame
 		eventRecorder:          handle.EventRecorder(),
 		reservationInterpreter: reservationInterpreter,
 		evictorInterpreter:     evictorInterpreter,
+		controllerFinder:       controllerFinder,
 		assumedCache:           newAssumedCache(),
 		clock:                  clock.RealClock{},
 	}
-
+	r.initObjectLimiters()
 	if err := manager.Add(r); err != nil {
 		return nil, err
 	}
@@ -238,6 +249,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
 	return result, err
 }
 
+func (r *Reconciler) getPodByJob(ctx context.Context, job *sev1alpha1.PodMigrationJob) (*corev1.Pod, error) {
+	if job.Spec.PodRef.Namespace == "" || job.Spec.PodRef.Name == "" {
+		return nil, fmt.Errorf("failed to get pod: invalid podRef")
+	}
+
+	podNamespacedName := types.NamespacedName{
+		Namespace: job.Spec.PodRef.Namespace,
+		Name:      job.Spec.PodRef.Name,
+	}
+	var pod corev1.Pod
+	err := r.Client.Get(ctx, podNamespacedName, &pod)
+	if err != nil {
+		return nil, err
+	}
+	return &pod, nil
+}
+
 func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigrationJob) 
(reconcile.Result, error) { klog.V(4).Infof("begin process MigrationJob %s", job.Name) if job.Spec.Paused { @@ -263,6 +291,10 @@ func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigration } } + if requeue := r.requeueJobIfObjectLimiterFailed(ctx, job); requeue { + return reconcile.Result{RequeueAfter: defaultRequeueAfter}, nil + } + if job.Spec.Mode == sev1alpha1.PodMigrationJobModeEvictionDirectly || (job.Spec.Mode == "" && r.args.DefaultJobMode == string(sev1alpha1.PodMigrationJobModeEvictionDirectly)) { return r.evictPodDirectly(ctx, job) @@ -419,6 +451,39 @@ func (r *Reconciler) preparePodRef(ctx context.Context, job *sev1alpha1.PodMigra return true, &pod, nil } +func (r *Reconciler) checkPodExceedObjectLimiter(pod *corev1.Pod) bool { + if r.objectLimiters == nil || r.limiterCache == nil { + return false + } + objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] + if !ok || objectLimiterArgs.Duration.Duration == 0 { + return false + } + if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil { + r.limiterLock.Lock() + defer r.limiterLock.Unlock() + if limiter := r.objectLimiters[ownerRef.UID]; limiter != nil { + if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 { + klog.V(4).InfoS("Pod fails the following checks", "pod", klog.KObj(pod), "checks", "limitedObject", + "owner", fmt.Sprintf("%s/%s/%s", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)) + return true + } + } + } + return false +} + +func (r *Reconciler) requeueJobIfObjectLimiterFailed(ctx context.Context, job *sev1alpha1.PodMigrationJob) bool { + if evictionsutil.HaveEvictAnnotation(job) { + return false + } + pod, err := r.getPodByJob(ctx, job) + if err != nil { + return false + } + return r.checkPodExceedObjectLimiter(pod) +} + func (r *Reconciler) abortJobIfTimeout(ctx context.Context, job *sev1alpha1.PodMigrationJob) (bool, error) { if job.Spec.TTL == nil || job.Spec.TTL.Duration == 0 { return false, nil @@ -706,7 +771,7 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, sev1alpha1.PodMigrationJobReasonEvicting, "Migrating", "Failed evict Pod %q caused by %v", podNamespacedName, err) return false, reconcile.Result{}, err } - r.arbitrator.TrackEvictedPod(pod) + r.trackEvictedPod(pod) _, reason := evictor.GetEvictionTriggerAndReason(job.Annotations) cond = &sev1alpha1.PodMigrationJobCondition{ @@ -753,6 +818,51 @@ func (r *Reconciler) prepareJobWithReservationScheduleSuccess(ctx context.Contex return err } +func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) { + if r.objectLimiters == nil || r.limiterCache == nil { + return + } + ownerRef := metav1.GetControllerOf(pod) + if ownerRef == nil { + return + } + + objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] + if !ok || objectLimiterArgs.Duration.Seconds() == 0 { + return + } + + var maxMigratingReplicas int + if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil { + maxMigrating := objectLimiterArgs.MaxMigrating + if maxMigrating == nil { + maxMigrating = r.args.MaxMigratingPerWorkload + } + maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating) + } + if maxMigratingReplicas == 0 { + return + } + + r.limiterLock.Lock() + defer r.limiterLock.Unlock() + + uid := ownerRef.UID + limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds()) + limiter := 
r.objectLimiters[uid]
+	if limiter == nil {
+		limiter = rate.NewLimiter(limit, maxMigratingReplicas)
+		r.objectLimiters[uid] = limiter
+	} else if limiter.Limit() != limit {
+		limiter.SetLimit(limit)
+	}
+
+	if !limiter.AllowN(r.clock.Now(), 1) {
+		klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for a period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
+	}
+	r.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration)
+}
+
 func (r *Reconciler) deleteReservation(ctx context.Context, job *sev1alpha1.PodMigrationJob) error {
 	if job.Spec.ReservationOptions == nil || job.Spec.ReservationOptions.ReservationRef == nil {
 		return nil
@@ -928,3 +1038,22 @@ func (r *Reconciler) Filter(pod *corev1.Pod) bool {
 func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool {
 	return r.arbitrator.PreEvictionFilter(pod)
 }
+
+func (r *Reconciler) initObjectLimiters() {
+	var trackExpiration time.Duration
+	for _, v := range r.args.ObjectLimiters {
+		if v.Duration.Duration > trackExpiration {
+			trackExpiration = v.Duration.Duration
+		}
+	}
+	if trackExpiration > 0 {
+		r.objectLimiters = make(map[types.UID]*rate.Limiter)
+		limiterExpiration := trackExpiration + trackExpiration/2
+		r.limiterCache = gocache.New(limiterExpiration, limiterExpiration)
+		r.limiterCache.OnEvicted(func(s string, _ interface{}) {
+			r.limiterLock.Lock()
+			defer r.limiterLock.Unlock()
+			delete(r.objectLimiters, types.UID(s))
+		})
+	}
+}
diff --git a/pkg/descheduler/controllers/migration/controller_test.go b/pkg/descheduler/controllers/migration/controller_test.go
index a7c54d64a..57f40b2ec 100644
--- a/pkg/descheduler/controllers/migration/controller_test.go
+++ b/pkg/descheduler/controllers/migration/controller_test.go
@@ -30,6 +30,7 @@ import (
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/uuid"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/tools/record"
@@ -43,6 +44,7 @@ import (
 	sev1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
 	deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/v1alpha2"
+	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/controllerfinder"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/reservation"
 	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util"
 	evictionsutil "github.com/koordinator-sh/koordinator/pkg/descheduler/evictions"
@@ -90,6 +92,24 @@ func (f fakeReservationInterpreter) DeleteReservation(ctx context.Context, reser
 	return f.deleteErr
 }
 
+type fakeControllerFinder struct {
+	pods     []*corev1.Pod
+	replicas int32
+	err      error
+}
+
+func (f *fakeControllerFinder) ListPodsByWorkloads(workloadUIDs []types.UID, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, error) {
+	return f.pods, f.err
+}
+
+func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReference, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) {
+	return f.pods, f.replicas, f.err
+}
+
+func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) {
+	return f.replicas, f.err
+}
+
 func newTestReconciler() *Reconciler {
 	scheme := runtime.NewScheme()
 	_ = sev1alpha1.AddToScheme(scheme)
@@ -117,16 +137,15 @@ func 
newTestReconciler() *Reconciler { preEvictionFilter: func(pod *corev1.Pod) bool { return true }, - trackEvictedPod: func(pod *corev1.Pod) { - return - }, } + controllerFinder := &controllerfinder.ControllerFinder{Client: runtimeClient} r := &Reconciler{ Client: runtimeClient, args: &args, eventRecorder: record.NewEventRecorderAdapter(recorder), reservationInterpreter: nil, evictorInterpreter: nil, + controllerFinder: controllerFinder, assumedCache: newAssumedCache(), clock: clock.RealClock{}, arbitrator: &arbitrator, @@ -457,7 +476,6 @@ func TestEvictPodDirectly(t *testing.T) { }, } assert.Nil(t, reconciler.Create(context.TODO(), job)) - for i := 0; i < 2; i++ { result, err := reconciler.doMigrate(context.TODO(), job) assert.Nil(t, err) @@ -1549,9 +1567,6 @@ func TestFilter(t *testing.T) { preEvictionFilter: func(pod *corev1.Pod) bool { return true }, - trackEvictedPod: func(pod *corev1.Pod) { - return - }, } actual := reconciler.Filter(pod) @@ -1592,9 +1607,6 @@ func TestPreEvictionFilter(t *testing.T) { enterPreEvictionFilter = true return testCase.expected }, - trackEvictedPod: func(pod *corev1.Pod) { - return - }, } actual := reconciler.PreEvictionFilter(pod) @@ -1604,10 +1616,228 @@ func TestPreEvictionFilter(t *testing.T) { } } +func TestRequeueJobIfObjectLimiterFailed(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-1", + UID: uuid.NewUUID(), + }, + } + otherOwnerReferences := metav1.OwnerReference{ + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + } + testObjectLimiters := deschedulerconfig.ObjectLimiterMap{ + deschedulerconfig.MigrationLimitObjectWorkload: { + Duration: metav1.Duration{Duration: 1 * time.Second}, + MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, + }, + } + + tests := []struct { + name string + objectLimiters deschedulerconfig.ObjectLimiterMap + totalReplicas int32 + sleepDuration time.Duration + pod *corev1.Pod + job *sev1alpha1.PodMigrationJob + evictedPodsCount int + evictedWorkload *metav1.OwnerReference + want bool + }{ + { + name: "less than default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + Name: "test-pod", + Namespace: "test-namespace", + }, + }, + job: &sev1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: sev1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "test-namespace", + Name: "test-pod", + }, + }, + }, + evictedPodsCount: 6, + want: false, + }, + { + name: "exceeded default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + Name: "test-pod", + Namespace: "test-namespace", + }, + }, + job: &sev1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: sev1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "test-namespace", + Name: "test-pod", + }, + }, + }, + evictedPodsCount: 11, + want: true, + }, + { + name: "other than workload", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + 
ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + Name: "test-pod", + Namespace: "test-namespace", + }, + }, + job: &sev1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: sev1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "test-namespace", + Name: "test-pod", + }, + }, + }, + evictedPodsCount: 11, + evictedWorkload: &otherOwnerReferences, + want: false, + }, + { + name: "disable objectLimiters", + totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + Name: "test-pod", + Namespace: "test-namespace", + }, + }, + evictedPodsCount: 11, + objectLimiters: deschedulerconfig.ObjectLimiterMap{ + deschedulerconfig.MigrationLimitObjectWorkload: deschedulerconfig.MigrationObjectLimiter{ + Duration: metav1.Duration{Duration: 0}, + }, + }, + job: &sev1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: sev1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "test-namespace", + Name: "test-pod", + }, + }, + }, + want: false, + }, + { + name: "default limiter", + totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + Name: "test-pod", + Namespace: "test-namespace", + }, + }, + job: &sev1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: sev1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "test-namespace", + Name: "test-pod", + }, + }, + }, + evictedPodsCount: 11, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + _ = sev1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + + var v1beta2args v1alpha2.MigrationControllerArgs + v1alpha2.SetDefaults_MigrationControllerArgs(&v1beta2args) + var args deschedulerconfig.MigrationControllerArgs + err := v1alpha2.Convert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerArgs(&v1beta2args, &args, nil) + if err != nil { + panic(err) + } + reconciler := newTestReconciler() + controllerFinder := &fakeControllerFinder{} + if tt.objectLimiters != nil { + reconciler.args.ObjectLimiters = tt.objectLimiters + } + + reconciler.initObjectLimiters() + if tt.totalReplicas > 0 { + controllerFinder.replicas = tt.totalReplicas + } + reconciler.controllerFinder = controllerFinder + assert.NoError(t, reconciler.Create(context.TODO(), tt.pod)) + assert.NoError(t, reconciler.Create(context.TODO(), tt.job)) + if tt.evictedPodsCount > 0 { + for i := 0; i < tt.evictedPodsCount; i++ { + pod := tt.pod.DeepCopy() + if tt.evictedWorkload != nil { + pod.OwnerReferences = []metav1.OwnerReference{ + *tt.evictedWorkload, + } + } + reconciler.trackEvictedPod(pod) + if tt.sleepDuration > 0 { + time.Sleep(tt.sleepDuration) + } + } + } + got := reconciler.requeueJobIfObjectLimiterFailed(context.TODO(), tt.job) + assert.Equal(t, tt.want, got) + }) + } +} + type fakeArbitrator struct { filter framework.FilterFunc preEvictionFilter framework.FilterFunc - trackEvictedPod func(*corev1.Pod) add func(*sev1alpha1.PodMigrationJob) delete func(types.UID) } @@ -1624,10 +1854,6 @@ func (f *fakeArbitrator) PreEvictionFilter(pod *corev1.Pod) bool { return f.preEvictionFilter(pod) } -func (f *fakeArbitrator) TrackEvictedPod(pod *corev1.Pod) { - 
f.trackEvictedPod(pod)
-}
-
 func (f *fakeArbitrator) AddPodMigrationJob(job *sev1alpha1.PodMigrationJob) {
 	f.add(job)
 }
diff --git a/pkg/descheduler/controllers/migration/evict.go b/pkg/descheduler/controllers/migration/evict.go
index 91dca072d..7490e8910 100644
--- a/pkg/descheduler/controllers/migration/evict.go
+++ b/pkg/descheduler/controllers/migration/evict.go
@@ -44,6 +44,11 @@ func (r *Reconciler) Evict(ctx context.Context, pod *corev1.Pod, evictOptions fr
 		return false
 	}
 
+	if r.checkPodExceedObjectLimiter(pod) {
+		klog.Errorf("Pod %q cannot be evicted since it exceeds the object limiter", klog.KObj(pod))
+		return false
+	}
+
 	err := CreatePodMigrationJob(ctx, pod, evictOptions, r.Client, r.args)
 	return err == nil
 }
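
---

For reference, the throttling this patch moves from the arbitrator filter into the
Reconciler is plain golang.org/x/time/rate token-bucket math: each workload UID gets a
limiter whose rate is maxMigratingReplicas tokens per objectLimiterArgs.Duration and
whose burst is maxMigratingReplicas; trackEvictedPod spends one token per eviction, and
checkPodExceedObjectLimiter rejects a pod once less than one whole token remains. The
sketch below is a standalone illustration of that math, not code from this change; the
constants (10 replicas, a 1s window, 15 attempts) are made up for the demo.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Same computation as trackEvictedPod: limit = maxMigratingReplicas / duration,
	// burst = maxMigratingReplicas.
	maxMigratingReplicas := 10
	duration := 1 * time.Second
	limit := rate.Limit(maxMigratingReplicas) / rate.Limit(duration.Seconds())
	limiter := rate.NewLimiter(limit, maxMigratingReplicas)

	// Spend one token per simulated eviction, as trackEvictedPod does via AllowN.
	allowed := 0
	for i := 0; i < 15; i++ {
		if limiter.AllowN(time.Now(), 1) {
			allowed++
		}
	}

	// checkPodExceedObjectLimiter-style check: block once Tokens() drops below 1,
	// until the bucket refills over the configured duration.
	fmt.Printf("allowed %d of 15 evictions, tokens remaining: %.2f\n",
		allowed, limiter.Tokens())
}

Running this immediately after start typically allows the first 10 evictions (the burst)
and throttles the rest, which is the behavior the requeueJobIfObjectLimiterFailed path
relies on when it requeues a PodMigrationJob instead of evicting.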