From 15592f41da0540898d73d2731925efcfa7db110e Mon Sep 17 00:00:00 2001 From: "zhangxinjie.next" Date: Fri, 14 Jul 2023 18:21:08 +0800 Subject: [PATCH] refactor(auto-migration): unify types and support pod trigger --- .../app/controllermanager.go | 2 + cmd/controller-manager/app/core.go | 22 +- cmd/controller-manager/app/ftcmanager.go | 5 - pkg/controllers/automigration/controller.go | 398 ++++++++++++++---- .../automigration/plugins/deployments.go | 56 ++- .../automigration/plugins/plugins.go | 26 +- pkg/controllers/automigration/util.go | 44 +- pkg/controllers/automigration/util_test.go | 68 +++ pkg/controllers/common/constants.go | 10 +- pkg/controllers/context/context.go | 3 + pkg/controllers/util/store.go | 12 + .../pkg/controller/controller_utils.go | 45 ++ 12 files changed, 545 insertions(+), 146 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 5b4c5cb15..b5d4e1f42 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -38,12 +38,14 @@ const ( FederatedClusterControllerName = "cluster" MonitorControllerName = "monitor" FollowerControllerName = "follower" + AutoMigrationControllerName = "automigration" ) var knownControllers = map[string]controllermanager.StartControllerFunc{ FederatedClusterControllerName: startFederatedClusterController, MonitorControllerName: startMonitorController, FollowerControllerName: startFollowerController, + AutoMigrationControllerName: startAutoMigrationController, } var controllersDisabledByDefault = sets.New(MonitorControllerName) diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index 6157efbd9..748d7c3dc 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -23,13 +23,11 @@ import ( "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/client/generic" "github.com/kubewharf/kubeadmiral/pkg/controllermanager" "github.com/kubewharf/kubeadmiral/pkg/controllers/automigration" controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" "github.com/kubewharf/kubeadmiral/pkg/controllers/federate" "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster" - "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedtypeconfig" "github.com/kubewharf/kubeadmiral/pkg/controllers/follower" "github.com/kubewharf/kubeadmiral/pkg/controllers/monitor" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler" @@ -190,24 +188,14 @@ func isFederateControllerEnabled(typeConfig *fedcorev1a1.FederatedTypeConfig) bo func startAutoMigrationController( ctx context.Context, controllerCtx *controllercontext.Context, - typeConfig *fedcorev1a1.FederatedTypeConfig, ) (controllermanager.Controller, error) { - genericClient, err := generic.New(controllerCtx.RestConfig) - if err != nil { - return nil, fmt.Errorf("error creating generic client: %w", err) - } - - federatedAPIResource := typeConfig.GetFederatedType() - federatedGVR := schemautil.APIResourceToGVR(&federatedAPIResource) - //nolint:contextcheck controller, err := automigration.NewAutoMigrationController( controllerConfigFromControllerContext(controllerCtx), - typeConfig, - genericClient, controllerCtx.KubeClientset, - controllerCtx.DynamicClientset.Resource(federatedGVR), - controllerCtx.DynamicInformerFactory.ForResource(federatedGVR), + controllerCtx.FedClientset.CoreV1alpha1(), + 
controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(), + controllerCtx.FederatedInformerManager, ) if err != nil { return nil, fmt.Errorf("error creating auto-migration controller: %w", err) @@ -217,7 +205,3 @@ func startAutoMigrationController( return controller, nil } - -func isAutoMigrationControllerEnabled(typeConfig *fedcorev1a1.FederatedTypeConfig) bool { - return typeConfig.Spec.AutoMigration != nil && typeConfig.Spec.AutoMigration.Enabled -} diff --git a/cmd/controller-manager/app/ftcmanager.go b/cmd/controller-manager/app/ftcmanager.go index 957731495..16d30e9c3 100644 --- a/cmd/controller-manager/app/ftcmanager.go +++ b/cmd/controller-manager/app/ftcmanager.go @@ -23,7 +23,6 @@ import ( const ( FederateControllerName = "federate" GlobalSchedulerName = "scheduler" - AutoMigrationControllerName = "automigration" ) var knownFTCSubControllers = map[string]controllermanager.FTCSubControllerInitFuncs{ @@ -35,8 +34,4 @@ var knownFTCSubControllers = map[string]controllermanager.FTCSubControllerInitFu StartFunc: startFederateController, IsEnabledFunc: isFederateControllerEnabled, }, - AutoMigrationControllerName: { - StartFunc: startAutoMigrationController, - IsEnabledFunc: isAutoMigrationControllerEnabled, - }, } diff --git a/pkg/controllers/automigration/controller.go b/pkg/controllers/automigration/controller.go index 2a05b4cee..84215e063 100644 --- a/pkg/controllers/automigration/controller.go +++ b/pkg/controllers/automigration/controller.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" corev1 "k8s.io/api/core/v1" @@ -27,8 +28,10 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" pkgruntime "k8s.io/apimachinery/pkg/runtime" - dynamicclient "k8s.io/client-go/dynamic" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/informers" kubeclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -36,20 +39,23 @@ import ( "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/client/generic" + fedcorev1a1clientset "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned/typed/core/v1alpha1" + fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/automigration/plugins" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework" "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" "github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink" utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured" "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" "github.com/kubewharf/kubeadmiral/pkg/stats" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/managedlabel" ) const ( EventReasonAutoMigrationInfoUpdated = "AutoMigrationInfoUpdated" + controllerName = "auto-migration" ) /* @@ -64,15 +70,17 @@ One way to prevent both is: */ type Controller struct { - typeConfig *fedcorev1a1.FederatedTypeConfig - name string + name string - federatedObjectClient dynamicclient.NamespaceableResourceInterface - federatedObjectInformer 
informers.GenericInformer + federatedObjectClient fedcorev1a1clientset.FederatedObjectsGetter + federatedObjectInformer fedcorev1a1informers.FederatedObjectInformer - federatedInformer util.FederatedInformer + federatedInformer informermanager.FederatedInformerManager - worker worker.ReconcileWorker + lock sync.Mutex + podInformers map[string]*stoppableInformer + + worker worker.ReconcileWorker[common.QualifiedName] eventRecorder record.EventRecorder @@ -80,6 +88,12 @@ type Controller struct { logger klog.Logger } +type stoppableInformer struct { + stop context.CancelFunc + + informers.GenericInformer +} + // IsControllerReady implements controllermanager.Controller func (c *Controller) IsControllerReady() bool { return c.HasSynced() @@ -87,32 +101,35 @@ func (c *Controller) IsControllerReady() bool { func NewAutoMigrationController( controllerConfig *util.ControllerConfig, - typeConfig *fedcorev1a1.FederatedTypeConfig, - genericFedClient generic.Client, kubeClient kubeclient.Interface, - federatedObjectClient dynamicclient.NamespaceableResourceInterface, - federatedObjectInformer informers.GenericInformer, + federatedObjectClient fedcorev1a1clientset.FederatedObjectsGetter, + federatedObjectInformer fedcorev1a1informers.FederatedObjectInformer, + federatedInformer informermanager.FederatedInformerManager, ) (*Controller, error) { - controllerName := fmt.Sprintf("%s-auto-migration", typeConfig.Name) - c := &Controller{ - typeConfig: typeConfig, - name: controllerName, + name: controllerName, federatedObjectClient: federatedObjectClient, federatedObjectInformer: federatedObjectInformer, + federatedInformer: federatedInformer, + + lock: sync.Mutex{}, + podInformers: map[string]*stoppableInformer{}, metrics: controllerConfig.Metrics, - logger: klog.NewKlogr().WithValues("controller", "auto-migration", "ftc", typeConfig.Name), + logger: klog.NewKlogr().WithValues("controller", controllerName), eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, controllerName, 6), } - c.worker = worker.NewReconcileWorker( + c.worker = worker.NewReconcileWorker[common.QualifiedName]( + controllerName, + func(obj metav1.Object) common.QualifiedName { + return common.QualifiedName{Name: obj.GetName(), Namespace: obj.GetNamespace()} + }, c.reconcile, worker.RateLimiterOptions{}, controllerConfig.WorkerCount, controllerConfig.Metrics, - delayingdeliver.NewMetricTags("auto-migration-worker", c.typeConfig.GetFederatedType().Kind), ) federatedObjectInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -128,22 +145,14 @@ func NewAutoMigrationController( }, }) - var err error - targetType := typeConfig.GetTargetType() - c.federatedInformer, err = util.NewFederatedInformer( - controllerConfig, - genericFedClient, - controllerConfig.KubeConfig, - &targetType, - func(o pkgruntime.Object) { - // enqueue with a delay to simulate a rudimentary rate limiter - c.worker.EnqueueWithDelay(common.NewQualifiedName(o), 10*time.Second) + c.federatedInformer.AddClusterEventHandler(&informermanager.ClusterEventHandler{ + Predicate: func(_, _ *fedcorev1a1.FederatedCluster) bool { + return true }, - &util.ClusterLifecycleHandlerFuncs{}, - ) - if err != nil { - return nil, fmt.Errorf("failed to create federated informer: %w", err) - } + Callback: func(cluster *fedcorev1a1.FederatedCluster) { + c.clusterEventHandlerFunc(cluster) + }, + }) return c, nil } @@ -152,19 +161,16 @@ func (c *Controller) Run(ctx context.Context) { c.logger.Info("Starting controller") defer c.logger.Info("Stopping controller") - 
c.federatedInformer.Start() - defer c.federatedInformer.Stop() - if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.HasSynced) { return } - c.worker.Run(ctx.Done()) + c.worker.Run(ctx) <-ctx.Done() } func (c *Controller) HasSynced() bool { - if !c.federatedObjectInformer.Informer().HasSynced() || !c.federatedInformer.ClustersSynced() { + if !c.federatedObjectInformer.Informer().HasSynced() || !c.federatedInformer.HasSynced() { return false } @@ -175,20 +181,20 @@ func (c *Controller) HasSynced() bool { return true } -func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worker.Result) { +func (c *Controller) reconcile(ctx context.Context, qualifiedName common.QualifiedName) (status worker.Result) { key := qualifiedName.String() keyedLogger := c.logger.WithValues("control-loop", "reconcile", "object", key) - ctx := klog.NewContext(context.TODO(), keyedLogger) + ctx = klog.NewContext(ctx, keyedLogger) startTime := time.Now() c.metrics.Rate("auto-migration.throughput", 1) keyedLogger.V(3).Info("Start reconcile") defer func() { c.metrics.Duration(fmt.Sprintf("%s.latency", c.name), startTime) - keyedLogger.V(3).Info("Finished reconcile", "duration", time.Since(startTime), "status", status.String()) + keyedLogger.V(3).Info("Finished reconcile", "duration", time.Since(startTime), "status", status) }() - fedObject, err := util.UnstructuredFromStore(c.federatedObjectInformer.Informer().GetStore(), key) + fedObject, err := util.GetObjectFromStore[fedcorev1a1.FederatedObject](c.federatedObjectInformer.Informer().GetStore(), key) if err != nil { keyedLogger.Error(err, "Failed to get object from store") return worker.StatusError @@ -197,16 +203,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke return worker.StatusAllOK } - // PodUnschedulableThresholdAnnotation is set by the scheduler. Its presence determines whether auto migration is enabled. annotations := fedObject.GetAnnotations() - var unschedulableThreshold *time.Duration - if value, exists := annotations[common.PodUnschedulableThresholdAnnotation]; exists { - if duration, err := time.ParseDuration(value); err != nil { - keyedLogger.Error(err, "Failed to parse PodUnschedulableThresholdAnnotation") - } else { - unschedulableThreshold = &duration - } - } // auto-migration controller sets AutoMigrationAnnotation to // feedback auto-migration information back to the scheduler @@ -214,6 +211,11 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke var estimatedCapacity map[string]int64 var result *worker.Result needsUpdate := false + clusterObjs, ftc, unschedulableThreshold, err := c.getTargetObjectsIfAutoMigrationEnabled(fedObject) + if err != nil { + keyedLogger.Error(err, "Failed to get objects from federated informer stores") + return worker.StatusError + } if unschedulableThreshold == nil { // Clean up the annotation if auto migration is disabled. keyedLogger.V(3).Info("Auto migration is disabled") @@ -225,13 +227,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke } else { // Keep the annotation up-to-date if auto migration is enabled. 
keyedLogger.V(3).Info("Auto migration is enabled") - clusterObjs, err := c.federatedInformer.GetTargetStore().GetFromAllClusters(key) - if err != nil { - keyedLogger.Error(err, "Failed to get objects from federated informer stores") - return worker.StatusError - } - - estimatedCapacity, result = c.estimateCapacity(ctx, clusterObjs, *unschedulableThreshold) + estimatedCapacity, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold) autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity} // Compare with the existing autoMigration annotation @@ -262,7 +258,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke keyedLogger.V(1).Info("Updating federated object with auto migration information", "estimatedCapacity", estimatedCapacity) _, err = c.federatedObjectClient. - Namespace(qualifiedName.Namespace). + FederatedObjects(qualifiedName.Namespace). Update(ctx, fedObject, metav1.UpdateOptions{}) if err != nil { keyedLogger.Error(err, "Failed to update federated object for auto migration") @@ -291,6 +287,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke func (c *Controller) estimateCapacity( ctx context.Context, + typeConfig *fedcorev1a1.FederatedTypeConfig, clusterObjs []util.FederatedObject, unschedulableThreshold time.Duration, ) (map[string]int64, *worker.Result) { @@ -301,26 +298,26 @@ func (c *Controller) estimateCapacity( estimatedCapacity := make(map[string]int64, len(clusterObjs)) for _, clusterObj := range clusterObjs { - keyedLogger := keyedLogger.WithValues("cluster", clusterObj.ClusterName) + keyedLogger := keyedLogger.WithValues("cluster", clusterObj.ClusterName, "ftc", typeConfig.Name) ctx := klog.NewContext(ctx, keyedLogger) unsClusterObj := clusterObj.Object.(*unstructured.Unstructured) // This is an optimization to skip pod listing when there are no unschedulable pods. 
- totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(unsClusterObj) + totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(typeConfig, unsClusterObj) if err == nil && totalReplicas == readyReplicas { keyedLogger.V(3).Info("No unschedulable pods found, skip estimating capacity") continue } - desiredReplicas, err := c.getDesiredReplicas(unsClusterObj) + desiredReplicas, err := c.getDesiredReplicas(typeConfig, unsClusterObj) if err != nil { keyedLogger.Error(err, "Failed to get desired replicas from object") continue } keyedLogger.V(2).Info("Getting pods from cluster") - pods, clusterNeedsBackoff, err := c.getPodsFromCluster(ctx, unsClusterObj, clusterObj.ClusterName) + pods, clusterNeedsBackoff, err := c.getPodsFromCluster(ctx, typeConfig, unsClusterObj, clusterObj.ClusterName) if err != nil { keyedLogger.Error(err, "Failed to get pods from cluster") if clusterNeedsBackoff { @@ -330,6 +327,15 @@ func (c *Controller) estimateCapacity( } unschedulable, nextCrossIn := countUnschedulablePods(pods, time.Now(), unschedulableThreshold) + keyedLogger.V(2).Info("Analyzed pods", + "total", len(pods), + "desired", desiredReplicas, + "unschedulable", unschedulable, + ) + + if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) { + retryAfter = nextCrossIn + } var clusterEstimatedCapacity int64 if len(pods) >= int(desiredReplicas) { @@ -353,16 +359,6 @@ func (c *Controller) estimateCapacity( } estimatedCapacity[clusterObj.ClusterName] = clusterEstimatedCapacity - - keyedLogger.V(2).Info("Analyzed pods", - "total", len(pods), - "desired", desiredReplicas, - "unschedulable", unschedulable, - ) - - if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) { - retryAfter = nextCrossIn - } } var result *worker.Result @@ -377,13 +373,14 @@ func (c *Controller) estimateCapacity( } func (c *Controller) getTotalAndReadyReplicas( + typeConfig *fedcorev1a1.FederatedTypeConfig, unsObj *unstructured.Unstructured, ) (int64, int64, error) { // These values might not have been populated by the controller, in which case we default to 0 totalReplicas := int64(0) if replicasPtr, err := utilunstructured.GetInt64FromPath( - unsObj, c.typeConfig.Spec.PathDefinition.ReplicasStatus, nil, + unsObj, typeConfig.Spec.PathDefinition.ReplicasStatus, nil, ); err != nil { return 0, 0, fmt.Errorf("replicas: %w", err) } else if replicasPtr != nil { @@ -392,7 +389,7 @@ func (c *Controller) getTotalAndReadyReplicas( readyReplicas := int64(0) if readyReplicasPtr, err := utilunstructured.GetInt64FromPath( - unsObj, c.typeConfig.Spec.PathDefinition.ReadyReplicasStatus, nil, + unsObj, typeConfig.Spec.PathDefinition.ReadyReplicasStatus, nil, ); err != nil { return 0, 0, fmt.Errorf("ready replicas: %w", err) } else if readyReplicasPtr != nil { @@ -402,12 +399,15 @@ func (c *Controller) getTotalAndReadyReplicas( return totalReplicas, readyReplicas, nil } -func (c *Controller) getDesiredReplicas(unsObj *unstructured.Unstructured) (int64, error) { - desiredReplicas, err := utilunstructured.GetInt64FromPath(unsObj, c.typeConfig.Spec.PathDefinition.ReplicasSpec, nil) +func (c *Controller) getDesiredReplicas( + typeConfig *fedcorev1a1.FederatedTypeConfig, + unsObj *unstructured.Unstructured, +) (int64, error) { + desiredReplicas, err := utilunstructured.GetInt64FromPath(unsObj, typeConfig.Spec.PathDefinition.ReplicasSpec, nil) if err != nil { return 0, fmt.Errorf("desired replicas: %w", err) } else if desiredReplicas == nil { - return 0, fmt.Errorf("no desired replicas at %s", 
c.typeConfig.Spec.PathDefinition.ReplicasSpec)
+		return 0, fmt.Errorf("no desired replicas at %s", typeConfig.Spec.PathDefinition.ReplicasSpec)
 	}
 
 	return *desiredReplicas, nil
@@ -415,16 +415,17 @@ func (c *Controller) getDesiredReplicas(unsObj *unstructured.Unstructured) (int6
 
 func (c *Controller) getPodsFromCluster(
 	ctx context.Context,
+	typeConfig *fedcorev1a1.FederatedTypeConfig,
 	unsClusterObj *unstructured.Unstructured,
 	clusterName string,
 ) ([]*corev1.Pod, bool, error) {
-	plugin, err := plugins.ResolvePlugin(c.typeConfig)
+	plugin, err := plugins.ResolvePlugin(typeConfig.GetSourceTypeGVK())
 	if err != nil {
 		return nil, false, fmt.Errorf("failed to get plugin for FTC: %w", err)
 	}
 
-	client, err := c.federatedInformer.GetClientForCluster(clusterName)
-	if err != nil {
-		return nil, true, fmt.Errorf("failed to get client for cluster: %w", err)
+	client, exist := c.federatedInformer.GetClusterClient(clusterName)
+	if !exist {
+		return nil, true, fmt.Errorf("failed to get client for cluster %s", clusterName)
 	}
 
@@ -437,3 +438,236 @@
 	return pods, false, nil
 }
+
+func (c *Controller) getSourceObjectFromCluster(
+	ctx context.Context,
+	pod *corev1.Pod,
+	clusterName string,
+) (possibleQualifiedNames []common.QualifiedName, err error) {
+	client, exist := c.federatedInformer.GetClusterClient(clusterName)
+	if !exist {
+		return nil, fmt.Errorf("failed to get client for cluster %s", clusterName)
+	}
+
+	for gvk, plugin := range plugins.NativePlugins {
+		object, found, err := plugin.GetTargetObjectFromPod(ctx, pod.DeepCopy(), plugins.ClusterHandle{
+			Client: client,
+		})
+		if err != nil || !found {
+			c.logger.V(3).Info(
+				"Failed to get target object from pod",
+				"cluster", clusterName,
+				"gvk", gvk,
+				"found", found,
+				"err", err,
+			)
+			continue
+		}
+
+		gv, err := schema.ParseGroupVersion(object.GetAPIVersion())
+		if err != nil {
+			c.logger.V(3).Info(
+				"Failed to parse APIVersion from object",
+				"cluster", clusterName,
+				"gvk", gvk,
+				"err", err,
+			)
+			continue
+		}
+		qualifiedName := common.NewQualifiedName(object)
+		managed := object.GetLabels()[managedlabel.ManagedByKubeAdmiralLabelKey] == managedlabel.ManagedByKubeAdmiralLabelValue
+
+		if gvk.Group != gv.Group || gvk.Kind != object.GetKind() || !managed {
+			c.logger.V(3).Info(
+				"Target object is unexpected",
+				"cluster", clusterName,
+				"expected-gvk", gvk,
+				"got-gvk", gv.WithKind(object.GetKind()),
+				"managed", managed,
+				"resource", qualifiedName.String(),
+			)
+			continue
+		}
+		possibleQualifiedNames = append(possibleQualifiedNames, qualifiedName)
+	}
+	return
+}
+
+func (c *Controller) getTargetObjectsIfAutoMigrationEnabled(
+	fedObject *fedcorev1a1.FederatedObject,
+) (clusterObjs []util.FederatedObject, ftc *fedcorev1a1.FederatedTypeConfig, unschedulableThreshold *time.Duration, err error) {
+	// PodUnschedulableThresholdAnnotation is set by the scheduler. Its presence determines whether auto migration is enabled.
+	if value, exists := fedObject.GetAnnotations()[common.PodUnschedulableThresholdAnnotation]; exists {
+		if duration, err := time.ParseDuration(value); err != nil {
+			err = fmt.Errorf("failed to parse PodUnschedulableThresholdAnnotation: %w", err)
+			return nil, nil, nil, err
+		} else {
+			unschedulableThreshold = &duration
+		}
+	}
+
+	objectMeta := &metav1.PartialObjectMetadata{}
+	if err = json.Unmarshal(fedObject.Spec.Template.Raw, objectMeta); err != nil {
+		err = fmt.Errorf("failed to unmarshal template of federated object: %w", err)
+		return nil, nil, nil, err
+	}
+	gv, err := schema.ParseGroupVersion(objectMeta.APIVersion)
+	if err != nil {
+		err = fmt.Errorf("failed to parse APIVersion of federated object template: %w", err)
+		return nil, nil, nil, err
+	}
+	gvk := schema.GroupVersionKind{
+		Group:   gv.Group,
+		Version: gv.Version,
+		Kind:    objectMeta.Kind,
+	}
+
+	ftcList, err := c.federatedInformer.GetFederatedTypeConfigLister().List(labels.Everything())
+	if err != nil {
+		err = fmt.Errorf("failed to list FederatedTypeConfigs: %w", err)
+		return nil, nil, nil, err
+	}
+	for _, item := range ftcList {
+		if item.GetSourceTypeGVK() == gvk {
+			ftc = item
+			break
+		}
+	}
+	if ftc == nil || !isAutoMigrationEnabled(ftc) {
+		return nil, nil, nil, nil
+	}
+
+	for _, placement := range fedObject.Spec.Placements {
+		for _, cluster := range placement.Placement {
+			lister, synced, exist := c.federatedInformer.GetResourceLister(gvk, cluster.Cluster)
+			if !exist || !synced() {
+				err = fmt.Errorf("informer of resource %v does not exist or is not synced for cluster %s", gvk, cluster.Cluster)
+				return nil, nil, nil, err
+			}
+			object, err := lister.ByNamespace(objectMeta.Namespace).Get(objectMeta.Name)
+			if apierrors.IsNotFound(err) {
+				// The object has not been propagated to this cluster yet; skip it instead of appending a nil object.
+				continue
+			}
+			if err != nil {
+				err = fmt.Errorf("failed to get %v from informer stores for cluster %s: %w", objectMeta, cluster.Cluster, err)
+				return nil, nil, nil, err
+			}
+			clusterObjs = append(clusterObjs, util.FederatedObject{Object: object, ClusterName: cluster.Cluster})
+		}
+	}
+	return clusterObjs, ftc, unschedulableThreshold, nil
+}
+
+func (c *Controller) getPodInformer(cluster string) (informer *stoppableInformer, exist bool) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	informer, exist = c.podInformers[cluster]
+	return
+}
+
+func (c *Controller) deletePodInformer(cluster string) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	informer, exist := c.podInformers[cluster]
+	if !exist {
+		return
+	}
+	delete(c.podInformers, cluster)
+	go func() {
+		informer.stop()
+		for !informer.Informer().IsStopped() {
+			time.Sleep(time.Second)
+		}
+		c.logger.V(3).Info("Pod informer has been deleted", "cluster", cluster)
+	}()
+}
+
+func (c *Controller) clusterEventHandlerFunc(cluster *fedcorev1a1.FederatedCluster) {
+	_, ok := c.getPodInformer(cluster.Name)
+	if !cluster.DeletionTimestamp.IsZero() {
+		if !ok {
+			return
+		}
+		c.deletePodInformer(cluster.Name)
+		return
+	}
+	if ok {
+		return
+	}
+	client, exist := c.federatedInformer.GetClusterClient(cluster.Name)
+	if !exist {
+		c.logger.V(3).Info("Failed to get cluster client", "cluster", cluster.Name)
+		return
+	}
+
+	// TODO: Use pod informer with pruning to reduce memory usage
+	informer := dynamicinformer.NewFilteredDynamicInformer(
+		client,
+		corev1.SchemeGroupVersion.WithResource("pods"),
+		corev1.NamespaceAll,
+		0,
+		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		nil,
+	)
+	_, err := informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			keyedLogger := c.logger.WithValues("cluster", cluster.Name)
+			ctx := klog.NewContext(context.TODO(), keyedLogger)
+
+			unsNewObj := newObj.(*unstructured.Unstructured)
+			if unsNewObj.GetDeletionTimestamp() != nil {
+				return
+			}
+			unsOldObj := oldObj.(*unstructured.Unstructured)
+
+			newPod := &corev1.Pod{}
+			if err := pkgruntime.DefaultUnstructuredConverter.FromUnstructured(unsNewObj.Object, newPod); err != nil {
+				keyedLogger.Error(err, fmt.Sprintf("Internal error: newObj not of Pod type: %v", unsNewObj))
+				return
+			}
+			oldPod := &corev1.Pod{}
+			if err := pkgruntime.DefaultUnstructuredConverter.FromUnstructured(unsOldObj.Object, oldPod); err != nil {
+				keyedLogger.Error(err, fmt.Sprintf("Internal error: oldObj not of Pod type: %v", unsOldObj))
+				return
+			}
+
+			if !podScheduledConditionChanged(oldPod, newPod) {
+				return
+			}
+
+			qualifiedNames, err := c.getSourceObjectFromCluster(ctx, newPod, cluster.Name)
+			if err != nil {
+				keyedLogger.V(3).Info(
+					"Failed to get source object from pod",
+					"pod", common.NewQualifiedName(newPod),
+					"err", err,
+				)
+				return
+			}
+			// enqueue with a delay to simulate a rudimentary rate limiter
+			for _, qualifiedName := range qualifiedNames {
+				c.worker.EnqueueWithDelay(qualifiedName, 10*time.Second)
+			}
+		},
+	})
+	if err != nil {
+		c.logger.Error(err, "Failed to add event handler for pod informer", "cluster", cluster.Name)
+		return
+	}
+
+	ctx, cancel := context.WithCancel(context.TODO())
+	go informer.Informer().Run(ctx.Done())
+	c.lock.Lock()
+	defer c.lock.Unlock()
+	c.podInformers[cluster.Name] = &stoppableInformer{
+		stop:            cancel,
+		GenericInformer: informer,
+	}
+}
+
+func isAutoMigrationEnabled(typeConfig *fedcorev1a1.FederatedTypeConfig) bool {
+	return typeConfig.Spec.AutoMigration != nil && typeConfig.Spec.AutoMigration.Enabled
+}
diff --git a/pkg/controllers/automigration/plugins/deployments.go b/pkg/controllers/automigration/plugins/deployments.go
index 867240351..7ddd3fb61 100644
--- a/pkg/controllers/automigration/plugins/deployments.go
+++ b/pkg/controllers/automigration/plugins/deployments.go
@@ -24,8 +24,9 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
+	"github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/controller"
 	deploymentutil "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/controller/deployment/util"
 )
 
@@ -43,11 +44,16 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 	rsList, err := deploymentutil.ListReplicaSets(deployment, func(ns string, opts metav1.ListOptions) ([]*appsv1.ReplicaSet, error) {
 		rsList := &appsv1.ReplicaSetList{}
-		listOpts, err := convertListOptions(ns, &opts)
+		opts = *opts.DeepCopy()
+		opts.ResourceVersion = "0" // list from watch cache
+		unsRsList, err := handle.Client.
+			Resource(common.ReplicaSetGVR).
+			Namespace(ns).
+			List(ctx, opts)
 		if err != nil {
 			return nil, err
 		}
-		if err := handle.Client.ListWithOptions(ctx, rsList, listOpts); err != nil {
+		if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsRsList.Object, rsList); err != nil {
 			return nil, err
 		}
 		ret := []*appsv1.ReplicaSet{}
@@ -70,11 +76,16 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 		[]*appsv1.ReplicaSet{newRS},
 		func(ns string, opts metav1.ListOptions) (*corev1.PodList, error) {
 			podList := &corev1.PodList{}
-			listOpts, err := convertListOptions(ns, &opts)
-			if err != nil {
-				return nil, err
-			}
-			if err := handle.Client.ListWithOptions(ctx, podList, listOpts); err != nil {
+			opts = *opts.DeepCopy()
+			opts.ResourceVersion = "0" // list from watch cache
+			unsPodList, err := handle.Client.
+				Resource(common.PodGVR).
+				Namespace(ns).
+				List(ctx, opts)
+			if err != nil {
+				return nil, err
+			}
+			if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsPodList.Object, podList); err != nil {
 				return nil, err
 			}
 			return podList, nil
@@ -92,15 +103,27 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 	return ret, nil
 }
 
-var _ Plugin = &deploymentPlugin{}
-
-func convertListOptions(ns string, opts *metav1.ListOptions) (*client.ListOptions, error) {
-	opts = opts.DeepCopy()
-	// list from watch cache
-	opts.ResourceVersion = "0"
+func (*deploymentPlugin) GetTargetObjectFromPod(
+	ctx context.Context,
+	pod *corev1.Pod,
+	handle ClusterHandle,
+) (obj *unstructured.Unstructured, found bool, err error) {
+	rs, found, err := controller.GetSpecifiedOwnerFromSourceObj(ctx, handle.Client, pod, metav1.APIResource{
+		Name:    "replicasets",
+		Group:   appsv1.GroupName,
+		Version: "v1",
+		Kind:    common.ReplicaSetKind,
+	})
+	if err != nil || !found {
+		return nil, false, err
+	}
 
-	return &client.ListOptions{
-		Namespace: ns,
-		Raw:       opts,
-	}, nil
+	return controller.GetSpecifiedOwnerFromSourceObj(ctx, handle.Client, rs, metav1.APIResource{
+		Name:    "deployments",
+		Group:   appsv1.GroupName,
+		Version: "v1",
+		Kind:    common.DeploymentKind,
+	})
 }
+
+var _ Plugin = &deploymentPlugin{}
diff --git a/pkg/controllers/automigration/plugins/plugins.go b/pkg/controllers/automigration/plugins/plugins.go
index ee77b2666..8143e4ecc 100644
--- a/pkg/controllers/automigration/plugins/plugins.go
+++ b/pkg/controllers/automigration/plugins/plugins.go
@@ -20,18 +20,17 @@ import (
 	"context"
 	"fmt"
 
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
 
-	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
-	"github.com/kubewharf/kubeadmiral/pkg/client/generic"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
-	schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
 )
 
 type ClusterHandle struct {
-	Client generic.Client
+	Client dynamic.Interface
 }
 
 type Plugin interface {
@@ -40,19 +39,22 @@ type Plugin interface {
 		obj *unstructured.Unstructured,
 		handle ClusterHandle,
 	) ([]*corev1.Pod, error)
-}
 
-var nativePlugins = map[schema.GroupVersionResource]Plugin{
-	common.DeploymentGVR: &deploymentPlugin{},
+	GetTargetObjectFromPod(
+		ctx context.Context,
+		pod *corev1.Pod,
+		handle ClusterHandle,
+	) (obj *unstructured.Unstructured, found bool, err error)
 }
 
-func ResolvePlugin(typeConfig *fedcorev1a1.FederatedTypeConfig) (Plugin, error) {
-	targetAPIResource := typeConfig.GetTargetType()
-	targetGVR := schemautil.APIResourceToGVR(&targetAPIResource)
+var NativePlugins = map[schema.GroupVersionKind]Plugin{
appsv1.SchemeGroupVersion.WithKind(common.DeploymentKind): &deploymentPlugin{}, +} - if plugin, exists := nativePlugins[targetGVR]; exists { +func ResolvePlugin(gvk schema.GroupVersionKind) (Plugin, error) { + if plugin, exists := NativePlugins[gvk]; exists { return plugin, nil } - return nil, fmt.Errorf("unsupported type %s", targetGVR.String()) + return nil, fmt.Errorf("unsupported type %s", gvk.String()) } diff --git a/pkg/controllers/automigration/util.go b/pkg/controllers/automigration/util.go index 9611e1b09..cf6cc3b49 100644 --- a/pkg/controllers/automigration/util.go +++ b/pkg/controllers/automigration/util.go @@ -36,17 +36,8 @@ func countUnschedulablePods( continue } - var scheduledCondition *corev1.PodCondition - for i := range pod.Status.Conditions { - condition := &pod.Status.Conditions[i] - if condition.Type == corev1.PodScheduled { - scheduledCondition = condition - break - } - } - if scheduledCondition == nil || - scheduledCondition.Status != corev1.ConditionFalse || - scheduledCondition.Reason != corev1.PodReasonUnschedulable { + scheduledCondition, isUnschedulable := getPodScheduledCondition(pod) + if !isUnschedulable { continue } @@ -62,3 +53,34 @@ func countUnschedulablePods( return unschedulableCount, nextCrossIn } + +func getPodScheduledCondition(pod *corev1.Pod) (scheduledCondition *corev1.PodCondition, isUnschedulable bool) { + for i := range pod.Status.Conditions { + condition := &pod.Status.Conditions[i] + if condition.Type == corev1.PodScheduled { + scheduledCondition = condition + break + } + } + if scheduledCondition == nil || + scheduledCondition.Status != corev1.ConditionFalse || + scheduledCondition.Reason != corev1.PodReasonUnschedulable { + return scheduledCondition, false + } + return scheduledCondition, true +} + +func podScheduledConditionChanged(oldPod, newPod *corev1.Pod) bool { + condition, _ := getPodScheduledCondition(newPod) + oldCondition, _ := getPodScheduledCondition(oldPod) + if condition == nil || oldCondition == nil { + return condition != oldCondition + } + + isEqual := condition.Status == oldCondition.Status && + condition.Reason == oldCondition.Reason && + condition.Message == oldCondition.Message && + condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) && + condition.LastTransitionTime.Equal(&oldCondition.LastTransitionTime) + return !isEqual +} diff --git a/pkg/controllers/automigration/util_test.go b/pkg/controllers/automigration/util_test.go index 4adf892b6..4ee1be6a9 100644 --- a/pkg/controllers/automigration/util_test.go +++ b/pkg/controllers/automigration/util_test.go @@ -120,3 +120,71 @@ func newPod(terminating bool, schedulable bool, lastTransitionTimestamp time.Tim } return pod } + +func Test_podScheduledConditionChanged(t *testing.T) { + now := time.Now() + podWithEmptyCond := newPod(false, false, now) + podWithEmptyCond.Status.Conditions = nil + + tests := []struct { + name string + oldPod *corev1.Pod + newPod *corev1.Pod + want bool + }{ + { + name: "both nil", + oldPod: podWithEmptyCond, + newPod: podWithEmptyCond, + want: false, + }, + { + name: "oldPod condition is nil", + oldPod: podWithEmptyCond, + newPod: newPod(false, false, now), + want: true, + }, + { + name: "newPod condition is nil", + oldPod: newPod(false, false, now), + newPod: podWithEmptyCond, + want: true, + }, + { + name: "unschedulable condition equal", + oldPod: newPod(false, false, now), + newPod: newPod(false, false, now), + want: false, + }, + { + name: "unschedulable condition not equal", + oldPod: newPod(false, false, now.Add(10*time.Second)), + 
newPod: newPod(false, false, now), + want: true, + }, + { + name: "schedulable condition equal", + oldPod: newPod(false, true, now), + newPod: newPod(false, true, now), + want: false, + }, + { + name: "schedulable condition not equal", + oldPod: newPod(false, true, now.Add(10*time.Second)), + newPod: newPod(false, true, now), + want: true, + }, + { + name: "schedulable and unschedulable conditions", + oldPod: newPod(false, true, now), + newPod: newPod(false, false, now), + want: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, podScheduledConditionChanged(tt.oldPod, tt.newPod), + "podScheduledConditionChanged(%v, %v)", tt.oldPod, tt.newPod) + }) + } +} diff --git a/pkg/controllers/common/constants.go b/pkg/controllers/common/constants.go index 810817e57..6653ee060 100644 --- a/pkg/controllers/common/constants.go +++ b/pkg/controllers/common/constants.go @@ -20,7 +20,11 @@ are Copyright 2023 The KubeAdmiral Authors. package common -import "k8s.io/apimachinery/pkg/runtime/schema" +import ( + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" +) const ( DefaultFedSystemNamespace = "kube-admiral-system" @@ -46,6 +50,7 @@ const ( PersistentVolumeKind = "PersistentVolume" PersistentVolumeClaimKind = "PersistentVolumeClaim" PodKind = "Pod" + ReplicaSetKind = "ReplicaSet" ) // The following consts are spec fields used to interact with unstructured resources @@ -166,6 +171,9 @@ var DeploymentGVR = schema.GroupVersionResource{ Resource: "deployments", } +var ReplicaSetGVR = appsv1.SchemeGroupVersion.WithResource("replicasets") +var PodGVR = corev1.SchemeGroupVersion.WithResource("pods") + var ConfigMapGVR = schema.GroupVersionResource{ Group: "", Version: "v1", diff --git a/pkg/controllers/context/context.go b/pkg/controllers/context/context.go index 3755abecd..511ae6498 100644 --- a/pkg/controllers/context/context.go +++ b/pkg/controllers/context/context.go @@ -31,6 +31,7 @@ import ( fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions" "github.com/kubewharf/kubeadmiral/pkg/controllers/util/federatedclient" "github.com/kubewharf/kubeadmiral/pkg/stats" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" ) type Context struct { @@ -53,6 +54,8 @@ type Context struct { DynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory FedInformerFactory fedinformers.SharedInformerFactory + FederatedInformerManager informermanager.FederatedInformerManager + FederatedClientFactory federatedclient.FederatedClientFactory } diff --git a/pkg/controllers/util/store.go b/pkg/controllers/util/store.go index df613e46e..fd8944b37 100644 --- a/pkg/controllers/util/store.go +++ b/pkg/controllers/util/store.go @@ -31,3 +31,15 @@ func UnstructuredFromStore(store cache.Store, key string) (*unstructured.Unstruc } return obj.(*unstructured.Unstructured), nil } + +func GetObjectFromStore[T any](store cache.Store, key string) (*T, error) { + obj, exists, err := store.GetByKey(key) + if err != nil || !exists { + return nil, err + } + t, ok := obj.(*T) + if !ok { + return nil, nil + } + return t, nil +} diff --git a/pkg/lifted/kubernetes/pkg/controller/controller_utils.go b/pkg/lifted/kubernetes/pkg/controller/controller_utils.go index 69546ebea..c10f41a3f 100644 --- a/pkg/lifted/kubernetes/pkg/controller/controller_utils.go +++ b/pkg/lifted/kubernetes/pkg/controller/controller_utils.go @@ -26,7 +26,15 @@ This file is lifted from 
k8s.io/kubernetes/pkg/controller/controller_utils.go
 package controller
 
 import (
+	"context"
+
 	apps "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 // ReplicaSetsByCreationTimestamp sorts a list of ReplicaSet by creation timestamp, using their names as a tie breaker.
@@ -40,3 +48,41 @@ func (o ReplicaSetsByCreationTimestamp) Less(i, j int) bool {
 	}
 	return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp)
 }
+
+// GetSpecifiedOwnerFromSourceObj returns the owner object of sourceObj that matches the given API resource, if any.
+func GetSpecifiedOwnerFromSourceObj(
+	ctx context.Context,
+	dynClient dynamic.Interface,
+	sourceObj client.Object,
+	ownerApiResource metav1.APIResource,
+) (obj *unstructured.Unstructured, found bool, err error) {
+	gv := schema.GroupVersion{
+		Group:   ownerApiResource.Group,
+		Version: ownerApiResource.Version,
+	}
+	var owner *metav1.OwnerReference
+	for _, o := range sourceObj.GetOwnerReferences() {
+		o := o // copy to avoid aliasing the loop variable
+		if o.APIVersion == gv.String() && o.Kind == ownerApiResource.Kind {
+			owner = &o
+			break
+		}
+	}
+	if owner == nil || dynClient == nil {
+		return nil, false, nil
+	}
+
+	obj, err = dynClient.Resource(schema.GroupVersionResource{
+		Group:    ownerApiResource.Group,
+		Version:  ownerApiResource.Version,
+		Resource: ownerApiResource.Name,
+	}).Namespace(sourceObj.GetNamespace()).Get(ctx, owner.Name, metav1.GetOptions{})
+	if err != nil {
+		if errors.IsNotFound(err) {
+			return nil, false, nil
+		}
+		return nil, false, err
+	}
+
+	return obj, true, nil
+}
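
Usage sketch (illustrative, not part of the patch): the lifted helper above resolves a single owner-reference link, and composing two calls maps a Pod back to its Deployment, exactly as deploymentPlugin.GetTargetObjectFromPod does. The resolveDeploymentForPod wrapper and its dynClient/pod arguments below are assumptions for the example, not APIs introduced by this change.

package example

import (
	"context"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/dynamic"

	"github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/controller"
)

// resolveDeploymentForPod walks Pod -> ReplicaSet -> Deployment through owner
// references, returning found=false if any link in the chain is missing.
func resolveDeploymentForPod(
	ctx context.Context,
	dynClient dynamic.Interface, // dynamic client for the member cluster (assumed in scope)
	pod *corev1.Pod,
) (name string, found bool, err error) {
	// First hop: the ReplicaSet that owns the Pod.
	rs, found, err := controller.GetSpecifiedOwnerFromSourceObj(ctx, dynClient, pod, metav1.APIResource{
		Name: "replicasets", Group: appsv1.GroupName, Version: "v1", Kind: "ReplicaSet",
	})
	if err != nil || !found {
		return "", false, err
	}
	// Second hop: the Deployment that owns the ReplicaSet.
	deploy, found, err := controller.GetSpecifiedOwnerFromSourceObj(ctx, dynClient, rs, metav1.APIResource{
		Name: "deployments", Group: appsv1.GroupName, Version: "v1", Kind: "Deployment",
	})
	if err != nil || !found {
		return "", false, err
	}
	return deploy.GetName(), true, nil
}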