Skip to content

Commit

Permalink
koord-scheduler: NodeNUMAResource supports filtering/scoring with nod…
Browse files Browse the repository at this point in the history
…e-level amplification ratios

Signed-off-by: Joseph <[email protected]>
  • Loading branch information
eahydra committed Sep 21, 2023
1 parent f3b3eb4 commit efc7a9d
Show file tree
Hide file tree
Showing 10 changed files with 521 additions and 35 deletions.
53 changes: 49 additions & 4 deletions pkg/scheduler/plugins/nodenumaresource/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ const (
)

const (
ErrNotFoundCPUTopology = "node(s) CPU Topology not found"
ErrInvalidCPUTopology = "node(s) invalid CPU Topology"
ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core"
ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy"
ErrNotFoundCPUTopology = "node(s) CPU Topology not found"
ErrInvalidCPUTopology = "node(s) invalid CPU Topology"
ErrSMTAlignmentError = "node(s) requested cpus not multiple cpus per core"
ErrRequiredFullPCPUsPolicy = "node(s) required FullPCPUs policy"
ErrInsufficientAmplifiedCPU = "Insufficient amplified cpu"
)

var (
Expand Down Expand Up @@ -267,6 +268,10 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return status
}

if status := p.filterAmplifiedCPUs(state, nodeInfo); !status.IsSuccess() {
return status
}

node := nodeInfo.Node()
topologyOptions := p.topologyOptionsManager.GetTopologyOptions(node.Name)
numaTopologyPolicy := getNUMATopologyPolicy(node.Labels, topologyOptions.NUMATopologyPolicy)
Expand Down Expand Up @@ -317,6 +322,46 @@ func (p *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState, p
return nil
}

func (p *Plugin) filterAmplifiedCPUs(state *preFilterState, nodeInfo *framework.NodeInfo) *framework.Status {
quantity := state.requests[corev1.ResourceCPU]
podRequestMilliCPU := quantity.MilliValue()
if podRequestMilliCPU == 0 {
return nil
}

node := nodeInfo.Node()
ratios, err := extension.GetNodeResourceAmplificationRatios(node.Annotations)
if err != nil {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, "node(s) invalid amplification ratios")
}
cpuAmplificationRatio := ratios[corev1.ResourceCPU]
if cpuAmplificationRatio <= 1 {
return nil
}

if state.requestCPUBind {
podRequestMilliCPU = extension.Amplify(podRequestMilliCPU, cpuAmplificationRatio)
}

// TODO(joseph): Reservations and preemption should be considered here.
_, allocated, _ := p.resourceManager.GetAvailableCPUs(node.Name, cpuset.CPUSet{})
if err != nil {
if err.Error() != ErrNotFoundCPUTopology {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
}
allocatedMilliCPU := int64(allocated.CPUs().Size() * 1000)
requestedMilliCPU := nodeInfo.Requested.MilliCPU
if requestedMilliCPU >= allocatedMilliCPU && allocatedMilliCPU > 0 {
requestedMilliCPU = requestedMilliCPU - allocatedMilliCPU
requestedMilliCPU += extension.Amplify(allocatedMilliCPU, cpuAmplificationRatio)
}
if podRequestMilliCPU > nodeInfo.Allocatable.MilliCPU-requestedMilliCPU {
return framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU)
}
return nil
}

