Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koord-descheduler: add arbitration to migration controller #1651

Merged
Merged
135 changes: 65 additions & 70 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Koordinator Authors.
Copyright 2022 The Koordinator Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -26,13 +26,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
Expand All @@ -46,8 +42,16 @@ const (

var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitratorImpl")

type MigrationFilter interface {
Filter(pod *corev1.Pod) bool
PreEvictionFilter(pod *corev1.Pod) bool
TrackEvictedPod(pod *corev1.Pod)
}

type Arbitrator interface {
Add(job *v1alpha1.PodMigrationJob)
MigrationFilter
AddPodMigrationJob(job *v1alpha1.PodMigrationJob)
DeletePodMigrationJob(job *v1alpha1.PodMigrationJob)
}

// SortFn stably sorts PodMigrationJobs slice based on a certain strategy. Users
Expand All @@ -58,57 +62,90 @@ type arbitratorImpl struct {
waitingCollection map[types.UID]*v1alpha1.PodMigrationJob
interval time.Duration

sorts []SortFn
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
sorts []SortFn
filter *filter

client client.Client
eventRecorder events.EventRecorder
mu sync.Mutex
}

// New creates an arbitratorImpl based on parameters.
func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) {
func New(args *config.MigrationControllerArgs, options Options) (Arbitrator, error) {
f, err := newFilter(args, options.Handle)
if err != nil {
return nil, err
}

arbitrator := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
interval: args.Interval.Duration,

interval: args.ArbitrationArgs.Interval.Duration,
sorts: []SortFn{
SortJobsByCreationTime(),
SortJobsByPod(sorter.PodSorter().Sort),
SortJobsByController(),
SortJobsByMigratingNum(options.Client),
},
retryablePodFilter: options.RetryableFilter,
nonRetryablePodFilter: options.NonRetryableFilter,

filter: f,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
}

err := options.Manager.Add(arbitrator)
err = options.Manager.Add(arbitrator)
if err != nil {
return nil, err
}
return arbitrator, nil
}

// Add adds a PodMigrationJob to the waitingCollection of arbitratorImpl.
// AddPodMigrationJob adds a PodMigrationJob to the waitingCollection of arbitratorImpl.
eahydra marked this conversation as resolved.
Show resolved Hide resolved
// It is safe to be called concurrently by multiple goroutines.
func (a *arbitratorImpl) Add(job *v1alpha1.PodMigrationJob) {
func (a *arbitratorImpl) AddPodMigrationJob(job *v1alpha1.PodMigrationJob) {
a.mu.Lock()
defer a.mu.Unlock()
a.waitingCollection[job.UID] = job.DeepCopy()
}

// DeletePodMigrationJob remove PodMigrationJob from local PassedArbitration Record map.
func (a *arbitratorImpl) DeletePodMigrationJob(job *v1alpha1.PodMigrationJob) {
a.filter.removeJobPassedArbitration(job.UID)
}

// Start starts the goroutine to arbitrate jobs periodically.
func (a *arbitratorImpl) Start(ctx context.Context) error {
klog.InfoS("Start Arbitrator Arbitrate Goroutine")
wait.Until(a.doOnceArbitrate, a.interval, ctx.Done())
return nil
}

// Filter checks if a pod can be evicted
func (a *arbitratorImpl) Filter(pod *corev1.Pod) bool {
if !a.filter.filterExistingPodMigrationJob(pod) {
return false
}

if !a.filter.reservationFilter(pod) {
return false
}

if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
return false
}
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
return false
}
return true
eahydra marked this conversation as resolved.
Show resolved Hide resolved
}

func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool {
return a.filter.defaultFilterPlugin.PreEvictionFilter(pod)
}

func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) {
a.filter.trackEvictedPod(pod)
}

// sort stably sorts jobs, outputs the sorted results and corresponding ranking map.
func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
for _, sortFn := range a.sorts {
Expand All @@ -117,14 +154,14 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1
return jobs
}

// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) {
// filtering calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filtering(pod *corev1.Pod) (isFailed, isPassed bool) {
if pod != nil {
if a.nonRetryablePodFilter != nil && !a.nonRetryablePodFilter(pod) {
if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
isFailed = true
return
}
if a.retryablePodFilter != nil && !a.retryablePodFilter(pod) {
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
isPassed = false
return
}
Expand All @@ -144,6 +181,7 @@ func (a *arbitratorImpl) updatePassedJob(job *v1alpha1.PodMigrationJob) {
if err != nil {
klog.ErrorS(err, "failed to update job", "job", klog.KObj(job))
} else {
a.filter.markJobPassedArbitration(job.UID)
// remove job from the waitingCollection
a.mu.Lock()
delete(a.waitingCollection, job.UID)
Expand All @@ -167,7 +205,7 @@ func (a *arbitratorImpl) doOnceArbitrate() {
// filter
for _, job := range jobs {
pod := podOfJob[job]
isFailed, isPassed := a.filter(pod)
isFailed, isPassed := a.filtering(pod)
eahydra marked this conversation as resolved.
Show resolved Hide resolved
if isFailed {
a.updateFailedJob(job, pod)
continue
Expand Down Expand Up @@ -205,54 +243,11 @@ func (a *arbitratorImpl) updateFailedJob(job *v1alpha1.PodMigrationJob, pod *cor
a.mu.Unlock()
}

// arbitrationHandler implement handler.EventHandler
type arbitrationHandler struct {
handler.EnqueueRequestForObject
c client.Client
arbitrator Arbitrator
}

func NewHandler(arbitrator Arbitrator, c client.Client) handler.EventHandler {
return &arbitrationHandler{
EnqueueRequestForObject: handler.EnqueueRequestForObject{},
arbitrator: arbitrator,
c: c,
}
}

// Create call Arbitrator.Create
func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
if evt.Object == nil {
enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
return
}
// get job
job := &v1alpha1.PodMigrationJob{}
err := h.c.Get(context.TODO(), types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}, job)
if err != nil {
// if err, add job to the workQueue directly.
enqueueLog.Error(nil, "Fail to get PodMigrationJob", "PodMigrationJob", types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
})
q.Add(reconcile.Request{NamespacedName: types.NamespacedName{
Name: evt.Object.GetName(),
Namespace: evt.Object.GetNamespace(),
}})
return
}
h.arbitrator.Add(job)
}

type Options struct {
Client client.Client
EventRecorder events.EventRecorder
RetryableFilter framework.FilterFunc
NonRetryableFilter framework.FilterFunc
Manager controllerruntime.Manager
Client client.Client
EventRecorder events.EventRecorder
Manager controllerruntime.Manager
Handle framework.Handle
}

func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod {
Expand Down
Loading