Skip to content

Commit

Permalink
scheduler: refine coscheduling fairness consider gangGroup (#2004)
Browse files Browse the repository at this point in the history
Signed-off-by: xingbao.zy <[email protected]>
Co-authored-by: xingbao.zy <[email protected]>
  • Loading branch information
buptcozy and xingbao.zy authored Apr 18, 2024
1 parent c8bfd9e commit 211250b
Show file tree
Hide file tree
Showing 12 changed files with 1,441 additions and 668 deletions.
101 changes: 47 additions & 54 deletions pkg/scheduler/plugins/coscheduling/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ const (

// Manager defines the interfaces for PodGroup management.
type Manager interface {
PreFilter(context.Context, *corev1.Pod) (err error, scheduleCycleInvalid bool)
PreFilter(context.Context, *corev1.Pod) error
Permit(context.Context, *corev1.Pod) (time.Duration, Status)
PostBind(context.Context, *corev1.Pod, string)
PostFilter(context.Context, *corev1.Pod, framework.Handle, string, framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status)
GetCreatTime(*framework.QueuedPodInfo) time.Time
GetGroupId(*corev1.Pod) (string, error)
GetGangGroupId(*corev1.Pod) (string, error)
GetAllPodsFromGang(string) []*corev1.Pod
ActivateSiblings(*corev1.Pod, *framework.CycleState)
AllowGangGroup(*corev1.Pod, framework.Handle, string)
Expand All @@ -73,7 +73,7 @@ type Manager interface {
GetGangSummaries() map[string]*GangSummary
IsGangMinSatisfied(*corev1.Pod) bool
GetChildScheduleCycle(*corev1.Pod) int
GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time
GetLastScheduleTime(*corev1.Pod, time.Time) time.Time
}

// PodGroupManager defines the scheduling operation called
Expand Down Expand Up @@ -146,7 +146,7 @@ func (pgMgr *PodGroupManager) OnPodGroupDelete(obj interface{}) {
pgMgr.cache.onPodGroupDelete(obj)
}

func (pgMgr *PodGroupManager) GetGroupId(pod *corev1.Pod) (string, error) {
func (pgMgr *PodGroupManager) GetGangGroupId(pod *corev1.Pod) (string, error) {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return "", fmt.Errorf("gang doesn't exist in cache")
Expand All @@ -155,6 +155,15 @@ func (pgMgr *PodGroupManager) GetGroupId(pod *corev1.Pod) (string, error) {
return gang.GangGroupId, nil
}

func (pgMgr *PodGroupManager) GetLastScheduleTime(pod *corev1.Pod, defaultTime time.Time) time.Time {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return defaultTime
}

return gang.getPodLastScheduleTime(pod)
}

func (pgMgr *PodGroupManager) IsGangMinSatisfied(pod *corev1.Pod) bool {
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
Expand Down Expand Up @@ -212,43 +221,52 @@ func (pgMgr *PodGroupManager) ActivateSiblings(pod *corev1.Pod, state *framework
}
}

type ScheduleCycleInValidError struct {
ErrMsg string
}

func (err *ScheduleCycleInValidError) Error() string {
return err.ErrMsg
}

// PreFilter
// i.Check whether children in Gang has met the requirements of minimum number under each Gang, and reject the pod if negative.
// ii.Check whether the Gang is inited, and reject the pod if positive.
// iii.Check whether the Gang is OnceResourceSatisfied
// iv.Check whether the Gang has met the scheduleCycleValid check, and reject the pod if negative(only Strict mode ).
// v.Try update scheduleCycle, scheduleCycleValid, childrenScheduleRoundMap as mentioned above.
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (err error, scheduleCycleInvalid bool) {
func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) error {
if !util.IsPodNeedGang(pod) {
return nil, false
return nil
}
gang := pgMgr.GetGangByPod(pod)
if gang == nil {
return fmt.Errorf("can't find gang, gangName: %v, podName: %v", util.GetId(pod.Namespace, util.GetGangNameByPod(pod)),
util.GetId(pod.Namespace, pod.Name)), false
util.GetId(pod.Namespace, pod.Name))
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())

// check if gang is initialized
if !gang.HasGangInit {
return fmt.Errorf("gang has not init, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name)), false
util.GetId(pod.Namespace, pod.Name))
}
// resourceSatisfied means pod will directly pass the PreFilter
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return nil, false
return nil
}

// check minNum
if gang.getChildrenNum() < gang.getGangMinNum() {
return fmt.Errorf("gang child pod not collect enough, gangName: %v, podName: %v", gang.Name,
util.GetId(pod.Namespace, pod.Name)), false
util.GetId(pod.Namespace, pod.Name))
}

if pgMgr.args != nil && pgMgr.args.SkipCheckScheduleCycle {
return nil, false
return nil
}

gang.resetPodLastScheduleTime(pod)

// first try update the global cycle of gang
gang.trySetScheduleCycleValid()
gangScheduleCycle := gang.getScheduleCycle()
Expand All @@ -257,19 +275,21 @@ func (pgMgr *PodGroupManager) PreFilter(ctx context.Context, pod *corev1.Pod) (e
gangMode := gang.getGangMode()
if gangMode == extension.GangModeStrict {
if pod.Status.NominatedNodeName != "" {
return nil, false
return nil
}
podScheduleCycle := gang.getChildScheduleCycle(pod)
if !gang.isScheduleCycleValid() {
return fmt.Errorf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name)), true
err := &ScheduleCycleInValidError{}
err.ErrMsg = fmt.Sprintf("gang scheduleCycle not valid, gangName: %v, podName: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name))
return err
}
if podScheduleCycle >= gangScheduleCycle {
return fmt.Errorf("pod's schedule cycle too large, gangName: %v, podName: %v, podCycle: %v, gangCycle: %v",
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle), false
gang.Name, util.GetId(pod.Namespace, pod.Name), podScheduleCycle, gangScheduleCycle)
}
}
return nil, false
return nil
}

