From ac7733762a5501973199d21f3e65c40f6efc2210 Mon Sep 17 00:00:00 2001 From: baowj Date: Tue, 26 Sep 2023 22:33:56 +0800 Subject: [PATCH] koord-descheduler: add arbitration to migration controller (#1651) Signed-off-by: baowj Signed-off-by: baowj-678 --- .../migration/arbitrator/arbitrator.go | 152 +- .../migration/arbitrator/arbitrator_test.go | 130 +- .../migration/{ => arbitrator}/filter.go | 285 ++-- .../migration/arbitrator/filter_test.go | 1103 ++++++++++++++ .../migration/arbitrator/handler.go | 90 ++ .../controllers/migration/arbitrator/sort.go | 2 +- .../migration/arbitrator/sort_test.go | 2 +- .../controllers/migration/controller.go | 120 +- .../controllers/migration/controller_test.go | 1280 ++--------------- 9 files changed, 1655 insertions(+), 1509 deletions(-) rename pkg/descheduler/controllers/migration/{ => arbitrator}/filter.go (59%) create mode 100644 pkg/descheduler/controllers/migration/arbitrator/filter_test.go create mode 100644 pkg/descheduler/controllers/migration/arbitrator/handler.go diff --git a/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go b/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go index 956e68fb2..e0b55dc60 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go +++ b/pkg/descheduler/controllers/migration/arbitrator/arbitrator.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Koordinator Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -26,13 +26,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/events" - "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/handler" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" @@ -42,12 +38,21 @@ import ( const ( AnnotationPassedArbitration = "descheduler.koordinator.sh/passed-arbitration" + AnnotationPodArbitrating = "descheduler.koordinator.sh/pod-arbitrating" ) var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitratorImpl") +type MigrationFilter interface { + Filter(pod *corev1.Pod) bool + PreEvictionFilter(pod *corev1.Pod) bool + TrackEvictedPod(pod *corev1.Pod) +} + type Arbitrator interface { - Add(job *v1alpha1.PodMigrationJob) + MigrationFilter + AddPodMigrationJob(job *v1alpha1.PodMigrationJob) + DeletePodMigrationJob(job *v1alpha1.PodMigrationJob) } // SortFn stably sorts PodMigrationJobs slice based on a certain strategy. Users @@ -58,9 +63,8 @@ type arbitratorImpl struct { waitingCollection map[types.UID]*v1alpha1.PodMigrationJob interval time.Duration - sorts []SortFn - nonRetryablePodFilter framework.FilterFunc - retryablePodFilter framework.FilterFunc + sorts []SortFn + filter *filter client client.Client eventRecorder events.EventRecorder @@ -68,40 +72,48 @@ type arbitratorImpl struct { } // New creates an arbitratorImpl based on parameters. 
-func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) { +func New(args *config.MigrationControllerArgs, options Options) (Arbitrator, error) { + f, err := newFilter(args, options.Handle) + if err != nil { + return nil, err + } + arbitrator := &arbitratorImpl{ waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{}, - interval: args.Interval.Duration, - + interval: args.ArbitrationArgs.Interval.Duration, sorts: []SortFn{ SortJobsByCreationTime(), SortJobsByPod(sorter.PodSorter().Sort), SortJobsByController(), SortJobsByMigratingNum(options.Client), }, - retryablePodFilter: options.RetryableFilter, - nonRetryablePodFilter: options.NonRetryableFilter, - + filter: f, client: options.Client, eventRecorder: options.EventRecorder, mu: sync.Mutex{}, } - err := options.Manager.Add(arbitrator) + err = options.Manager.Add(arbitrator) if err != nil { return nil, err } return arbitrator, nil } -// Add adds a PodMigrationJob to the waitingCollection of arbitratorImpl. +// AddPodMigrationJob adds a PodMigrationJob waiting to be arbitrated to Arbitrator. // It is safe to be called concurrently by multiple goroutines. -func (a *arbitratorImpl) Add(job *v1alpha1.PodMigrationJob) { +func (a *arbitratorImpl) AddPodMigrationJob(job *v1alpha1.PodMigrationJob) { a.mu.Lock() defer a.mu.Unlock() a.waitingCollection[job.UID] = job.DeepCopy() } +// DeletePodMigrationJob removes a deleted PodMigrationJob from Arbitrator. +// It is safe to be called concurrently by multiple goroutines. +func (a *arbitratorImpl) DeletePodMigrationJob(job *v1alpha1.PodMigrationJob) { + a.filter.removeJobPassedArbitration(job.UID) +} + // Start starts the goroutine to arbitrate jobs periodically. func (a *arbitratorImpl) Start(ctx context.Context) error { klog.InfoS("Start Arbitrator Arbitrate Goroutine") @@ -109,6 +121,33 @@ func (a *arbitratorImpl) Start(ctx context.Context) error { return nil } +// Filter checks if a pod can be evicted +func (a *arbitratorImpl) Filter(pod *corev1.Pod) bool { + if !a.filter.filterExistingPodMigrationJob(pod) { + return false + } + + if !a.filter.reservationFilter(pod) { + return false + } + + if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) { + return false + } + if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) { + return false + } + return true +} + +func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool { + return a.filter.defaultFilterPlugin.PreEvictionFilter(pod) +} + +func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) { + a.filter.trackEvictedPod(pod) +} + // sort stably sorts jobs, outputs the sorted results and corresponding ranking map. func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { for _, sortFn := range a.sorts { @@ -117,14 +156,15 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1 return jobs } -// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob. -func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) { +// filtering calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob. 
+func (a *arbitratorImpl) filtering(pod *corev1.Pod) (isFailed, isPassed bool) { if pod != nil { - if a.nonRetryablePodFilter != nil && !a.nonRetryablePodFilter(pod) { + markPodArbitrating(pod) + if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) { isFailed = true return } - if a.retryablePodFilter != nil && !a.retryablePodFilter(pod) { + if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) { isPassed = false return } @@ -144,6 +184,7 @@ func (a *arbitratorImpl) updatePassedJob(job *v1alpha1.PodMigrationJob) { if err != nil { klog.ErrorS(err, "failed to update job", "job", klog.KObj(job)) } else { + a.filter.markJobPassedArbitration(job.UID) // remove job from the waitingCollection a.mu.Lock() delete(a.waitingCollection, job.UID) @@ -167,7 +208,7 @@ func (a *arbitratorImpl) doOnceArbitrate() { // filter for _, job := range jobs { pod := podOfJob[job] - isFailed, isPassed := a.filter(pod) + isFailed, isPassed := a.filtering(pod) if isFailed { a.updateFailedJob(job, pod) continue @@ -205,54 +246,11 @@ func (a *arbitratorImpl) updateFailedJob(job *v1alpha1.PodMigrationJob, pod *cor a.mu.Unlock() } -// arbitrationHandler implement handler.EventHandler -type arbitrationHandler struct { - handler.EnqueueRequestForObject - c client.Client - arbitrator Arbitrator -} - -func NewHandler(arbitrator Arbitrator, c client.Client) handler.EventHandler { - return &arbitrationHandler{ - EnqueueRequestForObject: handler.EnqueueRequestForObject{}, - arbitrator: arbitrator, - c: c, - } -} - -// Create call Arbitrator.Create -func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { - if evt.Object == nil { - enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) - return - } - // get job - job := &v1alpha1.PodMigrationJob{} - err := h.c.Get(context.TODO(), types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }, job) - if err != nil { - // if err, add job to the workQueue directly. 
- enqueueLog.Error(nil, "Fail to get PodMigrationJob", "PodMigrationJob", types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }) - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) - return - } - h.arbitrator.Add(job) -} - type Options struct { - Client client.Client - EventRecorder events.EventRecorder - RetryableFilter framework.FilterFunc - NonRetryableFilter framework.FilterFunc - Manager controllerruntime.Manager + Client client.Client + EventRecorder events.EventRecorder + Manager controllerruntime.Manager + Handle framework.Handle } func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod { @@ -276,3 +274,17 @@ func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alph } return podOfJob } + +func markPodArbitrating(pod *corev1.Pod) { + if pod.Annotations == nil { + pod.Annotations = map[string]string{} + } + pod.Annotations[AnnotationPodArbitrating] = "true" +} + +func checkPodArbitrating(pod *corev1.Pod) bool { + if pod.Annotations == nil { + return false + } + return pod.Annotations[AnnotationPodArbitrating] == "true" +} diff --git a/pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go b/pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go index 88592f81e..d79e5ea6f 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go +++ b/pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Koordinator Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -36,7 +36,6 @@ import ( "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/client/fake" "sigs.k8s.io/controller-runtime/pkg/event" - "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" @@ -136,7 +135,7 @@ func TestMultiSortFn(t *testing.T) { assert.Equal(t, expectedJobsOrder, jobsOrder) } -func TestFilter(t *testing.T) { +func TestFiltering(t *testing.T) { testCases := []struct { name string pod *corev1.Pod @@ -189,14 +188,16 @@ func TestFilter(t *testing.T) { for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { arbitrator := arbitratorImpl{ - nonRetryablePodFilter: func(pod *corev1.Pod) bool { - return testCase.nonRetryable - }, - retryablePodFilter: func(pod *corev1.Pod) bool { - return testCase.retryable + filter: &filter{ + nonRetryablePodFilter: func(pod *corev1.Pod) bool { + return testCase.nonRetryable + }, + retryablePodFilter: func(pod *corev1.Pod) bool { + return testCase.retryable + }, }, } - isFailed, isPassed := arbitrator.filter(testCase.pod) + isFailed, isPassed := arbitrator.filtering(testCase.pod) assert.Equal(t, testCase.isFailed, isFailed) assert.Equal(t, testCase.isPassed, isPassed) }) @@ -222,7 +223,7 @@ func TestAdd(t *testing.T) { } for _, job := range migratingJobs { - arbitrator.Add(job) + arbitrator.AddPodMigrationJob(job) } var actualJobs []string @@ -281,12 +282,14 @@ func TestRequeueJobIfRetryablePodFilterFailed(t *testing.T) { sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { return jobs }}, - nonRetryablePodFilter: func(pod *corev1.Pod) bool { - return true - }, - retryablePodFilter: func(pod *corev1.Pod) bool { - enter = true - return false + filter: &filter{ + nonRetryablePodFilter: func(pod *corev1.Pod) bool { + return true + }, + retryablePodFilter: func(pod *corev1.Pod) bool { + enter = true + return false + }, }, client: fakeClient, mu: sync.Mutex{}, @@ -352,12 +355,14 @@ func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) { sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { return jobs }}, - nonRetryablePodFilter: func(pod *corev1.Pod) bool { - enter = true - return false - }, - retryablePodFilter: func(pod *corev1.Pod) bool { - return true + filter: &filter{ + nonRetryablePodFilter: func(pod *corev1.Pod) bool { + enter = true + return false + }, + retryablePodFilter: func(pod *corev1.Pod) bool { + return true + }, }, client: fakeClient, mu: sync.Mutex{}, @@ -484,11 +489,14 @@ func TestDoOnceArbitrate(t *testing.T) { } a := &arbitratorImpl{ waitingCollection: collection, - nonRetryablePodFilter: func(pod *corev1.Pod) bool { - return !nonRetryablePods[pod.Name] - }, - retryablePodFilter: func(pod *corev1.Pod) bool { - return !retryablePods[pod.Name] + filter: &filter{ + nonRetryablePodFilter: func(pod *corev1.Pod) bool { + return !nonRetryablePods[pod.Name] + }, + retryablePodFilter: func(pod *corev1.Pod) bool { + return !retryablePods[pod.Name] + }, + arbitratedPodMigrationJobs: map[types.UID]bool{}, }, sorts: []SortFn{ func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { @@ -537,11 +545,14 @@ func TestArbitrate(t *testing.T) { a := &arbitratorImpl{ waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{}, - nonRetryablePodFilter: func(pod *corev1.Pod) 
bool { - return true - }, - retryablePodFilter: func(pod *corev1.Pod) bool { - return true + filter: &filter{ + nonRetryablePodFilter: func(pod *corev1.Pod) bool { + return true + }, + retryablePodFilter: func(pod *corev1.Pod) bool { + return true + }, + arbitratedPodMigrationJobs: map[types.UID]bool{}, }, sorts: []SortFn{ func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob { @@ -563,7 +574,7 @@ func TestArbitrate(t *testing.T) { job := makePodMigrationJob("test-job-"+strconv.Itoa(i), time.Now(), pod) assert.Nil(t, fakeClient.Create(context.TODO(), pod)) assert.Nil(t, fakeClient.Create(context.TODO(), job)) - a.Add(job) + a.AddPodMigrationJob(job) time.Sleep(800 * time.Millisecond) assert.Nil(t, fakeClient.Get(context.TODO(), types.NamespacedName{ Namespace: job.Namespace, @@ -592,9 +603,13 @@ func TestUpdatePassedJob(t *testing.T) { client: fakeClient, mu: sync.Mutex{}, eventRecorder: &events.FakeRecorder{}, + filter: &filter{ + arbitratedPodMigrationJobs: map[types.UID]bool{}, + }, } - arbitrator.updatePassedJob(job) + assert.False(t, arbitrator.filter.checkJobPassedArbitration(job.UID)) + arbitrator.updatePassedJob(job) assert.Equal(t, 0, len(arbitrator.waitingCollection)) actualJob := &v1alpha1.PodMigrationJob{} @@ -603,6 +618,7 @@ func TestUpdatePassedJob(t *testing.T) { Name: "test", }, actualJob)) assert.Equal(t, map[string]string{AnnotationPassedArbitration: "true"}, actualJob.Annotations) + assert.True(t, arbitrator.filter.checkJobPassedArbitration(job.UID)) } func TestUpdateFailedJob(t *testing.T) { @@ -641,7 +657,7 @@ func TestUpdateFailedJob(t *testing.T) { assert.Equal(t, v1alpha1.PodMigrationJobFailed, actualJob.Status.Phase) } -func TestEventHandlerCreate(t *testing.T) { +func TestEventHandler(t *testing.T) { creationTime := time.Now() migratingJobs := []*v1alpha1.PodMigrationJob{ makePodMigrationJob("test-job-1", creationTime, nil), @@ -660,13 +676,21 @@ func TestEventHandlerCreate(t *testing.T) { arbitrator := &arbitratorImpl{ waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{}, client: fakeClient, + filter: &filter{ + client: fakeClient, + arbitratedPodMigrationJobs: map[types.UID]bool{}, + }, } handler := NewHandler(arbitrator, fakeClient) var expectedJobs []string for _, job := range migratingJobs { assert.Nil(t, fakeClient.Create(context.TODO(), job)) + assert.False(t, arbitrator.filter.checkJobPassedArbitration(job.UID)) handler.Create(event.CreateEvent{Object: job}, queue) + + arbitrator.filter.markJobPassedArbitration(job.UID) + assert.True(t, arbitrator.filter.checkJobPassedArbitration(job.UID)) expectedJobs = append(expectedJobs, job.Name) var actualJobs []string @@ -676,11 +700,19 @@ func TestEventHandlerCreate(t *testing.T) { assert.ElementsMatch(t, actualJobs, expectedJobs) } assert.Equal(t, 0, queue.Len()) - nilJob := makePodMigrationJob("test-job-6", creationTime, nil) - handler.Create(event.CreateEvent{Object: nilJob}, queue) - - actualJob, _ := queue.Get() - assert.Equal(t, actualJob.(reconcile.Request).Name, nilJob.Name) + for _, job := range migratingJobs[:3] { + handler.Delete(event.DeleteEvent{Object: job}, queue) + assert.False(t, arbitrator.filter.checkJobPassedArbitration(job.UID)) + } + migratingJobs[3].Status.Phase = v1alpha1.PodMigrationJobFailed + assert.Nil(t, fakeClient.Update(context.TODO(), migratingJobs[3])) + handler.Update(event.UpdateEvent{ObjectNew: migratingJobs[3]}, queue) + assert.False(t, arbitrator.filter.checkJobPassedArbitration(migratingJobs[3].UID)) + + 
migratingJobs[4].Status.Phase = v1alpha1.PodMigrationJobSucceeded + assert.Nil(t, fakeClient.Update(context.TODO(), migratingJobs[4])) + handler.Update(event.UpdateEvent{ObjectNew: migratingJobs[4]}, queue) + assert.False(t, arbitrator.filter.checkJobPassedArbitration(migratingJobs[4].UID)) } type podDecoratorFn func(pod *corev1.Pod) @@ -741,3 +773,21 @@ func makePodMigrationJob(name string, creationTime time.Time, pod *corev1.Pod, d } return job } + +type fakeControllerFinder struct { + pods []*corev1.Pod + replicas int32 + err error +} + +func (f *fakeControllerFinder) ListPodsByWorkloads(workloadUIDs []types.UID, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, error) { + return f.pods, f.err +} + +func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReference, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) { + return f.pods, f.replicas, f.err +} + +func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) { + return f.replicas, f.err +} diff --git a/pkg/descheduler/controllers/migration/filter.go b/pkg/descheduler/controllers/migration/arbitrator/filter.go similarity index 59% rename from pkg/descheduler/controllers/migration/filter.go rename to pkg/descheduler/controllers/migration/arbitrator/filter.go index eac22d0c1..715a1b086 100644 --- a/pkg/descheduler/controllers/migration/filter.go +++ b/pkg/descheduler/controllers/migration/arbitrator/filter.go @@ -14,11 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package migration +package arbitrator import ( "context" "fmt" + "sync" + "time" gocache "github.com/patrickmn/go-cache" "golang.org/x/time/rate" @@ -30,12 +32,15 @@ import ( "k8s.io/klog/v2" k8spodutil "k8s.io/kubernetes/pkg/api/v1/pod" kubecontroller "k8s.io/kubernetes/pkg/controller" + "k8s.io/utils/clock" "sigs.k8s.io/controller-runtime/pkg/client" k8sdeschedulerapi "sigs.k8s.io/descheduler/pkg/api" sev1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/controllerfinder" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util" + "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/options" evictionsutil "github.com/koordinator-sh/koordinator/pkg/descheduler/evictions" "github.com/koordinator-sh/koordinator/pkg/descheduler/fieldindex" "github.com/koordinator-sh/koordinator/pkg/descheduler/framework" @@ -45,24 +50,44 @@ import ( utilclient "github.com/koordinator-sh/koordinator/pkg/util/client" ) -const ( - // AnnotationPrepareMigrating is an internal switch flag, indicating that the current Pod is ready for migration, - // and its activation triggers certain filter logic. 
- AnnotationPrepareMigrating = "descheduler.koordinator.sh/prepare-migrating" -) +type filter struct { + client client.Client + clock clock.RealClock -func markPodPrepareMigrating(pod *corev1.Pod) { - if pod.Annotations == nil { - pod.Annotations = make(map[string]string) - } - pod.Annotations[AnnotationPrepareMigrating] = "true" + nonRetryablePodFilter framework.FilterFunc + retryablePodFilter framework.FilterFunc + defaultFilterPlugin framework.FilterPlugin + + args *deschedulerconfig.MigrationControllerArgs + controllerFinder controllerfinder.Interface + objectLimiters map[types.UID]*rate.Limiter + limiterCache *gocache.Cache + limiterLock sync.Mutex + + arbitratedPodMigrationJobs map[types.UID]bool + arbitratedMapLock sync.Mutex } -func isPodPrepareMigrating(pod *corev1.Pod) bool { - return pod.Annotations[AnnotationPrepareMigrating] == "true" +func newFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) (*filter, error) { + controllerFinder, err := controllerfinder.New(options.Manager) + if err != nil { + return nil, err + } + f := &filter{ + client: options.Manager.GetClient(), + args: args, + controllerFinder: controllerFinder, + clock: clock.RealClock{}, + arbitratedPodMigrationJobs: map[types.UID]bool{}, + } + if err := f.initFilters(args, handle); err != nil { + return nil, err + } + f.initObjectLimiters() + return f, nil } -func (r *Reconciler) initFilters(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) error { +func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) error { defaultEvictorArgs := &defaultevictor.DefaultEvictorArgs{ NodeFit: args.NodeFit, NodeSelector: args.NodeSelector, @@ -93,7 +118,7 @@ func (r *Reconciler) initFilters(args *deschedulerconfig.MigrationControllerArgs wrapFilterFuncs := podutil.WrapFilterFuncs( util.FilterPodWithMaxEvictionCost, filterPlugin.Filter, - r.filterExpectedReplicas, + f.filterExpectedReplicas, ) podFilter, err := podutil.NewOptions(). WithFilter(wrapFilterFuncs). 
@@ -104,65 +129,45 @@ func (r *Reconciler) initFilters(args *deschedulerconfig.MigrationControllerArgs return err } retriablePodFilters := podutil.WrapFilterFuncs( - r.filterLimitedObject, - r.filterMaxMigratingPerNode, - r.filterMaxMigratingPerNamespace, - r.filterMaxMigratingOrUnavailablePerWorkload, + f.filterLimitedObject, + f.filterMaxMigratingPerNode, + f.filterMaxMigratingPerNamespace, + f.filterMaxMigratingOrUnavailablePerWorkload, ) - r.retryablePodFilter = func(pod *corev1.Pod) bool { + f.retryablePodFilter = func(pod *corev1.Pod) bool { return evictionsutil.HaveEvictAnnotation(pod) || retriablePodFilters(pod) } - r.nonRetryablePodFilter = podFilter - r.defaultFilterPlugin = defaultEvictor.(framework.FilterPlugin) + f.nonRetryablePodFilter = podFilter + f.defaultFilterPlugin = defaultEvictor.(framework.FilterPlugin) return nil } -// Filter checks if a pod can be evicted -func (r *Reconciler) Filter(pod *corev1.Pod) bool { - if !r.filterExistingPodMigrationJob(pod) { - return false - } - - if !r.reservationFilter(pod) { - return false - } - - if r.nonRetryablePodFilter != nil && !r.nonRetryablePodFilter(pod) { - return false - } - if r.retryablePodFilter != nil && !r.retryablePodFilter(pod) { - return false - } - return true -} - -func (r *Reconciler) reservationFilter(pod *corev1.Pod) bool { - if sev1alpha1.PodMigrationJobMode(r.args.DefaultJobMode) != sev1alpha1.PodMigrationJobModeReservationFirst { +func (f *filter) reservationFilter(pod *corev1.Pod) bool { + if sev1alpha1.PodMigrationJobMode(f.args.DefaultJobMode) != sev1alpha1.PodMigrationJobModeReservationFirst { return true } - if pkgutil.IsIn(r.args.SchedulerNames, pod.Spec.SchedulerName) { + if pkgutil.IsIn(f.args.SchedulerNames, pod.Spec.SchedulerName) { return true } - klog.Errorf("Pod %q can not be migrated by ReservationFirst mode because pod.schedulerName=%s but scheduler of pmj controller assigned is %s", klog.KObj(pod), pod.Spec.SchedulerName, r.args.SchedulerNames) + klog.Errorf("Pod %q can not be migrated by ReservationFirst mode because pod.schedulerName=%s but scheduler of pmj controller assigned is %s", klog.KObj(pod), pod.Spec.SchedulerName, f.args.SchedulerNames) return false } -func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool { - return r.defaultFilterPlugin.PreEvictionFilter(pod) -} - -func (r *Reconciler) forEachAvailableMigrationJobs(listOpts *client.ListOptions, handler func(job *sev1alpha1.PodMigrationJob) bool, expectPhases ...sev1alpha1.PodMigrationJobPhase) { +func (f *filter) forEachAvailableMigrationJobs(listOpts *client.ListOptions, handler func(job *sev1alpha1.PodMigrationJob) bool, expectedPhaseContexts ...phaseContext) { jobList := &sev1alpha1.PodMigrationJobList{} - err := r.Client.List(context.TODO(), jobList, listOpts, utilclient.DisableDeepCopy) + err := f.client.List(context.TODO(), jobList, listOpts, utilclient.DisableDeepCopy) if err != nil { klog.Errorf("failed to get PodMigrationJobList, err: %v", err) return } - if len(expectPhases) == 0 { - expectPhases = []sev1alpha1.PodMigrationJobPhase{sev1alpha1.PodMigrationJobPending, sev1alpha1.PodMigrationJobRunning} + if len(expectedPhaseContexts) == 0 { + expectedPhaseContexts = []phaseContext{ + {phase: sev1alpha1.PodMigrationJobRunning, checkArbitration: false}, + {phase: sev1alpha1.PodMigrationJobPending, checkArbitration: false}, + } } for i := range jobList.Items { @@ -172,8 +177,8 @@ func (r *Reconciler) forEachAvailableMigrationJobs(listOpts *client.ListOptions, phase = sev1alpha1.PodMigrationJobPending } found := false - 
for _, v := range expectPhases { - if v == phase { + for _, v := range expectedPhaseContexts { + if phase == v.phase && (!v.checkArbitration || f.checkJobPassedArbitration(job.UID)) { found = true break } @@ -184,40 +189,40 @@ func (r *Reconciler) forEachAvailableMigrationJobs(listOpts *client.ListOptions, } } -func (r *Reconciler) filterExistingPodMigrationJob(pod *corev1.Pod) bool { - return !r.existingPodMigrationJob(pod) +func (f *filter) filterExistingPodMigrationJob(pod *corev1.Pod) bool { + return !f.existingPodMigrationJob(pod) } -func (r *Reconciler) existingPodMigrationJob(pod *corev1.Pod, expectPhases ...sev1alpha1.PodMigrationJobPhase) bool { +func (f *filter) existingPodMigrationJob(pod *corev1.Pod, expectedPhaseContexts ...phaseContext) bool { opts := &client.ListOptions{FieldSelector: fields.OneTermEqualSelector(fieldindex.IndexJobByPodUID, string(pod.UID))} existing := false - r.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { + f.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { if podRef := job.Spec.PodRef; podRef != nil && podRef.UID == pod.UID { existing = true } return !existing - }, expectPhases...) + }, expectedPhaseContexts...) if !existing { opts = &client.ListOptions{FieldSelector: fields.OneTermEqualSelector(fieldindex.IndexJobPodNamespacedName, fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))} - r.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { + f.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { if podRef := job.Spec.PodRef; podRef != nil && podRef.Namespace == pod.Namespace && podRef.Name == pod.Name { existing = true } return !existing - }, expectPhases...) + }, expectedPhaseContexts...) } return existing } -func (r *Reconciler) filterMaxMigratingPerNode(pod *corev1.Pod) bool { - if pod.Spec.NodeName == "" || r.args.MaxMigratingPerNode == nil || *r.args.MaxMigratingPerNode <= 0 { +func (f *filter) filterMaxMigratingPerNode(pod *corev1.Pod) bool { + if pod.Spec.NodeName == "" || f.args.MaxMigratingPerNode == nil || *f.args.MaxMigratingPerNode <= 0 { return true } podList := &corev1.PodList{} listOpts := &client.ListOptions{FieldSelector: fields.OneTermEqualSelector(fieldindex.IndexPodByNodeName, pod.Spec.NodeName)} - err := r.Client.List(context.TODO(), podList, listOpts, utilclient.DisableDeepCopy) + err := f.client.List(context.TODO(), podList, listOpts, utilclient.DisableDeepCopy) if err != nil { return true } @@ -225,9 +230,12 @@ func (r *Reconciler) filterMaxMigratingPerNode(pod *corev1.Pod) bool { return true } - var expectPhases []sev1alpha1.PodMigrationJobPhase - if isPodPrepareMigrating(pod) { - expectPhases = append(expectPhases, sev1alpha1.PodMigrationJobRunning) + var expectedPhaseContexts []phaseContext + if checkPodArbitrating(pod) { + expectedPhaseContexts = []phaseContext{ + {phase: sev1alpha1.PodMigrationJobRunning, checkArbitration: false}, + {phase: sev1alpha1.PodMigrationJobPending, checkArbitration: true}, + } } count := 0 @@ -235,12 +243,12 @@ func (r *Reconciler) filterMaxMigratingPerNode(pod *corev1.Pod) bool { v := &podList.Items[i] if v.UID != pod.UID && v.Spec.NodeName == pod.Spec.NodeName && - r.existingPodMigrationJob(v, expectPhases...) { + f.existingPodMigrationJob(v, expectedPhaseContexts...) 
{ count++ } } - maxMigratingPerNode := int(*r.args.MaxMigratingPerNode) + maxMigratingPerNode := int(*f.args.MaxMigratingPerNode) exceeded := count >= maxMigratingPerNode if exceeded { klog.V(4).Infof("Pod %q fails to check maxMigratingPerNode because the Node %q has %d migrating Pods, exceeding the maxMigratingPerNode(%d)", @@ -249,26 +257,29 @@ func (r *Reconciler) filterMaxMigratingPerNode(pod *corev1.Pod) bool { return !exceeded } -func (r *Reconciler) filterMaxMigratingPerNamespace(pod *corev1.Pod) bool { - if r.args.MaxMigratingPerNamespace == nil || *r.args.MaxMigratingPerNamespace <= 0 { +func (f *filter) filterMaxMigratingPerNamespace(pod *corev1.Pod) bool { + if f.args.MaxMigratingPerNamespace == nil || *f.args.MaxMigratingPerNamespace <= 0 { return true } - var expectPhases []sev1alpha1.PodMigrationJobPhase - if isPodPrepareMigrating(pod) { - expectPhases = append(expectPhases, sev1alpha1.PodMigrationJobRunning) + var expectedPhaseContexts []phaseContext + if checkPodArbitrating(pod) { + expectedPhaseContexts = []phaseContext{ + {phase: sev1alpha1.PodMigrationJobRunning, checkArbitration: false}, + {phase: sev1alpha1.PodMigrationJobPending, checkArbitration: true}, + } } opts := &client.ListOptions{FieldSelector: fields.OneTermEqualSelector(fieldindex.IndexJobByPodNamespace, pod.Namespace)} count := 0 - r.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { + f.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool { if podRef := job.Spec.PodRef; podRef != nil && podRef.UID != pod.UID && podRef.Namespace == pod.Namespace { count++ } return true - }, expectPhases...) + }, expectedPhaseContexts...) - maxMigratingPerNamespace := int(*r.args.MaxMigratingPerNamespace) + maxMigratingPerNamespace := int(*f.args.MaxMigratingPerNamespace) exceeded := count >= maxMigratingPerNamespace if exceeded { klog.V(4).Infof("Pod %q fails to check maxMigratingPerNamespace because the Namespace %q has %d migrating Pods, exceeding the maxMigratingPerNamespace(%d)", @@ -277,33 +288,35 @@ func (r *Reconciler) filterMaxMigratingPerNamespace(pod *corev1.Pod) bool { return !exceeded } -func (r *Reconciler) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod) bool { +func (f *filter) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod) bool { ownerRef := metav1.GetControllerOf(pod) if ownerRef == nil { return true } - pods, expectedReplicas, err := r.controllerFinder.GetPodsForRef(ownerRef, pod.Namespace, nil, false) + pods, expectedReplicas, err := f.controllerFinder.GetPodsForRef(ownerRef, pod.Namespace, nil, false) if err != nil { return false } - maxMigrating, err := util.GetMaxMigrating(int(expectedReplicas), r.args.MaxMigratingPerWorkload) + maxMigrating, err := util.GetMaxMigrating(int(expectedReplicas), f.args.MaxMigratingPerWorkload) if err != nil { return false } - maxUnavailable, err := util.GetMaxUnavailable(int(expectedReplicas), r.args.MaxUnavailablePerWorkload) + maxUnavailable, err := util.GetMaxUnavailable(int(expectedReplicas), f.args.MaxUnavailablePerWorkload) if err != nil { return false } - var expectPhases []sev1alpha1.PodMigrationJobPhase - if isPodPrepareMigrating(pod) { - expectPhases = append(expectPhases, sev1alpha1.PodMigrationJobRunning) + var expectedPhaseContext []phaseContext + if checkPodArbitrating(pod) { + expectedPhaseContext = []phaseContext{ + {phase: sev1alpha1.PodMigrationJobRunning, checkArbitration: false}, + {phase: sev1alpha1.PodMigrationJobPending, checkArbitration: true}, + } } - opts := 
&client.ListOptions{FieldSelector: fields.OneTermEqualSelector(fieldindex.IndexJobByPodNamespace, pod.Namespace)}
 	migratingPods := map[types.NamespacedName]struct{}{}
-	r.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool {
+	f.forEachAvailableMigrationJobs(opts, func(job *sev1alpha1.PodMigrationJob) bool {
 		podRef := job.Spec.PodRef
 		if podRef == nil || podRef.UID == pod.UID {
 			return true
@@ -314,7 +327,7 @@ func (r *Reconciler) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod)
 			Name:      podRef.Name,
 		}
 		p := &corev1.Pod{}
-		err := r.Client.Get(context.TODO(), podNamespacedName, p)
+		err := f.client.Get(context.TODO(), podNamespacedName, p)
 		if err != nil {
 			klog.Errorf("Failed to get Pod %q, err: %v", podNamespacedName, err)
 		} else {
@@ -324,7 +337,7 @@ func (r *Reconciler) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod)
 			}
 		}
 		return true
-	}, expectPhases...)
+	}, expectedPhaseContext...)
 
 	if len(migratingPods) > 0 {
 		exceeded := len(migratingPods) >= maxMigrating
@@ -335,7 +348,7 @@ func (r *Reconciler) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod)
 		}
 	}
 
-	unavailablePods := r.getUnavailablePods(pods)
+	unavailablePods := f.getUnavailablePods(pods)
 	mergeUnavailableAndMigratingPods(unavailablePods, migratingPods)
 	exceeded := len(unavailablePods) >= maxUnavailable
 	if exceeded {
@@ -346,29 +359,29 @@ func (r *Reconciler) filterMaxMigratingOrUnavailablePerWorkload(pod *corev1.Pod)
 	return true
 }
 
-func (r *Reconciler) filterExpectedReplicas(pod *corev1.Pod) bool {
+func (f *filter) filterExpectedReplicas(pod *corev1.Pod) bool {
 	ownerRef := metav1.GetControllerOf(pod)
 	if ownerRef == nil {
 		return true
 	}
-	_, expectedReplicas, err := r.controllerFinder.GetPodsForRef(ownerRef, pod.Namespace, nil, false)
+	_, expectedReplicas, err := f.controllerFinder.GetPodsForRef(ownerRef, pod.Namespace, nil, false)
 	if err != nil {
 		klog.Errorf("filterExpectedReplicas, getPodsForRef err: %s", err.Error())
 		return false
 	}
-	maxMigrating, err := util.GetMaxMigrating(int(expectedReplicas), r.args.MaxMigratingPerWorkload)
+	maxMigrating, err := util.GetMaxMigrating(int(expectedReplicas), f.args.MaxMigratingPerWorkload)
 	if err != nil {
 		klog.Errorf("filterExpectedReplicas, getMaxMigrating err: %s", err.Error())
 		return false
 	}
-	maxUnavailable, err := util.GetMaxUnavailable(int(expectedReplicas), r.args.MaxUnavailablePerWorkload)
+	maxUnavailable, err := util.GetMaxUnavailable(int(expectedReplicas), f.args.MaxUnavailablePerWorkload)
 	if err != nil {
 		klog.Errorf("filterExpectedReplicas, getMaxUnavailable err: %s", err.Error())
 		return false
 	}
-	if r.args.SkipCheckExpectedReplicas == nil || !*r.args.SkipCheckExpectedReplicas {
-		// TODO(joseph): There are a few special scenarios where should we allow eviction?
+	if f.args.SkipCheckExpectedReplicas == nil || !*f.args.SkipCheckExpectedReplicas {
+		// TODO(joseph): There are a few special scenarios where should we allow eviction?
 		if expectedReplicas == 1 || int(expectedReplicas) == maxMigrating || int(expectedReplicas) == maxUnavailable {
 			klog.Warningf("maxMigrating(%d) or maxUnavailable(%d) equals to the replicas(%d) of the workload %s/%s/%s(%s) of Pod %q, or the replicas equals to 1, please increase the replicas or update the defense configurations",
 				maxMigrating, maxUnavailable, expectedReplicas, ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion, ownerRef.UID, klog.KObj(pod))
@@ -378,7 +391,7 @@ func (r *Reconciler) filterExpectedReplicas(pod *corev1.Pod) bool {
 	return true
 }
 
-func (r *Reconciler) getUnavailablePods(pods []*corev1.Pod) map[types.NamespacedName]struct{} {
+func (f *filter) getUnavailablePods(pods []*corev1.Pod) map[types.NamespacedName]struct{} {
 	unavailablePods := make(map[types.NamespacedName]struct{})
 	for _, pod := range pods {
 		if kubecontroller.IsPodActive(pod) && k8spodutil.IsPodReady(pod) {
@@ -399,8 +412,8 @@ func mergeUnavailableAndMigratingPods(unavailablePods, migratingPods map[types.N
 	}
 }
 
-func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) {
-	if r.objectLimiters == nil || r.limiterCache == nil {
+func (f *filter) trackEvictedPod(pod *corev1.Pod) {
+	if f.objectLimiters == nil || f.limiterCache == nil {
 		return
 	}
 	ownerRef := metav1.GetControllerOf(pod)
@@ -408,16 +421,16 @@ func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) {
 		return
 	}
 
-	objectLimiterArgs, ok := r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
+	objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload]
 	if !ok || objectLimiterArgs.Duration.Seconds() == 0 {
 		return
 	}
 
 	var maxMigratingReplicas int
-	if expectedReplicas, err := r.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
+	if expectedReplicas, err := f.controllerFinder.GetExpectedScaleForPod(pod); err == nil {
 		maxMigrating := objectLimiterArgs.MaxMigrating
 		if maxMigrating == nil {
-			maxMigrating = r.args.MaxMigratingPerWorkload
+			maxMigrating = f.args.MaxMigratingPerWorkload
 		}
 		maxMigratingReplicas, _ = util.GetMaxMigrating(int(expectedReplicas), maxMigrating)
 	}
@@ -425,37 +438,37 @@ func (r *Reconciler) trackEvictedPod(pod *corev1.Pod) {
 		return
 	}
 
-	r.lock.Lock()
-	defer r.lock.Unlock()
+	f.limiterLock.Lock()
+	defer f.limiterLock.Unlock()
 
 	uid := ownerRef.UID
 	limit := rate.Limit(maxMigratingReplicas) / rate.Limit(objectLimiterArgs.Duration.Seconds())
-	limiter := r.objectLimiters[uid]
+	limiter := f.objectLimiters[uid]
 	if limiter == nil {
 		limiter = rate.NewLimiter(limit, 1)
-		r.objectLimiters[uid] = limiter
+		f.objectLimiters[uid] = limiter
 	} else if limiter.Limit() != limit {
 		limiter.SetLimit(limit)
 	}
 
-	if !limiter.AllowN(r.clock.Now(), 1) {
-		klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for a period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
+	if !limiter.AllowN(f.clock.Now(), 1) {
+		klog.Infof("The workload %s/%s/%s has been frequently descheduled recently and needs to be limited for a period of time", ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion)
 	}
-	r.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration)
+	f.limiterCache.Set(string(uid), 0, gocache.DefaultExpiration)
}
 
-func (r *Reconciler) filterLimitedObject(pod *corev1.Pod) bool {
-	if r.objectLimiters == nil || r.limiterCache == nil {
+func (f *filter) filterLimitedObject(pod *corev1.Pod) bool {
+	if f.objectLimiters == nil || f.limiterCache == nil {
 		return true
 	}
-	objectLimiterArgs, ok := 
r.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] + objectLimiterArgs, ok := f.args.ObjectLimiters[deschedulerconfig.MigrationLimitObjectWorkload] if !ok || objectLimiterArgs.Duration.Duration == 0 { return true } if ownerRef := metav1.GetControllerOf(pod); ownerRef != nil { - r.lock.Lock() - defer r.lock.Unlock() - if limiter := r.objectLimiters[ownerRef.UID]; limiter != nil { + f.limiterLock.Lock() + defer f.limiterLock.Unlock() + if limiter := f.objectLimiters[ownerRef.UID]; limiter != nil { if remainTokens := limiter.Tokens() - float64(1); remainTokens < 0 { klog.Infof("Pod %q is filtered by workload %s/%s/%s is limited", klog.KObj(pod), ownerRef.Name, ownerRef.Kind, ownerRef.APIVersion) return false @@ -464,3 +477,45 @@ func (r *Reconciler) filterLimitedObject(pod *corev1.Pod) bool { } return true } + +func (f *filter) initObjectLimiters() { + var trackExpiration time.Duration + for _, v := range f.args.ObjectLimiters { + if v.Duration.Duration > trackExpiration { + trackExpiration = v.Duration.Duration + } + } + if trackExpiration > 0 { + f.objectLimiters = make(map[types.UID]*rate.Limiter) + limiterExpiration := trackExpiration + trackExpiration/2 + f.limiterCache = gocache.New(limiterExpiration, limiterExpiration) + f.limiterCache.OnEvicted(func(s string, _ interface{}) { + f.limiterLock.Lock() + defer f.limiterLock.Unlock() + delete(f.objectLimiters, types.UID(s)) + }) + } +} + +func (f *filter) checkJobPassedArbitration(uid types.UID) bool { + f.arbitratedMapLock.Lock() + defer f.arbitratedMapLock.Unlock() + return f.arbitratedPodMigrationJobs[uid] +} + +func (f *filter) markJobPassedArbitration(uid types.UID) { + f.arbitratedMapLock.Lock() + defer f.arbitratedMapLock.Unlock() + f.arbitratedPodMigrationJobs[uid] = true +} + +func (f *filter) removeJobPassedArbitration(uid types.UID) { + f.arbitratedMapLock.Lock() + defer f.arbitratedMapLock.Unlock() + delete(f.arbitratedPodMigrationJobs, uid) +} + +type phaseContext struct { + phase sev1alpha1.PodMigrationJobPhase + checkArbitration bool +} diff --git a/pkg/descheduler/controllers/migration/arbitrator/filter_test.go b/pkg/descheduler/controllers/migration/arbitrator/filter_test.go new file mode 100644 index 000000000..6e3a337f3 --- /dev/null +++ b/pkg/descheduler/controllers/migration/arbitrator/filter_test.go @@ -0,0 +1,1103 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package arbitrator + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/uuid" + clientgoscheme "k8s.io/client-go/kubernetes/scheme" + "k8s.io/utils/clock" + "k8s.io/utils/pointer" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + + "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" + "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" + "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/v1alpha2" +) + +func TestFilterExistingMigrationJob(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}, arbitratedPodMigrationJobs: map[types.UID]bool{}} + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-pod", + UID: uuid.NewUUID(), + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + assert.Nil(t, a.client.Create(context.TODO(), pod)) + + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + CreationTimestamp: metav1.Time{Time: time.Now()}, + }, + Spec: v1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: "default", + Name: "test-pod", + UID: pod.UID, + }, + }, + } + assert.Nil(t, a.client.Create(context.TODO(), job)) + + assert.False(t, a.filterExistingPodMigrationJob(pod)) +} + +func TestFilterMaxMigratingPerNode(t *testing.T) { + tests := []struct { + name string + numMigratingPods int + samePod bool + sameNode bool + maxMigrating int32 + want bool + }{ + { + name: "maxMigrating=0", + want: true, + }, + { + name: "maxMigrating=1 no migrating Pods", + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=1 one migrating Pod with same Pod and Node", + numMigratingPods: 1, + samePod: true, + sameNode: true, + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=1 one migrating Pod with diff Pod and same Node", + numMigratingPods: 1, + samePod: false, + sameNode: true, + maxMigrating: 1, + want: false, + }, + { + name: "maxMigrating=1 one migrating Pod with diff Pod and Node", + numMigratingPods: 1, + samePod: false, + sameNode: false, + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with same Pod and Node", + numMigratingPods: 2, + samePod: true, + sameNode: true, + maxMigrating: 2, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with diff Pod and Node", + numMigratingPods: 2, + samePod: false, + sameNode: false, + maxMigrating: 2, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with diff Pod and same Node", + numMigratingPods: 2, + samePod: false, + sameNode: true, + maxMigrating: 2, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}, arbitratedPodMigrationJobs: map[types.UID]bool{}} + a.args.MaxMigratingPerNode = pointer.Int32(tt.maxMigrating) 
+ + var migratingPods []*corev1.Pod + for i := 0; i < tt.numMigratingPods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-migrating-pod-%d", i), + UID: uuid.NewUUID(), + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + migratingPods = append(migratingPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%d", i), + CreationTimestamp: metav1.Time{Time: time.Now()}, + Annotations: map[string]string{AnnotationPassedArbitration: "true"}, + UID: uuid.NewUUID(), + }, + Spec: v1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + }, + }, + } + a.markJobPassedArbitration(job.UID) + assert.Nil(t, a.client.Create(context.TODO(), job)) + } + + var filterPod *corev1.Pod + if tt.samePod && len(migratingPods) > 0 { + filterPod = migratingPods[0] + } + if filterPod == nil { + filterPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), + UID: uuid.NewUUID(), + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + } + if tt.sameNode { + filterPod.Spec.NodeName = "test-node" + } else { + filterPod.Spec.NodeName = "test-other-node" + } + + got := a.filterMaxMigratingPerNode(filterPod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFilterMaxMigratingPerNamespace(t *testing.T) { + tests := []struct { + name string + numMigratingPods int + samePod bool + sameNamespace bool + maxMigrating int32 + want bool + }{ + { + name: "maxMigrating=0", + want: true, + }, + { + name: "maxMigrating=1 no migrating Pods", + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=1 one migrating Pod with same Pod and Namespace", + numMigratingPods: 1, + samePod: true, + sameNamespace: true, + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=1 one migrating Pod with diff Pod and same Namespace", + numMigratingPods: 1, + samePod: false, + sameNamespace: true, + maxMigrating: 1, + want: false, + }, + { + name: "maxMigrating=1 one migrating Pod with diff Pod and Namespace", + numMigratingPods: 1, + samePod: false, + sameNamespace: false, + maxMigrating: 1, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with same Pod and Namespace", + numMigratingPods: 2, + samePod: true, + sameNamespace: true, + maxMigrating: 2, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with diff Pod and Namespace", + numMigratingPods: 2, + samePod: false, + sameNamespace: false, + maxMigrating: 2, + want: true, + }, + { + name: "maxMigrating=2 two migrating Pod with diff Pod and same Namespace", + numMigratingPods: 2, + samePod: false, + sameNamespace: true, + maxMigrating: 2, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}, arbitratedPodMigrationJobs: map[types.UID]bool{}} + a.args.MaxMigratingPerNamespace = pointer.Int32(tt.maxMigrating) + + var migratingPods []*corev1.Pod + for i := 0; i < tt.numMigratingPods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: 
fmt.Sprintf("test-migrating-pod-%d", i), + UID: uuid.NewUUID(), + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + migratingPods = append(migratingPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%d", i), + CreationTimestamp: metav1.Time{Time: time.Now()}, + Annotations: map[string]string{AnnotationPassedArbitration: "true"}, + UID: uuid.NewUUID(), + }, + Spec: v1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + }, + }, + } + a.markJobPassedArbitration(job.UID) + assert.Nil(t, a.client.Create(context.TODO(), job)) + } + + var filterPod *corev1.Pod + if tt.samePod && len(migratingPods) > 0 { + filterPod = migratingPods[0] + } + if filterPod == nil { + filterPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), + UID: uuid.NewUUID(), + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + } + if !tt.sameNamespace { + filterPod.Namespace = "other-namespace" + } + got := a.filterMaxMigratingPerNamespace(filterPod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFilterMaxMigratingPerWorkload(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-1", + UID: uuid.NewUUID(), + }, + } + + ownerReferences2 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + }, + } + tests := []struct { + name string + totalReplicas int32 + numMigratingPods int + samePod bool + sameWorkload bool + maxMigrating int + want bool + }{ + { + name: "totalReplicas=10 and maxMigrating=1 no migrating Pod", + totalReplicas: 10, + numMigratingPods: 0, + maxMigrating: 1, + samePod: false, + sameWorkload: false, + want: true, + }, + { + name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with same Pod and Workload", + totalReplicas: 10, + numMigratingPods: 1, + maxMigrating: 1, + samePod: true, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with diff Pod and same Workload", + totalReplicas: 10, + numMigratingPods: 1, + maxMigrating: 1, + samePod: false, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with diff Pod and diff Workload", + totalReplicas: 10, + numMigratingPods: 1, + maxMigrating: 1, + samePod: false, + sameWorkload: false, + want: true, + }, + { + name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with same Pod and Workload", + totalReplicas: 10, + numMigratingPods: 2, + maxMigrating: 2, + samePod: true, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with diff Pod and same Workload", + totalReplicas: 10, + numMigratingPods: 2, + maxMigrating: 2, + samePod: false, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with diff Pod and diff Workload", + totalReplicas: 10, + numMigratingPods: 2, + maxMigrating: 2, + samePod: false, + sameWorkload: false, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + 
scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + intOrString := intstr.FromInt(tt.maxMigrating) + maxUnavailable := intstr.FromInt(int(tt.totalReplicas - 1)) + + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}, arbitratedPodMigrationJobs: map[types.UID]bool{}} + a.args.MaxMigratingPerWorkload = &intOrString + a.args.MaxUnavailablePerWorkload = &maxUnavailable + + var migratingPods []*corev1.Pod + for i := 0; i < tt.numMigratingPods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-migrating-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + migratingPods = append(migratingPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%d", i), + CreationTimestamp: metav1.Time{Time: time.Now()}, + Annotations: map[string]string{AnnotationPassedArbitration: "true"}, + }, + Spec: v1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + }, + }, + } + a.markJobPassedArbitration(job.UID) + assert.Nil(t, a.client.Create(context.TODO(), job)) + } + + var filterPod *corev1.Pod + if tt.samePod && len(migratingPods) > 0 { + filterPod = migratingPods[0] + } + if filterPod == nil { + filterPod = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + } + if !tt.sameWorkload { + filterPod.OwnerReferences = ownerReferences2 + } + + a.controllerFinder = &fakeControllerFinder{ + replicas: tt.totalReplicas, + } + + got := a.filterMaxMigratingOrUnavailablePerWorkload(filterPod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFilterMaxUnavailablePerWorkload(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-1", + UID: uuid.NewUUID(), + }, + } + + ownerReferences2 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + }, + } + tests := []struct { + name string + totalReplicas int32 + numUnavailablePods int + numMigratingPods int + maxUnavailable int + sameWorkload bool + want bool + }{ + { + name: "totalReplicas=10 and maxUnavailable=1 no migrating Pod and no unavailable Pod", + totalReplicas: 10, + numUnavailablePods: 0, + numMigratingPods: 0, + maxUnavailable: 1, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxUnavailable=1 one unavailable Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 1, + numMigratingPods: 0, + maxUnavailable: 1, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=10 and maxUnavailable=1 one migrating Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 0, + numMigratingPods: 1, + maxUnavailable: 1, + sameWorkload: 
true, + want: false, + }, + { + name: "totalReplicas=10 and maxUnavailable=1 one unavailable Pod and one migrating Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 1, + maxUnavailable: 1, + sameWorkload: true, + want: false, + }, + + { + name: "totalReplicas=10 and maxUnavailable=2 no migrating Pod and no unavailable Pod", + totalReplicas: 10, + numUnavailablePods: 0, + numMigratingPods: 0, + maxUnavailable: 2, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 1, + numMigratingPods: 0, + maxUnavailable: 2, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxUnavailable=2 one migrating Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 0, + numMigratingPods: 1, + maxUnavailable: 2, + sameWorkload: true, + want: true, + }, + { + name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod and one migrating Pod with same Workload", + totalReplicas: 10, + numUnavailablePods: 1, + numMigratingPods: 1, + maxUnavailable: 2, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod and one migrating Pod with diff Workload", + totalReplicas: 10, + numUnavailablePods: 1, + numMigratingPods: 1, + maxUnavailable: 2, + sameWorkload: false, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + intOrString := intstr.FromInt(int(tt.totalReplicas - 1)) + maxUnavailable := intstr.FromInt(tt.maxUnavailable) + + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}, arbitratedPodMigrationJobs: map[types.UID]bool{}} + a.args.MaxMigratingPerWorkload = &intOrString + a.args.MaxUnavailablePerWorkload = &maxUnavailable + + var totalPods []*corev1.Pod + for i := 0; i < tt.numUnavailablePods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-unavailable-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + totalPods = append(totalPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + } + + for i := 0; i < tt.numMigratingPods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-migrating-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + totalPods = append(totalPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-%d", i), + CreationTimestamp: metav1.Time{Time: time.Now()}, + Annotations: map[string]string{AnnotationPassedArbitration: "true"}, + }, + Spec: v1alpha1.PodMigrationJobSpec{ + PodRef: &corev1.ObjectReference{ + Namespace: pod.Namespace, + Name: pod.Name, + UID: pod.UID, + }, + }, + } + a.markJobPassedArbitration(job.UID) + assert.Nil(t, a.client.Create(context.TODO(), job)) + } + + for i := 0; i < 
int(tt.totalReplicas)-len(totalPods); i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-available-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + totalPods = append(totalPods, pod) + } + + filterPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + if !tt.sameWorkload { + filterPod.OwnerReferences = ownerReferences2 + } + + a.controllerFinder = &fakeControllerFinder{ + pods: totalPods, + replicas: tt.totalReplicas, + } + + got := a.filterMaxMigratingOrUnavailablePerWorkload(filterPod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFilterExpectedReplicas(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-1", + UID: uuid.NewUUID(), + }, + } + + ownerReferences2 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + }, + } + tests := []struct { + name string + totalReplicas int32 + numUnavailablePods int + maxUnavailable int + prepareMigrating bool + sameWorkload bool + want bool + }{ + { + name: "totalReplicas=1 and maxUnavailable=1", + totalReplicas: 1, + numUnavailablePods: 0, + maxUnavailable: 1, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=100 and maxUnavailable=100", + totalReplicas: 100, + numUnavailablePods: 0, + maxUnavailable: 100, + sameWorkload: true, + want: false, + }, + { + name: "totalReplicas=100 and maxUnavailable=10", + totalReplicas: 100, + numUnavailablePods: 0, + maxUnavailable: 10, + sameWorkload: true, + want: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + intOrString := intstr.FromInt(int(tt.totalReplicas - 1)) + maxUnavailable := intstr.FromInt(tt.maxUnavailable) + + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + a := filter{client: fakeClient, args: &config.MigrationControllerArgs{}} + a.args.MaxMigratingPerWorkload = &intOrString + a.args.MaxUnavailablePerWorkload = &maxUnavailable + + var totalPods []*corev1.Pod + for i := 0; i < tt.numUnavailablePods; i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-unavailable-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodPending, + }, + } + totalPods = append(totalPods, pod) + + assert.Nil(t, a.client.Create(context.TODO(), pod)) + } + + for i := 0; i < int(tt.totalReplicas)-len(totalPods); i++ { + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-available-pod-%d", i), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: 
corev1.PodStatus{ + Phase: corev1.PodRunning, + Conditions: []corev1.PodCondition{ + { + Type: corev1.PodReady, + Status: corev1.ConditionTrue, + }, + }, + }, + } + totalPods = append(totalPods, pod) + } + + filterPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), + UID: uuid.NewUUID(), + OwnerReferences: ownerReferences1, + }, + Spec: corev1.PodSpec{ + NodeName: "test-node", + }, + Status: corev1.PodStatus{ + Phase: corev1.PodRunning, + }, + } + if !tt.sameWorkload { + filterPod.OwnerReferences = ownerReferences2 + } + + a.controllerFinder = &fakeControllerFinder{ + pods: totalPods, + replicas: tt.totalReplicas, + } + + got := a.filterExpectedReplicas(filterPod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestFilterObjectLimiter(t *testing.T) { + ownerReferences1 := []metav1.OwnerReference{ + { + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-1", + UID: uuid.NewUUID(), + }, + } + otherOwnerReferences := metav1.OwnerReference{ + APIVersion: "apps/v1", + Controller: pointer.Bool(true), + Kind: "StatefulSet", + Name: "test-2", + UID: uuid.NewUUID(), + } + testObjectLimiters := config.ObjectLimiterMap{ + config.MigrationLimitObjectWorkload: { + Duration: metav1.Duration{Duration: 1 * time.Second}, + MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, + }, + } + + tests := []struct { + name string + objectLimiters config.ObjectLimiterMap + totalReplicas int32 + sleepDuration time.Duration + pod *corev1.Pod + evictedPodsCount int + evictedWorkload *metav1.OwnerReference + want bool + }{ + { + name: "less than default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 6, + want: true, + }, + { + name: "exceeded default maxMigrating", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + want: false, + }, + { + name: "other than workload", + totalReplicas: 100, + objectLimiters: testObjectLimiters, + sleepDuration: 100 * time.Millisecond, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + evictedWorkload: &otherOwnerReferences, + want: true, + }, + { + name: "disable objectLimiters", + totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 11, + objectLimiters: config.ObjectLimiterMap{ + config.MigrationLimitObjectWorkload: config.MigrationObjectLimiter{ + Duration: metav1.Duration{Duration: 0}, + }, + }, + want: true, + }, + { + name: "default limiter", + totalReplicas: 100, + pod: &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + OwnerReferences: ownerReferences1, + }, + }, + evictedPodsCount: 1, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + scheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(scheme) + _ = clientgoscheme.AddToScheme(scheme) + fakeClient := fake.NewClientBuilder().WithScheme(scheme).Build() + + var v1beta2args v1alpha2.MigrationControllerArgs + v1alpha2.SetDefaults_MigrationControllerArgs(&v1beta2args) + var args config.MigrationControllerArgs + err := 
v1alpha2.Convert_v1alpha2_MigrationControllerArgs_To_config_MigrationControllerArgs(&v1beta2args, &args, nil) + if err != nil { + panic(err) + } + a := filter{client: fakeClient, args: &args, clock: clock.RealClock{}} + + controllerFinder := &fakeControllerFinder{} + if tt.objectLimiters != nil { + a.args.ObjectLimiters = tt.objectLimiters + } + + a.initObjectLimiters() + if tt.totalReplicas > 0 { + controllerFinder.replicas = tt.totalReplicas + } + a.controllerFinder = controllerFinder + if tt.evictedPodsCount > 0 { + for i := 0; i < tt.evictedPodsCount; i++ { + pod := tt.pod.DeepCopy() + if tt.evictedWorkload != nil { + pod.OwnerReferences = []metav1.OwnerReference{ + *tt.evictedWorkload, + } + } + a.trackEvictedPod(pod) + if tt.sleepDuration > 0 { + time.Sleep(tt.sleepDuration) + } + } + } + got := a.filterLimitedObject(tt.pod) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestArbitratedMap(t *testing.T) { + f := filter{ + arbitratedPodMigrationJobs: map[types.UID]bool{}, + } + job := &v1alpha1.PodMigrationJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "test-job", + UID: uuid.NewUUID(), + }, + } + assert.False(t, f.checkJobPassedArbitration(job.UID)) + + f.markJobPassedArbitration(job.UID) + assert.True(t, f.checkJobPassedArbitration(job.UID)) + + f.removeJobPassedArbitration(job.UID) + assert.False(t, f.checkJobPassedArbitration(job.UID)) +} diff --git a/pkg/descheduler/controllers/migration/arbitrator/handler.go b/pkg/descheduler/controllers/migration/arbitrator/handler.go new file mode 100644 index 000000000..85c866e87 --- /dev/null +++ b/pkg/descheduler/controllers/migration/arbitrator/handler.go @@ -0,0 +1,90 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package arbitrator + +import ( + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" +) + +// arbitrationHandler implements handler.EventHandler +type arbitrationHandler struct { + handler.EnqueueRequestForObject + client client.Client + arbitrator Arbitrator +} + +func NewHandler(arbitrator Arbitrator, client client.Client) handler.EventHandler { + return &arbitrationHandler{ + EnqueueRequestForObject: handler.EnqueueRequestForObject{}, + arbitrator: arbitrator, + client: client, + } +} + +// Create calls Arbitrator.AddPodMigrationJob to hand the new PodMigrationJob over for arbitration +func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) { + if evt.Object == nil { + enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) + return + } + job := evt.Object.(*v1alpha1.PodMigrationJob) + h.arbitrator.AddPodMigrationJob(job) +} + +// Update implements EventHandler.
+func (h *arbitrationHandler) Update(evt event.UpdateEvent, q workqueue.RateLimitingInterface) { + switch { + case evt.ObjectNew != nil: + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectNew.GetName(), + Namespace: evt.ObjectNew.GetNamespace(), + }}) + job := evt.ObjectNew.(*v1alpha1.PodMigrationJob) + if job.Status.Phase == v1alpha1.PodMigrationJobFailed || + job.Status.Phase == v1alpha1.PodMigrationJobSucceeded || + job.Status.Phase == v1alpha1.PodMigrationJobAborted { + h.arbitrator.DeletePodMigrationJob(job) + } + case evt.ObjectOld != nil: + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.ObjectOld.GetName(), + Namespace: evt.ObjectOld.GetNamespace(), + }}) + default: + enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) + } +} + +// Delete implements EventHandler. +func (h *arbitrationHandler) Delete(evt event.DeleteEvent, q workqueue.RateLimitingInterface) { + if evt.Object == nil { + enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) + return + } + q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ + Name: evt.Object.GetName(), + Namespace: evt.Object.GetNamespace(), + }}) + h.arbitrator.DeletePodMigrationJob(evt.Object.(*v1alpha1.PodMigrationJob)) +} diff --git a/pkg/descheduler/controllers/migration/arbitrator/sort.go b/pkg/descheduler/controllers/migration/arbitrator/sort.go index e73289790..3846f3f0a 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/sort.go +++ b/pkg/descheduler/controllers/migration/arbitrator/sort.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Koordinator Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. diff --git a/pkg/descheduler/controllers/migration/arbitrator/sort_test.go b/pkg/descheduler/controllers/migration/arbitrator/sort_test.go index 00a4c9b18..92be542de 100644 --- a/pkg/descheduler/controllers/migration/arbitrator/sort_test.go +++ b/pkg/descheduler/controllers/migration/arbitrator/sort_test.go @@ -1,5 +1,5 @@ /* -Copyright 2023 The Koordinator Authors. +Copyright 2022 The Koordinator Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
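The following is a minimal, hypothetical test sketch, not part of this patch, that illustrates how the new arbitrationHandler in handler.go routes watch events: a CreateEvent is handed to the Arbitrator via AddPodMigrationJob instead of being enqueued directly, while a DeleteEvent both enqueues a reconcile request and drops the job from the Arbitrator. It assumes the controller-runtime version used in this repository (handler methods without a context argument); the names stubArbitrator and TestHandlerRoutesEvents are invented for the example.

package arbitrator_test

import (
	"testing"

	"github.com/stretchr/testify/assert"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/util/workqueue"
	"sigs.k8s.io/controller-runtime/pkg/event"

	"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/arbitrator"
)

// stubArbitrator is a hypothetical minimal Arbitrator used only in this sketch.
type stubArbitrator struct {
	added   []*v1alpha1.PodMigrationJob
	deleted []*v1alpha1.PodMigrationJob
}

func (s *stubArbitrator) AddPodMigrationJob(job *v1alpha1.PodMigrationJob) { s.added = append(s.added, job) }
func (s *stubArbitrator) DeletePodMigrationJob(job *v1alpha1.PodMigrationJob) {
	s.deleted = append(s.deleted, job)
}
func (s *stubArbitrator) Filter(pod *corev1.Pod) bool            { return true }
func (s *stubArbitrator) PreEvictionFilter(pod *corev1.Pod) bool { return true }
func (s *stubArbitrator) TrackEvictedPod(pod *corev1.Pod)        {}

func TestHandlerRoutesEvents(t *testing.T) {
	stub := &stubArbitrator{}
	h := arbitrator.NewHandler(stub, nil)
	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	job := &v1alpha1.PodMigrationJob{ObjectMeta: metav1.ObjectMeta{Name: "job-1"}}

	// A CreateEvent only hands the job to the arbitrator; nothing is enqueued yet.
	h.Create(event.CreateEvent{Object: job}, q)
	assert.Len(t, stub.added, 1)
	assert.Equal(t, 0, q.Len())

	// A DeleteEvent enqueues a reconcile request and removes the job from the arbitrator.
	h.Delete(event.DeleteEvent{Object: job}, q)
	assert.Len(t, stub.deleted, 1)
	assert.Equal(t, 1, q.Len())
}

Routing creation events through the Arbitrator in this way is what allows PodMigrationJobs to be sorted and filtered in batches before they are enqueued for reconciliation.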
diff --git a/pkg/descheduler/controllers/migration/controller.go b/pkg/descheduler/controllers/migration/controller.go index fc20dc6fd..7844c40df 100644 --- a/pkg/descheduler/controllers/migration/controller.go +++ b/pkg/descheduler/controllers/migration/controller.go @@ -20,11 +20,8 @@ import ( "context" "fmt" "strconv" - "sync" "time" - gocache "github.com/patrickmn/go-cache" - "golang.org/x/time/rate" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -47,13 +44,12 @@ import ( sev1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/validation" - "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/controllerfinder" + "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/arbitrator" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/evictor" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/reservation" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/names" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/options" - evictionsutil "github.com/koordinator-sh/koordinator/pkg/descheduler/evictions" "github.com/koordinator-sh/koordinator/pkg/descheduler/framework" utilclient "github.com/koordinator-sh/koordinator/pkg/util/client" ) @@ -76,16 +72,10 @@ type Reconciler struct { eventRecorder events.EventRecorder reservationInterpreter reservation.Interpreter evictorInterpreter evictor.Interpreter - controllerFinder controllerfinder.Interface - nonRetryablePodFilter framework.FilterFunc - retryablePodFilter framework.FilterFunc - defaultFilterPlugin framework.FilterPlugin assumedCache *assumedCache clock clock.Clock - lock sync.Mutex - objectLimiters map[types.UID]*rate.Limiter - limiterCache *gocache.Cache + arbitrator arbitrator.Arbitrator } func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) { @@ -108,7 +98,19 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) return nil, err } - if err = c.Watch(&source.Kind{Type: &sev1alpha1.PodMigrationJob{}}, &handler.EnqueueRequestForObject{}, &predicate.Funcs{ + a, err := arbitrator.New(controllerArgs, arbitrator.Options{ + Client: r.Client, + EventRecorder: r.eventRecorder, + Manager: options.Manager, + Handle: handle, + }) + if err != nil { + return nil, err + } + r.arbitrator = a + arbitrationEventHandler := arbitrator.NewHandler(a, r.Client) + + if err = c.Watch(&source.Kind{Type: &sev1alpha1.PodMigrationJob{}}, arbitrationEventHandler, &predicate.Funcs{ DeleteFunc: func(event event.DeleteEvent) bool { job := event.Object.(*sev1alpha1.PodMigrationJob) r.assumedCache.delete(job) @@ -134,7 +136,6 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame return nil, err } - controllerFinder, err := controllerfinder.New(manager) if err != nil { return nil, err } @@ -145,14 +146,9 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame eventRecorder: handle.EventRecorder(), reservationInterpreter: reservationInterpreter, evictorInterpreter: evictorInterpreter, - controllerFinder: controllerFinder, assumedCache: newAssumedCache(), clock: clock.RealClock{}, } - if err := r.initFilters(args, handle); err != 
nil { - return nil, err - } - r.initObjectLimiters() if err := manager.Add(r); err != nil { return nil, err @@ -160,25 +156,6 @@ func newReconciler(args *deschedulerconfig.MigrationControllerArgs, handle frame return r, nil } -func (r *Reconciler) initObjectLimiters() { - var trackExpiration time.Duration - for _, v := range r.args.ObjectLimiters { - if v.Duration.Duration > trackExpiration { - trackExpiration = v.Duration.Duration - } - } - if trackExpiration > 0 { - r.objectLimiters = make(map[types.UID]*rate.Limiter) - limiterExpiration := trackExpiration + trackExpiration/2 - r.limiterCache = gocache.New(limiterExpiration, limiterExpiration) - r.limiterCache.OnEvicted(func(s string, _ interface{}) { - r.lock.Lock() - defer r.lock.Unlock() - delete(r.objectLimiters, types.UID(s)) - }) - } -} - func (r *Reconciler) Name() string { return Name } @@ -405,7 +382,7 @@ func (r *Reconciler) doMigrate(ctx context.Context, job *sev1alpha1.PodMigration } func (r *Reconciler) preparePendingJob(ctx context.Context, job *sev1alpha1.PodMigrationJob) (reconcile.Result, error) { - changed, pod, err := r.preparePodRef(ctx, job) + changed, _, err := r.preparePodRef(ctx, job) if err != nil { return reconcile.Result{}, err } @@ -415,24 +392,6 @@ func (r *Reconciler) preparePendingJob(ctx context.Context, job *sev1alpha1.PodM } } - if !r.reservationFilter(pod) { - err = fmt.Errorf("pod %q can not be migrated by ReservationFirst mode because pod.schedulerName is not support reservation", klog.KObj(pod)) - return reconcile.Result{}, err - } - - markPodPrepareMigrating(pod) - if !evictionsutil.HaveEvictAnnotation(job) { - if aborted, err := r.abortJobIfNonRetryablePodFilterFailed(ctx, pod, job); aborted || err != nil { - if err == nil { - err = fmt.Errorf("abort job since failed to non-retryable Pod filter") - } - return reconcile.Result{}, err - } - if requeue, err := r.requeueJobIfRetryablePodFilterFailed(ctx, pod, job); requeue || err != nil { - return reconcile.Result{RequeueAfter: defaultRequeueAfter}, err - } - } - job.Status.Phase = sev1alpha1.PodMigrationJobRunning err = r.Client.Status().Update(ctx, job) return reconcile.Result{}, err @@ -487,42 +446,6 @@ func (r *Reconciler) abortJobIfTimeout(ctx context.Context, job *sev1alpha1.PodM return true, err } -func (r *Reconciler) requeueJobIfRetryablePodFilterFailed(ctx context.Context, pod *corev1.Pod, job *sev1alpha1.PodMigrationJob) (bool, error) { - if r.retryablePodFilter == nil { - return false, nil - } - - if pod != nil { - if !r.retryablePodFilter(pod) { - r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, "Requeue", "Migrating", "Failed to retriable filter") - return true, nil - } - } - - return false, nil -} - -func (r *Reconciler) abortJobIfNonRetryablePodFilterFailed(ctx context.Context, pod *corev1.Pod, job *sev1alpha1.PodMigrationJob) (bool, error) { - if r.nonRetryablePodFilter == nil { - return false, nil - } - - if pod != nil { - if !r.nonRetryablePodFilter(pod) { - job.Status.Phase = sev1alpha1.PodMigrationJobFailed - job.Status.Reason = sev1alpha1.PodMigrationJobReasonForbiddenMigratePod - job.Status.Message = fmt.Sprintf("Pod %q is forbidden to migrate because it does not meet the requirements", klog.KObj(pod)) - err := r.Status().Update(ctx, job) - if err == nil { - r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, sev1alpha1.PodMigrationJobReasonForbiddenMigratePod, "Migrating", job.Status.Message) - } - return true, err - } - } - - return false, nil -} - func (r *Reconciler) abortJobByInvalidPodRef(ctx context.Context, 
job *sev1alpha1.PodMigrationJob) error { job.Status.Phase = sev1alpha1.PodMigrationJobFailed job.Status.Reason = "InvalidPodRef" @@ -783,7 +706,7 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, sev1alpha1.PodMigrationJobReasonEvicting, "Migrating", "Failed evict Pod %q caused by %v", podNamespacedName, err) return false, reconcile.Result{}, err } - r.trackEvictedPod(pod) + r.arbitrator.TrackEvictedPod(pod) _, reason := evictor.GetEvictionTriggerAndReason(job.Annotations) cond = &sev1alpha1.PodMigrationJobCondition{ @@ -996,3 +919,12 @@ func (r *Reconciler) updateCondition(ctx context.Context, job *sev1alpha1.PodMig } return nil } + +// Filter checks if a pod can be evicted +func (r *Reconciler) Filter(pod *corev1.Pod) bool { + return r.arbitrator.Filter(pod) +} + +func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool { + return r.arbitrator.PreEvictionFilter(pod) +} diff --git a/pkg/descheduler/controllers/migration/controller_test.go b/pkg/descheduler/controllers/migration/controller_test.go index 8f72a4b6a..c94b213db 100644 --- a/pkg/descheduler/controllers/migration/controller_test.go +++ b/pkg/descheduler/controllers/migration/controller_test.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/uuid" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" @@ -43,12 +42,10 @@ import ( sev1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1" deschedulerconfig "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config/v1alpha2" - "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/controllerfinder" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/reservation" "github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration/util" evictionsutil "github.com/koordinator-sh/koordinator/pkg/descheduler/evictions" "github.com/koordinator-sh/koordinator/pkg/descheduler/framework" - podutil "github.com/koordinator-sh/koordinator/pkg/descheduler/pod" ) type fakeEvictionInterpreter struct { @@ -92,24 +89,6 @@ func (f fakeReservationInterpreter) DeleteReservation(ctx context.Context, reser return f.deleteErr } -type fakeControllerFinder struct { - pods []*corev1.Pod - replicas int32 - err error -} - -func (f *fakeControllerFinder) ListPodsByWorkloads(workloadUIDs []types.UID, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, error) { - return f.pods, f.err -} - -func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReference, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) { - return f.pods, f.replicas, f.err -} - -func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) { - return f.replicas, f.err -} - func newTestReconciler() *Reconciler { scheme := runtime.NewScheme() _ = sev1alpha1.AddToScheme(scheme) @@ -129,45 +108,26 @@ func newTestReconciler() *Reconciler { eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(scheme, corev1.EventSource{Component: Name}) - nodesGetter := func() ([]*corev1.Node, error) { - var nodeList corev1.NodeList - err := runtimeClient.List(context.TODO(), &nodeList) - if err != nil { - return 
nil, err - } - r := make([]*corev1.Node, 0, len(nodeList.Items)) - for i := range nodeList.Items { - r = append(r, &nodeList.Items[i]) - } - return r, nil - } - - evictorFilter := evictionsutil.NewEvictorFilter( - nodesGetter, - func(s string, filterFunc framework.FilterFunc) ([]*corev1.Pod, error) { - return nil, fmt.Errorf("implement it") + arbitrator := fakeArbitrator{ + filter: func(pod *corev1.Pod) bool { + return true + }, + preEvictionFilter: func(pod *corev1.Pod) bool { + return true + }, + trackEvictedPod: func(pod *corev1.Pod) { + return }, - false, false, false, false, - ) - - podFilter, err := podutil.NewOptions(). - WithFilter(evictorFilter.Filter). - BuildFilterFunc() - if err != nil { - panic(err) } - - controllerFinder := &controllerfinder.ControllerFinder{Client: runtimeClient} r := &Reconciler{ Client: runtimeClient, args: &args, eventRecorder: record.NewEventRecorderAdapter(recorder), reservationInterpreter: nil, evictorInterpreter: nil, - controllerFinder: controllerFinder, - nonRetryablePodFilter: podFilter, assumedCache: newAssumedCache(), clock: clock.RealClock{}, + arbitrator: &arbitrator, } return r @@ -1441,7 +1401,6 @@ func TestEvict(t *testing.T) { Phase: corev1.PodRunning, }, } - assert.True(t, reconciler.Filter(pod)) assert.True(t, reconciler.Evict(context.TODO(), pod, framework.EvictOptions{})) var jobList sev1alpha1.PodMigrationJobList @@ -1509,73 +1468,14 @@ func TestAbortJobIfReserveOnSameNode(t *testing.T) { assert.Equal(t, sev1alpha1.PodMigrationJobReasonForbiddenMigratePod, job.Status.Reason) } -func TestRequeueJobIfRetryablePodFilterFailed(t *testing.T) { - reconciler := newTestReconciler() - enter := false - reconciler.retryablePodFilter = func(pod *corev1.Pod) bool { - enter = true - assert.True(t, isPodPrepareMigrating(pod)) - return false - } - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: "default", - Name: "test-pod", - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test-pod", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test", - UID: "2f96233d-a6b9-4981-b594-7c90c987aed9", - }, - }, - }, - Spec: corev1.PodSpec{ - SchedulerName: "koord-scheduler", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - result, err := reconciler.doMigrate(context.TODO(), job) - assert.True(t, enter) - assert.NoError(t, err) - assert.True(t, result.RequeueAfter != 0) - assert.NoError(t, reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: job.Name}, job)) - assert.Equal(t, sev1alpha1.PodMigrationJobPhase(""), job.Status.Phase) - assert.Equal(t, "", job.Status.Reason) -} - -func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) { +func TestAllowAnnotatedPodMigrationJobPassFilter(t *testing.T) { reconciler := newTestReconciler() - enter := false - reconciler.nonRetryablePodFilter = func(pod *corev1.Pod) bool { - enter = true - assert.True(t, isPodPrepareMigrating(pod)) - return false - } job := &sev1alpha1.PodMigrationJob{ ObjectMeta: metav1.ObjectMeta{ Name: "test", CreationTimestamp: metav1.Time{Time: time.Now()}, + Annotations: 
map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}, }, Spec: sev1alpha1.PodMigrationJobSpec{ PodRef: &corev1.ObjectReference{ @@ -1608,1120 +1508,124 @@ func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) { } assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - result, err := reconciler.doMigrate(context.TODO(), job) - assert.True(t, enter) - assert.NotNil(t, err) + result, err := reconciler.preparePendingJob(context.TODO(), job) + assert.Nil(t, err) assert.Equal(t, reconcile.Result{}, result) assert.NoError(t, reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: job.Name}, job)) - assert.Equal(t, sev1alpha1.PodMigrationJobFailed, job.Status.Phase) - assert.Equal(t, sev1alpha1.PodMigrationJobReasonForbiddenMigratePod, job.Status.Reason) -} - -func TestFilterExistingMigrationJob(t *testing.T) { - reconciler := newTestReconciler() - - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test-pod", - UID: uuid.NewUUID(), - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: "default", - Name: "test-pod", - UID: pod.UID, - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - - assert.False(t, reconciler.filterExistingPodMigrationJob(pod)) -} - -func TestFilterMaxMigratingPerNode(t *testing.T) { - tests := []struct { - name string - numMigratingPods int - samePod bool - sameNode bool - prepareMigrating bool - maxMigrating int32 - want bool - }{ - { - name: "maxMigrating=0", - want: true, - }, - { - name: "maxMigrating=1 no migrating Pods", - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=1 one migrating Pod with same Pod and Node", - numMigratingPods: 1, - samePod: true, - sameNode: true, - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=1 one migrating Pod with diff Pod and same Node", - numMigratingPods: 1, - samePod: false, - sameNode: true, - maxMigrating: 1, - want: false, - }, - { - name: "maxMigrating=1 one migrating Pod with diff Pod and Node", - numMigratingPods: 1, - samePod: false, - sameNode: false, - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with same Pod and Node", - numMigratingPods: 2, - samePod: true, - sameNode: true, - maxMigrating: 2, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod and Node", - numMigratingPods: 2, - samePod: false, - sameNode: false, - maxMigrating: 2, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod and same Node", - numMigratingPods: 2, - samePod: false, - sameNode: true, - maxMigrating: 2, - want: false, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod but prepare migrating and same Node", - numMigratingPods: 2, - samePod: false, - sameNode: true, - prepareMigrating: true, - maxMigrating: 2, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - reconciler := newTestReconciler() - reconciler.args.MaxMigratingPerNode = pointer.Int32(tt.maxMigrating) - - var migratingPods []*corev1.Pod - for i := 0; i < tt.numMigratingPods; i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - 
Name: fmt.Sprintf("test-migrating-pod-%d", i), - UID: uuid.NewUUID(), - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - migratingPods = append(migratingPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-%d", i), - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: pod.Namespace, - Name: pod.Name, - UID: pod.UID, - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - } - - var filterPod *corev1.Pod - if tt.samePod && len(migratingPods) > 0 { - filterPod = migratingPods[0] - } - if filterPod == nil { - filterPod = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), - UID: uuid.NewUUID(), - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - } - if tt.sameNode { - filterPod.Spec.NodeName = "test-node" - } else { - filterPod.Spec.NodeName = "test-other-node" - } - if tt.prepareMigrating { - markPodPrepareMigrating(filterPod) - } - - got := reconciler.filterMaxMigratingPerNode(filterPod) - assert.Equal(t, tt.want, got) - }) - } -} - -func TestFilterMaxMigratingPerNamespace(t *testing.T) { - tests := []struct { - name string - numMigratingPods int - samePod bool - sameNamespace bool - prepareMigrating bool - maxMigrating int32 - want bool - }{ - { - name: "maxMigrating=0", - want: true, - }, - { - name: "maxMigrating=1 no migrating Pods", - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=1 one migrating Pod with same Pod and Namespace", - numMigratingPods: 1, - samePod: true, - sameNamespace: true, - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=1 one migrating Pod with diff Pod and same Namespace", - numMigratingPods: 1, - samePod: false, - sameNamespace: true, - maxMigrating: 1, - want: false, - }, - { - name: "maxMigrating=1 one migrating Pod with diff Pod and Namespace", - numMigratingPods: 1, - samePod: false, - sameNamespace: false, - maxMigrating: 1, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with same Pod and Namespace", - numMigratingPods: 2, - samePod: true, - sameNamespace: true, - maxMigrating: 2, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod and Namespace", - numMigratingPods: 2, - samePod: false, - sameNamespace: false, - maxMigrating: 2, - want: true, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod and same Namespace", - numMigratingPods: 2, - samePod: false, - sameNamespace: true, - maxMigrating: 2, - want: false, - }, - { - name: "maxMigrating=2 two migrating Pod with diff Pod but prepare migrating and same Namespace", - numMigratingPods: 2, - samePod: false, - sameNamespace: true, - prepareMigrating: true, - maxMigrating: 2, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - reconciler := newTestReconciler() - reconciler.args.MaxMigratingPerNamespace = pointer.Int32(tt.maxMigrating) - - var migratingPods []*corev1.Pod - for i := 0; i < tt.numMigratingPods; i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-migrating-pod-%d", i), - UID: uuid.NewUUID(), - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - 
migratingPods = append(migratingPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-%d", i), - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: pod.Namespace, - Name: pod.Name, - UID: pod.UID, - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - } - - var filterPod *corev1.Pod - if tt.samePod && len(migratingPods) > 0 { - filterPod = migratingPods[0] - } - if filterPod == nil { - filterPod = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), - UID: uuid.NewUUID(), - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - } - if !tt.sameNamespace { - filterPod.Namespace = "other-namespace" - } - if tt.prepareMigrating { - markPodPrepareMigrating(filterPod) - } - - got := reconciler.filterMaxMigratingPerNamespace(filterPod) - assert.Equal(t, tt.want, got) - }) - } + assert.Equal(t, sev1alpha1.PodMigrationJobRunning, job.Status.Phase) } -func TestFilterMaxMigratingPerWorkload(t *testing.T) { - ownerReferences1 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-1", - UID: uuid.NewUUID(), - }, - } - - ownerReferences2 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-2", - UID: uuid.NewUUID(), - }, - } - tests := []struct { - name string - totalReplicas int32 - numMigratingPods int - samePod bool - sameWorkload bool - prepareMigrating bool - maxMigrating int - want bool +func TestFilter(t *testing.T) { + testCases := []struct { + name string + expected bool }{ - { - name: "totalReplicas=10 and maxMigrating=1 no migrating Pod", - totalReplicas: 10, - numMigratingPods: 0, - maxMigrating: 1, - samePod: false, - sameWorkload: false, - want: true, - }, - { - name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with same Pod and Workload", - totalReplicas: 10, - numMigratingPods: 1, - maxMigrating: 1, - samePod: true, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with diff Pod and same Workload", - totalReplicas: 10, - numMigratingPods: 1, - maxMigrating: 1, - samePod: false, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=10 and maxMigrating=1 one migrating Pod with diff Pod and diff Workload", - totalReplicas: 10, - numMigratingPods: 1, - maxMigrating: 1, - samePod: false, - sameWorkload: false, - want: true, - }, - { - name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with same Pod and Workload", - totalReplicas: 10, - numMigratingPods: 2, - maxMigrating: 2, - samePod: true, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with diff Pod and same Workload", - totalReplicas: 10, - numMigratingPods: 2, - maxMigrating: 2, - samePod: false, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with diff Pod but prepare migrating and same Workload", - totalReplicas: 10, - numMigratingPods: 2, - maxMigrating: 2, - samePod: false, - sameWorkload: true, - prepareMigrating: true, - want: true, - }, - { - name: "totalReplicas=10 and maxMigrating=2 two migrating Pod with 
diff Pod and diff Workload", - totalReplicas: 10, - numMigratingPods: 2, - maxMigrating: 2, - samePod: false, - sameWorkload: false, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - reconciler := newTestReconciler() - intOrString := intstr.FromInt(tt.maxMigrating) - reconciler.args.MaxMigratingPerWorkload = &intOrString - maxUnavailable := intstr.FromInt(int(tt.totalReplicas - 1)) - reconciler.args.MaxUnavailablePerWorkload = &maxUnavailable - - var migratingPods []*corev1.Pod - for i := 0; i < tt.numMigratingPods; i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-migrating-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }, - }, - }, - } - migratingPods = append(migratingPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-%d", i), - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: pod.Namespace, - Name: pod.Name, - UID: pod.UID, - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - } - - var filterPod *corev1.Pod - if tt.samePod && len(migratingPods) > 0 { - filterPod = migratingPods[0] - } - if filterPod == nil { - filterPod = &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - } - if !tt.sameWorkload { - filterPod.OwnerReferences = ownerReferences2 - } - if tt.prepareMigrating { - markPodPrepareMigrating(filterPod) - } - - reconciler.controllerFinder = &fakeControllerFinder{ - replicas: tt.totalReplicas, - } - - got := reconciler.filterMaxMigratingOrUnavailablePerWorkload(filterPod) - assert.Equal(t, tt.want, got) - }) + {"test-1", true}, + {"test-2", false}, } -} - -func TestFilterMaxUnavailablePerWorkload(t *testing.T) { - ownerReferences1 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-1", - UID: uuid.NewUUID(), - }, - } - - ownerReferences2 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-2", - UID: uuid.NewUUID(), - }, - } - tests := []struct { - name string - totalReplicas int32 - numUnavailablePods int - numMigratingPods int - maxUnavailable int - prepareMigrating bool - sameWorkload bool - want bool - }{ - { - name: "totalReplicas=10 and maxUnavailable=1 no migrating Pod and no unavailable Pod", - totalReplicas: 10, - numUnavailablePods: 0, - numMigratingPods: 0, - maxUnavailable: 1, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxUnavailable=1 one unavailable Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 1, - numMigratingPods: 0, - maxUnavailable: 1, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=10 and maxUnavailable=1 one migrating Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 0, - numMigratingPods: 
1, - maxUnavailable: 1, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=10 and maxUnavailable=1 one unavailable Pod and one migrating Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 1, - maxUnavailable: 1, - sameWorkload: true, - want: false, - }, - - { - name: "totalReplicas=10 and maxUnavailable=2 no migrating Pod and no unavailable Pod", - totalReplicas: 10, - numUnavailablePods: 0, - numMigratingPods: 0, - maxUnavailable: 2, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 1, - numMigratingPods: 0, - maxUnavailable: 2, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxUnavailable=2 one migrating Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 0, - numMigratingPods: 1, - maxUnavailable: 2, - sameWorkload: true, - want: true, - }, - { - name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod and one migrating Pod with same Workload", - totalReplicas: 10, - numUnavailablePods: 1, - numMigratingPods: 1, - maxUnavailable: 2, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod and one migrating Pod but prepare migrating with same Workload", - totalReplicas: 10, - numUnavailablePods: 1, - numMigratingPods: 1, - maxUnavailable: 2, - sameWorkload: true, - prepareMigrating: true, - want: true, - }, - { - name: "totalReplicas=10 and maxUnavailable=2 one unavailable Pod and one migrating Pod with diff Workload", - totalReplicas: 10, - numUnavailablePods: 1, - numMigratingPods: 1, - maxUnavailable: 2, - sameWorkload: false, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + enterFilter := false reconciler := newTestReconciler() - intOrString := intstr.FromInt(int(tt.totalReplicas - 1)) - reconciler.args.MaxMigratingPerWorkload = &intOrString - maxUnavailable := intstr.FromInt(tt.maxUnavailable) - reconciler.args.MaxUnavailablePerWorkload = &maxUnavailable - - var totalPods []*corev1.Pod - for i := 0; i < tt.numUnavailablePods; i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-unavailable-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodPending, - }, - } - totalPods = append(totalPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - } - - for i := 0; i < tt.numMigratingPods; i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-migrating-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }, - }, - }, - } - totalPods = append(totalPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("test-%d", i), - CreationTimestamp: metav1.Time{Time: time.Now()}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: pod.Namespace, - Name: pod.Name, - UID: 
pod.UID, - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - } - - for i := 0; i < int(tt.totalReplicas)-len(totalPods); i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-available-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }, - }, - }, - } - totalPods = append(totalPods, pod) - } - - filterPod := &corev1.Pod{ + pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, + Namespace: "default", + Name: "test", + UID: uuid.NewUUID(), }, Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, + NodeName: "test-node-1", }, } - if !tt.sameWorkload { - filterPod.OwnerReferences = ownerReferences2 - } - if tt.prepareMigrating { - markPodPrepareMigrating(filterPod) - } - reconciler.controllerFinder = &fakeControllerFinder{ - pods: totalPods, - replicas: tt.totalReplicas, + reconciler.arbitrator = &fakeArbitrator{ + filter: func(pod *corev1.Pod) bool { + enterFilter = true + return testCase.expected + }, + preEvictionFilter: func(pod *corev1.Pod) bool { + return true + }, + trackEvictedPod: func(pod *corev1.Pod) { + return + }, } - got := reconciler.filterMaxMigratingOrUnavailablePerWorkload(filterPod) - assert.Equal(t, tt.want, got) + actual := reconciler.Filter(pod) + assert.Equal(t, testCase.expected, actual) + assert.True(t, enterFilter) }) } } -func TestFilterExpectedReplicas(t *testing.T) { - ownerReferences1 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-1", - UID: uuid.NewUUID(), - }, - } - - ownerReferences2 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-2", - UID: uuid.NewUUID(), - }, - } - tests := []struct { - name string - totalReplicas int32 - numUnavailablePods int - maxUnavailable int - prepareMigrating bool - sameWorkload bool - want bool +func TestPreEvictionFilter(t *testing.T) { + testCases := []struct { + name string + expected bool }{ - { - name: "totalReplicas=1 and maxUnavailable=1", - totalReplicas: 1, - numUnavailablePods: 0, - maxUnavailable: 1, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=100 and maxUnavailable=100", - totalReplicas: 100, - numUnavailablePods: 0, - maxUnavailable: 100, - sameWorkload: true, - want: false, - }, - { - name: "totalReplicas=100 and maxUnavailable=10", - totalReplicas: 100, - numUnavailablePods: 0, - maxUnavailable: 10, - sameWorkload: true, - want: true, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { + {"test-1", true}, + {"test-2", false}, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + enterPreEvictionFilter := false reconciler := newTestReconciler() - intOrString := intstr.FromInt(int(tt.totalReplicas - 1)) - reconciler.args.MaxMigratingPerWorkload = &intOrString - maxUnavailable := intstr.FromInt(tt.maxUnavailable) - reconciler.args.MaxUnavailablePerWorkload = &maxUnavailable - - var totalPods []*corev1.Pod - for i := 0; i < tt.numUnavailablePods; i++ { - pod := &corev1.Pod{ - 
ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-unavailable-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodPending, - }, - } - totalPods = append(totalPods, pod) - - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) - } - - for i := 0; i < int(tt.totalReplicas)-len(totalPods); i++ { - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-available-pod-%d", i), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, - }, - Spec: corev1.PodSpec{ - NodeName: "test-node", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - Conditions: []corev1.PodCondition{ - { - Type: corev1.PodReady, - Status: corev1.ConditionTrue, - }, - }, - }, - } - totalPods = append(totalPods, pod) - } - - filterPod := &corev1.Pod{ + pod := &corev1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: fmt.Sprintf("test-pod-%s", uuid.NewUUID()), - UID: uuid.NewUUID(), - OwnerReferences: ownerReferences1, + Namespace: "default", + Name: "test", + UID: uuid.NewUUID(), }, Spec: corev1.PodSpec{ - NodeName: "test-node", + NodeName: "test-node-1", }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - if !tt.sameWorkload { - filterPod.OwnerReferences = ownerReferences2 - } - - reconciler.controllerFinder = &fakeControllerFinder{ - pods: totalPods, - replicas: tt.totalReplicas, } - got := reconciler.filterExpectedReplicas(filterPod) - assert.Equal(t, tt.want, got) - }) - } -} - -func TestFilterObjectLimiter(t *testing.T) { - ownerReferences1 := []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-1", - UID: uuid.NewUUID(), - }, - } - otherOwnerReferences := metav1.OwnerReference{ - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test-2", - UID: uuid.NewUUID(), - } - testObjectLimiters := deschedulerconfig.ObjectLimiterMap{ - deschedulerconfig.MigrationLimitObjectWorkload: { - Duration: metav1.Duration{Duration: 1 * time.Second}, - MaxMigrating: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}, - }, - } - - tests := []struct { - name string - objectLimiters deschedulerconfig.ObjectLimiterMap - totalReplicas int32 - sleepDuration time.Duration - pod *corev1.Pod - evictedPodsCount int - evictedWorkload *metav1.OwnerReference - want bool - }{ - { - name: "less than default maxMigrating", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - sleepDuration: 100 * time.Millisecond, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 6, - want: true, - }, - { - name: "exceeded default maxMigrating", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 11, - want: false, - }, - { - name: "other than workload", - totalReplicas: 100, - objectLimiters: testObjectLimiters, - sleepDuration: 100 * time.Millisecond, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, - }, - }, - evictedPodsCount: 11, - evictedWorkload: &otherOwnerReferences, - want: true, - }, - { - name: "disable objectLimiters", - totalReplicas: 100, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, + reconciler.arbitrator 
= &fakeArbitrator{ + filter: func(pod *corev1.Pod) bool { + return true }, - }, - evictedPodsCount: 11, - objectLimiters: deschedulerconfig.ObjectLimiterMap{ - deschedulerconfig.MigrationLimitObjectWorkload: deschedulerconfig.MigrationObjectLimiter{ - Duration: metav1.Duration{Duration: 0}, + preEvictionFilter: func(pod *corev1.Pod) bool { + enterPreEvictionFilter = true + return testCase.expected }, - }, - want: true, - }, - { - name: "default limiter", - totalReplicas: 100, - pod: &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - OwnerReferences: ownerReferences1, + trackEvictedPod: func(pod *corev1.Pod) { + return }, - }, - evictedPodsCount: 1, - want: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - reconciler := newTestReconciler() - controllerFinder := &fakeControllerFinder{} - if tt.objectLimiters != nil { - reconciler.args.ObjectLimiters = tt.objectLimiters } - reconciler.initObjectLimiters() - if tt.totalReplicas > 0 { - controllerFinder.replicas = tt.totalReplicas - } - reconciler.controllerFinder = controllerFinder - if tt.evictedPodsCount > 0 { - for i := 0; i < tt.evictedPodsCount; i++ { - pod := tt.pod.DeepCopy() - if tt.evictedWorkload != nil { - pod.OwnerReferences = []metav1.OwnerReference{ - *tt.evictedWorkload, - } - } - reconciler.trackEvictedPod(pod) - if tt.sleepDuration > 0 { - time.Sleep(tt.sleepDuration) - } - } - } - got := reconciler.filterLimitedObject(tt.pod) - assert.Equal(t, tt.want, got) + actual := reconciler.PreEvictionFilter(pod) + assert.Equal(t, testCase.expected, actual) + assert.True(t, enterPreEvictionFilter) }) } } -func TestAllowAnnotatedPodMigrationJobPassFilter(t *testing.T) { - reconciler := newTestReconciler() - enterNonRetryable := false - enterRetryable := false - reconciler.nonRetryablePodFilter = func(pod *corev1.Pod) bool { - enterNonRetryable = true - return false - } - reconciler.retryablePodFilter = func(pod *corev1.Pod) bool { - enterRetryable = true - return true - } +type fakeArbitrator struct { + filter framework.FilterFunc + preEvictionFilter framework.FilterFunc + trackEvictedPod func(*corev1.Pod) + add func(*sev1alpha1.PodMigrationJob) + delete func(types.UID) +} - job := &sev1alpha1.PodMigrationJob{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - CreationTimestamp: metav1.Time{Time: time.Now()}, - Annotations: map[string]string{"descheduler.alpha.kubernetes.io/evict": "true"}, - }, - Spec: sev1alpha1.PodMigrationJobSpec{ - PodRef: &corev1.ObjectReference{ - Namespace: "default", - Name: "test-pod", - }, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), job)) - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "default", - Name: "test-pod", - OwnerReferences: []metav1.OwnerReference{ - { - APIVersion: "apps/v1", - Controller: pointer.Bool(true), - Kind: "StatefulSet", - Name: "test", - UID: "2f96233d-a6b9-4981-b594-7c90c987aed9", - }, - }, - }, - Spec: corev1.PodSpec{ - SchedulerName: "koord-scheduler", - }, - Status: corev1.PodStatus{ - Phase: corev1.PodRunning, - }, - } - assert.Nil(t, reconciler.Client.Create(context.TODO(), pod)) +func (f *fakeArbitrator) DeletePodMigrationJob(job *sev1alpha1.PodMigrationJob) { + f.delete(job.UID) +} - result, err := reconciler.preparePendingJob(context.TODO(), job) - assert.False(t, enterRetryable) - assert.False(t, enterNonRetryable) - assert.Nil(t, err) - assert.Equal(t, reconcile.Result{}, result) +func (f *fakeArbitrator) Filter(pod *corev1.Pod) bool { + return f.filter(pod) +} - assert.NoError(t, 
reconciler.Client.Get(context.TODO(), types.NamespacedName{Name: job.Name}, job)) - assert.Equal(t, sev1alpha1.PodMigrationJobRunning, job.Status.Phase) +func (f *fakeArbitrator) PreEvictionFilter(pod *corev1.Pod) bool { + return f.preEvictionFilter(pod) +} + +func (f *fakeArbitrator) TrackEvictedPod(pod *corev1.Pod) { + f.trackEvictedPod(pod) +} + +func (f *fakeArbitrator) AddPodMigrationJob(job *sev1alpha1.PodMigrationJob) { + f.add(job) }
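As a closing, hypothetical usage sketch (not part of this patch): since Reconciler.Filter and Reconciler.PreEvictionFilter now simply delegate to the Arbitrator, an eviction-side caller can treat them as plain pod filters. The package name example and the helper selectEvictablePods below are invented for illustration.

package example

import (
	corev1 "k8s.io/api/core/v1"

	"github.com/koordinator-sh/koordinator/pkg/descheduler/controllers/migration"
)

// selectEvictablePods keeps only the pods that pass both the arbitration-aware Filter
// (existing PodMigrationJobs, reservation mode, retryable/non-retryable checks) and the
// default filter plugin's PreEvictionFilter.
func selectEvictablePods(r *migration.Reconciler, pods []*corev1.Pod) []*corev1.Pod {
	evictable := make([]*corev1.Pod, 0, len(pods))
	for _, pod := range pods {
		if !r.Filter(pod) {
			continue
		}
		if !r.PreEvictionFilter(pod) {
			continue
		}
		evictable = append(evictable, pod)
	}
	return evictable
}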