Skip to content

Commit

Permalink
scheduler: support numa topology policy on pod (#1939)
Browse files Browse the repository at this point in the history
Signed-off-by: KunWuLuan <[email protected]>
  • Loading branch information
KunWuLuan authored May 20, 2024
1 parent afa430a commit eba6094
Show file tree
Hide file tree
Showing 21 changed files with 475 additions and 40 deletions.
40 changes: 40 additions & 0 deletions apis/extension/numa_aware.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ import (

// Defines the pod level annotations and labels
const (
// AnnotationNUMATopologySpec represents numa allocation API defined by Koordinator.
// The user specifies the desired numa policy by setting the annotation.
AnnotationNUMATopologySpec = SchedulingDomainPrefix + "/numa-topology-spec"
// AnnotationResourceSpec represents resource allocation API defined by Koordinator.
// The user specifies the desired CPU orchestration policy by setting the annotation.
AnnotationResourceSpec = SchedulingDomainPrefix + "/resource-spec"
Expand Down Expand Up @@ -67,6 +70,15 @@ type ResourceSpec struct {
PreferredCPUExclusivePolicy CPUExclusivePolicy `json:"preferredCPUExclusivePolicy,omitempty"`
}

type NUMATopologySpec struct {
// NUMATopologyPolicy represents the numa topology policy when schedule pod
NUMATopologyPolicy NUMATopologyPolicy `json:"numaTopologyPolicy,omitempty"`
// SingleNUMANodeExclusive represents whether a Pod that will use a single NUMA node/multiple NUMA nodes
// on a NUMA node can be scheduled to use the NUMA node when another Pod that uses multiple NUMA nodes/a single NUMA node
// is already running on the same node.
SingleNUMANodeExclusive NumaTopologyExclusive `json:"singleNUMANodeExclusive,omitempty"`
}

// ResourceStatus describes resource allocation result, such as how to bind CPU.
type ResourceStatus struct {
// CPUSet represents the allocated CPUs. It is Linux CPU list formatted string.
Expand Down Expand Up @@ -135,6 +147,21 @@ const (
NodeNUMAAllocateStrategyMostAllocated = NUMAMostAllocated
)

type NumaTopologyExclusive string

const (
NumaTopologyExclusivePreferred NumaTopologyExclusive = "Preferred"
NumaTopologyExclusiveRequired NumaTopologyExclusive = "Required"
)

type NumaNodeStatus string

const (
NumaNodeStatusIdle NumaNodeStatus = "idle"
NumaNodeStatusShared NumaNodeStatus = "shared"
NumaNodeStatusSingle NumaNodeStatus = "single"
)

type NUMATopologyPolicy string

const (
Expand Down Expand Up @@ -187,6 +214,19 @@ type KubeletCPUManagerPolicy struct {
ReservedCPUs string `json:"reservedCPUs,omitempty"`
}

func GetNUMATopologySpec(annotations map[string]string) (*NUMATopologySpec, error) {
numaSpec := &NUMATopologySpec{}
data, ok := annotations[AnnotationNUMATopologySpec]
if !ok {
return numaSpec, nil
}
err := json.Unmarshal([]byte(data), numaSpec)
if err != nil {
return nil, err
}
return numaSpec, nil
}

// GetResourceSpec parses ResourceSpec from annotations
func GetResourceSpec(annotations map[string]string) (*ResourceSpec, error) {
resourceSpec := &ResourceSpec{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/frameworkext/framework_extender.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,8 +446,8 @@ func (ext *frameworkExtenderImpl) ForgetPod(pod *corev1.Pod) error {
return nil
}

func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType)
func (ext *frameworkExtenderImpl) RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status {
return ext.topologyManager.Admit(ctx, cycleState, pod, nodeName, numaNodes, policyType, exclusivePolicy, allNUMANodeStatus)
}

func (ext *frameworkExtenderImpl) GetNUMATopologyHintProvider() []topologymanager.NUMATopologyHintProvider {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/frameworkext/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type FrameworkExtender interface {
RunReservationFilterPlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfo *ReservationInfo, nodeName string) *framework.Status
RunReservationScorePlugins(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, reservationInfos []*ReservationInfo, nodeName string) (PluginToReservationScores, *framework.Status)

RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
RunNUMATopologyManagerAdmit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status

RunResizePod(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string) *framework.Status
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/scheduler/frameworkext/topologymanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
)

type Interface interface {
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status
Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status
}

type NUMATopologyHintProvider interface {
Expand Down Expand Up @@ -55,7 +55,7 @@ func New(hintProviderFactory NUMATopologyHintProviderFactory) Interface {
}
}

func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy) *framework.Status {
func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.CycleState, pod *corev1.Pod, nodeName string, numaNodes []int, policyType apiext.NUMATopologyPolicy, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) *framework.Status {
s, err := cycleState.Read(affinityStateKey)
if err != nil {
return framework.AsStatus(err)
Expand All @@ -64,7 +64,7 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle

policy := createNUMATopologyPolicy(policyType, numaNodes)

bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName)
bestHint, admit := m.calculateAffinity(ctx, cycleState, policy, pod, nodeName, exclusivePolicy, allNUMANodeStatus)
klog.V(5).Infof("Best TopologyHint for (pod: %v): %v on node: %v", klog.KObj(pod), bestHint, nodeName)
if !admit {
return framework.NewStatus(framework.Unschedulable, "node(s) NUMA Topology affinity error")
Expand All @@ -79,9 +79,13 @@ func (m *topologyManager) Admit(ctx context.Context, cycleState *framework.Cycle
return nil
}

func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string) (NUMATopologyHint, bool) {
func (m *topologyManager) calculateAffinity(ctx context.Context, cycleState *framework.CycleState, policy Policy, pod *corev1.Pod, nodeName string, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
providersHints := m.accumulateProvidersHints(ctx, cycleState, pod, nodeName)
bestHint, admit := policy.Merge(providersHints)
bestHint, admit := policy.Merge(providersHints, exclusivePolicy, allNUMANodeStatus)
if !checkExclusivePolicy(bestHint, exclusivePolicy, allNUMANodeStatus) {
klog.V(5).Infof("bestHint violated the exclusivePolicy requirement: bestHint: %v, policy: %v, numaStatus: %v, nodeName: %v, pod: %v",
bestHint, exclusivePolicy, allNUMANodeStatus, nodeName, pod.Name)
}
klog.V(5).Infof("PodTopologyHint: %v", bestHint)
return bestHint, admit
}
Expand Down
31 changes: 29 additions & 2 deletions pkg/scheduler/frameworkext/topologymanager/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ package topologymanager
import (
"k8s.io/klog/v2"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

type Policy interface {
// Name returns Policy Name
Name() string
// Merge returns a merged NUMATopologyHint based on input from hint providers
Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool)
Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool)
}

// NUMATopologyHint is a struct containing the NUMANodeAffinity for a Container
Expand Down Expand Up @@ -62,6 +63,29 @@ func (th *NUMATopologyHint) LessThan(other NUMATopologyHint) bool {
return th.NUMANodeAffinity.IsNarrowerThan(other.NUMANodeAffinity)
}

// Check if the affinity match the exclusive policy, return true if match or false otherwise.
func checkExclusivePolicy(affinity NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) bool {
// check bestHint again if default hint is the best
if affinity.NUMANodeAffinity == nil {
return false
}
if exclusivePolicy == apiext.NumaTopologyExclusiveRequired {
if affinity.NUMANodeAffinity.Count() > 1 {
// we should make sure no numa is in single state
for _, nodeid := range affinity.NUMANodeAffinity.GetBits() {
if allNUMANodeStatus[nodeid] == apiext.NumaNodeStatusSingle {
return false
}
}
} else {
if allNUMANodeStatus[affinity.NUMANodeAffinity.GetBits()[0]] == apiext.NumaNodeStatusShared {
return false
}
}
}
return true
}

// Merge a TopologyHints permutation to a single hint by performing a bitwise-AND
// of their affinity masks. The hint shall be preferred if all hits in the permutation
// are preferred.
Expand Down Expand Up @@ -126,7 +150,7 @@ func filterProvidersHints(providersHints []map[string][]NUMATopologyHint) [][]NU
return allProviderHints
}

func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUMATopologyHint {
func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) NUMATopologyHint {
// Set the default affinity as an any-numa affinity containing the list
// of NUMA Nodes available on this machine.
defaultAffinity, _ := bitmask.NewBitMask(numaNodes...)
Expand All @@ -144,6 +168,9 @@ func mergeFilteredHints(numaNodes []int, filteredHints [][]NUMATopologyHint) NUM
if mergedHint.NUMANodeAffinity.Count() == 0 {
return
}
if !checkExclusivePolicy(mergedHint, exclusivePolicy, allNUMANodeStatus) {
mergedHint.Preferred = false
}

for _, v := range permutation {
if v.NUMANodeAffinity != nil && mergedHint.NUMANodeAffinity.IsEqual(v.NUMANodeAffinity) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type bestEffortPolicy struct {
//List of NUMA Nodes available on the underlying machine
numaNodes []int
Expand All @@ -40,9 +42,9 @@ func (p *bestEffortPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *bestEffortPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredProvidersHints := filterProvidersHints(providersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints)
bestHint := mergeFilteredHints(p.numaNodes, filteredProvidersHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&bestHint)
return bestHint, admit
}
4 changes: 3 additions & 1 deletion pkg/scheduler/frameworkext/topologymanager/policy_none.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type nonePolicy struct{}

var _ Policy = &nonePolicy{}
Expand All @@ -37,6 +39,6 @@ func (p *nonePolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return true
}

func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *nonePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
return NUMATopologyHint{}, p.canAdmitPodResult(nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package topologymanager

import (
"testing"

apiext "github.com/koordinator-sh/koordinator/apis/extension"
)

func TestPolicyNoneName(t *testing.T) {
Expand Down Expand Up @@ -104,7 +106,7 @@ func TestPolicyNoneMerge(t *testing.T) {

for _, tc := range tcases {
policy := NewNonePolicy()
result, admit := policy.Merge(tc.providersHints)
result, admit := policy.Merge(tc.providersHints, apiext.NumaTopologyExclusivePreferred, []apiext.NumaNodeStatus{})
if !result.IsEqual(tc.expectedHint) || admit != tc.expectedAdmit {
t.Errorf("Test Case: %s: Expected merge hint to be %v, got %v", tc.name, tc.expectedHint, result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.

package topologymanager

import apiext "github.com/koordinator-sh/koordinator/apis/extension"

type restrictedPolicy struct {
bestEffortPolicy
}
Expand All @@ -39,9 +41,9 @@ func (p *restrictedPolicy) canAdmitPodResult(hint *NUMATopologyHint) bool {
return hint.Preferred
}

func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *restrictedPolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints)
hint := mergeFilteredHints(p.numaNodes, filteredHints, exclusivePolicy, allNUMANodeStatus)
admit := p.canAdmitPodResult(&hint)
return hint, admit
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package topologymanager

import (
apiext "github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/util/bitmask"
)

Expand Down Expand Up @@ -62,11 +63,11 @@ func filterSingleNumaHints(allResourcesHints [][]NUMATopologyHint) [][]NUMATopol
return filteredResourcesHints
}

func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint) (NUMATopologyHint, bool) {
func (p *singleNumaNodePolicy) Merge(providersHints []map[string][]NUMATopologyHint, exclusivePolicy apiext.NumaTopologyExclusive, allNUMANodeStatus []apiext.NumaNodeStatus) (NUMATopologyHint, bool) {
filteredHints := filterProvidersHints(providersHints)
// Filter to only include don't care and hints with a single NUMA node.
singleNumaHints := filterSingleNumaHints(filteredHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints)
bestHint := mergeFilteredHints(p.numaNodes, singleNumaHints, exclusivePolicy, allNUMANodeStatus)

defaultAffinity, _ := bitmask.NewBitMask(p.numaNodes...)
if bestHint.NUMANodeAffinity.IsEqual(defaultAffinity) {
Expand Down
Loading

0 comments on commit eba6094

Please sign in to comment.