// PostFilter
Expand All @@ -285,9 +305,6 @@ func (pgMgr *PodGroupManager) PostFilter(ctx context.Context, pod *corev1.Pod, h
klog.Warningf(message)
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable, message)
}
defer func() {
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
}()
if gang.getGangMatchPolicy() == extension.GangMatchPolicyOnceSatisfied && gang.isGangOnceResourceSatisfied() {
return &framework.PostFilterResult{}, framework.NewStatus(framework.Unschedulable)
}
Expand Down Expand Up @@ -354,7 +371,6 @@ func (pgMgr *PodGroupManager) Unreserve(ctx context.Context, state *framework.Cy
klog.Warningf("Pod %q missing Gang", klog.KObj(pod))
return
}
pgMgr.cache.setGangGroupLastScheduleTimeOfPod(pod.UID, gang.getLastScheduleTime())
// first delete the pod from gang's waitingFroBindChildren map
gang.delAssumedPod(pod)

Expand All @@ -376,34 +392,20 @@ func (pgMgr *PodGroupManager) rejectGangGroupById(handle framework.Handle, plugi
gangSet := sets.NewString(gangGroup...)

if handle != nil {
/*
let's explain why we need defer by following example, we have three gang of one gang group, noted as gangA, gangB and gangC,
the following is what happened when no defer.
1. gangB/C assumed, gangA failed
2. then gangA invoke PostFilter, reject gangB|gangC, gangB|gangC invoke unreserve asynchronously, i.e: invoke rejectGangGroupById concurrently
3. gangB or gangC maybe find it's scheduling cycle valid if it judges scheduling cycle before gang A reject it, then different lastScheduleTime is possible
*/
defer func() {
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
}
})
}()
}
if !gang.isScheduleCycleValid() {
// In a schedule cycle, one gang can only reject its self and sibling gang once
return
handle.IterateOverWaitingPods(func(waitingPod framework.WaitingPod) {
waitingGangId := util.GetId(waitingPod.GetPod().Namespace, util.GetGangNameByPod(waitingPod.GetPod()))
if gangSet.Has(waitingGangId) {
klog.V(1).InfoS("GangGroup gets rejected due to member Gang is unschedulable",
"gang", gangId, "waitingGang", waitingGangId, "waitingPod", klog.KObj(waitingPod.GetPod()))
waitingPod.Reject(pluginName, message)
}
})
}
gangGroupLastScheduleTime := timeNowFn()

