diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 87811a6a5..aaf3cafdf 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -35,14 +35,16 @@ import ( ) const ( - FederatedClusterControllerName = "cluster" - FederateControllerName = "federate" - MonitorControllerName = "monitor" - FollowerControllerName = "follower" + FederatedClusterControllerName = "cluster" + FederateControllerName = "federate" + MonitorControllerName = "monitor" + FollowerControllerName = "follower" + NamespaceAutoPropagationControllerName = "nsautoprop" ) var knownControllers = map[string]controllermanager.StartControllerFunc{ - FederateControllerName: startFederateController, + FederateControllerName: startFederateController, + NamespaceAutoPropagationControllerName: startNamespaceAutoPropagationController, } var controllersDisabledByDefault = sets.New(MonitorControllerName) diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index 31feff65a..ec454eef0 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -25,6 +25,7 @@ import ( "github.com/kubewharf/kubeadmiral/pkg/controllermanager" controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" "github.com/kubewharf/kubeadmiral/pkg/controllers/federate" + "github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop" ) func startFederateController( @@ -51,3 +52,29 @@ func startFederateController( return federateController, nil } + +func startNamespaceAutoPropagationController( + ctx context.Context, + controllerCtx *controllercontext.Context, +) (controllermanager.Controller, error) { + nsAutoPropController, err := nsautoprop.NewNamespaceAutoPropagationController( + controllerCtx.KubeClientset, + controllerCtx.InformerManager, + controllerCtx.FedClientset, + controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedClusters(), + controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), + controllerCtx.KubeInformerFactory.Core().V1().Namespaces(), + controllerCtx.ComponentConfig.NSAutoPropExcludeRegexp, + controllerCtx.FedSystemNamespace, + controllerCtx.Metrics, + klog.Background(), + controllerCtx.WorkerCount, + ) + if err != nil { + return nil, fmt.Errorf("error creating namespace auto propagation controller: %w", err) + } + + go nsAutoPropController.Run(ctx) + + return nsAutoPropController, nil +} diff --git a/pkg/controllers/nsautoprop/controller.go b/pkg/controllers/nsautoprop/controller.go index 56658cb21..52f36efee 100644 --- a/pkg/controllers/nsautoprop/controller.go +++ b/pkg/controllers/nsautoprop/controller.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -31,27 +30,26 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" + corev1informers "k8s.io/client-go/informers/core/v1" kubeclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions" + fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned" fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - annotationutil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/annotation" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers" - schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" "github.com/kubewharf/kubeadmiral/pkg/stats" + annotationutil "github.com/kubewharf/kubeadmiral/pkg/util/annotation" + "github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers" + "github.com/kubewharf/kubeadmiral/pkg/util/eventsink" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/naming" + "github.com/kubewharf/kubeadmiral/pkg/util/pendingcontrollers" + "github.com/kubewharf/kubeadmiral/pkg/util/worker" ) const ( @@ -60,6 +58,8 @@ const ( EventReasonNamespaceAutoPropagation = "NamespaceAutoPropagation" ) +var namespaceGVK = corev1.SchemeGroupVersion.WithKind("Namespace") + /* NamespacesAutoPropagationController automatically propagates namespaces to all clusters without requiring a ClusterPropagationPolicy for scheduling. @@ -73,130 +73,135 @@ Note that since both NamespaceAutoPropagationController and global-scheduler set if both are enabled, they will conflict with each other and reconcile indefinitely. */ type Controller struct { - // name of controller - name string - - // FederatedTypeConfig for namespaces - typeConfig *fedcorev1a1.FederatedTypeConfig + fedClient fedclient.Interface - dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory - fedInformerFactory fedinformers.SharedInformerFactory + informerManager informermanager.InformerManager + namespaceInformer corev1informers.NamespaceInformer + clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer + clusterInformer fedcorev1a1informers.FederatedClusterInformer - // Informer for FederatedCluster - clusterInformer fedcorev1a1informers.FederatedClusterInformer - // Informer for FederatedNamespace - fedNamespaceInformer informers.GenericInformer - // Client for FederatedNamespace - fedNamespaceClient dynamic.NamespaceableResourceInterface - - worker worker.ReconcileWorker + worker worker.ReconcileWorker[common.QualifiedName] eventRecorder record.EventRecorder - fedSystemNamespace string excludeRegexp *regexp.Regexp + fedSystemNamespace string + logger klog.Logger metrics stats.Metrics } -func StartController( - controllerConfig *util.ControllerConfig, - stopChan <-chan struct{}, - typeConfig *fedcorev1a1.FederatedTypeConfig, - kubeClient kubernetes.Interface, - dynamicClient dynamic.Interface, - dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory, - fedInformerFactory fedinformers.SharedInformerFactory, -) error { - controller, err := newController( - controllerConfig, - typeConfig, - kubeClient, - dynamicClient, - fedInformerFactory, - dynamicInformerFactory, - ) - if err != nil { - return err - } - klog.V(4).Infof("Starting namespace auto propagation controller") - go controller.Run(stopChan) - return nil +func (c *Controller) IsControllerReady() bool { + return c.HasSynced() } -func newController( - controllerConfig *util.ControllerConfig, - typeConfig *fedcorev1a1.FederatedTypeConfig, +func NewNamespaceAutoPropagationController( kubeClient kubeclient.Interface, - dynamicClient dynamic.Interface, - fedInformerFactory fedinformers.SharedInformerFactory, - dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory, + informerManager informermanager.InformerManager, + fedClient fedclient.Interface, + clusterInformer fedcorev1a1informers.FederatedClusterInformer, + clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer, + namespaceInformer corev1informers.NamespaceInformer, + nsExcludeRegexp *regexp.Regexp, + fedSystemNamespace string, + metrics stats.Metrics, + logger klog.Logger, + workerCount int, ) (*Controller, error) { - userAgent := NamespaceAutoPropagationControllerName - if !typeConfig.IsNamespace() { - return nil, fmt.Errorf("%s expects a FederatedTypeConfig for namespaces", userAgent) - } - - federatedNamespaceApiResource := typeConfig.GetFederatedType() - fedNamespaceGVR := schemautil.APIResourceToGVR(&federatedNamespaceApiResource) c := &Controller{ - name: userAgent, - typeConfig: typeConfig, - eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, userAgent, 4), - dynamicInformerFactory: dynamicInformerFactory, - fedInformerFactory: fedInformerFactory, - fedSystemNamespace: controllerConfig.FedSystemNamespace, - excludeRegexp: controllerConfig.NamespaceAutoPropagationExcludeRegexp, - metrics: controllerConfig.Metrics, - fedNamespaceClient: dynamicClient.Resource(fedNamespaceGVR), - clusterInformer: fedInformerFactory.Core().V1alpha1().FederatedClusters(), - fedNamespaceInformer: dynamicInformerFactory.ForResource(fedNamespaceGVR), + fedClient: fedClient, + + informerManager: informerManager, + clusterFedObjectInformer: clusterFedObjectInformer, + clusterInformer: clusterInformer, + namespaceInformer: namespaceInformer, + + excludeRegexp: nsExcludeRegexp, + fedSystemNamespace: fedSystemNamespace, + + eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, NamespaceAutoPropagationControllerName, 4), + metrics: metrics, + logger: logger.WithValues("controller", NamespaceAutoPropagationControllerName), } - c.worker = worker.NewReconcileWorker( + c.worker = worker.NewReconcileWorker[common.QualifiedName]( + NamespaceAutoPropagationControllerName, + nil, c.reconcile, worker.RateLimiterOptions{}, - controllerConfig.WorkerCount, - controllerConfig.Metrics, - delayingdeliver.NewMetricTags(userAgent, federatedNamespaceApiResource.Kind), + workerCount, + metrics, ) - enqueueObj := c.worker.EnqueueObject - c.fedNamespaceInformer.Informer(). - AddEventHandlerWithResyncPeriod(util.NewTriggerOnAllChanges(enqueueObj), util.NoResyncPeriod) + + if _, err := c.clusterFedObjectInformer.Informer().AddEventHandlerWithResyncPeriod( + eventhandlers.NewTriggerOnAllChanges(func(o runtime.Object) { + fedObj := o.(*fedcorev1a1.ClusterFederatedObject) + logger := c.logger.WithValues("cluster-federated-object", common.NewQualifiedName(fedObj)) + + srcMeta, err := fedObj.Spec.GetTemplateAsUnstructured() + if err != nil { + logger.Error(err, "Failed to get source object's metadata from ClusterFederatedObject") + return + } + + if srcMeta.GetKind() != common.NamespaceKind || !c.shouldBeAutoPropagated(srcMeta) { + return + } + + c.worker.Enqueue(common.QualifiedName{Name: fedObj.GetName()}) + }), util.NoResyncPeriod); err != nil { + return nil, err + } reconcileAll := func() { - for _, fns := range c.fedNamespaceInformer.Informer().GetStore().List() { - enqueueObj(fns.(runtime.Object)) + typeConfig, exists := c.informerManager.GetResourceFTC(namespaceGVK) + if !exists { + c.logger.Error(nil, "Namespace ftc does not exist") + return + } + + allNamespaces, err := c.namespaceInformer.Lister().List(labels.Everything()) + if err != nil { + c.logger.Error(err, "Failed to list all namespaces") + return + } + + for _, ns := range allNamespaces { + c.worker.Enqueue(common.QualifiedName{Name: naming.GenerateFederatedObjectName(ns.Name, typeConfig.Name)}) } } - c.clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + + if _, err := c.clusterInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { reconcileAll() }, DeleteFunc: func(obj interface{}) { reconcileAll() }, - }, util.NoResyncPeriod) + }, util.NoResyncPeriod); err != nil { + return nil, err + } + return c, nil } -func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result { - key := qualifiedName.String() +func (c *Controller) reconcile(ctx context.Context, qualifiedName common.QualifiedName) worker.Result { + ctx, keyedLogger := logging.InjectLoggerValues(ctx, "federated-name", qualifiedName.String()) c.metrics.Rate("namespace-auto-propagation-controller.throughput", 1) - klog.V(4).Infof("namespace auto propagation controller starting to reconcile %v", key) + keyedLogger.V(3).Info("Starting to reconcile") startTime := time.Now() defer func() { c.metrics.Duration("namespace-auto-propagation-controller.latency", startTime) - klog.V(4). - Infof("namespace auto propagation controller finished reconciling %v (duration: %v)", key, time.Since(startTime)) + keyedLogger.WithValues("duration", time.Since(startTime)).V(3).Info("Finished reconciling") }() - fedNamespace, err := c.getFederatedObject(qualifiedName) - if err != nil { - utilruntime.HandleError(err) + fedNamespace, err := c.clusterFedObjectInformer.Lister().Get(qualifiedName.Name) + if err != nil && !apierrors.IsNotFound(err) { + keyedLogger.Error(err, "Failed to get federated namespace") return worker.StatusError } - if fedNamespace == nil || fedNamespace.GetDeletionTimestamp() != nil { + + if apierrors.IsNotFound(err) || fedNamespace.GetDeletionTimestamp() != nil { return worker.StatusAllOK } @@ -204,31 +209,43 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result fedNamespace, PrefixedNamespaceAutoPropagationControllerName, ); err != nil { - utilruntime.HandleError(fmt.Errorf("failed to check controller dependencies for %q: %w", key, err)) + keyedLogger.Error(err, "Failed to get pending controllers") return worker.StatusError } else if !ok { return worker.StatusAllOK } - if !c.shouldBeAutoPropagated(fedNamespace) { + typeConfig, exists := c.informerManager.GetResourceFTC(namespaceGVK) + if !exists { + keyedLogger.Error(nil, "Namespace ftc does not exist") + return worker.StatusError + } + + srcMeta, err := fedNamespace.Spec.GetTemplateAsUnstructured() + if err != nil { + keyedLogger.Error(err, "Failed to get source object's metadata from ClusterFederatedObject") + return worker.StatusError + } + + if !c.shouldBeAutoPropagated(srcMeta) { updated, err := pendingcontrollers.UpdatePendingControllers( fedNamespace, PrefixedNamespaceAutoPropagationControllerName, false, - c.typeConfig.GetControllers(), + typeConfig.GetControllers(), ) if err != nil { - utilruntime.HandleError(err) + keyedLogger.Error(err, "Failed to set pending controllers") return worker.StatusError } if updated { - _, err = c.fedNamespaceClient.Update(context.TODO(), fedNamespace, metav1.UpdateOptions{}) + _, err = c.fedClient.CoreV1alpha1().ClusterFederatedObjects().Update(ctx, fedNamespace, metav1.UpdateOptions{}) if err != nil { if apierrors.IsConflict(err) { return worker.StatusConflict } - utilruntime.HandleError(err) + keyedLogger.Error(err, "Failed to update cluster federated object") return worker.StatusError } } @@ -240,23 +257,17 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result // Set placement to propagate to all clusters clusters, err := c.clusterInformer.Lister().List(labels.Everything()) if err != nil { - utilruntime.HandleError(fmt.Errorf("failed to list from cluster store: %w", err)) + keyedLogger.Error(err, "Failed to list federated clusters") return worker.StatusError } - clusterNames := make(map[string]struct{}, len(clusters)) - for _, cluster := range clusters { - clusterNames[cluster.Name] = struct{}{} - } - isDirty, err := util.SetPlacementClusterNames( - fedNamespace, - PrefixedNamespaceAutoPropagationControllerName, - clusterNames, - ) - if err != nil { - utilruntime.HandleError(err) - return worker.StatusError + clusterNames := make([]string, 0, len(clusters)) + for _, clusterName := range clusters { + clusterNames = append(clusterNames, clusterName.Name) } + + isDirty := fedNamespace.Spec.SetControllerPlacement(PrefixedNamespaceAutoPropagationControllerName, clusterNames) + needsUpdate = needsUpdate || isDirty // Set internal versions of the annotations so they do not get overridden by federate controller @@ -290,7 +301,7 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result fedNamespace, PrefixedNamespaceAutoPropagationControllerName, needsUpdate, - c.typeConfig.GetControllers(), + typeConfig.GetControllers(), ) if err != nil { utilruntime.HandleError(err) @@ -302,19 +313,19 @@ func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result return worker.StatusAllOK } - _, err = c.fedNamespaceClient.Update(context.TODO(), fedNamespace, metav1.UpdateOptions{}) + _, err = c.fedClient.CoreV1alpha1().ClusterFederatedObjects().Update(ctx, fedNamespace, metav1.UpdateOptions{}) if err != nil { if apierrors.IsConflict(err) { return worker.StatusConflict } c.eventRecorder.Eventf(fedNamespace, corev1.EventTypeWarning, EventReasonNamespaceAutoPropagation, - "failed to update %s %q for auto propagation, err: %v", - fedNamespace.GetKind(), fedNamespace.GetName(), err) + "failed to update %s for auto propagation, err: %v", + fedNamespace.GetName(), err) return worker.StatusError } c.eventRecorder.Eventf(fedNamespace, corev1.EventTypeNormal, EventReasonNamespaceAutoPropagation, - "updated %s %q for auto propagation", fedNamespace.GetKind(), fedNamespace.GetName()) + "updated %s for auto propagation", fedNamespace.GetName()) return worker.StatusAllOK } @@ -344,38 +355,38 @@ func (c *Controller) shouldBeAutoPropagated(fedNamespace *unstructured.Unstructu return true } -func (c *Controller) ensureAnnotation(fedNamespace *unstructured.Unstructured, key, value string) (bool, error) { +func (c *Controller) ensureAnnotation(fedNamespace *fedcorev1a1.ClusterFederatedObject, key, value string) (bool, error) { needsUpdate, err := annotationutil.AddAnnotation(fedNamespace, key, value) if err != nil { return false, fmt.Errorf( - "failed to add %s annotation to %s %q, err: %w", - key, fedNamespace.GetKind(), fedNamespace.GetName(), err) + "failed to add %s annotation to %s, err: %w", + key, fedNamespace.GetName(), err) } return needsUpdate, nil } -func (c *Controller) Run(stopChan <-chan struct{}) { - c.dynamicInformerFactory.Start(stopChan) - c.fedInformerFactory.Start(stopChan) - if !cache.WaitForNamedCacheSync(c.name, stopChan, c.HasSynced) { +func (c *Controller) Run(ctx context.Context) { + ctx, logger := logging.InjectLogger(ctx, c.logger) + + logger.Info("Starting controller") + defer logger.Info("Stopping controller") + + go c.namespaceInformer.Informer().Run(ctx.Done()) + + if !cache.WaitForNamedCacheSync(NamespaceAutoPropagationControllerName, ctx.Done(), c.HasSynced) { + logger.Error(nil, "Timed out waiting for caches to sync") return } - c.worker.Run(stopChan) + + logger.Info("Caches are synced") + c.worker.Run(ctx) + <-ctx.Done() } func (c *Controller) HasSynced() bool { return c.clusterInformer.Informer().HasSynced() && - c.fedNamespaceInformer.Informer().HasSynced() -} - -func (c *Controller) getFederatedObject(qualifiedName common.QualifiedName) (*unstructured.Unstructured, error) { - cachedObj, err := c.fedNamespaceInformer.Lister().Get(qualifiedName.String()) - if err != nil && !apierrors.IsNotFound(err) { - return nil, err - } - if err != nil { - return nil, nil - } - return cachedObj.(*unstructured.Unstructured).DeepCopy(), nil + c.clusterFedObjectInformer.Informer().HasSynced() && + c.namespaceInformer.Informer().HasSynced() && + c.informerManager.HasSynced() } diff --git a/pkg/controllers/util/controllerconfig.go b/pkg/controllers/util/controllerconfig.go index c13f92fbc..88fea409e 100644 --- a/pkg/controllers/util/controllerconfig.go +++ b/pkg/controllers/util/controllerconfig.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2018 The Kubernetes Authors. diff --git a/pkg/controllers/util/federatedclient/podinformer.go b/pkg/controllers/util/federatedclient/podinformer.go index b6fc61a16..03e7a0309 100644 --- a/pkg/controllers/util/federatedclient/podinformer.go +++ b/pkg/controllers/util/federatedclient/podinformer.go @@ -1,4 +1,5 @@ //go:build exclude + /* Copyright 2023 The KubeAdmiral Authors. diff --git a/pkg/controllers/util/genericinformer.go b/pkg/controllers/util/genericinformer.go index 746211648..d46d8cbeb 100644 --- a/pkg/controllers/util/genericinformer.go +++ b/pkg/controllers/util/genericinformer.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2019 The Kubernetes Authors.