func (p *Plugin) Reserve(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status {
state, status := getPreFilterState(cycleState)
if !status.IsSuccess() {
Expand Down
160 changes: 150 additions & 10 deletions pkg/scheduler/plugins/nodenumaresource/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package nodenumaresource

import (
"context"
"fmt"
"reflect"
"testing"

Expand All @@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
apiresource "k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/queuesort"
Expand Down Expand Up @@ -106,6 +108,31 @@ func (f *testSharedLister) Get(nodeName string) (*framework.NodeInfo, error) {
return f.nodeInfoMap[nodeName], nil
}

func makeNode(name string, capacity map[corev1.ResourceName]string, cpuAmpRatio extension.Ratio) *corev1.Node {
node := schedulertesting.MakeNode().Name(name).Capacity(capacity).Obj()
_, _ = extension.SetNodeResourceAmplificationRatio(node, corev1.ResourceCPU, cpuAmpRatio)
return node
}

func makePodOnNode(request map[corev1.ResourceName]string, node string, isCPUSet bool) *corev1.Pod {
pod := schedulertesting.MakePod().Req(request).Node(node).Priority(extension.PriorityProdValueMax).Obj()
if isCPUSet {
pod.Labels = map[string]string{
extension.LabelPodQoS: string(extension.QoSLSR),
}
if node != "" {
reqs, _ := apiresource.PodRequestsAndLimits(pod)
val := reqs.Cpu().MilliValue() / 1000
_ = extension.SetResourceStatus(pod, &extension.ResourceStatus{CPUSet: fmt.Sprintf("0-%d", val-1)})
}
}
return pod
}

func makePod(request map[corev1.ResourceName]string, isCPUSet bool) *corev1.Pod {
return makePodOnNode(request, "", isCPUSet)
}

type frameworkHandleExtender struct {
frameworkext.FrameworkExtender
*nrtfake.Clientset
Expand All @@ -121,7 +148,7 @@ type pluginTestSuit struct {
nodeNUMAResourceArgs *schedulingconfig.NodeNUMAResourceArgs
}

func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) *pluginTestSuit {
func newPluginTestSuit(t *testing.T, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
var v1beta2args v1beta2.NodeNUMAResourceArgs
v1beta2.SetDefaults_NodeNUMAResourceArgs(&v1beta2args)
var nodeNUMAResourceArgs schedulingconfig.NodeNUMAResourceArgs
Expand Down Expand Up @@ -155,7 +182,7 @@ func newPluginTestSuit(t *testing.T, nodes []*corev1.Node) *pluginTestSuit {
assert.NoError(t, err)
}
informerFactory := informers.NewSharedInformerFactory(cs, 0)
snapshot := newTestSharedLister(nil, nodes)
snapshot := newTestSharedLister(pods, nodes)
fh, err := schedulertesting.NewFramework(
registeredPlugins,
"koord-scheduler",
Expand Down Expand Up @@ -185,7 +212,7 @@ func (p *pluginTestSuit) start() {
}

func TestNew(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -490,7 +517,7 @@ func TestPlugin_PreFilter(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
if tt.defaultBindPolicy != "" {
suit.nodeNUMAResourceArgs.DefaultCPUBindPolicy = tt.defaultBindPolicy
}
Expand Down Expand Up @@ -734,7 +761,7 @@ func TestPlugin_Filter(t *testing.T) {
nodes[0].Annotations[k] = v
}

suit := newPluginTestSuit(t, nodes)
suit := newPluginTestSuit(t, nil, nodes)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -784,6 +811,119 @@ func TestPlugin_Filter(t *testing.T) {
}
}

func TestFilterWithAmplifiedCPUs(t *testing.T) {
tests := []struct {
name string
pod *corev1.Pod
existingPods []*corev1.Pod
cpuTopology *CPUTopology
nodeHasNRT bool
nodeCPUAmplificationRatio extension.Ratio
wantStatus *framework.Status
}{
{
name: "no resources requested always fits",
pod: &corev1.Pod{},
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "4"}, "node-1", false)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeCPUAmplificationRatio: 2.0,
},
{
name: "no filtering without node cpu amplification",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeCPUAmplificationRatio: 1.0,
},
{
name: "cpu fits on no NRT node",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeCPUAmplificationRatio: 2.0,
},
{
name: "insufficient cpu",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "64"}, "node-1", false)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeCPUAmplificationRatio: 2.0,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
},
{
name: "insufficient cpu with cpuset pod on node",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, false),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", true)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeHasNRT: true,
nodeCPUAmplificationRatio: 2.0,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
},
{
name: "insufficient cpu when scheduling cpuset pod",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, true),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", false)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeHasNRT: true,
nodeCPUAmplificationRatio: 2.0,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
},
{
name: "insufficient cpu when scheduling cpuset pod with cpuset pod on node",
pod: makePod(map[corev1.ResourceName]string{"cpu": "32"}, true),
existingPods: []*corev1.Pod{makePodOnNode(map[corev1.ResourceName]string{"cpu": "32"}, "node-1", true)},
cpuTopology: buildCPUTopologyForTest(2, 1, 8, 2),
nodeHasNRT: true,
nodeCPUAmplificationRatio: 2.0,
wantStatus: framework.NewStatus(framework.Unschedulable, ErrInsufficientAmplifiedCPU),
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
numCPUs := tt.cpuTopology.NumCPUs
cpu := fmt.Sprintf("%d", extension.Amplify(int64(numCPUs), tt.nodeCPUAmplificationRatio))
node := makeNode("node-1", map[corev1.ResourceName]string{"cpu": cpu, "memory": "40Gi"}, tt.nodeCPUAmplificationRatio)
suit := newPluginTestSuit(t, tt.existingPods, []*corev1.Node{node})

p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NoError(t, err)
suit.start()
pl := p.(*Plugin)

if tt.nodeHasNRT {
topologyOptions := TopologyOptions{
CPUTopology: tt.cpuTopology,
}
for i := 0; i < tt.cpuTopology.NumNodes; i++ {
topologyOptions.NUMANodeResources = append(topologyOptions.NUMANodeResources, NUMANodeResource{
Node: i,
Resources: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(fmt.Sprintf("%d", extension.Amplify(int64(tt.cpuTopology.CPUsPerNode()), tt.nodeCPUAmplificationRatio))),
corev1.ResourceMemory: resource.MustParse("20Gi"),
}})
}
pl.topologyOptionsManager.UpdateTopologyOptions(node.Name, func(options *TopologyOptions) {
*options = topologyOptions
})
}
handler := &podEventHandler{resourceManager: pl.resourceManager}
for _, v := range tt.existingPods {
handler.OnAdd(v)
}

cycleState := framework.NewCycleState()
_, preFilterStatus := pl.PreFilter(context.TODO(), cycleState, tt.pod)
assert.True(t, preFilterStatus.IsSuccess())

nodeInfo, err := suit.Handle.SnapshotSharedLister().NodeInfos().Get("node-1")
assert.NoError(t, err)
gotStatus := pl.Filter(context.TODO(), cycleState, tt.pod, nodeInfo)
assert.Equal(t, tt.wantStatus, gotStatus)
})
}
}

