support unknown resources
Karpenter cannot be used on clusters where pods request custom device resources, such as `smarter-devices/fuse`, which exposes the `/dev/fuse` device needed to run Podman in pods.

The following error is logged:
```
karpenter-778b9dbc4f-gk88t {"level":"ERROR",..."logger":"controller.provisioner","message":"Could not schedule pod, incompatible with provisioner \"default\", daemonset overhead={\"cpu\":\"562m\",\"memory\":\"758026799\",\"pods\":\"10\"}, no instance type satisfied resources {\"cpu\":\"1562m\",\"memory\":\"1831768623\",\"pods\":\"11\",\"smarter-devices/fuse\":\"1\"} and requirements karpenter.k8s.aws/instance-category In [c m r], karpenter.k8s.aws/instance-generation Exists >2, karpenter.k8s.aws/instance-hypervisor In [nitro], karpenter.k8s.aws/instance-size NotIn [medium micro nano small], karpenter.sh/capacity-type In [on-demand spot], karpenter.sh/provisioner-name In [default], kubernetes.io/arch In [amd64], kubernetes.io/os In [linux], node.kubernetes.io/node-group In [primary] (no instance type has enough resources)"}
```

Here we add an `ignoredDeviceRequests` setting that instructs Karpenter to ignore the listed resource names when checking whether pods fit, which allows Karpenter to be used on these clusters.
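
As a rough sketch of how the new setting flows through the code (the value format assumes knative's `configmap.AsStringSet`, which parses a comma-separated list; the ConfigMap data here is hypothetical), the ignore list is parsed by `Settings.Inject` and read back via `settings.FromContext`:

```go
package main

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/settings"
)

func main() {
	// Hypothetical ConfigMap data; the key matches the one parsed in Inject below.
	cm := &v1.ConfigMap{Data: map[string]string{
		"ignoredDeviceRequests": "smarter-devices/fuse",
	}}
	ctx, err := (&settings.Settings{}).Inject(context.Background(), cm)
	if err != nil {
		panic(err)
	}
	// The ignored names come back as a ResourceList with empty quantities.
	fmt.Println(settings.FromContext(ctx).IgnoredDeviceRequests)
}
```

With `smarter-devices/fuse` in the list, the scheduling error quoted above should no longer occur, because that request is skipped during fit checks.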
universam1 committed Oct 13, 2023
1 parent a1a59ae commit 23e01c0
Showing 6 changed files with 30 additions and 12 deletions.
13 changes: 12 additions & 1 deletion pkg/apis/settings/settings.go
```diff
@@ -21,7 +21,10 @@ import (

     "go.uber.org/multierr"
     v1 "k8s.io/api/core/v1"
+    "k8s.io/apimachinery/pkg/api/resource"
+    "k8s.io/apimachinery/pkg/util/sets"
     "knative.dev/pkg/configmap"
+    "knative.dev/pkg/logging"
 )

 type settingsKeyType struct{}
@@ -39,7 +42,8 @@ type Settings struct {
     BatchMaxDuration  time.Duration
     BatchIdleDuration time.Duration
     // This feature flag is temporary and will be removed in the near future.
-    DriftEnabled bool
+    DriftEnabled          bool
+    IgnoredDeviceRequests v1.ResourceList
 }

 func (*Settings) ConfigMap() string {
@@ -49,13 +53,20 @@ func (*Settings) ConfigMap() string {
 // Inject creates a Settings from the supplied ConfigMap
 func (*Settings) Inject(ctx context.Context, cm *v1.ConfigMap) (context.Context, error) {
     s := defaultSettings.DeepCopy()
+    idr := sets.String{}
     if err := configmap.Parse(cm.Data,
         configmap.AsDuration("batchMaxDuration", &s.BatchMaxDuration),
         configmap.AsDuration("batchIdleDuration", &s.BatchIdleDuration),
         configmap.AsBool("featureGates.driftEnabled", &s.DriftEnabled),
+        configmap.AsStringSet("ignoredDeviceRequests", &idr),
     ); err != nil {
         return ctx, fmt.Errorf("parsing settings, %w", err)
     }
+    s.IgnoredDeviceRequests = make(v1.ResourceList, len(idr))
+    for _, k := range idr.List() {
+        logging.FromContext(ctx).Debugf("ignoring device request '%s'", k)
+        s.IgnoredDeviceRequests[v1.ResourceName(k)] = resource.Quantity{}
+    }
     if err := s.Validate(); err != nil {
         return ctx, fmt.Errorf("validating settings, %w", err)
     }
```
3 changes: 2 additions & 1 deletion pkg/cloudprovider/fake/cloudprovider.go
```diff
@@ -27,6 +27,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/sets"

+    "github.com/aws/karpenter-core/pkg/apis/settings"
     "github.com/aws/karpenter-core/pkg/apis/v1alpha5"
     "github.com/aws/karpenter-core/pkg/apis/v1beta1"
     "github.com/aws/karpenter-core/pkg/cloudprovider"
@@ -97,7 +98,7 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *v1beta1.NodeClaim
     instanceTypes := lo.Filter(lo.Must(c.GetInstanceTypes(ctx, np)), func(i *cloudprovider.InstanceType, _ int) bool {
         return reqs.Compatible(i.Requirements, scheduling.AllowUndefinedWellKnownLabelsV1Alpha5) == nil &&
             len(i.Offerings.Requirements(reqs).Available()) > 0 &&
-            resources.Fits(nodeClaim.Spec.Resources.Requests, i.Allocatable())
+            resources.Fits(nodeClaim.Spec.Resources.Requests, settings.FromContext(ctx).IgnoredDeviceRequests, i.Allocatable())
     })
     // Order instance types so that we get the cheapest instance types of the available offerings
     sort.Slice(instanceTypes, func(i, j int) bool {
```
3 changes: 2 additions & 1 deletion pkg/controllers/provisioning/scheduling/existingnode.go
```diff
@@ -21,6 +21,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "sigs.k8s.io/controller-runtime/pkg/client"

+    "github.com/aws/karpenter-core/pkg/apis/settings"
     "github.com/aws/karpenter-core/pkg/controllers/state"
     "github.com/aws/karpenter-core/pkg/scheduling"
     "github.com/aws/karpenter-core/pkg/utils/resources"
@@ -82,7 +83,7 @@ func (n *ExistingNode) Add(ctx context.Context, kubeClient client.Client, pod *v
     // node, which at this point can't be increased in size
     requests := resources.Merge(n.requests, resources.RequestsForPods(pod))

-    if !resources.Fits(requests, n.Available()) {
+    if !resources.Fits(requests, settings.FromContext(ctx).IgnoredDeviceRequests, n.Available()) {
         return fmt.Errorf("exceeds node resources")
     }

```
14 changes: 8 additions & 6 deletions pkg/controllers/provisioning/scheduling/nodeclaim.go
```diff
@@ -15,6 +15,7 @@ limitations under the License.
 package scheduling

 import (
+    "context"
     "fmt"
     "strings"
     "sync/atomic"
@@ -23,6 +24,7 @@ import (
     v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"

+    "github.com/aws/karpenter-core/pkg/apis/settings"
     "github.com/aws/karpenter-core/pkg/apis/v1alpha5"
     "github.com/aws/karpenter-core/pkg/cloudprovider"
     "github.com/aws/karpenter-core/pkg/scheduling"
@@ -61,7 +63,7 @@ func NewNodeClaim(nodeClaimTemplate *NodeClaimTemplate, topology *Topology, daem
     }
 }

-func (n *NodeClaim) Add(pod *v1.Pod) error {
+func (n *NodeClaim) Add(ctx context.Context, pod *v1.Pod) error {
     // Check Taints
     if err := scheduling.Taints(n.Spec.Taints).Tolerates(pod); err != nil {
         return err
@@ -100,7 +102,7 @@ func (n *NodeClaim) Add(pod *v1.Pod) error {

     // Check instance type combinations
     requests := resources.Merge(n.Spec.Resources.Requests, resources.RequestsForPods(pod))
-    filtered := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, requests)
+    filtered := filterInstanceTypesByRequirements(n.InstanceTypeOptions, nodeClaimRequirements, requests, settings.FromContext(ctx).IgnoredDeviceRequests)
     if len(filtered.remaining) == 0 {
         // log the total resources being requested (daemonset + the pod)
         cumulativeResources := resources.Merge(n.daemonResources, resources.RequestsForPods(pod))
@@ -221,7 +223,7 @@ func (r filterResults) FailureReason() string {
 }

 //nolint:gocyclo
-func filterInstanceTypesByRequirements(instanceTypes []*cloudprovider.InstanceType, requirements scheduling.Requirements, requests v1.ResourceList) filterResults {
+func filterInstanceTypesByRequirements(instanceTypes []*cloudprovider.InstanceType, requirements scheduling.Requirements, requests, ignored v1.ResourceList) filterResults {
     results := filterResults{
         requests:        requests,
         requirementsMet: false,
@@ -236,7 +238,7 @@ func filterInstanceTypesByRequirements(instanceTypes []*cloudprovider.InstanceTy
         // the tradeoff to not short circuiting on the filtering is that we can report much better error messages
         // about why scheduling failed
         itCompat := compatible(it, requirements)
-        itFits := fits(it, requests)
+        itFits := fits(it, requests, ignored)
         itHasOffering := hasOffering(it, requirements)

         // track if any single instance type met a single criteria
@@ -262,8 +264,8 @@ func compatible(instanceType *cloudprovider.InstanceType, requirements schedulin
     return instanceType.Requirements.Intersects(requirements) == nil
 }

-func fits(instanceType *cloudprovider.InstanceType, requests v1.ResourceList) bool {
-    return resources.Fits(requests, instanceType.Allocatable())
+func fits(instanceType *cloudprovider.InstanceType, requests, ignored v1.ResourceList) bool {
+    return resources.Fits(requests, ignored, instanceType.Allocatable())
 }

 func hasOffering(instanceType *cloudprovider.InstanceType, requirements scheduling.Requirements) bool {
```
4 changes: 2 additions & 2 deletions pkg/controllers/provisioning/scheduling/scheduler.go
```diff
@@ -247,7 +247,7 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {

     // Pick existing node that we are about to create
     for _, nodeClaim := range s.newNodeClaims {
-        if err := nodeClaim.Add(pod); err == nil {
+        if err := nodeClaim.Add(ctx, pod); err == nil {
             return nil
         }
     }
@@ -268,7 +268,7 @@ func (s *Scheduler) add(ctx context.Context, pod *v1.Pod) error {
         }
     }
     nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
-    if err := nodeClaim.Add(pod); err != nil {
+    if err := nodeClaim.Add(ctx, pod); err != nil {
         errs = multierr.Append(errs, fmt.Errorf("incompatible with %s %q, daemonset overhead=%s, %w",
             nodeClaimTemplate.OwnerKind(),
             nodeClaimTemplate.OwnerKey.Name,
```
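
The signature change to `NodeClaim.Add` exists only to give the scheduler access to the parsed settings through the context. A minimal sketch of that plumbing, assuming the `ToContext` helper that the settings package pairs with `FromContext` (the helper name and the hard-coded list below are illustrative, not part of this commit):

```go
package sketch

import (
	"context"

	v1 "k8s.io/api/core/v1"

	"github.com/aws/karpenter-core/pkg/apis/settings"
)

// withIgnoredDevices returns a context carrying a hypothetical ignore list.
// Downstream, nodeClaim.Add(ctx, pod) reads it back via
// settings.FromContext(ctx).IgnoredDeviceRequests, so no new struct fields
// or parameters are threaded through the scheduler.
func withIgnoredDevices(ctx context.Context) context.Context {
	return settings.ToContext(ctx, &settings.Settings{
		IgnoredDeviceRequests: v1.ResourceList{"smarter-devices/fuse": {}},
	})
}
```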
5 changes: 4 additions & 1 deletion pkg/utils/resources/resources.go
```diff
@@ -157,14 +157,17 @@ func Cmp(lhs resource.Quantity, rhs resource.Quantity) int {
 }

 // Fits returns true if the candidate set of resources is less than or equal to the total set of resources.
-func Fits(candidate, total v1.ResourceList) bool {
+func Fits(candidate, ignored, total v1.ResourceList) bool {
     // If any of the total resource values are negative then the resource will never fit
     for _, quantity := range total {
         if Cmp(resource.MustParse("0"), quantity) > 0 {
             return false
         }
     }
     for resourceName, quantity := range candidate {
+        if _, match := ignored[resourceName]; match {
+            continue
+        }
         if Cmp(quantity, total[resourceName]) > 0 {
             return false
         }
```
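
To make the new `Fits` semantics concrete, here is a small example (resource names taken from the log above, quantities chosen for illustration): a request for an ignored extended resource no longer disqualifies a node that does not advertise that resource.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	"github.com/aws/karpenter-core/pkg/utils/resources"
)

func main() {
	requests := v1.ResourceList{
		"cpu":                  resource.MustParse("1"),
		"smarter-devices/fuse": resource.MustParse("1"),
	}
	ignored := v1.ResourceList{"smarter-devices/fuse": {}}
	allocatable := v1.ResourceList{"cpu": resource.MustParse("2")}

	// The fuse request is skipped because its name is in the ignore list,
	// so only cpu is compared against the allocatable capacity.
	fmt.Println(resources.Fits(requests, ignored, allocatable)) // true
}
```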
