From 3473b46e0e58baf1bc52e3ef1206b5fe243911ba Mon Sep 17 00:00:00 2001 From: "lihanbo.0316" Date: Wed, 12 Jul 2023 16:34:39 +0800 Subject: [PATCH] refactor: follower controller --- .gitignore | 1 + .../app/controllermanager.go | 1 + cmd/controller-manager/app/core.go | 24 + config/sample/extra/pod-ftc.yaml | 1 - config/sample/host/01-ftc.yaml | 5 - pkg/client/generic/genericclient.go | 117 ----- .../follower/bidirectional_cache.go | 5 +- pkg/controllers/follower/controller.go | 477 ++++++++---------- pkg/controllers/follower/util.go | 105 ++-- pkg/controllers/follower/util_test.go | 16 +- 10 files changed, 290 insertions(+), 462 deletions(-) diff --git a/.gitignore b/.gitignore index 3d3028a7..00dae16d 100644 --- a/.gitignore +++ b/.gitignore @@ -28,6 +28,7 @@ output vendor .vscode *debug* +.DS_Store # log files *.log diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index b8211e92..cc8f3c13 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -55,6 +55,7 @@ var knownControllers = map[string]controllermanager.StartControllerFunc{ FederatedClusterControllerName: startFederatedClusterController, SchedulerName: startScheduler, SyncControllerName: startSyncController, + FollowerControllerName: startFollowerController, } var controllersDisabledByDefault = sets.New[string]() diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index a105add5..04466923 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -26,6 +26,7 @@ import ( controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" "github.com/kubewharf/kubeadmiral/pkg/controllers/federate" "github.com/kubewharf/kubeadmiral/pkg/controllers/federatedcluster" + "github.com/kubewharf/kubeadmiral/pkg/controllers/follower" "github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop" "github.com/kubewharf/kubeadmiral/pkg/controllers/override" "github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc" @@ -237,3 +238,26 @@ func startSyncController( return syncController, nil } + +func startFollowerController( + ctx context.Context, + controllerCtx *controllercontext.Context, +) (controllermanager.Controller, error) { + followerController, err := follower.NewFollowerController( + controllerCtx.KubeClientset, + controllerCtx.FedClientset, + controllerCtx.InformerManager, + controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedObjects(), + controllerCtx.FedInformerFactory.Core().V1alpha1().ClusterFederatedObjects(), + controllerCtx.Metrics, + klog.Background(), + controllerCtx.WorkerCount, + ) + if err != nil { + return nil, fmt.Errorf("error creating follower controller: %w", err) + } + + go followerController.Run(ctx) + + return followerController, nil +} diff --git a/config/sample/extra/pod-ftc.yaml b/config/sample/extra/pod-ftc.yaml index 464df484..38ca9acd 100644 --- a/config/sample/extra/pod-ftc.yaml +++ b/config/sample/extra/pod-ftc.yaml @@ -31,7 +31,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller statusCollection: fields: - metadata.creationTimestamp diff --git a/config/sample/host/01-ftc.yaml b/config/sample/host/01-ftc.yaml index 5c6c05db..ee160329 100644 --- a/config/sample/host/01-ftc.yaml +++ b/config/sample/host/01-ftc.yaml @@ -51,7 +51,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller pathDefinition: labelSelector: spec.selector replicasSpec: spec.replicas @@ -264,7 +263,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller statusCollection: enabled: true fields: @@ -288,7 +286,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller statusCollection: enabled: true fields: @@ -309,7 +306,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller statusCollection: enabled: true fields: @@ -330,7 +326,6 @@ spec: controllers: - - kubeadmiral.io/global-scheduler - - kubeadmiral.io/overridepolicy-controller - - - kubeadmiral.io/follower-controller statusCollection: enabled: true fields: diff --git a/pkg/client/generic/genericclient.go b/pkg/client/generic/genericclient.go index f22ba80f..e372f51c 100644 --- a/pkg/client/generic/genericclient.go +++ b/pkg/client/generic/genericclient.go @@ -23,21 +23,15 @@ package generic import ( "context" "fmt" - "strings" appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kubewharf/kubeadmiral/pkg/client/generic/scheme" - "github.com/kubewharf/kubeadmiral/pkg/controllers/common" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/history" ) type Client interface { @@ -48,7 +42,6 @@ type Client interface { List(ctx context.Context, obj client.ObjectList, namespace string) error UpdateStatus(ctx context.Context, obj client.Object) error Patch(ctx context.Context, obj client.Object, patch client.Patch) error - Rollback(ctx context.Context, obj client.Object, toRevision int64) error DeleteHistory(ctx context.Context, obj client.Object) error ListWithOptions(ctx context.Context, obj client.ObjectList, opts ...client.ListOption) error @@ -121,91 +114,6 @@ func (c *genericClient) Patch(ctx context.Context, obj client.Object, patch clie return c.client.Patch(ctx, obj, patch) } -// Rollback rollbacks federated Object such as FederatedDeployment -func (c *genericClient) Rollback(ctx context.Context, obj client.Object, toRevision int64) error { - if toRevision < 0 { - return fmt.Errorf("unable to find specified revision %v in history", toRevision) - } - if toRevision == 0 { - // try to get last revision from annotations, fallback to list all revisions on error - if err := c.rollbackToLastRevision(ctx, obj); err == nil { - return nil - } - } - - history, err := c.controlledHistory(ctx, obj) - if err != nil { - return fmt.Errorf("failed to list history: %s", err) - } - if toRevision == 0 && len(history) <= 1 { - return fmt.Errorf("no last revision to roll back to") - } - - toHistory := findHistory(toRevision, history) - if toHistory == nil { - return fmt.Errorf("unable to find specified revision %v in history", toHistory) - } - - // Restore revision - if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, toHistory.Data.Raw)); err != nil { - return fmt.Errorf("failed restoring revision %d: %v", toRevision, err) - } - return nil -} - -func (c *genericClient) rollbackToLastRevision(ctx context.Context, obj client.Object) error { - accessor, err := meta.Accessor(obj) - if err != nil { - return err - } - lastRevisionNameWithHash := accessor.GetAnnotations()[common.LastRevisionAnnotation] - if len(lastRevisionNameWithHash) == 0 { - return fmt.Errorf("annotation: %s not found", common.LastRevisionAnnotation) - } - - lastRevisionName, err := c.checkLastRevisionNameWithHash(lastRevisionNameWithHash, obj) - if err != nil { - return fmt.Errorf("failed to check last revision name, err: %v", err) - } - - latestRevision := &appsv1.ControllerRevision{} - if err := c.Get(ctx, latestRevision, accessor.GetNamespace(), lastRevisionName); err != nil { - return err - } - - // restore latest revision - if err := c.Patch(ctx, obj, client.RawPatch(types.JSONPatchType, latestRevision.Data.Raw)); err != nil { - return fmt.Errorf("failed restoring latest revision: %v", err) - } - return nil -} - -func (c *genericClient) checkLastRevisionNameWithHash(lastRevisionNameWithHash string, obj client.Object) (string, error) { - parts := strings.Split(lastRevisionNameWithHash, "|") - if len(parts) != 2 { - return "", fmt.Errorf("invalid lastRevisionNameWithHash: %s", lastRevisionNameWithHash) - } - lastRevisionName, hash := parts[0], parts[1] - - utdObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) - if err != nil { - return "", err - } - - template, ok, err := unstructured.NestedMap(utdObj, "spec", "template", "spec", "template") - if err != nil { - return "", err - } - if !ok { - return "", fmt.Errorf("spec.template.spec.template is not found, fedResource: %+v", obj) - } - - if templateHash := history.HashObject(template); templateHash != hash { - return "", fmt.Errorf("pod template hash: %s, last revision name suffix: %s, they should be equal", templateHash, hash) - } - return lastRevisionName, nil -} - // controlledHistories returns all ControllerRevisions in namespace that selected by selector and owned by accessor func (c *genericClient) controlledHistory(ctx context.Context, obj client.Object) ([]*appsv1.ControllerRevision, error) { accessor, err := meta.Accessor(obj) @@ -246,28 +154,3 @@ func (c *genericClient) DeleteHistory(ctx context.Context, obj client.Object) er } return nil } - -// findHistory returns a controllerrevision of a specific revision from the given controllerrevisions. -// It returns nil if no such controllerrevision exists. -// If toRevision is 0, the last previously used history is returned. -func findHistory(toRevision int64, allHistory []*appsv1.ControllerRevision) *appsv1.ControllerRevision { - if toRevision == 0 && len(allHistory) <= 1 { - return nil - } - - // Find the history to rollback to - var toHistory *appsv1.ControllerRevision - if toRevision == 0 { - // If toRevision == 0, find the latest revision (2nd max) - history.SortControllerRevisions(allHistory) - toHistory = allHistory[len(allHistory)-2] - } else { - for _, h := range allHistory { - if h.Revision == toRevision { - // If toRevision != 0, find the history with matching revision - return h - } - } - } - return toHistory -} diff --git a/pkg/controllers/follower/bidirectional_cache.go b/pkg/controllers/follower/bidirectional_cache.go index 1ea4af86..8935eb5d 100644 --- a/pkg/controllers/follower/bidirectional_cache.go +++ b/pkg/controllers/follower/bidirectional_cache.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -40,13 +39,13 @@ type bidirectionalCache[V1, V2 comparable] struct { func (c *bidirectionalCache[V1, V2]) lookup(key V1) sets.Set[V2] { c.RLock() defer c.RUnlock() - return c.cache[key] + return c.cache[key].Clone() } func (c *bidirectionalCache[V1, V2]) reverseLookup(key V2) sets.Set[V1] { c.RLock() defer c.RUnlock() - return c.reverseCache[key] + return c.reverseCache[key].Clone() } func (c *bidirectionalCache[V1, V2]) update(key V1, newValues sets.Set[V2]) { diff --git a/pkg/controllers/follower/controller.go b/pkg/controllers/follower/controller.go index 009a4c5c..cc79a486 100644 --- a/pkg/controllers/follower/controller.go +++ b/pkg/controllers/follower/controller.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -20,6 +19,7 @@ package follower import ( "context" "fmt" + "sync" "time" appsv1 "k8s.io/api/apps/v1" @@ -29,29 +29,26 @@ import ( "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - fedtypesv1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/types/v1alpha1" fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned" + fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/eventsink" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/pendingcontrollers" - schemautil "github.com/kubewharf/kubeadmiral/pkg/controllers/util/schema" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" "github.com/kubewharf/kubeadmiral/pkg/stats" + "github.com/kubewharf/kubeadmiral/pkg/util/eventhandlers" + "github.com/kubewharf/kubeadmiral/pkg/util/eventsink" + "github.com/kubewharf/kubeadmiral/pkg/util/fedobjectadapters" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" + "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/naming" + "github.com/kubewharf/kubeadmiral/pkg/util/worker" ) const ( @@ -92,40 +89,22 @@ var ( // TODO: support handles-object annotations in this controller? // TODO: support parsing followers introduced by overrides -// Handles for a leader or follower type. -type typeHandles struct { - // +federatedKind - name string - typeConfig *fedcorev1a1.FederatedTypeConfig - sourceGK schema.GroupKind - federatedGK schema.GroupKind - informer informers.GenericInformer - client dynamic.NamespaceableResourceInterface - worker worker.ReconcileWorker -} - type Controller struct { - name string + gkToFTCLock sync.RWMutex + gkToFTCName map[schema.GroupKind]string + + cacheObservedFromLeaders *bidirectionalCache[fedcorev1a1.LeaderReference, FollowerReference] + cacheObservedFromFollowers *bidirectionalCache[FollowerReference, fedcorev1a1.LeaderReference] + worker worker.ReconcileWorker[objectGroupKindKey] + informerManager informermanager.InformerManager + fedObjectInformer fedcorev1a1informers.FederatedObjectInformer + clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer + + fedClient fedclient.Interface eventRecorder record.EventRecorder metrics stats.Metrics logger klog.Logger - - // The following maps are written during initialization and only read afterward, - // therefore no locks are required. - - // map from source GroupKind to federated GroupKind - sourceToFederatedGKMap map[schema.GroupKind]schema.GroupKind - // map from leader federated GroupKind to typeHandle - leaderTypeHandles map[schema.GroupKind]*typeHandles - // map from follower federated GroupKind to typeHandle - followerTypeHandles map[schema.GroupKind]*typeHandles - - cacheObservedFromLeaders *bidirectionalCache[fedtypesv1a1.LeaderReference, FollowerReference] - cacheObservedFromFollowers *bidirectionalCache[FollowerReference, fedtypesv1a1.LeaderReference] - - kubeClient kubernetes.Interface - fedClient fedclient.Interface } func (c *Controller) IsControllerReady() bool { @@ -134,121 +113,118 @@ func (c *Controller) IsControllerReady() bool { func NewFollowerController( kubeClient kubernetes.Interface, - dynamicClient dynamic.Interface, fedClient fedclient.Interface, - informerFactory dynamicinformer.DynamicSharedInformerFactory, + informerManager informermanager.InformerManager, + fedObjectInformer fedcorev1a1informers.FederatedObjectInformer, + clusterFedObjectInformer fedcorev1a1informers.ClusterFederatedObjectInformer, metrics stats.Metrics, + logger klog.Logger, workerCount int, ) (*Controller, error) { c := &Controller{ - name: ControllerName, + gkToFTCName: make(map[schema.GroupKind]string), + informerManager: informerManager, + fedObjectInformer: fedObjectInformer, + clusterFedObjectInformer: clusterFedObjectInformer, + cacheObservedFromLeaders: newBidirectionalCache[fedcorev1a1.LeaderReference, FollowerReference](), + cacheObservedFromFollowers: newBidirectionalCache[FollowerReference, fedcorev1a1.LeaderReference](), + fedClient: fedClient, eventRecorder: eventsink.NewDefederatingRecorderMux(kubeClient, ControllerName, 4), metrics: metrics, - logger: klog.LoggerWithValues(klog.Background(), "controller", ControllerName), - sourceToFederatedGKMap: make(map[schema.GroupKind]schema.GroupKind), - leaderTypeHandles: make(map[schema.GroupKind]*typeHandles), - followerTypeHandles: make(map[schema.GroupKind]*typeHandles), - cacheObservedFromLeaders: newBidirectionalCache[fedtypesv1a1.LeaderReference, FollowerReference](), - cacheObservedFromFollowers: newBidirectionalCache[FollowerReference, fedtypesv1a1.LeaderReference](), - kubeClient: kubeClient, - fedClient: fedClient, + logger: logger.WithValues("controller", ControllerName), } - getHandles := func( - ftc *fedcorev1a1.FederatedTypeConfig, - handleNamePrefix string, - reconcile func(*typeHandles, common.QualifiedName) worker.Result, - ) *typeHandles { - targetType := ftc.GetTargetType() - federatedType := ftc.GetFederatedType() - federatedGVR := schemautil.APIResourceToGVR(&federatedType) - - handles := &typeHandles{ - name: handleNamePrefix + "-" + federatedType.Kind, - typeConfig: ftc, - sourceGK: schemautil.APIResourceToGVK(&targetType).GroupKind(), - federatedGK: schemautil.APIResourceToGVK(&federatedType).GroupKind(), - informer: informerFactory.ForResource(federatedGVR), - client: dynamicClient.Resource(federatedGVR), - } - handles.worker = worker.NewReconcileWorker( - func(qualifiedName common.QualifiedName) worker.Result { - return reconcile(handles, qualifiedName) - }, - worker.RateLimiterOptions{}, - workerCount, - c.metrics, - delayingdeliver.NewMetricTags("follower-controller-worker", handles.name), - ) - handles.informer.Informer().AddEventHandlerWithResyncPeriod( - util.NewTriggerOnAllChanges(handles.worker.EnqueueObject), - util.NoResyncPeriod, - ) - - return handles + if _, err := c.fedObjectInformer.Informer().AddEventHandlerWithResyncPeriod( + eventhandlers.NewTriggerOnAllChanges(c.enqueueSupportedType), + util.NoResyncPeriod, + ); err != nil { + return nil, err } - ftcs, err := c.fedClient.CoreV1alpha1(). - FederatedTypeConfigs(). - List(context.TODO(), metav1.ListOptions{}) - if err != nil { + if _, err := c.clusterFedObjectInformer.Informer().AddEventHandlerWithResyncPeriod( + eventhandlers.NewTriggerOnAllChanges(c.enqueueSupportedType), + util.NoResyncPeriod, + ); err != nil { return nil, err } - // Find the supported leader and follower types and create their handles - for i := range ftcs.Items { - ftc := &ftcs.Items[i] - targetType := ftc.Spec.TargetType - federatedType := ftc.Spec.FederatedType + c.worker = worker.NewReconcileWorker[objectGroupKindKey]( + "follower-controller-worker", + c.reconcile, + worker.RateLimiterOptions{}, + workerCount, + c.metrics, + ) + + if err := c.informerManager.AddFTCUpdateHandler(func(_, latest *fedcorev1a1.FederatedTypeConfig) { + targetType := latest.Spec.SourceType targetGK := schema.GroupKind{Group: targetType.Group, Kind: targetType.Kind} - federatedGK := schema.GroupKind{Group: federatedType.Group, Kind: federatedType.Kind} - - if _, exists := leaderPodTemplatePaths[targetGK]; exists { - handles := getHandles(ftc, "leader", c.reconcileLeader) - c.sourceToFederatedGKMap[targetGK] = federatedGK - c.leaderTypeHandles[federatedGK] = handles - c.logger.V(2).Info(fmt.Sprintf("Found supported leader FederatedTypeConfig %s", ftc.Name)) - } else if supportedFollowerTypes.Has(targetGK) { - handles := getHandles(ftc, "follower", c.reconcileFollower) - c.sourceToFederatedGKMap[targetGK] = federatedGK - c.followerTypeHandles[federatedGK] = handles - c.logger.V(2).Info(fmt.Sprintf("Found supported follower FederatedTypeConfig %s", ftc.Name)) - } + + c.gkToFTCLock.Lock() + defer c.gkToFTCLock.Unlock() + + c.gkToFTCName[targetGK] = latest.Name + }); err != nil { + return nil, err } return c, nil } -func (c *Controller) Run(stopChan <-chan struct{}) { - c.logger.Info("Starting controller") - defer c.logger.Info("Stopping controller") +func (c *Controller) Run(ctx context.Context) { + ctx, logger := logging.InjectLogger(ctx, c.logger) + + logger.Info("Starting controller") + defer logger.Info("Stopping controller") - if !cache.WaitForNamedCacheSync(c.name, stopChan, c.HasSynced) { + if !cache.WaitForNamedCacheSync(ControllerName, ctx.Done(), c.HasSynced) { + logger.Error(nil, "Timed out waiting for caches to sync") return } + logger.Info("Caches are synced") + c.worker.Run(ctx) + <-ctx.Done() +} + +func (c *Controller) HasSynced() bool { + return c.fedObjectInformer.Informer().HasSynced() && + c.clusterFedObjectInformer.Informer().HasSynced() +} - for _, handle := range c.leaderTypeHandles { - handle.worker.Run(stopChan) +func (c *Controller) enqueueSupportedType(object interface{}) { + fedObject, ok := object.(fedcorev1a1.GenericFederatedObject) + if !ok { + return } - for _, handle := range c.followerTypeHandles { - handle.worker.Run(stopChan) + + template, err := fedObject.GetSpec().GetTemplateAsUnstructured() + if err != nil { + return } - <-stopChan + templateGK := template.GroupVersionKind().GroupKind() + _, isLeader := leaderPodTemplatePaths[templateGK] + isFollower := supportedFollowerTypes.Has(templateGK) + if isLeader || isFollower { + c.worker.Enqueue(objectGroupKindKey{ + sourceGK: templateGK, + namespace: fedObject.GetNamespace(), + fedName: fedObject.GetName(), + sourceName: template.GetName(), + }) + } } -func (c *Controller) HasSynced() bool { - for _, handle := range c.leaderTypeHandles { - if !handle.informer.Informer().HasSynced() { - return false - } +func (c *Controller) reconcile(ctx context.Context, key objectGroupKindKey) (status worker.Result) { + if _, exists := leaderPodTemplatePaths[key.sourceGK]; exists { + return c.reconcileLeader(ctx, key) } - for _, handle := range c.followerTypeHandles { - if !handle.informer.Informer().HasSynced() { - return false - } + + if _, exists := supportedFollowerTypes[key.sourceGK]; exists { + return c.reconcileFollower(ctx, key) } - return true + + return worker.StatusAllOK } /* @@ -256,47 +232,42 @@ Reconciles the leader to make sure its desired followers (derivable from the lea and its stale followers (derivable from cache) no longer reference it. */ func (c *Controller) reconcileLeader( - handles *typeHandles, - qualifiedName common.QualifiedName, + ctx context.Context, + key objectGroupKindKey, ) (status worker.Result) { - c.metrics.Rate(fmt.Sprintf("follower-controller-%s.throughput", handles.name), 1) - key := qualifiedName.String() - logger := c.logger.WithValues("origin", "reconcileLeader", "type", handles.name, "key", key) + c.metrics.Rate(fmt.Sprintf("follower-controller-%s.throughput", key.sourceGK.String()), 1) + ctx, keyedLogger := logging.InjectLoggerValues(ctx, "origin", "reconcileLeader", "type", key.sourceGK.String(), "key", key.ObjectSourceKey()) startTime := time.Now() - logger.V(3).Info("Starting reconcileLeader") + keyedLogger.V(3).Info("Starting reconcileLeader") defer func() { - c.metrics.Duration(fmt.Sprintf("follower-controller-%s.latency", handles.name), startTime) - logger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcileLeader") + c.metrics.Duration(fmt.Sprintf("follower-controller-%s.latency", key.sourceGK.String()), startTime) + keyedLogger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcileLeader") }() - leader := fedtypesv1a1.LeaderReference{ - Group: handles.federatedGK.Group, - Kind: handles.federatedGK.Kind, - Namespace: qualifiedName.Namespace, - Name: qualifiedName.Name, + leader := fedcorev1a1.LeaderReference{ + Group: key.sourceGK.Group, + Kind: key.sourceGK.Kind, + Namespace: key.namespace, + Name: key.sourceName, } - fedObj, err := getObjectFromStore(handles.informer.Informer().GetStore(), key) - if err != nil { - logger.Error(err, "Failed to retrieve object from store") + fedObj, err := fedobjectadapters.GetFromLister(c.fedObjectInformer.Lister(), c.clusterFedObjectInformer.Lister(), key.namespace, key.fedName) + if err != nil && !apierrors.IsNotFound(err) { + keyedLogger.Error(err, "Failed to get leader object from store") return worker.StatusError } + if apierrors.IsNotFound(err) { + fedObj = nil + } + var desiredFollowers sets.Set[FollowerReference] if fedObj != nil { - // We only need to check for dependencies if the object exists - if ok, err := pendingcontrollers.ControllerDependenciesFulfilled(fedObj, PrefixedControllerName); err != nil { - logger.Error(err, "Failed to check controller dependencies") - return worker.StatusError - } else if !ok { - return worker.StatusAllOK - } - // Only leaders that have not been deleted should have followers - desiredFollowers, err = c.inferFollowers(handles, fedObj) + desiredFollowers, err = c.inferFollowers(key.sourceGK, fedObj) if err != nil { - logger.Error(err, "Failed to infer followers") + keyedLogger.Error(err, "Failed to infer followers") if fedObj != nil { c.eventRecorder.Eventf( fedObj, @@ -309,67 +280,55 @@ func (c *Controller) reconcileLeader( return worker.StatusError } } + c.cacheObservedFromLeaders.update(leader, desiredFollowers) currentFollowers := c.cacheObservedFromFollowers.reverseLookup(leader) // enqueue all followers whose desired state may have changed - for follower := range desiredFollowers.Union(currentFollowers) { - handles, exists := c.followerTypeHandles[follower.GroupKind] + c.enqueueFollowers(desiredFollowers.Union(currentFollowers)) + + return worker.StatusAllOK +} + +func (c *Controller) enqueueFollowers(followers sets.Set[FollowerReference]) { + c.gkToFTCLock.RLock() + defer c.gkToFTCLock.RUnlock() + + // enqueue all followers whose desired state may have changed + for follower := range followers { + ftcName, exists := c.gkToFTCName[follower.GroupKind] if !exists { - logger.WithValues("follower", follower).Error(nil, "Unsupported follower type") - return worker.StatusError + continue } - handles.worker.Enqueue( - common.QualifiedName{Namespace: follower.Namespace, Name: follower.Name}, - ) - } - if fedObj != nil { - updated, err := pendingcontrollers.UpdatePendingControllers( - fedObj, - PrefixedControllerName, - false, - handles.typeConfig.GetControllers(), + c.worker.Enqueue( + objectGroupKindKey{ + sourceGK: follower.GroupKind, + namespace: follower.Namespace, + sourceName: follower.Name, + fedName: naming.GenerateFederatedObjectName(follower.Name, ftcName), + }, ) - if err != nil { - logger.Error(err, "Failed to set pending controllers") - return worker.StatusError - } - if updated { - logger.V(1).Info("Updating leader to sync with pending controllers") - _, err = handles.client.Namespace(fedObj.GetNamespace()). - Update(context.Background(), fedObj, metav1.UpdateOptions{}) - if err != nil { - if apierrors.IsConflict(err) { - return worker.StatusConflict - } - logger.Error(err, "Failed to update after modifying pending controllers") - return worker.StatusError - } - } } - - return worker.StatusAllOK } func (c *Controller) inferFollowers( - handles *typeHandles, - fedObj *unstructured.Unstructured, + sourceGK schema.GroupKind, + fedObj fedcorev1a1.GenericFederatedObject, ) (sets.Set[FollowerReference], error) { if fedObj.GetAnnotations()[common.EnableFollowerSchedulingAnnotation] != common.AnnotationValueTrue { // follower scheduling is not enabled return nil, nil } - followersFromAnnotation, err := getFollowersFromAnnotation(fedObj, c.sourceToFederatedGKMap) + followersFromAnnotation, err := getFollowersFromAnnotation(fedObj) if err != nil { return nil, err } followersFromPodTemplate, err := getFollowersFromPodTemplate( fedObj, - leaderPodTemplatePaths[handles.sourceGK], - c.sourceToFederatedGKMap, + leaderPodTemplatePaths[sourceGK], ) if err != nil { return nil, err @@ -380,18 +339,14 @@ func (c *Controller) inferFollowers( func (c *Controller) updateFollower( ctx context.Context, - handles *typeHandles, - followerUns *unstructured.Unstructured, - followerObj *fedtypesv1a1.GenericFederatedFollower, + followerObj fedcorev1a1.GenericFederatedObject, leadersChanged bool, - leaders []fedtypesv1a1.LeaderReference, + leaders []fedcorev1a1.LeaderReference, ) (updated bool, err error) { logger := klog.FromContext(ctx) if leadersChanged { - if err := fedtypesv1a1.SetFollows(followerUns, leaders); err != nil { - return false, fmt.Errorf("set leaders on follower: %w", err) - } + followerObj.GetSpec().Follows = leaders } clusters, err := c.leaderPlacementUnion(leaders) @@ -399,19 +354,11 @@ func (c *Controller) updateFollower( return false, fmt.Errorf("get leader placement union: %w", err) } - placementsChanged := followerObj.Spec.SetPlacementNames(PrefixedControllerName, clusters) - if placementsChanged { - err = util.SetGenericPlacements(followerUns, followerObj.Spec.Placements) - if err != nil { - return false, fmt.Errorf("set placements: %w", err) - } - } - + placementsChanged := followerObj.GetSpec().SetControllerPlacement(PrefixedControllerName, clusters.UnsortedList()) needsUpdate := leadersChanged || placementsChanged if needsUpdate { logger.V(1).Info("Updating follower to sync with leaders") - _, err = handles.client.Namespace(followerUns.GetNamespace()). - Update(context.TODO(), followerUns, metav1.UpdateOptions{}) + _, err = fedobjectadapters.Update(ctx, c.fedClient.CoreV1alpha1(), followerObj, metav1.UpdateOptions{}) if err != nil { return false, fmt.Errorf("update follower: %w", err) } @@ -425,50 +372,44 @@ func (c *Controller) updateFollower( Reconciles the follower so it references the desired leaders and has the correct placements. */ func (c *Controller) reconcileFollower( - handles *typeHandles, - qualifiedName common.QualifiedName, + ctx context.Context, + key objectGroupKindKey, ) (status worker.Result) { - c.metrics.Rate(fmt.Sprintf("follower-controller-%s.throughput", handles.name), 1) - key := qualifiedName.String() - logger := c.logger.WithValues("origin", "reconcileFollower", "type", handles.name, "key", key) - ctx := klog.NewContext(context.TODO(), logger) + c.metrics.Rate(fmt.Sprintf("follower-controller-%s.throughput", key.sourceGK.String()), 1) + ctx, keyedLogger := logging.InjectLoggerValues(ctx, "origin", "reconcileFollower", "type", key.sourceGK.String(), "key", key.ObjectSourceKey()) + startTime := time.Now() - logger.V(3).Info("Starting reconcileFollower") + keyedLogger.V(3).Info("Starting reconcileFollower") defer func() { - c.metrics.Duration(fmt.Sprintf("follower-controller-%s.latency", handles.name), startTime) - logger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcileFollower") + c.metrics.Duration(fmt.Sprintf("follower-controller-%s.latency", key.sourceGK.String()), startTime) + keyedLogger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcileFollower") }() follower := FollowerReference{ - GroupKind: handles.federatedGK, - Namespace: qualifiedName.Namespace, - Name: qualifiedName.Name, + GroupKind: key.sourceGK, + Namespace: key.namespace, + Name: key.sourceName, } - followerUns, err := getObjectFromStore( - handles.informer.Informer().GetStore(), - qualifiedName.String(), + followerObject, err := fedobjectadapters.GetFromLister( + c.fedObjectInformer.Lister(), + c.clusterFedObjectInformer.Lister(), + key.namespace, + key.fedName, ) - if err != nil { - logger.Error(err, "Failed to get follower object from store") + if err != nil && !apierrors.IsNotFound(err) { + keyedLogger.Error(err, "Failed to get follower object from store") return worker.StatusError } - if followerUns == nil { + if apierrors.IsNotFound(err) { // The deleted follower no longer references any leaders c.cacheObservedFromFollowers.update(follower, nil) return worker.StatusAllOK } - followerObj := &fedtypesv1a1.GenericFederatedFollower{} - err = runtime.DefaultUnstructuredConverter.FromUnstructured(followerUns.Object, followerObj) - if err != nil { - logger.Error(err, "Failed to unmarshall follower object from unstructured") - return worker.StatusAllOK // retrying won't help - } - - currentLeaders := sets.New(followerObj.Spec.Follows...) + currentLeaders := sets.New(followerObject.GetSpec().Follows...) c.cacheObservedFromFollowers.update(follower, currentLeaders) desiredLeaders := c.cacheObservedFromLeaders.reverseLookup(follower) @@ -476,9 +417,7 @@ func (c *Controller) reconcileFollower( leadersChanged := !equality.Semantic.DeepEqual(desiredLeaders, currentLeaders) updated, err := c.updateFollower( ctx, - handles, - followerUns, - followerObj, + followerObject, leadersChanged, desiredLeaders.UnsortedList(), ) @@ -486,9 +425,9 @@ func (c *Controller) reconcileFollower( if apierrors.IsConflict(err) { return worker.StatusConflict } - logger.Error(err, "Failed to update follower") + keyedLogger.Error(err, "Failed to update follower") c.eventRecorder.Eventf( - followerUns, + followerObject, corev1.EventTypeWarning, EventReasonFailedUpdateFollower, "Failed to update follower to sync with leader placements: %v", @@ -496,67 +435,59 @@ func (c *Controller) reconcileFollower( ) return worker.StatusError } else if updated { - logger.V(1).Info("Updated follower to sync with leaders") + keyedLogger.V(1).Info("Updated follower to sync with leaders") } return worker.StatusAllOK } func (c *Controller) getLeaderObj( - leader fedtypesv1a1.LeaderReference, -) (*typeHandles, *fedtypesv1a1.GenericObjectWithPlacements, error) { + leader fedcorev1a1.LeaderReference, +) (fedcorev1a1.GenericFederatedObject, error) { leaderGK := leader.GroupKind() - handles, exists := c.leaderTypeHandles[leaderGK] + _, exists := leaderPodTemplatePaths[leaderGK] if !exists { - return nil, nil, fmt.Errorf("unsupported leader type %v", leaderGK) - } - leaderQualifiedName := common.QualifiedName{Namespace: leader.Namespace, Name: leader.Name} - leaderUns, err := getObjectFromStore( - handles.informer.Informer().GetStore(), - leaderQualifiedName.String(), - ) - if err != nil { - return nil, nil, fmt.Errorf("get from store: %w", err) + return nil, fmt.Errorf("unsupported leader type %v", leaderGK) } - if leaderUns == nil { - return nil, nil, nil + + c.gkToFTCLock.RLock() + ftcName, exists := c.gkToFTCName[leaderGK] + if !exists { + return nil, fmt.Errorf("unknown leader gk %v", leaderGK) } - leaderObj, err := util.UnmarshalGenericPlacements(leaderUns) - if err != nil { - return nil, nil, fmt.Errorf("unmarshal to generic object with placements: %w", err) + leaderName := naming.GenerateFederatedObjectName(leader.Name, ftcName) + leaderObj, err := fedobjectadapters.GetFromLister( + c.fedObjectInformer.Lister(), + c.clusterFedObjectInformer.Lister(), + leader.Namespace, + leaderName, + ) + if err != nil && !apierrors.IsNotFound(err) { + return nil, fmt.Errorf("get from store: %w", err) + } + if apierrors.IsNotFound(err) { + return nil, nil } - return handles, leaderObj, nil + return leaderObj, nil } func (c *Controller) leaderPlacementUnion( - leaders []fedtypesv1a1.LeaderReference, -) (map[string]struct{}, error) { - clusters := map[string]struct{}{} + leaders []fedcorev1a1.LeaderReference, +) (sets.Set[string], error) { + clusters := sets.New[string]() for _, leader := range leaders { - _, leaderObjWithPlacement, err := c.getLeaderObj(leader) + leaderObjWithPlacement, err := c.getLeaderObj(leader) if err != nil { return nil, fmt.Errorf("get leader object %v: %w", leader, err) } if leaderObjWithPlacement == nil { continue } - for cluster := range leaderObjWithPlacement.ClusterNameUnion() { - clusters[cluster] = struct{}{} - } + + clusters = leaderObjWithPlacement.GetSpec().GetPlacementUnion().Union(clusters) } return clusters, nil } - -func getObjectFromStore(store cache.Store, key string) (*unstructured.Unstructured, error) { - obj, exists, err := store.GetByKey(key) - if err != nil { - return nil, err - } - if !exists { - return nil, nil - } - return obj.(*unstructured.Unstructured).DeepCopy(), nil -} diff --git a/pkg/controllers/follower/util.go b/pkg/controllers/follower/util.go index 99b32364..c5977419 100644 --- a/pkg/controllers/follower/util.go +++ b/pkg/controllers/follower/util.go @@ -1,4 +1,3 @@ -//go:build exclude /* Copyright 2023 The KubeAdmiral Authors. @@ -28,10 +27,22 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllers/common" podutil "github.com/kubewharf/kubeadmiral/pkg/lifted/kubernetes/pkg/api/v1/pod" ) +type objectGroupKindKey struct { + namespace string + fedName string + sourceName string + sourceGK schema.GroupKind +} + +func (k objectGroupKindKey) ObjectSourceKey() string { + return fmt.Sprintf("%s/%s", k.namespace, k.sourceName) +} + type FollowerReference struct { GroupKind schema.GroupKind Namespace string @@ -45,8 +56,7 @@ type followerAnnotationElement struct { } func getFollowersFromAnnotation( - fedObject *unstructured.Unstructured, - sourceToFederatedGKMap map[schema.GroupKind]schema.GroupKind, + fedObject fedcorev1a1.GenericFederatedObject, ) (sets.Set[FollowerReference], error) { annotation := fedObject.GetAnnotations()[common.FollowersAnnotation] if len(annotation) == 0 { @@ -64,12 +74,9 @@ func getFollowersFromAnnotation( Group: followerFromAnnotation.Group, Kind: followerFromAnnotation.Kind, } - federatedGK, exists := sourceToFederatedGKMap[sourceGK] - if !exists { - return nil, fmt.Errorf("no federated type config found for source type %v", sourceGK) - } + followers.Insert(FollowerReference{ - GroupKind: federatedGK, + GroupKind: sourceGK, // Only allow followers from the same namespace Namespace: fedObject.GetNamespace(), Name: followerFromAnnotation.Name, @@ -79,9 +86,8 @@ func getFollowersFromAnnotation( } func getFollowersFromPodTemplate( - fedObject *unstructured.Unstructured, + fedObject fedcorev1a1.GenericFederatedObject, podTemplatePath string, - sourceToFederatedGKMap map[schema.GroupKind]schema.GroupKind, ) (sets.Set[FollowerReference], error) { podSpec, err := getPodSpec(fedObject, podTemplatePath) if err != nil { @@ -91,72 +97,67 @@ func getFollowersFromPodTemplate( pod := &corev1.Pod{ Spec: *podSpec, } - return getFollowersFromPod(fedObject.GetNamespace(), pod, sourceToFederatedGKMap), nil + return getFollowersFromPod(fedObject.GetNamespace(), pod), nil } func getFollowersFromPod( namespace string, pod *corev1.Pod, - sourceToFederatedGKMap map[schema.GroupKind]schema.GroupKind, ) sets.Set[FollowerReference] { followers := sets.New[FollowerReference]() - if federatedSecretGK, exists := sourceToFederatedGKMap[schema.GroupKind{Kind: "Secret"}]; exists { - podutil.VisitPodSecretNames(pod, func(name string) bool { - followers.Insert(FollowerReference{ - GroupKind: federatedSecretGK, - Namespace: namespace, - Name: name, - }) - return true + podutil.VisitPodSecretNames(pod, func(name string) bool { + followers.Insert(FollowerReference{ + GroupKind: schema.GroupKind{Kind: "Secret"}, + Namespace: namespace, + Name: name, }) - } + return true + }) - if federatedConfigMapGK, exists := sourceToFederatedGKMap[schema.GroupKind{Kind: "ConfigMap"}]; exists { - podutil.VisitPodConfigmapNames(pod, func(name string) bool { - followers.Insert(FollowerReference{ - GroupKind: federatedConfigMapGK, - Namespace: namespace, - Name: name, - }) - return true + podutil.VisitPodConfigmapNames(pod, func(name string) bool { + followers.Insert(FollowerReference{ + GroupKind: schema.GroupKind{Kind: "ConfigMap"}, + Namespace: namespace, + Name: name, }) - } + return true + }) - if federatedPVCGK, exists := sourceToFederatedGKMap[schema.GroupKind{Kind: "PersistentVolumeClaim"}]; exists { - for _, vol := range pod.Spec.Volumes { - // TODO: do we need to support PVCs created from ephemeral volumes? - if vol.PersistentVolumeClaim != nil { - followers.Insert(FollowerReference{ - GroupKind: federatedPVCGK, - Namespace: namespace, - Name: vol.PersistentVolumeClaim.ClaimName, - }) - } - } - } - - if federatedSAGK, exists := sourceToFederatedGKMap[schema.GroupKind{Kind: "ServiceAccount"}]; exists { - if saName := pod.Spec.ServiceAccountName; saName != "" { + for _, vol := range pod.Spec.Volumes { + // TODO: do we need to support PVCs created from ephemeral volumes? + if vol.PersistentVolumeClaim != nil { followers.Insert(FollowerReference{ - GroupKind: federatedSAGK, + GroupKind: schema.GroupKind{Kind: "PersistentVolumeClaim"}, Namespace: namespace, - Name: saName, + Name: vol.PersistentVolumeClaim.ClaimName, }) } } + if saName := pod.Spec.ServiceAccountName; saName != "" { + followers.Insert(FollowerReference{ + GroupKind: schema.GroupKind{Kind: "ServiceAccount"}, + Namespace: namespace, + Name: saName, + }) + } + return followers } -func getPodSpec(fedObject *unstructured.Unstructured, podTemplatePath string) (*corev1.PodSpec, error) { +func getPodSpec(fedObject fedcorev1a1.GenericFederatedObject, podTemplatePath string) (*corev1.PodSpec, error) { if fedObject == nil { return nil, fmt.Errorf("fedObject is nil") } - fedObjectPodTemplatePath := append( - []string{common.SpecField, common.TemplateField}, - strings.Split(podTemplatePath, ".")...) - podTemplateMap, found, err := unstructured.NestedMap(fedObject.Object, fedObjectPodTemplatePath...) + fedObjectPodTemplatePath := strings.Split(podTemplatePath, ".") + templateObj := &unstructured.Unstructured{} + err := templateObj.UnmarshalJSON(fedObject.GetSpec().Template.Raw) + if err != nil { + return nil, err + } + + podTemplateMap, found, err := unstructured.NestedMap(templateObj.Object, fedObjectPodTemplatePath...) if err != nil { return nil, err } diff --git a/pkg/controllers/follower/util_test.go b/pkg/controllers/follower/util_test.go index e129ed4c..df46e5cb 100644 --- a/pkg/controllers/follower/util_test.go +++ b/pkg/controllers/follower/util_test.go @@ -256,7 +256,7 @@ func TestGetFollowersFromPod(t *testing.T) { } expectedNamesByGK := map[schema.GroupKind]sets.Set[string]{ - {Group: "kubeadmiral.io", Kind: "FederatedConfigMap"}: sets.New( + {Group: "", Kind: "ConfigMap"}: sets.New( "Spec.Containers[*].EnvFrom[*].ConfigMapRef", "Spec.Containers[*].Env[*].ValueFrom.ConfigMapKeyRef", "Spec.EphemeralContainers[*].EphemeralContainerCommon.EnvFrom[*].ConfigMapRef", @@ -266,7 +266,7 @@ func TestGetFollowersFromPod(t *testing.T) { "Spec.Volumes[*].VolumeSource.Projected.Sources[*].ConfigMap", "Spec.Volumes[*].VolumeSource.ConfigMap", ), - {Group: "kubeadmiral.io", Kind: "FederatedSecret"}: sets.New( + {Group: "", Kind: "Secret"}: sets.New( "Spec.Containers[*].EnvFrom[*].SecretRef", "Spec.Containers[*].Env[*].ValueFrom.SecretKeyRef", "Spec.EphemeralContainers[*].EphemeralContainerCommon.EnvFrom[*].SecretRef", @@ -287,20 +287,14 @@ func TestGetFollowersFromPod(t *testing.T) { "Spec.Volumes[*].VolumeSource.StorageOS.SecretRef", "Spec.Volumes[*].VolumeSource.CSI.NodePublishSecretRef", ), - {Group: "kubeadmiral.io", Kind: "FederatedPersistentVolumeClaim"}: sets.New( + {Group: "", Kind: "PersistentVolumeClaim"}: sets.New( "Spec.Volumes[*].VolumeSource.PersistentVolumeClaim", ), - {Group: "kubeadmiral.io", Kind: "FederatedServiceAccount"}: sets.New( + {Group: "", Kind: "ServiceAccount"}: sets.New( "Spec.ServiceAccountName", ), } - sourceToFederatedGKMap := map[schema.GroupKind]schema.GroupKind{ - {Kind: "ConfigMap"}: {Group: "kubeadmiral.io", Kind: "FederatedConfigMap"}, - {Kind: "Secret"}: {Group: "kubeadmiral.io", Kind: "FederatedSecret"}, - {Kind: "PersistentVolumeClaim"}: {Group: "kubeadmiral.io", Kind: "FederatedPersistentVolumeClaim"}, - {Kind: "ServiceAccount"}: {Group: "kubeadmiral.io", Kind: "FederatedServiceAccount"}, - } namespace := "default" expectedFollowers := sets.New[FollowerReference]() @@ -314,7 +308,7 @@ func TestGetFollowersFromPod(t *testing.T) { } } - followers := getFollowersFromPod("default", &pod, sourceToFederatedGKMap) + followers := getFollowersFromPod("default", &pod) assert := assert.New(t) assert.Equal(expectedFollowers, followers)