func TestPlugin_Reserve(t *testing.T) {
tests := []struct {
name string
Expand Down Expand Up @@ -937,7 +1077,7 @@ func TestPlugin_Reserve(t *testing.T) {
nodes[0].Labels[k] = v
}

suit := newPluginTestSuit(t, nodes)
suit := newPluginTestSuit(t, nil, nodes)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -1047,7 +1187,7 @@ func TestPlugin_Unreserve(t *testing.T) {
}

func TestPlugin_PreBind(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -1089,7 +1229,7 @@ func TestPlugin_PreBind(t *testing.T) {
}

func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -1139,7 +1279,7 @@ func TestPlugin_PreBindWithCPUBindPolicyNone(t *testing.T) {
}

func TestPlugin_PreBindReservation(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NotNil(t, p)
assert.Nil(t, err)
Expand Down Expand Up @@ -1181,7 +1321,7 @@ func TestPlugin_PreBindReservation(t *testing.T) {
}

func TestRestoreReservation(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
p, err := suit.proxyNew(suit.nodeNUMAResourceArgs, suit.Handle)
assert.NoError(t, err)
pl := p.(*Plugin)
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/nodenumaresource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ func (c *resourceManager) GetAvailableCPUs(nodeName string, preferredCPUs cpuset
return cpuset.NewCPUSet(), nil, errors.New(ErrNotFoundCPUTopology)
}
if !topologyOptions.CPUTopology.IsValid() {
return cpuset.NewCPUSet(), nil, fmt.Errorf("cpuTopology is invalid")
return cpuset.NewCPUSet(), nil, errors.New(ErrInvalidCPUTopology)
}

allocation := c.getOrCreateNodeAllocation(nodeName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ func TestResourceManagerAllocate(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
tom := NewTopologyOptionsManager()
tom.UpdateTopologyOptions("test-node", func(options *TopologyOptions) {
options.CPUTopology = buildCPUTopologyForTest(2, 1, 26, 2)
Expand Down Expand Up @@ -879,7 +879,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
suit := newPluginTestSuit(t, nil)
suit := newPluginTestSuit(t, nil, nil)
tom := NewTopologyOptionsManager()
tom.UpdateTopologyOptions("test-node", func(options *TopologyOptions) {
options.CPUTopology = buildCPUTopologyForTest(2, 1, 26, 2)
Expand Down
Loading

0 comments on commit efc7a9d

Please sign in to comment.