diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go
index cc8f3c13..4265c9dd 100644
--- a/cmd/controller-manager/app/controllermanager.go
+++ b/cmd/controller-manager/app/controllermanager.go
@@ -44,6 +44,7 @@ const (
 	StatusControllerName        = "status"
 	SchedulerName               = "scheduler"
 	SyncControllerName          = "sync"
+	AutoMigrationControllerName = "auto-migration"
 )
 
 var knownControllers = map[string]controllermanager.StartControllerFunc{
@@ -56,6 +57,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{
 	SchedulerName:               startScheduler,
 	SyncControllerName:          startSyncController,
 	FollowerControllerName:      startFollowerController,
+	AutoMigrationControllerName: startAutoMigrationController,
 }
 
 var controllersDisabledByDefault = sets.New[string]()
diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go
index 04466923..c648a103 100644
--- a/cmd/controller-manager/app/core.go
+++ b/cmd/controller-manager/app/core.go
@@ -23,6 +23,7 @@ import (
 	"k8s.io/klog/v2"
 
 	"github.com/kubewharf/kubeadmiral/pkg/controllermanager"
+	"github.com/kubewharf/kubeadmiral/pkg/controllers/automigration"
 	controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/federate"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster"
@@ -261,3 +262,28 @@ func startFollowerController(
 
 	return followerController, nil
 }
+
+func startAutoMigrationController(
+	ctx context.Context,
+	controllerCtx *controllercontext.Context,
+) (controllermanager.Controller, error) {
+	autoMigrationController, err := automigration.NewAutoMigrationController(
+		ctx,
+		controllerCtx.KubeClientset,
+		controllerCtx.FedClientset,
+		controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(),
+		controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(),
+		controllerCtx.FederatedInformerManager,
+		controllerCtx.InformerManager,
+		controllerCtx.Metrics,
+		klog.Background(),
+		controllerCtx.WorkerCount,
+	)
+	if err != nil {
+		return nil, fmt.Errorf("error creating auto-migration controller: %w", err)
+	}
+
+	go autoMigrationController.Run(ctx)
+
+	return autoMigrationController, nil
+}
diff --git a/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go b/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go
index 486f8766..41e2b931 100644
--- a/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go
+++ b/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go
@@ -72,6 +72,10 @@ func (f *FederatedTypeConfig) GetStatusAggregationEnabled() bool {
 	return f.Spec.StatusAggregation != nil && f.Spec.StatusAggregation.Enabled
 }
 
+func (f *FederatedTypeConfig) GetAutoMigrationEnabled() bool {
+	return f.Spec.AutoMigration != nil && f.Spec.AutoMigration.Enabled
+}
+
 func (f *FederatedTypeConfig) GetPolicyRcEnabled() bool {
 	return true // TODO: should this be configurable?
 }
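Note: GetAutoMigrationEnabled follows the same nil-guard convention as the other accessors in this file. A minimal sketch of flipping the new knob on an FTC; the Go type name of the Spec.AutoMigration block is assumed here, not confirmed by this diff:

package main

import (
	"fmt"

	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
)

func main() {
	ftc := &fedcorev1a1.FederatedTypeConfig{}
	fmt.Println(ftc.GetAutoMigrationEnabled()) // false: Spec.AutoMigration is nil

	// AutoMigrationConfig is an assumed type name; the accessor only requires
	// that the pointer be non-nil and Enabled be true.
	ftc.Spec.AutoMigration = &fedcorev1a1.AutoMigrationConfig{Enabled: true}
	fmt.Println(ftc.GetAutoMigrationEnabled()) // true
}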
diff --git a/pkg/controllers/automigration/controller.go b/pkg/controllers/automigration/controller.go
index bc08257c..d066bf4d 100644
--- a/pkg/controllers/automigration/controller.go
+++ b/pkg/controllers/automigration/controller.go
@@ -1,4 +1,3 @@
-//go:build exclude
 /*
 Copyright 2023 The KubeAdmiral Authors.
 
@@ -28,29 +27,32 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	pkgruntime "k8s.io/apimachinery/pkg/runtime"
-	dynamicclient "k8s.io/client-go/dynamic"
-	"k8s.io/client-go/informers"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	kubeclient "k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/klog/v2"
 
 	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
-	"github.com/kubewharf/kubeadmiral/pkg/client/generic"
+	fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned"
+	fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/automigration/plugins"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/scheduler/framework"
-	"github.com/kubewharf/kubeadmiral/pkg/controllers/util"
-	"github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver"
-	"github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink"
-	utilunstructured "github.com/kubewharf/kubeadmiral/pkg/controllers/util/unstructured"
-	"github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker"
 	"github.com/kubewharf/kubeadmiral/pkg/stats"
+	"github.com/kubewharf/kubeadmiral/pkg/util/eventsink"
+	"github.com/kubewharf/kubeadmiral/pkg/util/fedobjectadapters"
+	"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
+	"github.com/kubewharf/kubeadmiral/pkg/util/logging"
+	"github.com/kubewharf/kubeadmiral/pkg/util/managedlabel"
+	"github.com/kubewharf/kubeadmiral/pkg/util/naming"
+	utilunstructured "github.com/kubewharf/kubeadmiral/pkg/util/unstructured"
+	"github.com/kubewharf/kubeadmiral/pkg/util/worker"
 )
 
 const (
 	EventReasonAutoMigrationInfoUpdated = "AutoMigrationInfoUpdated"
+	AutoMigrationControllerName         = "auto-migration"
 )
 
 /*
@@ -65,15 +67,16 @@ One way to prevent both is:
 */
 
 type Controller struct {
-	typeConfig *fedcorev1a1.FederatedTypeConfig
-	name       string
+	name string
 
-	federatedObjectClient   dynamicclient.NamespaceableResourceInterface
-	federatedObjectInformer informers.GenericInformer
+	fedObjectInformer        fedcorev1a1informers.FederatedObjectInformer
+	clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer
+	federatedInformer        informermanager.FederatedInformerManager
+	ftcManager               informermanager.FederatedTypeConfigManager
 
-	federatedInformer util.FederatedInformer
+	fedClient fedclient.Interface
 
-	worker worker.ReconcileWorker
+	worker worker.ReconcileWorker[common.QualifiedName]
 
 	eventRecorder record.EventRecorder
 
@@ -87,85 +90,157 @@ func (c *Controller) IsControllerReady() bool {
 }
 
 func NewAutoMigrationController(
-	controllerConfig *util.ControllerConfig,
-	typeConfig *fedcorev1a1.FederatedTypeConfig,
-	genericFedClient generic.Client,
+	ctx context.Context,
 	kubeClient kubeclient.Interface,
-	federatedObjectClient dynamicclient.NamespaceableResourceInterface,
-	federatedObjectInformer informers.GenericInformer,
+	fedClient fedclient.Interface,
+	fedObjectInformer fedcorev1a1informers.FederatedObjectInformer,
+	clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer,
+	federatedInformer informermanager.FederatedInformerManager,
+	ftcManager informermanager.FederatedTypeConfigManager,
+	metrics stats.Metrics,
+	logger klog.Logger,
+	workerCount int,
 ) (*Controller, error) {
fmt.Sprintf("%s-auto-migration", typeConfig.Name) - c := &Controller{ - typeConfig: typeConfig, - name: controllerName, + name: AutoMigrationControllerName, + + fedObjectInformer: fedObjectInformer, + clusterFedObjectInformer: clusterFedObjectInformer, + federatedInformer: federatedInformer, + ftcManager: ftcManager, - federatedObjectClient: federatedObjectClient, - federatedObjectInformer: federatedObjectInformer, + fedClient: fedClient, - metrics: controllerConfig.Metrics, - logger: klog.NewKlogr().WithValues("controller", "auto-migration", "ftc", typeConfig.Name), - eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, controllerName, 6), + metrics: metrics, + logger: logger.WithValues("controller", AutoMigrationControllerName), + eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, AutoMigrationControllerName, 6), } - c.worker = worker.NewReconcileWorker( + c.worker = worker.NewReconcileWorker[common.QualifiedName]( + AutoMigrationControllerName, c.reconcile, worker.RateLimiterOptions{}, - controllerConfig.WorkerCount, - controllerConfig.Metrics, - delayingdeliver.NewMetricTags("auto-migration-worker", c.typeConfig.GetFederatedType().Kind), + workerCount, + metrics, ) - federatedObjectInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + federatedObjectHandler := cache.ResourceEventHandlerFuncs{ // Only need to handle UnschedulableThreshold updates // Addition and deletion will be triggered by the target resources. - UpdateFunc: func(oldUntyped, newUntyped interface{}) { - oldObj, newObj := oldUntyped.(*unstructured.Unstructured), newUntyped.(*unstructured.Unstructured) + UpdateFunc: func(oldFedObj, newFedObj interface{}) { + oldObj, newObj := oldFedObj.(fedcorev1a1.GenericFederatedObject), newFedObj.(fedcorev1a1.GenericFederatedObject) oldThreshold := oldObj.GetAnnotations()[common.PodUnschedulableThresholdAnnotation] newThreshold := newObj.GetAnnotations()[common.PodUnschedulableThresholdAnnotation] if oldThreshold != newThreshold { c.worker.Enqueue(common.NewQualifiedName(newObj)) } }, + } + if _, err := c.fedObjectInformer.Informer().AddEventHandler(federatedObjectHandler); err != nil { + return nil, fmt.Errorf("failed to create federated informer: %w", err) + } + if _, err := c.clusterFedObjectInformer.Informer().AddEventHandler(federatedObjectHandler); err != nil { + return nil, fmt.Errorf("failed to create cluster federated informer: %w", err) + } + + c.federatedInformer.AddPodEventHandler(&informermanager.ResourceEventHandlerWithClusterFuncs{ + UpdateFunc: func(oldObj, newObj interface{}, cluster string) { + ctx := klog.NewContext(ctx, c.logger) + ctx, logger := logging.InjectLoggerValues(ctx, "cluster", cluster) + + newPod := newObj.(*corev1.Pod) + if newPod.GetDeletionTimestamp() != nil { + return + } + oldPod := oldObj.(*corev1.Pod) + if !podScheduledConditionChanged(oldPod, newPod) { + return + } + + qualifiedNames, err := c.getPossibleSourceObjectsFromCluster(ctx, newPod, cluster) + if err != nil { + logger.V(3).Info( + "Failed to get possible source objects form pod", + "pod", common.NewQualifiedName(newPod), + "err", err, + ) + return + } + for _, qualifiedName := range qualifiedNames { + // enqueue with a delay to simulate a rudimentary rate limiter + c.worker.EnqueueWithDelay(qualifiedName, 10*time.Second) + } + }, }) - var err error - targetType := typeConfig.GetTargetType() - c.federatedInformer, err = util.NewFederatedInformer( - controllerConfig, - genericFedClient, - controllerConfig.KubeConfig, - &targetType, - func(o 
-
-	var err error
-	targetType := typeConfig.GetTargetType()
-	c.federatedInformer, err = util.NewFederatedInformer(
-		controllerConfig,
-		genericFedClient,
-		controllerConfig.KubeConfig,
-		&targetType,
-		func(o pkgruntime.Object) {
-			// enqueue with a delay to simulate a rudimentary rate limiter
-			c.worker.EnqueueWithDelay(common.NewQualifiedName(o), 10*time.Second)
+	objectHandler := func(obj interface{}) {
+		// Enqueue the federated object that corresponds to this target object.
+		targetObj := obj.(*unstructured.Unstructured)
+		if !targetObj.GetDeletionTimestamp().IsZero() {
+			return
+		}
+		gvk := targetObj.GroupVersionKind()
+		ftc := c.getFTCIfAutoMigrationIsEnabled(gvk)
+		if ftc == nil {
+			c.logger.V(3).Info("Auto migration is disabled", "gvk", gvk)
+			return
+		}
+		c.worker.EnqueueWithDelay(common.QualifiedName{
+			Namespace: targetObj.GetNamespace(),
+			Name:      naming.GenerateFederatedObjectName(targetObj.GetName(), ftc.Name),
+		}, 10*time.Second)
+	}
+	if err := c.federatedInformer.AddEventHandlerGenerator(&informermanager.EventHandlerGenerator{
+		Predicate: informermanager.RegisterOncePredicate,
+		Generator: func(ftc *fedcorev1a1.FederatedTypeConfig) cache.ResourceEventHandler {
+			// Event handler for the target objects of this FTC.
+			return cache.ResourceEventHandlerFuncs{
+				AddFunc: func(obj interface{}) {
+					objectHandler(obj)
+				},
+				UpdateFunc: func(oldObj, newObj interface{}) {
+					objectHandler(newObj)
+				},
+				DeleteFunc: func(obj interface{}) {
+					if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+						obj = tombstone.Obj
+						if obj == nil {
+							return
+						}
+					}
+					objectHandler(obj)
+				},
+			}
 		},
-		&util.ClusterLifecycleHandlerFuncs{},
-	)
-	if err != nil {
-		return nil, fmt.Errorf("failed to create federated informer: %w", err)
+	}); err != nil {
+		return nil, fmt.Errorf("failed to add event handler generator for target objects: %w", err)
 	}
 
 	return c, nil
 }
 
 func (c *Controller) Run(ctx context.Context) {
-	c.logger.Info("Starting controller")
-	defer c.logger.Info("Stopping controller")
+	ctx, logger := logging.InjectLogger(ctx, c.logger)
 
-	c.federatedInformer.Start()
-	defer c.federatedInformer.Stop()
+	logger.Info("Starting controller")
+	defer logger.Info("Stopping controller")
 
-	if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.HasSynced) {
+	if !cache.WaitForNamedCacheSync(AutoMigrationControllerName, ctx.Done(), c.HasSynced) {
+		logger.Error(nil, "Timed out waiting for cache sync")
 		return
 	}
 
-	c.worker.Run(ctx.Done())
+	logger.Info("Caches are synced")
+
+	c.worker.Run(ctx)
 	<-ctx.Done()
 }
 
 func (c *Controller) HasSynced() bool {
-	if !c.federatedObjectInformer.Informer().HasSynced() || !c.federatedInformer.ClustersSynced() {
+	if !c.fedObjectInformer.Informer().HasSynced() ||
+		!c.clusterFedObjectInformer.Informer().HasSynced() ||
+		!c.federatedInformer.HasSynced() ||
+		!c.ftcManager.HasSynced() {
 		return false
 	}
@@ -176,39 +251,39 @@ func (c *Controller) HasSynced() bool {
 	return true
 }
 
-func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worker.Result) {
+func (c *Controller) reconcile(ctx context.Context, qualifiedName common.QualifiedName) (status worker.Result) {
 	key := qualifiedName.String()
-	keyedLogger := c.logger.WithValues("control-loop", "reconcile", "object", key)
-	ctx := klog.NewContext(context.TODO(), keyedLogger)
+	ctx, keyedLogger := logging.InjectLoggerValues(ctx, "control-loop", "reconcile", "object", key)
 
 	startTime := time.Now()
 	c.metrics.Rate("auto-migration.throughput", 1)
 	keyedLogger.V(3).Info("Start reconcile")
 	defer func() {
 		c.metrics.Duration(fmt.Sprintf("%s.latency", c.name), startTime)
-		keyedLogger.V(3).Info("Finished reconcile", "duration", time.Since(startTime), "status", status.String())
+		keyedLogger.V(3).Info("Finished reconcile", "duration", time.Since(startTime), "status", status)
 	}()
 
-	fedObject, err := util.UnstructuredFromStore(c.federatedObjectInformer.Informer().GetStore(), key)
+	fedObject, err := fedobjectadapters.GetFromLister(
+		c.fedObjectInformer.Lister(),
+		c.clusterFedObjectInformer.Lister(),
+		qualifiedName.Namespace,
+		qualifiedName.Name,
+	)
 	if err != nil {
-		keyedLogger.Error(err, "Failed to get object from store")
+		keyedLogger.Error(err, "Failed to get federated object from store")
 		return worker.StatusError
 	}
 	if fedObject == nil || fedObject.GetDeletionTimestamp() != nil {
 		return worker.StatusAllOK
 	}
+	fedObject = fedObject.DeepCopyGenericFederatedObject()
 
-	// PodUnschedulableThresholdAnnotation is set by the scheduler. Its presence determines whether auto migration is enabled.
 	annotations := fedObject.GetAnnotations()
-	var unschedulableThreshold *time.Duration
-	if value, exists := annotations[common.PodUnschedulableThresholdAnnotation]; exists {
-		if duration, err := time.ParseDuration(value); err != nil {
-			keyedLogger.Error(err, "Failed to parse PodUnschedulableThresholdAnnotation")
-		} else {
-			unschedulableThreshold = &duration
-		}
+	clusterObjs, ftc, unschedulableThreshold, err := c.getTargetObjectsIfAutoMigrationEnabled(fedObject)
+	if err != nil {
+		keyedLogger.Error(err, "Failed to get objects from federated informer stores")
+		return worker.StatusError
 	}
-
 	// auto-migration controller sets AutoMigrationAnnotation to
 	// feedback auto-migration information back to the scheduler
 
@@ -226,13 +301,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke
 	} else {
 		// Keep the annotation up-to-date if auto migration is enabled.
 		keyedLogger.V(3).Info("Auto migration is enabled")
-		clusterObjs, err := c.federatedInformer.GetTargetStore().GetFromAllClusters(key)
-		if err != nil {
-			keyedLogger.Error(err, "Failed to get objects from federated informer stores")
-			return worker.StatusError
-		}
-
-		estimatedCapacity, result = c.estimateCapacity(ctx, clusterObjs, *unschedulableThreshold)
+		estimatedCapacity, result = c.estimateCapacity(ctx, ftc, clusterObjs, *unschedulableThreshold)
 		autoMigrationInfo := &framework.AutoMigrationInfo{EstimatedCapacity: estimatedCapacity}
 
 		// Compare with the existing autoMigration annotation
@@ -258,14 +327,15 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke
 	keyedLogger.V(3).Info("Observed migration information", "estimatedCapacity", estimatedCapacity)
 	if needsUpdate {
-		fedObject = fedObject.DeepCopy()
 		fedObject.SetAnnotations(annotations)
 		keyedLogger.V(1).Info("Updating federated object with auto migration information", "estimatedCapacity", estimatedCapacity)
-		_, err = c.federatedObjectClient.
-			Namespace(qualifiedName.Namespace).
-			Update(ctx, fedObject, metav1.UpdateOptions{})
-		if err != nil {
+		if _, err = fedobjectadapters.Update(
+			ctx,
+			c.fedClient.CoreV1alpha1(),
+			fedObject,
+			metav1.UpdateOptions{},
+		); err != nil {
 			keyedLogger.Error(err, "Failed to update federated object for auto migration")
 			if apierrors.IsConflict(err) {
 				return worker.StatusConflict
@@ -292,38 +362,35 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) (status worke
 
 func (c *Controller) estimateCapacity(
 	ctx context.Context,
-	clusterObjs []util.FederatedObject,
+	typeConfig *fedcorev1a1.FederatedTypeConfig,
+	clusterObjs []FederatedObject,
 	unschedulableThreshold time.Duration,
 ) (map[string]int64, *worker.Result) {
-	keyedLogger := klog.FromContext(ctx)
-
 	needsBackoff := false
 	var retryAfter *time.Duration
 
 	estimatedCapacity := make(map[string]int64, len(clusterObjs))
 
 	for _, clusterObj := range clusterObjs {
-		keyedLogger := keyedLogger.WithValues("cluster", clusterObj.ClusterName)
-		ctx := klog.NewContext(ctx, keyedLogger)
-
-		unsClusterObj := clusterObj.Object.(*unstructured.Unstructured)
+		ctx, logger := logging.InjectLoggerValues(ctx, "cluster", clusterObj.ClusterName, "ftc", typeConfig.Name)
 
 		// This is an optimization to skip pod listing when there are no unschedulable pods.
-		totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(unsClusterObj)
+		totalReplicas, readyReplicas, err := c.getTotalAndReadyReplicas(typeConfig, clusterObj.Object)
 		if err == nil && totalReplicas == readyReplicas {
-			keyedLogger.V(3).Info("No unschedulable pods found, skip estimating capacity")
+			logger.V(3).Info("No unschedulable pods found, skip estimating capacity")
 			continue
 		}
 
-		desiredReplicas, err := c.getDesiredReplicas(unsClusterObj)
+		desiredReplicas, err := c.getDesiredReplicas(typeConfig, clusterObj.Object)
 		if err != nil {
-			keyedLogger.Error(err, "Failed to get desired replicas from object")
+			logger.Error(err, "Failed to get desired replicas from object")
 			continue
 		}
 
-		keyedLogger.V(2).Info("Getting pods from cluster")
-		pods, clusterNeedsBackoff, err := c.getPodsFromCluster(ctx, unsClusterObj, clusterObj.ClusterName)
+		logger.V(2).Info("Getting pods from cluster")
+		pods, clusterNeedsBackoff, err := c.getPodsFromCluster(ctx, typeConfig, clusterObj.Object, clusterObj.ClusterName)
 		if err != nil {
-			keyedLogger.Error(err, "Failed to get pods from cluster")
+			logger.Error(err, "Failed to get pods from cluster")
 			if clusterNeedsBackoff {
 				needsBackoff = true
 			}
@@ -331,6 +398,15 @@ func (c *Controller) estimateCapacity(
 		}
 
 		unschedulable, nextCrossIn := countUnschedulablePods(pods, time.Now(), unschedulableThreshold)
+		logger.V(2).Info("Analyzed pods",
+			"total", len(pods),
+			"desired", desiredReplicas,
+			"unschedulable", unschedulable,
+		)
+
+		if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) {
+			retryAfter = nextCrossIn
+		}
 
 		var clusterEstimatedCapacity int64
 		if len(pods) >= int(desiredReplicas) {
@@ -354,16 +430,6 @@ func (c *Controller) estimateCapacity(
 		}
 
 		estimatedCapacity[clusterObj.ClusterName] = clusterEstimatedCapacity
-
-		keyedLogger.V(2).Info("Analyzed pods",
-			"total", len(pods),
-			"desired", desiredReplicas,
-			"unschedulable", unschedulable,
-		)
-
-		if nextCrossIn != nil && (retryAfter == nil || *nextCrossIn < *retryAfter) {
-			retryAfter = nextCrossIn
-		}
 	}
 
 	var result *worker.Result
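A worked example of the counting step used above, as a test-style sketch against pkg/controllers/automigration. It assumes, as the existing tests suggest, that the unschedulable duration is measured from the PodScheduled condition's LastTransitionTime:

package automigration

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func Example_countUnschedulablePods() {
	now := time.Now()
	unschedulableFor := func(d time.Duration) *corev1.Pod {
		return &corev1.Pod{Status: corev1.PodStatus{Conditions: []corev1.PodCondition{{
			Type:               corev1.PodScheduled,
			Status:             corev1.ConditionFalse,
			Reason:             corev1.PodReasonUnschedulable,
			LastTransitionTime: metav1.NewTime(now.Add(-d)),
		}}}}
	}

	// With a 60s threshold, the pod unschedulable for 90s already counts;
	// the 30s pod should cross the threshold in ~30s, which becomes
	// nextCrossIn and ultimately the retryAfter backoff above.
	count, nextCrossIn := countUnschedulablePods(
		[]*corev1.Pod{unschedulableFor(90 * time.Second), unschedulableFor(30 * time.Second)},
		now,
		60*time.Second,
	)
	fmt.Println(count, *nextCrossIn) // expected: 1 30s
}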
@@ -378,13 +444,14 @@ func (c *Controller) estimateCapacity(
 }
 
 func (c *Controller) getTotalAndReadyReplicas(
+	typeConfig *fedcorev1a1.FederatedTypeConfig,
 	unsObj *unstructured.Unstructured,
 ) (int64, int64, error) {
 	// These values might not have been populated by the controller, in which case we default to 0
 	totalReplicas := int64(0)
 	if replicasPtr, err := utilunstructured.GetInt64FromPath(
-		unsObj, c.typeConfig.Spec.PathDefinition.ReplicasStatus, nil,
+		unsObj, typeConfig.Spec.PathDefinition.ReplicasStatus, nil,
 	); err != nil {
 		return 0, 0, fmt.Errorf("replicas: %w", err)
 	} else if replicasPtr != nil {
@@ -393,7 +460,7 @@ func (c *Controller) getTotalAndReadyReplicas(
 
 	readyReplicas := int64(0)
 	if readyReplicasPtr, err := utilunstructured.GetInt64FromPath(
-		unsObj, c.typeConfig.Spec.PathDefinition.ReadyReplicasStatus, nil,
+		unsObj, typeConfig.Spec.PathDefinition.ReadyReplicasStatus, nil,
 	); err != nil {
 		return 0, 0, fmt.Errorf("ready replicas: %w", err)
 	} else if readyReplicasPtr != nil {
@@ -403,12 +470,15 @@ func (c *Controller) getTotalAndReadyReplicas(
 	return totalReplicas, readyReplicas, nil
 }
 
-func (c *Controller) getDesiredReplicas(unsObj *unstructured.Unstructured) (int64, error) {
-	desiredReplicas, err := utilunstructured.GetInt64FromPath(unsObj, c.typeConfig.Spec.PathDefinition.ReplicasSpec, nil)
+func (c *Controller) getDesiredReplicas(
+	typeConfig *fedcorev1a1.FederatedTypeConfig,
+	unsObj *unstructured.Unstructured,
+) (int64, error) {
+	desiredReplicas, err := utilunstructured.GetInt64FromPath(unsObj, typeConfig.Spec.PathDefinition.ReplicasSpec, nil)
 	if err != nil {
 		return 0, fmt.Errorf("desired replicas: %w", err)
 	} else if desiredReplicas == nil {
-		return 0, fmt.Errorf("no desired replicas at %s", c.typeConfig.Spec.PathDefinition.ReplicasSpec)
+		return 0, fmt.Errorf("no desired replicas at %s", typeConfig.Spec.PathDefinition.ReplicasSpec)
 	}
 
 	return *desiredReplicas, nil
@@ -416,16 +486,17 @@ func (c *Controller) getDesiredReplicas(unsObj *unstructured.Unstructured) (int6
 }
 
 func (c *Controller) getPodsFromCluster(
 	ctx context.Context,
+	typeConfig *fedcorev1a1.FederatedTypeConfig,
 	unsClusterObj *unstructured.Unstructured,
 	clusterName string,
 ) ([]*corev1.Pod, bool, error) {
-	plugin, err := plugins.ResolvePlugin(c.typeConfig)
+	plugin, err := plugins.ResolvePlugin(typeConfig.GetSourceTypeGVK())
 	if err != nil {
 		return nil, false, fmt.Errorf("failed to get plugin for FTC: %w", err)
 	}
 
-	client, err := c.federatedInformer.GetClientForCluster(clusterName)
-	if err != nil {
-		return nil, true, fmt.Errorf("failed to get client for cluster: %w", err)
+	client, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
+	if !exist {
+		return nil, true, fmt.Errorf("failed to get dynamic client for cluster %s", clusterName)
 	}
@@ -438,3 +509,108 @@ func (c *Controller) getPodsFromCluster(
 
 	return pods, false, nil
 }
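What the PathDefinition lookups above amount to, sketched with upstream helpers (kubeadmiral's GetInt64FromPath is assumed to behave like a nil-safe NestedInt64 over a dotted path such as "spec.replicas"):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	deployment := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec":   map[string]interface{}{"replicas": int64(3)},
		"status": map[string]interface{}{"replicas": int64(3), "readyReplicas": int64(2)},
	}}

	// PathDefinition.ReplicasSpec / ReadyReplicasStatus name these locations.
	desired, found, err := unstructured.NestedInt64(deployment.Object, "spec", "replicas")
	ready, _, _ := unstructured.NestedInt64(deployment.Object, "status", "readyReplicas")
	fmt.Println(desired, found, err, ready) // 3 true <nil> 2
}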
+
+func (c *Controller) getPossibleSourceObjectsFromCluster(
+	ctx context.Context,
+	pod *corev1.Pod,
+	clusterName string,
+) (possibleQualifies []common.QualifiedName, err error) {
+	client, exist := c.federatedInformer.GetClusterDynamicClient(clusterName)
+	if !exist {
+		return nil, fmt.Errorf("failed to get client for cluster %s", clusterName)
+	}
+
+	for gvk, plugin := range plugins.NativePlugins {
+		ctx, logger := logging.InjectLoggerValues(ctx, "gvk", gvk)
+		ftc := c.getFTCIfAutoMigrationIsEnabled(gvk)
+		if ftc == nil {
+			continue
+		}
+		object, found, err := plugin.GetTargetObjectFromPod(ctx, pod.DeepCopy(), plugins.ClusterHandle{
+			Client: client,
+		})
+		if err != nil || !found {
+			logger.V(3).Info(
+				"Failed to get target object from pod",
+				"found", found,
+				"err", err,
+			)
+			continue
+		}
+		managed := object.GetLabels()[managedlabel.ManagedByKubeAdmiralLabelKey] == managedlabel.ManagedByKubeAdmiralLabelValue
+		gkMatched := object.GroupVersionKind().GroupKind() == gvk.GroupKind()
+		if !managed || !gkMatched {
+			logger.V(3).Info(
+				"Target object is not managed by KubeAdmiral or its GroupKind does not match",
+				"got-gvk", object.GroupVersionKind(),
+				"managed", managed,
+				"resource", common.NewQualifiedName(object),
+			)
+			continue
+		}
+		possibleQualifies = append(possibleQualifies, common.QualifiedName{
+			Namespace: object.GetNamespace(),
+			Name:      naming.GenerateFederatedObjectName(object.GetName(), ftc.Name),
+		})
+	}
+	return possibleQualifies, nil
+}
+
+func (c *Controller) getTargetObjectsIfAutoMigrationEnabled(
+	fedObject fedcorev1a1.GenericFederatedObject,
+) (clusterObjs []FederatedObject, ftc *fedcorev1a1.FederatedTypeConfig, unschedulableThreshold *time.Duration, err error) {
+	// PodUnschedulableThresholdAnnotation is set by the scheduler. Its presence determines whether auto migration is enabled.
+	if value, exists := fedObject.GetAnnotations()[common.PodUnschedulableThresholdAnnotation]; exists {
+		if duration, err := time.ParseDuration(value); err != nil {
+			err = fmt.Errorf("failed to parse PodUnschedulableThresholdAnnotation: %w", err)
+			return nil, nil, nil, err
+		} else {
+			unschedulableThreshold = &duration
+		}
+	}
+
+	objectMeta := &metav1.PartialObjectMetadata{}
+	if err = json.Unmarshal(fedObject.GetSpec().Template.Raw, objectMeta); err != nil {
+		err = fmt.Errorf("failed to unmarshal template of federated object: %w", err)
+		return nil, nil, nil, err
+	}
+	gvk := objectMeta.GroupVersionKind()
+
+	ftc = c.getFTCIfAutoMigrationIsEnabled(gvk)
+	if ftc == nil {
+		return nil, nil, nil, nil
+	}
+
+	for _, placement := range fedObject.GetSpec().Placements {
+		for _, cluster := range placement.Placement {
+			lister, synced, exist := c.federatedInformer.GetResourceLister(gvk, cluster.Cluster)
+			if !exist || !synced() {
+				err = fmt.Errorf("informer for resource %v does not exist or is not synced for cluster %s", gvk, cluster.Cluster)
+				return nil, nil, nil, err
+			}
+			object, err := lister.ByNamespace(objectMeta.Namespace).Get(objectMeta.Name)
+			if err != nil && !apierrors.IsNotFound(err) {
+				err = fmt.Errorf("failed to get %v from informer stores for cluster %s: %w", objectMeta, cluster.Cluster, err)
+				return nil, nil, nil, err
+			}
+			if apierrors.IsNotFound(err) {
+				continue
+			}
+			unsObj, ok := object.(*unstructured.Unstructured)
+			if !ok {
+				continue
+			}
+			clusterObjs = append(clusterObjs, FederatedObject{Object: unsObj, ClusterName: cluster.Cluster})
+		}
+	}
+	return clusterObjs, ftc, unschedulableThreshold, nil
+}
+
+func (c *Controller) getFTCIfAutoMigrationIsEnabled(gvk schema.GroupVersionKind) *fedcorev1a1.FederatedTypeConfig {
+	typeConfig, exists := c.ftcManager.GetResourceFTC(gvk)
+	if !exists || typeConfig == nil || !typeConfig.GetAutoMigrationEnabled() {
+		return nil
+	}
+
+	return typeConfig
+}
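A standalone sketch of the template-sniffing trick used in getTargetObjectsIfAutoMigrationEnabled: the federated object's spec.template is raw JSON, and unmarshalling it into just a metav1.PartialObjectMetadata recovers the GVK plus name/namespace without decoding the whole workload:

package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	raw := []byte(`{
		"apiVersion": "apps/v1",
		"kind": "Deployment",
		"metadata": {"name": "echo", "namespace": "default"},
		"spec": {"replicas": 3}
	}`)

	objectMeta := &metav1.PartialObjectMetadata{}
	if err := json.Unmarshal(raw, objectMeta); err != nil {
		panic(err)
	}
	fmt.Println(objectMeta.GroupVersionKind(), objectMeta.Namespace, objectMeta.Name)
	// apps/v1, Kind=Deployment default echo
}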
diff --git a/pkg/controllers/automigration/plugins/deployments.go b/pkg/controllers/automigration/plugins/deployments.go
index 5e3a6b8b..21768842 100644
--- a/pkg/controllers/automigration/plugins/deployments.go
+++ b/pkg/controllers/automigration/plugins/deployments.go
@@ -1,4 +1,3 @@
-//go:build exclude
 /*
 Copyright 2023 The KubeAdmiral Authors.
 
@@ -25,8 +24,8 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 
+	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
 	deploymentutil "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/controller/deployment/util"
 )
 
@@ -44,11 +43,16 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 	rsList, err := deploymentutil.ListReplicaSets(deployment, func(ns string, opts metav1.ListOptions) ([]*appsv1.ReplicaSet, error) {
 		rsList := &appsv1.ReplicaSetList{}
-		listOpts, err := convertListOptions(ns, &opts)
+		opts = *opts.DeepCopy()
+		opts.ResourceVersion = "0" // list from watch cache
+		unsRsList, err := handle.Client.
+			Resource(common.ReplicaSetGVR).
+			Namespace(ns).
+			List(ctx, opts)
 		if err != nil {
 			return nil, err
 		}
-		if err := handle.Client.ListWithOptions(ctx, rsList, listOpts); err != nil {
+		if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsRsList.Object, rsList); err != nil {
 			return nil, err
 		}
 		ret := []*appsv1.ReplicaSet{}
@@ -71,11 +75,19 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 		[]*appsv1.ReplicaSet{newRS},
 		func(ns string, opts metav1.ListOptions) (*corev1.PodList, error) {
 			podList := &corev1.PodList{}
-			listOpts, err := convertListOptions(ns, &opts)
-			if err != nil {
-				return nil, err
-			}
-			if err := handle.Client.ListWithOptions(ctx, podList, listOpts); err != nil {
+			opts = *opts.DeepCopy()
+			opts.ResourceVersion = "0" // list from watch cache
+			unsPodList, err := handle.Client.
+				Resource(common.PodGVR).
+				Namespace(ns).
+				List(ctx, opts)
+			if err != nil {
+				return nil, err
+			}
+			if err = runtime.DefaultUnstructuredConverter.FromUnstructured(unsPodList.Object, podList); err != nil {
 				return nil, err
 			}
 			return podList, nil
@@ -93,15 +105,27 @@ func (*deploymentPlugin) GetPodsForClusterObject(
 	return ret, nil
 }
 
-var _ Plugin = &deploymentPlugin{}
-
-func convertListOptions(ns string, opts *metav1.ListOptions) (*client.ListOptions, error) {
-	opts = opts.DeepCopy()
-	// list from watch cache
-	opts.ResourceVersion = "0"
+func (*deploymentPlugin) GetTargetObjectFromPod(
+	ctx context.Context,
+	pod *corev1.Pod,
+	handle ClusterHandle,
+) (obj *unstructured.Unstructured, found bool, err error) {
+	rs, found, err := GetSpecifiedOwnerFromObj(ctx, handle.Client, pod, metav1.APIResource{
+		Name:    "replicasets",
+		Group:   appsv1.GroupName,
+		Version: "v1",
+		Kind:    common.ReplicaSetKind,
+	})
+	if err != nil || !found {
+		return nil, false, err
+	}
 
-	return &client.ListOptions{
-		Namespace: ns,
-		Raw:       opts,
-	}, nil
+	return GetSpecifiedOwnerFromObj(ctx, handle.Client, rs, metav1.APIResource{
+		Name:    "deployments",
+		Group:   appsv1.GroupName,
+		Version: "v1",
+		Kind:    common.DeploymentKind,
+	})
 }
+
+var _ Plugin = &deploymentPlugin{}
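The ResourceVersion: "0" lines above trade strict freshness for cheap reads: the list is served from the apiserver's watch cache instead of a quorum read from etcd. A compilable sketch of the same pattern in isolation:

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/dynamic"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

// listReplicaSetsFromWatchCache mirrors the listing pattern above:
// resourceVersion=0 asks the apiserver to serve from its watch cache.
func listReplicaSetsFromWatchCache(ctx context.Context, client dynamic.Interface, ns string) (*unstructured.UnstructuredList, error) {
	opts := metav1.ListOptions{ResourceVersion: "0"}
	return client.Resource(common.ReplicaSetGVR).Namespace(ns).List(ctx, opts)
}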
diff --git a/pkg/controllers/automigration/plugins/plugins.go b/pkg/controllers/automigration/plugins/plugins.go
index 4699709f..8143e4ec 100644
--- a/pkg/controllers/automigration/plugins/plugins.go
+++ b/pkg/controllers/automigration/plugins/plugins.go
@@ -1,4 +1,3 @@
-//go:build exclude
 /*
 Copyright 2023 The KubeAdmiral Authors.
 
@@ -21,18 +20,17 @@ import (
 	"context"
 	"fmt"
 
+	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
 
-	fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1"
-	"github.com/kubewharf/kubeadmiral/pkg/client/generic"
 	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
-	schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema"
 )
 
 type ClusterHandle struct {
-	Client generic.Client
+	Client dynamic.Interface
 }
 
 type Plugin interface {
@@ -41,19 +39,22 @@ type Plugin interface {
 		obj *unstructured.Unstructured,
 		handle ClusterHandle,
 	) ([]*corev1.Pod, error)
-}
 
-var nativePlugins = map[schema.GroupVersionResource]Plugin{
-	common.DeploymentGVR: &deploymentPlugin{},
+	GetTargetObjectFromPod(
+		ctx context.Context,
+		pod *corev1.Pod,
+		handle ClusterHandle,
+	) (obj *unstructured.Unstructured, found bool, err error)
 }
 
-func ResolvePlugin(typeConfig *fedcorev1a1.FederatedTypeConfig) (Plugin, error) {
-	targetAPIResource := typeConfig.GetTargetType()
-	targetGVR := schemautil.APIResourceToGVR(&targetAPIResource)
+var NativePlugins = map[schema.GroupVersionKind]Plugin{
+	appsv1.SchemeGroupVersion.WithKind(common.DeploymentKind): &deploymentPlugin{},
+}
 
-	if plugin, exists := nativePlugins[targetGVR]; exists {
+func ResolvePlugin(gvk schema.GroupVersionKind) (Plugin, error) {
+	if plugin, exists := NativePlugins[gvk]; exists {
 		return plugin, nil
 	}
 
-	return nil, fmt.Errorf("unsupported type %s", targetGVR.String())
+	return nil, fmt.Errorf("unsupported type %s", gvk.String())
 }
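Plugins are now keyed by source GVK rather than target GVR, so callers resolve straight from the FTC's source type. A sketch of the new lookup:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/automigration/plugins"
	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

func main() {
	gvk := appsv1.SchemeGroupVersion.WithKind(common.DeploymentKind)
	plugin, err := plugins.ResolvePlugin(gvk)
	fmt.Println(plugin != nil, err) // true <nil>

	// Types without a registered plugin are rejected.
	_, err = plugins.ResolvePlugin(appsv1.SchemeGroupVersion.WithKind("StatefulSet"))
	fmt.Println(err) // unsupported type apps/v1, Kind=StatefulSet
}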
diff --git a/pkg/controllers/automigration/plugins/util.go b/pkg/controllers/automigration/plugins/util.go
new file mode 100644
index 00000000..081ff9a9
--- /dev/null
+++ b/pkg/controllers/automigration/plugins/util.go
@@ -0,0 +1,66 @@
+/*
+Copyright 2023 The KubeAdmiral Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package plugins
+
+import (
+	"context"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/dynamic"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+// GetSpecifiedOwnerFromObj returns the owner of the given object that matches the given APIResource.
+func GetSpecifiedOwnerFromObj(
+	ctx context.Context,
+	client dynamic.Interface,
+	obj client.Object,
+	ownerAPIResource metav1.APIResource,
+) (ownerObj *unstructured.Unstructured, found bool, err error) {
+	gv := schema.GroupVersion{
+		Group:   ownerAPIResource.Group,
+		Version: ownerAPIResource.Version,
+	}
+	var owner *metav1.OwnerReference
+	ownerReferences := obj.GetOwnerReferences()
+	for i, o := range ownerReferences {
+		if o.APIVersion == gv.String() && o.Kind == ownerAPIResource.Kind {
+			owner = &ownerReferences[i]
+			break
+		}
+	}
+	if owner == nil || client == nil {
+		return nil, false, nil
+	}
+
+	ownerObj, err = client.Resource(schema.GroupVersionResource{
+		Group:    ownerAPIResource.Group,
+		Version:  ownerAPIResource.Version,
+		Resource: ownerAPIResource.Name,
+	}).Namespace(obj.GetNamespace()).Get(ctx, owner.Name, metav1.GetOptions{})
+	if err != nil {
+		if apierrors.IsNotFound(err) {
+			return nil, false, nil
+		}
+		return nil, false, err
+	}
+
+	return ownerObj, true, nil
+}
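A runnable sketch of the owner walk using client-go's fake dynamic client (names are illustrative; the lookup succeeds only when the ownerReference's apiVersion and kind both match the requested APIResource):

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	dynamicfake "k8s.io/client-go/dynamic/fake"

	"github.com/kubewharf/kubeadmiral/pkg/controllers/automigration/plugins"
	"github.com/kubewharf/kubeadmiral/pkg/controllers/common"
)

func main() {
	scheme := runtime.NewScheme()
	_ = appsv1.AddToScheme(scheme)

	// A ReplicaSet that the fake member cluster "contains".
	rs := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "apps/v1",
		"kind":       "ReplicaSet",
		"metadata":   map[string]interface{}{"name": "echo-5d8c9", "namespace": "default"},
	}}
	client := dynamicfake.NewSimpleDynamicClient(scheme, rs)

	// A pod owned by that ReplicaSet.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{
		Name:      "echo-5d8c9-abcde",
		Namespace: "default",
		OwnerReferences: []metav1.OwnerReference{
			{APIVersion: "apps/v1", Kind: common.ReplicaSetKind, Name: "echo-5d8c9"},
		},
	}}

	owner, found, err := plugins.GetSpecifiedOwnerFromObj(context.Background(), client, pod, metav1.APIResource{
		Name:    common.ReplicaSetResource,
		Group:   appsv1.GroupName,
		Version: "v1",
		Kind:    common.ReplicaSetKind,
	})
	fmt.Println(owner.GetName(), found, err) // echo-5d8c9 true <nil>
}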
diff --git a/pkg/controllers/automigration/util.go b/pkg/controllers/automigration/util.go
index 55a20a94..5eb5970f 100644
--- a/pkg/controllers/automigration/util.go
+++ b/pkg/controllers/automigration/util.go
@@ -1,4 +1,3 @@
-//go:build exclude
 /*
 Copyright 2023 The KubeAdmiral Authors.
 
@@ -21,6 +20,7 @@ import (
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 )
 
 // Returns the number of unschedulable pods that remain
@@ -37,17 +37,8 @@ func countUnschedulablePods(
 			continue
 		}
 
-		var scheduledCondition *corev1.PodCondition
-		for i := range pod.Status.Conditions {
-			condition := &pod.Status.Conditions[i]
-			if condition.Type == corev1.PodScheduled {
-				scheduledCondition = condition
-				break
-			}
-		}
-		if scheduledCondition == nil ||
-			scheduledCondition.Status != corev1.ConditionFalse ||
-			scheduledCondition.Reason != corev1.PodReasonUnschedulable {
+		scheduledCondition, isUnschedulable := getPodScheduledCondition(pod)
+		if !isUnschedulable {
 			continue
 		}
 
@@ -63,3 +54,40 @@ func countUnschedulablePods(
 
 	return unschedulableCount, nextCrossIn
 }
+
+func getPodScheduledCondition(pod *corev1.Pod) (scheduledCondition *corev1.PodCondition, isUnschedulable bool) {
+	for i := range pod.Status.Conditions {
+		condition := &pod.Status.Conditions[i]
+		if condition.Type == corev1.PodScheduled {
+			scheduledCondition = condition
+			break
+		}
+	}
+	if scheduledCondition == nil ||
+		scheduledCondition.Status != corev1.ConditionFalse ||
+		scheduledCondition.Reason != corev1.PodReasonUnschedulable {
+		return scheduledCondition, false
+	}
+	return scheduledCondition, true
+}
+
+func podScheduledConditionChanged(oldPod, newPod *corev1.Pod) bool {
+	condition, _ := getPodScheduledCondition(newPod)
+	oldCondition, _ := getPodScheduledCondition(oldPod)
+	if condition == nil || oldCondition == nil {
+		return condition != oldCondition
+	}
+
+	isEqual := condition.Status == oldCondition.Status &&
+		condition.Reason == oldCondition.Reason &&
+		condition.Message == oldCondition.Message &&
+		condition.LastProbeTime.Equal(&oldCondition.LastProbeTime) &&
+		condition.LastTransitionTime.Equal(&oldCondition.LastTransitionTime)
+	return !isEqual
+}
+
+// FederatedObject is a target object paired with the name of the member cluster it was observed in.
+type FederatedObject struct {
+	Object      *unstructured.Unstructured
+	ClusterName string
+}
diff --git a/pkg/controllers/automigration/util_test.go b/pkg/controllers/automigration/util_test.go
index 4adf892b..4ee1be6a 100644
--- a/pkg/controllers/automigration/util_test.go
+++ b/pkg/controllers/automigration/util_test.go
@@ -120,3 +120,71 @@ func newPod(terminating bool, schedulable bool, lastTransitionTimestamp time.Tim
 	}
 	return pod
 }
+
+func Test_podScheduledConditionChanged(t *testing.T) {
+	now := time.Now()
+	podWithEmptyCond := newPod(false, false, now)
+	podWithEmptyCond.Status.Conditions = nil
+
+	tests := []struct {
+		name   string
+		oldPod *corev1.Pod
+		newPod *corev1.Pod
+		want   bool
+	}{
+		{
+			name:   "both nil",
+			oldPod: podWithEmptyCond,
+			newPod: podWithEmptyCond,
+			want:   false,
+		},
+		{
+			name:   "oldPod condition is nil",
+			oldPod: podWithEmptyCond,
+			newPod: newPod(false, false, now),
+			want:   true,
+		},
+		{
+			name:   "newPod condition is nil",
+			oldPod: newPod(false, false, now),
+			newPod: podWithEmptyCond,
+			want:   true,
+		},
+		{
+			name:   "unschedulable condition equal",
+			oldPod: newPod(false, false, now),
+			newPod: newPod(false, false, now),
+			want:   false,
+		},
+		{
+			name:   "unschedulable condition not equal",
+			oldPod: newPod(false, false, now.Add(10*time.Second)),
+			newPod: newPod(false, false, now),
+			want:   true,
+		},
+		{
+			name:   "schedulable condition equal",
+			oldPod: newPod(false, true, now),
+			newPod: newPod(false, true, now),
+			want:   false,
+		},
+		{
+			name:   "schedulable condition not equal",
+			oldPod: newPod(false, true, now.Add(10*time.Second)),
+			newPod: newPod(false, true, now),
+			want:   true,
+		},
+		{
+			name:   "schedulable and unschedulable conditions",
+			oldPod: newPod(false, true, now),
+			newPod: newPod(false, false, now),
+			want:   true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t, tt.want, podScheduledConditionChanged(tt.oldPod, tt.newPod),
+				"podScheduledConditionChanged(%v, %v)", tt.oldPod, tt.newPod)
+		})
+	}
+}
diff --git a/pkg/controllers/common/constants.go b/pkg/controllers/common/constants.go
index 49404b97..08881f08 100644
--- a/pkg/controllers/common/constants.go
+++ b/pkg/controllers/common/constants.go
@@ -140,6 +140,8 @@ const (
 	DaemonSetResource  = "daemonsets"
 	ConfigMapResource  = "configmaps"
 	SecretResource     = "secrets"
+	PodResource        = "pods"
+	ReplicaSetResource = "replicasets"
 
 	NamespaceKind  = "Namespace"
 	DeploymentKind = "Deployment"
@@ -155,6 +157,7 @@ const (
 	PersistentVolumeKind      = "PersistentVolume"
 	PersistentVolumeClaimKind = "PersistentVolumeClaim"
 	PodKind                   = "Pod"
+	ReplicaSetKind            = "ReplicaSet"
 )
 
 var (
@@ -171,9 +174,11 @@ var (
 	NamespaceGVR = corev1.SchemeGroupVersion.WithResource(NamespaceResource)
 	ConfigMapGVR = corev1.SchemeGroupVersion.WithResource(ConfigMapResource)
 	SecretGVR    = corev1.SchemeGroupVersion.WithResource(SecretResource)
+	PodGVR       = corev1.SchemeGroupVersion.WithResource(PodResource)
 
 	DeploymentGVR = appsv1.SchemeGroupVersion.WithResource(DeploymentResource)
 	DaemonSetGVR  = appsv1.SchemeGroupVersion.WithResource(DaemonSetResource)
+	ReplicaSetGVR = appsv1.SchemeGroupVersion.WithResource(ReplicaSetResource)
 )
 
 // MaxFederatedObjectNameLength defines the max length of a federated object name.
diff --git a/pkg/util/informermanager/federatedinformermanager.go b/pkg/util/informermanager/federatedinformermanager.go
index 9a5fd79c..131ccc43 100644
--- a/pkg/util/informermanager/federatedinformermanager.go
+++ b/pkg/util/informermanager/federatedinformermanager.go
@@ -77,6 +77,9 @@ type federatedInformerManager struct {
 	queue              workqueue.RateLimitingInterface
 	podListerSemaphore *semaphore.Weighted
 	initialClusters    sets.Set[string]
+
+	podEventHandlers      []*ResourceEventHandlerWithClusterFuncs
+	podEventRegistrations map[string]map[*ResourceEventHandlerWithClusterFuncs]cache.ResourceEventHandlerRegistration
 }
 
 func NewFederatedInformerManager(
@@ -102,6 +105,8 @@ func NewFederatedInformerManager(
 		queue:                 workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
 		podListerSemaphore:    semaphore.NewWeighted(3), // TODO: make this configurable
 		initialClusters:       sets.New[string](),
+		podEventHandlers:      []*ResourceEventHandlerWithClusterFuncs{},
+		podEventRegistrations: map[string]map[*ResourceEventHandlerWithClusterFuncs]cache.ResourceEventHandlerRegistration{},
 	}
 
 	clusterInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
@@ -264,6 +269,7 @@ func (m *federatedInformerManager) processCluster(
 		m.clusterCancelFuncs[clusterName] = cancel
 		m.informerManagers[clusterName] = manager
 		m.informerFactories[clusterName] = factory
+		m.podEventRegistrations[clusterName] = map[*ResourceEventHandlerWithClusterFuncs]cache.ResourceEventHandlerRegistration{}
 	}
 
 	if m.initialClusters.Has(cluster.Name) {
@@ -276,6 +282,17 @@ func (m *federatedInformerManager) processCluster(
 		}
 	}
 
+	registrations := m.podEventRegistrations[clusterName]
+	factory := m.informerFactories[clusterName]
+	for _, handler := range m.podEventHandlers {
+		if registrations[handler] == nil {
+			copied := handler.copyWithClusterName(clusterName)
+			if r, err := factory.Core().V1().Pods().Informer().AddEventHandler(copied); err == nil {
+				registrations[handler] = r
+			}
+		}
+	}
+
 	return false, 0, nil
 }
 
@@ -297,6 +314,7 @@ func (m *federatedInformerManager) processClusterDeletionUnlocked(ctx context.Co
 	delete(m.informerManagers, clusterName)
 	delete(m.informerFactories, clusterName)
 	delete(m.clusterCancelFuncs, clusterName)
+	delete(m.podEventRegistrations, clusterName)
 
 	m.initialClusters.Delete(clusterName)
 
@@ -395,6 +413,13 @@ func (m *federatedInformerManager) GetNodeLister(
 	return factory.Core().V1().Nodes().Lister(), factory.Core().V1().Nodes().Informer().HasSynced, true
 }
 
+func (m *federatedInformerManager) AddPodEventHandler(handler *ResourceEventHandlerWithClusterFuncs) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+
+	m.podEventHandlers = append(m.podEventHandlers, handler)
+}
+
 func (m *federatedInformerManager) GetPodLister(
 	cluster string,
 ) (lister v1.PodLister, informerSynced cache.InformerSynced, exists bool) {
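Intended call pattern for the new hook (a sketch, not part of this diff): a handler added once is fanned out via copyWithClusterName to every member cluster's pod informer, including clusters that join after registration:

package example

import (
	corev1 "k8s.io/api/core/v1"
	"k8s.io/klog/v2"

	"github.com/kubewharf/kubeadmiral/pkg/util/informermanager"
)

// subscribePods registers a cluster-aware pod handler; callbacks receive the
// member-cluster name alongside the object.
func subscribePods(manager informermanager.FederatedInformerManager) {
	manager.AddPodEventHandler(&informermanager.ResourceEventHandlerWithClusterFuncs{
		UpdateFunc: func(oldObj, newObj interface{}, cluster string) {
			pod := newObj.(*corev1.Pod)
			klog.InfoS("Pod updated", "cluster", cluster, "pod", klog.KObj(pod))
		},
	})
}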
diff --git a/pkg/util/informermanager/interface.go b/pkg/util/informermanager/interface.go
index 185e8b70..55342d10 100644
--- a/pkg/util/informermanager/interface.go
+++ b/pkg/util/informermanager/interface.go
@@ -42,6 +42,52 @@ type EventHandlerGenerator struct {
 	Generator func(ftc *fedcorev1a1.FederatedTypeConfig) cache.ResourceEventHandler
 }
 
+// ResourceEventHandlerWithClusterFuncs is an adaptor to let you easily specify as many or
+// as few of the notification functions as you want while still implementing
+// ResourceEventHandler. This adapter does not remove the prohibition against
+// modifying the objects.
+type ResourceEventHandlerWithClusterFuncs struct {
+	clusterName string
+
+	AddFunc    func(obj interface{}, cluster string)
+	UpdateFunc func(oldObj, newObj interface{}, cluster string)
+	DeleteFunc func(obj interface{}, cluster string)
+}
+
+// OnAdd calls AddFunc if it's not nil.
+func (p *ResourceEventHandlerWithClusterFuncs) OnAdd(obj interface{}) {
+	if p.AddFunc != nil {
+		p.AddFunc(obj, p.clusterName)
+	}
+}
+
+// OnUpdate calls UpdateFunc if it's not nil.
+func (p *ResourceEventHandlerWithClusterFuncs) OnUpdate(oldObj, newObj interface{}) {
+	if p.UpdateFunc != nil {
+		p.UpdateFunc(oldObj, newObj, p.clusterName)
+	}
+}
+
+// OnDelete calls DeleteFunc if it's not nil.
+func (p *ResourceEventHandlerWithClusterFuncs) OnDelete(obj interface{}) {
+	if p.DeleteFunc != nil {
+		p.DeleteFunc(obj, p.clusterName)
+	}
+}
+
+// copyWithClusterName returns a copy of ResourceEventHandlerWithClusterFuncs with the given cluster name.
+func (p *ResourceEventHandlerWithClusterFuncs) copyWithClusterName(
+	clusterName string,
+) *ResourceEventHandlerWithClusterFuncs {
+	return &ResourceEventHandlerWithClusterFuncs{
+		clusterName: clusterName,
+
+		AddFunc:    p.AddFunc,
+		UpdateFunc: p.UpdateFunc,
+		DeleteFunc: p.DeleteFunc,
+	}
+}
+
 // FTCUpdateHandler is called by InformerManager each time it finishes processing an FTC. This allows controllers to
 // hook into the InformerManager's view of an FTC's lifecycle. When a new FTC is observed, lastObserved will be nil.
 // When a FTC deletion is observed, latest will be nil.
@@ -121,6 +167,8 @@ type FederatedInformerManager interface {
 	// Returns a kubernetes client for the given cluster if it exists. The client for each cluster will eventually exist.
 	GetClusterKubeClient(cluster string) (client kubernetes.Interface, exists bool)
 
+	// AddPodEventHandler registers the given handler with the pod informer of every member cluster.
+	AddPodEventHandler(handler *ResourceEventHandlerWithClusterFuncs)
 	GetPodLister(cluster string) (lister corev1listers.PodLister, informerSynced cache.InformerSynced, exists bool)
 	GetNodeLister(cluster string) (lister corev1listers.NodeLister, informerSynced cache.InformerSynced, exists bool)
diff --git a/pkg/util/informermanager/podinformer.go b/pkg/util/informermanager/podinformer.go
index c49da4b5..d797a7dc 100644
--- a/pkg/util/informermanager/podinformer.go
+++ b/pkg/util/informermanager/podinformer.go
@@ -125,11 +125,12 @@ func prunePod(pod *corev1.Pod) {
 	}
 	*pod = corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:            pod.Name,
-			Namespace:       pod.Namespace,
-			Generation:      pod.Generation,
-			ResourceVersion: pod.ResourceVersion,
-			UID:             pod.UID,
+			Name:              pod.Name,
+			Namespace:         pod.Namespace,
+			Generation:        pod.Generation,
+			ResourceVersion:   pod.ResourceVersion,
+			UID:               pod.UID,
+			DeletionTimestamp: pod.DeletionTimestamp,
 		},
 		Spec: corev1.PodSpec{
 			NodeName:       pod.Spec.NodeName,
@@ -137,5 +138,9 @@ func prunePod(pod *corev1.Pod) {
 			Containers:     containers,
 			InitContainers: initContainers,
 		},
+		Status: corev1.PodStatus{
+			Phase:      pod.Status.Phase,
+			Conditions: pod.Status.Conditions,
+		},
 	}
 }
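The prunePod change matters because the auto-migration pod handlers above inspect DeletionTimestamp and the PodScheduled condition on pruned pods. A test one might add alongside prunePod (in package informermanager) to pin down that contract:

package informermanager

import (
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPrunePodKeepsSchedulingSignal(t *testing.T) {
	now := metav1.NewTime(time.Now())
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:              "echo",
			DeletionTimestamp: &now,
			Labels:            map[string]string{"app": "echo"}, // expected to be pruned away
		},
		Status: corev1.PodStatus{
			Phase: corev1.PodPending,
			Conditions: []corev1.PodCondition{{
				Type:   corev1.PodScheduled,
				Status: corev1.ConditionFalse,
				Reason: corev1.PodReasonUnschedulable,
			}},
		},
	}

	prunePod(pod)

	if pod.DeletionTimestamp == nil {
		t.Error("DeletionTimestamp should survive pruning")
	}
	if len(pod.Status.Conditions) != 1 || pod.Status.Conditions[0].Type != corev1.PodScheduled {
		t.Error("PodScheduled condition should survive pruning")
	}
	if len(pod.Labels) != 0 {
		t.Error("labels should be pruned")
	}
}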