diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go index 54cfff32d..c7e580bbb 100644 --- a/pkg/scheduler/plugins/nodenumaresource/resource_manager.go +++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" quotav1 "k8s.io/apiserver/pkg/quota/v1" "k8s.io/client-go/tools/cache" + corehelper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/scheduler/framework" schedulingconfig "github.com/koordinator-sh/koordinator/pkg/scheduler/apis/config" @@ -128,15 +129,45 @@ func (c *resourceManager) GetTopologyHints(node *corev1.Node, pod *corev1.Pod, o if err != nil { return nil, err } - - result := generateResourceHints(topologyOptions.NUMANodeResources, options.requests, totalAvailable, options.numaScorer) - hints := make(map[string][]topologymanager.NUMATopologyHint) - for k, v := range result { - hints[k] = v + if err := c.trimNUMANodeResources(node.Name, totalAvailable, options); err != nil { + return nil, err } + + hints := generateResourceHints(topologyOptions.NUMANodeResources, options.requests, totalAvailable, options.numaScorer) return hints, nil } +func (c *resourceManager) trimNUMANodeResources(nodeName string, totalAvailable map[int]corev1.ResourceList, options *ResourceOptions) error { + if !options.requiredCPUBindPolicy { + return nil + } + availableCPUs, _, err := c.GetAvailableCPUs(nodeName, options.preferredCPUs) + if err != nil { + return err + } + cpuDetails := options.topologyOptions.CPUTopology.CPUDetails.KeepOnly(availableCPUs) + for numaNode, available := range totalAvailable { + cpuQuantity := available[corev1.ResourceCPU] + if cpuQuantity.IsZero() { + continue + } + availableCPUs := cpuDetails.CPUsInNUMANodes(numaNode) + if int64(availableCPUs.Size()*1000) >= cpuQuantity.MilliValue() { + availableCPUs = filterCPUsByRequiredCPUBindPolicy( + options.cpuBindPolicy, + availableCPUs, + cpuDetails, + options.topologyOptions.CPUTopology.CPUsPerCore(), + ) + } + if int64(availableCPUs.Size())*1000 < cpuQuantity.MilliValue() { + cpuQuantity.SetMilli(int64(availableCPUs.Size() * 1000)) + available[corev1.ResourceCPU] = cpuQuantity + } + } + return nil +} + func (c *resourceManager) Allocate(node *corev1.Node, pod *corev1.Pod, options *ResourceOptions) (*PodAllocation, error) { allocation := &PodAllocation{ UID: pod.UID, @@ -249,7 +280,12 @@ func (c *resourceManager) allocateCPUSet(node *corev1.Node, pod *corev1.Pod, all topologyOptions := &options.topologyOptions if options.requiredCPUBindPolicy { cpuDetails := topologyOptions.CPUTopology.CPUDetails.KeepOnly(availableCPUs) - availableCPUs = filterAvailableCPUsByRequiredCPUBindPolicy(options.cpuBindPolicy, availableCPUs, cpuDetails, topologyOptions.CPUTopology.CPUsPerCore()) + availableCPUs = filterCPUsByRequiredCPUBindPolicy( + options.cpuBindPolicy, + availableCPUs, + cpuDetails, + topologyOptions.CPUTopology.CPUsPerCore(), + ) } if availableCPUs.Size() < options.numCPUsNeeded { @@ -380,21 +416,25 @@ func (c *resourceManager) getAvailableNUMANodeResources(nodeName string, topolog } func generateResourceHints(numaNodeResources []NUMANodeResource, podRequests corev1.ResourceList, totalAvailable map[int]corev1.ResourceList, numaScorer *resourceAllocationScorer) map[string][]topologymanager.NUMATopologyHint { - // Initialize minAffinitySize to include all NUMA Cells. 
- minAffinitySizeMap := map[corev1.ResourceName]*int{} + generator := hintsGenerator{ + minAffinitySize: make(map[corev1.ResourceName]int), + hints: map[string][]topologymanager.NUMATopologyHint{}, + } + var memoryResourceNames []corev1.ResourceName for resourceName := range podRequests { - size := len(numaNodeResources) - minAffinitySizeMap[resourceName] = &size + generator.minAffinitySize[resourceName] = len(numaNodeResources) + if resourceName == corev1.ResourceMemory || corehelper.IsHugePageResourceName(resourceName) { + memoryResourceNames = append(memoryResourceNames, resourceName) + } } - hints := map[string][]topologymanager.NUMATopologyHint{} - numaNodes := make([]int, 0, len(numaNodeResources)) for _, v := range numaNodeResources { numaNodes = append(numaNodes, v.Node) } podRequestResources := framework.NewResource(podRequests) + totalResourceNames := sets.NewString() bitmask.IterateBitMasks(numaNodes, func(mask bitmask.BitMask) { maskBits := mask.GetBits() available := make(corev1.ResourceList) @@ -415,55 +455,112 @@ func generateResourceHints(numaNodeResources []NUMANodeResource, podRequests cor score, _ = numaScorer.score(framework.NewResource(requested), framework.NewResource(total), podRequestResources) } - for resourceName, request := range podRequests { - minAffinitySize := minAffinitySizeMap[resourceName] - if !shouldGenerateHint(total[resourceName], available[resourceName], request, mask.Count(), minAffinitySize) { - continue + // verify that for all memory types the node mask has enough allocatable resources + generator.generateHints(mask, score, total, available, podRequests, memoryResourceNames...) + + for resourceName := range podRequests { + if _, ok := total[resourceName]; ok { + totalResourceNames.Insert(string(resourceName)) } - if _, ok := hints[string(resourceName)]; !ok { - hints[string(resourceName)] = []topologymanager.NUMATopologyHint{} + if resourceName == corev1.ResourceMemory || corehelper.IsHugePageResourceName(resourceName) { + continue } - hints[string(resourceName)] = append(hints[string(resourceName)], topologymanager.NUMATopologyHint{ - NUMANodeAffinity: mask, - Preferred: false, - Score: score, - }) + generator.generateHints(mask, score, total, available, podRequests, resourceName) } }) // update hints preferred according to multiNUMAGroups, in case when it wasn't provided, the default // behavior to prefer the minimal amount of NUMA nodes will be used for resourceName := range podRequests { - minAffinitySize := *minAffinitySizeMap[resourceName] - for i, hint := range hints[string(resourceName)] { - hints[string(resourceName)][i].Preferred = len(hint.NUMANodeAffinity.GetBits()) == minAffinitySize + minAffinitySize := generator.minAffinitySize[resourceName] + for i, hint := range generator.hints[string(resourceName)] { + generator.hints[string(resourceName)][i].Preferred = len(hint.NUMANodeAffinity.GetBits()) == minAffinitySize } } - return hints + for resourceName := range podRequests { + if totalResourceNames.Has(string(resourceName)) { + hints := generator.hints[string(resourceName)] + if hints == nil { + // no possible NUMA affinities for resource + hints = []topologymanager.NUMATopologyHint{} + generator.hints[string(resourceName)] = hints + } + } + } + return generator.hints } -func shouldGenerateHint(total resource.Quantity, available resource.Quantity, request resource.Quantity, nodeCount int, minAffinitySize *int) bool { - if total.Cmp(request) < 0 { - return false +type hintsGenerator struct { + minAffinitySize map[corev1.ResourceName]int 
+ hints map[string][]topologymanager.NUMATopologyHint +} + +func (g *hintsGenerator) generateHints(mask bitmask.BitMask, score int64, totalAllocatable, totalFree corev1.ResourceList, podRequests corev1.ResourceList, resourceNames ...corev1.ResourceName) { + for _, resourceName := range resourceNames { + total, request := totalAllocatable[resourceName], podRequests[resourceName] + if total.Cmp(request) < 0 { + return + } } - if nodeCount < *minAffinitySize { - *minAffinitySize = nodeCount + + nodeCount := mask.Count() + for _, resourceName := range resourceNames { + affinitySize := g.minAffinitySize[resourceName] + if nodeCount < affinitySize { + g.minAffinitySize[resourceName] = nodeCount + } + } + + for _, resourceName := range resourceNames { + free, request := totalFree[resourceName], podRequests[resourceName] + if free.Cmp(request) < 0 { + return + } } - if available.Cmp(request) < 0 { - return false + + for _, resourceName := range resourceNames { + if _, ok := g.hints[string(resourceName)]; !ok { + g.hints[string(resourceName)] = []topologymanager.NUMATopologyHint{} + } + g.hints[string(resourceName)] = append(g.hints[string(resourceName)], topologymanager.NUMATopologyHint{ + NUMANodeAffinity: mask, + Preferred: false, + Score: score, + }) } - return true } -func filterAvailableCPUsByRequiredCPUBindPolicy(policy schedulingconfig.CPUBindPolicy, availableCPUs cpuset.CPUSet, cpuDetails CPUDetails, cpusPerCore int) cpuset.CPUSet { - if policy == schedulingconfig.CPUBindPolicyFullPCPUs { - cpuDetails.KeepOnly(availableCPUs) - cpus := cpuDetails.CPUsInCores(cpuDetails.Cores().ToSliceNoSort()...) - if cpus.Size()%cpusPerCore != 0 { - return availableCPUs +func filterCPUsByRequiredCPUBindPolicy(policy schedulingconfig.CPUBindPolicy, availableCPUs cpuset.CPUSet, cpuDetails CPUDetails, cpusPerCore int) cpuset.CPUSet { + builder := cpuset.NewCPUSetBuilder() + cpuDetails = cpuDetails.KeepOnly(availableCPUs) + switch policy { + case schedulingconfig.CPUBindPolicyFullPCPUs: + for _, core := range cpuDetails.Cores().ToSliceNoSort() { + cpus := cpuDetails.CPUsInCores(core) + if cpus.Size() == cpusPerCore { + builder.Add(cpus.ToSliceNoSort()...) 
+ } + } + availableCPUs = builder.Result() + case schedulingconfig.CPUBindPolicySpreadByPCPUs: + for _, core := range cpuDetails.Cores().ToSliceNoSort() { + // TODO(joseph): Maybe we should support required exclusive policy as following + // allocated := allocatedCPUs.CPUsInCores(core) + // if allocated.Size() > 0 { + // cpuInfo := allocatedCPUs[allocated.ToSliceNoSort()[0]] + // if cpuInfo.ExclusivePolicy != "" && + // cpuInfo.ExclusivePolicy != schedulingconfig.CPUExclusivePolicyNone && + // cpuInfo.ExclusivePolicy == exclusivePolicy { + // continue + // } + // } + + // Using only one CPU per core ensures correct hints are generated + cpus := cpuDetails.CPUsInCores(core).ToSlice() + builder.Add(cpus[0]) } - return cpus + availableCPUs = builder.Result() } return availableCPUs } diff --git a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go index 582c09398..9ec9215e5 100644 --- a/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go +++ b/pkg/scheduler/plugins/nodenumaresource/resource_manager_test.go @@ -722,22 +722,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { }, }, want: map[string][]topologymanager.NUMATopologyHint{ - string(corev1.ResourceCPU): { - { - NUMANodeAffinity: func() bitmask.BitMask { - mask, _ := bitmask.NewBitMask(0) - return mask - }(), - Preferred: true, - }, - { - NUMANodeAffinity: func() bitmask.BitMask { - mask, _ := bitmask.NewBitMask(0, 1) - return mask - }(), - Preferred: false, - }, - }, + string(corev1.ResourceCPU): {}, }, wantErr: false, }, @@ -865,22 +850,7 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { }, }, want: map[string][]topologymanager.NUMATopologyHint{ - string(corev1.ResourceCPU): { - { - NUMANodeAffinity: func() bitmask.BitMask { - mask, _ := bitmask.NewBitMask(0) - return mask - }(), - Preferred: true, - }, - { - NUMANodeAffinity: func() bitmask.BitMask { - mask, _ := bitmask.NewBitMask(0, 1) - return mask - }(), - Preferred: false, - }, - }, + string(corev1.ResourceCPU): {}, }, wantErr: false, }, @@ -933,6 +903,69 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { }, }, }, + { + name: "failed to generate hints with insufficient memory and hugepages", + pod: &corev1.Pod{}, + options: &ResourceOptions{ + numCPUsNeeded: 4, + requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("4Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"), + }, + }, + allocated: &PodAllocation{ + UID: "123456", + Name: "test-xxx", + Namespace: "default", + NUMANodeResources: []NUMANodeResource{ + { + Node: 0, + Resources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("120Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"), + }, + }, + { + Node: 1, + Resources: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("4"), + corev1.ResourceMemory: resource.MustParse("120Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"), + }, + }, + }, + }, + want: map[string][]topologymanager.NUMATopologyHint{ + string(corev1.ResourceCPU): { + { + NUMANodeAffinity: func() bitmask.BitMask { + mask, _ := bitmask.NewBitMask(0) + return mask + }(), + Preferred: true, + }, + { + NUMANodeAffinity: func() bitmask.BitMask { + mask, _ := bitmask.NewBitMask(1) + return mask + }(), + Preferred: true, + }, + { + NUMANodeAffinity: func() bitmask.BitMask { + mask, _ := 
bitmask.NewBitMask(0, 1) + return mask + }(), + Preferred: false, + }, + }, + string(corev1.ResourceMemory): {}, + corev1.ResourceHugePagesPrefix + "1Gi": {}, + }, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -944,15 +977,17 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { { Node: 0, Resources: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("52"), - corev1.ResourceMemory: resource.MustParse("128Gi"), + corev1.ResourceCPU: resource.MustParse("52"), + corev1.ResourceMemory: resource.MustParse("128Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"), }, }, { Node: 1, Resources: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("52"), - corev1.ResourceMemory: resource.MustParse("128Gi"), + corev1.ResourceCPU: resource.MustParse("52"), + corev1.ResourceMemory: resource.MustParse("128Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("4Gi"), }, }, } @@ -963,8 +998,9 @@ func TestResourceManagerGetTopologyHint(t *testing.T) { }, Status: corev1.NodeStatus{ Allocatable: corev1.ResourceList{ - corev1.ResourceCPU: resource.MustParse("104"), - corev1.ResourceMemory: resource.MustParse("256Gi"), + corev1.ResourceCPU: resource.MustParse("104"), + corev1.ResourceMemory: resource.MustParse("256Gi"), + corev1.ResourceHugePagesPrefix + "1Gi": resource.MustParse("8Gi"), }, }, }
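// Illustrative sketch only, not part of the patch above: it mimics the ordering of
// checks that the new hintsGenerator.generateHints applies to each candidate
// NUMA-node mask. The helper sumFor, the variables nodeAllocatable/nodeFree, and the
// hard-coded masks are assumptions made for this example. In the real change, memory
// and hugepage resources are additionally evaluated as one group, so a shortfall in
// any one of them suppresses hints for all of them; this sketch only shows the
// per-resource ordering: reject a mask whose total allocatable can never satisfy the
// request, shrink the minimum affinity size before checking free capacity, and emit
// no hint for a mask that is allocatable but currently occupied.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// sumFor adds up one resource across the NUMA nodes selected by mask.
func sumFor(mask []int, perNode map[int]corev1.ResourceList, name corev1.ResourceName) resource.Quantity {
	var total resource.Quantity
	for _, node := range mask {
		q := perNode[node][name]
		total.Add(q)
	}
	return total
}

func main() {
	request := resource.MustParse("4Gi")
	nodeAllocatable := map[int]corev1.ResourceList{
		0: {corev1.ResourceMemory: resource.MustParse("8Gi")},
		1: {corev1.ResourceMemory: resource.MustParse("8Gi")},
	}
	nodeFree := map[int]corev1.ResourceList{
		0: {corev1.ResourceMemory: resource.MustParse("2Gi")}, // mostly consumed by other pods
		1: {corev1.ResourceMemory: resource.MustParse("8Gi")},
	}

	minAffinitySize := 2 // start at "all NUMA nodes"
	var hints [][]int
	for _, mask := range [][]int{{0}, {1}, {0, 1}} {
		alloc := sumFor(mask, nodeAllocatable, corev1.ResourceMemory)
		if alloc.Cmp(request) < 0 {
			continue // the mask can never hold the request
		}
		if len(mask) < minAffinitySize {
			minAffinitySize = len(mask) // shrinks even when the mask is busy right now
		}
		free := sumFor(mask, nodeFree, corev1.ResourceMemory)
		if free.Cmp(request) < 0 {
			continue // allocatable but currently occupied: no hint emitted
		}
		hints = append(hints, mask)
	}
	for _, mask := range hints {
		// Preferred is decided only after all masks are seen, against the final minAffinitySize.
		fmt.Printf("memory hint %v preferred=%v\n", mask, len(mask) == minAffinitySize)
	}
}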