From 85e709c58985cd8a25a8bf9a256666ae29a7f71c Mon Sep 17 00:00:00 2001 From: "zhangxinjie.next" Date: Wed, 3 Jan 2024 21:54:38 +0800 Subject: [PATCH] feat: add support for katalyst --- cmd/controller-manager/app/core.go | 4 +- cmd/controller-manager/app/options/options.go | 21 + cmd/controller-manager/app/util.go | 6 + go.mod | 1 + go.sum | 2 + pkg/controllers/context/context.go | 2 + .../federatedcluster/clusterstatus.go | 190 ++++++++- .../federatedcluster/controller.go | 32 +- .../federatedcluster/plugins/katalyst.go | 110 +++++ .../federatedcluster/plugins/katalyst_test.go | 402 ++++++++++++++++++ .../federatedcluster/plugins/plugins.go | 85 ++++ .../federatedcluster/plugins/plugins_test.go | 86 ++++ pkg/controllers/federatedcluster/util.go | 4 - .../balanced_allocation_test.go | 22 + .../framework/plugins/clusterresources/fit.go | 13 +- .../clusterresources/least_allocated.go | 10 +- .../clusterresources/least_allocated_test.go | 45 +- .../clusterresources/most_allocated.go | 10 +- .../clusterresources/most_allocated_test.go | 22 + .../framework/plugins/katalyst/exist.go | 63 +++ .../framework/plugins/katalyst/exist_test.go | 146 +++++++ .../framework/plugins/names/names.go | 1 + .../scheduler/framework/plugins/rsp/rsp.go | 4 + pkg/controllers/scheduler/framework/types.go | 2 + pkg/controllers/scheduler/framework/util.go | 21 + pkg/controllers/scheduler/profile.go | 5 + pkg/controllers/scheduler/scheduler.go | 6 +- pkg/controllers/scheduler/schedulingunit.go | 15 +- pkg/util/resource/resource.go | 7 + 29 files changed, 1302 insertions(+), 35 deletions(-) create mode 100644 pkg/controllers/federatedcluster/plugins/katalyst.go create mode 100644 pkg/controllers/federatedcluster/plugins/katalyst_test.go create mode 100644 pkg/controllers/federatedcluster/plugins/plugins.go create mode 100644 pkg/controllers/federatedcluster/plugins/plugins_test.go create mode 100644 pkg/controllers/scheduler/framework/plugins/katalyst/exist.go create mode 100644 pkg/controllers/scheduler/framework/plugins/katalyst/exist_test.go diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index a1d516be..acdd85eb 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -171,10 +171,9 @@ func startFederatedClusterController( controllerCtx.FederatedInformerManager, controllerCtx.Metrics, klog.Background(), - controllerCtx.ComponentConfig.ClusterJoinTimeout, controllerCtx.WorkerCount, controllerCtx.FedSystemNamespace, - controllerCtx.ComponentConfig.ResourceAggregationNodeFilter, + controllerCtx.ComponentConfig, ) if err != nil { return nil, fmt.Errorf("error creating federate controller: %w", err) @@ -204,6 +203,7 @@ func startScheduler( controllerCtx.Metrics, klog.Background(), controllerCtx.WorkerCount, + controllerCtx.ComponentConfig.EnableKatalystSupport, ) if err != nil { return nil, fmt.Errorf("error creating scheduler: %w", err) diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 2ec8b60c..8b982a01 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -29,6 +29,8 @@ import ( const ( DefaultPort = 11257 + + MinClusterHealthCheckPeriod = 15 * time.Second ) type Options struct { @@ -64,6 +66,9 @@ type Options struct { PrometheusQuantiles map[string]string ResourceAggregationNodeFilter []string + + EnableKatalystSupport bool + ClusterHealthCheckPeriod time.Duration } func NewOptions() *Options { @@ -153,6 +158,22 @@ func (o 
*Options) AddFlags(flags *pflag.FlagSet, allControllers []string, disabl "If the flag is provided multiple times, "+ "nodes are excluded as long as at least one of the selectors is matched.", ) + + flags.BoolVar( + &o.EnableKatalystSupport, + "enable-katalyst-support", + false, + "Enable katalyst support in the cluster controller and scheduler. When enabled, the cluster controller "+ + "collects katalyst CNR resources into the FederatedCluster status if the katalyst plugin is enabled in the "+ + "cluster's annotations, and the scheduler enables the KatalystResourcesExist filter and supports "+ + "splitting replicas by katalyst resources.", + ) + flags.DurationVar( + &o.ClusterHealthCheckPeriod, + "cluster-health-check-period", + time.Second*30, + "The period of health check for member clusters. The minimum value is "+MinClusterHealthCheckPeriod.String()+".", + ) } func (o *Options) addKlogFlags(flags *pflag.FlagSet) { diff --git a/cmd/controller-manager/app/util.go b/cmd/controller-manager/app/util.go index 36ddeea7..38609e3f 100644 --- a/cmd/controller-manager/app/util.go +++ b/cmd/controller-manager/app/util.go @@ -183,6 +183,12 @@ func getComponentConfig(opts *options.Options) (*controllercontext.ComponentConf componentConfig := &controllercontext.ComponentConfig{ ClusterJoinTimeout: opts.ClusterJoinTimeout, MemberObjectEnqueueDelay: opts.MemberObjectEnqueueDelay, + EnableKatalystSupport: opts.EnableKatalystSupport, + ClusterHealthCheckPeriod: opts.ClusterHealthCheckPeriod, + } + + if opts.ClusterHealthCheckPeriod < options.MinClusterHealthCheckPeriod { + componentConfig.ClusterHealthCheckPeriod = options.MinClusterHealthCheckPeriod } if opts.NSAutoPropExcludeRegexp != "" { diff --git a/go.mod b/go.mod index d6204f1f..8aada373 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 github.com/go-logr/logr v1.2.4 github.com/google/go-cmp v0.5.9 + github.com/kubewharf/katalyst-api v0.3.3 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.8 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 0f44eab9..5d31e48e 100644 --- a/go.sum +++ b/go.sum @@ -264,6 +264,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kubewharf/katalyst-api v0.3.3 h1:/aE7PmOr5IFXgNTDe4sbmjsqw48o1QeIrLlr18t5/7s= +github.com/kubewharf/katalyst-api v0.3.3/go.mod h1:iVILS5UL5PRtkUPH2Iu1K/gFGTPMNItnth5fmQ80VGE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y= diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 19d498d8..564f06af 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -83,4 +83,6 @@ type ComponentConfig struct { ClusterJoinTimeout time.Duration MemberObjectEnqueueDelay time.Duration ResourceAggregationNodeFilter []labels.Selector + EnableKatalystSupport bool + ClusterHealthCheckPeriod time.Duration } diff --git a/pkg/controllers/federatedcluster/clusterstatus.go b/pkg/controllers/federatedcluster/clusterstatus.go index 5daa7825..dfd12af0 100644 --- a/pkg/controllers/federatedcluster/clusterstatus.go +++ 
b/pkg/controllers/federatedcluster/clusterstatus.go @@ -25,22 +25,29 @@ import ( "fmt" "sort" "strings" + "sync" "time" corev1 "k8s.io/api/core/v1" apiextv1b1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic/dynamicinformer" corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/retry" "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/plugins" "github.com/kubewharf/kubeadmiral/pkg/stats" "github.com/kubewharf/kubeadmiral/pkg/stats/metrics" clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/resource" ) const ( @@ -60,8 +67,166 @@ const ( ClusterReachableMsg = "Cluster is reachable" ClusterNotReachableReason = "ClusterNotReachable" ClusterNotReachableMsg = "Cluster is not reachable" + + maxHealthCheckTimeout = 30 * time.Second ) +type resourceCollector struct { + name string + plugin plugins.Plugin + hasSynceds map[schema.GroupVersionResource]cache.InformerSynced + dynamicLister map[schema.GroupVersionResource]cache.GenericLister + + allocatable, available corev1.ResourceList +} + +func (c *FederatedClusterController) getExternalResourceCollectors( + cluster *fedcorev1a1.FederatedCluster, +) (resourceCollectors []resourceCollector) { + pluginMaps, err := plugins.ResolvePlugins(cluster.Annotations) + if err != nil { + c.logger.V(4).Info("Failed to get cluster plugins", "cluster", cluster.Name, "err", err) + return nil + } + + clusterKey, _ := informermanager.DefaultClusterConnectionHash(cluster) + c.lock.Lock() + defer c.lock.Unlock() + + if factory := c.externalClusterResourceInformers[string(clusterKey)]; factory != nil { + for pluginName, plugin := range pluginMaps { + rc := resourceCollector{ + name: pluginName, + plugin: plugin, + hasSynceds: map[schema.GroupVersionResource]cache.InformerSynced{}, + dynamicLister: map[schema.GroupVersionResource]cache.GenericLister{}, + } + + for gvr := range plugin.ClusterResourcesToCollect() { + rc.hasSynceds[gvr] = factory.ForResource(gvr).Informer().HasSynced + rc.dynamicLister[gvr] = factory.ForResource(gvr).Lister() + } + resourceCollectors = append(resourceCollectors, rc) + } + } + + return resourceCollectors +} + +func (c *FederatedClusterController) addOrUpdateExternalClusterResourceInformers( + cluster *fedcorev1a1.FederatedCluster, +) { + key, _ := informermanager.DefaultClusterConnectionHash(cluster) + clusterKey := string(key) + + pluginMaps, err := plugins.ResolvePlugins(cluster.Annotations) + if err != nil { + c.logger.V(4).Info("Failed to get cluster plugins", "cluster", cluster.Name, "err", err) + return + } + if len(pluginMaps) == 0 { + c.removeExternalClusterResourceInformers(cluster.Name) + return + } + pluginsHash := plugins.PluginsHash(pluginMaps) + + c.lock.Lock() + defer c.lock.Unlock() + + if old, exist := c.clusterConnectionHashes[cluster.Name]; exist { + // Connection and plugins are unchanged, do nothing + if old == clusterKey && c.enabledClusterResourcePluginHashes[cluster.Name] == pluginsHash { + return + } + // Otherwise, delete old informer since the connection has changed. 
+ // We use the same context for a cluster, so if the plugins changed, + // we have to rebuild the informer. + if cancel := c.externalClusterResourceInformerCancelFuncs[old]; cancel != nil { + cancel() + } + delete(c.externalClusterResourceInformers, old) + delete(c.externalClusterResourceInformerCancelFuncs, old) + } + + client, ok := c.federatedInformerManager.GetClusterDynamicClient(cluster.Name) + if !ok { + return + } + informer := dynamicinformer.NewDynamicSharedInformerFactory(client, 0) + ctx, cancel := context.WithCancel(context.Background()) + for _, plugin := range pluginMaps { + for gvr := range plugin.ClusterResourcesToCollect() { + informer.ForResource(gvr) + } + } + informer.Start(ctx.Done()) + + c.clusterConnectionHashes[cluster.Name] = clusterKey + c.enabledClusterResourcePluginHashes[cluster.Name] = pluginsHash + c.externalClusterResourceInformers[clusterKey] = informer + c.externalClusterResourceInformerCancelFuncs[clusterKey] = cancel +} + +func (c *FederatedClusterController) removeExternalClusterResourceInformers(clusterName string) { + c.lock.Lock() + defer c.lock.Unlock() + + key := c.clusterConnectionHashes[clusterName] + if cancel := c.externalClusterResourceInformerCancelFuncs[key]; cancel != nil { + cancel() + } + + delete(c.clusterConnectionHashes, clusterName) + delete(c.enabledClusterResourcePluginHashes, clusterName) + delete(c.externalClusterResourceInformers, key) + delete(c.externalClusterResourceInformerCancelFuncs, key) +} + +func collectExternalClusterResources( + ctx context.Context, + availableNodes []*corev1.Node, + pods []*corev1.Pod, + resourceCollectors []resourceCollector, +) { + wg := sync.WaitGroup{} + for i, collector := range resourceCollectors { + wg.Add(1) + go func(i int, collector resourceCollector) { + defer wg.Done() + + ctx, logger := logging.InjectLoggerValues(ctx, "plugin", collector.name) + ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + hasSynceds := make([]cache.InformerSynced, 0, len(collector.hasSynceds)) + names := make([]string, 0, len(collector.hasSynceds)) + for gvr, synced := range collector.hasSynceds { + hasSynceds = append(hasSynceds, synced) + names = append(names, gvr.String()) + } + if !cache.WaitForCacheSync(ctxWithTimeout.Done(), hasSynceds...) 
{ + logger.V(4).Info("Timeout waiting for informers sync", + "informers", strings.Join(names, ";")) + return + } + + allocatable, available, err := collector.plugin.CollectClusterResources( + ctx, + availableNodes, + pods, + plugins.ClusterHandle{DynamicLister: collector.dynamicLister}, + ) + if err != nil { + logger.V(4).Info("Failed to collect cluster resources", "err", err) + return + } + resourceCollectors[i].allocatable = allocatable + resourceCollectors[i].available = available + }(i, collector) + } + wg.Wait() +} + func (c *FederatedClusterController) collectIndividualClusterStatus( ctx context.Context, cluster *fedcorev1a1.FederatedCluster, @@ -91,7 +256,11 @@ func (c *FederatedClusterController) collectIndividualClusterStatus( cluster = cluster.DeepCopy() conditionTime := metav1.Now() - offlineStatus, readyStatus := checkReadyByHealthz(ctx, discoveryClient) + timeout := c.clusterHealthCheckConfig.Period / 2 + if timeout > maxHealthCheckTimeout { + timeout = maxHealthCheckTimeout + } + offlineStatus, readyStatus := checkReadyByHealthz(ctx, discoveryClient, timeout) var readyReason, readyMessage string switch readyStatus { case corev1.ConditionTrue: @@ -114,6 +283,7 @@ func (c *FederatedClusterController) collectIndividualClusterStatus( podsSynced, nodeLister, nodesSynced, + c.getExternalResourceCollectors(cluster), ); err != nil { logger.Error(err, "Failed to update cluster resources") readyStatus = corev1.ConditionFalse @@ -167,10 +337,11 @@ func (c *FederatedClusterController) collectIndividualClusterStatus( func checkReadyByHealthz( ctx context.Context, clusterDiscoveryClient discovery.DiscoveryInterface, + timeout time.Duration, ) (offline, ready corev1.ConditionStatus) { logger := klog.FromContext(ctx) - body, err := clusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Timeout(30 * time.Second).Do(ctx).Raw() + body, err := clusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Timeout(timeout).Do(ctx).Raw() if err != nil { logger.Error(err, "Cluster health check failed") return corev1.ConditionTrue, corev1.ConditionUnknown @@ -193,10 +364,11 @@ func (c *FederatedClusterController) updateClusterResources( podsSynced cache.InformerSynced, nodeLister corev1listers.NodeLister, nodesSynced cache.InformerSynced, + resourceCollectors []resourceCollector, ) error { - ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() - if !cache.WaitForCacheSync(ctx.Done(), podsSynced, nodesSynced) { + if !cache.WaitForCacheSync(ctxWithTimeout.Done(), podsSynced, nodesSynced) { return fmt.Errorf("timeout waiting for node and pod informer sync") } @@ -210,13 +382,21 @@ func (c *FederatedClusterController) updateClusterResources( } schedulableNodes := int64(0) + availableNodes := make([]*corev1.Node, 0, len(nodes)) for _, node := range nodes { if isNodeSchedulable(node) && !c.isNodeFiltered(node) { schedulableNodes++ + availableNodes = append(availableNodes, node) } } + allocatable, available := c.aggregateResources(availableNodes, pods) + + collectExternalClusterResources(ctx, availableNodes, pods, resourceCollectors) + for _, collector := range resourceCollectors { + resource.MergeResources(collector.allocatable, allocatable) + resource.MergeResources(collector.available, available) + } - allocatable, available := c.aggregateResources(nodes, pods) clusterStatus.Resources = fedcorev1a1.Resources{ SchedulableNodes: &schedulableNodes, Allocatable: allocatable, diff --git 
a/pkg/controllers/federatedcluster/controller.go b/pkg/controllers/federatedcluster/controller.go index f76ae4b2..ea5cc240 100644 --- a/pkg/controllers/federatedcluster/controller.go +++ b/pkg/controllers/federatedcluster/controller.go @@ -23,6 +23,7 @@ package federatedcluster import ( "context" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -32,6 +33,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/dynamic/dynamicinformer" kubeclient "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" @@ -44,6 +46,8 @@ import ( genscheme "github.com/kubewharf/kubeadmiral/pkg/client/generic/scheme" fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" + controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" + "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster/plugins" "github.com/kubewharf/kubeadmiral/pkg/stats" "github.com/kubewharf/kubeadmiral/pkg/stats/metrics" clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster" @@ -81,6 +85,12 @@ type FederatedClusterController struct { clusterJoinTimeout time.Duration resourceAggregationNodeFilter []labels.Selector + lock sync.Mutex + clusterConnectionHashes map[string]string + enabledClusterResourcePluginHashes map[string]string + externalClusterResourceInformers map[string]dynamicinformer.DynamicSharedInformerFactory + externalClusterResourceInformerCancelFuncs map[string]context.CancelFunc + worker worker.ReconcileWorker[common.QualifiedName] statusCollectWorker worker.ReconcileWorker[common.QualifiedName] eventRecorder record.EventRecorder @@ -96,10 +106,9 @@ func NewFederatedClusterController( federatedInformerManager informermanager.FederatedInformerManager, metrics stats.Metrics, logger klog.Logger, - clusterJoinTimeout time.Duration, workerCount int, fedSystemNamespace string, - resourceAggregationNodeFilter []labels.Selector, + componentConfig *controllercontext.ComponentConfig, ) (*FederatedClusterController, error) { c := &FederatedClusterController{ clusterInformer: clusterInformer, @@ -108,14 +117,23 @@ func NewFederatedClusterController( fedClient: fedClient, fedSystemNamespace: fedSystemNamespace, clusterHealthCheckConfig: &ClusterHealthCheckConfig{ - // TODO: make health check period configurable - Period: time.Second * 30, + Period: componentConfig.ClusterHealthCheckPeriod, }, - clusterJoinTimeout: clusterJoinTimeout, - resourceAggregationNodeFilter: resourceAggregationNodeFilter, + + lock: sync.Mutex{}, + clusterConnectionHashes: map[string]string{}, + enabledClusterResourcePluginHashes: map[string]string{}, + externalClusterResourceInformers: map[string]dynamicinformer.DynamicSharedInformerFactory{}, + externalClusterResourceInformerCancelFuncs: map[string]context.CancelFunc{}, + + clusterJoinTimeout: componentConfig.ClusterJoinTimeout, + resourceAggregationNodeFilter: componentConfig.ResourceAggregationNodeFilter, metrics: metrics, logger: logger.WithValues("controller", FederatedClusterControllerName), } + if componentConfig.EnableKatalystSupport { + plugins.AddKatalystPluginIntoDefaultPlugins() + } broadcaster := record.NewBroadcaster() broadcaster.StartRecordingToSink( @@ -238,6 +256,7 @@ func (c *FederatedClusterController) reconcile( logger.Error(err, "Failed to handle terminating cluster") return worker.StatusError } + 
c.removeExternalClusterResourceInformers(cluster.Name) return worker.StatusAllOK } @@ -249,6 +268,7 @@ func (c *FederatedClusterController) reconcile( return worker.StatusError } + c.addOrUpdateExternalClusterResourceInformers(cluster) if joined, alreadyFailed := isClusterJoined(&cluster.Status); joined || alreadyFailed { return worker.StatusAllOK } diff --git a/pkg/controllers/federatedcluster/plugins/katalyst.go b/pkg/controllers/federatedcluster/plugins/katalyst.go new file mode 100644 index 00000000..798854bb --- /dev/null +++ b/pkg/controllers/federatedcluster/plugins/katalyst.go @@ -0,0 +1,110 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "context" + + katalystv1a1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/resource" +) + +var katalystCNR = katalystv1a1.SchemeGroupVersion.WithResource(katalystv1a1.ResourceNameKatalystCNR) + +type katalystPlugin struct{} + +func (k *katalystPlugin) CollectClusterResources( + ctx context.Context, + nodes []*corev1.Node, + pods []*corev1.Pod, + handle ClusterHandle, +) (allocatable, available corev1.ResourceList, err error) { + _, logger := logging.InjectLoggerValues(ctx, "gvr", katalystCNR) + + cnr, ok := handle.DynamicLister[katalystCNR] + if !ok { + logger.V(4).Info("Lister not found, cluster resource collection skipped") + return nil, nil, nil + } + unsList, err := cnr.List(labels.Everything()) + if err != nil || len(unsList) == 0 { + return nil, nil, err + } + + allocatable = make(corev1.ResourceList) + nodeSet := sets.New[string]() + for _, node := range nodes { + nodeSet.Insert(node.Name) + } + + for _, uns := range unsList { + unsCNR, ok := uns.(*unstructured.Unstructured) + if !ok || !nodeSet.Has(unsCNR.GetName()) { + continue + } + + cnr := &katalystv1a1.CustomNodeResource{} + if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsCNR.UnstructuredContent(), cnr); err != nil { + logger.Error(err, "Failed to convert unstructured to CNR") + continue + } + if cnr.Status.Resources.Allocatable == nil { + continue + } + resource.AddResources(*cnr.Status.Resources.Allocatable, allocatable) + } + + available = make(corev1.ResourceList) + for name, quantity := range allocatable { + available[name] = quantity.DeepCopy() + } + + for _, pod := range pods { + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + continue + } + + podRequests := resource.GetPodResourceRequests(&pod.Spec) + for name, requestedQuantity := range podRequests { + if availableQuantity, ok := available[name]; ok { + availableQuantity.Sub(requestedQuantity) + available[name] = availableQuantity + } + } + } + + return allocatable, 
available, nil +} + +func (k *katalystPlugin) ClusterResourcesToCollect() sets.Set[schema.GroupVersionResource] { + return sets.New(katalystCNR) +} + +func AddKatalystPluginIntoDefaultPlugins() { + if _, ok := defaultPlugins[katalystv1a1.GroupName]; ok { + return + } + defaultPlugins[katalystv1a1.GroupName] = &katalystPlugin{} +} diff --git a/pkg/controllers/federatedcluster/plugins/katalyst_test.go b/pkg/controllers/federatedcluster/plugins/katalyst_test.go new file mode 100644 index 00000000..38e4ae28 --- /dev/null +++ b/pkg/controllers/federatedcluster/plugins/katalyst_test.go @@ -0,0 +1,402 @@ +/* +Copyright 2024 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package plugins + +import ( + "context" + "errors" + "reflect" + "testing" + + katalystv1a1 "github.com/kubewharf/katalyst-api/pkg/apis/node/v1alpha1" + katalystconsts "github.com/kubewharf/katalyst-api/pkg/consts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" +) + +func newNode(name string) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} + +func newCNR(name string, allocatable corev1.ResourceList) *katalystv1a1.CustomNodeResource { + return &katalystv1a1.CustomNodeResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: katalystv1a1.CustomNodeResourceStatus{ + Resources: katalystv1a1.Resources{Allocatable: &allocatable}, + }, + } +} + +func newUnsCNR(name string, allocatable corev1.ResourceList) *unstructured.Unstructured { + uns, _ := runtime.DefaultUnstructuredConverter.ToUnstructured(newCNR(name, allocatable)) + return &unstructured.Unstructured{Object: uns} +} + +func newPod(name string, requests corev1.ResourceList, phase corev1.PodPhase) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Resources: corev1.ResourceRequirements{ + Requests: requests, + }, + }}, + }, + Status: corev1.PodStatus{ + Phase: phase, + }, + } +} + +type fakeUnstructuredLister struct { + data []*unstructured.Unstructured + err error +} + +func (pl fakeUnstructuredLister) List(selector labels.Selector) (ret []runtime.Object, err error) { + if pl.err != nil { + return nil, pl.err + } + res := []runtime.Object{} + for _, uns := range pl.data { + if selector.Matches(labels.Set(uns.GetLabels())) { + res = append(res, uns) + } + } + return res, nil +} + +func (pl fakeUnstructuredLister) Get(name string) (runtime.Object, error) { + if pl.err != nil { + return nil, pl.err + } + for _, uns := range pl.data { + if uns.GetName() == name { + return uns, nil + } + } + return nil, nil +} + +func (pl fakeUnstructuredLister) ByNamespace(namespace string) cache.GenericNamespaceLister { + return pl +} + +func 
Test_katalystPlugin_CollectClusterResources(t *testing.T) { + type args struct { + nodes []*corev1.Node + pods []*corev1.Pod + handle ClusterHandle + } + tests := []struct { + name string + args args + wantAllocatable corev1.ResourceList + wantAvailable corev1.ResourceList + wantErr bool + }{ + { + name: "0 cnr", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{}, + }, + }, + }, + wantAllocatable: nil, + wantAvailable: nil, + wantErr: false, + }, + { + name: "2 nodes, 1 pod, 2 cnr", + args: args{ + nodes: []*corev1.Node{newNode("node1"), newNode("node2")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{ + data: []*unstructured.Unstructured{ + newUnsCNR("node1", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }), + newUnsCNR("node2", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("2k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("2Mi"), + }), + }, + }, + }, + }, + }, + wantAllocatable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: *resource.NewScaledQuantity(3, 3), // calculated result + katalystconsts.ReclaimedResourceMemory: *resource.NewQuantity(3145728, resource.BinarySI), // calculated result + }, + wantAvailable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: *resource.NewScaledQuantity(2, 3), // calculated result + katalystconsts.ReclaimedResourceMemory: *resource.NewQuantity(2097152, resource.BinarySI), // calculated result + }, + wantErr: false, + }, + { + name: "1 nodes, 1 pod, 2 cnr", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{ + data: []*unstructured.Unstructured{ + newUnsCNR("node1", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }), + newUnsCNR("node2", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("2k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("2Mi"), + }), + }, + }, + }, + }, + }, + wantAllocatable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + wantAvailable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: *resource.NewScaledQuantity(0, 3), // calculated result + 
katalystconsts.ReclaimedResourceMemory: *resource.NewQuantity(0, resource.BinarySI), // calculated result + }, + wantErr: false, + }, + { + name: "1 nodes, 1 pod failed, 1 cnr", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodFailed, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{ + data: []*unstructured.Unstructured{ + newUnsCNR("node1", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }), + }, + }, + }, + }, + }, + wantAllocatable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + wantAvailable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + wantErr: false, + }, + { + name: "1 nodes, 1 pod succeeded, 1 cnr", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodSucceeded, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{ + data: []*unstructured.Unstructured{ + newUnsCNR("node1", corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }), + }, + }, + }, + }, + }, + wantAllocatable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + wantAvailable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + wantErr: false, + }, + { + name: "1 nodes, 1 pod, 1 cnr without resources", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{ + data: []*unstructured.Unstructured{ + newUnsCNR("node1", nil), + }, + }, + }, + }, + }, + wantAllocatable: corev1.ResourceList{}, + wantAvailable: corev1.ResourceList{}, + wantErr: false, + }, + { + name: "handle not exist", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{}, + }, + }, + wantAllocatable: nil, + wantAvailable: nil, + wantErr: false, + 
}, + { + name: "lister failed", + args: args{ + nodes: []*corev1.Node{newNode("node1")}, + pods: []*corev1.Pod{ + newPod( + "pod1", + corev1.ResourceList{ + katalystconsts.ReclaimedResourceMilliCPU: resource.MustParse("1k"), + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Mi"), + }, + corev1.PodRunning, + ), + }, + handle: ClusterHandle{ + DynamicLister: map[schema.GroupVersionResource]cache.GenericLister{ + katalystCNR: fakeUnstructuredLister{err: errors.New("lister failed")}, + }, + }, + }, + wantAllocatable: nil, + wantAvailable: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + k := &katalystPlugin{} + gotAllocatable, gotAvailable, err := k.CollectClusterResources( + context.Background(), + tt.args.nodes, + tt.args.pods, + tt.args.handle, + ) + if (err != nil) != tt.wantErr { + t.Errorf("CollectClusterResources() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(gotAllocatable, tt.wantAllocatable) { + t.Errorf("CollectClusterResources() gotAllocatable = %v, want %v", gotAllocatable, tt.wantAllocatable) + } + if !reflect.DeepEqual(gotAvailable, tt.wantAvailable) { + t.Errorf("CollectClusterResources() gotAvailable = %v, want %v", gotAvailable, tt.wantAvailable) + } + }) + } +} diff --git a/pkg/controllers/federatedcluster/plugins/plugins.go b/pkg/controllers/federatedcluster/plugins/plugins.go new file mode 100644 index 00000000..b5460bae --- /dev/null +++ b/pkg/controllers/federatedcluster/plugins/plugins.go @@ -0,0 +1,85 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package plugins + +import ( + "context" + "encoding/json" + "fmt" + "hash/fnv" + "sort" + "strconv" + "strings" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + + "github.com/kubewharf/kubeadmiral/pkg/controllers/common" +) + +const ClusterResourcePluginsAnnotationKey = common.DefaultPrefix + "cluster-resource-plugins" + +type ClusterHandle struct { + DynamicLister map[schema.GroupVersionResource]cache.GenericLister +} + +type Plugin interface { + CollectClusterResources( + ctx context.Context, + nodes []*corev1.Node, + pods []*corev1.Pod, + handle ClusterHandle, + ) (allocatable, available corev1.ResourceList, err error) + + ClusterResourcesToCollect() sets.Set[schema.GroupVersionResource] +} + +var defaultPlugins = map[string]Plugin{} + +func ResolvePlugins(annotations map[string]string) (map[string]Plugin, error) { + if len(annotations) == 0 || annotations[ClusterResourcePluginsAnnotationKey] == "" { + return nil, nil + } + pluginNames := []string{} + err := json.Unmarshal([]byte(annotations[ClusterResourcePluginsAnnotationKey]), &pluginNames) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal plugins from %s: %w", + annotations[ClusterResourcePluginsAnnotationKey], err) + } + + plugins := map[string]Plugin{} + for _, name := range pluginNames { + if plugin, exists := defaultPlugins[name]; exists { + plugins[name] = plugin + } + } + return plugins, nil +} + +func PluginsHash(plugins map[string]Plugin) string { + var names []string + for name := range plugins { + names = append(names, name) + } + sort.Strings(names) + + hash := fnv.New64() + _, _ = hash.Write([]byte(strings.Join(names, ","))) + return strconv.FormatUint(hash.Sum64(), 16) +} diff --git a/pkg/controllers/federatedcluster/plugins/plugins_test.go b/pkg/controllers/federatedcluster/plugins/plugins_test.go new file mode 100644 index 00000000..96fd2b53 --- /dev/null +++ b/pkg/controllers/federatedcluster/plugins/plugins_test.go @@ -0,0 +1,86 @@ +/* +Copyright 2024 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package plugins + +import ( + "reflect" + "testing" +) + +func TestResolvePlugins(t *testing.T) { + AddKatalystPluginIntoDefaultPlugins() + defer func() { + // recovery default plugins + defaultPlugins = map[string]Plugin{} + }() + + tests := []struct { + name string + annotations map[string]string + want map[string]Plugin + wantErr bool + }{ + { + name: "get plugins", + annotations: map[string]string{ClusterResourcePluginsAnnotationKey: `["node.katalyst.kubewharf.io"]`}, + want: map[string]Plugin{"node.katalyst.kubewharf.io": &katalystPlugin{}}, + wantErr: false, + }, + { + name: "get plugins with not existed plugins", + annotations: map[string]string{ClusterResourcePluginsAnnotationKey: `["node.katalyst.kubewharf.io", "foo"]`}, + want: map[string]Plugin{"node.katalyst.kubewharf.io": &katalystPlugin{}}, + wantErr: false, + }, + { + name: "get empty plugins", + annotations: map[string]string{ClusterResourcePluginsAnnotationKey: `["foo"]`}, + want: map[string]Plugin{}, + wantErr: false, + }, + { + name: "nil annotations", + annotations: nil, + want: nil, + wantErr: false, + }, + { + name: "plugin annotations not exists", + annotations: map[string]string{"bar": `["foo"]`}, + want: nil, + wantErr: false, + }, + { + name: "failed to unmarshal", + annotations: map[string]string{ClusterResourcePluginsAnnotationKey: `["foo",]`}, + want: nil, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ResolvePlugins(tt.annotations) + if (err != nil) != tt.wantErr { + t.Errorf("ResolvePlugins() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ResolvePlugins() got = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controllers/federatedcluster/util.go b/pkg/controllers/federatedcluster/util.go index 52c5b929..c145834d 100644 --- a/pkg/controllers/federatedcluster/util.go +++ b/pkg/controllers/federatedcluster/util.go @@ -158,10 +158,6 @@ func (c *FederatedClusterController) aggregateResources( ) (corev1.ResourceList, corev1.ResourceList) { allocatable := make(corev1.ResourceList) for _, node := range nodes { - if !isNodeSchedulable(node) || c.isNodeFiltered(node) { - continue - } - resource.AddResources(node.Status.Allocatable, allocatable) } diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/balanced_allocation_test.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/balanced_allocation_test.go index 4c21a2e5..4b052ffa 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/balanced_allocation_test.go +++ b/pkg/controllers/scheduler/framework/plugins/clusterresources/balanced_allocation_test.go @@ -134,6 +134,28 @@ func TestClusterResourcesBalancedAllocation(t *testing.T) { }, name: "nothing scheduled, resources requested, differently sized machines", }, + { + // Cluster1 scores on 0-100 scale + // CPU Fraction: 6000 / 10000 = 60% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 10000 = 40% + // Cluster1 Score: (1 - sqrt((0.1 * 0.1 + 0.1 * 0.1)/3)) * 100 = 91 + // Cluster2 scores on 0-100 scale + // CPU Fraction: 6000 / 15000 = 40% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 20000 = 20% + // Cluster2 Score: (1 - sqrt((0.0333 * 0.0333 + 0.1333 * 0.1333 + 0.1667 * 0.1667)/3)) * 100 = 87 + su: schedulingUnitWithResourceName(makeSchedulingUnit("su4", 6000, 5000), "foo.bar/any", 4000), + clusters: []*fedcorev1a1.FederatedCluster{ + 
clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), + clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), + }, + expectedList: []framework.ClusterScore{ + {Cluster: clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), Score: 91}, + {Cluster: clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), Score: 87}, + }, + name: "nothing scheduled, resources requested with external resource, differently sized machines, external resource fraction differs", + }, } p, _ := NewClusterResourcesBalancedAllocation(nil) diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/fit.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/fit.go index 1d3a87bf..3c3b2df4 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/fit.go +++ b/pkg/controllers/scheduler/framework/plugins/clusterresources/fit.go @@ -184,13 +184,18 @@ func calculateResourceAllocatableRequest( } func getRelevantResources(su *framework.SchedulingUnit) []corev1.ResourceName { - resources := make([]corev1.ResourceName, 0, len(framework.DefaultRequestedRatioResources)) - for resourceName := range framework.DefaultRequestedRatioResources { - if resourceName == corev1.ResourceCPU || resourceName == corev1.ResourceMemory || - su.ResourceRequest.HasScalarResource(resourceName) { + // innerResourceCount: milliCPU, memory, ephemeralStorage + const innerResourceCount = 3 + resources := make([]corev1.ResourceName, 0, len(su.ResourceRequest.ScalarResources)+innerResourceCount) + resources = append(resources, corev1.ResourceCPU, corev1.ResourceMemory) // always returns CPU and memory + for resourceName := range su.ResourceRequest.ScalarResources { + if framework.IsScalarResourceName(resourceName) { resources = append(resources, resourceName) } } + if su.ResourceRequest.EphemeralStorage != 0 { + resources = append(resources, corev1.ResourceEphemeralStorage) + } return resources } diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated.go index 89606e1d..43cb0339 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated.go +++ b/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated.go @@ -66,9 +66,13 @@ func (pl *ClusterResourcesLeastAllocated) Score( // gpu((capacity-sum(requested))*100/capacity) * gpu_weight) / (cpu_weight + memory_weight + gpu_weight) for _, resource := range resources { resourceScore := leastRequestedScore(requested[resource], allocatable[resource]) - weight := framework.DefaultRequestedRatioResources[resource] - score += resourceScore * weight - weightSum += weight + if weight, ok := framework.DefaultRequestedRatioResources[resource]; ok { + score += resourceScore * weight + weightSum += weight + } else { + score += resourceScore * framework.DefaultRatio + weightSum += framework.DefaultRatio + } } if weightSum == 0 { diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated_test.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated_test.go index 67286295..48ff4166 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated_test.go +++ 
b/pkg/controllers/scheduler/framework/plugins/clusterresources/least_allocated_test.go @@ -24,6 +24,7 @@ import ( "context" "testing" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" @@ -31,13 +32,29 @@ import ( ) func schedulingUnitWithGPU(su *framework.SchedulingUnit, value int64) *framework.SchedulingUnit { - su.ResourceRequest.SetScalar(framework.ResourceGPU, value) + return schedulingUnitWithResourceName(su, framework.ResourceGPU, value) +} + +func schedulingUnitWithResourceName( + su *framework.SchedulingUnit, + resourceName corev1.ResourceName, + value int64, +) *framework.SchedulingUnit { + su.ResourceRequest.SetScalar(resourceName, value) return su } func clusterWithGPU(fc *fedcorev1a1.FederatedCluster, allocatable, available int64) *fedcorev1a1.FederatedCluster { - fc.Status.Resources.Allocatable[framework.ResourceGPU] = *resource.NewQuantity(allocatable, resource.BinarySI) - fc.Status.Resources.Available[framework.ResourceGPU] = *resource.NewQuantity(available, resource.BinarySI) + return clusterWithResourceName(fc, framework.ResourceGPU, allocatable, available) +} + +func clusterWithResourceName( + fc *fedcorev1a1.FederatedCluster, + resourceName corev1.ResourceName, + allocatable, available int64, +) *fedcorev1a1.FederatedCluster { + fc.Status.Resources.Allocatable[resourceName] = *resource.NewQuantity(allocatable, resource.BinarySI) + fc.Status.Resources.Available[resourceName] = *resource.NewQuantity(available, resource.BinarySI) return fc } @@ -132,6 +149,28 @@ func TestClusterResourcesLeastAllocated(t *testing.T) { }, name: "nothing scheduled, resources requested with gpu, differently sized machines, gpu fraction differs", }, + { + // Cluster1 scores on 0-100 scale + // CPU Fraction: 6000 / 10000 = 60% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 10000 = 40% + // Cluster1 Score: (40 + 50 + 60 * 1) / 3 = 50 + // Cluster2 scores on 0-100 scale + // CPU Fraction: 6000 / 15000 = 40% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 20000 = 20% + // Cluster2 Score: (60 + 50 + 80 * 1) / 3 = 63 + su: schedulingUnitWithResourceName(makeSchedulingUnit("su5", 6000, 5000), "foo.bar/any", 4000), + clusters: []*fedcorev1a1.FederatedCluster{ + clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), + clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), + }, + expectedList: []framework.ClusterScore{ + {Cluster: clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), Score: 50}, + {Cluster: clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), Score: 63}, + }, + name: "nothing scheduled, resources requested with external resource, differently sized machines, external resource fraction differs", + }, } p, _ := NewClusterResourcesLeastAllocated(nil) diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated.go index ae445cbe..274ea9fd 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated.go +++ b/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated.go @@ -65,9 +65,13 @@ func (pl *ClusterResourcesMostAllocated) Score( // gpu((capacity-sum(requested))*100/capacity) * gpu_weight) 
/ (cpu_weight + memory_weight + gpu_weight) for _, resource := range resources { resourceScore := mostRequestedScore(requested[resource], allocatable[resource]) - weight := framework.DefaultRequestedRatioResources[resource] - score += resourceScore * weight - weightSum += weight + if weight, ok := framework.DefaultRequestedRatioResources[resource]; ok { + score += resourceScore * weight + weightSum += weight + } else { + score += resourceScore * framework.DefaultRatio + weightSum += framework.DefaultRatio + } } if weightSum == 0 { diff --git a/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated_test.go b/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated_test.go index a79eb077..d2826a51 100644 --- a/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated_test.go +++ b/pkg/controllers/scheduler/framework/plugins/clusterresources/most_allocated_test.go @@ -119,6 +119,28 @@ func TestClusterResourcesMostAllocated(t *testing.T) { }, name: "nothing scheduled, resources requested with gpu, differently sized machines, gpu fraction differs", }, + { + // Cluster1 scores on 0-100 scale + // CPU Fraction: 6000 / 10000 = 60% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 10000 = 40% + // Cluster1 Score: (60 + 50 + 40 * 1) / 3 = 50 + // Cluster2 scores on 0-100 scale + // CPU Fraction: 6000 / 15000 = 40% + // Memory Fraction: 5000 / 10000 = 50% + // foo.bar/any Fraction: 4000 / 20000 = 20% + // Cluster2 Score: (40 + 50 + 20 * 1) / 3 = 36 + su: schedulingUnitWithResourceName(makeSchedulingUnit("su5", 6000, 5000), "foo.bar/any", 4000), + clusters: []*fedcorev1a1.FederatedCluster{ + clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), + clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), + }, + expectedList: []framework.ClusterScore{ + {Cluster: clusterWithResourceName(makeCluster("cluster1", 10000, 10000, 10000, 10000), "foo.bar/any", 10000, 10000), Score: 50}, + {Cluster: clusterWithResourceName(makeCluster("cluster2", 15000, 10000, 15000, 10000), "foo.bar/any", 20000, 20000), Score: 36}, + }, + name: "nothing scheduled, resources requested with external resource, differently sized machines, external resource fraction differs", + }, } p, _ := NewClusterResourcesMostAllocated(nil) diff --git a/pkg/controllers/scheduler/framework/plugins/katalyst/exist.go b/pkg/controllers/scheduler/framework/plugins/katalyst/exist.go new file mode 100644 index 00000000..40b815cd --- /dev/null +++ b/pkg/controllers/scheduler/framework/plugins/katalyst/exist.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package katalyst + +import ( + "context" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/names" +) + +type KatalystResourcesExist struct{} + +func NewKatalystResourcesExist(_ framework.Handle) (framework.Plugin, error) { + return &KatalystResourcesExist{}, nil +} + +func (pl *KatalystResourcesExist) Name() string { + return names.KatalystResourcesExist +} + +func (pl *KatalystResourcesExist) Filter( + ctx context.Context, + su *framework.SchedulingUnit, + cluster *fedcorev1a1.FederatedCluster, +) *framework.Result { + err := framework.PreCheck(ctx, su, cluster) + if err != nil { + return framework.NewResult(framework.Error, err.Error()) + } + + // When a workload has been scheduled to the current cluster, the available resources of the current cluster + // currently do not (but should) include the amount of resources requested by Pods of the current workload. + // In the absence of an ample resource buffer, rescheduling may mistakenly + // evict the workload from the current cluster. + // Disable this plugin for rescheduling as a temporary workaround. + if _, alreadyScheduled := su.CurrentClusters[cluster.Name]; alreadyScheduled { + return framework.NewResult(framework.Success) + } + + scRequest := &su.ResourceRequest + clusterAllocatable := framework.NewResource(cluster.Status.Resources.Allocatable) + if len(framework.GetKatalystResources(scRequest)) != 0 && + len(framework.GetKatalystResources(clusterAllocatable)) == 0 { + return framework.NewResult(framework.Unschedulable, "Katalyst resource does not exist") + } + return framework.NewResult(framework.Success) +} diff --git a/pkg/controllers/scheduler/framework/plugins/katalyst/exist_test.go b/pkg/controllers/scheduler/framework/plugins/katalyst/exist_test.go new file mode 100644 index 00000000..57084411 --- /dev/null +++ b/pkg/controllers/scheduler/framework/plugins/katalyst/exist_test.go @@ -0,0 +1,146 @@ +/* +Copyright 2024 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package katalyst + +import ( + "context" + "reflect" + "testing" + + katalystconsts "github.com/kubewharf/katalyst-api/pkg/consts" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" +) + +func TestKatalystResourcesExist_Filter(t *testing.T) { + type args struct { + ctx context.Context + su *framework.SchedulingUnit + cluster *fedcorev1a1.FederatedCluster + } + tests := []struct { + name string + args args + want *framework.Result + }{ + { + name: "cluster scheduled", + args: args{ + su: &framework.SchedulingUnit{ + CurrentClusters: map[string]*int64{"cluster1": pointer.Int64(1)}, + }, + cluster: &fedcorev1a1.FederatedCluster{ + ObjectMeta: metav1.ObjectMeta{Name: "cluster1"}, + }, + }, + want: framework.NewResult(framework.Success), + }, + { + name: "cluster not scheduled, su without katalyst resources", + args: args{ + su: &framework.SchedulingUnit{ + CurrentClusters: map[string]*int64{"cluster1": pointer.Int64(1)}, + }, + cluster: &fedcorev1a1.FederatedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster2", + }, + }, + }, + want: framework.NewResult(framework.Success), + }, + { + name: "cluster not scheduled, su with katalyst, cluster with katalyst", + args: args{ + su: &framework.SchedulingUnit{ + CurrentClusters: map[string]*int64{"cluster1": pointer.Int64(1)}, + ResourceRequest: framework.Resource{ + ScalarResources: map[corev1.ResourceName]int64{katalystconsts.ReclaimedResourceMilliCPU: 1}, + }, + }, + cluster: &fedcorev1a1.FederatedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster2", + }, + Status: fedcorev1a1.FederatedClusterStatus{ + Resources: fedcorev1a1.Resources{ + Allocatable: corev1.ResourceList{ + katalystconsts.ReclaimedResourceMemory: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + want: framework.NewResult(framework.Success), + }, + { + name: "cluster not scheduled, su with katalyst, cluster without katalyst", + args: args{ + su: &framework.SchedulingUnit{ + CurrentClusters: map[string]*int64{"cluster1": pointer.Int64(1)}, + ResourceRequest: framework.Resource{ + ScalarResources: map[corev1.ResourceName]int64{katalystconsts.ReclaimedResourceMilliCPU: 1}, + }, + }, + cluster: &fedcorev1a1.FederatedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "cluster2", + }, + Status: fedcorev1a1.FederatedClusterStatus{ + Resources: fedcorev1a1.Resources{ + Allocatable: corev1.ResourceList{ + corev1.ResourceMemory: resource.MustParse("1Gi"), + }, + }, + }, + }, + }, + want: framework.NewResult(framework.Unschedulable, "Katalyst resource does not exist"), + }, + { + name: "cluster is nil", + args: args{ + su: &framework.SchedulingUnit{ + CurrentClusters: map[string]*int64{"cluster1": pointer.Int64(1)}, + }, + cluster: nil, + }, + want: framework.NewResult(framework.Error, "invalid federated cluster"), + }, + { + name: "su is nil", + args: args{ + su: nil, + cluster: nil, + }, + want: framework.NewResult(framework.Error, "invalid scheduling unit"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pl := &KatalystResourcesExist{} + if got := pl.Filter(tt.args.ctx, tt.args.su, tt.args.cluster); !reflect.DeepEqual(got, tt.want) { + t.Errorf("Filter() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/controllers/scheduler/framework/plugins/names/names.go 
index cf7bcc7e..bd4d25f9 100644
--- a/pkg/controllers/scheduler/framework/plugins/names/names.go
+++ b/pkg/controllers/scheduler/framework/plugins/names/names.go
@@ -23,6 +23,7 @@ const (
 	ClusterTerminating                 = "ClusterTerminating"
 	ClusterResourcesFit                = "ClusterResourcesFit"
 	PlacementFilter                    = "PlacementFilter"
+	KatalystResourcesExist             = "KatalystResourcesExist"
 	ClusterAffinity                    = "ClusterAffinity"
 	ClusterResourcesBalancedAllocation = "ClusterResourcesBalancedAllocation"
 	ClusterResourcesLeastAllocated     = "ClusterResourcesLeastAllocated"
diff --git a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
index f2619016..a2278d33 100644
--- a/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
+++ b/pkg/controllers/scheduler/framework/plugins/rsp/rsp.go
@@ -26,6 +26,7 @@ import (
 	"math"
 
 	"github.com/davecgh/go-spew/spew"
+	katalystconsts "github.com/kubewharf/katalyst-api/pkg/consts"
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
@@ -73,6 +74,9 @@ func (pl *ClusterCapacityWeight) ReplicaScheduling(
 	resourceName := corev1.ResourceCPU
 	if su.ResourceRequest.HasScalarResource(framework.ResourceGPU) {
 		resourceName = framework.ResourceGPU
+	} else if su.EnableKatalystSupport &&
+		su.ResourceRequest.HasScalarResource(katalystconsts.ReclaimedResourceMilliCPU) {
+		resourceName = katalystconsts.ReclaimedResourceMilliCPU
 	}
 	clusterAvailables := QueryClusterResource(clusters, availableResource)
 	if len(clusters) != len(clusterAvailables) {
diff --git a/pkg/controllers/scheduler/framework/types.go b/pkg/controllers/scheduler/framework/types.go
index 9debc0e8..2abe36aa 100644
--- a/pkg/controllers/scheduler/framework/types.go
+++ b/pkg/controllers/scheduler/framework/types.go
@@ -73,6 +73,8 @@ type SchedulingUnit struct {
 	MaxReplicas map[string]int64
 	Weights     map[string]int64
 	Priorities  map[string]int64
+
+	EnableKatalystSupport bool
 }
 
 type AutoMigrationSpec struct {
diff --git a/pkg/controllers/scheduler/framework/util.go b/pkg/controllers/scheduler/framework/util.go
index 1bd80b7a..adf4e6c5 100644
--- a/pkg/controllers/scheduler/framework/util.go
+++ b/pkg/controllers/scheduler/framework/util.go
@@ -26,8 +26,10 @@ import (
 	"math"
 	"strings"
 
+	katalystconsts "github.com/kubewharf/katalyst-api/pkg/consts"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/validation"
 
 	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
@@ -37,6 +39,12 @@ const (
 	ResourceGPU = corev1.ResourceName("nvidia.com/gpu")
 )
 
+var katalystResourceNames = sets.New(
+	katalystconsts.ReclaimedResourceMilliCPU,
+	katalystconsts.ReclaimedResourceMemory,
+	katalystconsts.ResourceNetBandwidth,
+)
+
 // For each of these resources, a pod that doesn't request the resource explicitly
 // will be treated as having requested the amount indicated below, for the purpose
 // of computing priority only. This ensures that when scheduling zero-request pods, such
@@ -66,6 +74,9 @@ const (
 // DefaultRequestedRatioResources is an empirical value derived from practice.
 var DefaultRequestedRatioResources = ResourceToWeightMap{corev1.ResourceMemory: 1, corev1.ResourceCPU: 1, ResourceGPU: 4}
 
+// DefaultRatio is the ratio used for any unknown requested resources.
+var DefaultRatio int64 = 1
+
 type (
 	ResourceToValueMap  map[corev1.ResourceName]int64
 	ResourceToWeightMap map[corev1.ResourceName]int64
@@ -262,6 +273,16 @@ func (r *Resource) HasScalarResource(name corev1.ResourceName) bool {
 	return false
 }
 
+func GetKatalystResources(r *Resource) map[corev1.ResourceName]int64 {
+	res := make(map[corev1.ResourceName]int64)
+	for name, quantity := range r.ScalarResources {
+		if katalystResourceNames.Has(name) {
+			res[name] = quantity
+		}
+	}
+	return res
+}
+
 // resourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers) + overHead
 //
 //nolint:unused
diff --git a/pkg/controllers/scheduler/profile.go b/pkg/controllers/scheduler/profile.go
index 31506f57..db156617 100644
--- a/pkg/controllers/scheduler/profile.go
+++ b/pkg/controllers/scheduler/profile.go
@@ -30,6 +30,7 @@ import (
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/clusterready"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/clusterresources"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/clusterterminating"
+	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/katalyst"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/maxcluster"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/names"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework/plugins/placement"
@@ -48,6 +49,7 @@ var inTreeRegistry = runtime.Registry{
 	names.ClusterAffinity:                    clusteraffinity.NewClusterAffinity,
 	names.ClusterResourcesFit:                clusterresources.NewClusterResourcesFit,
 	names.PlacementFilter:                    placement.NewPlacementFilter,
+	names.KatalystResourcesExist:             katalyst.NewKatalystResourcesExist,
 	names.TaintToleration:                    tainttoleration.NewTaintToleration,
 	names.ClusterResourcesBalancedAllocation: clusterresources.NewClusterResourcesBalancedAllocation,
 	names.ClusterResourcesLeastAllocated:     clusterresources.NewClusterResourcesLeastAllocated,
@@ -98,6 +100,9 @@ func (s *Scheduler) createFramework(
 ) (framework.Framework, error) {
 	enabledPlugins := fedcorev1a1.GetDefaultEnabledPlugins()
+	if s.enableKatalystSupport {
+		enabledPlugins.FilterPlugins = append(enabledPlugins.FilterPlugins, names.KatalystResourcesExist)
+	}
 
 	if len(replicasPlugin) != 0 {
 		enabledPlugins.ReplicasPlugins = replicasPlugin
 	}
diff --git a/pkg/controllers/scheduler/scheduler.go b/pkg/controllers/scheduler/scheduler.go
index 9e255e08..912bdeda 100644
--- a/pkg/controllers/scheduler/scheduler.go
+++ b/pkg/controllers/scheduler/scheduler.go
@@ -97,6 +97,8 @@ type Scheduler struct {
 	algorithm core.ScheduleAlgorithm
 
+	enableKatalystSupport bool
+
 	metrics stats.Metrics
 	logger  klog.Logger
 }
@@ -120,6 +122,7 @@ func NewScheduler(
 	metrics stats.Metrics,
 	logger klog.Logger,
 	workerCount int,
+	enableKatalystSupport bool,
 ) (*Scheduler, error) {
 	s := &Scheduler{
 		fedClient: fedClient,
@@ -135,6 +138,7 @@ func NewScheduler(
 		webhookPlugins:        sync.Map{},
 		metrics:               metrics,
 		logger:                logger.WithValues("controller", SchedulerName),
+		enableKatalystSupport: enableKatalystSupport,
 	}
 
 	s.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, SchedulerName, 6)
@@ -567,7 +571,7 @@ func (s *Scheduler) schedule(
 		common.NewQualifiedName(policy).String(),
 	)
 
-	schedulingUnit, err := schedulingUnitForFedObject(ftc, fedObject, policy)
+	schedulingUnit, err := schedulingUnitForFedObject(ftc, fedObject, policy, s.enableKatalystSupport)
 	if err != nil {
 		logger.Error(err, "Failed to get scheduling unit")
 		s.eventRecorder.Eventf(
diff --git a/pkg/controllers/scheduler/schedulingunit.go b/pkg/controllers/scheduler/schedulingunit.go
index 52ad8627..7feaf604 100644
--- a/pkg/controllers/scheduler/schedulingunit.go
+++ b/pkg/controllers/scheduler/schedulingunit.go
@@ -35,6 +35,7 @@ func schedulingUnitForFedObject(
 	typeConfig *fedcorev1a1.FederatedTypeConfig,
 	fedObject fedcorev1a1.GenericFederatedObject,
 	policy fedcorev1a1.GenericPropagationPolicy,
+	enableKatalystSupport bool,
 ) (*framework.SchedulingUnit, error) {
 	template, err := fedObject.GetSpec().GetTemplateAsUnstructured()
 	if err != nil {
@@ -170,12 +171,18 @@ func schedulingUnitForFedObject(
 	if err != nil {
 		return nil, err
 	}
-	gpuResourceRequest := &framework.Resource{}
+	// now we only consider the resource request of gpu and katalyst resources
 	if resourceRequest.HasScalarResource(framework.ResourceGPU) {
-		gpuResourceRequest.SetScalar(framework.ResourceGPU, resourceRequest.ScalarResources[framework.ResourceGPU])
+		schedulingUnit.ResourceRequest.SetScalar(framework.ResourceGPU, resourceRequest.ScalarResources[framework.ResourceGPU])
+	}
+	if enableKatalystSupport {
+		if res := framework.GetKatalystResources(&resourceRequest); len(res) != 0 {
+			for name, quantity := range res {
+				schedulingUnit.ResourceRequest.SetScalar(name, quantity)
+			}
+		}
+		schedulingUnit.EnableKatalystSupport = true
 	}
-	// now we only consider the resource request of gpu
-	schedulingUnit.ResourceRequest = *gpuResourceRequest
 	return schedulingUnit, nil
 }
diff --git a/pkg/util/resource/resource.go b/pkg/util/resource/resource.go
index 791f4f05..35d220e7 100644
--- a/pkg/util/resource/resource.go
+++ b/pkg/util/resource/resource.go
@@ -40,6 +40,13 @@ func MaxResources(src, dst corev1.ResourceList) {
 	}
 }
 
+// MergeResources merges the new ResourceList into the dst ResourceList
+func MergeResources(new, dst corev1.ResourceList) {
+	for k, v := range new {
+		dst[k] = v.DeepCopy()
+	}
+}
+
 // podResourceRequest = max(sum(podSpec.Containers), podSpec.InitContainers...) + overHead
 func GetPodResourceRequests(podSpec *corev1.PodSpec) corev1.ResourceList {
 	reqs := make(corev1.ResourceList)