Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: optimize reservation BeforePreFilter performance #1695

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions apis/extension/reservation.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,17 @@ func GetReservationAffinity(annotations map[string]string) (*ReservationAffinity
}
return &affinity, nil
}

func SetReservationAffinity(obj metav1.Object, affinity *ReservationAffinity) error {
data, err := json.Marshal(affinity)
if err != nil {
return err
}
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[AnnotationReservationAffinity] = string(data)
obj.SetAnnotations(annotations)
return nil
}
51 changes: 38 additions & 13 deletions pkg/scheduler/frameworkext/reservation_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type ReservationInfo struct {
AllocatablePorts framework.HostPortInfo
AllocatedPorts framework.HostPortInfo
AssignedPods map[types.UID]*PodRequirement
OwnerMatchers []reservationutil.ReservationOwnerMatcher
ParseError error
}

type PodRequirement struct {
Expand Down Expand Up @@ -79,26 +81,47 @@ func NewReservationInfo(r *schedulingv1alpha1.Reservation) *ReservationInfo {
resourceNames := quotav1.ResourceNames(allocatable)
reservedPod := reservationutil.NewReservePod(r)

ownerMatchers, err := reservationutil.ParseReservationOwnerMatchers(r.Spec.Owners)
if err != nil {
klog.ErrorS(err, "Failed to parse reservation owner matchers", "reservation", klog.KObj(r))
}

return &ReservationInfo{
Reservation: r.DeepCopy(),
Reservation: r,
Pod: reservedPod,
ResourceNames: resourceNames,
Allocatable: allocatable,
AllocatablePorts: util.RequestedHostPorts(reservedPod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
}
}

func NewReservationInfoFromPod(pod *corev1.Pod) *ReservationInfo {
allocatable, _ := resource.PodRequestsAndLimits(pod)
resourceNames := quotav1.ResourceNames(allocatable)

owners, err := apiext.GetReservationOwners(pod.Annotations)
if err != nil {
klog.ErrorS(err, "Invalid reservation owners annotation of Pod", "pod", klog.KObj(pod))
}
var ownerMatchers []reservationutil.ReservationOwnerMatcher
if owners != nil {
ownerMatchers, err = reservationutil.ParseReservationOwnerMatchers(owners)
if err != nil {
klog.ErrorS(err, "Failed to parse reservation owner matchers of pod", "pod", klog.KObj(pod))
}
}

return &ReservationInfo{
Pod: pod,
ResourceNames: resourceNames,
Allocatable: allocatable,
AllocatablePorts: util.RequestedHostPorts(pod),
AssignedPods: map[types.UID]*PodRequirement{},
OwnerMatchers: ownerMatchers,
ParseError: err,
}
}

Expand Down Expand Up @@ -205,6 +228,13 @@ func (ri *ReservationInfo) GetPodOwners() []schedulingv1alpha1.ReservationOwner
return nil
}

func (ri *ReservationInfo) Match(pod *corev1.Pod) bool {
if ri.ParseError != nil {
return false
}
return reservationutil.MatchReservationOwners(pod, ri.OwnerMatchers)
}

func (ri *ReservationInfo) IsAvailable() bool {
if ri.Reservation != nil {
return reservationutil.IsReservationAvailable(ri.Reservation)
Expand All @@ -230,30 +260,25 @@ func (ri *ReservationInfo) Clone() *ReservationInfo {
resourceNames = append(resourceNames, v)
}

pods := map[types.UID]*PodRequirement{}
assignedPods := make(map[types.UID]*PodRequirement, len(ri.AssignedPods))
for k, v := range ri.AssignedPods {
pods[k] = v.Clone()
}

var reservation *schedulingv1alpha1.Reservation
if ri.Reservation != nil {
reservation = ri.Reservation.DeepCopy()
assignedPods[k] = v
}

return &ReservationInfo{
Reservation: reservation,
Pod: ri.Pod.DeepCopy(),
Reservation: ri.Reservation,
Pod: ri.Pod,
ResourceNames: resourceNames,
Allocatable: ri.Allocatable.DeepCopy(),
Allocated: ri.Allocated.DeepCopy(),
AllocatablePorts: util.CloneHostPorts(ri.AllocatablePorts),
AllocatedPorts: util.CloneHostPorts(ri.AllocatedPorts),
AssignedPods: pods,
AssignedPods: assignedPods,
}
}

func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation) {
ri.Reservation = r.DeepCopy()
ri.Reservation = r
ri.Pod = reservationutil.NewReservePod(r)
ri.Allocatable = reservationutil.ReservationRequests(r)
ri.AllocatablePorts = util.RequestedHostPorts(ri.Pod)
Expand All @@ -262,7 +287,7 @@ func (ri *ReservationInfo) UpdateReservation(r *schedulingv1alpha1.Reservation)
}

func (ri *ReservationInfo) UpdatePod(pod *corev1.Pod) {
ri.Pod = pod.DeepCopy()
ri.Pod = pod
ri.Allocatable, _ = resource.PodRequestsAndLimits(pod)
ri.AllocatablePorts = util.RequestedHostPorts(pod)
ri.ResourceNames = quotav1.ResourceNames(ri.Allocatable)
Expand Down
17 changes: 14 additions & 3 deletions pkg/scheduler/plugins/reservation/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/scheduler/framework"

schedulingv1alpha1 "github.com/koordinator-sh/koordinator/apis/scheduling/v1alpha1"
schedulinglister "github.com/koordinator-sh/koordinator/pkg/client/listers/scheduling/v1alpha1"
Expand Down Expand Up @@ -232,20 +233,30 @@ func (cache *reservationCache) getReservationInfoByUID(uid types.UID) *framework
return nil
}

func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
func (cache *reservationCache) forEachAvailableReservationOnNode(nodeName string, fn func(rInfo *frameworkext.ReservationInfo) *framework.Status) *framework.Status {
cache.lock.RLock()
defer cache.lock.RUnlock()
rOnNode := cache.reservationsOnNode[nodeName]
if len(rOnNode) == 0 {
return nil
}
result := make([]*frameworkext.ReservationInfo, 0, len(rOnNode))
for uid := range rOnNode {
rInfo := cache.reservationInfos[uid]
if rInfo != nil && rInfo.IsAvailable() {
result = append(result, rInfo.Clone())
if status := fn(rInfo); !status.IsSuccess() {
return status
}
}
}
return nil
}

func (cache *reservationCache) listAvailableReservationInfosOnNode(nodeName string) []*frameworkext.ReservationInfo {
var result []*frameworkext.ReservationInfo
cache.forEachAvailableReservationOnNode(nodeName, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
result = append(result, rInfo.Clone())
return nil
})
return result
}

