From d32ee999b8320146092c3c8d40f38584d37576ee Mon Sep 17 00:00:00 2001 From: "xingbao.zy" Date: Mon, 15 Apr 2024 10:47:49 +0800 Subject: [PATCH] scheduler: refine coscheduling fairness consider gangGroup(#2003) Signed-off-by: xingbao.zy --- .../plugins/coscheduling/core/core.go | 101 ++- .../plugins/coscheduling/core/core_test.go | 64 +- .../plugins/coscheduling/core/gang.go | 171 +++-- .../plugins/coscheduling/core/gang_cache.go | 123 ++-- .../coscheduling/core/gang_cache_test.go | 478 ++++++++++---- .../plugins/coscheduling/core/gang_summary.go | 47 +- .../plugins/coscheduling/core/gang_test.go | 167 +++++ .../plugins/coscheduling/core/ganggroup.go | 173 +++++ .../coscheduling/core/ganggroup_test.go | 102 +++ .../plugins/coscheduling/coscheduling.go | 35 +- .../plugins/coscheduling/coscheduling_test.go | 610 +++++++++--------- .../coscheduling/plugin_service_test.go | 38 +- 12 files changed, 1441 insertions(+), 668 deletions(-) create mode 100644 pkg/scheduler/plugins/coscheduling/core/gang_test.go create mode 100644 pkg/scheduler/plugins/coscheduling/core/ganggroup.go create mode 100644 pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go diff --git a/pkg/scheduler/plugins/coscheduling/core/core.go b/pkg/scheduler/plugins/coscheduling/core/core.go index 10e2f7eef..c9376e96e 100644 --- a/pkg/scheduler/plugins/coscheduling/core/core.go +++ b/pkg/scheduler/plugins/coscheduling/core/core.go @@ -59,12 +59,12 @@ const ( // Manager defines the interfaces for PodGroup management. type Manager interface { - PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool) + PreFilter(context.Context, *corev1.Pod) error Permit(context.Context, *corev1.Pod) (time.Duration, Status) PostBind(context.Context, *corev1.Pod, string) PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) GetCreatTime(*framework.QueuedPodInfo) time.Time - GetGroupId(*corev1.Pod) (string, error) + GetGangGroupId(*corev1.Pod) (string, error) GetAllPodsFromGang(string) []*corev1.Pod ActivateSiblings(*corev1.Pod, *framework.CycleState) AllowGangGroup(*corev1.Pod, framework.Handle, string) @@ -73,7 +73,7 @@ type Manager interface { GetGangSummaries() map[string]*GangSummary IsGangMinSatisfied(*corev1.Pod) bool GetChildScheduleCycle(*corev1.Pod) int - GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time + GetLastScheduleTime(*corev1.Pod, time.Time) time.Time } // PodGroupManager defines the scheduling operation called @@ -146,7 +146,7 @@ func (pgMgr *PodGroupManager) OnPodGroupDelete(obj interface{}) { pgMgr.cache.onPodGroupDelete(obj) } -func (pgMgr *PodGroupManager) GetGroupId(pod *corev1.Pod) (string, error) { +func (pgMgr *PodGroupManager) GetGangGroupId(pod *corev1.Pod) (string, error) { gang := pgMgr.GetGangByPod(pod) if gang == nil { return "", fmt.Errorf("gang doesn't exist in cache") @@ -155,6 +155,15 @@ func (pgMgr *PodGroupManager) GetGroupId(pod *corev1.Pod) (string, error) { return gang.GangGroupId, nil } +func (pgMgr *PodGroupManager) GetLastScheduleTime(pod *corev1.Pod, defaultTime time.Time) time.Time { + gang := pgMgr.GetGangByPod(pod) + if gang == nil { + return defaultTime + } + + return gang.getPodLastScheduleTime(pod) +} + func (pgMgr *PodGroupManager) IsGangMinSatisfied(pod *corev1.Pod) bool { gang := pgMgr.GetGangByPod(pod) if gang == nil { @@ -212,43 +221,52 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework } } +type ScheduleCycleInValidError struct { + ErrMsg string +} + +func (err *ScheduleCycleInValidError) Error() string { + return err.ErrMsg +} + // PreFilter // i.Check whether children in Gang has met the requirements of minimum number under each Gang, and reject the pod if negative. // ii.Check whether the Gang is inited, and reject the pod if positive. // iii.Check whether the Gang is OnceResourceSatisfied // iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ). // v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above. -func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) { +func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error { if !util.IsPodNeedGang(pod) { - return nil, false + return nil } gang := pgMgr.GetGangByPod(pod) if gang == nil { return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)), - util.GetId(pod.Namespace, pod.Name)), false + util.GetId(pod.Namespace, pod.Name)) } - pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime()) // check if gang is initialized if !gang.HasGangInit { return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name, - util.GetId(pod.Namespace, pod.Name)), false + util.GetId(pod.Namespace, pod.Name)) } // resourceSatisfied means pod will directly pass the PreFilter if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() { - return nil, false + return nil } // check minNum if gang.getChildrenNum() < gang.getGangMinNum() { return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name, - util.GetId(pod.Namespace, pod.Name)), false + util.GetId(pod.Namespace, pod.Name)) } if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle { - return nil, false + return nil } + gang.resetPodLastScheduleTime(pod) + // first try update the global cycle of gang gang.trySetScheduleCycleValid() gangScheduleCycle := gang.getScheduleCycle() @@ -257,19 +275,21 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (e gangMode := gang.getGangMode() if gangMode == extension.GangModeStrict { if pod.Status.NominatedNodeName != "" { - return nil, false + return nil } podScheduleCycle := gang.getChildScheduleCycle(pod) if !gang.isScheduleCycleValid() { - return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v", - gang.Name, util.GetId(pod.Namespace, pod.Name)), true + err := &ScheduleCycleInValidError{} + err.ErrMsg = fmt.Sprintf("gang scheduleCycle not valid, gangName: %v, podName: %v", + gang.Name, util.GetId(pod.Namespace, pod.Name)) + return err } if podScheduleCycle >= gangScheduleCycle { return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v", - gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false + gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle) } } - return nil, false + return nil } // PostFilter @@ -285,9 +305,6 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h klog.Warningf(message) return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message) } - defer func() { - pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime()) - }() if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() { return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable) } @@ -354,7 +371,6 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy klog.Warningf("Pod %q missing Gang", klog.KObj(pod)) return } - pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime()) // first delete the pod from gang's waitingFroBindChildren map gang.delAssumedPod(pod) @@ -376,34 +392,20 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi gangSet := sets.NewString(gangGroup...) if handle != nil { - /* - let's explain why we need defer by following example, we have three gang of one gang group, noted as gangA, gangB and gangC, - the following is what happened when no defer. - 1. gangB/C assumed, gangA failed - 2. then gangA invoke PostFilter, reject gangB|gangC, gangB|gangC invoke unreserve asynchronously, i.e: invoke rejectGangGroupById concurrently - 3. gangB or gangC maybe find it's scheduling cycle valid if it judges scheduling cycle before gang A reject it, then different lastScheduleTime is possible - */ - defer func() { - handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { - waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod())) - if gangSet.Has(waitingGangId) { - klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable", - "gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod())) - waitingPod.Reject(pluginName, message) - } - }) - }() - } - if !gang.isScheduleCycleValid() { - // In a schedule cycle, one gang can only reject its self and sibling gang once - return + handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) { + waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod())) + if gangSet.Has(waitingGangId) { + klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable", + "gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod())) + waitingPod.Reject(pluginName, message) + } + }) } - gangGroupLastScheduleTime := timeNowFn() + for gang := range gangSet { gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false) if gangIns != nil { - gangIns.setScheduleCycleValid(false) - gangIns.setLastScheduleTime(gangGroupLastScheduleTime) + gangIns.setScheduleCycleInvalid() } } } @@ -420,7 +422,6 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod } // first update gang in cache gang.addBoundPod(pod) - pgMgr.cache.deleteGangGroupLastScheduleTimeOfPod(pod) // update PodGroup _, pg := pgMgr.GetPodGroup(pod) @@ -569,11 +570,3 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int { return gang.getChildScheduleCycle(pod) } - -func (pgMgr *PodGroupManager) GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time { - gangGroupLastScheduleTime := pgMgr.cache.getGangGroupLastScheduleTimeOfPod(pod.UID) - if gangGroupLastScheduleTime != nil { - return *gangGroupLastScheduleTime - } - return podLastScheduleTime -} diff --git a/pkg/scheduler/plugins/coscheduling/core/core_test.go b/pkg/scheduler/plugins/coscheduling/core/core_test.go index 0883f762c..d214aa5aa 100644 --- a/pkg/scheduler/plugins/coscheduling/core/core_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/core_test.go @@ -82,6 +82,64 @@ func makePg(name, namespace string, min int32, creationTime *time.Time, minResou return pg } +func TestPlugin_PreFilter_ResetScheduleTime(t *testing.T) { + mgr := NewManagerForTest().pgMgr + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, + }, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, + }, + } + mgr.OnPodAdd(pod1) + mgr.OnPodAdd(pod2) + + gang := mgr.GetGangByPod(pod1) + lastScheduleTime1 := gang.GangGroupInfo.LastScheduleTime + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"]) + assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"]) + + mgr.PreFilter(context.TODO(), pod1) + lastScheduleTime2 := gang.GangGroupInfo.LastScheduleTime + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"]) + assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"]) + + mgr.PreFilter(context.TODO(), pod1) + lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"]) + assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"]) + + mgr.PreFilter(context.TODO(), pod2) + lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"]) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"]) + + mgr.PreFilter(context.TODO(), pod2) + lastScheduleTime3 := gang.GangGroupInfo.LastScheduleTime + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"]) + assert.Equal(t, lastScheduleTime3, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"]) +} + func TestPlugin_PreFilter(t *testing.T) { gangACreatedTime := time.Now() mgr := NewManagerForTest().pgMgr @@ -300,7 +358,7 @@ func TestPlugin_PreFilter(t *testing.T) { // set pre cases before test pod run if tt.shouldSetValidToFalse { - gang.setScheduleCycleValid(false) + gang.setScheduleCycleInvalid() } if tt.shouldSetCycleEqualWithGlobal { gang.setChildScheduleCycle(tt.pod, 1) @@ -315,7 +373,7 @@ func TestPlugin_PreFilter(t *testing.T) { }() } // run the case - err, _ := mgr.PreFilter(ctx, tt.pod) + err := mgr.PreFilter(ctx, tt.pod) var returnMessage string if err == nil { returnMessage = "" @@ -327,7 +385,7 @@ func TestPlugin_PreFilter(t *testing.T) { if gang != nil && !tt.isNonStrictMode && !tt.shouldSkipCheckScheduleCycle { assert.Equal(t, tt.expectedScheduleCycle, gang.getScheduleCycle()) assert.Equal(t, tt.expectedScheduleCycleValid, gang.isScheduleCycleValid()) - assert.Equal(t, tt.expectedChildCycleMap, gang.ChildrenScheduleRoundMap) + assert.Equal(t, tt.expectedChildCycleMap, gang.GangGroupInfo.ChildrenScheduleRoundMap) assert.Equal(t, tt.expectedChildCycleMap[util.GetId(tt.pod.Namespace, tt.pod.Name)], mgr.GetChildScheduleCycle(tt.pod)) diff --git a/pkg/scheduler/plugins/coscheduling/core/gang.go b/pkg/scheduler/plugins/coscheduling/core/gang.go index 6e64a281e..3d7a37373 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang.go @@ -51,6 +51,7 @@ type Gang struct { TotalChildrenNum int GangGroupId string GangGroup []string + GangGroupInfo *GangGroupInfo Children map[string]*v1.Pod // pods that have already assumed(waiting in Permit stage) WaitingForBindChildren map[string]*v1.Pod @@ -67,20 +68,6 @@ type Gang struct { // once-satisfied, once gang is satisfied, no need to consider any status pods GangMatchPolicy string - // if the podGroup should be passed at PreFilter stage(Strict-Mode) - ScheduleCycleValid bool - // these fields used to count the cycle - // For example, at the beginning, `scheduleCycle` is 1, and each pod's cycle in `childrenScheduleRoundMap` is 0. When each pod comes to PreFilter, - // we will check if the pod's value in `childrenScheduleRoundMap` is smaller than Gang's `scheduleCycle`, If result is positive, - // we set the pod's cycle in `childrenScheduleRoundMap` equal with `scheduleCycle` and pass the check. If result is negative, means - // the pod has been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes to make all - // `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, Gang's `scheduleCycle` will be added by 1, which means a new schedule cycle. - ScheduleCycle int - // Pods with the same priority are scheduled according to the scheduling time of the previous round to ensure fairness. - // For a Gang, its last round time as a whole is recorded as LastScheduleTime, that is, all Pods below it have the same time, which is LastScheduleTime. - LastScheduleTime time.Time - ChildrenScheduleRoundMap map[string]int - GangFrom string HasGangInit bool @@ -89,22 +76,18 @@ type Gang struct { func NewGang(gangName string) *Gang { return &Gang{ - Name: gangName, - CreateTime: timeNowFn(), - WaitTime: 0, - GangGroupId: gangName, - GangGroup: []string{gangName}, - Mode: extension.GangModeStrict, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - Children: make(map[string]*v1.Pod), - WaitingForBindChildren: make(map[string]*v1.Pod), - BoundChildren: make(map[string]*v1.Pod), - ScheduleCycleValid: true, - ScheduleCycle: 1, - LastScheduleTime: timeNowFn(), - ChildrenScheduleRoundMap: make(map[string]int), - GangFrom: GangFromPodAnnotation, - HasGangInit: false, + Name: gangName, + CreateTime: timeNowFn(), + WaitTime: 0, + GangGroupId: gangName, + GangGroup: []string{gangName}, + Mode: extension.GangModeStrict, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + Children: make(map[string]*v1.Pod), + WaitingForBindChildren: make(map[string]*v1.Pod), + BoundChildren: make(map[string]*v1.Pod), + GangFrom: GangFromPodAnnotation, + HasGangInit: false, } } @@ -153,7 +136,6 @@ func (gang *Gang) tryInitByPodConfig(pod *v1.Pod, args *config.CoschedulingArgs) // here we assume that Coscheduling's CreateTime equal with the pod's CreateTime gang.CreateTime = pod.CreationTimestamp.Time - gang.LastScheduleTime = pod.CreationTimestamp.Time waitTime, err := time.ParseDuration(pod.Annotations[extension.AnnotationGangWaitTime]) if err != nil || waitTime <= 0 { @@ -220,7 +202,6 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu // here we assume that Coscheduling's CreateTime equal with the podGroup CRD CreateTime gang.CreateTime = pg.CreationTimestamp.Time - gang.LastScheduleTime = pg.CreationTimestamp.Time waitTime := util.GetWaitTimeDuration(pg, args.DefaultTimeout.Duration) gang.WaitTime = waitTime @@ -245,6 +226,19 @@ func (gang *Gang) tryInitByPodGroup(pg *v1alpha1.PodGroup, args *config.Coschedu gang.Mode, gang.WaitTime, gang.GangGroup) } +func (gang *Gang) SetGangGroupInfo(gangGroupInfo *GangGroupInfo) { + gang.lock.Lock() + defer gang.lock.Unlock() + + if gang.GangGroupInfo == nil { + gang.GangGroupInfo = gangGroupInfo + klog.Infof("SetGangGroupInfo done, gangName: %v, groupSlice: %v, gangGroupId: %v", + gang.Name, gang.GangGroup, gang.GangGroupId) + } + + gang.GangGroupInfo.SetGangTotalChildrenNum(gang.Name, gang.TotalChildrenNum) +} + func (gang *Gang) deletePod(pod *v1.Pod) bool { if pod == nil { return false @@ -259,7 +253,8 @@ func (gang *Gang) deletePod(pod *v1.Pod) bool { delete(gang.Children, podId) delete(gang.WaitingForBindChildren, podId) delete(gang.BoundChildren, podId) - delete(gang.ChildrenScheduleRoundMap, podId) + gang.GangGroupInfo.deleteChildScheduleCycle(podId) + gang.GangGroupInfo.deletePodLastScheduleTime(podId) if gang.GangFrom == GangFromPodAnnotation { if len(gang.Children) == 0 { return true @@ -330,104 +325,108 @@ func (gang *Gang) getGangWaitingPods() int { return len(gang.WaitingForBindChildren) } -func (gang *Gang) getScheduleCycle() int { +func (gang *Gang) getCreateTime() time.Time { gang.lock.Lock() defer gang.lock.Unlock() - return gang.ScheduleCycle + return gang.CreateTime } -func (gang *Gang) getChildScheduleCycle(pod *v1.Pod) int { +func (gang *Gang) getGangGroup() []string { gang.lock.Lock() defer gang.lock.Unlock() - podId := util.GetId(pod.Namespace, pod.Name) - return gang.ChildrenScheduleRoundMap[podId] + return gang.GangGroup } -func (gang *Gang) getCreateTime() time.Time { +func (gang *Gang) isGangOnceResourceSatisfied() bool { gang.lock.Lock() defer gang.lock.Unlock() - return gang.CreateTime + return gang.OnceResourceSatisfied } -func (gang *Gang) getGangGroup() []string { +func (gang *Gang) setChild(pod *v1.Pod) { gang.lock.Lock() defer gang.lock.Unlock() - return gang.GangGroup + podId := util.GetId(pod.Namespace, pod.Name) + if _, ok := gang.Children[podId]; !ok { + gang.Children[podId] = pod + klog.Infof("SetChild, gangName: %v, childName: %v", gang.Name, podId) + } } -func (gang *Gang) isGangOnceResourceSatisfied() bool { +func (gang *Gang) initAllChildrenPodLastScheduleTime() { gang.lock.Lock() defer gang.lock.Unlock() - return gang.OnceResourceSatisfied + for _, pod := range gang.Children { + gang.GangGroupInfo.initPodLastScheduleTime(pod) + } } -func (gang *Gang) isScheduleCycleValid() bool { +func (gang *Gang) initPodLastScheduleTime(pod *v1.Pod) { gang.lock.Lock() defer gang.lock.Unlock() - return gang.ScheduleCycleValid + gang.GangGroupInfo.initPodLastScheduleTime(pod) } -func (gang *Gang) setChild(pod *v1.Pod) { +func (gang *Gang) getPodLastScheduleTime(pod *v1.Pod) time.Time { gang.lock.Lock() defer gang.lock.Unlock() - podId := util.GetId(pod.Namespace, pod.Name) - if _, ok := gang.Children[podId]; !ok { - gang.Children[podId] = pod - klog.Infof("SetChild, gangName: %v, childName: %v", gang.Name, podId) - } + return gang.GangGroupInfo.getPodLastScheduleTime(pod) } -func (gang *Gang) setScheduleCycleValid(valid bool) { +func (gang *Gang) resetPodLastScheduleTime(pod *v1.Pod) { gang.lock.Lock() defer gang.lock.Unlock() - if !valid && !gang.ScheduleCycleValid { - /* - let's explain why by following example, there are three gang of one group: F, G, every gang group have min-member of 10 pod, noted as F1-F10, G1-G10. - 1. F1 failed due to insufficient resource, F2-F10 failed due to cycle invalid, - 2. then G1-G10 assumed, however all it's sibling will come until next round of all gangs, these gangs will face insufficient resource due to pre-allocated of G1-G10 + gang.GangGroupInfo.resetPodLastScheduleTime(pod) +} - TODO this logic can be optimized by give gangs of same group same scheduling cycle - */ - gang.ScheduleCycle += 1 - } - gang.ScheduleCycleValid = valid - klog.Infof("SetScheduleCycleValid, gangName: %v, valid: %v", gang.Name, valid) +func (gang *Gang) setScheduleCycleInvalid() { + gang.lock.Lock() + defer gang.lock.Unlock() + + gang.GangGroupInfo.setScheduleCycleInvalid() +} + +func (gang *Gang) isScheduleCycleValid() bool { + gang.lock.Lock() + defer gang.lock.Unlock() + + return gang.GangGroupInfo.IsScheduleCycleValid() } func (gang *Gang) setChildScheduleCycle(pod *v1.Pod, childCycle int) { gang.lock.Lock() defer gang.lock.Unlock() - podId := util.GetId(pod.Namespace, pod.Name) - gang.ChildrenScheduleRoundMap[podId] = childCycle - klog.Infof("setChildScheduleCycle, pod: %v, childCycle: %v", podId, childCycle) + gang.GangGroupInfo.setChildScheduleCycle(pod, childCycle) } -func (gang *Gang) trySetScheduleCycleValid() { +func (gang *Gang) getChildScheduleCycle(pod *v1.Pod) int { gang.lock.Lock() defer gang.lock.Unlock() - num := 0 - for _, childScheduleCycle := range gang.ChildrenScheduleRoundMap { - if childScheduleCycle == gang.ScheduleCycle { - num++ - } - } + return gang.GangGroupInfo.getChildScheduleCycle(pod) +} - if num == gang.TotalChildrenNum { - gang.ScheduleCycleValid = true - gang.ScheduleCycle += 1 - klog.Infof("trySetScheduleCycleTrue, gangName: %v, ScheduleCycle: %v, ScheduleCycleValid: %v", - gang.Name, gang.ScheduleCycle, gang.ScheduleCycleValid) - } +func (gang *Gang) getScheduleCycle() int { + gang.lock.Lock() + defer gang.lock.Unlock() + + return gang.GangGroupInfo.GetScheduleCycle() +} + +func (gang *Gang) trySetScheduleCycleValid() { + gang.lock.Lock() + defer gang.lock.Unlock() + + gang.GangGroupInfo.trySetScheduleCycleValid() } func (gang *Gang) addAssumedPod(pod *v1.Pod) { @@ -510,15 +509,3 @@ func (gang *Gang) isGangValidForPermit() bool { return len(gang.WaitingForBindChildren) >= gang.MinRequiredNumber || gang.OnceResourceSatisfied == true } } - -func (gang *Gang) getLastScheduleTime() time.Time { - gang.lock.Lock() - defer gang.lock.Unlock() - return gang.LastScheduleTime -} - -func (gang *Gang) setLastScheduleTime(time time.Time) { - gang.lock.Lock() - defer gang.lock.Unlock() - gang.LastScheduleTime = time -} diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go index df96dbd82..d6f3431e1 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_cache.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_cache.go @@ -18,10 +18,8 @@ package core import ( "sync" - "time" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" @@ -34,57 +32,63 @@ import ( ) type GangCache struct { - lock *sync.RWMutex - gangItems map[string]*Gang - pluginArgs *config.CoschedulingArgs - podLister listerv1.PodLister - pgLister pglister.PodGroupLister - pgClient pgclientset.Interface - gangGroupLastScheduleTimeOfPod map[types.UID]*time.Time + lock *sync.RWMutex + gangItems map[string]*Gang + gangGroupInfoMap map[string]*GangGroupInfo + pluginArgs *config.CoschedulingArgs + podLister listerv1.PodLister + pgLister pglister.PodGroupLister + pgClient pgclientset.Interface } func NewGangCache(args *config.CoschedulingArgs, podLister listerv1.PodLister, pgLister pglister.PodGroupLister, client pgclientset.Interface) *GangCache { return &GangCache{ - gangItems: make(map[string]*Gang), - lock: new(sync.RWMutex), - pluginArgs: args, - podLister: podLister, - pgLister: pgLister, - pgClient: client, - gangGroupLastScheduleTimeOfPod: make(map[types.UID]*time.Time), + gangItems: make(map[string]*Gang), + gangGroupInfoMap: make(map[string]*GangGroupInfo), + lock: new(sync.RWMutex), + pluginArgs: args, + podLister: podLister, + pgLister: pgLister, + pgClient: client, } } -func (gangCache *GangCache) getGangFromCacheByGangId(gangId string, createIfNotExist bool) *Gang { +func (gangCache *GangCache) getGangGroupInfo(gangGroupId string, gangGroup []string, createIfNotExist bool) *GangGroupInfo { gangCache.lock.Lock() defer gangCache.lock.Unlock() - gang := gangCache.gangItems[gangId] - if gang == nil && createIfNotExist { - gang = NewGang(gangId) - gangCache.gangItems[gangId] = gang - klog.Infof("getGangFromCache create new gang, gang: %v", gangId) + + var gangGroupInfo *GangGroupInfo + if gangCache.gangGroupInfoMap[gangGroupId] == nil { + if createIfNotExist { + gangGroupInfo = NewGangGroupInfo(gangGroupId, gangGroup) + gangCache.gangGroupInfoMap[gangGroupId] = gangGroupInfo + klog.Infof("add gangGroupInfo to cache, gangGroupId: %v", gangGroupId) + } + } else { + gangGroupInfo = gangCache.gangGroupInfoMap[gangGroupId] } - return gang -} -func (gangCache *GangCache) getGangGroupLastScheduleTimeOfPod(podUID types.UID) *time.Time { - gangCache.lock.Lock() - defer gangCache.lock.Unlock() - return gangCache.gangGroupLastScheduleTimeOfPod[podUID] + return gangGroupInfo } -func (gangCache *GangCache) setGangGroupLastScheduleTimeOfPod(podUID types.UID, lastScheduleTime time.Time) { +func (gangCache *GangCache) deleteGangGroupInfo(gangGroupId string) { gangCache.lock.Lock() defer gangCache.lock.Unlock() - gangCache.gangGroupLastScheduleTimeOfPod[podUID] = &lastScheduleTime + + delete(gangCache.gangGroupInfoMap, gangGroupId) + klog.Infof("delete gangGroupInfo from cache, gangGroupId: %v", gangGroupId) } -func (gangCache *GangCache) deleteGangGroupLastScheduleTimeOfPod(pod *v1.Pod) { +func (gangCache *GangCache) getGangFromCacheByGangId(gangId string, createIfNotExist bool) *Gang { gangCache.lock.Lock() defer gangCache.lock.Unlock() - - delete(gangCache.gangGroupLastScheduleTimeOfPod, pod.UID) - klog.Infof("delete podScheduleInfo from cache, pod: %s/%s/%v", pod.Namespace, pod.Name, pod.UID) + gang := gangCache.gangItems[gangId] + if gang == nil && createIfNotExist { + gang = NewGang(gangId) + gangCache.gangItems[gangId] = gang + klog.Infof("getGangFromCache create new gang, gang: %v", gangId) + } + return gang } func (gangCache *GangCache) getAllGangsFromCache() map[string]*Gang { @@ -125,12 +129,26 @@ func (gangCache *GangCache) onPodAdd(obj interface{}) { // the gang is created in Annotation way if pod.Labels[v1alpha1.PodGroupLabel] == "" { gang.tryInitByPodConfig(pod, gangCache.pluginArgs) + + gangGroup := gang.getGangGroup() + gangGroupId := util.GetGangGroupId(gangGroup) + gangGroupInfo := gangCache.getGangGroupInfo(gangGroupId, gangGroup, true) + gang.SetGangGroupInfo(gangGroupInfo) + gang.initPodLastScheduleTime(pod) + } else { + //only podGroup added then can initPodLastScheduleTime + gangGroup := gang.getGangGroup() + gangGroupId := util.GetGangGroupId(gangGroup) + gangGroupInfo := gangCache.getGangGroupInfo(gangGroupId, gangGroup, false) + if gangGroupInfo != nil { + gang.initPodLastScheduleTime(pod) + } } + gang.setChild(pod) if pod.Spec.NodeName != "" { gang.addBoundPod(pod) gang.setResourceSatisfied() - gangCache.deleteGangGroupLastScheduleTimeOfPod(pod) } } @@ -161,7 +179,6 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) { if gangName == "" { return } - gangCache.deleteGangGroupLastScheduleTimeOfPod(pod) gangNamespace := pod.Namespace gangId := util.GetId(gangNamespace, gangName) @@ -173,6 +190,17 @@ func (gangCache *GangCache) onPodDelete(obj interface{}) { shouldDeleteGang := gang.deletePod(pod) if shouldDeleteGang { gangCache.deleteGangFromCacheByGangId(gangId) + + allGangDeleted := true + for _, gangId := range gang.GangGroup { + if gangCache.getGangFromCacheByGangId(gangId, false) != nil { + allGangDeleted = false + break + } + } + if allGangDeleted { + gangCache.deleteGangGroupInfo(gang.GangGroupInfo.GangGroupId) + } } } @@ -187,6 +215,13 @@ func (gangCache *GangCache) onPodGroupAdd(obj interface{}) { gangId := util.GetId(gangNamespace, gangName) gang := gangCache.getGangFromCacheByGangId(gangId, true) gang.tryInitByPodGroup(pg, gangCache.pluginArgs) + + gangGroup := gang.getGangGroup() + gangGroupId := util.GetGangGroupId(gangGroup) + gangGroupInfo := gangCache.getGangGroupInfo(gangGroupId, gangGroup, true) + gang.SetGangGroupInfo(gangGroupInfo) + //reset already connected pods lastScheduleTime + gang.initAllChildrenPodLastScheduleTime() } func (gangCache *GangCache) onPodGroupUpdate(oldObj interface{}, newObj interface{}) { @@ -204,6 +239,11 @@ func (gangCache *GangCache) onPodGroupUpdate(oldObj interface{}, newObj interfac return } gang.tryInitByPodGroup(pg, gangCache.pluginArgs) + + gangGroup := gang.getGangGroup() + gangGroupId := util.GetGangGroupId(gangGroup) + gangGroupInfo := gangCache.getGangGroupInfo(gangGroupId, gangGroup, true) + gang.SetGangGroupInfo(gangGroupInfo) } func (gangCache *GangCache) onPodGroupDelete(obj interface{}) { @@ -220,4 +260,15 @@ func (gangCache *GangCache) onPodGroupDelete(obj interface{}) { return } gangCache.deleteGangFromCacheByGangId(gangId) + + allGangDeleted := true + for _, gangId := range gang.GangGroup { + if gangCache.getGangFromCacheByGangId(gangId, false) != nil { + allGangDeleted = false + break + } + } + if allGangDeleted { + gangCache.deleteGangGroupInfo(gang.GangGroupInfo.GangGroupId) + } } diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go b/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go index bd2326b1e..740ab7e9d 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_cache_test.go @@ -53,6 +53,12 @@ func getTestDefaultCoschedulingArgs(t *testing.T) *config.CoschedulingArgs { } func TestGangCache_OnPodAdd(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + defaultArgs := getTestDefaultCoschedulingArgs(t) tests := []struct { name string @@ -89,17 +95,15 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, wantCache: map[string]*Gang{ "default/test": { - Name: "default/test", - CreateTime: fakeTimeNowFn(), - WaitTime: 0, - GangGroupId: "default/test", - GangGroup: []string{"default/test"}, - Mode: extension.GangModeStrict, - ScheduleCycleValid: true, - ScheduleCycle: 1, - GangFrom: GangFromPodAnnotation, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - HasGangInit: false, + Name: "default/test", + CreateTime: fakeTimeNowFn(), + WaitTime: 0, + GangGroupId: "default/test", + GangGroup: []string{"default/test"}, + Mode: extension.GangModeStrict, + GangFrom: GangFromPodAnnotation, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + HasGangInit: false, Children: map[string]*corev1.Pod{ "default/crdPod": { ObjectMeta: metav1.ObjectMeta{ @@ -109,9 +113,8 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ChildrenScheduleRoundMap: map[string]int{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, }, @@ -159,6 +162,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { MinRequiredNumber: 2, TotalChildrenNum: 2, GangGroup: []string{"default/ganga", "default/gangb"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/ganga", "default/gangb"}), []string{"default/ganga", "default/gangb"}), HasGangInit: true, GangFrom: GangFromPodAnnotation, GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, @@ -211,10 +215,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - ScheduleCycleValid: true, - ScheduleCycle: 1, - OnceResourceSatisfied: true, - ChildrenScheduleRoundMap: map[string]int{}, + OnceResourceSatisfied: true, }, }, }, @@ -261,6 +262,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { MinRequiredNumber: 2, TotalChildrenNum: 2, GangGroup: []string{"default/ganga"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/ganga"}), []string{"default/ganga"}), HasGangInit: true, GangFrom: GangFromPodAnnotation, GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, @@ -311,10 +313,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - ScheduleCycleValid: true, - ScheduleCycle: 1, - OnceResourceSatisfied: true, - ChildrenScheduleRoundMap: map[string]int{}, + OnceResourceSatisfied: true, }, }, }, @@ -359,6 +358,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { MinRequiredNumber: 2, TotalChildrenNum: 2, GangGroup: []string{"default/gangb"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangb"}), []string{"default/gangb"}), GangGroupId: "default/gangb", HasGangInit: true, GangFrom: GangFromPodAnnotation, @@ -389,11 +389,8 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, }, @@ -437,6 +434,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { MinRequiredNumber: 0, TotalChildrenNum: 0, GangGroup: []string{"default/gangc"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangc"}), []string{"default/gangc"}), HasGangInit: true, GangFrom: GangFromPodAnnotation, GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, @@ -454,11 +452,8 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, "default/gangd": { Name: "default/gangd", @@ -466,6 +461,7 @@ func TestGangCache_OnPodAdd(t *testing.T) { CreateTime: fakeTimeNowFn(), Mode: extension.GangModeStrict, GangGroupId: "default/gangd", + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangd"}), []string{"default/gangd"}), MinRequiredNumber: 0, TotalChildrenNum: 0, GangGroup: []string{"default/gangd"}, @@ -486,11 +482,8 @@ func TestGangCache_OnPodAdd(t *testing.T) { }, }, }, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, }, @@ -498,11 +491,6 @@ func TestGangCache_OnPodAdd(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - preTimeNowFn := timeNowFn - defer func() { - timeNowFn = preTimeNowFn - }() - timeNowFn = fakeTimeNowFn pgClientSet := fakepgclientset.NewSimpleClientset() pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0) pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() @@ -518,12 +506,36 @@ func TestGangCache_OnPodAdd(t *testing.T) { } tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup) } + + for _, pod := range tt.pods { + gangName := util.GetGangNameByPod(pod) + gangId := util.GetId(pod.Namespace, gangName) + gang := tt.wantCache[gangId] + if gang == nil { + continue + } + + if gang.GangGroupInfo == nil { + continue + } + + gang.GangGroupInfo.GangTotalChildrenNumMap[gang.Name] = gang.TotalChildrenNum + gang.GangGroupInfo.ChildrenLastScheduleTime[util.GetId(pod.Namespace, pod.Name)] = + gang.GangGroupInfo.LastScheduleTime + } + assert.Equal(t, tt.wantCache, gangCache.gangItems) }) } } func TestGangCache_OnPodUpdate(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + defaultArgs := getTestDefaultCoschedulingArgs(t) tests := []struct { name string @@ -580,6 +592,7 @@ func TestGangCache_OnPodUpdate(t *testing.T) { MinRequiredNumber: 2, TotalChildrenNum: 2, GangGroup: []string{"default/ganga", "default/gangb"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/ganga", "default/gangb"}), []string{"default/ganga", "default/gangb"}), HasGangInit: true, GangFrom: GangFromPodAnnotation, Children: map[string]*corev1.Pod{ @@ -631,10 +644,7 @@ func TestGangCache_OnPodUpdate(t *testing.T) { }, }, }, - ScheduleCycleValid: true, - ScheduleCycle: 1, - OnceResourceSatisfied: true, - ChildrenScheduleRoundMap: map[string]int{}, + OnceResourceSatisfied: true, }, }, }, @@ -642,11 +652,6 @@ func TestGangCache_OnPodUpdate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - preTimeNowFn := timeNowFn - defer func() { - timeNowFn = preTimeNowFn - }() - timeNowFn = fakeTimeNowFn pgClientSet := fakepgclientset.NewSimpleClientset() pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0) pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() @@ -662,12 +667,32 @@ func TestGangCache_OnPodUpdate(t *testing.T) { } tt.wantCache[k].GangGroupId = util.GetGangGroupId(v.GangGroup) } + + for _, pod := range tt.pods { + gangName := util.GetGangNameByPod(pod) + gangId := util.GetId(pod.Namespace, gangName) + gang := tt.wantCache[gangId] + if gang == nil { + continue + } + + gang.GangGroupInfo.GangTotalChildrenNumMap[gang.Name] = gang.TotalChildrenNum + gang.GangGroupInfo.ChildrenLastScheduleTime[util.GetId(pod.Namespace, pod.Name)] = + gang.GangGroupInfo.LastScheduleTime + } + assert.Equal(t, tt.wantCache, gangCache.gangItems) }) } } func TestGangCache_OnPodDelete(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + tests := []struct { name string podGroups []*v1alpha1.PodGroup @@ -736,7 +761,7 @@ func TestGangCache_OnPodDelete(t *testing.T) { Namespace: "default", Name: "pod5", Labels: map[string]string{ - v1alpha1.PodGroupLabel: "GangB", + v1alpha1.PodGroupLabel: "gangB", }, }, }, @@ -745,7 +770,7 @@ func TestGangCache_OnPodDelete(t *testing.T) { Namespace: "default", Name: "pod6", Labels: map[string]string{ - v1alpha1.PodGroupLabel: "GangB", + v1alpha1.PodGroupLabel: "gangB", }, }, }, @@ -764,23 +789,21 @@ func TestGangCache_OnPodDelete(t *testing.T) { }, wantCache: map[string]*Gang{ "default/gangB": { - Name: "default/gangB", - WaitTime: 10 * time.Second, - CreateTime: fakeTimeNowFn(), - Mode: extension.GangModeStrict, - MinRequiredNumber: 4, - TotalChildrenNum: 4, - GangGroup: []string{"default/gangB"}, - GangGroupId: "default/gangB", - HasGangInit: true, - GangFrom: GangFromPodGroupCrd, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - Children: map[string]*corev1.Pod{}, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + Name: "default/gangB", + WaitTime: 10 * time.Second, + CreateTime: fakeTimeNowFn(), + Mode: extension.GangModeStrict, + MinRequiredNumber: 4, + TotalChildrenNum: 4, + GangGroup: []string{"default/gangB"}, + GangGroupId: "default/gangB", + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangB"}), []string{"default/gangB"}), + HasGangInit: true, + GangFrom: GangFromPodGroupCrd, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + Children: map[string]*corev1.Pod{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, wantPodGroup: map[string]*v1alpha1.PodGroup{ @@ -799,11 +822,6 @@ func TestGangCache_OnPodDelete(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - preTimeNowFn := timeNowFn - defer func() { - timeNowFn = preTimeNowFn - }() - timeNowFn = fakeTimeNowFn pgClient := fakepgclientset.NewSimpleClientset() pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0) pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() @@ -827,6 +845,23 @@ func TestGangCache_OnPodDelete(t *testing.T) { gangCache.onPodAdd(pod) } + for _, pod := range tt.pods { + gangName := util.GetGangNameByPod(pod) + gangId := util.GetId(pod.Namespace, gangName) + gang := tt.wantCache[gangId] + if gang == nil { + continue + } + + gang.GangGroupInfo.GangTotalChildrenNumMap[gang.Name] = gang.TotalChildrenNum + } + + for _, gang := range tt.wantCache { + if gangCache.gangItems[gang.Name].GangGroupInfo == nil { + gangCache.gangItems[gang.Name].GangGroupInfo = NewGangGroupInfo("", nil) + } + } + // start deleting pods for _, pod := range tt.pods { gangCache.onPodDelete(pod) @@ -867,6 +902,12 @@ func TestGangCache_OnPodDelete(t *testing.T) { } func TestGangCache_OnPodGroupAdd(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + waitTime := int32(300) tests := []struct { name string @@ -893,22 +934,20 @@ func TestGangCache_OnPodGroupAdd(t *testing.T) { }, wantCache: map[string]*Gang{ "default/gangA": { - Name: "default/gangA", - WaitTime: 300 * time.Second, - CreateTime: fakeTimeNowFn(), - Mode: extension.GangModeNonStrict, - MinRequiredNumber: 2, - TotalChildrenNum: 2, - GangGroup: []string{"default/gangA", "default/gangB"}, - HasGangInit: true, - GangFrom: GangFromPodGroupCrd, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - Children: map[string]*corev1.Pod{}, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + Name: "default/gangA", + WaitTime: 300 * time.Second, + CreateTime: fakeTimeNowFn(), + Mode: extension.GangModeNonStrict, + MinRequiredNumber: 2, + TotalChildrenNum: 2, + GangGroup: []string{"default/gangA", "default/gangB"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangA", "default/gangB"}), []string{"default/gangA", "default/gangB"}), + HasGangInit: true, + GangFrom: GangFromPodGroupCrd, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + Children: map[string]*corev1.Pod{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, }, @@ -933,23 +972,21 @@ func TestGangCache_OnPodGroupAdd(t *testing.T) { }, wantCache: map[string]*Gang{ "default/gangA": { - Name: "default/gangA", - WaitTime: 300 * time.Second, - CreateTime: fakeTimeNowFn(), - Mode: extension.GangModeStrict, - MinRequiredNumber: 4, - TotalChildrenNum: 4, - GangGroup: []string{"default/gangA"}, - GangGroupId: "default/gangA", - HasGangInit: true, - GangFrom: GangFromPodGroupCrd, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - Children: map[string]*corev1.Pod{}, - WaitingForBindChildren: map[string]*corev1.Pod{}, - BoundChildren: map[string]*corev1.Pod{}, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, + Name: "default/gangA", + WaitTime: 300 * time.Second, + CreateTime: fakeTimeNowFn(), + Mode: extension.GangModeStrict, + MinRequiredNumber: 4, + TotalChildrenNum: 4, + GangGroup: []string{"default/gangA"}, + GangGroupId: "default/gangA", + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangA"}), []string{"default/gangA"}), + HasGangInit: true, + GangFrom: GangFromPodGroupCrd, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + Children: map[string]*corev1.Pod{}, + WaitingForBindChildren: map[string]*corev1.Pod{}, + BoundChildren: map[string]*corev1.Pod{}, }, }, }, @@ -957,16 +994,18 @@ func TestGangCache_OnPodGroupAdd(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - preTimeNowFn := timeNowFn - defer func() { - timeNowFn = preTimeNowFn - }() - timeNowFn = fakeTimeNowFn pgClient := fakepgclientset.NewSimpleClientset() gangCache := NewGangCache(&config.CoschedulingArgs{DefaultTimeout: metav1.Duration{Duration: time.Second}}, nil, nil, pgClient) for _, pg := range tt.pgs { gangCache.onPodGroupAdd(pg) } + + for _, gang := range tt.wantCache { + if gang.GangGroupInfo != nil { + gang.GangGroupInfo.GangTotalChildrenNumMap[gang.Name] = gang.TotalChildrenNum + } + } + for k, v := range tt.wantCache { if !v.HasGangInit { continue @@ -999,7 +1038,9 @@ func TestGangCache_OnGangDelete(t *testing.T) { }, } gangId := util.GetId("default", "ganga") - cache.getGangFromCacheByGangId(gangId, true) + gangTmp := cache.getGangFromCacheByGangId(gangId, true) + gangTmp.GangGroupInfo = NewGangGroupInfo("", nil) + cache.onPodGroupDelete(podGroup) assert.Equal(t, 0, len(cache.gangItems)) @@ -1032,6 +1073,7 @@ func TestGangCache_OnGangDelete(t *testing.T) { MinRequiredNumber: 2, TotalChildrenNum: 2, GangGroup: []string{"default/gangA", "default/gangB"}, + GangGroupInfo: NewGangGroupInfo(util.GetGangGroupId([]string{"default/gangA", "default/gangB"}), []string{"default/gangA", "default/gangB"}), HasGangInit: true, GangFrom: GangFromPodAnnotation, GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, @@ -1072,11 +1114,12 @@ func TestGangCache_OnGangDelete(t *testing.T) { }, }, }, - ScheduleCycleValid: true, - ScheduleCycle: 1, - OnceResourceSatisfied: true, - ChildrenScheduleRoundMap: map[string]int{}, + OnceResourceSatisfied: true, } + + wantedGang.GangGroupInfo.GangTotalChildrenNumMap[wantedGang.Name] = wantedGang.TotalChildrenNum + wantedGang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"] = wantedGang.GangGroupInfo.LastScheduleTime + cacheGang := cache.getGangFromCacheByGangId("default/gangb", false) wantedGang.GangGroupId = util.GetGangGroupId(wantedGang.GangGroup) assert.Equal(t, wantedGang, cacheGang) @@ -1117,3 +1160,202 @@ func TestGangCache_onPodGroupUpdate(t *testing.T) { gang = cache.getGangFromCacheByGangId(gangId, false) assert.Equal(t, gang.MinRequiredNumber, int(newPodGroup.Spec.MinMember)) } + +func TestGetGangGroupInfo_DeleteGangGroupInfo(t *testing.T) { + pgClient := fakepgclientset.NewSimpleClientset() + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + + pgInformerFactory := pgformers.NewSharedInformerFactory(pgClient, 0) + pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() + pglister := pgInformer.Lister() + cache := NewGangCache(&config.CoschedulingArgs{DefaultTimeout: metav1.Duration{Duration: time.Second}}, nil, pglister, pgClient) + + gangGroupInfo := cache.getGangGroupInfo("aa", []string{"aa"}, false) + assert.True(t, gangGroupInfo == nil) + assert.Equal(t, 0, len(cache.gangGroupInfoMap)) + + gangGroupInfo = cache.getGangGroupInfo("aa", []string{"aa"}, true) + assert.True(t, gangGroupInfo != nil) + assert.Equal(t, gangGroupInfo.GangGroupId, "aa") + assert.Equal(t, gangGroupInfo.GangGroup, []string{"aa"}) + assert.Equal(t, 1, len(cache.gangGroupInfoMap)) + + gangGroupInfo = cache.getGangGroupInfo("aa", []string{"aa"}, false) + assert.True(t, gangGroupInfo != nil) + assert.Equal(t, gangGroupInfo.GangGroupId, "aa") + assert.Equal(t, gangGroupInfo.GangGroup, []string{"aa"}) + assert.Equal(t, 1, len(cache.gangGroupInfoMap)) + + cache.deleteGangGroupInfo("aa") + gangGroupInfo = cache.getGangGroupInfo("aa", []string{"aa"}, false) + assert.True(t, gangGroupInfo == nil) + assert.Equal(t, 0, len(cache.gangGroupInfoMap)) +} + +func TestOnPodAdd_OnPodDeleteWithGangGroupInfo(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + + defaultArgs := getTestDefaultCoschedulingArgs(t) + + pods := []*corev1.Pod{ + // pod1 announce GangA + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "ganga", + extension.AnnotationGangMinNum: "2", + extension.AnnotationGangWaitTime: "30s", + extension.AnnotationGangMode: extension.GangModeNonStrict, + extension.AnnotationGangGroups: "[\"default/ganga\",\"default/gangb\"]", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "nba", + }, + }, + // pod2 also announce GangA but with different annotations after pod1's announcing + // so gangA in cache should only be created with pod1's Annotations + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "ganga", + extension.AnnotationGangMinNum: "7", + extension.AnnotationGangWaitTime: "3000s", + extension.AnnotationGangGroups: "[\"default/gangc\",\"default/gangd\"]", + }, + }, + }, + } + + pgClientSet := fakepgclientset.NewSimpleClientset() + pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0) + pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() + pglister := pgInformer.Lister() + + gangCache := NewGangCache(defaultArgs, nil, pglister, pgClientSet) + for _, pod := range pods { + gangCache.onPodAdd(pod) + } + + gangName := util.GetGangNameByPod(pods[0]) + gangNamespace := pods[0].Namespace + gangId := util.GetId(gangNamespace, gangName) + gang := gangCache.getGangFromCacheByGangId(gangId, false) + + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodDelete(pods[0]) + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 1, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodDelete(pods[1]) + assert.Equal(t, 0, len(gangCache.gangGroupInfoMap)) +} + +func TestOnPgAdd_OnPgDeleteWithGangGroupInfo(t *testing.T) { + preTimeNowFn := timeNowFn + defer func() { + timeNowFn = preTimeNowFn + }() + timeNowFn = fakeTimeNowFn + + defaultArgs := getTestDefaultCoschedulingArgs(t) + + pgs := []*v1alpha1.PodGroup{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "ganga", + Annotations: map[string]string{ + extension.AnnotationGangMode: extension.GangModeNonStrict, + }, + }, + Spec: v1alpha1.PodGroupSpec{ + MinMember: 2, + }, + }, + } + + pods := []*corev1.Pod{ + // pod1 announce GangA + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{}, + Labels: map[string]string{ + v1alpha1.PodGroupLabel: "ganga", + }, + }, + Spec: corev1.PodSpec{ + NodeName: "nba", + }, + }, + // pod2 also announce GangA but with different annotations after pod1's announcing + // so gangA in cache should only be created with pod1's Annotations + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{}, + Labels: map[string]string{ + v1alpha1.PodGroupLabel: "ganga", + }, + }, + }, + } + + pgClientSet := fakepgclientset.NewSimpleClientset() + pgInformerFactory := pgformers.NewSharedInformerFactory(pgClientSet, 0) + pgInformer := pgInformerFactory.Scheduling().V1alpha1().PodGroups() + pglister := pgInformer.Lister() + + gangCache := NewGangCache(defaultArgs, nil, pglister, pgClientSet) + + gangCache.onPodAdd(pods[0]) + + gangName := util.GetGangNameByPod(pods[0]) + gangNamespace := pods[0].Namespace + gangId := util.GetId(gangNamespace, gangName) + gang := gangCache.getGangFromCacheByGangId(gangId, false) + + assert.Equal(t, 0, len(gangCache.gangGroupInfoMap)) + + gangCache.onPodGroupAdd(pgs[0]) + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 1, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodAdd(pods[1]) + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodDelete(pods[0]) + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 1, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodDelete(pods[1]) + assert.Equal(t, 1, len(gangCache.gangGroupInfoMap)) + assert.Equal(t, util.GetGangGroupId(gang.GangGroup), gang.GangGroupInfo.GangGroupId) + assert.Equal(t, 0, len(gang.GangGroupInfo.ChildrenLastScheduleTime)) + + gangCache.onPodGroupDelete(pgs[0]) + assert.Equal(t, 0, len(gangCache.gangGroupInfoMap)) +} diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go index 4a08975ee..412309409 100644 --- a/pkg/scheduler/plugins/coscheduling/core/gang_summary.go +++ b/pkg/scheduler/plugins/coscheduling/core/gang_summary.go @@ -7,32 +7,28 @@ import ( ) type GangSummary struct { - Name string `json:"name"` - WaitTime time.Duration `json:"waitTime"` - CreateTime time.Time `json:"createTime"` - Mode string `json:"mode"` - GangMatchPolicy string `json:"gangMatchPolicy"` - MinRequiredNumber int `json:"minRequiredNumber"` - TotalChildrenNum int `json:"totalChildrenNum"` - GangGroup []string `json:"gangGroup"` - Children sets.String `json:"children"` - WaitingForBindChildren sets.String `json:"waitingForBindChildren"` - BoundChildren sets.String `json:"boundChildren"` - OnceResourceSatisfied bool `json:"onceResourceSatisfied"` - ScheduleCycleValid bool `json:"scheduleCycleValid"` - ScheduleCycle int `json:"scheduleCycle"` - LastScheduleTime time.Time `json:"lastScheduleTime"` - ChildrenScheduleRoundMap map[string]int `json:"childrenScheduleRoundMap"` - GangFrom string `json:"gangFrom"` - HasGangInit bool `json:"hasGangInit"` + Name string `json:"name"` + WaitTime time.Duration `json:"waitTime"` + CreateTime time.Time `json:"createTime"` + Mode string `json:"mode"` + GangMatchPolicy string `json:"gangMatchPolicy"` + MinRequiredNumber int `json:"minRequiredNumber"` + TotalChildrenNum int `json:"totalChildrenNum"` + GangGroup []string `json:"gangGroup"` + Children sets.String `json:"children"` + WaitingForBindChildren sets.String `json:"waitingForBindChildren"` + BoundChildren sets.String `json:"boundChildren"` + OnceResourceSatisfied bool `json:"onceResourceSatisfied"` + GangGroupInfo *GangGroupInfo `json:"gangGroupInfo"` + GangFrom string `json:"gangFrom"` + HasGangInit bool `json:"hasGangInit"` } func (gang *Gang) GetGangSummary() *GangSummary { gangSummary := &GangSummary{ - Children: sets.NewString(), - WaitingForBindChildren: sets.NewString(), - BoundChildren: sets.NewString(), - ChildrenScheduleRoundMap: make(map[string]int), + Children: sets.NewString(), + WaitingForBindChildren: sets.NewString(), + BoundChildren: sets.NewString(), } if gang == nil { @@ -50,9 +46,7 @@ func (gang *Gang) GetGangSummary() *GangSummary { gangSummary.MinRequiredNumber = gang.MinRequiredNumber gangSummary.TotalChildrenNum = gang.TotalChildrenNum gangSummary.OnceResourceSatisfied = gang.OnceResourceSatisfied - gangSummary.ScheduleCycleValid = gang.ScheduleCycleValid - gangSummary.ScheduleCycle = gang.ScheduleCycle - gangSummary.LastScheduleTime = gang.LastScheduleTime + gangSummary.GangGroupInfo = gang.GangGroupInfo gangSummary.GangFrom = gang.GangFrom gangSummary.HasGangInit = gang.HasGangInit gangSummary.GangGroup = append(gangSummary.GangGroup, gang.GangGroup...) @@ -66,9 +60,6 @@ func (gang *Gang) GetGangSummary() *GangSummary { for podName := range gang.BoundChildren { gangSummary.BoundChildren.Insert(podName) } - for key, value := range gang.ChildrenScheduleRoundMap { - gangSummary.ChildrenScheduleRoundMap[key] = value - } return gangSummary } diff --git a/pkg/scheduler/plugins/coscheduling/core/gang_test.go b/pkg/scheduler/plugins/coscheduling/core/gang_test.go new file mode 100644 index 000000000..e6898c614 --- /dev/null +++ b/pkg/scheduler/plugins/coscheduling/core/gang_test.go @@ -0,0 +1,167 @@ +package core + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestGangGroupInfo_SetGangGroupInfo(t *testing.T) { + gangGroupInfo := NewGangGroupInfo("aa_bb", []string{"aa", "bb"}) + assert.Equal(t, "aa_bb", gangGroupInfo.GangGroupId) + assert.Equal(t, 2, len(gangGroupInfo.GangGroup)) + assert.Equal(t, 1, gangGroupInfo.ScheduleCycle) + assert.Equal(t, true, gangGroupInfo.ScheduleCycleValid) + + assert.Equal(t, 2, len(gangGroupInfo.GangTotalChildrenNumMap)) + assert.Equal(t, 0, gangGroupInfo.GangTotalChildrenNumMap["aa"]) + assert.Equal(t, 0, gangGroupInfo.GangTotalChildrenNumMap["bb"]) + + assert.True(t, !gangGroupInfo.LastScheduleTime.IsZero()) + assert.Equal(t, "aa_bb", gangGroupInfo.GangGroupId) + assert.Equal(t, 0, len(gangGroupInfo.ChildrenLastScheduleTime)) + + gang := &Gang{} + gang.Name = "aa" + gang.TotalChildrenNum = 2 + gang.SetGangGroupInfo(gangGroupInfo) + assert.Equal(t, gang.GangGroupInfo.GangTotalChildrenNumMap["aa"], 2) +} + +func TestDeletePod(t *testing.T) { + gangGroupInfo := NewGangGroupInfo("aa_bb", []string{"aa", "bb"}) + gangGroupInfo.ChildrenScheduleRoundMap["test/pod1"] = 1 + gangGroupInfo.ChildrenLastScheduleTime["test/pod1"] = time.Now() + + gang := &Gang{} + gang.Name = "aa" + gang.TotalChildrenNum = 2 + gang.SetGangGroupInfo(gangGroupInfo) + + pod := &corev1.Pod{} + pod.Namespace = "test" + pod.Name = "pod1" + + assert.Equal(t, 1, len(gangGroupInfo.ChildrenScheduleRoundMap)) + assert.Equal(t, 1, len(gangGroupInfo.ChildrenLastScheduleTime)) + gang.deletePod(pod) + assert.Equal(t, 0, len(gangGroupInfo.ChildrenScheduleRoundMap)) + assert.Equal(t, 0, len(gangGroupInfo.ChildrenLastScheduleTime)) +} + +func TestIsScheduleCycleValid_GetScheduleCycle_GetChildScheduleCycle_SetChildScheduleCycle(t *testing.T) { + gangGroupInfo := NewGangGroupInfo("aa_bb", []string{"aa", "bb"}) + gangGroupInfo.ChildrenScheduleRoundMap["test/pod1"] = 1 + gangGroupInfo.ScheduleCycle = 2 + + gang := &Gang{} + gang.SetGangGroupInfo(gangGroupInfo) + + pod := &corev1.Pod{} + pod.Namespace = "test" + pod.Name = "pod1" + assert.Equal(t, 1, gang.getChildScheduleCycle(pod)) + assert.Equal(t, 2, gang.getScheduleCycle()) + + assert.Equal(t, true, gang.isScheduleCycleValid()) + gangGroupInfo.ScheduleCycleValid = false + assert.Equal(t, false, gang.isScheduleCycleValid()) + + gang.setChildScheduleCycle(pod, 33) + assert.Equal(t, 33, gang.getChildScheduleCycle(pod)) + assert.Equal(t, 33, gang.GangGroupInfo.ChildrenScheduleRoundMap["test/pod1"]) +} + +func TestInitPodLastScheduleTime_GetPodLastScheduleTime_ResetPodLastScheduleTime(t *testing.T) { + gangGroupInfo := NewGangGroupInfo("aa_bb", []string{"aa", "bb"}) + gangGroupInfo.LastScheduleTime = time.Now() + + gang := &Gang{} + gang.Children = make(map[string]*corev1.Pod) + gang.SetGangGroupInfo(gangGroupInfo) + + pod1 := &corev1.Pod{} + pod1.Namespace = "test" + pod1.Name = "pod1" + gang.initPodLastScheduleTime(pod1) + assert.Equal(t, gang.GangGroupInfo.LastScheduleTime, gang.getPodLastScheduleTime(pod1)) + + pod2 := &corev1.Pod{} + pod2.Namespace = "test" + pod2.Name = "pod2" + gang.initPodLastScheduleTime(pod2) + assert.Equal(t, gang.GangGroupInfo.LastScheduleTime, gang.getPodLastScheduleTime(pod2)) + + lastScheduleTime1 := gangGroupInfo.LastScheduleTime + + gang.resetPodLastScheduleTime(pod1) + lastScheduleTime2 := gangGroupInfo.LastScheduleTime + assert.NotEqual(t, lastScheduleTime1, lastScheduleTime2) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gang.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime1, gang.getPodLastScheduleTime(pod2)) + + gang.resetPodLastScheduleTime(pod1) + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gang.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime1, gang.getPodLastScheduleTime(pod2)) + + gang.Children[pod2.Name] = pod2 + gang.initAllChildrenPodLastScheduleTime() + assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gang.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime2, gang.getPodLastScheduleTime(pod2)) + + gang.resetPodLastScheduleTime(pod2) + lastScheduleTime3 := gangGroupInfo.LastScheduleTime + assert.NotEqual(t, lastScheduleTime2, lastScheduleTime3) + assert.Equal(t, lastScheduleTime3, gang.GangGroupInfo.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gang.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime3, gang.getPodLastScheduleTime(pod2)) +} + +func TestScheduleCycleRelated(t *testing.T) { + gangGroupInfo := NewGangGroupInfo("aa_bb", []string{"aa", "bb"}) + gangGroupInfo.LastScheduleTime = time.Now() + + gang := &Gang{} + gang.Name = "aa" + gang.SetGangGroupInfo(gangGroupInfo) + gangGroupInfo.GangTotalChildrenNumMap["aa"] = 1 + gangGroupInfo.GangTotalChildrenNumMap["bb"] = 1 + + pod1 := &corev1.Pod{} + pod1.Namespace = "test" + pod1.Name = "pod1" + + pod2 := &corev1.Pod{} + pod2.Namespace = "test" + pod2.Name = "pod2" + + gang.setScheduleCycleInvalid() + assert.Equal(t, false, gangGroupInfo.ScheduleCycleValid) + assert.Equal(t, false, gang.isScheduleCycleValid()) + + assert.Equal(t, 0, gang.GangGroupInfo.ChildrenScheduleRoundMap["test/pod1"]) + gang.setChildScheduleCycle(pod1, 1) + assert.Equal(t, 1, gang.getChildScheduleCycle(pod1)) + assert.Equal(t, 1, gang.GangGroupInfo.ChildrenScheduleRoundMap["test/pod1"]) + + assert.Equal(t, 1, gang.getScheduleCycle()) + assert.Equal(t, 1, gang.GangGroupInfo.ScheduleCycle) + + gang.trySetScheduleCycleValid() + assert.Equal(t, false, gang.isScheduleCycleValid()) + assert.Equal(t, 1, gang.getScheduleCycle()) + assert.Equal(t, 1, gang.getChildScheduleCycle(pod1)) + assert.Equal(t, 0, gang.getChildScheduleCycle(pod2)) + + gang.setChildScheduleCycle(pod2, 1) + gang.trySetScheduleCycleValid() + assert.Equal(t, true, gang.isScheduleCycleValid()) + assert.Equal(t, 2, gang.getScheduleCycle()) + assert.Equal(t, 1, gang.getChildScheduleCycle(pod1)) + assert.Equal(t, 1, gang.getChildScheduleCycle(pod2)) +} diff --git a/pkg/scheduler/plugins/coscheduling/core/ganggroup.go b/pkg/scheduler/plugins/coscheduling/core/ganggroup.go new file mode 100644 index 000000000..514e08ca3 --- /dev/null +++ b/pkg/scheduler/plugins/coscheduling/core/ganggroup.go @@ -0,0 +1,173 @@ +package core + +import ( + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util" +) + +type GangGroupInfo struct { + lock sync.Mutex + GangGroupId string + GangGroup []string + + // these fields used to count the cycle + // For example, at the beginning, `scheduleCycle` is 1, and each pod's cycle in `childrenScheduleRoundMap` is 0. When each pod comes to PreFilter, + // we will check if the pod's value in `childrenScheduleRoundMap` is smaller than Gang's `scheduleCycle`, If result is positive, + // we set the pod's cycle in `childrenScheduleRoundMap` equal with `scheduleCycle` and pass the check. If result is negative, means + // the pod has been scheduled in this cycle, so we should reject it. With `totalChildrenNum`'s help, when the last pod comes to make all + // `childrenScheduleRoundMap`'s values equal to `scheduleCycle`, Gang's `scheduleCycle` will be added by 1, which means a new schedule cycle. + ScheduleCycle int + ScheduleCycleValid bool + GangTotalChildrenNumMap map[string]int + ChildrenScheduleRoundMap map[string]int + + LastScheduleTime time.Time + ChildrenLastScheduleTime map[string]time.Time +} + +func NewGangGroupInfo(gangGroupId string, gangGroup []string) *GangGroupInfo { + gangGroupInfo := &GangGroupInfo{ + GangGroupId: gangGroupId, + GangGroup: gangGroup, + ScheduleCycle: 1, + ScheduleCycleValid: true, + GangTotalChildrenNumMap: make(map[string]int), + ChildrenScheduleRoundMap: make(map[string]int), + LastScheduleTime: timeNowFn(), + ChildrenLastScheduleTime: make(map[string]time.Time), + } + + for _, gang := range gangGroup { + gangGroupInfo.GangTotalChildrenNumMap[gang] = 0 + } + + return gangGroupInfo +} + +func (gg *GangGroupInfo) GetScheduleCycle() int { + gg.lock.Lock() + defer gg.lock.Unlock() + + return gg.ScheduleCycle +} + +func (gg *GangGroupInfo) setScheduleCycleInvalid() { + gg.lock.Lock() + defer gg.lock.Unlock() + + if gg.ScheduleCycleValid { + gg.ScheduleCycleValid = false + klog.Infof("setScheduleCycleInvalid, gangGroupName: %v, valid: %v", gg.GangGroupId) + } +} + +func (gg *GangGroupInfo) IsScheduleCycleValid() bool { + gg.lock.Lock() + defer gg.lock.Unlock() + + return gg.ScheduleCycleValid +} + +func (gg *GangGroupInfo) trySetScheduleCycleValid() { + gg.lock.Lock() + defer gg.lock.Unlock() + + num := 0 + for _, childScheduleCycle := range gg.ChildrenScheduleRoundMap { + if childScheduleCycle == gg.ScheduleCycle { + num++ + } + } + + totalChildrenNum := 0 + for _, gangTotalNum := range gg.GangTotalChildrenNumMap { + totalChildrenNum += gangTotalNum + } + + if num == totalChildrenNum { + gg.ScheduleCycle += 1 + gg.ScheduleCycleValid = true + + klog.Infof("trySetScheduleCycleTrue, gangGroupName: %v, ScheduleCycle: %v, ScheduleCycleValid: %v", + gg.GangGroupId, gg.ScheduleCycle, gg.ScheduleCycleValid) + } +} + +func (gg *GangGroupInfo) setChildScheduleCycle(pod *corev1.Pod, childCycle int) { + gg.lock.Lock() + defer gg.lock.Unlock() + + podId := util.GetId(pod.Namespace, pod.Name) + gg.ChildrenScheduleRoundMap[podId] = childCycle + klog.Infof("setChildScheduleCycle, pod: %v, childCycle: %v", podId, childCycle) +} + +func (gg *GangGroupInfo) getChildScheduleCycle(pod *corev1.Pod) int { + gg.lock.Lock() + defer gg.lock.Unlock() + + podId := util.GetId(pod.Namespace, pod.Name) + return gg.ChildrenScheduleRoundMap[podId] +} + +func (gg *GangGroupInfo) deleteChildScheduleCycle(podId string) { + gg.lock.Lock() + defer gg.lock.Unlock() + + delete(gg.ChildrenScheduleRoundMap, podId) +} + +func (gg *GangGroupInfo) SetGangTotalChildrenNum(gangName string, totalChildrenNum int) { + gg.lock.Lock() + defer gg.lock.Unlock() + + gg.GangTotalChildrenNumMap[gangName] = totalChildrenNum +} + +func (gg *GangGroupInfo) initPodLastScheduleTime(pod *corev1.Pod) { + gg.lock.Lock() + defer gg.lock.Unlock() + + podId := util.GetId(pod.Namespace, pod.Name) + gg.ChildrenLastScheduleTime[podId] = gg.LastScheduleTime +} + +func (gg *GangGroupInfo) getPodLastScheduleTime(pod *corev1.Pod) time.Time { + gg.lock.Lock() + defer gg.lock.Unlock() + + podId := util.GetId(pod.Namespace, pod.Name) + return gg.ChildrenLastScheduleTime[podId] +} + +func (gg *GangGroupInfo) deletePodLastScheduleTime(podId string) { + gg.lock.Lock() + defer gg.lock.Unlock() + + delete(gg.ChildrenLastScheduleTime, podId) +} + +func (gg *GangGroupInfo) resetPodLastScheduleTime(pod *corev1.Pod) { + gg.lock.Lock() + defer gg.lock.Unlock() + + num := 0 + for _, childLastScheduleTime := range gg.ChildrenLastScheduleTime { + if childLastScheduleTime.Equal(gg.LastScheduleTime) { + num++ + } + } + + if num == len(gg.ChildrenLastScheduleTime) { + gg.LastScheduleTime = time.Now() + klog.Infof("try resetGangGroupLastScheduleTime, gangGroupName: %v, time:%v", gg.GangGroupId, gg.LastScheduleTime) + } + + podId := util.GetId(pod.Namespace, pod.Name) + gg.ChildrenLastScheduleTime[podId] = gg.LastScheduleTime +} diff --git a/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go b/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go new file mode 100644 index 000000000..8858b1fa2 --- /dev/null +++ b/pkg/scheduler/plugins/coscheduling/core/ganggroup_test.go @@ -0,0 +1,102 @@ +package core + +import ( + "testing" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" +) + +func TestGangGroupInfo(t *testing.T) { + { + gg := NewGangGroupInfo("aa", []string{"aa"}) + assert.Equal(t, 1, gg.ScheduleCycle) + assert.Equal(t, true, gg.ScheduleCycleValid) + assert.True(t, !gg.LastScheduleTime.IsZero()) + } + { + gg := NewGangGroupInfo("aa", []string{"aa"}) + gg.setScheduleCycleInvalid() + + gg.SetGangTotalChildrenNum("aa", 1) + gg.SetGangTotalChildrenNum("bb", 1) + + pod1 := &corev1.Pod{} + pod1.Namespace = "test" + pod1.Name = "pod1" + + pod2 := &corev1.Pod{} + pod2.Namespace = "test" + pod2.Name = "pod2" + + assert.Equal(t, 0, gg.ChildrenScheduleRoundMap["test/pod1"]) + gg.setChildScheduleCycle(pod1, 1) + assert.Equal(t, 1, gg.getChildScheduleCycle(pod1)) + assert.Equal(t, 1, gg.ChildrenScheduleRoundMap["test/pod1"]) + + assert.Equal(t, 1, gg.GetScheduleCycle()) + assert.Equal(t, 1, gg.ScheduleCycle) + + gg.trySetScheduleCycleValid() + assert.Equal(t, false, gg.IsScheduleCycleValid()) + assert.Equal(t, 1, gg.GetScheduleCycle()) + assert.Equal(t, 1, gg.getChildScheduleCycle(pod1)) + assert.Equal(t, 0, gg.getChildScheduleCycle(pod2)) + + gg.setChildScheduleCycle(pod2, 1) + gg.trySetScheduleCycleValid() + assert.Equal(t, true, gg.IsScheduleCycleValid()) + assert.Equal(t, 2, gg.GetScheduleCycle()) + assert.Equal(t, 1, gg.getChildScheduleCycle(pod1)) + assert.Equal(t, 1, gg.getChildScheduleCycle(pod2)) + + assert.Equal(t, 2, len(gg.ChildrenScheduleRoundMap)) + gg.deleteChildScheduleCycle("test/pod1") + assert.Equal(t, 1, len(gg.ChildrenScheduleRoundMap)) + } + { + gg := NewGangGroupInfo("aa", []string{"aa"}) + + pod1 := &corev1.Pod{} + pod1.Namespace = "test" + pod1.Name = "pod1" + gg.initPodLastScheduleTime(pod1) + assert.Equal(t, gg.LastScheduleTime, gg.getPodLastScheduleTime(pod1)) + + pod2 := &corev1.Pod{} + pod2.Namespace = "test" + pod2.Name = "pod2" + gg.initPodLastScheduleTime(pod2) + assert.Equal(t, gg.LastScheduleTime, gg.getPodLastScheduleTime(pod2)) + + lastScheduleTime1 := gg.LastScheduleTime + + gg.resetPodLastScheduleTime(pod1) + lastScheduleTime2 := gg.LastScheduleTime + assert.NotEqual(t, lastScheduleTime1, lastScheduleTime2) + assert.Equal(t, lastScheduleTime2, gg.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gg.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime1, gg.getPodLastScheduleTime(pod2)) + + gg.resetPodLastScheduleTime(pod1) + assert.Equal(t, lastScheduleTime2, gg.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gg.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime1, gg.getPodLastScheduleTime(pod2)) + + gg.resetPodLastScheduleTime(pod2) + assert.Equal(t, lastScheduleTime2, gg.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gg.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime2, gg.getPodLastScheduleTime(pod2)) + + gg.resetPodLastScheduleTime(pod2) + lastScheduleTime3 := gg.LastScheduleTime + assert.NotEqual(t, lastScheduleTime2, lastScheduleTime3) + assert.Equal(t, lastScheduleTime3, gg.LastScheduleTime) + assert.Equal(t, lastScheduleTime2, gg.getPodLastScheduleTime(pod1)) + assert.Equal(t, lastScheduleTime3, gg.getPodLastScheduleTime(pod2)) + + assert.Equal(t, 2, len(gg.ChildrenLastScheduleTime)) + gg.deletePodLastScheduleTime("test/pod1") + assert.Equal(t, 1, len(gg.ChildrenLastScheduleTime)) + } +} diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling.go b/pkg/scheduler/plugins/coscheduling/coscheduling.go index 2ae5d5a65..56b978b5d 100644 --- a/pkg/scheduler/plugins/coscheduling/coscheduling.go +++ b/pkg/scheduler/plugins/coscheduling/coscheduling.go @@ -134,24 +134,31 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { return subPrio1 > subPrio2 } - lastScheduleTime1 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo1.Pod, podInfo1.Timestamp) - lastScheduleTime2 := cs.pgMgr.GetGangGroupLastScheduleTimeOfPod(podInfo2.Pod, podInfo2.Timestamp) + lastScheduleTime1 := cs.pgMgr.GetLastScheduleTime(podInfo1.Pod, podInfo1.Timestamp) + lastScheduleTime2 := cs.pgMgr.GetLastScheduleTime(podInfo2.Pod, podInfo2.Timestamp) if !lastScheduleTime1.Equal(lastScheduleTime2) { return lastScheduleTime1.Before(lastScheduleTime2) } - gangId1 := util.GetId(podInfo1.Pod.Namespace, util.GetGangNameByPod(podInfo1.Pod)) - gangId2 := util.GetId(podInfo2.Pod.Namespace, util.GetGangNameByPod(podInfo2.Pod)) - if gangId1 != gangId2 { - return gangId1 < gangId2 + gangGroup1, _ := cs.pgMgr.GetGangGroupId(podInfo1.Pod) + gangGroup2, _ := cs.pgMgr.GetGangGroupId(podInfo2.Pod) + if gangGroup1 != gangGroup2 { + return gangGroup1 < gangGroup2 } - // for member pod of same gang, the pod with the smaller scheduling cycle take precedence so that gang scheduling cycle can be valid and iterated + + gang1 := util.GetId(podInfo1.Pod.Namespace, util.GetGangNameByPod(podInfo1.Pod)) + gang2 := util.GetId(podInfo2.Pod.Namespace, util.GetGangNameByPod(podInfo2.Pod)) + if gang1 != gang2 { + return gang1 < gang2 + } + childScheduleCycle1 := cs.pgMgr.GetChildScheduleCycle(podInfo1.Pod) childScheduleCycle2 := cs.pgMgr.GetChildScheduleCycle(podInfo2.Pod) if childScheduleCycle1 != childScheduleCycle2 { return childScheduleCycle1 < childScheduleCycle2 } - return podInfo1.Timestamp.Before(podInfo2.Timestamp) + + return podInfo1.Pod.Name < podInfo2.Pod.Name } // PreFilter @@ -161,10 +168,14 @@ func (cs *Coscheduling) Less(podInfo1, podInfo2 *framework.QueuedPodInfo) bool { // iii.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative. // iv.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above. func (cs *Coscheduling) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) { - // If PreFilter fails, return framework.UnschedulableAndUnresolvable to avoid any preemption attempts. - // If Prefilter failed due to scheduleCycle invalid, we shouldn't reject it's assumed sibling. - if err, scheduleCycleInvalid := cs.pgMgr.PreFilter(ctx, pod); err != nil { - state.Write(stateKey, &stateData{skipPostFilter: scheduleCycleInvalid}) + // If PreFilter fails, return framework.Error to avoid + // any preemption attempts. + if err := cs.pgMgr.PreFilter(ctx, pod); err != nil { + // If Prefilter failed due to scheduleCycle invalid, we shouldn't reject it's assumed sibling. + if _, ok := err.(*core.ScheduleCycleInValidError); ok { + state.Write(stateKey, &stateData{skipPostFilter: true}) + } + klog.ErrorS(err, "PreFilter failed", "pod", klog.KObj(pod)) return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) } diff --git a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go index d4710b1f9..5440d818b 100644 --- a/pkg/scheduler/plugins/coscheduling/coscheduling_test.go +++ b/pkg/scheduler/plugins/coscheduling/coscheduling_test.go @@ -27,34 +27,28 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/events" - "k8s.io/kube-scheduler/config/v1beta3" - "k8s.io/kubernetes/pkg/scheduler" - configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" - "k8s.io/kubernetes/pkg/scheduler/profile" - "k8s.io/utils/pointer" - - "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext/helper" - "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/core" - "github.com/stretchr/testify/assert" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apiruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" kubefake "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/util/retry" + "k8s.io/client-go/tools/events" "k8s.io/klog/v2" + "k8s.io/kube-scheduler/config/v1beta3" + "k8s.io/kubernetes/pkg/scheduler" scheduledconfig "k8s.io/kubernetes/pkg/scheduler/apis/config" + configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder" "k8s.io/kubernetes/pkg/scheduler/framework/runtime" frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime" + "k8s.io/kubernetes/pkg/scheduler/profile" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" st "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/utils/pointer" "sigs.k8s.io/scheduler-plugins/pkg/apis/scheduling/v1alpha1" pgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned" fakepgclientset "sigs.k8s.io/scheduler-plugins/pkg/generated/clientset/versioned/fake" @@ -65,6 +59,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config/v1beta2" "github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext" + "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/core" "github.com/koordinator-sh/koordinator/pkg/scheduler/plugins/coscheduling/util" ) @@ -150,7 +145,6 @@ func makePg(name, namespace string, min int32, creationTime *time.Time, minResou } return pg } - func (f *testSharedLister) NodeInfos() framework.NodeInfoLister { return f } @@ -208,7 +202,6 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node, pgClientSet pgclients } informerFactory := informers.NewSharedInformerFactory(cs, 0) - informerFactory = helper.NewForceSyncSharedInformerFactory(informerFactory) snapshot := newTestSharedLister(nil, nodes) fh, err := schedulertesting.NewFramework( registeredPlugins, @@ -233,307 +226,312 @@ func (p *pluginTestSuit) start() { } func TestLess(t *testing.T) { - pgClientSet := fakepgclientset.NewSimpleClientset() - cs := kubefake.NewSimpleClientset() - suit := newPluginTestSuit(t, nil, pgClientSet, cs) - gp := suit.plugin.(*Coscheduling) - - var lowPriority, highPriority = int32(10), int32(100) - // koordinator priority announced in pod's Labels - var lowSubPriority, highSubPriority = "111", "222" - var gangA_ns, gangB_ns = "namespace1", "namespace2" - gangC_ns := "namespace3" - gangGroupNS := "namespace4" - now := time.Now() - earltTime := now.Add(1 * time.Second) - lateTime := now.Add(3 * time.Second) - - // we assume that there are tow gang: gangA and gangB - // gangA is announced by the pod's annotation,gangB is created by the podGroup - // so here we need to add two gangs to the cluster - - // GangA by Annotations - podToCreateGangA := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: gangA_ns, - Name: "pod1", - Annotations: map[string]string{ - extension.AnnotationGangName: "gangA", - extension.AnnotationGangMinNum: "2", + { + //pod priority + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + var lowPriority, highPriority = int32(10), int32(100) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", }, - }, - } - podToSatisfyGangC := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: gangC_ns, - Name: "podC", - Labels: map[string]string{ - "pod-group.scheduling.sigs.k8s.io/name": "gangC", + } + pod1.Spec.Priority = &lowPriority + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", }, - }, - Spec: corev1.PodSpec{ - NodeName: "fake-node", - }, - } - // GangB by PodGroup - gangBCreatTime := now.Add(5 * time.Second) - pg := makePg("gangB", gangB_ns, 2, &gangBCreatTime, nil) - // GangC by PodGroup - gangCCreatTime := now.Add(5 * time.Second) - pg2 := makePg("gangC", gangC_ns, 1, &gangCCreatTime, nil) - // GangD by PodGroup - pg3 := makePg("gangD", gangC_ns, 1, &gangCCreatTime, nil) - gangGroup := []string{"default/gangD", "default/gangE"} - rawGangGroup, err := json.Marshal(gangGroup) - assert.NoError(t, err) - pg4 := makePg("gang4", gangGroupNS, 0, nil, nil) - pg5 := makePg("gang5", gangGroupNS, 0, nil, nil) - pg4.Annotations = map[string]string{extension.AnnotationGangGroups: string(rawGangGroup)} - suit.start() - // create gangA and gangB - - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = suit.Handle.ClientSet().CoreV1().Pods(gangA_ns).Create(context.TODO(), podToCreateGangA, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry podClient create pod err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = pgClientSet.SchedulingV1alpha1().PodGroups(gangB_ns).Create(context.TODO(), pg, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry pgClient create pg err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = pgClientSet.SchedulingV1alpha1().PodGroups(gangC_ns).Create(context.TODO(), pg2, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry pgClient create pg err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = pgClientSet.SchedulingV1alpha1().PodGroups(gangC_ns).Create(context.TODO(), pg3, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry pgClient create pg err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = pgClientSet.SchedulingV1alpha1().PodGroups(gangGroupNS).Create(context.TODO(), pg4, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry pgClient create pg err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = pgClientSet.SchedulingV1alpha1().PodGroups(gangGroupNS).Create(context.TODO(), pg5, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry pgClient create pg err: %v", err) - } - err = retry.OnError( - retry.DefaultRetry, - errors.IsTooManyRequests, - func() error { - var err error - _, err = suit.Handle.ClientSet().CoreV1().Pods(gangC_ns).Create(context.TODO(), podToSatisfyGangC, metav1.CreateOptions{}) - return err - }) - if err != nil { - t.Errorf("retry podClient create pod err: %v", err) + } + pod2.Spec.Priority = &highPriority + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) } - time.Sleep(100 * time.Millisecond) - for _, tt := range []struct { - name string - p1 *framework.QueuedPodInfo - p2 *framework.QueuedPodInfo - childScheduleCycle1 int - childScheduleCycle2 int - annotations map[string]string - expected bool - }{ - { - name: "p1.priority less than p2.priority,but p1's subPriority is greater than p2's", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(lowPriority).Label(extension.LabelPodPriority, highSubPriority).Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - }, - expected: false, // p2 should be ahead of p1 in the queue - }, - { - name: "p1.priority greater than p2.priority, p2's subPriority is greater than p1's", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(lowPriority).Label(extension.LabelPodPriority, highSubPriority).Obj()), - }, - expected: true, // p1 should be ahead of p2 in the queue - }, - { - name: "equal priority. p1's subPriority is less than p2's subPriority", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(extension.LabelPodPriority, highSubPriority).Obj()), - }, - expected: false, // p2 should be ahead of p1 in the queue - }, - { - name: "equal priority. p1's subPriority is greater than p2's subPriority", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Label(extension.LabelPodPriority, highSubPriority).Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - }, - expected: true, // p1 should be ahead of p2 in the queue - }, - { - name: "equal priority. p1's subPriority is illegal , p2's subPriority is greater than 0", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Label(extension.LabelPodPriority, "????").Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - }, - expected: false, // p2 should be ahead of p1 in the queue - }, - { - name: "equal priority, but p1 is added to schedulingQ earlier than p2", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - Timestamp: earltTime, + { + //sub priority + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + var lowPriority = int32(10) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(extension.LabelPodPriority, lowSubPriority).Obj()), - Timestamp: lateTime, - }, - expected: true, // p1 should be ahead of p2 in the queue - }, - { - name: "p1.priority less than p2.priority, p1 belongs to gangB", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(lowPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod2").Priority(highPriority).Obj()), + } + pod1.Spec.Priority = &lowPriority + pod1.Labels = map[string]string{ + extension.LabelPodPriority: "111", + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", }, - expected: false, // p2 should be ahead of p1 in the queue - }, - { - name: "equal priority, p1 is added to schedulingQ earlier than p2", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Obj()), - Timestamp: earltTime, + } + pod2.Spec.Priority = &lowPriority + pod2.Labels = map[string]string{ + extension.LabelPodPriority: "222", + } + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) + } + { + //two gang LastScheduleTime + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangA", + extension.AnnotationGangMinNum: "2", + }, }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod2").Priority(highPriority).Obj()), - Timestamp: lateTime, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, }, - expected: true, // p1 should be ahead of p2 in the queue - }, - { - name: "p1.priority less than p2.priority, p1 belongs to gangA and p2 belongs to gangB", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangA_ns).Name("pod1").Priority(lowPriority).Obj()), + } + + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) + } + { + //gang and normal pod LastScheduleTime + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangA", + extension.AnnotationGangMinNum: "2", + }, }, - annotations: map[string]string{extension.AnnotationGangName: "gangA"}, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{}, }, - expected: false, // p1 should be ahead of p2 in the queue - }, - { - name: "equal priority and creation time, both belongs to gangB, earlier lastScheduleTime pod take precedence", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - Timestamp: lateTime, + } + + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + q2.Timestamp = time.Now().Add(time.Second * 1000 * -1) + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) + } + { + //gangGroup, gangId diff + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + extension.AnnotationGangGroups: "[\"default/gangA\",\"default/gangB\"]", + }, }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - Timestamp: earltTime, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangA", + extension.AnnotationGangMinNum: "2", + extension.AnnotationGangGroups: "[\"default/gangA\",\"default/gangB\"]", + }, }, - expected: false, - }, - { - name: "equal priority and creation time, both belongs to gangB, childScheduleCycle not equal", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - InitialAttemptTimestamp: lateTime, + } + + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) + } + { + //same gang, child schedule cycle diff + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangB_ns).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gangB").Obj()), - InitialAttemptTimestamp: earltTime, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, }, - childScheduleCycle1: 2, - childScheduleCycle2: 1, - expected: false, // p1 should be ahead of p2 in the queue - }, - { - name: "equal priority, p1 belongs to different gangs of one gangGroup, sort by gangID", - p1: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangGroupNS).Name("pod1").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gang4").Obj()), - Timestamp: earltTime, + } + + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 + + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + + gang1 := gp.pgMgr.(*core.PodGroupManager).GetGangByPod(pod1) + gang1.GangGroupInfo.ChildrenScheduleRoundMap["default/pod1"] = 1 + + assert.Equal(t, false, gp.Less(q1, q2)) + assert.Equal(t, true, gp.Less(q2, q1)) + } + { + //same gang, child schedule cycle diff + pgClientSet := fakepgclientset.NewSimpleClientset() + cs := kubefake.NewSimpleClientset() + suit := newPluginTestSuit(t, nil, pgClientSet, cs) + gp := suit.plugin.(*Coscheduling) + + pod1 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod1", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, }, - p2: &framework.QueuedPodInfo{ - PodInfo: framework.NewPodInfo(st.MakePod().Namespace(gangGroupNS).Name("pod2").Priority(highPriority).Label(v1alpha1.PodGroupLabel, "gang5").Obj()), - Timestamp: earltTime, + } + + pod2 := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "pod2", + Annotations: map[string]string{ + extension.AnnotationGangName: "gangB", + extension.AnnotationGangMinNum: "2", + }, }, - expected: true, - }, - } { - t.Run(tt.name, func(t *testing.T) { + } - if len(tt.annotations) != 0 { - tt.p1.Pod.Annotations = tt.annotations - } + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod1, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) + suit.Handle.ClientSet().CoreV1().Pods("default").Create(context.TODO(), pod2, metav1.CreateOptions{}) + time.Sleep(time.Millisecond * 100) - gang1 := gp.pgMgr.(*core.PodGroupManager).GetGangByPod(tt.p1.Pod) - gang2 := gp.pgMgr.(*core.PodGroupManager).GetGangByPod(tt.p2.Pod) - if gang1 != nil { - gang1.ChildrenScheduleRoundMap[util.GetId(tt.p1.Pod.Namespace, tt.p1.Pod.Name)] = tt.childScheduleCycle1 - } - if gang2 != nil { - gang2.ChildrenScheduleRoundMap[util.GetId(tt.p2.Pod.Namespace, tt.p2.Pod.Name)] = tt.childScheduleCycle2 - } + q1 := &framework.QueuedPodInfo{} + q1.PodInfo = &framework.PodInfo{} + q1.Pod = pod1 - if got := gp.Less(tt.p1, tt.p2); got != tt.expected { - t.Errorf("expected %v, got %v", tt.expected, got) - } - }) - } + q2 := &framework.QueuedPodInfo{} + q2.PodInfo = &framework.PodInfo{} + q2.Pod = pod2 + assert.Equal(t, true, gp.Less(q1, q2)) + assert.Equal(t, false, gp.Less(q2, q1)) + } } func TestPostFilter(t *testing.T) { @@ -1043,7 +1041,7 @@ func TestFairness(t *testing.T) { waitingPod := 0 gangSummaries := suit.plugin.(*Coscheduling).pgMgr.GetGangSummaries() for _, gangSummary := range gangSummaries { - if !gangSummary.ScheduleCycleValid { + if !gangSummary.GangGroupInfo.IsScheduleCycleValid() { continue } waitingPod += gangSummary.WaitingForBindChildren.Len() @@ -1073,11 +1071,11 @@ func TestFairness(t *testing.T) { if len(gangSummary.WaitingForBindChildren) != 0 { nonZeroWaitingBoundGroup[strings.Join(gangSummary.GangGroup, ",")] = true } - if gangSummary.ScheduleCycle < minGangSchedulingCycle { - minGangSchedulingCycle = gangSummary.ScheduleCycle + if gangSummary.GangGroupInfo.GetScheduleCycle() < minGangSchedulingCycle { + minGangSchedulingCycle = gangSummary.GangGroupInfo.GetScheduleCycle() } - if gangSummary.ScheduleCycle > maxGangSchedulingCycle { - maxGangSchedulingCycle = gangSummary.ScheduleCycle + if gangSummary.GangGroupInfo.GetScheduleCycle() > maxGangSchedulingCycle { + maxGangSchedulingCycle = gangSummary.GangGroupInfo.GetScheduleCycle() } } assert.LessOrEqual(t, 3, minGangSchedulingCycle) @@ -1110,10 +1108,10 @@ func simulateScheduleOne(t *testing.T, ctx context.Context, sched *scheduler.Sch } summary, exists := suit.plugin.(*Coscheduling).pgMgr.GetGangSummary(util.GetId(pod.Namespace, util.GetGangNameByPod(pod))) if exists { - scheduleInfo.gangScheduleCycle = summary.ScheduleCycle - scheduleInfo.schedulingCycleValid = summary.ScheduleCycleValid + scheduleInfo.gangScheduleCycle = summary.GangGroupInfo.GetScheduleCycle() + scheduleInfo.schedulingCycleValid = summary.GangGroupInfo.IsScheduleCycleValid() scheduleInfo.hasWaitForBoundChildren = summary.WaitingForBindChildren.Len() != 0 - scheduleInfo.lastScheduleTime = suit.plugin.(*Coscheduling).pgMgr.GetGangGroupLastScheduleTimeOfPod(pod, time.Time{}) + scheduleInfo.lastScheduleTime = suit.plugin.(*Coscheduling).pgMgr.GetLastScheduleTime(pod, time.Time{}) } *scheduleOrder = append(*scheduleOrder, scheduleInfo) fwk := suit.Handle.(framework.Framework) @@ -1321,7 +1319,7 @@ func TestDeadLockFree(t *testing.T) { waitingPod := 0 gangSummaries := suit.plugin.(*Coscheduling).pgMgr.GetGangSummaries() for _, gangSummary := range gangSummaries { - if gangSummary.ScheduleCycleValid == false { + if gangSummary.GangGroupInfo.IsScheduleCycleValid() == false { continue } waitingPod += gangSummary.WaitingForBindChildren.Len() diff --git a/pkg/scheduler/plugins/coscheduling/plugin_service_test.go b/pkg/scheduler/plugins/coscheduling/plugin_service_test.go index 13a5c3ff7..3d38c8089 100644 --- a/pkg/scheduler/plugins/coscheduling/plugin_service_test.go +++ b/pkg/scheduler/plugins/coscheduling/plugin_service_test.go @@ -60,6 +60,7 @@ func newPluginTestSuitForGangAPI(t *testing.T, nodes []*corev1.Node) *pluginTest gangSchedulingArgs: &gangSchedulingArgs, } } + func TestEndpointsQueryGangInfo(t *testing.T) { suit := newPluginTestSuitForGangAPI(t, nil) podToCreateGangA := &corev1.Pod{ @@ -82,23 +83,20 @@ func TestEndpointsQueryGangInfo(t *testing.T) { suit.start() gp := p.(*Coscheduling) gangExpected := core.GangSummary{ - Name: "ganga_ns/ganga", - WaitTime: time.Second * 600, - CreateTime: podToCreateGangA.CreationTimestamp.Time, - GangGroup: []string{"ganga_ns/ganga"}, - Mode: extension.GangModeStrict, - MinRequiredNumber: 2, - TotalChildrenNum: 2, - Children: sets.NewString("ganga_ns/pod1"), - WaitingForBindChildren: sets.NewString(), - BoundChildren: sets.NewString(), - OnceResourceSatisfied: false, - ScheduleCycleValid: true, - ScheduleCycle: 1, - ChildrenScheduleRoundMap: map[string]int{}, - GangFrom: core.GangFromPodAnnotation, - GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, - HasGangInit: true, + Name: "ganga_ns/ganga", + WaitTime: time.Second * 600, + CreateTime: podToCreateGangA.CreationTimestamp.Time, + GangGroup: []string{"ganga_ns/ganga"}, + Mode: extension.GangModeStrict, + MinRequiredNumber: 2, + TotalChildrenNum: 2, + Children: sets.NewString("ganga_ns/pod1"), + WaitingForBindChildren: sets.NewString(), + BoundChildren: sets.NewString(), + OnceResourceSatisfied: false, + GangFrom: core.GangFromPodAnnotation, + GangMatchPolicy: extension.GangMatchPolicyOnceSatisfied, + HasGangInit: true, } { engine := gin.Default() @@ -110,7 +108,8 @@ func TestEndpointsQueryGangInfo(t *testing.T) { gangMarshal := &core.GangSummary{} err = json.NewDecoder(w.Result().Body).Decode(gangMarshal) assert.NoError(t, err) - gangMarshal.LastScheduleTime = gangExpected.LastScheduleTime + assert.True(t, gangMarshal.GangGroupInfo != nil) + gangMarshal.GangGroupInfo = nil assert.Equal(t, &gangExpected, gangMarshal) } { @@ -123,7 +122,8 @@ func TestEndpointsQueryGangInfo(t *testing.T) { gangMarshalMap := make(map[string]*core.GangSummary) err = json.Unmarshal([]byte(w.Body.String()), &gangMarshalMap) assert.NoError(t, err) - gangMarshalMap["ganga_ns/ganga"].LastScheduleTime = gangExpected.LastScheduleTime + assert.True(t, gangMarshalMap["ganga_ns/ganga"].GangGroupInfo != nil) + gangMarshalMap["ganga_ns/ganga"].GangGroupInfo = nil assert.Equal(t, &gangExpected, gangMarshalMap["ganga_ns/ganga"]) } }