From 7286f960612f4d2d37753158ad1b7da5431dc30a Mon Sep 17 00:00:00 2001 From: "lihanbo.0316" Date: Fri, 14 Jul 2023 16:05:39 +0800 Subject: [PATCH] refactor: override controller --- .../app/controllermanager.go | 2 + cmd/controller-manager/app/core.go | 20 + .../override/overridepolicy_controller.go | 476 ++++++++++-------- pkg/controllers/override/util.go | 42 +- pkg/controllers/util/clusterselector/util.go | 1 - pkg/controllers/util/genericinformer.go | 1 - pkg/controllers/util/meta.go | 1 - pkg/controllers/util/overrides.go | 70 +-- 8 files changed, 316 insertions(+), 297 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 87811a6a5..0020d284c 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -39,10 +39,12 @@ const ( FederateControllerName = "federate" MonitorControllerName = "monitor" FollowerControllerName = "follower" + OverrideControllerName = "overridepolicy" ) var knownControllers = map[string]controllermanager.StartControllerFunc{ FederateControllerName: startFederateController, + OverrideControllerName: startOverridePolicyController, } var controllersDisabledByDefault = sets.New(MonitorControllerName) diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index 31feff65a..7c9d88798 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -25,6 +25,7 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/controllermanager" controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" "github.com/kubewharf/kubeadmiral/pkg/controllers/federate" + "github.com/kubewharf/kubeadmiral/pkg/controllers/override" ) func startFederateController( @@ -51,3 +52,22 @@ func startFederateController( return federateController, nil } + +func startOverridePolicyController(ctx context.Context, controllerCtx *controllercontext.Context) (controllermanager.Controller, error) { + overrideController, err := override.NewOverridePolicyController( + controllerCtx.KubeClientset, + controllerCtx.FedClientset, + controllerCtx.FedInformerFactory, + controllerCtx.InformerManager, + controllerCtx.Metrics, + klog.Background(), + controllerCtx.WorkerCount, + ) + if err != nil { + return nil, fmt.Errorf("error creating override controller: %w", err) + } + + go overrideController.Run(ctx) + + return overrideController, nil +} diff --git a/pkg/controllers/override/overridepolicy_controller.go b/pkg/controllers/override/overridepolicy_controller.go index f1ad8a5db..91108f9cc 100644 --- a/pkg/controllers/override/overridepolicy_controller.go +++ b/pkg/controllers/override/overridepolicy_controller.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -20,29 +19,33 @@ package override import ( "context" "fmt" - "strings" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - kubeclient "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" + "k8s.io/apimachinery/pkg/labels" + pkgruntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned" + fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions" + fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" "github.com/kubewharf/kubeadmiral/pkg/stats" + "github.com/kubewharf/kubeadmiral/pkg/util/eventsink" + "github.com/kubewharf/kubeadmiral/pkg/util/fedobjectadapters" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/pendingcontrollers" + "github.com/kubewharf/kubeadmiral/pkg/util/worker" ) const ( @@ -54,224 +57,253 @@ const ( var PrefixedControllerName = common.DefaultPrefix + ControllerName -// OverrideController adds override rules specified in OverridePolicies +// Controller adds override rules specified in OverridePolicies // to federated objects. type Controller struct { - // name of controller - name string - - // FederatedTypeConfig for this controller - typeConfig *fedcorev1a1.FederatedTypeConfig - - // Store for federated objects - federatedStore cache.Store - // Controller for federated objects - federatedController cache.Controller - // Client for federated objects - federatedClient util.ResourceClient - - // Store for OverridePolicy - overridePolicyStore cache.Store - // Controller for OverridePolicy - overridePolicyController cache.Controller - - // Store for ClusterOverridePolicy - clusterOverridePolicyStore cache.Store - // Controller for ClusterOverridePolicy - clusterOverridePolicyController cache.Controller - - // Store for FederatedCluster - clusterStore cache.Store - // Controller for FederatedCluster - clusterController cache.Controller - - worker worker.ReconcileWorker + worker worker.ReconcileWorker[common.QualifiedName] + + informerManager informermanager.InformerManager + fedObjectInformer fedcorev1a1informers.FederatedObjectInformer + clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer + overridePolicyInformer fedcorev1a1informers.OverridePolicyInformer + clusterOverridePolicyInformer fedcorev1a1informers.ClusterOverridePolicyInformer + federatedClusterInformer fedcorev1a1informers.FederatedClusterInformer + + fedClient fedclient.Interface eventRecorder record.EventRecorder metrics stats.Metrics + logger klog.Logger } -func StartController( - controllerConfig *util.ControllerConfig, - stopChan <-chan struct{}, - typeConfig *fedcorev1a1.FederatedTypeConfig, -) error { - controller, err := newController(controllerConfig, typeConfig) - if err != nil { - return err - } - klog.V(4).Infof("Starting %s", controller.name) - controller.Run(stopChan) - return nil -} - -func newController( - controllerConfig *util.ControllerConfig, - typeConfig *fedcorev1a1.FederatedTypeConfig, +func NewOverridePolicyController( + kubeClient kubernetes.Interface, + fedClient fedclient.Interface, + fedInformerFactory fedinformers.SharedInformerFactory, + informerManager informermanager.InformerManager, + metrics stats.Metrics, + logger klog.Logger, + workerCount int, ) (*Controller, error) { - userAgent := fmt.Sprintf("%s-%s", strings.ToLower(typeConfig.GetFederatedType().Kind), ControllerName) - configWithUserAgent := rest.CopyConfig(controllerConfig.KubeConfig) - rest.AddUserAgent(configWithUserAgent, userAgent) - - kubeClient := kubeclient.NewForConfigOrDie(configWithUserAgent) - recorder := eventsink.NewDefederatingRecorderMux(kubeClient, userAgent, 4) - c := &Controller{ - name: userAgent, - typeConfig: typeConfig, - eventRecorder: recorder, - metrics: controllerConfig.Metrics, + informerManager: informerManager, + fedObjectInformer: fedInformerFactory.Core().V1alpha1().FederatedObjects(), + clusterFedObjectInformer: fedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), + overridePolicyInformer: fedInformerFactory.Core().V1alpha1().OverridePolicies(), + clusterOverridePolicyInformer: fedInformerFactory.Core().V1alpha1().ClusterOverridePolicies(), + federatedClusterInformer: fedInformerFactory.Core().V1alpha1().FederatedClusters(), + fedClient: fedClient, + metrics: metrics, + logger: logger.WithValues("controller", ControllerName), } - var err error - - federatedApiResource := typeConfig.GetFederatedType() - c.federatedClient, err = util.NewResourceClient(configWithUserAgent, &federatedApiResource) - if err != nil { - return nil, fmt.Errorf("NewResourceClient failed: %w", err) - } - - c.worker = worker.NewReconcileWorker( + c.eventRecorder = eventsink.NewDefederatingRecorderMux(kubeClient, ControllerName, 4) + c.worker = worker.NewReconcileWorker[common.QualifiedName]( + ControllerName, + nil, c.reconcile, worker.RateLimiterOptions{}, - controllerConfig.WorkerCount, - controllerConfig.Metrics, - delayingdeliver.NewMetricTags(c.name, federatedApiResource.Kind), - ) - enqueueObj := c.worker.EnqueueObject - c.federatedStore, c.federatedController = util.NewResourceInformer( - c.federatedClient, - controllerConfig.TargetNamespace, - enqueueObj, - controllerConfig.Metrics, + workerCount, + metrics, ) - getPolicyHandlers := func(labelKey string) *cache.ResourceEventHandlerFuncs { - return &cache.ResourceEventHandlerFuncs{ - // Policy added/updated: we need to reconcile all fedObjects referencing this policy - AddFunc: func(obj interface{}) { - policy := obj.(fedcorev1a1.GenericOverridePolicy) - c.enqueueFedObjectsUsingPolicy(policy, labelKey) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - oldPolicy := oldObj.(fedcorev1a1.GenericOverridePolicy) - newPolicy := newObj.(fedcorev1a1.GenericOverridePolicy) - if !equality.Semantic.DeepEqual(oldPolicy.GetSpec(), newPolicy.GetSpec()) { - c.enqueueFedObjectsUsingPolicy(newPolicy, labelKey) - } - }, - DeleteFunc: nil, - } + if _, err := c.fedObjectInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) { + fedObj := o.(*fedcorev1a1.FederatedObject) + c.worker.Enqueue(common.QualifiedName{Namespace: fedObj.Namespace, Name: fedObj.Name}) + })); err != nil { + return nil, err } - c.overridePolicyStore, c.overridePolicyController, err = util.NewGenericInformerWithEventHandler( - controllerConfig.KubeConfig, - "", - &fedcorev1a1.OverridePolicy{}, - util.NoResyncPeriod, - getPolicyHandlers(OverridePolicyNameLabel), - controllerConfig.Metrics, - ) - if err != nil { + if _, err := c.clusterFedObjectInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) { + fedObj := o.(*fedcorev1a1.ClusterFederatedObject) + c.worker.Enqueue(common.QualifiedName{Name: fedObj.Name}) + })); err != nil { return nil, err } - c.clusterOverridePolicyStore, c.clusterOverridePolicyController, err = util.NewGenericInformerWithEventHandler( - controllerConfig.KubeConfig, - "", - &fedcorev1a1.ClusterOverridePolicy{}, - util.NoResyncPeriod, - getPolicyHandlers(ClusterOverridePolicyNameLabel), - controllerConfig.Metrics, - ) - if err != nil { + if _, err := c.overridePolicyInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) { + policy := o.(fedcorev1a1.GenericOverridePolicy) + c.enqueueFedObjectsUsingPolicy(policy, OverridePolicyNameLabel) + })); err != nil { return nil, err } - c.clusterStore, c.clusterController, err = util.NewGenericInformerWithEventHandler( - controllerConfig.KubeConfig, - "", - &fedcorev1a1.FederatedCluster{}, - util.NoResyncPeriod, - &cache.ResourceEventHandlerFuncs{ - /* - No need to reconcile on Add and Delete. Since we only resolve overrides for - scheduled clusters, there's no point in reconciling before scheduler does rescheduling. - */ - AddFunc: nil, - DeleteFunc: nil, - // We only care about label change, since that is the only cluster change - // that can affect overrider computation. - // Currently MatchFields only matches /metadata/name. - // If we extend MatchFields to match new fields, we may need to revise UpdateFunc - // to expand the trigger conditions. - UpdateFunc: func(oldObj, newObj interface{}) { - oldCluster := oldObj.(*fedcorev1a1.FederatedCluster) - newCluster := newObj.(*fedcorev1a1.FederatedCluster) - if !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels) { - c.reconcileOnClusterChange(newCluster) - } - }, + if _, err := c.clusterOverridePolicyInformer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(func(o pkgruntime.Object) { + policy := o.(fedcorev1a1.GenericOverridePolicy) + c.enqueueFedObjectsUsingPolicy(policy, OverridePolicyNameLabel) + })); err != nil { + return nil, err + } + + if _, err := c.federatedClusterInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + /* + No need to reconcile on Add and Delete. Since we only resolve overrides for + scheduled clusters, there's no point in reconciling before scheduler does rescheduling. + */ + AddFunc: nil, + DeleteFunc: nil, + // We only care about label change, since that is the only cluster change + // that can affect overrider computation. + // Currently MatchFields only matches /metadata/name. + // If we extend MatchFields to match new fields, we may need to revise UpdateFunc + // to expand the trigger conditions. + UpdateFunc: func(oldObj, newObj interface{}) { + oldCluster := oldObj.(*fedcorev1a1.FederatedCluster) + newCluster := newObj.(*fedcorev1a1.FederatedCluster) + if !equality.Semantic.DeepEqual(oldCluster.Labels, newCluster.Labels) { + c.reconcileOnClusterChange(newCluster) + } }, - controllerConfig.Metrics, - ) - if err != nil { + }); err != nil { return nil, err } + informerManager.AddFTCUpdateHandler(func(lastObserved, latest *fedcorev1a1.FederatedTypeConfig) { + if lastObserved == nil && latest != nil { + c.enqueueFederatedObjectsForFTC(latest) + return + } + }) + return c, nil } +func (c *Controller) enqueueFederatedObjectsForFTC(ftc *fedcorev1a1.FederatedTypeConfig) { + logger := c.logger.WithValues("ftc", ftc.GetName()) + + logger.V(2).Info("Enqueue federated objects for FTC") + + allObjects := []fedcorev1a1.GenericFederatedObject{} + fedObjects, err := c.fedObjectInformer.Lister().List(labels.Everything()) + if err != nil { + c.logger.Error(err, "Failed to enqueue FederatedObjects for override policy") + return + } + for _, obj := range fedObjects { + allObjects = append(allObjects, obj) + } + clusterFedObjects, err := c.clusterFedObjectInformer.Lister().List(labels.Everything()) + if err != nil { + c.logger.Error(err, "Failed to enqueue ClusterFederatedObjects for override policy") + return + } + for _, obj := range clusterFedObjects { + allObjects = append(allObjects, obj) + } + + for _, obj := range allObjects { + sourceGVK, err := obj.GetSpec().GetTemplateGVK() + if err != nil { + c.logger.Error(err, "Failed to get source GVK from FederatedObject, will not enqueue") + continue + } + if sourceGVK == ftc.GetSourceTypeGVK() { + c.worker.Enqueue(common.NewQualifiedName(obj)) + } + } +} + func (c *Controller) enqueueFedObjectsUsingPolicy(policy fedcorev1a1.GenericOverridePolicy, labelKey string) { - klog.V(2).Infof("%s observed a policy change for %s %q", c.name, util.GetResourceKind(policy), policy.GetKey()) - for _, fedObjectInterface := range c.federatedStore.List() { - fedObject := fedObjectInterface.(*unstructured.Unstructured) - labelValue, exists := fedObject.GetLabels()[labelKey] - if exists && - // fedObject must reference the policy - labelValue == policy.GetName() && - // for ClusterOverridePolicy, fedObject can be cluster-scoped or belong to any namespace - // for OverridePolicy, policy and fedObject must belong to the same namespace; - (policy.GetNamespace() == "" || policy.GetNamespace() == fedObject.GetNamespace()) { - c.worker.EnqueueObject(fedObject) + logger := c.logger.WithValues("override-policy", policy.GetKey()) + logger.V(2).Info("observed a policy change") + + selector := labels.SelectorFromSet(labels.Set{labelKey: policy.GetName()}) + clusterFedObjects, err := c.clusterFedObjectInformer.Lister().List(selector) + if err != nil { + logger.Error(err, "Failed to list reference cluster federated objects") + return + } + + for _, clusterFedObject := range clusterFedObjects { + c.worker.Enqueue(common.QualifiedName{Name: clusterFedObject.GetName()}) + } + + if policy.GetNamespace() != "" { + fedObjects, err := c.fedObjectInformer.Lister().FederatedObjects(policy.GetNamespace()).List(selector) + if err != nil { + logger.Error(err, "Failed to list reference federated objects") + return + } + + for _, fedObject := range fedObjects { + c.worker.Enqueue(common.QualifiedName{ + Namespace: fedObject.GetNamespace(), + Name: fedObject.GetName(), + }) } } } func (c *Controller) reconcileOnClusterChange(cluster *fedcorev1a1.FederatedCluster) { - klog.V(2).Infof("%s observed a cluster change for %q", c.name, cluster.GetName()) - for _, fedObjectInterface := range c.federatedStore.List() { - fedObject := fedObjectInterface.(*unstructured.Unstructured) - labels := fedObject.GetLabels() - // only enqueue fedObjects with a policy since we only need to recompute policies that are already applied - if len(labels[OverridePolicyNameLabel]) > 0 || len(labels[ClusterOverridePolicyNameLabel]) > 0 { - c.worker.EnqueueObject(fedObject) + logger := c.logger.WithValues("federated-cluster", cluster.GetName()) + logger.V(2).Info("observed a cluster change") + + opRequirement, _ := labels.NewRequirement(OverridePolicyNameLabel, selection.Exists, nil) + copRequirement, _ := labels.NewRequirement(ClusterOverridePolicyNameLabel, selection.Exists, nil) + + for _, requirement := range []labels.Requirement{*opRequirement, *copRequirement} { + fedObjects, err := c.fedObjectInformer.Lister().List(labels.NewSelector().Add(requirement)) + if err != nil { + logger.Error(err, "Failed to list federated objects") + return + } + for _, fedObject := range fedObjects { + c.worker.Enqueue(common.QualifiedName{ + Namespace: fedObject.Namespace, + Name: fedObject.Name, + }) + } + + // no need to list cluster federated object for override policy + if requirement.Key() == ClusterOverridePolicyNameLabel { + clusterFedObjects, err := c.clusterFedObjectInformer.Lister().List(labels.NewSelector().Add(requirement)) + if err != nil { + logger.Error(err, "Failed to list cluster federated objects") + return + } + for _, clusterFedObject := range clusterFedObjects { + c.worker.Enqueue(common.QualifiedName{ + Name: clusterFedObject.Name, + }) + } } } } -func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result { - kind := c.typeConfig.GetFederatedType().Kind - key := qualifiedName.String() +func (c *Controller) reconcile(ctx context.Context, qualifiedName common.QualifiedName) (status worker.Result) { + ctx, keyedLogger := logging.InjectLoggerValues(ctx, "federated-name", qualifiedName.String()) - c.metrics.Rate(fmt.Sprintf("%v.throughput", c.name), 1) - klog.V(4).Infof("%s starting to reconcile %s %v", c.name, kind, key) + c.metrics.Rate(fmt.Sprintf("%v.throughput", ControllerName), 1) + keyedLogger.V(3).Info("Starting to reconcile") startTime := time.Now() defer func() { - c.metrics.Duration(fmt.Sprintf("%s.latency", c.name), startTime) - klog.V(4).Infof("%s finished reconciling %s %v (duration: %v)", c.name, kind, key, time.Since(startTime)) + c.metrics.Duration(fmt.Sprintf("%s.latency", ControllerName), startTime) + keyedLogger.WithValues("duration", time.Since(startTime), "status", status).V(3).Info("Finished reconciling") }() fedObject, err := c.getFederatedObject(qualifiedName) if err != nil { - utilruntime.HandleError(err) + keyedLogger.Error(err, "Failed to get federated object") return worker.StatusError } + if fedObject == nil || fedObject.GetDeletionTimestamp() != nil { return worker.StatusAllOK } + templateGVK, err := fedObject.GetSpec().GetTemplateGVK() + if err != nil { + keyedLogger.Error(err, "Failed to get template gvk") + return worker.StatusError + } + + typeConfig, exist := c.informerManager.GetResourceFTC(templateGVK) + if !exist { + keyedLogger.V(3).Info("Resource ftc not found") + return worker.StatusAllOK + } + if ok, err := pendingcontrollers.ControllerDependenciesFulfilled(fedObject, PrefixedControllerName); err != nil { - utilruntime.HandleError(fmt.Errorf("failed to check controller dependencies for %s %q: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to check controller dependencies") return worker.StatusError } else if !ok { return worker.StatusAllOK @@ -280,11 +312,12 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result // TODO: don't apply a policy until it has the required finalizer for deletion protection policies, recheckOnErr, err := lookForMatchedPolicies( fedObject, - c.typeConfig.GetNamespaced(), - c.overridePolicyStore, - c.clusterOverridePolicyStore, + fedObject.GetNamespace() != "", + c.overridePolicyInformer.Lister(), + c.clusterOverridePolicyInformer.Lister(), ) if err != nil { + keyedLogger.Error(err, "Failed to look for matched policy") c.eventRecorder.Eventf( fedObject, corev1.EventTypeWarning, @@ -300,7 +333,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result placedClusters, err := c.getPlacedClusters(fedObject) if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get placed clusters for %s %q: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to get placed clusters") return worker.StatusError } @@ -325,7 +358,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result currentOverrides, err := util.GetOverrides(fedObject, PrefixedControllerName) if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to get overrides from %s %q: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to get overrides") return worker.StatusError } @@ -334,7 +367,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result if needsUpdate { err = util.SetOverrides(fedObject, PrefixedControllerName, overrides) if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to set overrides for %s %q: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to set overrides") return worker.StatusError } } @@ -343,22 +376,21 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result fedObject, PrefixedControllerName, needsUpdate, - c.typeConfig.GetControllers(), + typeConfig.GetControllers(), ) if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to update pending controllers for %s %q: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to update pending controllers") return worker.StatusError } needsUpdate = needsUpdate || pendingControllersUpdated if needsUpdate { - _, err = c.federatedClient.Resources(fedObject.GetNamespace()). - Update(context.TODO(), fedObject, metav1.UpdateOptions{}) + _, err = fedobjectadapters.Update(context.Background(), c.fedClient.CoreV1alpha1(), fedObject, metav1.UpdateOptions{}) if err != nil { if apierrors.IsConflict(err) { return worker.StatusConflict } - utilruntime.HandleError(fmt.Errorf("failed to update %s %q for applying overrides: %w", kind, key, err)) + keyedLogger.Error(err, "Failed to update federated object for applying overrides") return worker.StatusAllOK } @@ -373,53 +405,61 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result return worker.StatusAllOK } -func (c *Controller) getPlacedClusters(fedObject *unstructured.Unstructured) ([]*fedcorev1a1.FederatedCluster, error) { - placementObj, err := util.UnmarshalGenericPlacements(fedObject) +func (c *Controller) getPlacedClusters(fedObject fedcorev1a1.GenericFederatedObject) ([]*fedcorev1a1.FederatedCluster, error) { + placedClusterNames := fedObject.GetSpec().GetPlacementUnion() + clusterObjs, err := c.federatedClusterInformer.Lister().List(labels.Everything()) if err != nil { - return nil, fmt.Errorf("failed to unmarshal placements: %w", err) + return nil, fmt.Errorf("failed to list federated cluster: %w", err) } - placedClusterNames := placementObj.ClusterNameUnion() - - clusterObjs := c.clusterStore.List() placedClusters := make([]*fedcorev1a1.FederatedCluster, 0, len(clusterObjs)) for _, clusterObj := range clusterObjs { - cluster, ok := clusterObj.(*fedcorev1a1.FederatedCluster) - if !ok { - return nil, fmt.Errorf("got wrong type %T from cluster store", cluster) - } - if _, exists := placedClusterNames[cluster.Name]; exists { - placedClusters = append(placedClusters, cluster) + if _, exists := placedClusterNames[clusterObj.Name]; exists { + placedClusters = append(placedClusters, clusterObj) } } return placedClusters, nil } -func (c *Controller) Run(stopChan <-chan struct{}) { - go c.federatedController.Run(stopChan) - go c.overridePolicyController.Run(stopChan) - go c.clusterOverridePolicyController.Run(stopChan) - go c.clusterController.Run(stopChan) - - if !cache.WaitForNamedCacheSync(c.name, stopChan, - c.federatedController.HasSynced, - c.overridePolicyController.HasSynced, - c.clusterOverridePolicyController.HasSynced, - c.clusterController.HasSynced, - ) { - utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync for controller: %s", c.name)) - } - c.worker.Run(stopChan) +func (c *Controller) Run(ctx context.Context) { + ctx, logger := logging.InjectLogger(ctx, c.logger) + + logger.Info("Starting controller") + defer logger.Info("Stopping controller") + + if !cache.WaitForNamedCacheSync(ControllerName, ctx.Done(), c.HasSynced) { + logger.Error(nil, "Timed out waiting for caches to sync") + return + } + logger.Info("Caches are synced") + c.worker.Run(ctx) + <-ctx.Done() +} + +func (c *Controller) HasSynced() bool { + return c.federatedClusterInformer.Informer().HasSynced() && + c.overridePolicyInformer.Informer().HasSynced() && + c.clusterOverridePolicyInformer.Informer().HasSynced() && + c.fedObjectInformer.Informer().HasSynced() && + c.clusterFedObjectInformer.Informer().HasSynced() && + c.informerManager.HasSynced() } -func (c *Controller) getFederatedObject(qualifiedName common.QualifiedName) (*unstructured.Unstructured, error) { - cachedObj, exist, err := c.federatedStore.GetByKey(qualifiedName.String()) - if err != nil { +func (c *Controller) IsControllerReady() bool { + return c.HasSynced() +} + +func (c *Controller) getFederatedObject(qualifiedName common.QualifiedName) (fedcorev1a1.GenericFederatedObject, error) { + cachedObj, err := fedobjectadapters.GetFromLister(c.fedObjectInformer.Lister(), c.clusterFedObjectInformer.Lister(), + qualifiedName.Namespace, qualifiedName.Name) + if err != nil && !apierrors.IsNotFound(err) { return nil, err } - if !exist { + + if apierrors.IsNotFound(err) { return nil, nil } - return cachedObj.(*unstructured.Unstructured).DeepCopy(), nil + + return cachedObj.DeepCopyGenericFederatedObject(), nil } diff --git a/pkg/controllers/override/util.go b/pkg/controllers/override/util.go index e9b44eadb..cea0b9f76 100644 --- a/pkg/controllers/override/util.go +++ b/pkg/controllers/override/util.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -21,13 +20,12 @@ import ( "encoding/json" "fmt" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/tools/cache" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" + fedcorev1a1listers "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/util" "github.com/kubewharf/kubeadmiral/pkg/controllers/util/clusterselector" ) @@ -44,10 +42,10 @@ that match the obj in the stores. Returns the policy if found, whether a recheck is needed on error, and encountered error if any. */ func lookForMatchedPolicies( - obj *unstructured.Unstructured, + obj fedcorev1a1.GenericFederatedObject, isNamespaced bool, - overridePolicyStore cache.Store, - clusterOverridePolicyStore cache.Store, + overridePolicyStore fedcorev1a1listers.OverridePolicyLister, + clusterOverridePolicyStore fedcorev1a1listers.ClusterOverridePolicyLister, ) ([]fedcorev1a1.GenericOverridePolicy, bool, error) { policies := make([]fedcorev1a1.GenericOverridePolicy, 0) @@ -59,17 +57,14 @@ func lookForMatchedPolicies( return nil, false, fmt.Errorf("policy name cannot be empty") } - matchedPolicyObj, exists, err := clusterOverridePolicyStore.GetByKey(clusterPolicyName) - if err != nil { + matchedPolicy, err := clusterOverridePolicyStore.Get(clusterPolicyName) + if err != nil && !errors.IsNotFound(err) { return nil, true, err } - if !exists { + if errors.IsNotFound(err) { return nil, false, fmt.Errorf("ClusterOverridePolicy %s not found", clusterPolicyName) } - matchedPolicy, ok := matchedPolicyObj.(*fedcorev1a1.ClusterOverridePolicy) - if !ok { - return nil, false, fmt.Errorf("object retrieved from store is not a ClusterOverridePolicy") - } + policies = append(policies, matchedPolicy) } @@ -79,17 +74,12 @@ func lookForMatchedPolicies( return nil, false, fmt.Errorf("policy name cannot be empty") } - key := obj.GetNamespace() + "/" + policyName - matchedPolicyObj, exists, err := overridePolicyStore.GetByKey(key) - if err != nil { + matchedPolicy, err := overridePolicyStore.OverridePolicies(obj.GetNamespace()).Get(policyName) + if err != nil && !errors.IsNotFound(err) { return nil, true, err } - if !exists { - return nil, false, fmt.Errorf("OverridePolicy %s not found", key) - } - matchedPolicy, ok := matchedPolicyObj.(*fedcorev1a1.OverridePolicy) - if !ok { - return nil, false, fmt.Errorf("object retrieved from store is not an OverridePolicy") + if errors.IsNotFound(err) { + return nil, false, fmt.Errorf("OverridePolicy %s/%s not found", matchedPolicy.Namespace, matchedPolicy.Name) } policies = append(policies, matchedPolicy) } @@ -104,7 +94,7 @@ func parseOverrides( overridesMap := make(util.OverridesMap) for _, cluster := range clusters { - patches := make(fedtypesv1a1.OverridePatches, 0) + patches := make(fedcorev1a1.OverridePatches, 0) spec := policy.GetSpec() for i, rule := range spec.OverrideRules { @@ -223,8 +213,8 @@ func isClusterMatchedByClusterAffinity( func policyJsonPatchOverriderToOverridePatch( overrider *fedcorev1a1.JsonPatchOverrider, -) (*fedtypesv1a1.OverridePatch, error) { - overridePatch := &fedtypesv1a1.OverridePatch{ +) (*fedcorev1a1.OverridePatch, error) { + overridePatch := &fedcorev1a1.OverridePatch{ Op: overrider.Operator, Path: overrider.Path, } diff --git a/pkg/controllers/util/clusterselector/util.go b/pkg/controllers/util/clusterselector/util.go index fa6a7bcfa..5f3e2f2aa 100644 --- a/pkg/controllers/util/clusterselector/util.go +++ b/pkg/controllers/util/clusterselector/util.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. diff --git a/pkg/controllers/util/genericinformer.go b/pkg/controllers/util/genericinformer.go index 746211648..d46d8cbeb 100644 --- a/pkg/controllers/util/genericinformer.go +++ b/pkg/controllers/util/genericinformer.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2019 The Kubernetes Authors. diff --git a/pkg/controllers/util/meta.go b/pkg/controllers/util/meta.go index a2feb5bda..eccfa5832 100644 --- a/pkg/controllers/util/meta.go +++ b/pkg/controllers/util/meta.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2016 The Kubernetes Authors. diff --git a/pkg/controllers/util/overrides.go b/pkg/controllers/util/overrides.go index dcf656264..76a2cbc47 100644 --- a/pkg/controllers/util/overrides.go +++ b/pkg/controllers/util/overrides.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2018 The Kubernetes Authors. @@ -30,8 +29,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" - fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/controllers/common" + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" ) // Namespace and name may not be overridden since these fields are the @@ -53,41 +51,23 @@ var invalidPaths = sets.NewString( ) // Mapping of clusterName to overrides for the cluster -type OverridesMap map[string]fedtypesv1a1.OverridePatches - -func UnmarshalGenericOverrides(uns *unstructured.Unstructured) (*fedtypesv1a1.GenericObjectWithOverrides, error) { - obj := &fedtypesv1a1.GenericObjectWithOverrides{} - err := UnstructuredToInterface(uns, obj) - if err != nil { - return nil, err - } - return obj, nil -} +type OverridesMap map[string]fedcorev1a1.OverridePatches // GetOverrides returns a map of overrides populated from the given // unstructured object. -func GetOverrides(rawObj *unstructured.Unstructured, controller string) (OverridesMap, error) { +func GetOverrides(federatedObj fedcorev1a1.GenericFederatedObject, controller string) (OverridesMap, error) { overridesMap := make(OverridesMap) - if rawObj == nil { - return overridesMap, nil - } - - overrideObj, err := UnmarshalGenericOverrides(rawObj) - if err != nil { - return nil, err - } - - if overrideObj.Spec == nil || overrideObj.Spec.Overrides == nil { + if federatedObj == nil || federatedObj.GetSpec().Overrides == nil { // No overrides defined for the federated type return overridesMap, nil } - overrides := overrideObj.Spec.Overrides - var clusterOverrides []fedtypesv1a1.ClusterOverride + overrides := federatedObj.GetSpec().Overrides + var clusterOverrides []fedcorev1a1.ClusterReferenceWithPatches for i := range overrides { if overrides[i].Controller == controller { - clusterOverrides = overrides[i].Clusters + clusterOverrides = overrides[i].Override break } } @@ -97,7 +77,7 @@ func GetOverrides(rawObj *unstructured.Unstructured, controller string) (Overrid } for _, overrideItem := range clusterOverrides { - clusterName := overrideItem.ClusterName + clusterName := overrideItem.Cluster if _, ok := overridesMap[clusterName]; ok { return nil, errors.Errorf("cluster %q appears more than once", clusterName) } @@ -118,20 +98,15 @@ func GetOverrides(rawObj *unstructured.Unstructured, controller string) (Overrid // object from the provided overrides map. // // This function takes ownership of the `overridesMap` and may mutate it arbitrarily. -func SetOverrides(uns *unstructured.Unstructured, controller string, overridesMap OverridesMap) error { +func SetOverrides(federatedObj fedcorev1a1.GenericFederatedObject, controller string, overridesMap OverridesMap) error { for clusterName, clusterOverrides := range overridesMap { if len(clusterOverrides) == 0 { delete(overridesMap, clusterName) } } - overrideObj, err := UnmarshalGenericOverrides(uns) - if err != nil { - return err - } - index := -1 - for i, overrides := range overrideObj.Spec.Overrides { + for i, overrides := range federatedObj.GetSpec().Overrides { if overrides.Controller == controller { index = i break @@ -141,18 +116,18 @@ func SetOverrides(uns *unstructured.Unstructured, controller string, overridesMa if len(overridesMap) == 0 { // delete index if index != -1 { - overrideObj.Spec.Overrides = append(overrideObj.Spec.Overrides[:index], overrideObj.Spec.Overrides[(index+1):]...) + federatedObj.GetSpec().Overrides = append(federatedObj.GetSpec().Overrides[:index], federatedObj.GetSpec().Overrides[(index+1):]...) } } else { if index == -1 { - index = len(overrideObj.Spec.Overrides) - overrideObj.Spec.Overrides = append(overrideObj.Spec.Overrides, fedtypesv1a1.ControllerOverride{ + index = len(federatedObj.GetSpec().Overrides) + federatedObj.GetSpec().Overrides = append(federatedObj.GetSpec().Overrides, fedcorev1a1.OverrideWithController{ Controller: controller, }) } - overrides := &overrideObj.Spec.Overrides[index] - overrides.Clusters = nil + overrides := &federatedObj.GetSpec().Overrides[index] + overrides.Override = nil // Write in ascending order of cluster names for better readability clusterNames := make([]string, 0, len(overridesMap)) @@ -162,19 +137,14 @@ func SetOverrides(uns *unstructured.Unstructured, controller string, overridesMa sort.Strings(clusterNames) for _, clusterName := range clusterNames { clusterOverrides := overridesMap[clusterName] - overrides.Clusters = append(overrides.Clusters, fedtypesv1a1.ClusterOverride{ - ClusterName: clusterName, - Patches: clusterOverrides, + overrides.Override = append(overrides.Override, fedcorev1a1.ClusterReferenceWithPatches{ + Cluster: clusterName, + Patches: clusterOverrides, }) } } - overridesUns, err := InterfaceToUnstructured(overrideObj.Spec.Overrides) - if err != nil { - return err - } - - return unstructured.SetNestedField(uns.Object, overridesUns, common.OverridesPath...) + return nil } // UnstructuredToInterface converts an unstructured object to the @@ -201,7 +171,7 @@ func InterfaceToUnstructured(obj interface{}) (ret interface{}, err error) { } // ApplyJsonPatch applies the override on to the given unstructured object. -func ApplyJsonPatch(obj *unstructured.Unstructured, overrides fedtypesv1a1.OverridePatches) error { +func ApplyJsonPatch(obj *unstructured.Unstructured, overrides fedcorev1a1.OverridePatches) error { // TODO: Do the defaulting of "op" field to "replace" in API defaulting for i, overrideItem := range overrides { if overrideItem.Op == "" {