diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index a01a505d..5b4c5cb1 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -28,7 +28,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/healthz" "github.com/kubewharf/kubeadmiral/cmd/controller-manager/app/options" - fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllermanager" "github.com/kubewharf/kubeadmiral/pkg/controllermanager/healthcheck" fedleaderelection "github.com/kubewharf/kubeadmiral/pkg/controllermanager/leaderelection" @@ -37,14 +36,12 @@ import ( const ( FederatedClusterControllerName = "cluster" - TypeConfigControllerName = "typeconfig" MonitorControllerName = "monitor" FollowerControllerName = "follower" ) var knownControllers = map[string]controllermanager.StartControllerFunc{ FederatedClusterControllerName: startFederatedClusterController, - TypeConfigControllerName: startTypeConfigController, MonitorControllerName: startMonitorController, FollowerControllerName: startFollowerController, } @@ -77,7 +74,7 @@ func Run(ctx context.Context, opts *options.Options) { defer klog.Infoln("Ready to stop controllers") klog.Infoln("Ready to start controllers") - err := startControllers(ctx, controllerCtx, knownControllers, knownFTCSubControllers, opts.Controllers, healthCheckHandler) + err := startControllers(ctx, controllerCtx, knownControllers, opts.Controllers, healthCheckHandler) if err != nil { klog.Fatalf("Error starting controllers %s: %v", opts.Controllers, err) } @@ -127,7 +124,6 @@ func startControllers( ctx context.Context, controllerCtx *controllercontext.Context, startControllerFuncs map[string]controllermanager.StartControllerFunc, - ftcSubControllerInitFuncs map[string]controllermanager.FTCSubControllerInitFuncs, enabledControllers []string, healthCheckHandler *healthcheck.MutableHealthCheckHandler, ) error { @@ -153,26 +149,5 @@ func startControllers( }) } - manager := NewFederatedTypeConfigManager( - controllerCtx.FedInformerFactory.Core().V1alpha1().FederatedTypeConfigs(), - controllerCtx, - healthCheckHandler, - controllerCtx.Metrics, - ) - for controllerName, initFuncs := range ftcSubControllerInitFuncs { - controllerName := controllerName - initFuncs := initFuncs - manager.RegisterSubController(controllerName, initFuncs.StartFunc, func(typeConfig *fedcorev1a1.FederatedTypeConfig) bool { - if !isControllerEnabled(controllerName, controllersDisabledByDefault, enabledControllers) { - return false - } - if initFuncs.IsEnabledFunc != nil { - return initFuncs.IsEnabledFunc(typeConfig) - } - return true - }) - } - go manager.Run(ctx) - return nil } diff --git a/cmd/controller-manager/app/core.go b/cmd/controller-manager/app/core.go index 739d5340..6157efbd 100644 --- a/cmd/controller-manager/app/core.go +++ b/cmd/controller-manager/app/core.go @@ -58,27 +58,6 @@ func startFederatedClusterController(ctx context.Context, controllerCtx *control return clusterController, nil } -func startTypeConfigController(ctx context.Context, controllerCtx *controllercontext.Context) (controllermanager.Controller, error) { - controllerConfig := controllerConfigFromControllerContext(controllerCtx) - //nolint:contextcheck - typeConfigController, err := federatedtypeconfig.NewController( - controllerConfig, - controllerCtx.KubeClientset, - controllerCtx.DynamicClientset, - controllerCtx.FedClientset, - controllerCtx.KubeInformerFactory, - controllerCtx.DynamicInformerFactory, - controllerCtx.FedInformerFactory, - ) - if err != nil { - return nil, fmt.Errorf("error creating type config controller: %w", err) - } - - go typeConfigController.Run(ctx.Done()) - - return typeConfigController, nil -} - func startMonitorController(ctx context.Context, controllerCtx *controllercontext.Context) (controllermanager.Controller, error) { controllerConfig := controllerConfigFromControllerContext(controllerCtx) //nolint:contextcheck diff --git a/cmd/controller-manager/app/ftcmanager.go b/cmd/controller-manager/app/ftcmanager.go index e30b5387..95773149 100644 --- a/cmd/controller-manager/app/ftcmanager.go +++ b/cmd/controller-manager/app/ftcmanager.go @@ -17,26 +17,7 @@ limitations under the License. package app import ( - "context" - "fmt" - "net/http" - "sync" - "time" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" "github.com/kubewharf/kubeadmiral/pkg/controllermanager" - "github.com/kubewharf/kubeadmiral/pkg/controllermanager/healthcheck" - "github.com/kubewharf/kubeadmiral/pkg/controllers/common" - controllercontext "github.com/kubewharf/kubeadmiral/pkg/controllers/context" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" - "github.com/kubewharf/kubeadmiral/pkg/stats" ) const ( @@ -59,191 +40,3 @@ var knownFTCSubControllers = map[string]controllermanager.FTCSubControllerInitFu IsEnabledFunc: isAutoMigrationControllerEnabled, }, } - -type FederatedTypeConfigManager struct { - informer fedcorev1a1informers.FederatedTypeConfigInformer - handle cache.ResourceEventHandlerRegistration - - lock sync.Mutex - registeredSubControllers map[string]controllermanager.StartFTCSubControllerFunc - isSubControllerEnabledFuncs map[string]controllermanager.IsFTCSubControllerEnabledFunc - - subControllerContexts map[string]context.Context - subControllerCancelFuncs map[string]context.CancelFunc - startedSubControllers map[string]sets.Set[string] - - healthCheckHandler *healthcheck.MutableHealthCheckHandler - worker worker.ReconcileWorker - controllerCtx *controllercontext.Context - - metrics stats.Metrics - logger klog.Logger -} - -func NewFederatedTypeConfigManager( - informer fedcorev1a1informers.FederatedTypeConfigInformer, - controllerCtx *controllercontext.Context, - healthCheckHandler *healthcheck.MutableHealthCheckHandler, - metrics stats.Metrics, -) *FederatedTypeConfigManager { - m := &FederatedTypeConfigManager{ - informer: informer, - lock: sync.Mutex{}, - registeredSubControllers: map[string]controllermanager.StartFTCSubControllerFunc{}, - isSubControllerEnabledFuncs: map[string]controllermanager.IsFTCSubControllerEnabledFunc{}, - subControllerContexts: map[string]context.Context{}, - subControllerCancelFuncs: map[string]context.CancelFunc{}, - startedSubControllers: map[string]sets.Set[string]{}, - controllerCtx: controllerCtx, - healthCheckHandler: healthCheckHandler, - metrics: metrics, - logger: klog.LoggerWithValues(klog.Background(), "controller", "federated-type-config-manager"), - } - - m.worker = worker.NewReconcileWorker( - m.reconcile, - worker.WorkerTiming{}, - 1, - metrics, - delayingdeliver.NewMetricTags("federated-type-config-manager", "FederatedTypeConfig"), - ) - - m.handle, _ = informer.Informer().AddEventHandler(util.NewTriggerOnAllChanges(m.worker.EnqueueObject)) - - return m -} - -func (m *FederatedTypeConfigManager) RegisterSubController( - name string, - startFunc controllermanager.StartFTCSubControllerFunc, - isEnabledFunc controllermanager.IsFTCSubControllerEnabledFunc, -) { - m.lock.Lock() - defer m.lock.Unlock() - m.registeredSubControllers[name] = startFunc - m.isSubControllerEnabledFuncs[name] = isEnabledFunc -} - -func (m *FederatedTypeConfigManager) Run(ctx context.Context) { - m.logger.Info("Starting FederatedTypeConfig manager") - defer m.logger.Info("Stopping FederatedTypeConfig manager") - - if !cache.WaitForNamedCacheSync("federated-type-config-manager", ctx.Done(), m.informer.Informer().HasSynced) { - return - } - - m.worker.Run(ctx.Done()) - <-ctx.Done() -} - -func (m *FederatedTypeConfigManager) reconcile(qualifiedName common.QualifiedName) (status worker.Result) { - _ = m.metrics.Rate("federated-type-config-manager.throughput", 1) - key := qualifiedName.String() - logger := m.logger.WithValues("federated-type-config", key) - startTime := time.Now() - - logger.V(3).Info("Start reconcile") - defer m.metrics.Duration("federated-type-config-manager.latency", startTime) - defer func() { - logger.WithValues("duration", time.Since(startTime), "status", status.String()).V(3).Info("Finished reconcile") - }() - - typeConfig, err := m.informer.Lister().Get(qualifiedName.Name) - if err != nil && apierrors.IsNotFound(err) { - logger.V(3).Info("Observed FederatedTypeConfig deletion") - m.processFTCDeletion(qualifiedName.Name) - return worker.StatusAllOK - } - if err != nil { - logger.Error(err, "Failed to get FederatedTypeConfig") - return worker.StatusError - } - - m.lock.Lock() - defer m.lock.Unlock() - - startedSubControllers, ok := m.startedSubControllers[qualifiedName.Name] - if !ok { - startedSubControllers = sets.New[string]() - m.startedSubControllers[qualifiedName.Name] = startedSubControllers - } - subControllerCtx, ok := m.subControllerContexts[qualifiedName.Name] - if !ok { - subControllerCtx, m.subControllerCancelFuncs[qualifiedName.Name] = context.WithCancel(context.TODO()) - m.subControllerContexts[qualifiedName.Name] = subControllerCtx - } - - needRetry := false - for controllerName, startFunc := range m.registeredSubControllers { - logger := logger.WithValues("subcontroller", controllerName) - - if startedSubControllers.Has(controllerName) { - logger.V(3).Info("Subcontroller already started") - continue - } - - isEnabledFunc := m.isSubControllerEnabledFuncs[controllerName] - if isEnabledFunc != nil && !isEnabledFunc(typeConfig) { - logger.V(3).Info("Skip starting subcontroller, is disabled") - continue - } - - controller, err := startFunc(subControllerCtx, m.controllerCtx, typeConfig) - if err != nil { - logger.Error(err, "Failed to start subcontroller") - needRetry = true - continue - } else { - logger.Info("Started subcontroller") - startedSubControllers.Insert(controllerName) - } - - m.healthCheckHandler.AddReadyzChecker( - resolveSubcontrollerName(controllerName, qualifiedName.Name), - func(_ *http.Request) error { - if controller.IsControllerReady() { - return nil - } - return fmt.Errorf("controller not ready") - }, - ) - } - - // Since the controllers are created dynamically, we have to start the informer factories again, in case any new - // informers were accessed. Note that a different context is used in case a FTC is recreated and the same informer - // needs to be used again (SharedInformerFactory and SharedInformers do not support restarts). - ctx := context.TODO() - m.controllerCtx.KubeInformerFactory.Start(ctx.Done()) - m.controllerCtx.DynamicInformerFactory.Start(ctx.Done()) - m.controllerCtx.FedInformerFactory.Start(ctx.Done()) - - if needRetry { - return worker.StatusError - } - - return worker.StatusAllOK -} - -func (m *FederatedTypeConfigManager) processFTCDeletion(ftcName string) { - m.lock.Lock() - defer m.lock.Unlock() - - cancel, ok := m.subControllerCancelFuncs[ftcName] - if !ok { - return - } - - cancel() - - for controller := range m.startedSubControllers[ftcName] { - m.healthCheckHandler.RemoveReadyzChecker(resolveSubcontrollerName(controller, ftcName)) - } - - delete(m.subControllerCancelFuncs, ftcName) - delete(m.subControllerContexts, ftcName) - delete(m.startedSubControllers, ftcName) -} - -func resolveSubcontrollerName(baseName, ftcName string) string { - return fmt.Sprintf("%s[%s]", ftcName, baseName) -} diff --git a/pkg/controllers/federatedtypeconfig/crd_schema.go b/pkg/controllers/federatedtypeconfig/crd_schema.go deleted file mode 100644 index 31b26098..00000000 --- a/pkg/controllers/federatedtypeconfig/crd_schema.go +++ /dev/null @@ -1,102 +0,0 @@ -/* -Copyright 2023 The KubeAdmiral Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package federatedtypeconfig - -import ( - "bytes" - "strings" - - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - "k8s.io/apimachinery/pkg/util/yaml" -) - -const fedObjectSchemaYaml = ` -openAPIV3Schema: - type: object - properties: - apiVersion: - type: string - kind: - type: string - metadata: - type: object - spec: - type: object - x-kubernetes-preserve-unknown-fields: true - status: - type: object - properties: - syncedGeneration: - format: int64 - type: integer - clusters: - type: array - items: - type: object - properties: - generation: - type: integer - name: - type: string - status: - type: string - required: [name] - conditions: - type: array - items: - type: object - properties: - lastTransitionTime: - type: string - format: date-time - lastUpdateTime: - type: string - format: date-time - reason: - type: string - status: - type: string - type: - type: string - required: [type, status] - required: [spec] - x-kubernetes-preserve-unknown-fields: true -` - -const statusObjectSchemaYaml = ` -openAPIV3Schema: - type: object - x-kubernetes-preserve-unknown-fields: true -` - -var fedObjectSchema, statusObjectSchema apiextensionsv1.CustomResourceValidation - -func init() { - if err := yaml.NewYAMLOrJSONDecoder( - bytes.NewReader([]byte(strings.Replace(fedObjectSchemaYaml, "\t", " ", -1))), - len(fedObjectSchemaYaml), - ).Decode(&fedObjectSchema); err != nil { - panic(err) - } - - if err := yaml.NewYAMLOrJSONDecoder( - bytes.NewReader([]byte(strings.Replace(statusObjectSchemaYaml, "\t", " ", -1))), - len(statusObjectSchemaYaml), - ).Decode(&statusObjectSchema); err != nil { - panic(err) - } -} diff --git a/pkg/controllers/federatedtypeconfig/federatedtypeconfig_controller.go b/pkg/controllers/federatedtypeconfig/federatedtypeconfig_controller.go deleted file mode 100644 index 0cb4f07b..00000000 --- a/pkg/controllers/federatedtypeconfig/federatedtypeconfig_controller.go +++ /dev/null @@ -1,902 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -This file may have been modified by The KubeAdmiral Authors -("KubeAdmiral Modifications"). All KubeAdmiral Modifications -are Copyright 2023 The KubeAdmiral Authors. -*/ - -package federatedtypeconfig - -import ( - "context" - "fmt" - "strings" - "sync" - - "github.com/pkg/errors" - appsv1 "k8s.io/api/apps/v1" - apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" - apiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - typedapiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - pkgruntime "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/discovery" - "k8s.io/client-go/dynamic" - "k8s.io/client-go/dynamic/dynamicinformer" - "k8s.io/client-go/informers" - "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" - - fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - fedclient "github.com/kubewharf/kubeadmiral/pkg/client/clientset/versioned" - fedinformers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions" - "github.com/kubewharf/kubeadmiral/pkg/controllers/common" - "github.com/kubewharf/kubeadmiral/pkg/controllers/nsautoprop" - "github.com/kubewharf/kubeadmiral/pkg/controllers/override" - "github.com/kubewharf/kubeadmiral/pkg/controllers/policyrc" - statuscontroller "github.com/kubewharf/kubeadmiral/pkg/controllers/status" - "github.com/kubewharf/kubeadmiral/pkg/controllers/statusaggregator" - synccontroller "github.com/kubewharf/kubeadmiral/pkg/controllers/sync" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/delayingdeliver" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util/worker" -) - -var mutex = sync.Mutex{} - -var finalizer string = "core." + common.DefaultPrefix + "federated-type-config" - -const ControllerName = "federated-type-config" - -// The FederatedTypeConfig controller configures sync and status -// controllers in response to FederatedTypeConfig resources in the -// KubeFed system namespace. -type Controller struct { - // Arguments to use when starting new controllers - controllerConfig *util.ControllerConfig - - kubeClient kubernetes.Interface - fedClient fedclient.Interface - dynamicClient dynamic.Interface - - discoveryClient discovery.DiscoveryInterface - crdClient typedapiextensionsv1.CustomResourceDefinitionInterface - - kubeInformerFactory informers.SharedInformerFactory - dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory - fedInformerFactory fedinformers.SharedInformerFactory - - // Map of running sync controllers keyed by qualified target type - stopChannels map[string]chan struct{} - lock sync.RWMutex - - // Store for the FederatedTypeConfig objects - ftcStore cache.Store - // Informer for the FederatedTypeConfig objects - ftcController cache.Controller - - worker worker.ReconcileWorker - - controllerRevisionStore cache.Store - controllerRevisionController cache.Controller - isControllerRevisionExists bool -} - -func (c *Controller) IsControllerReady() bool { - return c.HasSynced() -} - -// NewController returns a new controller to manage FederatedTypeConfig objects. -func NewController( - config *util.ControllerConfig, - kubeClient kubernetes.Interface, - dynamicClient dynamic.Interface, - fedClient fedclient.Interface, - kubeInformerFactory informers.SharedInformerFactory, - dynamicInformerFactory dynamicinformer.DynamicSharedInformerFactory, - fedInformerFactory fedinformers.SharedInformerFactory, -) (*Controller, error) { - userAgent := "FederatedTypeConfig" - kubeConfig := restclient.CopyConfig(config.KubeConfig) - restclient.AddUserAgent(kubeConfig, userAgent) - - extClient, err := apiextensions.NewForConfig(kubeConfig) - if err != nil { - return nil, err - } - - c := &Controller{ - controllerConfig: config, - fedClient: fedClient, - kubeClient: kubeClient, - dynamicClient: dynamicClient, - discoveryClient: kubeClient.Discovery(), - crdClient: extClient.ApiextensionsV1().CustomResourceDefinitions(), - kubeInformerFactory: kubeInformerFactory, - dynamicInformerFactory: dynamicInformerFactory, - fedInformerFactory: fedInformerFactory, - stopChannels: make(map[string]chan struct{}), - } - - c.worker = worker.NewReconcileWorker(c.reconcile, worker.WorkerTiming{}, 1, config.Metrics, - delayingdeliver.NewMetricTags("typeconfig-worker", "FederatedTypeConfig")) - - c.ftcStore, c.ftcController, err = util.NewGenericInformer( - kubeConfig, - "", - &fedcorev1a1.FederatedTypeConfig{}, - util.NoResyncPeriod, - c.worker.EnqueueObject, - config.Metrics, - ) - if err != nil { - return nil, err - } - - return c, nil -} - -func (c *Controller) HasSynced() bool { - if !c.ftcController.HasSynced() { - klog.V(2).Infof("typeconfig controller's controller hasn't synced") - return false - } - return true -} - -// Run runs the Controller. -func (c *Controller) Run(stopChan <-chan struct{}) { - klog.Infof("Starting FederatedTypeConfig controller") - go c.ftcController.Run(stopChan) - - // wait for the caches to synchronize before starting the worker - if !cache.WaitForNamedCacheSync("type-config-controller", stopChan, c.ftcController.HasSynced) { - return - } - - var nsFTC *fedcorev1a1.FederatedTypeConfig - ftcs := c.ftcStore.List() - for _, obj := range ftcs { - ftc := obj.(*fedcorev1a1.FederatedTypeConfig) - if ftc.Name == common.NamespaceResource && ftc.GetTargetType().Kind == common.NamespaceKind && - ftc.GetSourceType().Kind == common.NamespaceKind { - nsFTC = ftc - break - } - } - if nsFTC == nil { - // panic if ftc for namespace does not exist, since no other resource can be synced otherwise - klog.Fatal("FederatedTypeConfig for namespaces not found") - } - - // ensure the federated namespace since it is a requirement for other controllers - if err := c.ensureFederatedObjectCrd(nsFTC); err != nil { - klog.Fatalf("Failed to ensure FederatedNamespace CRD: %v", err) - } - - c.worker.Run(stopChan) - - // Ensure all goroutines are cleaned up when the stop channel closes - go func() { - <-stopChan - c.shutDown() - }() -} - -func (c *Controller) reconcile(qualifiedName common.QualifiedName) worker.Result { - key := qualifiedName.String() - - klog.V(3).Infof("Running reconcile FederatedTypeConfig for %q", key) - - cachedObj, err := c.objCopyFromCache(key) - if err != nil { - return worker.StatusError - } - - if cachedObj == nil { - return worker.StatusAllOK - } - typeConfig := cachedObj.(*fedcorev1a1.FederatedTypeConfig) - - // TODO(marun) Perform this defaulting in a webhook - SetFederatedTypeConfigDefaults(typeConfig) - - deleted := typeConfig.DeletionTimestamp != nil - if c.controllerConfig.CreateCrdForFtcs && !deleted { - if err := c.ensureFederatedObjectCrd(typeConfig); err != nil { - klog.Error(fmt.Errorf("cannot ensure federated object CRD for %q: %w", typeConfig.Name, err)) - return worker.StatusError - } - } - - syncEnabled := typeConfig.GetPropagationEnabled() - statusEnabled := typeConfig.GetStatusEnabled() - statusAggregationEnabled := typeConfig.GetStatusAggregationEnabled() - policyRcEnabled := typeConfig.GetPolicyRcEnabled() - controllers := sets.New[string]() - for _, controllerGroup := range typeConfig.GetControllers() { - for _, controller := range controllerGroup { - controllers.Insert(controller) - } - } - namespaceAutoPropagationEnabled := controllers.Has(nsautoprop.PrefixedNamespaceAutoPropagationControllerName) - overridePolicyEnabled := controllers.Has(override.PrefixedControllerName) - - limitedScope := c.controllerConfig.TargetNamespace != metav1.NamespaceAll - if limitedScope && syncEnabled && !typeConfig.GetNamespaced() { - _, ok := c.getStopChannel(typeConfig.Name) - if !ok { - holderChan := make(chan struct{}) - c.lock.Lock() - c.stopChannels[typeConfig.Name] = holderChan - c.lock.Unlock() - klog.Infof( - "Skipping start of sync & status controller for cluster-scoped resource %q. It is not required for a namespaced KubeFed control plane.", - typeConfig.GetFederatedType().Kind, - ) - } - - // typeConfig.Status.ObservedGeneration = typeConfig.Generation - // typeConfig.Status.PropagationController = corev1a1.ControllerStatusNotRunning - - /*if typeConfig.Status.StatusController == nil { - typeConfig.Status.StatusController = new(corev1a1.ControllerStatus) - } - *typeConfig.Status.StatusController = corev1a1.ControllerStatusNotRunning - err = c.client.UpdateStatus(context.TODO(), typeConfig) - if err != nil { - runtime.HandleError(errors.Wrapf(err, "Could not update status fields of the CRD: %q", key)) - return worker.StatusError - }*/ - return worker.StatusAllOK - } - - statusKey := typeConfig.Name + "/status" - statusAggregationKey := typeConfig.Name + "/statusAggregation" - policyRcKey := typeConfig.Name + "/policyRc" - federateKey := typeConfig.Name + "/federate" - schedulerKey := typeConfig.Name + "/scheduler" - namespaceAutoPropagationKey := typeConfig.Name + "/namespaceAutoPropagation" - overridePolicyKey := typeConfig.Name + "/overridePolicy" - - syncStopChan, syncRunning := c.getStopChannel(typeConfig.Name) - statusStopChan, statusRunning := c.getStopChannel(statusKey) - statusAggregationStopChan, statusAggregationRunning := c.getStopChannel(statusAggregationKey) - policyRcStopChan, policyRcRunning := c.getStopChannel(policyRcKey) - federateStopChan, federateRunning := c.getStopChannel(federateKey) - schedulerStopChan, schedulerRunning := c.getStopChannel(schedulerKey) - namespaceAutoPropagationStopChan, namespaceAutoPropagationRunning := c.getStopChannel(namespaceAutoPropagationKey) - overridePolicyStopChan, overridePolicyRunning := c.getStopChannel(overridePolicyKey) - - if deleted { - if syncRunning { - c.stopController(typeConfig.Name, syncStopChan) - } - if statusRunning { - c.stopController(statusKey, statusStopChan) - } - if federateRunning { - c.stopController(federateKey, federateStopChan) - } - if schedulerRunning { - c.stopController(schedulerKey, schedulerStopChan) - } - if overridePolicyRunning { - c.stopController(overridePolicyKey, overridePolicyStopChan) - } - - if typeConfig.IsNamespace() { - if namespaceAutoPropagationRunning { - c.stopController(namespaceAutoPropagationKey, namespaceAutoPropagationStopChan) - } - klog.Infof("Reconciling all namespaced FederatedTypeConfig resources on deletion of %q", key) - c.reconcileOnNamespaceFTCUpdate() - } - - err := c.removeFinalizer(typeConfig) - if err != nil { - klog.Error(errors.Wrapf(err, "Failed to remove finalizer from FederatedTypeConfig %q", key)) - return worker.StatusError - } - return worker.StatusAllOK - } - - typeConfig, updated, err := c.ensureFinalizer(typeConfig) - if err != nil { - klog.Error(errors.Wrapf(err, "Failed to ensure finalizer for FederatedTypeConfig %q", key)) - return worker.StatusError - } else if updated && typeConfig.IsNamespace() { - // Detected creation of the namespace FTC. If there are existing FTCs - // which did not start their sync controllers due to the lack of a - // namespace FTC, then reconcile them now so they can start. - klog.Infof("Reconciling all namespaced FederatedTypeConfig resources on finalizer update for %q", key) - c.reconcileOnNamespaceFTCUpdate() - } - - startNewSyncController := !syncRunning && syncEnabled - stopSyncController := syncRunning && (!syncEnabled || (typeConfig.GetNamespaced() && !c.namespaceFTCExists())) - if startNewSyncController { - if err := c.startSyncController(typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopSyncController { - c.stopController(typeConfig.Name, syncStopChan) - } - - startNewStatusController := !statusRunning && statusEnabled - stopStatusController := statusRunning && !statusEnabled - if startNewStatusController { - if err := c.startStatusController(statusKey, typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopStatusController { - c.stopController(statusKey, statusStopChan) - } - - startNewStatusAggregationController := !statusAggregationRunning && statusAggregationEnabled - stopStatusAggregationController := statusAggregationRunning && !statusAggregationEnabled - if startNewStatusAggregationController { - if err := c.startStatusAggregationController(statusAggregationKey, typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopStatusAggregationController { - c.stopController(statusAggregationKey, statusAggregationStopChan) - } - - startNewPolicyRcController := !policyRcRunning && policyRcEnabled - stopPolicyRcController := policyRcRunning && !policyRcEnabled - if startNewPolicyRcController { - if err := c.startPolicyRcController(policyRcKey, typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopPolicyRcController { - c.stopController(policyRcKey, policyRcStopChan) - } - - startNewNamespaceAutoPropagationController := !namespaceAutoPropagationRunning && typeConfig.IsNamespace() && - namespaceAutoPropagationEnabled - stopNamespaceAutoPropagationController := namespaceAutoPropagationRunning && - (!typeConfig.IsNamespace() || !namespaceAutoPropagationEnabled) - if startNewNamespaceAutoPropagationController { - if err := c.startNamespaceAutoPropagationController(namespaceAutoPropagationKey, typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopNamespaceAutoPropagationController { - c.stopController(namespaceAutoPropagationKey, namespaceAutoPropagationStopChan) - } - - startOverridePolicyController := !overridePolicyRunning && overridePolicyEnabled - stopOverridePolicyController := overridePolicyRunning && !overridePolicyEnabled - if startOverridePolicyController { - if err := c.startOverridePolicyController(overridePolicyKey, typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } else if stopOverridePolicyController { - c.stopController(overridePolicyKey, overridePolicyStopChan) - } - - if !startNewSyncController && !stopSyncController && - typeConfig.Status.ObservedGeneration != typeConfig.Generation { - if err := c.refreshSyncController(typeConfig); err != nil { - klog.Error(err) - return worker.StatusError - } - } - - typeConfig.Status.ObservedGeneration = typeConfig.Generation - /*syncControllerRunning := startNewSyncController || (syncRunning && !stopSyncController) - if syncControllerRunning { - typeConfig.Status.PropagationController = corev1a1.ControllerStatusRunning - } else { - typeConfig.Status.PropagationController = corev1a1.ControllerStatusNotRunning - } - - if typeConfig.Status.StatusController == nil { - typeConfig.Status.StatusController = new(corev1a1.ControllerStatus) - } - - statusControllerRunning := startNewStatusController || (statusRunning && !stopStatusController) - if statusControllerRunning { - *typeConfig.Status.StatusController = corev1a1.ControllerStatusRunning - } else { - *typeConfig.Status.StatusController = corev1a1.ControllerStatusNotRunning - } - err = c.client.UpdateStatus(context.TODO(), typeConfig) - if err != nil { - runtime.HandleError(errors.Wrapf(err, "Could not update status fields of the CRD: %q", key)) - return worker.StatusError - }*/ - return worker.StatusAllOK -} - -func (c *Controller) ensureFederatedObjectCrd(ftc *fedcorev1a1.FederatedTypeConfig) error { - fedTy := ftc.Spec.FederatedType - crdName := fedTy.PluralName - if fedTy.Group != "" { - crdName += "." - crdName += fedTy.Group - } - - _, err := c.crdClient.Get(context.TODO(), crdName, metav1.GetOptions{ResourceVersion: "0"}) - if err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("cannot check for existence of CRD %q: %w", crdName, err) - } - - needObjectCrd := err != nil - - needStatusCrd := false - statusTy := ftc.Spec.StatusType - var statusCrdName string - if statusTy != nil { - statusCrdName = statusTy.PluralName - if statusTy.Group != "" { - statusCrdName += "." - statusCrdName += statusTy.Group - } - - _, err = c.crdClient.Get(context.TODO(), statusCrdName, metav1.GetOptions{ResourceVersion: "0"}) - if err != nil && !apierrors.IsNotFound(err) { - return fmt.Errorf("cannot check for existence of CRD %q: %w", statusCrdName, err) - } - - needStatusCrd = err != nil - } - - var sourceResource *metav1.APIResource - if ftc.Spec.SourceType != nil { - srcTy := ftc.Spec.SourceType - - resourceList, err := c.discoveryClient.ServerResourcesForGroupVersion(schema.GroupVersion{ - Group: srcTy.Group, - Version: srcTy.Version, - }.String()) - if err != nil { - return fmt.Errorf("cannot invoke discovery client: %w", err) - } - - for _, resource := range resourceList.APIResources { - // we don't care about resource.Group because subresources are not supported - - if resource.Name == srcTy.PluralName { - resource := resource - sourceResource = &resource - break - } - } - } - - // create CRD now - if needObjectCrd { - klog.V(2).Infof("Creating federated CRD for %q", ftc.Name) - - fedShortNames := []string{"f" + strings.ToLower(ftc.Spec.TargetType.Kind)} - if sourceResource != nil { - for _, shortName := range sourceResource.ShortNames { - fedShortNames = append(fedShortNames, "f"+shortName) - } - } - - crd := &apiextensionsv1.CustomResourceDefinition{ - ObjectMeta: metav1.ObjectMeta{ - Name: crdName, - }, - Spec: apiextensionsv1.CustomResourceDefinitionSpec{ - Group: fedTy.Group, - Names: apiextensionsv1.CustomResourceDefinitionNames{ - Plural: fedTy.PluralName, - Kind: fedTy.Kind, - Singular: strings.ToLower(fedTy.Kind), - ShortNames: fedShortNames, - ListKind: fedTy.Kind + "List", - }, - Scope: apiextensionsv1.ResourceScope(fedTy.Scope), - Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ - { - Name: fedTy.Version, - Served: true, - Storage: true, - Subresources: &apiextensionsv1.CustomResourceSubresources{ - Status: &apiextensionsv1.CustomResourceSubresourceStatus{}, - }, - Schema: &fedObjectSchema, - }, - }, - }, - } - _, err = c.crdClient.Create(context.TODO(), crd, metav1.CreateOptions{}) - if err != nil { - return err - } - } - - if needStatusCrd { - klog.V(2).Infof("Creating status CRD for %q", ftc.Name) - - statusShortNames := []string{fmt.Sprintf("f%sstatus", strings.ToLower(ftc.Spec.TargetType.Kind))} - if sourceResource != nil { - for _, shortName := range sourceResource.ShortNames { - statusShortNames = append(statusShortNames, fmt.Sprintf("f%sstatus", shortName)) - } - } - - crd := &apiextensionsv1.CustomResourceDefinition{ - ObjectMeta: metav1.ObjectMeta{ - Name: statusCrdName, - }, - Spec: apiextensionsv1.CustomResourceDefinitionSpec{ - Group: statusTy.Group, - Names: apiextensionsv1.CustomResourceDefinitionNames{ - Plural: statusTy.PluralName, - Kind: statusTy.Kind, - Singular: strings.ToLower(statusTy.Kind), - ShortNames: statusShortNames, - ListKind: statusTy.Kind + "List", - }, - Scope: apiextensionsv1.ResourceScope(statusTy.Scope), - Versions: []apiextensionsv1.CustomResourceDefinitionVersion{ - { - Name: statusTy.Version, - Served: true, - Storage: true, - Subresources: &apiextensionsv1.CustomResourceSubresources{ - Status: &apiextensionsv1.CustomResourceSubresourceStatus{}, - }, - Schema: &statusObjectSchema, - }, - }, - }, - } - _, err = c.crdClient.Create(context.TODO(), crd, metav1.CreateOptions{}) - if err != nil { - return err - } - } - - return nil -} - -func (c *Controller) objCopyFromCache(key string) (pkgruntime.Object, error) { - cachedObj, exist, err := c.ftcStore.GetByKey(key) - if err != nil { - wrappedErr := errors.Wrapf(err, "Failed to query FederatedTypeConfig store for %q", key) - klog.Error(wrappedErr) - return nil, err - } - if !exist { - return nil, nil - } - return cachedObj.(pkgruntime.Object).DeepCopyObject(), nil -} - -func (c *Controller) shutDown() { - c.lock.Lock() - defer c.lock.Unlock() - - // Stop all sync and status controllers - for key, stopChannel := range c.stopChannels { - close(stopChannel) - delete(c.stopChannels, key) - } -} - -func (c *Controller) getStopChannel(name string) (chan struct{}, bool) { - c.lock.RLock() - defer c.lock.RUnlock() - stopChan, ok := c.stopChannels[name] - return stopChan, ok -} - -func (c *Controller) startSyncController(tc *fedcorev1a1.FederatedTypeConfig) error { - ftc := tc.DeepCopyObject().(*fedcorev1a1.FederatedTypeConfig) - kind := ftc.Spec.FederatedType.Kind - controllerConfig := new(util.ControllerConfig) - *controllerConfig = *(c.controllerConfig) - - // A sync controller for a namespaced resource must be supplied - // with the ftc for namespaces so that it can consider federated - // namespace placement when determining the placement for - // contained resources. - var fedNamespaceAPIResource *metav1.APIResource - if ftc.GetNamespaced() { - var err error - fedNamespaceAPIResource, err = c.getFederatedNamespaceAPIResource() - if err != nil { - return errors.Wrapf( - err, - "Unable to start sync controller for %q due to missing FederatedTypeConfig for namespaces", - kind, - ) - } - } - - mutex.Lock() - defer mutex.Unlock() - stopChan := make(chan struct{}) - - if !c.isControllerRevisionExists { - controllerRevision := util.GetResourceKind(&appsv1.ControllerRevision{}) - controllerRevisionsResource := &metav1.APIResource{ - Name: util.GetPluralName(controllerRevision), - Group: appsv1.SchemeGroupVersion.Group, - Version: appsv1.SchemeGroupVersion.Version, - Kind: controllerRevision, - Namespaced: true, - } - userAgent := "controller-revision-federate-sync-controller" - configWithUserAgent := restclient.CopyConfig(controllerConfig.KubeConfig) - restclient.AddUserAgent(configWithUserAgent, userAgent) - controllerRevisionClient, err := util.NewResourceClient(configWithUserAgent, controllerRevisionsResource) - if err != nil { - klog.Errorf("Failed to initiate controller revision client.") - return err - } - - triggerFunc := func(obj pkgruntime.Object) { - if accessor, err := meta.Accessor(obj); err == nil { - klog.V(4). - Infof("ControllerRevision changement observed: %s/%s", accessor.GetNamespace(), accessor.GetName()) - } - } - c.controllerRevisionStore, c.controllerRevisionController = util.NewResourceInformer( - controllerRevisionClient, - "", - triggerFunc, - c.controllerConfig.Metrics, - ) - c.isControllerRevisionExists = true - go c.controllerRevisionController.Run(stopChan) - } - - err := synccontroller.StartSyncController( - controllerConfig, - stopChan, - ftc, - fedNamespaceAPIResource, - c.controllerRevisionStore, - c.controllerRevisionController) - if err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting sync controller for %q", kind) - } - klog.Infof("Started sync controller for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[ftc.Name] = stopChan - return nil -} - -func (c *Controller) startStatusController(statusKey string, tc *fedcorev1a1.FederatedTypeConfig) error { - kind := tc.Spec.FederatedType.Kind - stopChan := make(chan struct{}) - ftc := tc.DeepCopyObject().(*fedcorev1a1.FederatedTypeConfig) - err := statuscontroller.StartStatusController(c.controllerConfig, stopChan, ftc) - if err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting status controller for %q", kind) - } - klog.Infof("Started status controller for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[statusKey] = stopChan - return nil -} - -func (c *Controller) startStatusAggregationController( - statusAggregationKey string, - tc *fedcorev1a1.FederatedTypeConfig, -) error { - kind := tc.Spec.FederatedType.Kind - stopChan := make(chan struct{}) - ftc := tc.DeepCopyObject().(*fedcorev1a1.FederatedTypeConfig) - err := statusaggregator.StartStatusAggregator(c.controllerConfig, stopChan, ftc) - if err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting status aggregator for %q", kind) - } - klog.Infof("Started status aggregator for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[statusAggregationKey] = stopChan - return nil -} - -func (c *Controller) startPolicyRcController(policyRcKey string, tc *fedcorev1a1.FederatedTypeConfig) error { - kind := tc.Spec.FederatedType.Kind - stopChan := make(chan struct{}) - ftc := tc.DeepCopyObject().(*fedcorev1a1.FederatedTypeConfig) - err := policyrc.StartController(c.controllerConfig, stopChan, ftc) - if err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting policy-rc controller for %q", kind) - } - klog.Infof("Started policy-rc controller for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[policyRcKey] = stopChan - return nil -} - -func (c *Controller) startNamespaceAutoPropagationController( - namespaceAutoPropagationKey string, - tc *fedcorev1a1.FederatedTypeConfig, -) error { - kind := tc.Spec.FederatedType.Kind - stopChan := make(chan struct{}) - ftc := tc.DeepCopyObject().(*fedcorev1a1.FederatedTypeConfig) - err := nsautoprop.StartController( - c.controllerConfig, - stopChan, - ftc, - c.kubeClient, - c.dynamicClient, - c.dynamicInformerFactory, - c.fedInformerFactory, - ) - if err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting namespace-auto-propagation controller for %q", kind) - } - klog.Infof("Started namespace-auto-propagation controller for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[namespaceAutoPropagationKey] = stopChan - return nil -} - -func (c *Controller) startOverridePolicyController( - overridePolicyKey string, - tc *fedcorev1a1.FederatedTypeConfig, -) error { - kind := tc.Spec.FederatedType.Kind - stopChan := make(chan struct{}) - if err := override.StartController(c.controllerConfig, stopChan, tc); err != nil { - close(stopChan) - return errors.Wrapf(err, "Error starting overridepolicy-controller for %q", kind) - } - klog.Infof("Started overridepolicy-controller for %q", kind) - c.lock.Lock() - defer c.lock.Unlock() - c.stopChannels[overridePolicyKey] = stopChan - return nil -} - -func (c *Controller) stopController(key string, stopChan chan struct{}) { - klog.Infof("Stopping controller for %q", key) - close(stopChan) - c.lock.Lock() - defer c.lock.Unlock() - delete(c.stopChannels, key) -} - -func (c *Controller) refreshSyncController(tc *fedcorev1a1.FederatedTypeConfig) error { - klog.Infof("refreshing sync controller for %q", tc.Name) - - syncStopChan, ok := c.getStopChannel(tc.Name) - if ok { - c.stopController(tc.Name, syncStopChan) - } - - return c.startSyncController(tc) -} - -func (c *Controller) ensureFinalizer( - tc *fedcorev1a1.FederatedTypeConfig, -) (*fedcorev1a1.FederatedTypeConfig, bool, error) { - accessor, err := meta.Accessor(tc) - if err != nil { - return nil, false, err - } - finalizers := sets.NewString(accessor.GetFinalizers()...) - if finalizers.Has(finalizer) { - return tc, false, nil - } - finalizers.Insert(finalizer) - accessor.SetFinalizers(finalizers.List()) - tc, err = c.fedClient.CoreV1alpha1().FederatedTypeConfigs().Update(context.TODO(), tc, metav1.UpdateOptions{}) - return tc, true, err -} - -func (c *Controller) removeFinalizer(tc *fedcorev1a1.FederatedTypeConfig) error { - accessor, err := meta.Accessor(tc) - if err != nil { - return err - } - finalizers := sets.NewString(accessor.GetFinalizers()...) - if !finalizers.Has(finalizer) { - return nil - } - finalizers.Delete(finalizer) - accessor.SetFinalizers(finalizers.List()) - _, err = c.fedClient.CoreV1alpha1().FederatedTypeConfigs().Update(context.TODO(), tc, metav1.UpdateOptions{}) - return err -} - -func (c *Controller) namespaceFTCExists() bool { - _, err := c.getFederatedNamespaceAPIResource() - return err == nil -} - -func (c *Controller) getFederatedNamespaceAPIResource() (*metav1.APIResource, error) { - qualifiedName := common.QualifiedName{ - Namespace: "", - Name: common.NamespaceResource, - } - key := qualifiedName.String() - cachedObj, exists, err := c.ftcStore.GetByKey(key) - if err != nil { - return nil, errors.Wrapf(err, "Error retrieving %q from the informer cache", key) - } - if !exists { - return nil, errors.Errorf("Unable to find %q in the informer cache", key) - } - namespaceTypeConfig := cachedObj.(*fedcorev1a1.FederatedTypeConfig) - apiResource := namespaceTypeConfig.GetFederatedType() - return &apiResource, nil -} - -func (c *Controller) reconcileOnNamespaceFTCUpdate() { - for _, cachedObj := range c.ftcStore.List() { - typeConfig := cachedObj.(*fedcorev1a1.FederatedTypeConfig) - if typeConfig.GetNamespaced() && !typeConfig.IsNamespace() { - c.worker.EnqueueObject(typeConfig) - } - } -} - -// pluralName computes the plural name from the kind by lowercasing and suffixing with 's' or `es`. -func SetFederatedTypeConfigDefaults(obj *fedcorev1a1.FederatedTypeConfig) { - nameParts := strings.SplitN(obj.Name, ".", 2) - targetPluralName := nameParts[0] - setStringDefault(&obj.Spec.TargetType.PluralName, targetPluralName) - if len(nameParts) > 1 { - group := nameParts[1] - setStringDefault(&obj.Spec.TargetType.Group, group) - } - setStringDefault(&obj.Spec.FederatedType.PluralName, pluralName(obj.Spec.FederatedType.Kind)) -} - -func pluralName(kind string) string { - lowerKind := strings.ToLower(kind) - if strings.HasSuffix(lowerKind, "s") || strings.HasSuffix(lowerKind, "x") || - strings.HasSuffix(lowerKind, "ch") || strings.HasSuffix(lowerKind, "sh") || - strings.HasSuffix(lowerKind, "z") || strings.HasSuffix(lowerKind, "o") { - return fmt.Sprintf("%ses", lowerKind) - } - if strings.HasSuffix(lowerKind, "y") { - lowerKind = strings.TrimSuffix(lowerKind, "y") - return fmt.Sprintf("%sies", lowerKind) - } - return fmt.Sprintf("%ss", lowerKind) -} - -func setStringDefault(value *string, defaultValue string) { - if value == nil || len(*value) > 0 { - return - } - *value = defaultValue -}