diff --git a/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go b/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go index f1634890..cdeb2d3d 100644 --- a/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go +++ b/pkg/apis/core/v1alpha1/extensions_federatedtypeconfig.go @@ -55,6 +55,15 @@ func (f *FederatedTypeConfig) GetSourceTypeGVR() schema.GroupVersionResource { } } +func (f *FederatedTypeConfig) GetSourceTypeGVK() schema.GroupVersionKind { + apiResource := f.GetSourceType() + return schema.GroupVersionKind{ + Group: apiResource.Group, + Version: apiResource.Version, + Kind: apiResource.Kind, + } +} + func (f *FederatedTypeConfig) GetStatusCollectionEnabled() bool { return f.Spec.StatusCollection != nil } diff --git a/pkg/controllers/util/federatedinformer.go b/pkg/controllers/util/federatedinformer.go index 95c28928..a57ce03b 100644 --- a/pkg/controllers/util/federatedinformer.go +++ b/pkg/controllers/util/federatedinformer.go @@ -289,28 +289,6 @@ func NewFederatedInformer( return federatedInformer, err } -func IsClusterReady(clusterStatus *fedcorev1a1.FederatedClusterStatus) bool { - for _, condition := range clusterStatus.Conditions { - if condition.Type == fedcorev1a1.ClusterReady { - if condition.Status == corev1.ConditionTrue { - return true - } - } - } - return false -} - -func IsClusterJoined(clusterStatus *fedcorev1a1.FederatedClusterStatus) bool { - for _, condition := range clusterStatus.Conditions { - if condition.Type == fedcorev1a1.ClusterJoined { - if condition.Status == corev1.ConditionTrue { - return true - } - } - } - return false -} - type informer struct { controller cache.Controller store cache.Store diff --git a/pkg/util/cluster/util.go b/pkg/util/cluster/util.go new file mode 100644 index 00000000..00734405 --- /dev/null +++ b/pkg/util/cluster/util.go @@ -0,0 +1,46 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + corev1 "k8s.io/api/core/v1" + + fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" +) + + +func IsClusterReady(clusterStatus *fedcorev1a1.FederatedClusterStatus) bool { + for _, condition := range clusterStatus.Conditions { + if condition.Type == fedcorev1a1.ClusterReady { + if condition.Status == corev1.ConditionTrue { + return true + } + } + } + return false +} + +func IsClusterJoined(clusterStatus *fedcorev1a1.FederatedClusterStatus) bool { + for _, condition := range clusterStatus.Conditions { + if condition.Type == fedcorev1a1.ClusterJoined { + if condition.Status == corev1.ConditionTrue { + return true + } + } + } + return false +} diff --git a/pkg/util/informermanager/common.go b/pkg/util/informermanager/common.go new file mode 100644 index 00000000..84195264 --- /dev/null +++ b/pkg/util/informermanager/common.go @@ -0,0 +1,25 @@ +/* +Copyright 2023 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package informermanager + +import fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + +// RegisterOncePredicate can be used to as an EventHandlerGenerator predicate +// to generate and register event handlers exactly once for each FTC. +func RegisterOncePredicate(old, _ *fedcorev1a1.FederatedTypeConfig) bool { + return old == nil +} diff --git a/pkg/util/informermanager/federatedinformermanager.go b/pkg/util/informermanager/federatedinformermanager.go index d4e342f5..1fc7e442 100644 --- a/pkg/util/informermanager/federatedinformermanager.go +++ b/pkg/util/informermanager/federatedinformermanager.go @@ -24,6 +24,7 @@ import ( "sync" apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" @@ -34,8 +35,9 @@ import ( fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" fedcorev1a1informers "github.com/kubewharf/kubeadmiral/pkg/client/informers/externalversions/core/v1alpha1" fedcorev1a1listers "github.com/kubewharf/kubeadmiral/pkg/client/listers/core/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/controllers/util" + clusterutil "github.com/kubewharf/kubeadmiral/pkg/util/cluster" "github.com/kubewharf/kubeadmiral/pkg/util/logging" + "github.com/kubewharf/kubeadmiral/pkg/util/managedlabel" ) type federatedInformerManager struct { @@ -83,7 +85,7 @@ func NewFederatedInformerManager( clusterInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { cluster := obj.(*fedcorev1a1.FederatedCluster) - return util.IsClusterJoined(&cluster.Status) + return clusterutil.IsClusterJoined(&cluster.Status) }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { manager.enqueue(obj) }, @@ -129,7 +131,7 @@ func (m *federatedInformerManager) worker(ctx context.Context) { m.queue.AddRateLimited(key) return } - if apierrors.IsNotFound(err) || !util.IsClusterJoined(&cluster.Status) { + if apierrors.IsNotFound(err) || !clusterutil.IsClusterJoined(&cluster.Status) { if err := m.processClusterDeletion(ctx, name); err != nil { logger.Error(err, "Failed to process FederatedCluster, will retry") m.queue.AddRateLimited(key) @@ -185,7 +187,19 @@ func (m *federatedInformerManager) processCluster( return fmt.Errorf("failed to get client for cluster %s: %w", clusterName, err), true } - manager := NewInformerManager(clusterClient, m.ftcInformer) + manager := NewInformerManager( + clusterClient, + m.ftcInformer, + func(opts *metav1.ListOptions) { + selector := &metav1.LabelSelector{} + metav1.AddLabelToSelector( + selector, + managedlabel.ManagedByKubeAdmiralLabelKey, + managedlabel.ManagedByKubeAdmiralLabelValue, + ) + opts.LabelSelector = metav1.FormatLabelSelector(selector) + }, + ) ctx, cancel := context.WithCancel(ctx) for _, generator := range m.eventHandlerGenerators { @@ -268,7 +282,7 @@ func (m *federatedInformerManager) GetFederatedTypeConfigLister() fedcorev1a1lis } func (m *federatedInformerManager) GetResourceLister( - gvr schema.GroupVersionResource, + gvk schema.GroupVersionKind, cluster string, ) (lister cache.GenericLister, informerSynced cache.InformerSynced, exists bool) { m.lock.RLock() @@ -279,7 +293,7 @@ func (m *federatedInformerManager) GetResourceLister( return nil, nil, false } - return manager.GetResourceLister(gvr) + return manager.GetResourceLister(gvk) } func (m *federatedInformerManager) HasSynced() bool { diff --git a/pkg/util/informermanager/federatedinformermanager_test.go b/pkg/util/informermanager/federatedinformermanager_test.go index 45c40ed6..0d651a62 100644 --- a/pkg/util/informermanager/federatedinformermanager_test.go +++ b/pkg/util/informermanager/federatedinformermanager_test.go @@ -182,11 +182,11 @@ func TestFederatedInformerManager(t *testing.T) { // 2. Verify that listers for existing FTCs and clusters are eventually available for _, ftc := range defaultFTCs { - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() for _, cluster := range defaultClusters { g.Eventually(func(g gomega.Gomega) { - lister, informerSynced, exists := manager.GetResourceLister(gvr, cluster.Name) + lister, informerSynced, exists := manager.GetResourceLister(gvk, cluster.Name) g.Expect(exists).To(gomega.BeTrue()) g.Expect(lister).ToNot(gomega.BeNil()) @@ -197,17 +197,17 @@ func TestFederatedInformerManager(t *testing.T) { // 3. Verify that the lister for non-existent FTCs or clusters are not available - lister, informerSynced, exists := manager.GetResourceLister(common.DaemonSetGVR, "cluster-1") + lister, informerSynced, exists := manager.GetResourceLister(daemonsetGVK, "cluster-1") g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) - lister, informerSynced, exists = manager.GetResourceLister(common.DeploymentGVR, "cluster-4") + lister, informerSynced, exists = manager.GetResourceLister(deploymentGVK, "cluster-4") g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) - lister, informerSynced, exists = manager.GetResourceLister(common.DaemonSetGVR, "cluster-4") + lister, informerSynced, exists = manager.GetResourceLister(daemonsetGVK, "cluster-4") g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) @@ -245,13 +245,13 @@ func TestFederatedInformerManager(t *testing.T) { }() ftc := daemonsetFTC - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() // 2. Verify that listers for daemonsets FTCs is not available at the start g.Consistently(func(g gomega.Gomega) { for _, cluster := range defaultClusters { - lister, informerSynced, exists := manager.GetResourceLister(common.DeploymentGVR, cluster.Name) + lister, informerSynced, exists := manager.GetResourceLister(deploymentGVK, cluster.Name) g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) @@ -267,7 +267,7 @@ func TestFederatedInformerManager(t *testing.T) { g.Eventually(func(g gomega.Gomega) { for _, cluster := range defaultClusters { - lister, informerSynced, exists := manager.GetResourceLister(gvr, cluster.Name) + lister, informerSynced, exists := manager.GetResourceLister(gvk, cluster.Name) g.Expect(exists).To(gomega.BeTrue()) g.Expect(lister).ToNot(gomega.BeNil()) g.Expect(informerSynced()).To(gomega.BeTrue()) @@ -308,9 +308,9 @@ func TestFederatedInformerManager(t *testing.T) { g.Consistently(func(g gomega.Gomega) { for _, ftc := range defaultFTCs { - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() - lister, informerSynced, exists := manager.GetResourceLister(gvr, cluster.Name) + lister, informerSynced, exists := manager.GetResourceLister(gvk, cluster.Name) g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) @@ -326,9 +326,9 @@ func TestFederatedInformerManager(t *testing.T) { g.Eventually(func(g gomega.Gomega) { for _, ftc := range defaultFTCs { - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() - lister, informerSynced, exists := manager.GetResourceLister(gvr, cluster.Name) + lister, informerSynced, exists := manager.GetResourceLister(gvk, cluster.Name) g.Expect(exists).To(gomega.BeTrue()) g.Expect(lister).ToNot(gomega.BeNil()) g.Expect(informerSynced()).To(gomega.BeTrue()) diff --git a/pkg/util/informermanager/informermanager.go b/pkg/util/informermanager/informermanager.go index 64323e48..b7ca7bb5 100644 --- a/pkg/util/informermanager/informermanager.go +++ b/pkg/util/informermanager/informermanager.go @@ -45,13 +45,15 @@ type informerManager struct { started bool shutdown bool - client dynamic.Interface - ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer + client dynamic.Interface + informerTweakListOptions dynamicinformer.TweakListOptionsFunc + ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer eventHandlerGenerators []*EventHandlerGenerator - gvrMapping *bijection.Bijection[string, schema.GroupVersionResource] + gvkMapping *bijection.Bijection[string, schema.GroupVersionKind] + lastObservedFTCs map[string]*fedcorev1a1.FederatedTypeConfig informers map[string]informers.GenericInformer informerCancelFuncs map[string]context.CancelFunc eventHandlerRegistrations map[string]map[*EventHandlerGenerator]cache.ResourceEventHandlerRegistration @@ -60,14 +62,20 @@ type informerManager struct { queue workqueue.RateLimitingInterface } -func NewInformerManager(client dynamic.Interface, ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer) InformerManager { +func NewInformerManager( + client dynamic.Interface, + ftcInformer fedcorev1a1informers.FederatedTypeConfigInformer, + informerTweakListOptions dynamicinformer.TweakListOptionsFunc, +) InformerManager { manager := &informerManager{ lock: sync.RWMutex{}, started: false, client: client, + informerTweakListOptions: informerTweakListOptions, ftcInformer: ftcInformer, eventHandlerGenerators: []*EventHandlerGenerator{}, - gvrMapping: bijection.NewBijection[string, schema.GroupVersionResource](), + gvkMapping: bijection.NewBijection[string, schema.GroupVersionKind](), + lastObservedFTCs: map[string]*fedcorev1a1.FederatedTypeConfig{}, informers: map[string]informers.GenericInformer{}, informerCancelFuncs: map[string]context.CancelFunc{}, eventHandlerRegistrations: map[string]map[*EventHandlerGenerator]cache.ResourceEventHandlerRegistration{}, @@ -144,21 +152,26 @@ func (m *informerManager) worker(ctx context.Context) { } } -func (m *informerManager) processFTC(ctx context.Context, ftc *fedcorev1a1.FederatedTypeConfig) (err error, needReenqueue bool) { +func (m *informerManager) processFTC( + ctx context.Context, + ftc *fedcorev1a1.FederatedTypeConfig, +) (err error, needReenqueue bool) { m.lock.Lock() defer m.lock.Unlock() + ftc = ftc.DeepCopy() ftcName := ftc.Name + gvk := ftc.GetSourceTypeGVK() gvr := ftc.GetSourceTypeGVR() - ctx, logger := logging.InjectLoggerValues(ctx, "gvr", gvr.String()) + ctx, logger := logging.InjectLoggerValues(ctx, "gvk", gvk.String()) var informer informers.GenericInformer - if oldGVR, exists := m.gvrMapping.LookupByT1(ftcName); exists { - ctx, _ := logging.InjectLoggerValues(ctx, "old-gvr", oldGVR.String()) + if oldGVK, exists := m.gvkMapping.LookupByT1(ftcName); exists { + ctx, _ := logging.InjectLoggerValues(ctx, "old-gvk", oldGVK.String()) - if oldGVR != gvr { + if oldGVK != gvk { // This might occur if a ftc was deleted and recreated with a different source type within a short period of // time and we missed processing the deletion. We simply process the ftc deletion and reenqueue. Note: // updating of ftc source types, however, is still not a supported use case. @@ -168,8 +181,8 @@ func (m *informerManager) processFTC(ctx context.Context, ftc *fedcorev1a1.Feder informer = m.informers[ftcName] } else { - if err := m.gvrMapping.Add(ftcName, gvr); err != nil { - // There must be another ftc with the same source type GVR. + if err := m.gvkMapping.Add(ftcName, gvk); err != nil { + // There must be another ftc with the same source type GVK. return fmt.Errorf("source type is already referenced by another FederatedTypeConfig: %w", err), false } @@ -181,11 +194,12 @@ func (m *informerManager) processFTC(ctx context.Context, ftc *fedcorev1a1.Feder metav1.NamespaceAll, 0, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, - nil, + m.informerTweakListOptions, ) ctx, cancel := context.WithCancel(ctx) go informer.Informer().Run(ctx.Done()) + m.lastObservedFTCs[ftcName] = ftc m.informers[ftcName] = informer m.informerCancelFuncs[ftcName] = cancel m.eventHandlerRegistrations[ftcName] = map[*EventHandlerGenerator]cache.ResourceEventHandlerRegistration{} @@ -195,8 +209,6 @@ func (m *informerManager) processFTC(ctx context.Context, ftc *fedcorev1a1.Feder registrations := m.eventHandlerRegistrations[ftcName] lastAppliedFTCs := m.lastAppliedFTCsCache[ftcName] - ftc = ftc.DeepCopy() - for _, generator := range m.eventHandlerGenerators { lastApplied := lastAppliedFTCs[generator] if !generator.Predicate(lastApplied, ftc) { @@ -230,8 +242,8 @@ func (m *informerManager) processFTCDeletion(ctx context.Context, ftcName string m.lock.Lock() defer m.lock.Unlock() - if gvr, exists := m.gvrMapping.LookupByT1(ftcName); exists { - ctx, _ = logging.InjectLoggerValues(ctx, "gvr", gvr.String()) + if gvk, exists := m.gvkMapping.LookupByT1(ftcName); exists { + ctx, _ = logging.InjectLoggerValues(ctx, "gvk", gvk.String()) } return m.processFTCDeletionUnlocked(ctx, ftcName) @@ -243,8 +255,9 @@ func (m *informerManager) processFTCDeletionUnlocked(ctx context.Context, ftcNam cancel() } - m.gvrMapping.DeleteT1(ftcName) + m.gvkMapping.DeleteT1(ftcName) + delete(m.lastObservedFTCs, ftcName) delete(m.informers, ftcName) delete(m.informerCancelFuncs, ftcName) delete(m.eventHandlerRegistrations, ftcName) @@ -269,12 +282,12 @@ func (m *informerManager) GetFederatedTypeConfigLister() fedcorev1a1listers.Fede } func (m *informerManager) GetResourceLister( - gvr schema.GroupVersionResource, + gvk schema.GroupVersionKind, ) (lister cache.GenericLister, informerSynced cache.InformerSynced, exists bool) { m.lock.RLock() defer m.lock.RUnlock() - ftc, ok := m.gvrMapping.LookupByT2(gvr) + ftc, ok := m.gvkMapping.LookupByT2(gvk) if !ok { return nil, nil, false } @@ -287,6 +300,23 @@ func (m *informerManager) GetResourceLister( return informer.Lister(), informer.Informer().HasSynced, true } +func (m *informerManager) GetResourceFTC(gvk schema.GroupVersionKind) (*fedcorev1a1.FederatedTypeConfig, bool) { + m.lock.RLock() + defer m.lock.RUnlock() + + ftcName, ok := m.gvkMapping.LookupByT2(gvk) + if !ok { + return nil, false + } + + ftc := m.lastObservedFTCs[ftcName] + if ftc == nil { + return nil, false + } + + return ftc, true +} + func (m *informerManager) HasSynced() bool { return m.ftcInformer.Informer().HasSynced() } diff --git a/pkg/util/informermanager/informermanager_test.go b/pkg/util/informermanager/informermanager_test.go index e4cf4ae0..8e1cb36d 100644 --- a/pkg/util/informermanager/informermanager_test.go +++ b/pkg/util/informermanager/informermanager_test.go @@ -46,6 +46,84 @@ func TestInformerManager(t *testing.T) { t.Parallel() ctx := klog.NewContext(context.Background(), ktesting.NewLogger(t, ktesting.NewConfig(ktesting.Verbosity(2)))) + t.Run("GVK mappings for existing FTCs should be available eventually", func(t *testing.T) { + t.Parallel() + g := gomega.NewWithT(t) + + // 1. Bootstrap environment + + defaultFTCs := []*fedcorev1a1.FederatedTypeConfig{deploymentFTC, configmapFTC, secretFTC} + defaultObjs := []*unstructured.Unstructured{} + generators := []*EventHandlerGenerator{} + + ctx, cancel := context.WithCancel(ctx) + manager, _, _ := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + defer func() { + cancel() + _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) + }() + + // 2. Verify that the GVK mapping for each FTC is eventually available + + for _, ftc := range defaultFTCs { + gvk := ftc.GetSourceTypeGVK() + + g.Eventually(func(g gomega.Gomega) { + resourceFTC, exists := manager.GetResourceFTC(gvk) + g.Expect(exists).To(gomega.BeTrue()) + g.Expect(resourceFTC).To(gomega.Equal(ftc)) + }).WithTimeout(time.Second * 2).Should(gomega.Succeed()) + } + + // 3. Verify that the GVK mapping for a non-existent FTC is not available + + ftc, exists := manager.GetResourceFTC(daemonsetGVK) + g.Expect(exists).To(gomega.BeFalse()) + g.Expect(ftc).To(gomega.BeNil()) + }) + + t.Run("GVK mapping for new FTC should be available eventually", func(t *testing.T) { + t.Parallel() + g := gomega.NewWithT(t) + + // 1. Bootstrap environment + + defaultFTCs := []*fedcorev1a1.FederatedTypeConfig{} + defaultObjs := []*unstructured.Unstructured{} + generators := []*EventHandlerGenerator{} + + ctx, cancel := context.WithCancel(ctx) + manager, _, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + defer func() { + cancel() + _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) + }() + + ftc := daemonsetFTC + gvk := ftc.GetSourceTypeGVK() + + // 2. Verify that the GVK mapping for daemonsets is not available at the start + + g.Consistently(func(g gomega.Gomega) { + resourceFTC, exists := manager.GetResourceFTC(gvk) + g.Expect(exists).To(gomega.BeFalse()) + g.Expect(resourceFTC).To(gomega.BeNil()) + }).WithTimeout(time.Second * 2).Should(gomega.Succeed()) + + // 3. Create the daemonset FTC. + + _, err := fedClient.CoreV1alpha1().FederatedTypeConfigs().Create(ctx, ftc, metav1.CreateOptions{}) + g.Expect(err).ToNot(gomega.HaveOccurred()) + + // 4. Verify that the GVK mapping for daemonsets is eventually available + + g.Eventually(func(g gomega.Gomega) { + resourceFTC, exists := manager.GetResourceFTC(gvk) + g.Expect(exists).To(gomega.BeTrue()) + g.Expect(resourceFTC).To(gomega.Equal(ftc)) + }).WithTimeout(time.Second * 2).Should(gomega.Succeed()) + }) + t.Run("listers for existing FTCs should be available eventually", func(t *testing.T) { t.Parallel() g := gomega.NewWithT(t) @@ -66,10 +144,10 @@ func TestInformerManager(t *testing.T) { // 2. Verify that the listers for each FTC is eventually available for _, ftc := range defaultFTCs { - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() g.Eventually(func(g gomega.Gomega) { - lister, informerSynced, exists := manager.GetResourceLister(gvr) + lister, informerSynced, exists := manager.GetResourceLister(gvk) g.Expect(exists).To(gomega.BeTrue()) g.Expect(lister).ToNot(gomega.BeNil()) g.Expect(informerSynced()).To(gomega.BeTrue()) @@ -78,7 +156,7 @@ func TestInformerManager(t *testing.T) { // 3. Verify that the lister for a non-existent FTC is not available - lister, informerSynced, exists := manager.GetResourceLister(common.DaemonSetGVR) + lister, informerSynced, exists := manager.GetResourceLister(daemonsetGVK) g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) @@ -102,12 +180,12 @@ func TestInformerManager(t *testing.T) { }() ftc := daemonsetFTC - gvr := ftc.GetSourceTypeGVR() + gvk := ftc.GetSourceTypeGVK() // 2. Verify that the lister for daemonsets is not available at the start g.Consistently(func(g gomega.Gomega) { - lister, informerSynced, exists := manager.GetResourceLister(gvr) + lister, informerSynced, exists := manager.GetResourceLister(gvk) g.Expect(exists).To(gomega.BeFalse()) g.Expect(lister).To(gomega.BeNil()) g.Expect(informerSynced).To(gomega.BeNil()) @@ -121,7 +199,7 @@ func TestInformerManager(t *testing.T) { // 4. Verify that the lister for daemonsets is eventually available g.Eventually(func(g gomega.Gomega) { - lister, informerSynced, exists := manager.GetResourceLister(gvr) + lister, informerSynced, exists := manager.GetResourceLister(gvk) g.Expect(exists).To(gomega.BeTrue()) g.Expect(lister).ToNot(gomega.BeNil()) g.Expect(informerSynced()).To(gomega.BeTrue()) @@ -155,7 +233,13 @@ func TestInformerManager(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, _ := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, _ := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -246,7 +330,13 @@ func TestInformerManager(t *testing.T) { } ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -381,7 +471,13 @@ func TestInformerManager(t *testing.T) { generators := []*EventHandlerGenerator{generator} ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -434,7 +530,13 @@ func TestInformerManager(t *testing.T) { generators := []*EventHandlerGenerator{generator} ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -486,7 +588,13 @@ func TestInformerManager(t *testing.T) { generators := []*EventHandlerGenerator{generator} ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -539,7 +647,13 @@ func TestInformerManager(t *testing.T) { generators := []*EventHandlerGenerator{generator} ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -598,7 +712,13 @@ func TestInformerManager(t *testing.T) { generators := []*EventHandlerGenerator{generator1, generator2} ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients(g, ctx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, fedClient := bootstrapInformerManagerWithFakeClients( + g, + ctx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -622,7 +742,9 @@ func TestInformerManager(t *testing.T) { // 3. Delete the deployment FTC - err := fedClient.CoreV1alpha1().FederatedTypeConfigs().Delete(ctx, deploymentFTC.GetName(), metav1.DeleteOptions{}) + err := fedClient.CoreV1alpha1(). + FederatedTypeConfigs(). + Delete(ctx, deploymentFTC.GetName(), metav1.DeleteOptions{}) g.Expect(err).ToNot(gomega.HaveOccurred()) <-time.After(time.Second) @@ -691,7 +813,13 @@ func TestInformerManager(t *testing.T) { managerCtx, managerCancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx) - manager, dynamicClient, _ := bootstrapInformerManagerWithFakeClients(g, managerCtx, defaultFTCs, defaultObjs, generators) + manager, dynamicClient, _ := bootstrapInformerManagerWithFakeClients( + g, + managerCtx, + defaultFTCs, + defaultObjs, + generators, + ) defer func() { cancel() _ = wait.PollInfinite(time.Millisecond, func() (done bool, err error) { return manager.IsShutdown(), nil }) @@ -775,7 +903,7 @@ func bootstrapInformerManagerWithFakeClients( fedClient := fake.NewSimpleClientset(fedObjects...) factory := fedinformers.NewSharedInformerFactory(fedClient, 0) - informerManager := NewInformerManager(dynamicClient, factory.Core().V1alpha1().FederatedTypeConfigs()) + informerManager := NewInformerManager(dynamicClient, factory.Core().V1alpha1().FederatedTypeConfigs(), nil) for _, generator := range eventHandlerGenerators { err := informerManager.AddEventHandlerGenerator(generator) diff --git a/pkg/util/informermanager/interface.go b/pkg/util/informermanager/interface.go index d9d15910..88bc1c15 100644 --- a/pkg/util/informermanager/interface.go +++ b/pkg/util/informermanager/interface.go @@ -50,7 +50,9 @@ type InformerManager interface { AddEventHandlerGenerator(generator *EventHandlerGenerator) error // Returns a lister for the given GroupResourceVersion if it exists. The lister for each FTC's source type will // eventually exist. - GetResourceLister(gvr schema.GroupVersionResource) (lister cache.GenericLister, informerSynced cache.InformerSynced, exists bool) + GetResourceLister(gvk schema.GroupVersionKind) (lister cache.GenericLister, informerSynced cache.InformerSynced, exists bool) + // Returns the known FTC mapping for the given GVK if it exists. + GetResourceFTC(gvk schema.GroupVersionKind) (ftc *fedcorev1a1.FederatedTypeConfig, exists bool) // Returns the FederatedTypeConfig lister used by the InformerManager. GetFederatedTypeConfigLister() fedcorev1a1listers.FederatedTypeConfigLister @@ -93,7 +95,7 @@ type FederatedInformerManager interface { // Returns a lister for the given GroupResourceVersion and cluster if it exists. The lister for each FTC's source // type and cluster will eventually exist. GetResourceLister( - gvr schema.GroupVersionResource, + gvk schema.GroupVersionKind, cluster string, ) (lister cache.GenericLister, informerSynced cache.InformerSynced, exists bool) // Returns a client for the given cluster if it exists. The client for each cluster will eventually exist. diff --git a/pkg/util/informermanager/testutils_test.go b/pkg/util/informermanager/testutils_test.go index 044b6579..08a4df11 100644 --- a/pkg/util/informermanager/testutils_test.go +++ b/pkg/util/informermanager/testutils_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/tools/cache" fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" + "github.com/kubewharf/kubeadmiral/pkg/util/managedlabel" ) var ( @@ -112,6 +113,9 @@ func getTestDeployment(name, namespace string) *unstructured.Unstructured { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + managedlabel.ManagedByKubeAdmiralLabelKey: managedlabel.ManagedByKubeAdmiralLabelValue, + }, }, } @@ -132,6 +136,9 @@ func getTestConfigMap(name, namespace string) *unstructured.Unstructured { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + managedlabel.ManagedByKubeAdmiralLabelKey: managedlabel.ManagedByKubeAdmiralLabelValue, + }, }, } @@ -152,6 +159,9 @@ func getTestSecret(name, namespace string) *unstructured.Unstructured { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + managedlabel.ManagedByKubeAdmiralLabelKey: managedlabel.ManagedByKubeAdmiralLabelValue, + }, }, } @@ -172,6 +182,9 @@ func getTestDaemonSet(name, namespace string) *unstructured.Unstructured { ObjectMeta: metav1.ObjectMeta{ Name: name, Namespace: namespace, + Labels: map[string]string{ + managedlabel.ManagedByKubeAdmiralLabelKey: managedlabel.ManagedByKubeAdmiralLabelValue, + }, }, } diff --git a/pkg/controllers/util/managedlabel/managedlabel.go b/pkg/util/managedlabel/managedlabel.go similarity index 100% rename from pkg/controllers/util/managedlabel/managedlabel.go rename to pkg/util/managedlabel/managedlabel.go