for gang := range gangSet {
gangIns := pgMgr.cache.getGangFromCacheByGangId(gang, false)
if gangIns != nil {
gangIns.setScheduleCycleValid(false)
gangIns.setLastScheduleTime(gangGroupLastScheduleTime)
gangIns.setScheduleCycleInvalid()
}
}
}
Expand All @@ -420,7 +422,6 @@ func (pgMgr *PodGroupManager) PostBind(ctx context.Context, pod *corev1.Pod, nod
}
// first update gang in cache
gang.addBoundPod(pod)
pgMgr.cache.deleteGangGroupLastScheduleTimeOfPod(pod)

// update PodGroup
_, pg := pgMgr.GetPodGroup(pod)
Expand Down Expand Up @@ -569,11 +570,3 @@ func (pgMgr *PodGroupManager) GetChildScheduleCycle(pod *corev1.Pod) int {

return gang.getChildScheduleCycle(pod)
}

func (pgMgr *PodGroupManager) GetGangGroupLastScheduleTimeOfPod(pod *corev1.Pod, podLastScheduleTime time.Time) time.Time {
gangGroupLastScheduleTime := pgMgr.cache.getGangGroupLastScheduleTimeOfPod(pod.UID)
if gangGroupLastScheduleTime != nil {
return *gangGroupLastScheduleTime
}
return podLastScheduleTime
}
64 changes: 61 additions & 3 deletions pkg/scheduler/plugins/coscheduling/core/core_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,64 @@ func makePg(name, namespace string, min int32, creationTime *time.Time, minResou
return pg
}

func TestPlugin_PreFilter_ResetScheduleTime(t *testing.T) {
mgr := NewManagerForTest().pgMgr

pod1 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod1",
Annotations: map[string]string{
extension.AnnotationGangName: "gangB",
extension.AnnotationGangMinNum: "2",
},
},
}

pod2 := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod2",
Annotations: map[string]string{
extension.AnnotationGangName: "gangB",
extension.AnnotationGangMinNum: "2",
},
},
}
mgr.OnPodAdd(pod1)
mgr.OnPodAdd(pod2)

gang := mgr.GetGangByPod(pod1)
lastScheduleTime1 := gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod1)
lastScheduleTime2 := gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod1)
lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime1, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod2)
lastScheduleTime2 = gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])

mgr.PreFilter(context.TODO(), pod2)
lastScheduleTime3 := gang.GangGroupInfo.LastScheduleTime
assert.Equal(t, 2, len(gang.GangGroupInfo.ChildrenLastScheduleTime))
assert.Equal(t, lastScheduleTime2, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod1"])
assert.Equal(t, lastScheduleTime3, gang.GangGroupInfo.ChildrenLastScheduleTime["default/pod2"])
}

func TestPlugin_PreFilter(t *testing.T) {
gangACreatedTime := time.Now()
mgr := NewManagerForTest().pgMgr
Expand Down Expand Up @@ -300,7 +358,7 @@ func TestPlugin_PreFilter(t *testing.T) {

// set pre cases before test pod run
if tt.shouldSetValidToFalse {
gang.setScheduleCycleValid(false)
gang.setScheduleCycleInvalid()
}
if tt.shouldSetCycleEqualWithGlobal {
gang.setChildScheduleCycle(tt.pod, 1)
Expand All @@ -315,7 +373,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}()
}
// run the case
err, _ := mgr.PreFilter(ctx, tt.pod)
err := mgr.PreFilter(ctx, tt.pod)
var returnMessage string
if err == nil {
returnMessage = ""
Expand All @@ -327,7 +385,7 @@ func TestPlugin_PreFilter(t *testing.T) {
if gang != nil && !tt.isNonStrictMode && !tt.shouldSkipCheckScheduleCycle {
assert.Equal(t, tt.expectedScheduleCycle, gang.getScheduleCycle())
assert.Equal(t, tt.expectedScheduleCycleValid, gang.isScheduleCycleValid())
assert.Equal(t, tt.expectedChildCycleMap, gang.ChildrenScheduleRoundMap)
assert.Equal(t, tt.expectedChildCycleMap, gang.GangGroupInfo.ChildrenScheduleRoundMap)

assert.Equal(t, tt.expectedChildCycleMap[util.GetId(tt.pod.Namespace, tt.pod.Name)],
mgr.GetChildScheduleCycle(tt.pod))
Expand Down
Loading

0 comments on commit 211250b

Please sign in to comment.