Skip to content

Commit

Permalink
scheduler: revise ReservationFilterPlugin and fix preempting pods res…
Browse files Browse the repository at this point in the history
…ources (#2315)

Signed-off-by: saintube <[email protected]>
Co-authored-by: shenxin <[email protected]>
  • Loading branch information
saintube and shenxin authored Jan 10, 2025
1 parent 8d0e6ce commit 236bcc9
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 36 deletions.
18 changes: 16 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,23 @@ func (ext *frameworkExtenderImpl) RunReservationExtensionFinalRestoreReservation
}

// RunReservationFilterPlugins runs the FilterReservation plugins in the Filter phase to determine whether the Reservation is available for the Pod on the given node.
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeName)
status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeInfo)
if !status.IsSuccess() {
if debugFilterFailure {
klog.Infof("Failed to FilterWithReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeInfo.Node().Name, pl.Name(), status.Message())
}
return status
}
}
return nil
}

// RunNominateReservationFilterPlugins determines whether the Reservation can participate in the Reserve.
func (ext *frameworkExtenderImpl) RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
for _, pl := range ext.reservationFilterPlugins {
status := pl.FilterNominateReservation(ctx, cycleState, pod, reservationInfo, nodeName)
if !status.IsSuccess() {
if debugFilterFailure {
klog.Infof("Failed to FilterReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeName, pl.Name(), status.Message())
Expand Down
97 changes: 94 additions & 3 deletions pkg/scheduler/frameworkext/framework_extender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,18 @@ type fakeReservationFilterPlugin struct {

// Name returns the fixed name under which the fake reservation filter plugin identifies itself.
func (f *fakeReservationFilterPlugin) Name() string {
	const pluginName = "fakeReservationFilterPlugin"
	return pluginName
}

func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
// FilterReservation marks the reservation with an annotation derived from the plugin's index,
// so that a test can observe which fake plugins were invoked, and then reports the configured
// error (if any) as a framework status. A nil status is returned when no error is configured.
func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
	annotations := reservationInfo.Reservation.Annotations
	if annotations == nil {
		// Lazily create the map and attach it to the reservation so the write below is visible to callers.
		annotations = map[string]string{}
		reservationInfo.Reservation.Annotations = annotations
	}

	key := fmt.Sprintf("reservationFilterWithPlugin-%d", f.index)
	value := fmt.Sprintf("%d", f.index)
	annotations[key] = value

	if f.err == nil {
		return nil
	}
	return framework.AsStatus(f.err)
}

func (f *fakeReservationFilterPlugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status {
if reservationInfo.Reservation.Annotations == nil {
reservationInfo.Reservation.Annotations = map[string]string{}
}
Expand All @@ -750,7 +761,87 @@ func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cyc
return nil
}

func TestReservationFilterPlugin(t *testing.T) {
func TestRunReservationFilterPlugins(t *testing.T) {
testNode := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
},
}
testNodeInfo := framework.NewNodeInfo()
testNodeInfo.SetNode(testNode)
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
plugins []*fakeReservationFilterPlugin
wantAnnotations map[string]string
wantStatus bool
}{
{
name: "filter reservation succeeded",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
"reservationFilterWithPlugin-2": "2",
},
wantStatus: true,
},
{
name: "first plugin failed",
reservation: &schedulingv1alpha1.Reservation{
ObjectMeta: metav1.ObjectMeta{
Name: "test-reservation",
},
},
plugins: []*fakeReservationFilterPlugin{
{index: 1, err: errors.New("failed")},
{index: 2},
},
wantAnnotations: map[string]string{
"reservationFilterWithPlugin-1": "1",
},
wantStatus: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
registeredPlugins := []schedulertesting.RegisterPluginFunc{
schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New),
schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New),
}
fh, err := schedulertesting.NewFramework(
context.TODO(),
registeredPlugins,
"koord-scheduler",
)
assert.NoError(t, err)

extenderFactory, _ := NewFrameworkExtenderFactory()

extender := NewFrameworkExtender(extenderFactory, fh)
impl := extender.(*frameworkExtenderImpl)
for _, pl := range tt.plugins {
impl.updatePlugins(pl)
}

cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, testNodeInfo)
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
}
}

func TestRunNominateReservationFilterPlugins(t *testing.T) {
tests := []struct {
name string
reservation *schedulingv1alpha1.Reservation
Expand Down Expand Up @@ -816,7 +907,7 @@ func TestReservationFilterPlugin(t *testing.T) {
cycleState := framework.NewCycleState()

reservationInfo := NewReservationInfo(tt.reservation)
status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
status := extender.RunNominateReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1")
assert.Equal(t, tt.wantStatus, status.IsSuccess())
assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations)
})
Expand Down
11 changes: 8 additions & 3 deletions pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ type FrameworkExtender interface {
// DEPRECATED: use RunReservationExtensionRestoreReservation instead.
RunReservationExtensionFinalRestoreReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, states PluginToNodeReservationRestoreStates) *framework.Status

RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status
Expand Down Expand Up @@ -120,10 +121,14 @@ type ReservationRestorePlugin interface {
}

