Skip to content

Commit

Permalink
restart watchers when they expire
Browse files Browse the repository at this point in the history
Signed-off-by: Matthias Bertschy <[email protected]>
  • Loading branch information
matthyx committed Dec 22, 2023
1 parent 4ed3fd8 commit 3c09e40
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 13 deletions.
43 changes: 30 additions & 13 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package incluster

import (
"context"
"errors"
"fmt"
"slices"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/kubescape/go-logger"
Expand All @@ -19,6 +21,11 @@ import (
"k8s.io/client-go/dynamic"
)

// resourceVersionGetter is an interface used to get resource version from events.
type resourceVersionGetter interface {
GetResourceVersion() string
}

type Client struct {
client dynamic.Interface
account string
Expand Down Expand Up @@ -86,28 +93,38 @@ func (c *Client) Start(ctx context.Context) error {
watchOpts.ResourceVersion = list.GetResourceVersion()
}
// begin watch
watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
if err != nil {
logger.L().Ctx(ctx).Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err))
}
eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL)
go func() {
for {
event, chanActive := <-watcher.ResultChan()
if !chanActive {
watcher.Stop()
eventQueue.Stop()
break
watcher, err := c.client.Resource(c.res).Namespace("").Watch(context.Background(), watchOpts)
if err != nil {
logger.L().Ctx(ctx).Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err))
}
for {
event, chanActive := <-watcher.ResultChan()
if eventQueue.Closed() {
watcher.Stop()
return
}
if !chanActive {
logger.L().Debug("watcher channel closed, restarting in 10s", helpers.String("resource", c.res.Resource))
time.Sleep(10 * time.Second)
break
}
// set resource version to resume watch from
metaObject, ok := event.Object.(resourceVersionGetter)
if ok {
watchOpts.ResourceVersion = metaObject.GetResourceVersion()
}
eventQueue.Enqueue(event)
}
eventQueue.Enqueue(event)
}
}()
for event := range eventQueue.ResultChan {
ctx := utils.ContextFromGeneric(ctx, domain.Generic{})
if event.Type == watch.Error {
logger.L().Ctx(ctx).Error("watch event failed", helpers.String("resource", c.res.Resource), helpers.Interface("event", event))
watcher.Stop()
break
eventQueue.Stop()
return errors.New("watch event failed")
}
d, ok := event.Object.(*unstructured.Unstructured)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions utils/cooldownqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func makeEventKey(e watch.Event) string {
return eventKey
}

func (q *CooldownQueue) Closed() bool {
return q.closed
}

// Enqueue enqueues an event in the Cooldown Queue
func (q *CooldownQueue) Enqueue(e watch.Event) {
if q.closed {
Expand Down

0 comments on commit 3c09e40

Please sign in to comment.