diff --git a/apis/extension/reservation.go b/apis/extension/reservation.go
index 559602bf2..19dc58a6c 100644
--- a/apis/extension/reservation.go
+++ b/apis/extension/reservation.go
@@ -108,3 +108,17 @@ func GetReservationAffinity(annotations map[string]string) (*ReservationAffinity
 	}
 	return &affinity, nil
 }
+
+func SetReservationAffinity(obj metav1.Object, affinity *ReservationAffinity) error {
+	data, err := json.Marshal(affinity)
+	if err != nil {
+		return err
+	}
+	annotations := obj.GetAnnotations()
+	if annotations == nil {
+		annotations = map[string]string{}
+	}
+	annotations[AnnotationReservationAffinity] = string(data)
+	obj.SetAnnotations(annotations)
+	return nil
+}
diff --git a/pkg/scheduler/frameworkext/reservation_info.go b/pkg/scheduler/frameworkext/reservation_info.go
index 8387383c9..be4830bd1 100644
--- a/pkg/scheduler/frameworkext/reservation_info.go
+++ b/pkg/scheduler/frameworkext/reservation_info.go
@@ -42,6 +42,8 @@ type ReservationInfo struct {
 	AllocatablePorts framework.HostPortInfo
 	AllocatedPorts   framework.HostPortInfo
 	AssignedPods     map[types.UID]*PodRequirement
+	OwnerMatchers    []reservationutil.ReservationOwnerMatcher
+	ParseError       error
 }
 
 type PodRequirement struct {
@@ -79,13 +81,20 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
 	resourceNames := quotav1.ResourceNames(allocatable)
 	reservedPod := reservationutil.NewReservePod(r)
 
+	ownerMatchers, err := reservationutil.ParseReservationOwnerMatchers(r.Spec.Owners)
+	if err != nil {
+		klog.ErrorS(err, "Failed to parse reservation owner matchers", "reservation", klog.KObj(r))
+	}
+
 	return &ReservationInfo{
-		Reservation:      r.DeepCopy(),
+		Reservation:      r,
 		Pod:              reservedPod,
 		ResourceNames:    resourceNames,
 		Allocatable:      allocatable,
 		AllocatablePorts: util.RequestedHostPorts(reservedPod),
 		AssignedPods:     map[types.UID]*PodRequirement{},
+		OwnerMatchers:    ownerMatchers,
+		ParseError:       err,
 	}
 }
 
@@ -93,12 +102,26 @@ func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
 	allocatable, _ := resource.PodRequestsAndLimits(pod)
 	resourceNames := quotav1.ResourceNames(allocatable)
 
+	owners, err := apiext.GetReservationOwners(pod.Annotations)
+	if err != nil {
+		klog.ErrorS(err, "Invalid reservation owners annotation of Pod", "pod", klog.KObj(pod))
+	}
+	var ownerMatchers []reservationutil.ReservationOwnerMatcher
+	if owners != nil {
+		ownerMatchers, err = reservationutil.ParseReservationOwnerMatchers(owners)
+		if err != nil {
+			klog.ErrorS(err, "Failed to parse reservation owner matchers of pod", "pod", klog.KObj(pod))
+		}
+	}
+
 	return &ReservationInfo{
 		Pod:              pod,
 		ResourceNames:    resourceNames,
 		Allocatable:      allocatable,
 		AllocatablePorts: util.RequestedHostPorts(pod),
 		AssignedPods:     map[types.UID]*PodRequirement{},
+		OwnerMatchers:    ownerMatchers,
+		ParseError:       err,
 	}
 }
 
@@ -205,6 +228,13 @@ func (ri *ReservationInfo) GetPodOwners() []schedulingv1alpha1.ReservationOwner
 	return nil
 }
 
+func (ri *ReservationInfo) Match(pod *corev1.Pod) bool {
+	if ri.ParseError != nil {
+		return false
+	}
+	return reservationutil.MatchReservationOwners(pod, ri.OwnerMatchers)
+}
+
 func (ri *ReservationInfo) IsAvailable() bool {
 	if ri.Reservation != nil {
 		return reservationutil.IsReservationAvailable(ri.Reservation)
@@ -230,30 +260,25 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
 		resourceNames = append(resourceNames, v)
 	}
 
-	pods := map[types.UID]*PodRequirement{}
+	assignedPods := make(map[types.UID]*PodRequirement, len(ri.AssignedPods))
 	for k, v := range ri.AssignedPods {
-		pods[k] = v.Clone()
-	}
-
-	var reservation *schedulingv1alpha1.Reservation
-	if ri.Reservation != nil {
-		reservation = ri.Reservation.DeepCopy()
+		assignedPods[k] = v
 	}
 
 	return &ReservationInfo{
-		Reservation:      reservation,
-		Pod:              ri.Pod.DeepCopy(),
+		Reservation:      ri.Reservation,
+		Pod:              ri.Pod,
 		ResourceNames:    resourceNames,
 		Allocatable:      ri.Allocatable.DeepCopy(),
 		Allocated:        ri.Allocated.DeepCopy(),
 		AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
 		AllocatedPorts:   util.CloneHostPorts(ri.AllocatedPorts),
-		AssignedPods:     pods,
+		AssignedPods:     assignedPods,
 	}
 }
 
 func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation) {
-	ri.Reservation = r.DeepCopy()
+	ri.Reservation = r
 	ri.Pod = reservationutil.NewReservePod(r)
 	ri.Allocatable = reservationutil.ReservationRequests(r)
 	ri.AllocatablePorts = util.RequestedHostPorts(ri.Pod)
@@ -262,7 +287,7 @@ func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation)
 }
 
 func (ri *ReservationInfo) UpdatePod(pod *corev1.Pod) {
-	ri.Pod = pod.DeepCopy()
+	ri.Pod = pod
 	ri.Allocatable, _ = resource.PodRequestsAndLimits(pod)
 	ri.AllocatablePorts = util.RequestedHostPorts(pod)
 	ri.ResourceNames = quotav1.ResourceNames(ri.Allocatable)
diff --git a/pkg/scheduler/plugins/reservation/cache.go b/pkg/scheduler/plugins/reservation/cache.go
index e6c043a6e..eb332f2c1 100644
--- a/pkg/scheduler/plugins/reservation/cache.go
+++ b/pkg/scheduler/plugins/reservation/cache.go
@@ -24,6 +24,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
 
 	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
 	schedulinglister "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
@@ -232,20 +233,30 @@ func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *framework
 	return nil
 }
 
-func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
+func (cache *reservationCache) forEachAvailableReservationOnNode(nodeName string, fn func(rInfo *frameworkext.ReservationInfo) *framework.Status) *framework.Status {
 	cache.lock.RLock()
 	defer cache.lock.RUnlock()
 	rOnNode := cache.reservationsOnNode[nodeName]
 	if len(rOnNode) == 0 {
 		return nil
 	}
-	result := make([]*frameworkext.ReservationInfo, 0, len(rOnNode))
 	for uid := range rOnNode {
 		rInfo := cache.reservationInfos[uid]
 		if rInfo != nil && rInfo.IsAvailable() {
-			result = append(result, rInfo.Clone())
+			if status := fn(rInfo); !status.IsSuccess() {
+				return status
+			}
 		}
 	}
+	return nil
+}
+
+func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
+	var result []*frameworkext.ReservationInfo
+	cache.forEachAvailableReservationOnNode(nodeName, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
+		result = append(result, rInfo.Clone())
+		return nil
+	})
 	return result
 }
diff --git a/pkg/scheduler/plugins/reservation/plugin.go b/pkg/scheduler/plugins/reservation/plugin.go
index cdc377d90..8cfe59fa7 100644
--- a/pkg/scheduler/plugins/reservation/plugin.go
+++ b/pkg/scheduler/plugins/reservation/plugin.go
@@ -348,14 +348,17 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
 			allocatePolicy = schedulingv1alpha1.ReservationAllocatePolicyAligned
 		}
 
-		rInfos := pl.reservationCache.listAvailableReservationInfosOnNode(node.Name)
-		for _, v := range rInfos {
+		status := pl.reservationCache.forEachAvailableReservationOnNode(node.Name, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
 			// ReservationAllocatePolicyDefault cannot coexist with other allocate policies
 			if (allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
-				v.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
-				allocatePolicy != v.GetAllocatePolicy() {
+				rInfo.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
+				allocatePolicy != rInfo.GetAllocatePolicy() {
 				return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAllocatePolicyConflict)
 			}
+			return nil
+		})
+		if !status.IsSuccess() {
+			return status
 		}
 	}
 
@@ -602,7 +605,7 @@ func (pl *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,
 
 	state := getStateData(cycleState)
 	if state.assumed == nil {
-		klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %d o node %s", klog.KObj(pod), nodeName)
+		klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %s on node %s", klog.KObj(pod), nodeName)
 		return nil
 	}
diff --git a/pkg/scheduler/plugins/reservation/plugin_test.go b/pkg/scheduler/plugins/reservation/plugin_test.go
index c690b73ac..a363f2f9b 100644
--- a/pkg/scheduler/plugins/reservation/plugin_test.go
+++ b/pkg/scheduler/plugins/reservation/plugin_test.go
@@ -120,7 +120,7 @@ type pluginTestSuit struct {
 	extenderFactory *frameworkext.FrameworkExtenderFactory
 }
 
-func newPluginTestSuitWith(t *testing.T, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
+func newPluginTestSuitWith(t testing.TB, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
 	var v1beta2args v1beta2.ReservationArgs
 	v1beta2.SetDefaults_ReservationArgs(&v1beta2args)
 	var reservationArgs config.ReservationArgs
diff --git a/pkg/scheduler/plugins/reservation/transformer.go b/pkg/scheduler/plugins/reservation/transformer.go
index a503a32ac..4d1832fe0 100644
--- a/pkg/scheduler/plugins/reservation/transformer.go
+++ b/pkg/scheduler/plugins/reservation/transformer.go
@@ -28,6 +28,7 @@ import (
 	resourceapi "k8s.io/kubernetes/pkg/api/v1/resource"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	"k8s.io/kubernetes/pkg/scheduler/framework/parallelize"
+	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
 
 	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
 	"github.com/koordinator-sh/koordinator/pkg/scheduler/frameworkext"
@@ -72,7 +73,7 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
 	processNode := func(i int) {
 		nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(allNodes[i])
 		if err != nil {
-			klog.Warningf("Failed to get NodeInfo of %s during reservation's BeforePreFilter, err: %v", allNodes[i], err)
+			klog.Warningf("Failed to get NodeInfo of %s during reservation's BeforePreFilter for pod: %v, err: %v", allNodes[i], klog.KObj(pod), err)
 			return
 		}
 		node := nodeInfo.Node()
@@ -81,39 +82,39 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
 			return
 		}
 
-		rOnNode := pl.reservationCache.listAvailableReservationInfosOnNode(node.Name)
-		if len(rOnNode) == 0 {
-			return
-		}
-
 		var unmatched, matched []*frameworkext.ReservationInfo
-		for _, rInfo := range rOnNode {
-			if !rInfo.IsAvailable() {
-				continue
+		status := pl.reservationCache.forEachAvailableReservationOnNode(node.Name, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
+			if !rInfo.IsAvailable() || rInfo.ParseError != nil {
+				return nil
 			}
 
 			// In this case, the Controller has not yet updated the status of the Reservation to Succeeded,
 			// but in fact it can no longer be used for allocation. So it's better to skip first.
 			if rInfo.IsAllocateOnce() && len(rInfo.AssignedPods) > 0 {
-				continue
+				return nil
 			}
 
 			if !isReservedPod && !rInfo.IsUnschedulable() && matchReservation(pod, node, rInfo, reservationAffinity) {
-				matched = append(matched, rInfo)
+				matched = append(matched, rInfo.Clone())
 			} else if len(rInfo.AssignedPods) > 0 {
-				unmatched = append(unmatched, rInfo)
-				if !isReservedPod {
-					klog.V(6).InfoS("got reservation on node does not match the pod", "reservation", klog.KObj(rInfo), "pod", klog.KObj(pod))
-				}
+				unmatched = append(unmatched, rInfo.Clone())
 			}
+			return nil
+		})
+		if !status.IsSuccess() {
+			err = status.AsError()
+			klog.ErrorS(err, "Failed to iterate reservations on node", "pod", klog.KObj(pod), "node", node.Name)
+			errCh.SendErrorWithCancel(err, cancel)
+			return
 		}
+
 		if len(matched) == 0 && len(unmatched) == 0 {
 			return
 		}
 
 		if err := extender.Scheduler().GetCache().InvalidNodeInfo(node.Name); err != nil {
-			klog.ErrorS(err, "Failed to InvalidNodeInfo", "node", node.Name)
+			klog.ErrorS(err, "Failed to InvalidNodeInfo", "pod", klog.KObj(pod), "node", node.Name)
 			errCh.SendErrorWithCancel(err, cancel)
 			return
 		}
@@ -173,11 +174,11 @@ func (pl *Plugin) prepareMatchReservationState(ctx context.Context, cycleState *
 			}
 			allPluginToRestoreState[index-1] = pluginToRestoreState
 		}
-		klog.V(4).Infof("Pod %v has reservations on node %v, %d matched, %d unmatched", klog.KObj(pod), node.Name, len(matched), len(unmatched))
 	}
 	pl.handle.Parallelizer().Until(parallelCtx, len(allNodes), processNode)
 	err = errCh.ReceiveError()
 	if err != nil {
+		klog.ErrorS(err, "Failed to find matched or unmatched reservations", "pod", klog.KObj(pod))
 		return nil, false, err
 	}
@@ -229,8 +230,11 @@ func restoreMatchedReservation(nodeInfo *framework.NodeInfo, rInfo *frameworkext
 	// Retain ports that are not used by other Pods. These ports need to be erased from NodeInfo.UsedPorts,
 	// otherwise it may cause Pod port conflicts
 	allocatablePorts := util.CloneHostPorts(rInfo.AllocatablePorts)
-	util.RemoveHostPorts(allocatablePorts, rInfo.AllocatedPorts)
-	util.ResetHostPorts(reservePod, allocatablePorts)
+	if len(allocatablePorts) > 0 {
+		reservePod = reservePod.DeepCopy()
+		util.RemoveHostPorts(allocatablePorts, rInfo.AllocatedPorts)
+		util.ResetHostPorts(reservePod, allocatablePorts)
+	}
 
 	// When AllocateOnce is disabled, some resources may have been allocated,
 	// and an additional resource record will be accumulated at this time.
@@ -246,38 +250,90 @@
 }
 
 func restoreUnmatchedReservations(nodeInfo *framework.NodeInfo, rInfo *frameworkext.ReservationInfo) error {
+	// Callers only pass reservations that already have assigned pods, so this condition should
+	// never hold; keep the check as a defensive guard.
+	if len(rInfo.AssignedPods) == 0 {
+		return nil
+	}
+
 	// Reservations and Pods that consume the Reservations are cumulative in resource accounting.
 	// For example, on a 32C machine, ReservationA reserves 8C, and then PodA uses ReservationA to allocate 4C,
 	// then the record on NodeInfo is that 12C is allocated. But in fact it should be calculated according to 8C,
 	// so we need to return some resources.
-	reservePod := rInfo.GetReservePod().DeepCopy()
-	if err := nodeInfo.RemovePod(reservePod); err != nil {
-		klog.Errorf("Failed to remove reserve pod %v from node %v, err: %v", klog.KObj(rInfo), nodeInfo.Node().Name, err)
-		return err
+	reservePod := rInfo.GetReservePod()
+	updateNodeInfoRequested(nodeInfo, reservePod, -1)
+	remainedResource := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, rInfo.Allocated)
+	if !quotav1.IsZero(remainedResource) {
+		reservePod = &corev1.Pod{
+			Spec: corev1.PodSpec{
+				Containers: []corev1.Container{
+					{
+						Resources: corev1.ResourceRequirements{Requests: remainedResource},
+					},
+				},
+			},
+		}
+		updateNodeInfoRequested(nodeInfo, reservePod, 1)
 	}
-	occupyUnallocatedResources(rInfo, reservePod, nodeInfo)
 	return nil
 }
 
-func occupyUnallocatedResources(rInfo *frameworkext.ReservationInfo, reservePod *corev1.Pod, nodeInfo *framework.NodeInfo) {
-	if len(rInfo.AssignedPods) == 0 {
-		nodeInfo.AddPod(reservePod)
-	} else {
-		for i := range reservePod.Spec.Containers {
-			reservePod.Spec.Containers[i].Resources.Requests = corev1.ResourceList{}
+func updateNodeInfoRequested(n *framework.NodeInfo, pod *corev1.Pod, sign int64) {
+	res, non0CPU, non0Mem := calculateResource(pod)
+	n.Requested.MilliCPU += sign * res.MilliCPU
+	n.Requested.Memory += sign * res.Memory
+	n.Requested.EphemeralStorage += sign * res.EphemeralStorage
+	if n.Requested.ScalarResources == nil && len(res.ScalarResources) > 0 {
+		n.Requested.ScalarResources = map[corev1.ResourceName]int64{}
+	}
+	for rName, rQuant := range res.ScalarResources {
+		n.Requested.ScalarResources[rName] += sign * rQuant
+	}
+	n.NonZeroRequested.MilliCPU += sign * non0CPU
+	n.NonZeroRequested.Memory += sign * non0Mem
+}
+
+func max(a, b int64) int64 {
+	if a >= b {
+		return a
+	}
+	return b
+}
+
+// resourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead
+func calculateResource(pod *corev1.Pod) (res framework.Resource, non0CPU int64, non0Mem int64) {
+	resPtr := &res
+	for _, c := range pod.Spec.Containers {
+		resPtr.Add(c.Resources.Requests)
+		non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&c.Resources.Requests)
+		non0CPU += non0CPUReq
+		non0Mem += non0MemReq
+		// No non-zero resources for GPUs or opaque resources.
+	}
+
+	for _, ic := range pod.Spec.InitContainers {
+		resPtr.SetMaxResource(ic.Resources.Requests)
+		non0CPUReq, non0MemReq := schedutil.GetNonzeroRequests(&ic.Resources.Requests)
+		non0CPU = max(non0CPU, non0CPUReq)
+		non0Mem = max(non0Mem, non0MemReq)
+	}
+
+	// If Overhead is being utilized, add to the total requests for the pod
+	if pod.Spec.Overhead != nil {
+		resPtr.Add(pod.Spec.Overhead)
+		if _, found := pod.Spec.Overhead[corev1.ResourceCPU]; found {
+			non0CPU += pod.Spec.Overhead.Cpu().MilliValue()
 		}
-		remainedResource := quotav1.SubtractWithNonNegativeResult(rInfo.Allocatable, rInfo.Allocated)
-		if !quotav1.IsZero(remainedResource) {
-			reservePod.Spec.Containers = append(reservePod.Spec.Containers, corev1.Container{
-				Resources: corev1.ResourceRequirements{Requests: remainedResource},
-			})
+
+		if _, found := pod.Spec.Overhead[corev1.ResourceMemory]; found {
+			non0Mem += pod.Spec.Overhead.Memory().Value()
 		}
-		nodeInfo.AddPod(reservePod)
 	}
+
+	return
 }
 
 func matchReservation(pod *corev1.Pod, node *corev1.Node, reservation *frameworkext.ReservationInfo, reservationAffinity *reservationutil.RequiredReservationAffinity) bool {
-	if !reservationutil.MatchReservationOwners(pod, reservation.GetPodOwners()) {
+	if !reservation.Match(pod) {
 		return false
 	}
diff --git a/pkg/scheduler/plugins/reservation/transformer_benchmark_test.go b/pkg/scheduler/plugins/reservation/transformer_benchmark_test.go
new file mode 100644
index 000000000..6d07e65db
--- /dev/null
+++ b/pkg/scheduler/plugins/reservation/transformer_benchmark_test.go
@@ -0,0 +1,281 @@
+/*
+Copyright 2022 The Koordinator Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package reservation
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/uuid"
+	"k8s.io/kubernetes/pkg/scheduler/framework"
+	"k8s.io/utils/pointer"
+
+	apiext "github.com/koordinator-sh/koordinator/apis/extension"
+	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
+	reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
+)
+
+func BenchmarkBeforePrefilterWithMatchedPod(b *testing.B) {
+	var nodes []*corev1.Node
+	for i := 0; i < 1024; i++ {
+		nodes = append(nodes, &corev1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: fmt.Sprintf("node-%d", i),
+			},
+			Status: corev1.NodeStatus{
+				Allocatable: corev1.ResourceList{
+					corev1.ResourceCPU:    resource.MustParse("32"),
+					corev1.ResourceMemory: resource.MustParse("64Gi"),
+				},
+			},
+		})
+	}
+	suit := newPluginTestSuitWith(b, nil, nodes)
+	p, err := suit.pluginFactory()
+	assert.NoError(b, err)
+	pl := p.(*Plugin)
+
+	reservePods := map[string]*corev1.Pod{}
+	for i, node := range nodes {
+		reservation := &schedulingv1alpha1.Reservation{
+			ObjectMeta: metav1.ObjectMeta{
+				UID:  uuid.NewUUID(),
+				Name: fmt.Sprintf("reservation-%d", i),
+				Labels: map[string]string{
+					"test-reservation": "true",
+				},
+			},
+			Spec: schedulingv1alpha1.ReservationSpec{
+				AllocateOnce: pointer.Bool(false),
+				Owners: []schedulingv1alpha1.ReservationOwner{
+					{
+						LabelSelector: &metav1.LabelSelector{
+							MatchLabels: map[string]string{
+								"test-reservation": "true",
+							},
+						},
+					},
+				},
+				Template: &corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Resources: corev1.ResourceRequirements{
+									Requests: corev1.ResourceList{
+										corev1.ResourceCPU:    resource.MustParse("32"),
+										corev1.ResourceMemory: resource.MustParse("64Gi"),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			Status: schedulingv1alpha1.ReservationStatus{
+				Phase:    schedulingv1alpha1.ReservationAvailable,
+				NodeName: node.Name,
+				Allocatable: corev1.ResourceList{
+					corev1.ResourceCPU:    resource.MustParse("32"),
+					corev1.ResourceMemory: resource.MustParse("64Gi"),
+				},
+			},
+		}
+		pl.reservationCache.updateReservation(reservation)
+		nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(node.Name)
+		assert.NoError(b, err)
+		reservePod := reservationutil.NewReservePod(reservation)
+		reservePods[string(reservePod.UID)] = reservePod
+		nodeInfo.AddPod(reservePod)
+		assert.NoError(b, pl.handle.Scheduler().GetCache().AddPod(reservePod))
+	}
+
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "default",
+			Labels: map[string]string{
+				"test-reservation": "true",
+			},
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Resources: corev1.ResourceRequirements{
+						Requests: corev1.ResourceList{
+							corev1.ResourceCPU:    resource.MustParse("32"),
+							corev1.ResourceMemory: resource.MustParse("64Gi"),
+						},
+					},
+				},
+			},
+		},
+	}
+	err = apiext.SetReservationAffinity(pod, &apiext.ReservationAffinity{
+		ReservationSelector: map[string]string{
+			"test-reservation": "true",
+		},
+	})
+	assert.NoError(b, err)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cycleState := framework.NewCycleState()
+		_, restored, status := pl.BeforePreFilter(context.TODO(), cycleState, pod)
+		assert.True(b, restored)
+		assert.True(b, status.IsSuccess())
+
+		sd := getStateData(cycleState)
+		for _, v := range sd.nodeReservationStates {
+			nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(v.nodeName)
+			assert.NoError(b, err)
+			for _, ri := range v.matched {
+				p := reservePods[string(ri.UID())]
+				if p != nil {
+					nodeInfo.AddPod(p)
+				}
+			}
+		}
+	}
+}
+
+func BenchmarkBeforePrefilterWithUnmatchedPod(b *testing.B) {
+	var nodes []*corev1.Node
+	for i := 0; i < 1024; i++ {
+		nodes = append(nodes, &corev1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: fmt.Sprintf("node-%d", i),
+			},
+			Status: corev1.NodeStatus{
+				Allocatable: corev1.ResourceList{
+					corev1.ResourceCPU:    resource.MustParse("32"),
+					corev1.ResourceMemory: resource.MustParse("64Gi"),
+				},
+			},
+		})
+	}
+	suit := newPluginTestSuitWith(b, nil, nodes)
+	p, err := suit.pluginFactory()
+	assert.NoError(b, err)
+	pl := p.(*Plugin)
+
+	for i, node := range nodes {
+		reservation := &schedulingv1alpha1.Reservation{
+			ObjectMeta: metav1.ObjectMeta{
+				UID:  uuid.NewUUID(),
+				Name: fmt.Sprintf("reservation-%d", i),
+				Labels: map[string]string{
+					"test-reservation": "true",
+				},
+			},
+			Spec: schedulingv1alpha1.ReservationSpec{
+				AllocateOnce: pointer.Bool(false),
+				Owners: []schedulingv1alpha1.ReservationOwner{
+					{
+						LabelSelector: &metav1.LabelSelector{
+							MatchLabels: map[string]string{
+								"test-reservation": "true",
+							},
+						},
+					},
+				},
+				Template: &corev1.PodTemplateSpec{
+					Spec: corev1.PodSpec{
+						Containers: []corev1.Container{
+							{
+								Resources: corev1.ResourceRequirements{
+									Requests: corev1.ResourceList{
+										corev1.ResourceCPU:    resource.MustParse("32"),
+										corev1.ResourceMemory: resource.MustParse("64Gi"),
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+			Status: schedulingv1alpha1.ReservationStatus{
+				Phase:    schedulingv1alpha1.ReservationAvailable,
+				NodeName: node.Name,
+				Allocatable: corev1.ResourceList{
+					corev1.ResourceCPU:    resource.MustParse("32"),
+					corev1.ResourceMemory: resource.MustParse("64Gi"),
+				},
+			},
+		}
+		pl.reservationCache.updateReservation(reservation)
+		assignedPod := &corev1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				UID:       uuid.NewUUID(),
+				Name:      fmt.Sprintf("pod-%s", reservation.Name),
+				Namespace: "default",
+			},
+			Spec: corev1.PodSpec{
+				NodeName: node.Name,
+				Containers: []corev1.Container{
+					{
+						Resources: corev1.ResourceRequirements{
+							Requests: corev1.ResourceList{
+								corev1.ResourceCPU:    resource.MustParse("4"),
+								corev1.ResourceMemory: resource.MustParse("8Gi"),
+							},
+						},
+					},
+				},
+			},
+		}
+		pl.reservationCache.updatePod(reservation.UID, nil, assignedPod)
+		nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(node.Name)
+		assert.NoError(b, err)
+		reservePod := reservationutil.NewReservePod(reservation)
+		nodeInfo.AddPod(reservePod)
+		nodeInfo.AddPod(assignedPod)
+		assert.NoError(b, pl.handle.Scheduler().GetCache().AddPod(reservePod))
+		assert.NoError(b, pl.handle.Scheduler().GetCache().AddPod(assignedPod))
+	}
+
+	pod := &corev1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-pod",
+			Namespace: "default",
+		},
+		Spec: corev1.PodSpec{
+			Containers: []corev1.Container{
+				{
+					Resources: corev1.ResourceRequirements{
+						Requests: corev1.ResourceList{
+							corev1.ResourceCPU:    resource.MustParse("32"),
+							corev1.ResourceMemory: resource.MustParse("64Gi"),
+						},
+					},
+				},
+			},
+		},
+	}
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		cycleState := framework.NewCycleState()
+		_, restored, status := pl.BeforePreFilter(context.TODO(), cycleState, pod)
+		assert.True(b, restored)
+		assert.True(b, status.IsSuccess())
+	}
+}
diff --git a/pkg/scheduler/plugins/reservation/transformer_test.go b/pkg/scheduler/plugins/reservation/transformer_test.go
index 80f235cac..2d61f0de0 100644
--- a/pkg/scheduler/plugins/reservation/transformer_test.go
+++ b/pkg/scheduler/plugins/reservation/transformer_test.go
@@ -139,7 +139,7 @@ func TestRestoreReservation(t *testing.T) {
 	unmatchedReservation := &schedulingv1alpha1.Reservation{
 		ObjectMeta: metav1.ObjectMeta{
 			UID:  uuid.NewUUID(),
-			Name: "reservation4C8G",
+			Name: "reservation12C24G",
 		},
 		Spec: schedulingv1alpha1.ReservationSpec{
 			AllocateOnce: pointer.Bool(false),
@@ -163,7 +163,7 @@ func TestRestoreReservation(t *testing.T) {
 					{
 						Resources: corev1.ResourceRequirements{
 							Requests: corev1.ResourceList{
-								corev1.ResourceCPU:    resource.MustParse("12"),
+								corev1.ResourceCPU:    *resource.NewQuantity(12, resource.DecimalSI),
 								corev1.ResourceMemory: resource.MustParse("24Gi"),
 							},
 						},
@@ -183,7 +183,7 @@ func TestRestoreReservation(t *testing.T) {
 			Phase:    schedulingv1alpha1.ReservationAvailable,
 			NodeName: "test-node",
 			Allocatable: corev1.ResourceList{
-				corev1.ResourceCPU:    resource.MustParse("12"),
+				corev1.ResourceCPU:    *resource.NewQuantity(12, resource.DecimalSI),
 				corev1.ResourceMemory: resource.MustParse("24Gi"),
 			},
 		},
@@ -194,7 +194,7 @@ func TestRestoreReservation(t *testing.T) {
 	matchedReservation := &schedulingv1alpha1.Reservation{
 		ObjectMeta: metav1.ObjectMeta{
 			UID:  uuid.NewUUID(),
-			Name: "reservation2C4G",
+			Name: "reservation8C16G",
 		},
 		Spec: schedulingv1alpha1.ReservationSpec{
 			Owners: []schedulingv1alpha1.ReservationOwner{
@@ -253,29 +253,14 @@ func TestRestoreReservation(t *testing.T) {
 	}
 	pods = append(pods, reservationutil.NewReservePod(matchedReservation))
 
-	suit := newPluginTestSuitWith(t, pods, []*corev1.Node{node})
-	p, err := suit.pluginFactory()
-	assert.NoError(t, err)
-	pl := p.(*Plugin)
-
-	nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
-	assert.NoError(t, err)
-	expectedRequestedResources := &framework.Resource{
-		MilliCPU: 32000,
-		Memory:   64 * 1024 * 1024 * 1024,
-	}
-	assert.Equal(t, expectedRequestedResources, nodeInfo.Requested)
-
-	pl.reservationCache.updateReservation(unmatchedReservation)
-	pl.reservationCache.updateReservation(matchedReservation)
-
-	pl.reservationCache.addPod(unmatchedReservation.UID, &corev1.Pod{
+	podAllocatedWithUnmatchedReservation := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			UID:       uuid.NewUUID(),
 			Name:      "unmatched-allocated-pod-1",
 			Namespace: "default",
 		},
 		Spec: corev1.PodSpec{
+			NodeName: node.Name,
 			Containers: []corev1.Container{
 				{
 					Resources: corev1.ResourceRequirements{
@@ -287,7 +272,26 @@ func TestRestoreReservation(t *testing.T) {
 				},
 			},
 		},
-	})
+	}
+	pods = append(pods, podAllocatedWithUnmatchedReservation)
+
+	suit := newPluginTestSuitWith(t, pods, []*corev1.Node{node})
+	p, err := suit.pluginFactory()
+	assert.NoError(t, err)
+	pl := p.(*Plugin)
+
+	nodeInfo, err := suit.fw.SnapshotSharedLister().NodeInfos().Get(node.Name)
+	assert.NoError(t, err)
+	expectedRequestedResources := &framework.Resource{
+		MilliCPU: 36000,
+		Memory:   72 * 1024 * 1024 * 1024,
+	}
+	assert.Equal(t, expectedRequestedResources, nodeInfo.Requested)
+
+	pl.reservationCache.updateReservation(unmatchedReservation)
+	pl.reservationCache.updateReservation(matchedReservation)
+
+	pl.reservationCache.addPod(unmatchedReservation.UID, podAllocatedWithUnmatchedReservation)
 
 	cycleState := framework.NewCycleState()
 	_, restored, status := pl.BeforePreFilter(context.TODO(), cycleState, &corev1.Pod{
@@ -311,8 +315,8 @@ func TestRestoreReservation(t *testing.T) {
 				nodeName: node.Name,
 				matched:  []*frameworkext.ReservationInfo{matchRInfo},
 				podRequested: &framework.Resource{
-					MilliCPU: 28000,
-					Memory:   60129542144,
+					MilliCPU: 32000,
+					Memory:   68719476736,
 				},
 				rAllocated: framework.NewResource(nil),
 			},
@@ -321,23 +325,21 @@ func TestRestoreReservation(t *testing.T) {
 	assert.Equal(t, expectedStat, getStateData(cycleState))
 
 	unmatchedReservePod := pods[2].DeepCopy()
-	unmatchedReservePod.Spec.Containers[0].Resources.Requests = corev1.ResourceList{}
-	unmatchedReservePod.Spec.Containers = append(unmatchedReservePod.Spec.Containers, corev1.Container{
-		Resources: corev1.ResourceRequirements{
-			Requests: corev1.ResourceList{
-				corev1.ResourceCPU:    resource.MustParse("8000m"),
-				corev1.ResourceMemory: resource.MustParse("16Gi"),
-			},
-		},
-	})
-	expectNodeInfo := framework.NewNodeInfo(pods[0], pods[1], unmatchedReservePod)
+	unmatchedReservePod.Spec.Containers[0].Resources.Requests = corev1.ResourceList{
+		corev1.ResourceCPU:    resource.MustParse("12"),
+		corev1.ResourceMemory: resource.MustParse("24Gi"),
+	}
+	expectNodeInfo := framework.NewNodeInfo(pods[0], pods[1], unmatchedReservePod, podAllocatedWithUnmatchedReservation)
 	expectNodeInfo.SetNode(node)
+	expectNodeInfo.Requested.MilliCPU -= 4000
+	expectNodeInfo.Requested.Memory -= 8 * 1024 * 1024 * 1024
+	expectNodeInfo.NonZeroRequested.MilliCPU -= 4000
+	expectNodeInfo.NonZeroRequested.Memory -= 8 * 1024 * 1024 * 1024
 	assert.Equal(t, expectNodeInfo.Requested, nodeInfo.Requested)
 	assert.Equal(t, expectNodeInfo.UsedPorts, nodeInfo.UsedPorts)
 	nodeInfo.Generation = 0
 	expectNodeInfo.Generation = 0
 	assert.True(t, equality.Semantic.DeepEqual(expectNodeInfo, nodeInfo))
-
 	status = pl.AfterPreFilter(context.TODO(), cycleState, &corev1.Pod{})
 	assert.True(t, status.IsSuccess())
 }
diff --git a/pkg/util/reservation/reservation.go b/pkg/util/reservation/reservation.go
index 63bf4a47a..6a6acc47c 100644
--- a/pkg/util/reservation/reservation.go
+++ b/pkg/util/reservation/reservation.go
@@ -24,6 +24,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/validation/field"
 	quotav1 "k8s.io/apiserver/pkg/quota/v1"
 	"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
 	"k8s.io/klog/v2"
@@ -33,6 +34,7 @@ import (
 
 	"github.com/koordinator-sh/koordinator/apis/extension"
 	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
+	"github.com/koordinator-sh/koordinator/pkg/util"
 )
 
 var (
@@ -352,16 +354,55 @@ func ReservePorts(r *schedulingv1alpha1.Reservation) framework.HostPortInfo {
 	return portInfo
 }
 
+type ReservationOwnerMatcher struct {
+	schedulingv1alpha1.ReservationOwner
+	Selector labels.Selector
+}
+
+func ParseReservationOwnerMatchers(owners []schedulingv1alpha1.ReservationOwner) ([]ReservationOwnerMatcher, error) {
+	if len(owners) == 0 {
+		return nil, nil
+	}
+	var errs field.ErrorList
+	ownerMatchers := make([]ReservationOwnerMatcher, 0, len(owners))
+	for i, v := range owners {
+		var selector labels.Selector
+		if v.LabelSelector != nil {
+			var err error
+			selector, err = util.GetFastLabelSelector(v.LabelSelector)
+			if err != nil {
+				errs = append(errs, field.Invalid(field.NewPath("owners").Index(i), v.LabelSelector, err.Error()))
+				continue
+			}
+		}
+		ownerMatchers = append(ownerMatchers, ReservationOwnerMatcher{
+			ReservationOwner: v,
+			Selector:         selector,
+		})
+	}
+	if len(errs) > 0 {
+		return nil, errs.ToAggregate()
+	}
+	return ownerMatchers, nil
+}
+
+func (m *ReservationOwnerMatcher) Match(pod *corev1.Pod) bool {
+	if MatchObjectRef(pod, m.Object) &&
+		MatchReservationControllerReference(pod, m.Controller) &&
+		(m.Selector == nil || m.Selector.Matches(labels.Set(pod.Labels))) {
+		return true
+	}
+	return false
+}
+
 // MatchReservationOwners checks if the scheduling pod matches the reservation's owner spec.
 // `reservation.spec.owners` defines the DNF (disjunctive normal form) of ObjectReference, ControllerReference
 // (extended), LabelSelector, which means multiple selectors are firstly ANDed and secondly ORed.
-func MatchReservationOwners(pod *corev1.Pod, owners []schedulingv1alpha1.ReservationOwner) bool {
+func MatchReservationOwners(pod *corev1.Pod, matchers []ReservationOwnerMatcher) bool {
 	// assert pod != nil && r != nil
 	// Owners == nil matches nothing, while Owners = [{}] matches everything
-	for _, owner := range owners {
-		if MatchObjectRef(pod, owner.Object) &&
-			MatchReservationControllerReference(pod, owner.Controller) &&
-			matchLabelSelector(pod, owner.LabelSelector) {
+	for _, m := range matchers {
+		if m.Match(pod) {
 			return true
 		}
 	}
@@ -400,17 +441,6 @@ func MatchReservationControllerReference(pod *corev1.Pod, controllerRef *schedul
 	return false
 }
 
-func matchLabelSelector(pod *corev1.Pod, labelSelector *metav1.LabelSelector) bool {
-	if labelSelector == nil {
-		return true
-	}
-	selector, err := metav1.LabelSelectorAsSelector(labelSelector)
-	if err != nil {
-		return false
-	}
-	return selector.Matches(labels.Set(pod.Labels))
-}
-
 type RequiredReservationAffinity struct {
 	labelSelector labels.Selector
 	nodeSelector  *nodeaffinity.NodeSelector
diff --git a/pkg/util/reservation/reservation_test.go b/pkg/util/reservation/reservation_test.go
index 8fddd6130..6bb28e87d 100644
--- a/pkg/util/reservation/reservation_test.go
+++ b/pkg/util/reservation/reservation_test.go
@@ -253,7 +253,7 @@ func TestGetReservationSchedulerName(t *testing.T) {
 	}
 }
 
-func Test_matchReservationOwners(t *testing.T) {
+func TestMatchReservationOwners(t *testing.T) {
 	type args struct {
 		pod *corev1.Pod
 		r   *schedulingv1alpha1.Reservation
@@ -434,7 +434,9 @@ func Test_matchReservationOwners(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			got := MatchReservationOwners(tt.args.pod, tt.args.r.Spec.Owners)
+			matchers, err := ParseReservationOwnerMatchers(tt.args.r.Spec.Owners)
+			assert.NoError(t, err)
+			got := MatchReservationOwners(tt.args.pod, matchers)
 			assert.Equal(t, tt.want, got)
 		})
 	}
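
A minimal usage sketch of the owner-matcher fast path added above (illustrative, not part of the patch; only the two reservationutil functions and their signatures are taken from the diff). ParseReservationOwnerMatchers compiles the label selectors once, e.g. when the reservation cache adds or updates a Reservation, so MatchReservationOwners on the per-pod hot path only runs precompiled matchers instead of converting a LabelSelector on every call:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
	reservationutil "github.com/koordinator-sh/koordinator/pkg/util/reservation"
)

func main() {
	owners := []schedulingv1alpha1.ReservationOwner{
		{
			LabelSelector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"test-reservation": "true"},
			},
		},
	}

	// Parse once, off the hot path. A failure is recorded as
	// ReservationInfo.ParseError, and such a reservation is skipped during
	// matching rather than re-parsed for every pod.
	matchers, err := reservationutil.ParseReservationOwnerMatchers(owners)
	if err != nil {
		panic(err)
	}

	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"test-reservation": "true"},
		},
	}

	// Hot path: only the precompiled matchers run here.
	fmt.Println(reservationutil.MatchReservationOwners(pod, matchers)) // true
}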
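
And a worked sketch of the accounting in restoreUnmatchedReservations, using the 32C example from the patch's comment (values are illustrative): the reserve pod's full request (8C) is subtracted from NodeInfo.Requested via updateNodeInfoRequested(nodeInfo, reservePod, -1), and only the unallocated remainder Allocatable - Allocated is charged back, so a node carrying a 4C consumer of an 8C reservation ends up accounted at 8C instead of 12C:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	quotav1 "k8s.io/apiserver/pkg/quota/v1"
)

func main() {
	// ReservationA reserves 8C; PodA has allocated 4C of it.
	allocatable := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("8")}
	allocated := corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("4")}

	// Same call the patch uses: clamped at zero, so over-allocation never charges back.
	remained := quotav1.SubtractWithNonNegativeResult(allocatable, allocated)
	if !quotav1.IsZero(remained) {
		// "4": re-charged as a synthetic pod via updateNodeInfoRequested(..., 1).
		fmt.Println(remained.Cpu().String())
	}
}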