Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feat: implement reserved capacity #1911

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion kwok/apis/crds/karpenter.kwok.sh_kwoknodeclasses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.17.1
controller-gen.kubebuilder.io/version: v0.16.5
name: kwoknodeclasses.karpenter.kwok.sh
spec:
group: karpenter.kwok.sh
Expand Down
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodeclaims.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.17.1
controller-gen.kubebuilder.io/version: v0.16.5
name: nodeclaims.karpenter.sh
spec:
group: karpenter.sh
Expand Down
2 changes: 1 addition & 1 deletion kwok/charts/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.17.1
controller-gen.kubebuilder.io/version: v0.16.5
name: nodepools.karpenter.sh
spec:
group: karpenter.sh
Expand Down
3 changes: 2 additions & 1 deletion kwok/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/rand"
"strings"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/docker/docker/pkg/namesgenerator"
"github.com/samber/lo"
Expand Down Expand Up @@ -109,7 +110,7 @@ func (c CloudProvider) List(ctx context.Context) ([]*v1.NodeClaim, error) {
}

// Return the hard-coded instance types.
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c CloudProvider) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
return c.instanceTypes, nil
}

Expand Down
4 changes: 2 additions & 2 deletions kwok/cloudprovider/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,11 @@ func newInstanceType(options InstanceTypeOptions) *cloudprovider.InstanceType {
Requirements: requirements,
Offerings: lo.Map(options.Offerings, func(off KWOKOffering, _ int) cloudprovider.Offering {
return cloudprovider.Offering{
ReservationManager: off.Offering.ReservationManager,
Requirements: scheduling.NewRequirements(lo.Map(off.Requirements, func(req corev1.NodeSelectorRequirement, _ int) *scheduling.Requirement {
return scheduling.NewRequirement(req.Key, req.Operator, req.Values...)
})...),
Price: off.Offering.Price,
Available: off.Offering.Available,
Price: off.Offering.Price,
}
}),
Capacity: options.Resources,
Expand Down
2 changes: 1 addition & 1 deletion kwok/tools/gen_instance_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func constructGenericInstanceTypes() []kwok.InstanceTypeOptions {
corev1.NodeSelectorRequirement{Key: corev1.LabelTopologyZone, Operator: corev1.NodeSelectorOpIn, Values: []string{zone}},
},
Offering: cloudprovider.Offering{
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
Available: true,
Price: lo.Ternary(ct == v1.CapacityTypeSpot, price*.7, price),
},
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodeclaims.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.17.1
controller-gen.kubebuilder.io/version: v0.16.5
name: nodeclaims.karpenter.sh
spec:
group: karpenter.sh
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/crds/karpenter.sh_nodepools.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.17.1
controller-gen.kubebuilder.io/version: v0.16.5
name: nodepools.karpenter.sh
spec:
group: karpenter.sh
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/v1/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const (
ArchitectureArm64 = "arm64"
CapacityTypeSpot = "spot"
CapacityTypeOnDemand = "on-demand"
CapacityTypeReserved = "reserved"
)

// Karpenter specific domains and labels
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (c *CloudProvider) List(_ context.Context) ([]*v1.NodeClaim, error) {
}), nil
}

