diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin.go b/pkg/scheduler/plugins/nodenumaresource/plugin.go
index 77968b50a..a9cf5b342 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin.go
@@ -47,10 +47,11 @@ const (
 )
 
 const (
-	ErrNotFoundCPUTopology     = "node(s) CPU Topology not found"
-	ErrInvalidCPUTopology      = "node(s) invalid CPU Topology"
-	ErrSMTAlignmentError       = "node(s) requested cpus not multiple cpus per core"
-	ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy"
+	ErrNotFoundCPUTopology      = "node(s) CPU Topology not found"
+	ErrInvalidCPUTopology       = "node(s) invalid CPU Topology"
+	ErrSMTAlignmentError        = "node(s) requested cpus not multiple cpus per core"
+	ErrRequiredFullPCPUsPolicy  = "node(s) required FullPCPUs policy"
+	ErrInsufficientAmplifiedCPU = "Insufficient amplified cpu"
 )
 
 var (
@@ -267,6 +268,10 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
 		return status
 	}
 
+	if status := p.filterAmplifiedCPUs(state, nodeInfo); !status.IsSuccess() {
+		return status
+	}
+
 	node := nodeInfo.Node()
 	topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
 	numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)
@@ -317,6 +322,46 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
 	return nil
 }
 
+func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo) *framework.Status {
+	quantity := state.requests[corev1.ResourceCPU]
+	podRequestMilliCPU := quantity.MilliValue()
+	if podRequestMilliCPU == 0 {
+		return nil
+	}
+
+	node := nodeInfo.Node()
+	ratios, err := extension.GetNodeResourceAmplificationRatios(node.Annotations)
+	if err != nil {
+		return framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) invalid amplification ratios")
+	}
+	cpuAmplificationRatio := ratios[corev1.ResourceCPU]
+	if cpuAmplificationRatio <= 1 {
+		return nil
+	}
+
+	if state.requestCPUBind {
+		podRequestMilliCPU = extension.Amplify(podRequestMilliCPU, cpuAmplificationRatio)
+	}
+
+	// TODO(joseph): Reservations and preemption should be considered here.
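+	// nodeInfo.Requested counts every pod by its raw CPU request, but a CPUSet
+	// pod occupies whole physical CPUs, so on an amplified node its real
+	// footprint is ratio * physical CPUs. Swap the raw cpuset-allocated portion
+	// for its amplified equivalent before comparing against the (amplified)
+	// allocatable. For example, at ratio 2.0 a 32-core node advertises 64
+	// cores, and a cpuset pod holding 16 physical CPUs consumes 32 of them.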
+	_, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
+	if err != nil {
+		if err.Error() != ErrNotFoundCPUTopology {
+			return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
+		}
+	}
+	allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000)
+	requestedMilliCPU := nodeInfo.Requested.MilliCPU
+	if requestedMilliCPU >= allocatedMilliCPU && allocatedMilliCPU > 0 {
+		requestedMilliCPU = requestedMilliCPU - allocatedMilliCPU
+		requestedMilliCPU += extension.Amplify(allocatedMilliCPU, cpuAmplificationRatio)
+	}
+	if podRequestMilliCPU > nodeInfo.Allocatable.MilliCPU-requestedMilliCPU {
+		return framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU)
+	}
+	return nil
+}
+
 func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
 	state, status := getPreFilterState(cycleState)
 	if !status.IsSuccess() {
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
index e96145ea3..00f60165d 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/plugin_test.go
@@ -18,6 +18,7 @@ package nodenumaresource
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"testing"
 
@@ -31,6 +32,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/uuid"
 	"k8s.io/client-go/informers"
 	kubefake "k8s.io/client-go/kubernetes/fake"
+	apiresource "k8s.io/kubernetes/pkg/api/v1/resource"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
 	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
@@ -106,6 +108,31 @@ func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) {
 	return f.nodeInfoMap[nodeName], nil
 }
 
+func makeNode(name string, capacity map[corev1.ResourceName]string, cpuAmpRatio extension.Ratio) *corev1.Node {
+	node := schedulertesting.MakeNode().Name(name).Capacity(capacity).Obj()
+	_, _ = extension.SetNodeResourceAmplificationRatio(node, corev1.ResourceCPU, cpuAmpRatio)
+	return node
+}
+
+func makePodOnNode(request map[corev1.ResourceName]string, node string, isCPUSet bool) *corev1.Pod {
+	pod := schedulertesting.MakePod().Req(request).Node(node).Priority(extension.PriorityProdValueMax).Obj()
+	if isCPUSet {
+		pod.Labels = map[string]string{
+			extension.LabelPodQoS: string(extension.QoSLSR),
+		}
+		if node != "" {
+			reqs, _ := apiresource.PodRequestsAndLimits(pod)
+			val := reqs.Cpu().MilliValue() / 1000
+			_ = extension.SetResourceStatus(pod, &extension.ResourceStatus{CPUSet: fmt.Sprintf("0-%d", val-1)})
+		}
+	}
+	return pod
+}
+
+func makePod(request map[corev1.ResourceName]string, isCPUSet bool) *corev1.Pod {
+	return makePodOnNode(request, "", isCPUSet)
+}
+
 type frameworkHandleExtender struct {
 	frameworkext.FrameworkExtender
 	*nrtfake.Clientset
@@ -121,7 +148,7 @@ type pluginTestSuit struct {
 	nodeNUMAResourceArgs *schedulingconfig.NodeNUMAResourceArgs
 }
 
-func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) *pluginTestSuit {
+func newPluginTestSuit(t *testing.T, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
 	var v1beta2args v1beta2.NodeNUMAResourceArgs
 	v1beta2.SetDefaults_NodeNUMAResourceArgs(&v1beta2args)
 	var nodeNUMAResourceArgs schedulingconfig.NodeNUMAResourceArgs
@@ -155,7 +182,7 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) *pluginTestSuit {
 		assert.NoError(t, err)
 	}
 	informerFactory := informers.NewSharedInformerFactory(cs, 0)
-	snapshot := newTestSharedLister(nil, nodes)
+	snapshot := newTestSharedLister(pods, nodes)
 	fh, err := schedulertesting.NewFramework(
 		registeredPlugins,
 		"koord-scheduler",
@@ -185,7 +212,7 @@ func (p *pluginTestSuit) start() {
 }
 
 func TestNew(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NotNil(t, p)
 	assert.Nil(t, err)
@@ -490,7 +517,7 @@ func TestPlugin_PreFilter(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			suit := newPluginTestSuit(t, nil)
+			suit := newPluginTestSuit(t, nil, nil)
 			if tt.defaultBindPolicy != "" {
 				suit.nodeNUMAResourceArgs.DefaultCPUBindPolicy = tt.defaultBindPolicy
 			}
@@ -734,7 +761,7 @@ func TestPlugin_Filter(t *testing.T) {
 				nodes[0].Annotations[k] = v
 			}
 
-			suit := newPluginTestSuit(t, nodes)
+			suit := newPluginTestSuit(t, nil, nodes)
 			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 			assert.NotNil(t, p)
 			assert.Nil(t, err)
@@ -784,6 +811,119 @@ func TestPlugin_Filter(t *testing.T) {
 	}
 }
 
+func TestFilterWithAmplifiedCPUs(t *testing.T) {
+	tests := []struct {
+		name                      string
+		pod                       *corev1.Pod
+		existingPods              []*corev1.Pod
+		cpuTopology               *CPUTopology
+		nodeHasNRT                bool
+		nodeCPUAmplificationRatio extension.Ratio
+		wantStatus                *framework.Status
+	}{
+		{
+			name:                      "no resources requested always fits",
+			pod:                       &corev1.Pod{},
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "4"}, "node-1", false)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeCPUAmplificationRatio: 2.0,
+		},
+		{
+			name:                      "no filtering without node cpu amplification",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeCPUAmplificationRatio: 1.0,
+		},
+		{
+			name:                      "cpu fits on no NRT node",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeCPUAmplificationRatio: 2.0,
+		},
+		{
+			name:                      "insufficient cpu",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "64"}, "node-1", false)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeCPUAmplificationRatio: 2.0,
+			wantStatus:                framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
+		},
+		{
+			name:                      "insufficient cpu with cpuset pod on node",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", true)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeHasNRT:                true,
+			nodeCPUAmplificationRatio: 2.0,
+			wantStatus:                framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
+		},
+		{
+			name:                      "insufficient cpu when scheduling cpuset pod",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, true),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeHasNRT:                true,
+			nodeCPUAmplificationRatio: 2.0,
+			wantStatus:                framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
+		},
+		{
+			name:                      "insufficient cpu when scheduling cpuset pod with cpuset pod on node",
+			pod:                       makePod(map[corev1.ResourceName]string{"cpu": "32"}, true),
+			existingPods:              []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", true)},
+			cpuTopology:               buildCPUTopologyForTest(2, 1, 8, 2),
+			nodeHasNRT:                true,
+			nodeCPUAmplificationRatio: 2.0,
+			wantStatus:                framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			numCPUs := tt.cpuTopology.NumCPUs
+			cpu := fmt.Sprintf("%d", extension.Amplify(int64(numCPUs), tt.nodeCPUAmplificationRatio))
+			node := makeNode("node-1", map[corev1.ResourceName]string{"cpu": cpu, "memory": "40Gi"}, tt.nodeCPUAmplificationRatio)
+			suit := newPluginTestSuit(t, tt.existingPods, []*corev1.Node{node})
+
+			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
+			assert.NoError(t, err)
+			suit.start()
+			pl := p.(*Plugin)
+
+			if tt.nodeHasNRT {
+				topologyOptions := TopologyOptions{
+					CPUTopology: tt.cpuTopology,
+				}
+				for i := 0; i < tt.cpuTopology.NumNodes; i++ {
+					topologyOptions.NUMANodeResources = append(topologyOptions.NUMANodeResources, NUMANodeResource{
+						Node: i,
+						Resources: corev1.ResourceList{
+							corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", extension.Amplify(int64(tt.cpuTopology.CPUsPerNode()), tt.nodeCPUAmplificationRatio))),
+							corev1.ResourceMemory: resource.MustParse("20Gi"),
+						}})
+				}
+				pl.topologyOptionsManager.UpdateTopologyOptions(node.Name, func(options *TopologyOptions) {
+					*options = topologyOptions
+				})
+			}
+			handler := &podEventHandler{resourceManager: pl.resourceManager}
+			for _, v := range tt.existingPods {
+				handler.OnAdd(v)
+			}
+
+			cycleState := framework.NewCycleState()
+			_, preFilterStatus := pl.PreFilter(context.TODO(), cycleState, tt.pod)
+			assert.True(t, preFilterStatus.IsSuccess())
+
+			nodeInfo, err := suit.Handle.SnapshotSharedLister().NodeInfos().Get("node-1")
+			assert.NoError(t, err)
+			gotStatus := pl.Filter(context.TODO(), cycleState, tt.pod, nodeInfo)
+			assert.Equal(t, tt.wantStatus, gotStatus)
+		})
+	}
+}
+
 func TestPlugin_Reserve(t *testing.T) {
 	tests := []struct {
 		name string
@@ -937,7 +1077,7 @@ func TestPlugin_Reserve(t *testing.T) {
 				nodes[0].Labels[k] = v
 			}
 
-			suit := newPluginTestSuit(t, nodes)
+			suit := newPluginTestSuit(t, nil, nodes)
 			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 			assert.NotNil(t, p)
 			assert.Nil(t, err)
@@ -1047,7 +1187,7 @@ func TestPlugin_Unreserve(t *testing.T) {
 }
 
 func TestPlugin_PreBind(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NotNil(t, p)
 	assert.Nil(t, err)
@@ -1089,7 +1229,7 @@
 }
 
 func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NotNil(t, p)
 	assert.Nil(t, err)
@@ -1139,7 +1279,7 @@
 }
 
 func TestPlugin_PreBindReservation(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NotNil(t, p)
 	assert.Nil(t, err)
@@ -1181,7 +1321,7 @@
 }
 
 func TestRestoreReservation(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NoError(t, err)
 	pl := p.(*Plugin)
diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go
index f59a2c517..093ce43e3 100644
--- a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go
+++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go
@@ -355,7 +355,7 @@ func (c *resourceManager) GetAvailableCPUs(nodeName string, preferredCPUs cpuset
 		return cpuset.NewCPUSet(), nil, errors.New(ErrNotFoundCPUTopology)
 	}
 	if !topologyOptions.CPUTopology.IsValid() {
-		return cpuset.NewCPUSet(), nil, fmt.Errorf("cpuTopology is invalid")
+		return cpuset.NewCPUSet(), nil, errors.New(ErrInvalidCPUTopology)
 	}
 
 	allocation := c.getOrCreateNodeAllocation(nodeName)
diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go
index 66478a9cc..d7f78d72a 100644
--- a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go
@@ -478,7 +478,7 @@ func TestResourceManagerAllocate(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			suit := newPluginTestSuit(t, nil)
+			suit := newPluginTestSuit(t, nil, nil)
 			tom := NewTopologyOptionsManager()
 			tom.UpdateTopologyOptions("test-node", func(options *TopologyOptions) {
 				options.CPUTopology = buildCPUTopologyForTest(2, 1, 26, 2)
@@ -879,7 +879,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			suit := newPluginTestSuit(t, nil)
+			suit := newPluginTestSuit(t, nil, nil)
 			tom := NewTopologyOptionsManager()
 			tom.UpdateTopologyOptions("test-node", func(options *TopologyOptions) {
 				options.CPUTopology = buildCPUTopologyForTest(2, 1, 26, 2)
diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring.go b/pkg/scheduler/plugins/nodenumaresource/scoring.go
index 9caeb9804..934cb0572 100644
--- a/pkg/scheduler/plugins/nodenumaresource/scoring.go
+++ b/pkg/scheduler/plugins/nodenumaresource/scoring.go
@@ -67,7 +67,10 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po
 	numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)
 
 	if skipTheNode(state, numaTopologyPolicy) {
-		return 0, nil
+		if state.skip {
+			return 0, nil
+		}
+		return p.scoreWithAmplifiedCPUs(cycleState, state, pod, nodeInfo, topologyOptions)
 	}
 
 	if state.requestCPUBind && (topologyOptions.CPUTopology == nil || !topologyOptions.CPUTopology.IsValid()) {
@@ -89,6 +92,37 @@ func (p *Plugin) Score(ctx context.Context, cycleState *framework.CycleState, po
 	return p.scorer.score(requested, allocatable, framework.NewResource(resourceOptions.requests))
 }
 
+func (p *Plugin) scoreWithAmplifiedCPUs(cycleState *framework.CycleState, state *preFilterState, pod *corev1.Pod, nodeInfo *framework.NodeInfo, topologyOptions TopologyOptions) (int64, *framework.Status) {
+	quantity := state.requests[corev1.ResourceCPU]
+	if quantity.IsZero() {
+		return 0, nil
+	}
+
+	node := nodeInfo.Node()
+	resourceOptions, err := p.getResourceOptions(cycleState, state, node, pod, topologymanager.NUMATopologyHint{}, topologyOptions)
+	if err != nil {
+		return 0, nil
+	}
+
+	cpuAmplificationRatio := resourceOptions.topologyOptions.AmplificationRatios[corev1.ResourceCPU]
+	if cpuAmplificationRatio <= 1 {
+		return p.scorer.score(nodeInfo.Requested, nodeInfo.Allocatable, framework.NewResource(resourceOptions.requests))
+	}
+
+	_, allocated, err := p.resourceManager.GetAvailableCPUs(node.Name, resourceOptions.preferredCPUs)
+	if err != nil {
+		// A missing CPU topology just means there is no CPUSet allocation to
+		// re-amplify; any other error keeps the node at a neutral score.
+		if err.Error() != ErrNotFoundCPUTopology {
+			return 0, nil
+		}
+		return p.scorer.score(nodeInfo.Requested, nodeInfo.Allocatable, framework.NewResource(resourceOptions.requests))
+	}
+	allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000)
+	requested := nodeInfo.Requested.Clone()
+	requested.MilliCPU -= allocatedMilliCPU
+	requested.MilliCPU += extension.Amplify(allocatedMilliCPU, cpuAmplificationRatio)
+	return p.scorer.score(requested, nodeInfo.Allocatable, framework.NewResource(resourceOptions.requests))
+}
+
 func (p *Plugin) calculateAllocatableAndRequested(
 	nodeName string,
 	nodeInfo *framework.NodeInfo,
@@ -121,8 +155,11 @@ func (p *Plugin) calculateAllocatableAndRequested(
 		allocatable = framework.NewResource(totalAllocatable)
 		requested = framework.NewResource(totalRequested)
 	} else {
-		allocatable = nodeInfo.Allocatable.Clone()
-		requested = nodeInfo.Requested.Clone()
+		allocatable = nodeInfo.Allocatable
+		requested = nodeInfo.Requested
+		// Clone only when the CPUSet branch below needs to mutate requested.
+		if !podAllocation.CPUSet.IsEmpty() {
+			requested = requested.Clone()
+		}
 	}
 
 	if !podAllocation.CPUSet.IsEmpty() {
diff --git a/pkg/scheduler/plugins/nodenumaresource/scoring_test.go b/pkg/scheduler/plugins/nodenumaresource/scoring_test.go
index 214c5b36a..183ef5db1 100644
--- a/pkg/scheduler/plugins/nodenumaresource/scoring_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/scoring_test.go
@@ -18,6 +18,7 @@ package nodenumaresource
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"testing"
 
@@ -33,10 +34,16 @@ import (
 	"github.com/koordinator-sh/koordinator/apis/extension"
 	apiext "github.com/koordinator-sh/koordinator/apis/extension"
 	schedulerconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
-	schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config"
 	"github.com/koordinator-sh/koordinator/pkg/util/cpuset"
 )
 
+var (
+	defaultResources = []config.ResourceSpec{
+		{Name: string(corev1.ResourceCPU), Weight: 1},
+		{Name: string(corev1.ResourceMemory), Weight: 1},
+	}
+)
+
 func TestNUMANodeScore(t *testing.T) {
 	tests := []struct {
 		name string
@@ -294,7 +301,7 @@
 
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			suit := newPluginTestSuit(t, tt.nodes)
+			suit := newPluginTestSuit(t, nil, tt.nodes)
 			if tt.strategy != nil {
 				suit.nodeNUMAResourceArgs.ScoringStrategy = tt.strategy
 			}
@@ -410,7 +417,7 @@
 			name: "score with full empty node FullPCPUs",
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicyFullPCPUs,
 				numCPUsNeeded:          4,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -423,7 +430,7 @@
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicyFullPCPUs,
 				numCPUsNeeded:          8,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -435,7 +442,7 @@
 			name: "score with full empty node SpreadByPCPUs",
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicySpreadByPCPUs,
 				numCPUsNeeded:          4,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -447,7 +454,7 @@ func TestPlugin_Score(t *testing.T) {
 			name: "score with exceed socket FullPCPUs",
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicyFullPCPUs,
 				numCPUsNeeded:          16,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -459,7 +466,7 @@
 			name: "score with satisfied socket FullPCPUs",
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicyFullPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicyFullPCPUs,
 				numCPUsNeeded:          16,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 2, 4, 2),
@@ -471,7 +478,7 @@
 			name: "score with full empty socket SpreadByPCPUs",
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicySpreadByPCPUs,
 				numCPUsNeeded:          4,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -486,7 +493,7 @@
 			},
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicySpreadByPCPUs,
 				numCPUsNeeded:          2,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -501,7 +508,7 @@
 			},
 			state: &preFilterState{
 				requestCPUBind:         true,
-				preferredCPUBindPolicy: schedulingconfig.CPUBindPolicySpreadByPCPUs,
+				preferredCPUBindPolicy: schedulerconfig.CPUBindPolicySpreadByPCPUs,
 				numCPUsNeeded:          8,
 			},
 			cpuTopology: buildCPUTopologyForTest(2, 1, 4, 2),
@@ -534,9 +541,9 @@
 				nodes[0].Labels[k] = v
 			}
 
-			suit := newPluginTestSuit(t, nodes)
+			suit := newPluginTestSuit(t, nil, nodes)
 			suit.nodeNUMAResourceArgs.ScoringStrategy = &schedulerconfig.ScoringStrategy{
-				Type: schedulingconfig.MostAllocated,
+				Type: schedulerconfig.MostAllocated,
 				Resources: []config.ResourceSpec{
 					{
 						Name: "cpu",
@@ -586,3 +593,260 @@
 		})
 	}
 }
+
+func TestScoreWithAmplifiedCPUs(t *testing.T) {
+	tests := []struct {
+		name          string
+		args          schedulerconfig.NodeNUMAResourceArgs
+		requestedPod  *corev1.Pod
+		nodes         []*corev1.Node
+		existingPods  []*corev1.Pod
+		cpuTopologies map[string]*CPUTopology
+		nodeHasNRT    []string
+		nodeRatios    map[string]extension.Ratio
+		wantScoreList framework.NodeScoreList
+	}{
+		{
+			name:         "ScoringStrategy MostAllocated, no cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios:    map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 0}, {Name: "node2", Score: 0}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.MostAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy MostAllocated, cpuset pods on node",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 54}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.MostAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy MostAllocated, scheduling cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 37}, {Name: "node2", Score: 29}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.MostAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy MostAllocated, cpuset pods on node, scheduling cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 68}, {Name: "node2", Score: 60}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.MostAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy LeastAllocated, no cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 0}, {Name: "node2", Score: 0}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.LeastAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy LeastAllocated, cpuset pods on node",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, false),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 45}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.LeastAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy LeastAllocated, scheduling cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", false),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", false),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 62}, {Name: "node2", Score: 70}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.LeastAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+		{
+			name:         "ScoringStrategy LeastAllocated, cpuset pods on node, scheduling cpuset pod",
+			requestedPod: makePod(map[corev1.ResourceName]string{"cpu": "8", "memory": "16Gi"}, true),
+			nodes: []*corev1.Node{
+				makeNode("node1", map[corev1.ResourceName]string{"cpu": "32", "memory": "40Gi"}, 1.0),
+				makeNode("node2", map[corev1.ResourceName]string{"cpu": "64", "memory": "60Gi"}, 2.0),
+			},
+			nodeRatios: map[string]apiext.Ratio{"node1": 1.0, "node2": 2.0},
+			nodeHasNRT: []string{"node1", "node2"},
+			cpuTopologies: map[string]*CPUTopology{
+				"node1": buildCPUTopologyForTest(2, 1, 8, 2),
+				"node2": buildCPUTopologyForTest(2, 1, 8, 2),
+			},
+			existingPods: []*corev1.Pod{
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node1", true),
+				makePodOnNode(map[corev1.ResourceName]string{"cpu": "20", "memory": "4Gi"}, "node2", true),
+			},
+			wantScoreList: []framework.NodeScore{{Name: "node1", Score: 31}, {Name: "node2", Score: 39}},
+			args: schedulerconfig.NodeNUMAResourceArgs{
+				ScoringStrategy: &schedulerconfig.ScoringStrategy{
+					Type:      schedulerconfig.LeastAllocated,
+					Resources: defaultResources,
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			suit := newPluginTestSuit(t, tt.existingPods, tt.nodes)
+			suit.nodeNUMAResourceArgs.ScoringStrategy = tt.args.ScoringStrategy
+			p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
+			assert.NoError(t, err)
+			suit.start()
+
+			pl := p.(*Plugin)
+
+			for _, nodeName := range tt.nodeHasNRT {
+				cpuTopology := tt.cpuTopologies[nodeName]
+				if cpuTopology == nil {
+					continue
+				}
+				ratio := tt.nodeRatios[nodeName]
+				if ratio == 0 {
+					ratio = 1
+				}
+				topologyOptions := TopologyOptions{
+					CPUTopology: cpuTopology,
+				}
+				for i := 0; i < cpuTopology.NumNodes; i++ {
+					topologyOptions.NUMANodeResources = append(topologyOptions.NUMANodeResources, NUMANodeResource{
+						Node: i,
+						Resources: corev1.ResourceList{
+							corev1.ResourceCPU:    resource.MustParse(fmt.Sprintf("%d", extension.Amplify(int64(cpuTopology.CPUsPerNode()), ratio))),
+							corev1.ResourceMemory: resource.MustParse("20Gi"),
+						}})
+				}
+				pl.topologyOptionsManager.UpdateTopologyOptions(nodeName, func(options *TopologyOptions) {
+					*options = topologyOptions
+				})
+			}
+
+			handler := &podEventHandler{resourceManager: pl.resourceManager}
+			for _, v := range tt.existingPods {
+				handler.OnAdd(v)
+			}
+
+			state := framework.NewCycleState()
+			_, status := pl.PreFilter(context.TODO(), state, tt.requestedPod)
+			assert.True(t, status.IsSuccess())
+
+			var gotScoreList framework.NodeScoreList
+			for _, n := range tt.nodes {
+				score, status := p.(framework.ScorePlugin).Score(context.TODO(), state, tt.requestedPod, n.Name)
+				if !status.IsSuccess() {
+					t.Errorf("unexpected error: %v", status)
+				}
+				gotScoreList = append(gotScoreList, framework.NodeScore{Name: n.Name, Score: score})
+			}
+			assert.Equal(t, tt.wantScoreList, gotScoreList)
+		})
+	}
+}
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_service.go b/pkg/scheduler/plugins/nodenumaresource/service.go
similarity index 100%
rename from pkg/scheduler/plugins/nodenumaresource/plugin_service.go
rename to pkg/scheduler/plugins/nodenumaresource/service.go
diff --git a/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go b/pkg/scheduler/plugins/nodenumaresource/service_test.go
similarity index 98%
rename from pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go
rename to pkg/scheduler/plugins/nodenumaresource/service_test.go
index ef9fe808e..ff32f93cc 100644
--- a/pkg/scheduler/plugins/nodenumaresource/plugin_service_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/service_test.go
@@ -44,7 +44,7 @@ func TestEndpointsQueryNode(t *testing.T) {
 	extension.SetNodeResourceAmplificationRatios(node, map[corev1.ResourceName]extension.Ratio{
 		corev1.ResourceCPU: 1.5,
 	})
-	suit := newPluginTestSuit(t, []*corev1.Node{node})
+	suit := newPluginTestSuit(t, nil, []*corev1.Node{node})
 	plugin, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NoError(t, err)
 	assert.NotNil(t, plugin)
diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_hint_test.go b/pkg/scheduler/plugins/nodenumaresource/topology_hint_test.go
index 44f159a50..5f7ed9a4c 100644
--- a/pkg/scheduler/plugins/nodenumaresource/topology_hint_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/topology_hint_test.go
@@ -44,7 +44,7 @@ func TestReserveByNUMANode(t *testing.T) {
 			},
 		},
 	}
-	suit := newPluginTestSuit(t, []*corev1.Node{node})
+	suit := newPluginTestSuit(t, nil, []*corev1.Node{node})
 	p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
 	assert.NoError(t, err)
diff --git a/pkg/scheduler/plugins/nodenumaresource/topology_options_test.go b/pkg/scheduler/plugins/nodenumaresource/topology_options_test.go
index 30954e23a..42adfd7ed 100644
--- a/pkg/scheduler/plugins/nodenumaresource/topology_options_test.go
+++ b/pkg/scheduler/plugins/nodenumaresource/topology_options_test.go
@@ -32,7 +32,7 @@ import (
 )
 
 func TestTopologyOptionsManager(t *testing.T) {
-	suit := newPluginTestSuit(t, nil)
+	suit := newPluginTestSuit(t, nil, nil)
 	expectCPUTopology := buildCPUTopologyForTest(2, 1, 4, 2)