Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

koord-descheduler: add arbitration to migration controller #1651

Merged
Merged
30 changes: 12 additions & 18 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (

"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/apis/config"
"github.com/koordinator-sh/koordinator/pkg/descheduler/framework"
"github.com/koordinator-sh/koordinator/pkg/descheduler/utils/sorter"
)

Expand All @@ -58,9 +57,8 @@ type arbitratorImpl struct {
waitingCollection map[types.UID]*v1alpha1.PodMigrationJob
interval time.Duration

sorts []SortFn
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
sorts []SortFn
arbitrationFilter ArbitrationFilter

client client.Client
eventRecorder events.EventRecorder
Expand All @@ -72,19 +70,16 @@ func New(args *config.ArbitrationArgs, options Options) (Arbitrator, error) {
arbitrator := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
interval: args.Interval.Duration,

sorts: []SortFn{
SortJobsByCreationTime(),
SortJobsByPod(sorter.PodSorter().Sort),
SortJobsByController(),
SortJobsByMigratingNum(options.Client),
},
retryablePodFilter: options.RetryableFilter,
nonRetryablePodFilter: options.NonRetryableFilter,

client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
arbitrationFilter: options.Filter,
client: options.Client,
eventRecorder: options.EventRecorder,
mu: sync.Mutex{},
eahydra marked this conversation as resolved.
Show resolved Hide resolved
}

err := options.Manager.Add(arbitrator)
Expand Down Expand Up @@ -120,11 +115,11 @@ func (a *arbitratorImpl) sort(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1
// filter calls nonRetryablePodFilter and retryablePodFilter to filter one PodMigrationJob.
func (a *arbitratorImpl) filter(pod *corev1.Pod) (isFailed, isPassed bool) {
if pod != nil {
if a.nonRetryablePodFilter != nil && !a.nonRetryablePodFilter(pod) {
if a.arbitrationFilter != nil && !a.arbitrationFilter.NonRetryablePodFilter(pod) {
isFailed = true
return
}
if a.retryablePodFilter != nil && !a.retryablePodFilter(pod) {
if a.arbitrationFilter != nil && !a.arbitrationFilter.RetryablePodFilter(pod) {
isPassed = false
return
}
Expand Down Expand Up @@ -248,11 +243,10 @@ func (h *arbitrationHandler) Create(evt event.CreateEvent, q workqueue.RateLimit
}

type Options struct {
Client client.Client
EventRecorder events.EventRecorder
RetryableFilter framework.FilterFunc
NonRetryableFilter framework.FilterFunc
Manager controllerruntime.Manager
Client client.Client
EventRecorder events.EventRecorder
Filter ArbitrationFilter
Manager controllerruntime.Manager
}

func getPodForJob(c client.Client, jobs []*v1alpha1.PodMigrationJob) map[*v1alpha1.PodMigrationJob]*corev1.Pod {
Expand Down
96 changes: 69 additions & 27 deletions pkg/descheduler/controllers/migration/arbitrator/arbitrator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (

"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
"github.com/koordinator-sh/koordinator/pkg/descheduler/framework"
)

func TestSingleSortFn(t *testing.T) {
Expand Down Expand Up @@ -189,11 +190,13 @@ func TestFilter(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
arbitrator := arbitratorImpl{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.nonRetryable
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.retryable
arbitrationFilter: &fakeArbitrationFilter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.nonRetryable
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return testCase.retryable
},
},
}
isFailed, isPassed := arbitrator.filter(testCase.pod)
Expand Down Expand Up @@ -281,12 +284,14 @@ func TestRequeueJobIfRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
arbitrationFilter: &fakeArbitrationFilter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
},
client: fakeClient,
mu: sync.Mutex{},
Expand Down Expand Up @@ -352,12 +357,14 @@ func TestAbortJobIfNonRetryablePodFilterFailed(t *testing.T) {
sorts: []SortFn{func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
return jobs
}},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
arbitrationFilter: &fakeArbitrationFilter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
enter = true
return false
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
},
client: fakeClient,
mu: sync.Mutex{},
Expand Down Expand Up @@ -484,11 +491,13 @@ func TestDoOnceArbitrate(t *testing.T) {
}
a := &arbitratorImpl{
waitingCollection: collection,
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return !nonRetryablePods[pod.Name]
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return !retryablePods[pod.Name]
arbitrationFilter: &fakeArbitrationFilter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return !nonRetryablePods[pod.Name]
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return !retryablePods[pod.Name]
},
},
sorts: []SortFn{
func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
Expand Down Expand Up @@ -537,11 +546,13 @@ func TestArbitrate(t *testing.T) {

a := &arbitratorImpl{
waitingCollection: map[types.UID]*v1alpha1.PodMigrationJob{},
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
arbitrationFilter: &fakeArbitrationFilter{
nonRetryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
retryablePodFilter: func(pod *corev1.Pod) bool {
return true
},
},
sorts: []SortFn{
func(jobs []*v1alpha1.PodMigrationJob, podOfJob map[*v1alpha1.PodMigrationJob]*corev1.Pod) []*v1alpha1.PodMigrationJob {
Expand Down Expand Up @@ -741,3 +752,34 @@ func makePodMigrationJob(name string, creationTime time.Time, pod *corev1.Pod, d
}
return job
}

type fakeControllerFinder struct {
pods []*corev1.Pod
replicas int32
err error
}

func (f *fakeControllerFinder) ListPodsByWorkloads(workloadUIDs []types.UID, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, error) {
return f.pods, f.err
}

func (f *fakeControllerFinder) GetPodsForRef(ownerReference *metav1.OwnerReference, ns string, labelSelector *metav1.LabelSelector, active bool) ([]*corev1.Pod, int32, error) {
return f.pods, f.replicas, f.err
}

func (f *fakeControllerFinder) GetExpectedScaleForPod(pod *corev1.Pod) (int32, error) {
return f.replicas, f.err
}

type fakeArbitrationFilter struct {
nonRetryablePodFilter framework.FilterFunc
retryablePodFilter framework.FilterFunc
}

func (f *fakeArbitrationFilter) NonRetryablePodFilter(pod *corev1.Pod) bool {
return f.nonRetryablePodFilter(pod)
}

func (f *fakeArbitrationFilter) RetryablePodFilter(pod *corev1.Pod) bool {
return f.retryablePodFilter(pod)
}
Loading