Skip to content

Commit

Permalink
koord-descheduler: add arbitration to migration controller.
Browse files Browse the repository at this point in the history
1. Make filter a member variable of arbitrator.

Signed-off-by: baowj-678 <[email protected]>
  • Loading branch information
baowj-678 committed Sep 23, 2023
1 parent cbb098d commit 3fc715a
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 104 deletions.
70 changes: 55 additions & 15 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/descheduler/framework"
"github.com/koordinator-sh/koordinator/pkg/descheduler/utils/sorter"
)

Expand All @@ -45,7 +46,14 @@ const (

var enqueueLog = klog.Background().WithName("eventHandler").WithName("arbitratorImpl")

// MigrationFilter groups the pod-level hooks the migration controller uses
// when deciding on and recording evictions. In this file it is embedded in
// Arbitrator and implemented by arbitratorImpl.
type MigrationFilter interface {
// Filter checks if a pod can be evicted.
Filter(pod *corev1.Pod) bool
// PreEvictionFilter reports whether the pod passes the pre-eviction check.
// NOTE(review): the exact criteria live in the default filter plugin, which
// is not visible here.
PreEvictionFilter(pod *corev1.Pod) bool
// TrackEvictedPod is called with a pod after it has been evicted.
// NOTE(review): presumably this feeds the per-object eviction limiters;
// confirm against the filter implementation.
TrackEvictedPod(pod *corev1.Pod)
}

// Arbitrator is the interface returned by New. It exposes the MigrationFilter
// hooks and accepts PodMigrationJobs through Add.
type Arbitrator interface {
MigrationFilter
// Add hands a PodMigrationJob to the arbitrator.
// NOTE(review): presumably the job is placed in the waiting collection until
// the next arbitration round; the implementation is not shown here - confirm.
Add(job *v1alpha1.PodMigrationJob)
}

Expand All @@ -57,32 +65,37 @@ type arbitratorImpl struct {
waitingCollection map[types.UID]*v1alpha1.PodMigrationJob
interval time.Duration

sorts []SortFn
arbitrationFilter ArbitrationFilter
sorts []SortFn
filter *filter

client client.Client
eventRecorder events.EventRecorder
mu sync.Mutex
}

// New creates an arbitratorImpl based on parameters.
func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) {
func New(args *config.MigrationControllerArgs, options Options) (Arbitrator, error) {
f, err := newFilter(args, options.Handle)
if err != nil {
return nil, err
}

arbitrator := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
interval: args.Interval.Duration,
interval: args.ArbitrationArgs.Interval.Duration,
sorts: []SortFn{
SortJobsByCreationTime(),
SortJobsByPod(sorter.PodSorter().Sort),
SortJobsByController(),
SortJobsByMigratingNum(options.Client),
},
arbitrationFilter: options.Filter,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
filter: f,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
}

err := options.Manager.Add(arbitrator)
err = options.Manager.Add(arbitrator)
if err != nil {
return nil, err
}
Expand All @@ -104,6 +117,33 @@ func (a *arbitratorImpl) Start(ctx context.Context) error {
return nil
}

// Filter reports whether pod can be evicted.
//
// The checks run in a fixed order and stop at the first rejection: the
// existing-PodMigrationJob check, the reservation check, then the optional
// non-retryable and retryable pod filters. An unset (nil) optional filter is
// treated as passing.
func (a *arbitratorImpl) Filter(pod *corev1.Pod) bool {
	f := a.filter
	// Cases of an expression-less switch are evaluated top to bottom, so the
	// order of the checks is preserved.
	switch {
	case !f.filterExistingPodMigrationJob(pod):
		return false
	case !f.reservationFilter(pod):
		return false
	case f.nonRetryablePodFilter != nil && !f.nonRetryablePodFilter(pod):
		return false
	case f.retryablePodFilter != nil && !f.retryablePodFilter(pod):
		return false
	}
	return true
}

// PreEvictionFilter forwards pod to the PreEvictionFilter method of the
// filter's defaultFilterPlugin and returns that result unchanged.
func (a *arbitratorImpl) PreEvictionFilter(pod *corev1.Pod) bool {
return a.filter.defaultFilterPlugin.PreEvictionFilter(pod)
}

// TrackEvictedPod forwards pod to the underlying filter's trackEvictedPod so
// the eviction is recorded there; it returns nothing.
func (a *arbitratorImpl) TrackEvictedPod(pod *corev1.Pod) {
a.filter.trackEvictedPod(pod)
}

// sort stably sorts jobs, outputs the sorted results and corresponding ranking map.
func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
for _, sortFn := range a.sorts {
Expand All @@ -112,14 +152,14 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1
return jobs
}

// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) {
// filtering calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filtering(pod *corev1.Pod) (isFailed, isPassed bool) {
if pod != nil {
if a.arbitrationFilter != nil && !a.arbitrationFilter.NonRetryablePodFilter(pod) {
if a.filter.nonRetryablePodFilter != nil && !a.filter.nonRetryablePodFilter(pod) {
isFailed = true
return
}
if a.arbitrationFilter != nil && !a.arbitrationFilter.RetryablePodFilter(pod) {
if a.filter.retryablePodFilter != nil && !a.filter.retryablePodFilter(pod) {
isPassed = false
return
}
Expand Down Expand Up @@ -162,7 +202,7 @@ func (a *arbitratorImpl) doOnceArbitrate() {
// filter
for _, job := range jobs {
pod := podOfJob[job]
isFailed, isPassed := a.filter(pod)
isFailed, isPassed := a.filtering(pod)
if isFailed {
a.updateFailedJob(job, pod)
continue
Expand Down Expand Up @@ -245,8 +285,8 @@ func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimit
type Options struct {
Client client.Client
EventRecorder events.EventRecorder
Filter ArbitrationFilter
Manager controllerruntime.Manager
Handle framework.Handle
}

func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/framework"
)

func TestSingleSortFn(t *testing.T) {
Expand Down Expand Up @@ -137,7 +136,7 @@ func TestMultiSortFn(t *testing.T) {
assert.Equal(t, expectedJobsOrder, jobsOrder)
}

func TestFilter(t *testing.T) {
func TestFiltering(t *testing.T) {
testCases := []struct {
name string
pod *corev1.Pod
Expand Down Expand Up @@ -190,7 +189,7 @@ func TestFilter(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
arbitrator := arbitratorImpl{
arbitrationFilter: &fakeArbitrationFilter{
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.nonRetryable
},
Expand All @@ -199,7 +198,7 @@ func TestFilter(t *testing.T) {
},
},
}
isFailed, isPassed := arbitrator.filter(testCase.pod)
isFailed, isPassed := arbitrator.filtering(testCase.pod)
assert.Equal(t, testCase.isFailed, isFailed)
assert.Equal(t, testCase.isPassed, isPassed)
})
Expand Down Expand Up @@ -284,7 +283,7 @@ func TestRequeueJobIfRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
arbitrationFilter: &fakeArbitrationFilter{
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
Expand Down Expand Up @@ -357,7 +356,7 @@ func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
arbitrationFilter: &fakeArbitrationFilter{
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
Expand Down Expand Up @@ -491,7 +490,7 @@ func TestDoOnceArbitrate(t *testing.T) {
}
a := &arbitratorImpl{
waitingCollection: collection,
arbitrationFilter: &fakeArbitrationFilter{
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return !nonRetryablePods[pod.Name]
},
Expand Down Expand Up @@ -546,7 +545,7 @@ func TestArbitrate(t *testing.T) {

a := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
arbitrationFilter: &fakeArbitrationFilter{
filter: &filter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
Expand Down Expand Up @@ -770,16 +769,3 @@ func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReferen
func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) {
return f.replicas, f.err
}

// fakeArbitrationFilter is a test double whose two methods forward to the
// function fields below, letting each test case script the filter verdicts.
// Neither method nil-checks its field, so both must be set before use.
type fakeArbitrationFilter struct {
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
}

// NonRetryablePodFilter returns whatever the configured nonRetryablePodFilter
// function returns for pod.
func (f *fakeArbitrationFilter) NonRetryablePodFilter(pod *corev1.Pod) bool {
return f.nonRetryablePodFilter(pod)
}

// RetryablePodFilter returns whatever the configured retryablePodFilter
// function returns for pod.
func (f *fakeArbitrationFilter) RetryablePodFilter(pod *corev1.Pod) bool {
return f.retryablePodFilter(pod)
}
52 changes: 5 additions & 47 deletions pkg/descheduler/controllers/migration/arbitrator/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,6 @@ import (
utilclient "github.com/koordinator-sh/koordinator/pkg/util/client"
)

// MigrationFilter groups the pod-level hooks the migration controller uses
// when deciding on and recording evictions. In this file it is implemented by
// filter.
type MigrationFilter interface {
// Filter checks if a pod can be evicted.
Filter(pod *corev1.Pod) bool
// PreEvictionFilter reports whether the pod passes the pre-eviction check;
// filter delegates this to its default filter plugin.
PreEvictionFilter(pod *corev1.Pod) bool
// TrackEvictedPod is called with a pod after it has been evicted.
// NOTE(review): presumably this feeds the per-object eviction limiters;
// confirm against the implementation.
TrackEvictedPod(pod *corev1.Pod)
}

// ArbitrationFilter exposes the two pod filters the arbitrator consults for
// each PodMigrationJob.
type ArbitrationFilter interface {
// NonRetryablePodFilter returns false when the pod must be rejected for
// good; the arbitrator then marks the job as failed.
NonRetryablePodFilter(*corev1.Pod) bool
// RetryablePodFilter returns false when the pod does not pass in this round.
// NOTE(review): presumably the job is kept and retried in a later round -
// confirm against the arbitrator.
RetryablePodFilter(*corev1.Pod) bool
}

type filter struct {
client client.Client
mu sync.Mutex
Expand All @@ -76,10 +65,10 @@ type filter struct {
controllerFinder controllerfinder.Interface
}

func NewFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) (MigrationFilter, ArbitrationFilter, error) {
func newFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) (*filter, error) {
controllerFinder, err := controllerfinder.New(options.Manager)
if err != nil {
return nil, nil, err
return nil, err
}
f := &filter{
client: options.Manager.GetClient(),
Expand All @@ -88,10 +77,10 @@ func NewFilter(args *deschedulerconfig.MigrationControllerArgs, handle framework
clock: clock.RealClock{},
}
if err := f.initFilters(args, handle); err != nil {
return nil, nil, err
return nil, err
}
f.initObjectLimiters()
return f, f, nil
return f, nil
}

func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, handle framework.Handle) error {
Expand Down Expand Up @@ -150,33 +139,6 @@ func (f *filter) initFilters(args *deschedulerconfig.MigrationControllerArgs, ha
return nil
}

// Filter reports whether pod can be evicted.
//
// The pod must pass, in order, the existing-PodMigrationJob check, the
// reservation check, and the optional non-retryable and retryable pod filters.
// Evaluation stops at the first rejection, and an unset (nil) optional filter
// counts as a pass.
func (f *filter) Filter(pod *corev1.Pod) bool {
	// || short-circuits, so the reservation check only runs when the
	// existing-job check has passed.
	if !f.filterExistingPodMigrationJob(pod) || !f.reservationFilter(pod) {
		return false
	}
	nonRetryableOK := f.nonRetryablePodFilter == nil || f.nonRetryablePodFilter(pod)
	if !nonRetryableOK {
		return false
	}
	return f.retryablePodFilter == nil || f.retryablePodFilter(pod)
}

// NonRetryablePodFilter runs the configured non-retryable pod filter against
// pod and returns its verdict.
//
// When no non-retryable filter is configured (the field is nil) the pod is
// treated as passing, matching how Filter handles the same field. Without
// this guard, calling the nil function value would panic.
func (f *filter) NonRetryablePodFilter(pod *corev1.Pod) bool {
	if f.nonRetryablePodFilter == nil {
		return true
	}
	return f.nonRetryablePodFilter(pod)
}

// RetryablePodFilter runs the configured retryable pod filter against pod and
// returns its verdict.
//
// When no retryable filter is configured (the field is nil) the pod is treated
// as passing, matching how Filter handles the same field. Without this guard,
// calling the nil function value would panic.
func (f *filter) RetryablePodFilter(pod *corev1.Pod) bool {
	if f.retryablePodFilter == nil {
		return true
	}
	return f.retryablePodFilter(pod)
}

func (f *filter) reservationFilter(pod *corev1.Pod) bool {
if sev1alpha1.PodMigrationJobMode(f.args.DefaultJobMode) != sev1alpha1.PodMigrationJobModeReservationFirst {
return true
Expand All @@ -190,10 +152,6 @@ func (f *filter) reservationFilter(pod *corev1.Pod) bool {
return false
}

// PreEvictionFilter forwards pod to the PreEvictionFilter method of
// defaultFilterPlugin and returns that result unchanged.
func (f *filter) PreEvictionFilter(pod *corev1.Pod) bool {
return f.defaultFilterPlugin.PreEvictionFilter(pod)
}

func (f *filter) forEachAvailableMigrationJobs(listOpts *client.ListOptions, handler func(job *sev1alpha1.PodMigrationJob) bool, expectedPhaseAndAnnotations ...PhaseAndAnnotation) {
jobList := &sev1alpha1.PodMigrationJobList{}
err := f.client.List(context.TODO(), jobList, listOpts, utilclient.DisableDeepCopy)
Expand Down Expand Up @@ -446,7 +404,7 @@ func mergeUnavailableAndMigratingPods(unavailablePods, migratingPods map[types.N
}
}

func (f *filter) TrackEvictedPod(pod *corev1.Pod) {
func (f *filter) trackEvictedPod(pod *corev1.Pod) {
if f.objectLimiters == nil || f.limiterCache == nil {
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1063,7 +1063,7 @@ func TestFilterObjectLimiter(t *testing.T) {
*tt.evictedWorkload,
}
}
a.TrackEvictedPod(pod)
a.trackEvictedPod(pod)
if tt.sleepDuration > 0 {
time.Sleep(tt.sleepDuration)
}
Expand Down
19 changes: 7 additions & 12 deletions pkg/descheduler/controllers/migration/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type Reconciler struct {
assumedCache *assumedCache
clock clock.Clock

filter arbitrator.MigrationFilter
arbitrator arbitrator.Arbitrator
}

func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error) {
Expand All @@ -98,21 +98,16 @@ func New(args runtime.Object, handle framework.Handle) (framework.Plugin, error)
return nil, err
}

filter, arbitrationFilter, err := arbitrator.NewFilter(controllerArgs, handle)
if err != nil {
return nil, err
}
r.filter = filter

a, err := arbitrator.New(controllerArgs.ArbitrationArgs, arbitrator.Options{
a, err := arbitrator.New(controllerArgs, arbitrator.Options{
Client: r.Client,
EventRecorder: r.eventRecorder,
Manager: options.Manager,
Filter: arbitrationFilter,
Handle: handle,
})
if err != nil {
return nil, err
}
r.arbitrator = a
arbitrationEventHandler := arbitrator.NewHandler(a, r.Client)

if err = c.Watch(&source.Kind{Type: &sev1alpha1.PodMigrationJob{}}, arbitrationEventHandler, &predicate.Funcs{
Expand Down Expand Up @@ -711,7 +706,7 @@ func (r *Reconciler) evictPod(ctx context.Context, job *sev1alpha1.PodMigrationJ
r.eventRecorder.Eventf(job, nil, corev1.EventTypeWarning, sev1alpha1.PodMigrationJobReasonEvicting, "Migrating", "Failed evict Pod %q caused by %v", podNamespacedName, err)
return false, reconcile.Result{}, err
}
r.filter.TrackEvictedPod(pod)
r.arbitrator.TrackEvictedPod(pod)

_, reason := evictor.GetEvictionTriggerAndReason(job.Annotations)
cond = &sev1alpha1.PodMigrationJobCondition{
Expand Down Expand Up @@ -927,9 +922,9 @@ func (r *Reconciler) updateCondition(ctx context.Context, job *sev1alpha1.PodMig

// Filter checks if a pod can be evicted
func (r *Reconciler) Filter(pod *corev1.Pod) bool {
return r.filter.Filter(pod)
return r.arbitrator.Filter(pod)
}

func (r *Reconciler) PreEvictionFilter(pod *corev1.Pod) bool {
return r.filter.PreEvictionFilter(pod)
return r.arbitrator.PreEvictionFilter(pod)
}
Loading

0 comments on commit 3fc715a

Please sign in to comment.