// ReservationFilterPlugin is an interface for Filter Reservation plugins.
// These plugins will be called during the Reserve phase to determine whether the Reservation can participate in the Reserve
// FilterReservation will be called in the Filter phase for determining which reservations are available.
// FilterNominateReservation will be called in the PreScore or the Reserve phase to determine whether a
// reservation can be nominated to participate in the Reserve.
// TODO: Merge the two methods into one.
type ReservationFilterPlugin interface {
framework.Plugin
FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status
FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
}

// ReservationNominator nominates a more suitable Reservation in the Reserve stage and Pod will bind this Reservation.
Expand Down
6 changes: 6 additions & 0 deletions pkg/scheduler/frameworkext/reservation_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ReservationInfo struct {
ResourceNames []corev1.ResourceName
Allocatable corev1.ResourceList
Allocated corev1.ResourceList
Reserved corev1.ResourceList // reserved inside the reservation
AllocatablePorts framework.HostPortInfo
AllocatedPorts framework.HostPortInfo
AssignedPods map[types.UID]*PodRequirement
Expand Down Expand Up @@ -80,6 +81,7 @@ func (p *PodRequirement) Clone() *PodRequirement {
func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
var parseErrors []error
allocatable := reservationutil.ReservationRequests(r)
reserved := util.GetNodeReservationFromAnnotation(r.Annotations)
resourceNames := quotav1.ResourceNames(allocatable)
if r.Spec.AllocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted {
options, err := apiext.GetReservationRestrictedOptions(r.Annotations)
Expand Down Expand Up @@ -107,6 +109,7 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
Pod: reservedPod,
ResourceNames: resourceNames,
Allocatable: allocatable,
Reserved: reserved,
AllocatablePorts: util.RequestedHostPorts(reservedPod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
Expand All @@ -118,6 +121,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
var parseErrors []error

allocatable := resource.PodRequests(pod, resource.PodResourcesOptions{})
reserved := util.GetNodeReservationFromAnnotation(pod.Annotations)
resourceNames := quotav1.ResourceNames(allocatable)
options, err := apiext.GetReservationRestrictedOptions(pod.Annotations)
if err == nil {
Expand Down Expand Up @@ -148,6 +152,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
Pod: pod,
ResourceNames: resourceNames,
Allocatable: allocatable,
Reserved: reserved,
AllocatablePorts: util.RequestedHostPorts(pod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
Expand Down Expand Up @@ -344,6 +349,7 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
ResourceNames: resourceNames,
Allocatable: ri.Allocatable.DeepCopy(),
Allocated: ri.Allocated.DeepCopy(),
Reserved: ri.Reserved.DeepCopy(),
AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts),
AssignedPods: assignedPods,
Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/deviceshare/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,11 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return status
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
// FilterReservation performs no reservation filtering for this plugin: it ignores its arguments
// and always returns a nil status.
func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
6 changes: 3 additions & 3 deletions pkg/scheduler/plugins/deviceshare/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2194,7 +2194,7 @@ func Test_Plugin_Filter(t *testing.T) {
}
}

func Test_Plugin_FilterReservation(t *testing.T) {
func Test_Plugin_FilterNominateReservation(t *testing.T) {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node-1",
Expand Down Expand Up @@ -2294,7 +2294,7 @@ func Test_Plugin_FilterReservation(t *testing.T) {
})
assert.True(t, status.IsSuccess())

status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
assert.True(t, status.IsSuccess())

allocatedPod := &corev1.Pod{
Expand Down Expand Up @@ -2334,7 +2334,7 @@ func Test_Plugin_FilterReservation(t *testing.T) {
})
assert.True(t, status.IsSuccess())

status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1")
assert.Equal(t, framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient gpu devices"), status)
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/scheduler/plugins/nodenumaresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,11 @@ func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framewo
return nil
}

func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
// FilterReservation performs no reservation filtering for this plugin: it ignores its arguments
// and always returns a nil status.
func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status {
return nil
}

func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
return status
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ func TestFilterWithAmplifiedCPUs(t *testing.T) {
}
}

func TestPlugin_FilterReservation(t *testing.T) {
func TestPlugin_FilterNominateReservation(t *testing.T) {
skipState := framework.NewCycleState()
skipState.Write(stateKey, &preFilterState{
skip: true,
Expand Down Expand Up @@ -1077,7 +1077,7 @@ func TestPlugin_FilterReservation(t *testing.T) {
assert.NotNil(t, p)
assert.Nil(t, err)
pl := p.(*Plugin)
got := pl.FilterReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName)
got := pl.FilterNominateReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName)
assert.Equal(t, tt.want, got)
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/nominator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (pl *Plugin) NominateReservation(ctx context.Context, cycleState *framework

reservations := make([]*frameworkext.ReservationInfo, 0, len(reservationInfos))
for i := range reservationInfos {
status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
status := extender.RunNominateReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName)
if !status.IsSuccess() {
continue
}
Expand Down
Loading

0 comments on commit 236bcc9

Please sign in to comment.