Skip to content

Commit

Permalink
scheduler: optimize reservation BeforePreFilter performance (#1695)
Browse files Browse the repository at this point in the history
Signed-off-by: Joseph <[email protected]>
  • Loading branch information
eahydra authored Oct 8, 2023
1 parent d04f86b commit 9a80fe4
Show file tree
Hide file tree
Showing 10 changed files with 536 additions and 112 deletions.
14 changes: 14 additions & 0 deletions apis/extension/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,17 @@ func GetReservationAffinity(annotations map[string]string) (*ReservationAffinity
}
return &affinity, nil
}

func SetReservationAffinity(obj metav1.Object, affinity *ReservationAffinity) error {
data, err := json.Marshal(affinity)
if err != nil {
return err
}
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[AnnotationReservationAffinity] = string(data)
obj.SetAnnotations(annotations)
return nil
}
51 changes: 38 additions & 13 deletions pkg/scheduler/frameworkext/reservation_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type ReservationInfo struct {
AllocatablePorts framework.HostPortInfo
AllocatedPorts framework.HostPortInfo
AssignedPods map[types.UID]*PodRequirement
OwnerMatchers []reservationutil.ReservationOwnerMatcher
ParseError error
}

type PodRequirement struct {
Expand Down Expand Up @@ -79,26 +81,47 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
resourceNames := quotav1.ResourceNames(allocatable)
reservedPod := reservationutil.NewReservePod(r)

ownerMatchers, err := reservationutil.ParseReservationOwnerMatchers(r.Spec.Owners)
if err != nil {
klog.ErrorS(err, "Failed to parse reservation owner matchers", "reservation", klog.KObj(r))
}

return &ReservationInfo{
Reservation: r.DeepCopy(),
Reservation: r,
Pod: reservedPod,
ResourceNames: resourceNames,
Allocatable: allocatable,
AllocatablePorts: util.RequestedHostPorts(reservedPod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
}
}

func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
allocatable, _ := resource.PodRequestsAndLimits(pod)
resourceNames := quotav1.ResourceNames(allocatable)

owners, err := apiext.GetReservationOwners(pod.Annotations)
if err != nil {
klog.ErrorS(err, "Invalid reservation owners annotation of Pod", "pod", klog.KObj(pod))
}
var ownerMatchers []reservationutil.ReservationOwnerMatcher
if owners != nil {
ownerMatchers, err = reservationutil.ParseReservationOwnerMatchers(owners)
if err != nil {
klog.ErrorS(err, "Failed to parse reservation owner matchers of pod", "pod", klog.KObj(pod))
}
}

return &ReservationInfo{
Pod: pod,
ResourceNames: resourceNames,
Allocatable: allocatable,
AllocatablePorts: util.RequestedHostPorts(pod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
}
}

Expand Down Expand Up @@ -205,6 +228,13 @@ func (ri *ReservationInfo) GetPodOwners() []schedulingv1alpha1.ReservationOwner
return nil
}

func (ri *ReservationInfo) Match(pod *corev1.Pod) bool {
if ri.ParseError != nil {
return false
}
return reservationutil.MatchReservationOwners(pod, ri.OwnerMatchers)
}

func (ri *ReservationInfo) IsAvailable() bool {
if ri.Reservation != nil {
return reservationutil.IsReservationAvailable(ri.Reservation)
Expand All @@ -230,30 +260,25 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
resourceNames = append(resourceNames, v)
}

pods := map[types.UID]*PodRequirement{}
assignedPods := make(map[types.UID]*PodRequirement, len(ri.AssignedPods))
for k, v := range ri.AssignedPods {
pods[k] = v.Clone()
}

var reservation *schedulingv1alpha1.Reservation
if ri.Reservation != nil {
reservation = ri.Reservation.DeepCopy()
assignedPods[k] = v
}

return &ReservationInfo{
Reservation: reservation,
Pod: ri.Pod.DeepCopy(),
Reservation: ri.Reservation,
Pod: ri.Pod,
ResourceNames: resourceNames,
Allocatable: ri.Allocatable.DeepCopy(),
Allocated: ri.Allocated.DeepCopy(),
AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts),
AssignedPods: pods,
AssignedPods: assignedPods,
}
}

func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation) {
ri.Reservation = r.DeepCopy()
ri.Reservation = r
ri.Pod = reservationutil.NewReservePod(r)
ri.Allocatable = reservationutil.ReservationRequests(r)
ri.AllocatablePorts = util.RequestedHostPorts(ri.Pod)
Expand All @@ -262,7 +287,7 @@ func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation)
}

func (ri *ReservationInfo) UpdatePod(pod *corev1.Pod) {
ri.Pod = pod.DeepCopy()
ri.Pod = pod
ri.Allocatable, _ = resource.PodRequestsAndLimits(pod)
ri.AllocatablePorts = util.RequestedHostPorts(pod)
ri.ResourceNames = quotav1.ResourceNames(ri.Allocatable)
Expand Down
17 changes: 14 additions & 3 deletions pkg/scheduler/plugins/reservation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
schedulinglister "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
Expand Down Expand Up @@ -232,20 +233,30 @@ func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *framework
return nil
}

func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
func (cache *reservationCache) forEachAvailableReservationOnNode(nodeName string, fn func(rInfo *frameworkext.ReservationInfo) *framework.Status) *framework.Status {
cache.lock.RLock()
defer cache.lock.RUnlock()
rOnNode := cache.reservationsOnNode[nodeName]
if len(rOnNode) == 0 {
return nil
}
result := make([]*frameworkext.ReservationInfo, 0, len(rOnNode))
for uid := range rOnNode {
rInfo := cache.reservationInfos[uid]
if rInfo != nil && rInfo.IsAvailable() {
result = append(result, rInfo.Clone())
if status := fn(rInfo); !status.IsSuccess() {
return status
}
}
}
return nil
}

func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
var result []*frameworkext.ReservationInfo
cache.forEachAvailableReservationOnNode(nodeName, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
result = append(result, rInfo.Clone())
return nil
})
return result
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,17 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
allocatePolicy = schedulingv1alpha1.ReservationAllocatePolicyAligned
}

rInfos := pl.reservationCache.listAvailableReservationInfosOnNode(node.Name)
for _, v := range rInfos {
status := pl.reservationCache.forEachAvailableReservationOnNode(node.Name, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
// ReservationAllocatePolicyDefault cannot coexist with other allocate policies
if (allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
v.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
allocatePolicy != v.GetAllocatePolicy() {
rInfo.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
allocatePolicy != rInfo.GetAllocatePolicy() {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAllocatePolicyConflict)
}
return nil
})
if !status.IsSuccess() {
return status
}
}

Expand Down Expand Up @@ -602,7 +605,7 @@ func (pl *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,

state := getStateData(cycleState)
if state.assumed == nil {
klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %d o node %s", klog.KObj(pod), nodeName)
klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %s on node %s", klog.KObj(pod), nodeName)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type pluginTestSuit struct {
extenderFactory *frameworkext.FrameworkExtenderFactory
}

func newPluginTestSuitWith(t *testing.T, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
func newPluginTestSuitWith(t testing.TB, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
var v1beta2args v1beta2.ReservationArgs
v1beta2.SetDefaults_ReservationArgs(&v1beta2args)
var reservationArgs config.ReservationArgs
Expand Down
Loading

0 comments on commit 9a80fe4

Please sign in to comment.