Skip to content

Commit

Permalink
koord-descheduler: add arbitration to migration controller (#1651)
Browse files Browse the repository at this point in the history
Signed-off-by: baowj <[email protected]>
Signed-off-by: baowj-678 <[email protected]>
  • Loading branch information
baowj-678 authored Sep 26, 2023
1 parent 4c7df59 commit ac77337
Show file tree
Hide file tree
Showing 9 changed files with 1,655 additions and 1,509 deletions.
152 changes: 82 additions & 70 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2023 The Koordinator Authors.
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand All @@ -26,13 +26,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/events"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
Expand All @@ -42,12 +38,21 @@ import (

const (
AnnotationPassedArbitration = "descheduler.koordinator.sh/passed-arbitration"
AnnotationPodArbitrating = "descheduler.koordinator.sh/pod-arbitrating"
)

var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitratorImpl")

// MigrationFilter is the set of pod-level eviction checks and bookkeeping
// hooks that an Arbitrator must provide (it is embedded in Arbitrator).
type MigrationFilter interface {
// Filter reports whether pod can be evicted.
Filter(pod *corev1.Pod) bool
// PreEvictionFilter reports whether pod passes the pre-eviction check.
// NOTE(review): the exact criteria live in the implementation — confirm there.
PreEvictionFilter(pod *corev1.Pod) bool
// TrackEvictedPod is notified about pod so the implementation can record it.
// NOTE(review): presumably called after a successful eviction — verify against callers.
TrackEvictedPod(pod *corev1.Pod)
}

type Arbitrator interface {
Add(job *v1alpha1.PodMigrationJob)
MigrationFilter
AddPodMigrationJob(job *v1alpha1.PodMigrationJob)
DeletePodMigrationJob(job *v1alpha1.PodMigrationJob)
}

// SortFn stably sorts PodMigrationJobs slice based on a certain strategy. Users
Expand All @@ -58,57 +63,91 @@ type arbitratorImpl struct {
waitingCollection map[types.UID]*v1alpha1.PodMigrationJob
interval time.Duration

sorts []SortFn
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
sorts []SortFn
filter *filter

client client.Client
eventRecorder events.EventRecorder
mu sync.Mutex
}

// New creates an arbitratorImpl based on parameters.
func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) {
func New(args *config.MigrationControllerArgs, options Options) (Arbitrator, error) {
f, err := newFilter(args, options.Handle)
if err != nil {
return nil, err
}

arbitrator := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
interval: args.Interval.Duration,

interval: args.ArbitrationArgs.Interval.Duration,
sorts: []SortFn{
SortJobsByCreationTime(),
SortJobsByPod(sorter.PodSorter().Sort),
SortJobsByController(),
SortJobsByMigratingNum(options.Client),
},
retryablePodFilter: options.RetryableFilter,
nonRetryablePodFilter: options.NonRetryableFilter,

filter: f,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
}

err := options.Manager.Add(arbitrator)
err = options.Manager.Add(arbitrator)
if err != nil {
return nil, err
}
return arbitrator, nil
}

// Add adds a PodMigrationJob to the waitingCollection of arbitratorImpl.
// AddPodMigrationJob adds a PodMigrationJob waiting to be arbitrated to Arbitrator.
// It is safe to be called concurrently by multiple goroutines.
func (a *arbitratorImpl) Add(job *v1alpha1.PodMigrationJob) {
func (a *arbitratorImpl) AddPodMigrationJob(job *v1alpha1.PodMigrationJob) {
a.mu.Lock()
defer a.mu.Unlock()
a.waitingCollection[job.UID] = job.DeepCopy()
}

// DeletePodMigrationJob drops a deleted PodMigrationJob from the Arbitrator by
// clearing its UID from the filter's passed-arbitration records. It does not
// touch waitingCollection.
// It is intended to be safe for concurrent use; that guarantee rests on
// removeJobPassedArbitration, which is defined outside this file.
func (a *arbitratorImpl) DeletePodMigrationJob(job *v1alpha1.PodMigrationJob) {
	jobUID := job.UID
	a.filter.removeJobPassedArbitration(jobUID)
}

// Start runs the periodic arbitration loop: doOnceArbitrate is invoked every
// a.interval until ctx is cancelled. The call blocks for the lifetime of the
// loop and always returns nil once it ends.
func (a *arbitratorImpl) Start(ctx context.Context) error {
	klog.InfoS("Start Arbitrator Arbitrate Goroutine")

	// wait.Until returns only after the stop channel is closed.
	stopCh := ctx.Done()
	wait.Until(a.doOnceArbitrate, a.interval, stopCh)
	return nil
}

// Filter reports whether pod can be evicted.
//
// The checks run in a fixed order and stop at the first one that rejects the
// pod: the existing-PodMigrationJob check, the reservation check, then the
// non-retryable and retryable pod filters. The last two are optional and are
// skipped when they are nil.
func (a *arbitratorImpl) Filter(pod *corev1.Pod) bool {
	f := a.filter

	// Cases are evaluated top to bottom, so later checks are not invoked once
	// an earlier one has rejected the pod.
	switch {
	case !f.filterExistingPodMigrationJob(pod):
		return false
	case !f.reservationFilter(pod):
		return false
	case f.nonRetryablePodFilter != nil && !f.nonRetryablePodFilter(pod):
		return false
	case f.retryablePodFilter != nil && !f.retryablePodFilter(pod):
		return false
	}
	return true
}

// PreEvictionFilter returns the verdict of the filter's default filter plugin
// for pod; no additional checks are applied here.
func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool {
	f := a.filter
	allowed := f.defaultFilterPlugin.PreEvictionFilter(pod)
	return allowed
}

// TrackEvictedPod forwards pod to the filter's trackEvictedPod so the filter
// can account for it.
func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) {
	f := a.filter
	f.trackEvictedPod(pod)
}

// sort stably sorts jobs using the configured SortFns and returns the sorted slice.
func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
for _, sortFn := range a.sorts {
Expand All @@ -117,14 +156,15 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1
return jobs
}

// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) {
// filtering calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filtering(pod *corev1.Pod) (isFailed, isPassed bool) {
if pod != nil {
if a.nonRetryablePodFilter != nil && !a.nonRetryablePodFilter(pod) {
markPodArbitrating(pod)
if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
isFailed = true
return
}
if a.retryablePodFilter != nil && !a.retryablePodFilter(pod) {
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
isPassed = false
return
}
Expand All @@ -144,6 +184,7 @@ func (a *arbitratorImpl) updatePassedJob(job *v1alpha1.PodMigrationJob) {
if err != nil {
klog.ErrorS(err, "failed to update job", "job", klog.KObj(job))
} else {
a.filter.markJobPassedArbitration(job.UID)
// remove job from the waitingCollection
a.mu.Lock()
delete(a.waitingCollection, job.UID)
Expand All @@ -167,7 +208,7 @@ func (a *arbitratorImpl) doOnceArbitrate() {
// filter
for _, job := range jobs {
pod := podOfJob[job]
isFailed, isPassed := a.filter(pod)
isFailed, isPassed := a.filtering(pod)
if isFailed {
a.updateFailedJob(job, pod)
continue
Expand Down Expand Up @@ -205,54 +246,11 @@ func (a *arbitratorImpl) updateFailedJob(job *v1alpha1.PodMigrationJob, pod *cor
a.mu.Unlock()
}

// arbitrationHandler implements handler.EventHandler. It embeds
// handler.EnqueueRequestForObject and overrides Create so that newly created
// PodMigrationJobs are handed to the Arbitrator.
type arbitrationHandler struct {
handler.EnqueueRequestForObject
// c is used to fetch the PodMigrationJob referenced by a create event.
c client.Client
// arbitrator receives the jobs fetched on create events.
arbitrator Arbitrator
}

// NewHandler returns a handler.EventHandler backed by an arbitrationHandler.
// Jobs seen on create events are fetched through c and passed to arbitrator.
func NewHandler(arbitrator Arbitrator, c client.Client) handler.EventHandler {
	h := new(arbitrationHandler)
	h.EnqueueRequestForObject = handler.EnqueueRequestForObject{}
	h.c = c
	h.arbitrator = arbitrator
	return h
}

// Create handles a create event for a PodMigrationJob by fetching the job and
// handing it to the Arbitrator via Add, instead of enqueueing it right away.
//
// If evt carries no object the event is logged and dropped. If the job cannot
// be fetched, the error is logged and the request is added to q directly so
// the job is still reconciled.
func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimitingInterface) {
	if evt.Object == nil {
		enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt)
		return
	}

	// Build the key once; it is used for the lookup, the log and the fallback.
	key := types.NamespacedName{
		Name:      evt.Object.GetName(),
		Namespace: evt.Object.GetNamespace(),
	}

	job := &v1alpha1.PodMigrationJob{}
	if err := h.c.Get(context.TODO(), key, job); err != nil {
		// Log the real error (previously nil was passed and the cause was lost),
		// then fall back to adding the job to the work queue directly.
		enqueueLog.Error(err, "Fail to get PodMigrationJob", "PodMigrationJob", key)
		q.Add(reconcile.Request{NamespacedName: key})
		return
	}
	h.arbitrator.Add(job)
}

type Options struct {
Client client.Client
EventRecorder events.EventRecorder
RetryableFilter framework.FilterFunc
NonRetryableFilter framework.FilterFunc
Manager controllerruntime.Manager
Client client.Client
EventRecorder events.EventRecorder
Manager controllerruntime.Manager
Handle framework.Handle
}

func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod {
Expand All @@ -276,3 +274,17 @@ func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alph
}
return podOfJob
}

// markPodArbitrating sets the AnnotationPodArbitrating annotation on pod to
// "true", creating the annotation map first when the pod has none. The pod is
// modified in place.
func markPodArbitrating(pod *corev1.Pod) {
	annotations := pod.Annotations
	if annotations == nil {
		// Writing to a nil map panics, so allocate one and attach it to the pod.
		annotations = make(map[string]string)
		pod.Annotations = annotations
	}
	annotations[AnnotationPodArbitrating] = "true"
}

// checkPodArbitrating reports whether pod carries the AnnotationPodArbitrating
// annotation with the exact value "true". A pod without annotations yields
// false.
func checkPodArbitrating(pod *corev1.Pod) bool {
	// Reading from a nil map is safe in Go and simply reports the key as absent.
	value, found := pod.Annotations[AnnotationPodArbitrating]
	return found && value == "true"
}
Loading

0 comments on commit ac77337

Please sign in to comment.