func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (c *CloudProvider) GetInstanceTypes(_ context.Context, np *v1.NodePool, _ ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
if np != nil {
if err, ok := c.ErrorsForNodePool[np.Name]; ok {
return nil, err
Expand Down
64 changes: 42 additions & 22 deletions pkg/cloudprovider/fake/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,26 +65,46 @@ func NewInstanceTypeWithCustomRequirement(options InstanceTypeOptions, customReq
}
if len(options.Offerings) == 0 {
options.Offerings = []cloudprovider.Offering{
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}), Price: PriceFromResources(options.Resources), Available: true},
{Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}), Price: PriceFromResources(options.Resources), Available: true},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "spot",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-1",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-2",
}),
Price: PriceFromResources(options.Resources),
},
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: "on-demand",
corev1.LabelTopologyZone: "test-zone-3",
}),
Price: PriceFromResources(options.Resources),
},
}
}
if len(options.Architecture) == 0 {
Expand Down Expand Up @@ -153,12 +173,12 @@ func InstanceTypesAssorted() []*cloudprovider.InstanceType {
price := PriceFromResources(opts.Resources)
opts.Offerings = []cloudprovider.Offering{
{
Available: true,
Requirements: scheduling.NewLabelRequirements(map[string]string{
v1.CapacityTypeLabelKey: ct,
corev1.LabelTopologyZone: zone,
}),
Price: price,
Available: true,
Price: price,
},
}
instanceTypes = append(instanceTypes, NewInstanceType(opts))
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloudprovider/metrics/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"

opmetrics "github.com/awslabs/operatorpkg/metrics"
"github.com/awslabs/operatorpkg/option"
"github.com/prometheus/client_golang/prometheus"
crmetrics "sigs.k8s.io/controller-runtime/pkg/metrics"

Expand Down Expand Up @@ -133,7 +134,7 @@ func (d *decorator) List(ctx context.Context) ([]*v1.NodeClaim, error) {
return nodeClaims, err
}

func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool) ([]*cloudprovider.InstanceType, error) {
func (d *decorator) GetInstanceTypes(ctx context.Context, nodePool *v1.NodePool, opts ...option.Function[cloudprovider.GetInstanceTypeOptions]) ([]*cloudprovider.InstanceType, error) {
method := "GetInstanceTypes"
defer metrics.Measure(MethodDuration, getLabelsMapForDuration(ctx, d, method))()
instanceType, err := d.CloudProvider.GetInstanceTypes(ctx, nodePool)
Expand Down
80 changes: 75 additions & 5 deletions pkg/cloudprovider/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import (
"sync"
"time"

"github.com/awslabs/operatorpkg/option"
"github.com/awslabs/operatorpkg/status"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"

v1 "sigs.k8s.io/karpenter/pkg/apis/v1"
Expand Down Expand Up @@ -67,7 +69,7 @@ type CloudProvider interface {
// Availability of types or zone may vary by nodepool or over time. Regardless of
// availability, the GetInstanceTypes method should always return all instance types,
// even those with no offerings available.
GetInstanceTypes(context.Context, *v1.NodePool) ([]*InstanceType, error)
GetInstanceTypes(context.Context, *v1.NodePool, ...option.Function[GetInstanceTypeOptions]) ([]*InstanceType, error)
// IsDrifted returns whether a NodeClaim has drifted from the provisioning requirements
// it is tied to.
IsDrifted(context.Context, *v1.NodeClaim) (DriftReason, error)
Expand All @@ -81,6 +83,18 @@ type CloudProvider interface {
GetSupportedNodeClasses() []status.Object
}

type GetInstanceTypeOptions struct {
AvailabilitySnapshotUUID types.UID
}

// GetInstanceTypes calls made with the same snapshot ID should have a consistent view of offering availability. This
// is crucial for offerings with capacity type "reserved" since cross-nodepool offerings may share availability.
func WithAvailabilitySnapshotUUID(uuid types.UID) option.Function[GetInstanceTypeOptions] {
return func(opts *GetInstanceTypeOptions) {
opts.AvailabilitySnapshotUUID = uuid
}
}

// InstanceType describes the properties of a potential node (either concrete attributes of an instance of this type
// or supported options in the case of arrays)
type InstanceType struct {
Expand Down Expand Up @@ -224,6 +238,15 @@ func (its InstanceTypes) Truncate(requirements scheduling.Requirements, maxItems
return truncatedInstanceTypes, nil
}

func (its InstanceTypes) Difference(other InstanceTypes) InstanceTypes {
names := sets.New(lo.Map(other, func(it *InstanceType, _ int) string {
return it.Name
})...)
return lo.Reject(its, func(it *InstanceType, _ int) bool {
return names.Has(it.Name)
})
}

type InstanceTypeOverhead struct {
// KubeReserved returns the default resources allocated to kubernetes system daemons by default
KubeReserved corev1.ResourceList
Expand All @@ -237,27 +260,74 @@ func (i InstanceTypeOverhead) Total() corev1.ResourceList {
return resources.Merge(i.KubeReserved, i.SystemReserved, i.EvictionThreshold)
}

// ReservationManager is used to track the availability of a reserved offering over the course of a scheduling
// simulation. Reserved offerings may have a limited number of available instances associated with them,
// This is exposed as an interface for cloudprovider's to implement to give flexibility when dealing with separate
// offerings with associated availablility.
type ReservationManager interface {
// Reserve takes a unique identifier for a reservation, and returns a boolean indicating if the reservation was
// successful. Reserve should be idempotent, i.e. multiple calls with the same reservation ID should only count for a
// single reservation.
Reserve(string) bool
// Release takes a unique identifier for a reservation, and should discard any matching reservations. If no
// reservations exist for the given id, release should be a no-op.
Release(string)
}

// An Offering describes where an InstanceType is available to be used, with the expectation that its properties
// may be tightly coupled (e.g. the availability of an instance type in some zone is scoped to a capacity type) and
// these properties are captured with labels in Requirements.
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone
// Requirements are required to contain the keys v1.CapacityTypeLabelKey and corev1.LabelTopologyZone.
type Offering struct {
// ReservationManager is used for tracking availabity of reserved offerings over the course of a scheduling loop. It
// must be non-nil for offerings with capacity type "reserved", but may be nil otherwise.
ReservationManager

Requirements scheduling.Requirements
Price float64
// Available is added so that Offerings can return all offerings that have ever existed for an instance type,
// so we can get historical pricing data for calculating savings in consolidation
Available bool
Available bool
}

type Offerings []Offering

// WithCapacityType filters the offerings by the provided capacity type.
func (ofs Offerings) WithCapacityType(capacityType string) Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return o.Requirements.Get(v1.CapacityTypeLabelKey).Any() == capacityType
})
}

// Reserve attempts to make a reservation for each offering, returning true if it was successful for any.
func (ofs Offerings) Reserve(id string) Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return o.Reserve(id)
})
}

func (ofs Offerings) Release(id string) {
for i := range ofs {
ofs[i].Release(id)
}
}

// Available filters the available offerings from the returned offerings
func (ofs Offerings) Available() Offerings {
return lo.Filter(ofs, func(o Offering, _ int) bool {
return o.Available
})
}

func (ofs Offerings) PartitionCompatible(reqs scheduling.Requirements) (compatible Offerings, incompatible Offerings) {
for _, o := range ofs {
if reqs.IsCompatible(o.Requirements, scheduling.AllowUndefinedWellKnownLabels) {
compatible = append(compatible, o)
} else {
incompatible = append(incompatible, o)
}
}
return
}

// Compatible returns the offerings based on the passed requirements
func (ofs Offerings) Compatible(reqs scheduling.Requirements) Offerings {
return lo.Filter(ofs, func(offering Offering, _ int) bool {
Expand Down
Loading
Loading