Expand Down
13 changes: 8 additions & 5 deletions pkg/scheduler/plugins/reservation/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,17 @@ func (pl *Plugin) Filter(ctx context.Context, cycleState *framework.CycleState,
allocatePolicy = schedulingv1alpha1.ReservationAllocatePolicyAligned
}

rInfos := pl.reservationCache.listAvailableReservationInfosOnNode(node.Name)
for _, v := range rInfos {
status := pl.reservationCache.forEachAvailableReservationOnNode(node.Name, func(rInfo *frameworkext.ReservationInfo) *framework.Status {
// ReservationAllocatePolicyDefault cannot coexist with other allocate policies
if (allocatePolicy == schedulingv1alpha1.ReservationAllocatePolicyDefault ||
v.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
allocatePolicy != v.GetAllocatePolicy() {
rInfo.GetAllocatePolicy() == schedulingv1alpha1.ReservationAllocatePolicyDefault) &&
allocatePolicy != rInfo.GetAllocatePolicy() {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReservationAllocatePolicyConflict)
}
return nil
})
if !status.IsSuccess() {
return status
}
}

Expand Down Expand Up @@ -602,7 +605,7 @@ func (pl *Plugin) PreBind(ctx context.Context, cycleState *framework.CycleState,

state := getStateData(cycleState)
if state.assumed == nil {
klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %d o node %s", klog.KObj(pod), nodeName)
klog.V(5).Infof("Skip the Reservation PreBind since no reservation allocated for the pod %s on node %s", klog.KObj(pod), nodeName)
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/plugins/reservation/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type pluginTestSuit struct {
extenderFactory *frameworkext.FrameworkExtenderFactory
}

func newPluginTestSuitWith(t *testing.T, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
func newPluginTestSuitWith(t testing.TB, pods []*corev1.Pod, nodes []*corev1.Node) *pluginTestSuit {
var v1beta2args v1beta2.ReservationArgs
v1beta2.SetDefaults_ReservationArgs(&v1beta2args)
var reservationArgs config.ReservationArgs
Expand Down
Loading