Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koord-descheduler: add arbitration to migration controller #1651

Merged
Merged
74 changes: 54 additions & 20 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,14 @@ const (

var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitratorImpl")

// MigrationFilter is the pod-level filtering surface of the arbitrator; it is
// embedded in the Arbitrator interface.
type MigrationFilter interface {
// Filter reports whether pod can be evicted; true means the pod passed.
Filter(pod *corev1.Pod) bool
// PreEvictionFilter reports whether pod passes the pre-eviction check.
// NOTE(review): presumably called right before an eviction is issued — confirm against callers.
PreEvictionFilter(pod *corev1.Pod) bool
// TrackEvictedPod is told about pod so the filter can account for it.
// NOTE(review): what exactly is recorded is not visible here — confirm in the filter implementation.
TrackEvictedPod(pod *corev1.Pod)
}

// Arbitrator is the interface returned by New. It combines the MigrationFilter
// checks with Add, which hands a PodMigrationJob to the arbitrator.
// NOTE(review): presumably Add queues the job for a later arbitration round — confirm in the implementation.
type Arbitrator interface {
MigrationFilter
Add(job *v1alpha1.PodMigrationJob)
}

Expand All @@ -58,36 +65,37 @@ type arbitratorImpl struct {
waitingCollection map[types.UID]*v1alpha1.PodMigrationJob
interval time.Duration

sorts []SortFn
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
sorts []SortFn
filter *filter

client client.Client
eventRecorder events.EventRecorder
mu sync.Mutex
}

// New creates an arbitratorImpl based on parameters.
func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) {
func New(args *config.MigrationControllerArgs, options Options) (Arbitrator, error) {
f, err := newFilter(args, options.Handle)
if err != nil {
return nil, err
}

arbitrator := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
interval: args.Interval.Duration,

interval: args.ArbitrationArgs.Interval.Duration,
sorts: []SortFn{
SortJobsByCreationTime(),
SortJobsByPod(sorter.PodSorter().Sort),
SortJobsByController(),
SortJobsByMigratingNum(options.Client),
},
retryablePodFilter: options.RetryableFilter,
nonRetryablePodFilter: options.NonRetryableFilter,

filter: f,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
}

err := options.Manager.Add(arbitrator)
err = options.Manager.Add(arbitrator)
if err != nil {
return nil, err
}
Expand All @@ -109,6 +117,33 @@ func (a *arbitratorImpl) Start(ctx context.Context) error {
return nil
}

// Filter checks if a pod can be evicted
func (a *arbitratorImpl) Filter(pod *corev1.Pod) bool {
if !a.filter.filterExistingPodMigrationJob(pod) {
return false
}

if !a.filter.reservationFilter(pod) {
return false
}

if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
return false
}
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
return false
}
return true
eahydra marked this conversation as resolved.
Show resolved Hide resolved
}

// PreEvictionFilter forwards pod to the PreEvictionFilter method of the
// filter's defaultFilterPlugin and returns that verdict unchanged.
func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool {
	allowed := a.filter.defaultFilterPlugin.PreEvictionFilter(pod)
	return allowed
}

// TrackEvictedPod passes pod on to the underlying filter's trackEvictedPod.
func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) {
	tracker := a.filter
	tracker.trackEvictedPod(pod)
}

// sort stably sorts jobs by applying each SortFn in a.sorts in turn and returns the sorted jobs.
func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
for _, sortFn := range a.sorts {
Expand All @@ -117,14 +152,14 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1
return jobs
}

// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) {
// filtering calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filtering(pod *corev1.Pod) (isFailed, isPassed bool) {
if pod != nil {
if a.nonRetryablePodFilter != nil && !a.nonRetryablePodFilter(pod) {
if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
isFailed = true
return
}
if a.retryablePodFilter != nil && !a.retryablePodFilter(pod) {
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
isPassed = false
return
}
Expand Down Expand Up @@ -167,7 +202,7 @@ func (a *arbitratorImpl) doOnceArbitrate() {
// filter
for _, job := range jobs {
pod := podOfJob[job]
isFailed, isPassed := a.filter(pod)
isFailed, isPassed := a.filtering(pod)
eahydra marked this conversation as resolved.
Show resolved Hide resolved
if isFailed {
a.updateFailedJob(job, pod)
continue
Expand Down Expand Up @@ -248,11 +283,10 @@ func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimit
}

// Options groups the collaborators handed to New. New passes Handle to
// newFilter, stores Client and EventRecorder on the arbitrator, and registers
// the arbitrator through Manager.Add.
type Options struct {
Client client.Client
EventRecorder events.EventRecorder
RetryableFilter framework.FilterFunc
NonRetryableFilter framework.FilterFunc
Manager controllerruntime.Manager
Client client.Client
EventRecorder events.EventRecorder
Manager controllerruntime.Manager
Handle framework.Handle
}

func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod {
Expand Down
86 changes: 57 additions & 29 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func TestMultiSortFn(t *testing.T) {
assert.Equal(t, expectedJobsOrder, jobsOrder)
}

func TestFilter(t *testing.T) {
func TestFiltering(t *testing.T) {
testCases := []struct {
name string
pod *corev1.Pod
Expand Down Expand Up @@ -189,14 +189,16 @@ func TestFilter(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
arbitrator := arbitratorImpl{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.nonRetryable
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.retryable
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.nonRetryable
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.retryable
},
},
}
isFailed, isPassed := arbitrator.filter(testCase.pod)
isFailed, isPassed := arbitrator.filtering(testCase.pod)
assert.Equal(t, testCase.isFailed, isFailed)
assert.Equal(t, testCase.isPassed, isPassed)
})
Expand Down Expand Up @@ -281,12 +283,14 @@ func TestRequeueJobIfRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
},
client: fakeClient,
mu: sync.Mutex{},
Expand Down Expand Up @@ -352,12 +356,14 @@ func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
},
client: fakeClient,
mu: sync.Mutex{},
Expand Down Expand Up @@ -484,11 +490,13 @@ func TestDoOnceArbitrate(t *testing.T) {
}
a := &arbitratorImpl{
waitingCollection: collection,
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return !nonRetryablePods[pod.Name]
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return !retryablePods[pod.Name]
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return !nonRetryablePods[pod.Name]
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return !retryablePods[pod.Name]
},
},
sorts: []SortFn{
func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
Expand Down Expand Up @@ -537,11 +545,13 @@ func TestArbitrate(t *testing.T) {

a := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
},
sorts: []SortFn{
func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
Expand Down Expand Up @@ -741,3 +751,21 @@ func makePodMigrationJob(name string, creationTime time.Time, pod *corev1.Pod, d
}
return job
}

// fakeControllerFinder is a test double whose methods ignore their arguments
// and return the canned values stored in its fields.
type fakeControllerFinder struct {
// pods is returned by ListPodsByWorkloads and GetPodsForRef.
pods []*corev1.Pod
// replicas is returned by GetPodsForRef and GetExpectedScaleForPod.
replicas int32
// err is returned as the error result of every method.
err error
}

// ListPodsByWorkloads is a canned stub: every argument is ignored and the
// preset pods and err fields are returned as-is.
func (f *fakeControllerFinder) ListPodsByWorkloads(workloadUIDs []types.UID, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, error) {
	cannedPods, cannedErr := f.pods, f.err
	return cannedPods, cannedErr
}

// GetPodsForRef is a canned stub: every argument is ignored and the preset
// pods, replicas and err fields are returned as-is.
func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReference, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) {
	cannedPods, cannedReplicas, cannedErr := f.pods, f.replicas, f.err
	return cannedPods, cannedReplicas, cannedErr
}

// GetExpectedScaleForPod is a canned stub: pod is ignored and the preset
// replicas and err fields are returned as-is.
func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) {
	cannedReplicas, cannedErr := f.replicas, f.err
	return cannedReplicas, cannedErr
}
Loading