diff --git a/pkg/scheduler/frameworkext/framework_extender.go b/pkg/scheduler/frameworkext/framework_extender.go index 70fad37f4..14cb29362 100644 --- a/pkg/scheduler/frameworkext/framework_extender.go +++ b/pkg/scheduler/frameworkext/framework_extender.go @@ -393,9 +393,23 @@ func (ext *frameworkExtenderImpl) RunReservationExtensionFinalRestoreReservation } // RunReservationFilterPlugins determines whether the Reservation can participate in the Reserve -func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status { +func (ext *frameworkExtenderImpl) RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status { for _, pl := range ext.reservationFilterPlugins { - status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeName) + status := pl.FilterReservation(ctx, cycleState, pod, reservationInfo, nodeInfo) + if !status.IsSuccess() { + if debugFilterFailure { + klog.Infof("Failed to FilterWithReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeInfo.Node().Name, pl.Name(), status.Message()) + } + return status + } + } + return nil +} + +// RunNominateReservationFilterPlugins determines whether the Reservation can participate in the Reserve. +func (ext *frameworkExtenderImpl) RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status { + for _, pl := range ext.reservationFilterPlugins { + status := pl.FilterNominateReservation(ctx, cycleState, pod, reservationInfo, nodeName) if !status.IsSuccess() { if debugFilterFailure { klog.Infof("Failed to FilterReservation for Pod %q with Reservation %q on Node %q, failedPlugin: %s, reason: %s", klog.KObj(pod), klog.KObj(reservationInfo), nodeName, pl.Name(), status.Message()) diff --git a/pkg/scheduler/frameworkext/framework_extender_test.go b/pkg/scheduler/frameworkext/framework_extender_test.go index b4a4f951e..537556d56 100644 --- a/pkg/scheduler/frameworkext/framework_extender_test.go +++ b/pkg/scheduler/frameworkext/framework_extender_test.go @@ -739,7 +739,18 @@ type fakeReservationFilterPlugin struct { func (f *fakeReservationFilterPlugin) Name() string { return "fakeReservationFilterPlugin" } -func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status { +func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status { + if reservationInfo.Reservation.Annotations == nil { + reservationInfo.Reservation.Annotations = map[string]string{} + } + reservationInfo.Reservation.Annotations[fmt.Sprintf("reservationFilterWithPlugin-%d", f.index)] = fmt.Sprintf("%d", f.index) + if f.err != nil { + return framework.AsStatus(f.err) + } + return nil +} + +func (f *fakeReservationFilterPlugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status { if reservationInfo.Reservation.Annotations == nil { reservationInfo.Reservation.Annotations = map[string]string{} } @@ -750,7 +761,87 @@ func (f *fakeReservationFilterPlugin) FilterReservation(ctx context.Context, cyc return nil } -func TestReservationFilterPlugin(t *testing.T) { +func TestRunReservationFilterPlugins(t *testing.T) { + testNode := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-node-1", + }, + } + testNodeInfo := framework.NewNodeInfo() + testNodeInfo.SetNode(testNode) + tests := []struct { + name string + reservation *schedulingv1alpha1.Reservation + plugins []*fakeReservationFilterPlugin + wantAnnotations map[string]string + wantStatus bool + }{ + { + name: "filter reservation succeeded", + reservation: &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-reservation", + }, + }, + plugins: []*fakeReservationFilterPlugin{ + {index: 1}, + {index: 2}, + }, + wantAnnotations: map[string]string{ + "reservationFilterWithPlugin-1": "1", + "reservationFilterWithPlugin-2": "2", + }, + wantStatus: true, + }, + { + name: "first plugin failed", + reservation: &schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-reservation", + }, + }, + plugins: []*fakeReservationFilterPlugin{ + {index: 1, err: errors.New("failed")}, + {index: 2}, + }, + wantAnnotations: map[string]string{ + "reservationFilterWithPlugin-1": "1", + }, + wantStatus: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + registeredPlugins := []schedulertesting.RegisterPluginFunc{ + schedulertesting.RegisterBindPlugin(defaultbinder.Name, defaultbinder.New), + schedulertesting.RegisterQueueSortPlugin(queuesort.Name, queuesort.New), + } + fh, err := schedulertesting.NewFramework( + context.TODO(), + registeredPlugins, + "koord-scheduler", + ) + assert.NoError(t, err) + + extenderFactory, _ := NewFrameworkExtenderFactory() + + extender := NewFrameworkExtender(extenderFactory, fh) + impl := extender.(*frameworkExtenderImpl) + for _, pl := range tt.plugins { + impl.updatePlugins(pl) + } + + cycleState := framework.NewCycleState() + + reservationInfo := NewReservationInfo(tt.reservation) + status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, testNodeInfo) + assert.Equal(t, tt.wantStatus, status.IsSuccess()) + assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations) + }) + } +} + +func TestRunNominateReservationFilterPlugins(t *testing.T) { tests := []struct { name string reservation *schedulingv1alpha1.Reservation @@ -816,7 +907,7 @@ func TestReservationFilterPlugin(t *testing.T) { cycleState := framework.NewCycleState() reservationInfo := NewReservationInfo(tt.reservation) - status := extender.RunReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1") + status := extender.RunNominateReservationFilterPlugins(context.TODO(), cycleState, &corev1.Pod{}, reservationInfo, "test-node-1") assert.Equal(t, tt.wantStatus, status.IsSuccess()) assert.Equal(t, tt.wantAnnotations, reservationInfo.Reservation.Annotations) }) diff --git a/pkg/scheduler/frameworkext/interface.go b/pkg/scheduler/frameworkext/interface.go index 4c6d4e56b..a9fdfc023 100644 --- a/pkg/scheduler/frameworkext/interface.go +++ b/pkg/scheduler/frameworkext/interface.go @@ -64,7 +64,8 @@ type FrameworkExtender interface { // DEPRECATED: use RunReservationExtensionRestoreReservation instead. RunReservationExtensionFinalRestoreReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, states PluginToNodeReservationRestoreStates) *framework.Status - RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status + RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status + RunNominateReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status @@ -120,10 +121,14 @@ type ReservationRestorePlugin interface { } // ReservationFilterPlugin is an interface for Filter Reservation plugins. -// These plugins will be called during the Reserve phase to determine whether the Reservation can participate in the Reserve +// FilterReservation will be called in the Filter phase for determining which reservations are available. +// FilterNominateReservation will be called in the PreScore or the Reserve phase to nominate a reservation whether it +// can participate the Reserve. +// TODO: Looking forward a merged method. type ReservationFilterPlugin interface { framework.Plugin - FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status + FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status + FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status } // ReservationNominator nominates a more suitable Reservation in the Reserve stage and Pod will bind this Reservation. diff --git a/pkg/scheduler/frameworkext/reservation_info.go b/pkg/scheduler/frameworkext/reservation_info.go index 79d8d681b..2eca66baf 100644 --- a/pkg/scheduler/frameworkext/reservation_info.go +++ b/pkg/scheduler/frameworkext/reservation_info.go @@ -40,6 +40,7 @@ type ReservationInfo struct { ResourceNames []corev1.ResourceName Allocatable corev1.ResourceList Allocated corev1.ResourceList + Reserved corev1.ResourceList // reserved inside the reservation AllocatablePorts framework.HostPortInfo AllocatedPorts framework.HostPortInfo AssignedPods map[types.UID]*PodRequirement @@ -80,6 +81,7 @@ func (p *PodRequirement) Clone() *PodRequirement { func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo { var parseErrors []error allocatable := reservationutil.ReservationRequests(r) + reserved := util.GetNodeReservationFromAnnotation(r.Annotations) resourceNames := quotav1.ResourceNames(allocatable) if r.Spec.AllocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted { options, err := apiext.GetReservationRestrictedOptions(r.Annotations) @@ -107,6 +109,7 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo { Pod: reservedPod, ResourceNames: resourceNames, Allocatable: allocatable, + Reserved: reserved, AllocatablePorts: util.RequestedHostPorts(reservedPod), AssignedPods: map[types.UID]*PodRequirement{}, OwnerMatchers: ownerMatchers, @@ -118,6 +121,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo { var parseErrors []error allocatable := resource.PodRequests(pod, resource.PodResourcesOptions{}) + reserved := util.GetNodeReservationFromAnnotation(pod.Annotations) resourceNames := quotav1.ResourceNames(allocatable) options, err := apiext.GetReservationRestrictedOptions(pod.Annotations) if err == nil { @@ -148,6 +152,7 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo { Pod: pod, ResourceNames: resourceNames, Allocatable: allocatable, + Reserved: reserved, AllocatablePorts: util.RequestedHostPorts(pod), AssignedPods: map[types.UID]*PodRequirement{}, OwnerMatchers: ownerMatchers, @@ -344,6 +349,7 @@ func (ri *ReservationInfo) Clone() *ReservationInfo { ResourceNames: resourceNames, Allocatable: ri.Allocatable.DeepCopy(), Allocated: ri.Allocated.DeepCopy(), + Reserved: ri.Reserved.DeepCopy(), AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts), AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts), AssignedPods: assignedPods, diff --git a/pkg/scheduler/plugins/deviceshare/plugin.go b/pkg/scheduler/plugins/deviceshare/plugin.go index c0df52056..9722a45ab 100644 --- a/pkg/scheduler/plugins/deviceshare/plugin.go +++ b/pkg/scheduler/plugins/deviceshare/plugin.go @@ -349,7 +349,11 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p return status } -func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { +func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status diff --git a/pkg/scheduler/plugins/deviceshare/plugin_test.go b/pkg/scheduler/plugins/deviceshare/plugin_test.go index da4c4d905..40ab403c0 100644 --- a/pkg/scheduler/plugins/deviceshare/plugin_test.go +++ b/pkg/scheduler/plugins/deviceshare/plugin_test.go @@ -2194,7 +2194,7 @@ func Test_Plugin_Filter(t *testing.T) { } } -func Test_Plugin_FilterReservation(t *testing.T) { +func Test_Plugin_FilterNominateReservation(t *testing.T) { node := &corev1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "test-node-1", @@ -2294,7 +2294,7 @@ func Test_Plugin_FilterReservation(t *testing.T) { }) assert.True(t, status.IsSuccess()) - status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1") + status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1") assert.True(t, status.IsSuccess()) allocatedPod := &corev1.Pod{ @@ -2334,7 +2334,7 @@ func Test_Plugin_FilterReservation(t *testing.T) { }) assert.True(t, status.IsSuccess()) - status = pl.FilterReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1") + status = pl.FilterNominateReservation(context.TODO(), cycleState, pod, reservationInfo, "test-node-1") assert.Equal(t, framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient gpu devices"), status) } diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go index 8a5bbb08e..0003a1e3d 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go @@ -444,7 +444,11 @@ func (p *Plugin) filterAmplifiedCPUs(podRequestMilliCPU int64, nodeInfo *framewo return nil } -func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { +func (p *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +func (p *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { state, status := getPreFilterState(cycleState) if !status.IsSuccess() { return status diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go index 86b7cb4da..8c56a6e9c 100644 --- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go @@ -1023,7 +1023,7 @@ func TestFilterWithAmplifiedCPUs(t *testing.T) { } } -func TestPlugin_FilterReservation(t *testing.T) { +func TestPlugin_FilterNominateReservation(t *testing.T) { skipState := framework.NewCycleState() skipState.Write(stateKey, &preFilterState{ skip: true, @@ -1077,7 +1077,7 @@ func TestPlugin_FilterReservation(t *testing.T) { assert.NotNil(t, p) assert.Nil(t, err) pl := p.(*Plugin) - got := pl.FilterReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName) + got := pl.FilterNominateReservation(context.TODO(), tt.args.cycleState, tt.args.pod, tt.args.reservationInfo, tt.args.nodeName) assert.Equal(t, tt.want, got) }) } diff --git a/pkg/scheduler/plugins/reservation/nominator.go b/pkg/scheduler/plugins/reservation/nominator.go index aec0645bb..a6b090f1d 100644 --- a/pkg/scheduler/plugins/reservation/nominator.go +++ b/pkg/scheduler/plugins/reservation/nominator.go @@ -225,7 +225,7 @@ func (pl *Plugin) NominateReservation(ctx context.Context, cycleState *framework reservations := make([]*frameworkext.ReservationInfo, 0, len(reservationInfos)) for i := range reservationInfos { - status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName) + status := extender.RunNominateReservationFilterPlugins(ctx, cycleState, pod, reservationInfos[i], nodeName) if !status.IsSuccess() { continue } diff --git a/pkg/scheduler/plugins/reservation/plugin.go b/pkg/scheduler/plugins/reservation/plugin.go index fb960ada4..bf2bd2326 100644 --- a/pkg/scheduler/plugins/reservation/plugin.go +++ b/pkg/scheduler/plugins/reservation/plugin.go @@ -467,6 +467,11 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew return nil } + extender, ok := pl.handle.(frameworkext.FrameworkExtender) + if !ok { + return framework.AsStatus(fmt.Errorf("not implemented frameworkext.FrameworkExtender")) + } + node := nodeInfo.Node() state := getStateData(cycleState) nodeRState := state.nodeReservationStates[node.Name] @@ -495,22 +500,32 @@ func (pl *Plugin) filterWithReservations(ctx context.Context, cycleState *framew insufficientResourcesByNode := fitsNode(state.podRequestsResources, nodeInfo, nodeRState, rInfo, preemptible) state.preemptLock.RUnlock() - nodeFits := len(insufficientResourcesByNode) == 0 + nodeFits := len(insufficientResourcesByNode) <= 0 + allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...) + + reservationFits := false allocatePolicy := rInfo.GetAllocatePolicy() if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault || allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyAligned { - if nodeFits { - return nil - } - allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...) + reservationFits = nodeFits } else if allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyRestricted { insufficientResourceReasonsByReservation := fitsReservation(state.podRequests, rInfo, preemptibleInRR, requireDetailReasons) - if nodeFits && len(insufficientResourceReasonsByReservation) <= 0 { // fit the reservation - return nil - } - allInsufficientResourcesByNode.Insert(insufficientResourcesByNode...) + + reservationFits = len(insufficientResourceReasonsByReservation) <= 0 allInsufficientResourceReasonsByReservation = append(allInsufficientResourceReasonsByReservation, insufficientResourceReasonsByReservation...) } + + // Before nominating a reservation in PreScore or Reserve, check the reservation by multiple plugins to make + // the Filter phase give a more accurate result. It is extensible to support more policies. + status := extender.RunReservationFilterPlugins(ctx, cycleState, pod, rInfo, nodeInfo) + if !status.IsSuccess() { + allInsufficientResourceReasonsByReservation = append(allInsufficientResourceReasonsByReservation, status.Reasons()...) + continue + } + + if nodeFits && reservationFits { + return nil + } } // The Pod requirement must be allocated from Reservation, but currently no Reservation meets the requirement. @@ -548,7 +563,8 @@ func fitsNode(podRequest *framework.Resource, nodeInfo *framework.NodeInfo, node var rRemained *framework.Resource if rInfo != nil { - resources := quotav1.Subtract(rInfo.Allocatable, rInfo.Allocated) + // Reservation available = Allocatable - Allocated - InnerReserved + resources := quotav1.Subtract(quotav1.Subtract(rInfo.Allocatable, rInfo.Allocated), rInfo.Reserved) rRemained = framework.NewResource(resources) } else { rRemained = dummyResource @@ -589,9 +605,10 @@ func fitsReservation(podRequest corev1.ResourceList, rInfo *frameworkext.Reserva if len(preemptibleInRR) > 0 { allocated = quotav1.SubtractWithNonNegativeResult(allocated, preemptibleInRR) } + allocatable := rInfo.Allocatable allocated = quotav1.Mask(allocated, rInfo.ResourceNames) + reserved := quotav1.Mask(rInfo.Reserved, rInfo.ResourceNames) requests := quotav1.Mask(podRequest, rInfo.ResourceNames) - allocatable := rInfo.Allocatable var insufficientResourceReasons []string @@ -599,7 +616,7 @@ func fitsReservation(podRequest corev1.ResourceList, rInfo *frameworkext.Reserva if maxPods, found := allocatable[corev1.ResourcePods]; found { allocatedPods := rInfo.GetAllocatedPods() if preemptiblePodsInRR, found := preemptibleInRR[corev1.ResourcePods]; found { - allocatedPods += int(preemptiblePodsInRR.Value()) // assert no overflow + allocatedPods -= int(preemptiblePodsInRR.Value()) // assert no overflow } if int64(allocatedPods)+1 > maxPods.Value() { if !isDetailed { @@ -626,6 +643,11 @@ func fitsReservation(podRequest corev1.ResourceList, rInfo *frameworkext.Reserva if !found { used = *resource.NewQuantity(0, resource.DecimalSI) } + reservedQ, found := reserved[resourceName] + if found { + // NOTE: capacity excludes the reserved resource + capacity.Sub(reservedQ) + } remained := capacity.DeepCopy() remained.Sub(used) @@ -782,7 +804,11 @@ func (pl *Plugin) makePostFilterReasons(state *stateData, filteredNodeStatusMap return reasons } -func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { +func (pl *Plugin) FilterReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeInfo *framework.NodeInfo) *framework.Status { + return nil +} + +func (pl *Plugin) FilterNominateReservation(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *frameworkext.ReservationInfo, nodeName string) *framework.Status { // TODO(joseph): We can consider optimizing these codes. It seems that there is no need to exist at present. state := getStateData(cycleState) diff --git a/pkg/scheduler/plugins/reservation/plugin_test.go b/pkg/scheduler/plugins/reservation/plugin_test.go index 5f36e598a..4fbf18943 100644 --- a/pkg/scheduler/plugins/reservation/plugin_test.go +++ b/pkg/scheduler/plugins/reservation/plugin_test.go @@ -788,6 +788,9 @@ func Test_filterWithReservations(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "test-r", UID: "123456", + Annotations: map[string]string{ + apiext.AnnotationNodeReservation: `{"resources": {"cpu": "1"}}`, + }, }, Spec: schedulingv1alpha1.ReservationSpec{ AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted, @@ -797,7 +800,7 @@ func Test_filterWithReservations(t *testing.T) { { Resources: corev1.ResourceRequirements{ Requests: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceCPU: resource.MustParse("7"), corev1.ResourcePods: resource.MustParse("2"), }, }, @@ -808,7 +811,7 @@ func Test_filterWithReservations(t *testing.T) { }, Status: schedulingv1alpha1.ReservationStatus{ Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceCPU: resource.MustParse("7"), corev1.ResourcePods: resource.MustParse("2"), }, }, @@ -1010,6 +1013,53 @@ func Test_filterWithReservations(t *testing.T) { }, wantStatus: nil, }, + { + name: "filter restricted reservation with affinity", + stateData: &stateData{ + schedulingStateData: schedulingStateData{ + hasAffinity: true, + podRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + nodeReservationStates: map[string]*nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 30 * 1000, + Memory: 24 * 1024 * 1024 * 1024, + }, + rAllocated: &framework.Resource{ + MilliCPU: 0, + }, + matchedOrIgnored: []*frameworkext.ReservationInfo{ + frameworkext.NewReservationInfo(&schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-r", + }, + Spec: schedulingv1alpha1.ReservationSpec{ + AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted, + Template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + }, + }, + }, + }, + }, + }, + }), + }, + }, + }, + }, + }, + wantStatus: nil, + }, { name: "filter restricted reservation with nodeInfo and matched requests are zero", stateData: &stateData{ @@ -1237,6 +1287,56 @@ func Test_filterWithReservations(t *testing.T) { }, wantStatus: nil, }, + { + name: "failed to filter restricted reservation due to reserved", + stateData: &stateData{ + schedulingStateData: schedulingStateData{ + hasAffinity: true, + podRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + corev1.ResourceMemory: resource.MustParse("8Gi"), + }, + nodeReservationStates: map[string]*nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 30 * 1000, + Memory: 24 * 1024 * 1024 * 1024, + }, + rAllocated: &framework.Resource{ + MilliCPU: 0, + }, + matchedOrIgnored: []*frameworkext.ReservationInfo{ + frameworkext.NewReservationInfo(&schedulingv1alpha1.Reservation{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-r", + Annotations: map[string]string{ + apiext.AnnotationNodeReservation: `{"resources": {"cpu": "2"}}`, + }, + }, + Spec: schedulingv1alpha1.ReservationSpec{ + AllocatePolicy: schedulingv1alpha1.ReservationAllocatePolicyRestricted, + Template: &corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + }, + }, + }, + }, + }, + }, + }), + }, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient cpu"), + }, { name: "filter default reservations with preemption", stateData: &stateData{ @@ -1853,8 +1953,8 @@ func Test_filterWithReservations(t *testing.T) { preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{ node.Name: { "123456": { - corev1.ResourceCPU: resource.MustParse("-1"), - corev1.ResourcePods: resource.MustParse("-1"), + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourcePods: resource.MustParse("1"), }, }, }, @@ -1904,10 +2004,50 @@ func Test_filterWithReservations(t *testing.T) { wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Too many pods, "+ "requested: 1, used: 2, capacity: 2"), }, + { + name: "failed to filter restricted reservation with name and reserved since insufficient resource", + stateData: &stateData{ + schedulingStateData: schedulingStateData{ + hasAffinity: true, + reservationName: "test-r", + podRequests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("6"), + }, + preemptibleInRRs: map[string]map[types.UID]corev1.ResourceList{ + node.Name: { + "123456": { + corev1.ResourceCPU: resource.MustParse("1"), + corev1.ResourcePods: resource.MustParse("1"), + }, + }, + }, + nodeReservationStates: map[string]*nodeReservationState{ + node.Name: { + podRequested: &framework.Resource{ + MilliCPU: 30 * 1000, + Memory: 24 * 1024 * 1024 * 1024, + }, + rAllocated: &framework.Resource{ + MilliCPU: 2000, + }, + matchedOrIgnored: []*frameworkext.ReservationInfo{ + testRInfo.Clone(), + }, + }, + }, + }, + }, + wantStatus: framework.NewStatus(framework.Unschedulable, "Reservation(s) Insufficient cpu, "+ + "requested: 6000, used: 1000, capacity: 6000"), + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - pl := &Plugin{} + suit := newPluginTestSuit(t) + p, err := suit.pluginFactory() + assert.NoError(t, err) + pl := p.(*Plugin) + suit.start() cycleState := framework.NewCycleState() if tt.stateData.podRequestsResources == nil { resources := framework.NewResource(tt.stateData.podRequests) @@ -2224,7 +2364,7 @@ func TestPreFilterExtensionRemovePod(t *testing.T) { } } -func TestFilterReservation(t *testing.T) { +func TestFilterNominateReservation(t *testing.T) { reservation4C8G := &schedulingv1alpha1.Reservation{ ObjectMeta: metav1.ObjectMeta{ UID: uuid.NewUUID(), @@ -2415,7 +2555,7 @@ func TestFilterReservation(t *testing.T) { cycleState.Write(stateKey, state) rInfo := frameworkext.NewReservationInfo(tt.targetReservation) - status := pl.FilterReservation(context.TODO(), cycleState, pod, rInfo, node.Name) + status := pl.FilterNominateReservation(context.TODO(), cycleState, pod, rInfo, node.Name) assert.Equal(t, tt.wantStatus, status) }) }