From 3c09e402d47de4cd92edfd5e21dc322bc43f3009 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Thu, 21 Dec 2023 17:28:47 +0100 Subject: [PATCH] restart watchers when they expire Signed-off-by: Matthias Bertschy --- adapters/incluster/v1/client.go | 43 +++++++++++++++++++++++---------- utils/cooldownqueue.go | 4 +++ 2 files changed, 34 insertions(+), 13 deletions(-) diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index d7e3ec8..f12d35e 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -2,8 +2,10 @@ package incluster import ( "context" + "errors" "fmt" "slices" + "time" jsonpatch "github.com/evanphx/json-patch" "github.com/kubescape/go-logger" @@ -19,6 +21,11 @@ import ( "k8s.io/client-go/dynamic" ) +// resourceVersionGetter is an interface used to get resource version from events. +type resourceVersionGetter interface { + GetResourceVersion() string +} + type Client struct { client dynamic.Interface account string @@ -86,28 +93,38 @@ func (c *Client) Start(ctx context.Context) error { watchOpts.ResourceVersion = list.GetResourceVersion() } // begin watch - watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts) - if err != nil { - logger.L().Ctx(ctx).Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err)) - } eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL) go func() { for { - event, chanActive := <-watcher.ResultChan() - if !chanActive { - watcher.Stop() - eventQueue.Stop() - break + watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts) + if err != nil { + logger.L().Ctx(ctx).Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err)) + } + for { + event, chanActive := <-watcher.ResultChan() + if eventQueue.Closed() { + watcher.Stop() + return + } + if !chanActive { + logger.L().Debug("watcher channel closed, restarting in 10s", helpers.String("resource", c.res.Resource)) + time.Sleep(10 * time.Second) + break + } + // set resource version to resume watch from + metaObject, ok := event.Object.(resourceVersionGetter) + if ok { + watchOpts.ResourceVersion = metaObject.GetResourceVersion() + } + eventQueue.Enqueue(event) } - eventQueue.Enqueue(event) } }() for event := range eventQueue.ResultChan { ctx := utils.ContextFromGeneric(ctx, domain.Generic{}) if event.Type == watch.Error { - logger.L().Ctx(ctx).Error("watch event failed", helpers.String("resource", c.res.Resource), helpers.Interface("event", event)) - watcher.Stop() - break + eventQueue.Stop() + return errors.New("watch event failed") } d, ok := event.Object.(*unstructured.Unstructured) if !ok { diff --git a/utils/cooldownqueue.go b/utils/cooldownqueue.go index 8ca8c8c..51c2383 100644 --- a/utils/cooldownqueue.go +++ b/utils/cooldownqueue.go @@ -52,6 +52,10 @@ func makeEventKey(e watch.Event) string { return eventKey } +func (q *CooldownQueue) Closed() bool { + return q.closed +} + // Enqueue enqueues an event in the Cooldown Queue func (q *CooldownQueue) Enqueue(e watch.Event) { if q.closed {