From c20ea143a236a34fb331e6c04820b75aac444e7d Mon Sep 17 00:00:00 2001 From: Max Smythe Date: Tue, 22 Aug 2023 06:12:55 -0700 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Allow=20non-blocking=20retrieval=20?= =?UTF-8?q?of=20informers=20(#2371)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Allow non-blocking retrieval of informers Signed-off-by: Max Smythe Re-organize functional arguments Signed-off-by: Max Smythe Add unit tests Signed-off-by: Max Smythe Add deferred cancel call to test Signed-off-by: Max Smythe Run gofmt Signed-off-by: Max Smythe * Update pkg/cache/internal/informers.go Co-authored-by: Stefan Büringer <4662360+sbueringer@users.noreply.github.com> * Update pkg/cache/internal/informers.go Co-authored-by: Stefan Büringer <4662360+sbueringer@users.noreply.github.com> * Alias functional options * Use private option for newInformer override * Fix lint errors --------- Signed-off-by: Max Smythe Co-authored-by: Stefan Büringer <4662360+sbueringer@users.noreply.github.com> --- pkg/builder/controller_test.go | 6 +- pkg/cache/cache.go | 22 +++++- pkg/cache/cache_test.go | 91 +++++++++++++++++++++- pkg/cache/delegating_by_gvk_cache.go | 8 +- pkg/cache/informer_cache.go | 18 +++-- pkg/cache/informertest/fake_cache.go | 4 +- pkg/cache/internal/informers.go | 27 ++++++- pkg/cache/multi_namespace_cache.go | 12 +-- pkg/internal/controller/controller_test.go | 2 +- 9 files changed, 163 insertions(+), 27 deletions(-) diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index ff734abd83..a70ece37c4 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -576,15 +576,15 @@ type nonTypedOnlyCache struct { cache.Cache } -func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { +func (c *nonTypedOnlyCache) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) { switch obj.(type) { case (*metav1.PartialObjectMetadata): - return c.Cache.GetInformer(ctx, obj) + return c.Cache.GetInformer(ctx, obj, opts...) default: return nil, fmt.Errorf("did not want to provide an informer for normal type %T", obj) } } -func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) { +func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) { return nil, fmt.Errorf("don't try to sidestep the restriction on informer types by calling GetInformerForKind") } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index e317c99e77..1ea44d9b83 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -44,6 +44,20 @@ var ( defaultSyncPeriod = 10 * time.Hour ) +// InformerGetOptions defines the behavior of how informers are retrieved. +type InformerGetOptions internal.GetOptions + +// InformerGetOption defines an option that alters the behavior of how informers are retrieved. +type InformerGetOption func(*InformerGetOptions) + +// BlockUntilSynced determines whether a get request for an informer should block +// until the informer's cache has synced. +func BlockUntilSynced(shouldBlock bool) InformerGetOption { + return func(opts *InformerGetOptions) { + opts.BlockUntilSynced = &shouldBlock + } +} + // Cache knows how to load Kubernetes objects, fetch informers to request // to receive events for Kubernetes objects (at a low-level), // and add indices to fields on the objects stored in the cache. @@ -61,11 +75,11 @@ type Cache interface { type Informers interface { // GetInformer fetches or constructs an informer for the given object that corresponds to a single // API kind and resource. - GetInformer(ctx context.Context, obj client.Object) (Informer, error) + GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) // GetInformerForKind is similar to GetInformer, except that it takes a group-version-kind, instead // of the underlying object. - GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) + GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) // Start runs all the informers known to this cache until the context is closed. // It blocks. @@ -187,6 +201,9 @@ type Options struct { // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // object, this will fall through to Default* settings. ByObject map[client.Object]ByObject + + // newInformer allows overriding of NewSharedIndexInformer for testing. + newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer } // ByObject offers more fine-grained control over the cache's ListWatch by object. @@ -337,6 +354,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { }, Transform: config.Transform, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), + NewInformer: opts.newInformer, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index f66819ddbf..d98f5f92ee 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -24,6 +24,7 @@ import ( "sort" "strconv" "strings" + "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -43,6 +44,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllertest" ) const testNodeOne = "test-node-1" @@ -117,6 +119,7 @@ func deletePod(pod client.Object) { var _ = Describe("Informer Cache", func() { CacheTest(cache.New, cache.Options{}) + NonBlockingGetTest(cache.New, cache.Options{}) }) var _ = Describe("Informer Cache with ReaderFailOnMissingInformer", func() { @@ -131,12 +134,22 @@ var _ = Describe("Multi-Namespace Informer Cache", func() { "default": {}, }, }) + NonBlockingGetTest(cache.New, cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + testNamespaceOne: {}, + testNamespaceTwo: {}, + "default": {}, + }, + }) }) var _ = Describe("Informer Cache without global DeepCopy", func() { CacheTest(cache.New, cache.Options{ DefaultUnsafeDisableDeepCopy: pointer.Bool(true), }) + NonBlockingGetTest(cache.New, cache.Options{ + DefaultUnsafeDisableDeepCopy: pointer.Bool(true), + }) }) var _ = Describe("Cache with transformers", func() { @@ -440,7 +453,6 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf BeforeEach(func() { informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) Expect(cfg).NotTo(BeNil()) - By("creating the informer cache") var err error informerCache, err = createCacheFunc(cfg, opts) @@ -507,6 +519,83 @@ func CacheTestReaderFailOnMissingInformer(createCacheFunc func(config *rest.Conf }) } +func NonBlockingGetTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { + Describe("non-blocking get test", func() { + var ( + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + ) + BeforeEach(func() { + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) + + By("creating expected namespaces") + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + err = ensureNode(testNodeOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceTwo, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceThree, cl) + Expect(err).NotTo(HaveOccurred()) + + By("creating the informer cache") + v := reflect.ValueOf(&opts).Elem() + newInformerField := v.FieldByName("newInformer") + newFakeInformer := func(_ kcache.ListerWatcher, _ runtime.Object, _ time.Duration, _ kcache.Indexers) kcache.SharedIndexInformer { + return &controllertest.FakeInformer{Synced: false} + } + reflect.NewAt(newInformerField.Type(), newInformerField.Addr().UnsafePointer()). + Elem(). + Set(reflect.ValueOf(&newFakeInformer)) + informerCache, err = createCacheFunc(cfg, opts) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) + }) + + AfterEach(func() { + By("cleaning up created pods") + informerCacheCancel() + }) + + Describe("as an Informer", func() { + It("should be able to get informer for the object without blocking", func() { + By("getting a shared index informer for a pod") + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "informer-obj", + Namespace: "default", + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "nginx", + Image: "nginx", + }, + }, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + sii, err := informerCache.GetInformer(ctx, pod, cache.BlockUntilSynced(false)) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeFalse()) + }) + }) + }) +} + func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (cache.Cache, error), opts cache.Options) { Describe("Cache test", func() { var ( diff --git a/pkg/cache/delegating_by_gvk_cache.go b/pkg/cache/delegating_by_gvk_cache.go index 6d640216a0..f3fa4800d2 100644 --- a/pkg/cache/delegating_by_gvk_cache.go +++ b/pkg/cache/delegating_by_gvk_cache.go @@ -52,16 +52,16 @@ func (dbt *delegatingByGVKCache) List(ctx context.Context, list client.ObjectLis return cache.List(ctx, list, opts...) } -func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { +func (dbt *delegatingByGVKCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) { cache, err := dbt.cacheForObject(obj) if err != nil { return nil, err } - return cache.GetInformer(ctx, obj) + return cache.GetInformer(ctx, obj, opts...) } -func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { - return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk) +func (dbt *delegatingByGVKCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { + return dbt.cacheForGVK(gvk).GetInformerForKind(ctx, gvk, opts...) } func (dbt *delegatingByGVKCache) Start(ctx context.Context) error { diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 4daae02c78..0f1b4e93d2 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -141,15 +141,23 @@ func (ic *informerCache) objectTypeForListObject(list client.ObjectList) (*schem return &gvk, cacheTypeObj, nil } +func applyGetOptions(opts ...InformerGetOption) *internal.GetOptions { + cfg := &InformerGetOptions{} + for _, opt := range opts { + opt(cfg) + } + return (*internal.GetOptions)(cfg) +} + // GetInformerForKind returns the informer for the GroupVersionKind. If no informer exists, one will be started. -func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { +func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { // Map the gvk to an object obj, err := ic.scheme.New(gvk) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) + _, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...)) if err != nil { return nil, err } @@ -157,13 +165,13 @@ func (ic *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou } // GetInformer returns the informer for the obj. If no informer exists, one will be started. -func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { +func (ic *informerCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) { gvk, err := apiutil.GVKForObject(obj, ic.scheme) if err != nil { return nil, err } - _, i, err := ic.Informers.Get(ctx, gvk, obj) + _, i, err := ic.Informers.Get(ctx, gvk, obj, applyGetOptions(opts...)) if err != nil { return nil, err } @@ -179,7 +187,7 @@ func (ic *informerCache) getInformerForKind(ctx context.Context, gvk schema.Grou return started, cache, nil } - return ic.Informers.Get(ctx, gvk, obj) + return ic.Informers.Get(ctx, gvk, obj, &internal.GetOptions{}) } // NeedLeaderElection implements the LeaderElectionRunnable interface diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index bb1ccee34b..171117698a 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -40,7 +40,7 @@ type FakeInformers struct { } // GetInformerForKind implements Informers. -func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (cache.Informer, error) { +func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...cache.InformerGetOption) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } @@ -61,7 +61,7 @@ func (c *FakeInformers) FakeInformerForKind(ctx context.Context, gvk schema.Grou } // GetInformer implements Informers. -func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { +func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) { if c.Scheme == nil { c.Scheme = scheme.Scheme } diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 66e5e9ef72..1d2c9ce2b4 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -45,6 +45,7 @@ type InformersOpts struct { Mapper meta.RESTMapper ResyncPeriod time.Duration Namespace string + NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool @@ -52,6 +53,10 @@ type InformersOpts struct { // NewInformers creates a new InformersMap that can create informers under the hood. func NewInformers(config *rest.Config, options *InformersOpts) *Informers { + newInformer := cache.NewSharedIndexInformer + if options.NewInformer != nil { + newInformer = *options.NewInformer + } return &Informers{ config: config, httpClient: options.HTTPClient, @@ -70,6 +75,7 @@ func NewInformers(config *rest.Config, options *InformersOpts) *Informers { selector: options.Selector, transform: options.Transform, unsafeDisableDeepCopy: options.UnsafeDisableDeepCopy, + newInformer: newInformer, } } @@ -88,6 +94,13 @@ type tracker struct { Metadata map[schema.GroupVersionKind]*Cache } +// GetOptions provides configuration to customize the behavior when +// getting an informer. +type GetOptions struct { + // BlockUntilSynced controls if the informer retrieval will block until the informer is synced. Defaults to `true`. + BlockUntilSynced *bool +} + // Informers create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs. // It uses a standard parameter codec constructed based on the given generated Scheme. type Informers struct { @@ -143,6 +156,9 @@ type Informers struct { selector Selector transform cache.TransformFunc unsafeDisableDeepCopy bool + + // NewInformer allows overriding of the shared index informer constructor for testing. + newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -240,7 +256,7 @@ func (ip *Informers) Peek(gvk schema.GroupVersionKind, obj runtime.Object) (res // Get will create a new Informer and add it to the map of specificInformersMap if none exists. Returns // the Informer from the map. -func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object) (bool, *Cache, error) { +func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj runtime.Object, opts *GetOptions) (bool, *Cache, error) { // Return the informer if it is found i, started, ok := ip.Peek(gvk, obj) if !ok { @@ -250,7 +266,12 @@ func (ip *Informers) Get(ctx context.Context, gvk schema.GroupVersionKind, obj r } } - if started && !i.Informer.HasSynced() { + shouldBlock := true + if opts.BlockUntilSynced != nil { + shouldBlock = *opts.BlockUntilSynced + } + + if shouldBlock && started && !i.Informer.HasSynced() { // Wait for it to sync before returning the Informer so that folks don't read from a stale cache. if !cache.WaitForCacheSync(ctx.Done(), i.Informer.HasSynced) { return started, nil, apierrors.NewTimeoutError(fmt.Sprintf("failed waiting for %T Informer to sync", obj), 0) @@ -288,7 +309,7 @@ func (ip *Informers) addInformerToMap(gvk schema.GroupVersionKind, obj runtime.O if err != nil { return nil, false, err } - sharedIndexInformer := cache.NewSharedIndexInformer(&cache.ListWatch{ + sharedIndexInformer := ip.newInformer(&cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { ip.selector.ApplyToList(&opts) return listWatcher.ListFunc(opts) diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index f767ddd951..5b20195d77 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -76,7 +76,7 @@ var _ Cache = &multiNamespaceCache{} // Methods for multiNamespaceCache to conform to the Informers interface. -func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object) (Informer, error) { +func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object, opts ...InformerGetOption) (Informer, error) { // If the object is cluster scoped, get the informer from clusterCache, // if not use the namespaced caches. isNamespaced, err := apiutil.IsObjectNamespaced(obj, c.Scheme, c.RESTMapper) @@ -84,7 +84,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return nil, err } if !isNamespaced { - clusterCacheInformer, err := c.clusterCache.GetInformer(ctx, obj) + clusterCacheInformer, err := c.clusterCache.GetInformer(ctx, obj, opts...) if err != nil { return nil, err } @@ -98,7 +98,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object namespaceToInformer := map[string]Informer{} for ns, cache := range c.namespaceToCache { - informer, err := cache.GetInformer(ctx, obj) + informer, err := cache.GetInformer(ctx, obj, opts...) if err != nil { return nil, err } @@ -108,7 +108,7 @@ func (c *multiNamespaceCache) GetInformer(ctx context.Context, obj client.Object return &multiNamespaceInformer{namespaceToInformer: namespaceToInformer}, nil } -func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind) (Informer, error) { +func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema.GroupVersionKind, opts ...InformerGetOption) (Informer, error) { // If the object is cluster scoped, get the informer from clusterCache, // if not use the namespaced caches. isNamespaced, err := apiutil.IsGVKNamespaced(gvk, c.RESTMapper) @@ -116,7 +116,7 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema return nil, err } if !isNamespaced { - clusterCacheInformer, err := c.clusterCache.GetInformerForKind(ctx, gvk) + clusterCacheInformer, err := c.clusterCache.GetInformerForKind(ctx, gvk, opts...) if err != nil { return nil, err } @@ -130,7 +130,7 @@ func (c *multiNamespaceCache) GetInformerForKind(ctx context.Context, gvk schema namespaceToInformer := map[string]Informer{} for ns, cache := range c.namespaceToCache { - informer, err := cache.GetInformerForKind(ctx, gvk) + informer, err := cache.GetInformerForKind(ctx, gvk, opts...) if err != nil { return nil, err } diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 05f84dc7b3..9575af69b8 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -873,7 +873,7 @@ type cacheWithIndefinitelyBlockingGetInformer struct { cache.Cache } -func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object) (cache.Informer, error) { +func (c *cacheWithIndefinitelyBlockingGetInformer) GetInformer(ctx context.Context, obj client.Object, opts ...cache.InformerGetOption) (cache.Informer, error) { <-ctx.Done() return nil, errors.New("GetInformer timed out") }