From 81aca45d8308f37d4d2da7fcc07644e54a82fb0c Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Wed, 15 Nov 2023 16:28:05 +0100 Subject: [PATCH] add lru cache to deduplicate events from watches Signed-off-by: Matthias Bertschy --- adapters/incluster/v1/client.go | 19 ++++++--- go.mod | 1 + go.sum | 4 +- utils/cooldownqueue.go | 69 +++++++++++++++++++++++++++++++++ 4 files changed, 85 insertions(+), 8 deletions(-) create mode 100644 utils/cooldownqueue.go diff --git a/adapters/incluster/v1/client.go b/adapters/incluster/v1/client.go index 38ac91f..2b0aacc 100644 --- a/adapters/incluster/v1/client.go +++ b/adapters/incluster/v1/client.go @@ -89,13 +89,20 @@ func (c *Client) Start(ctx context.Context) error { if err != nil { logger.L().Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err)) } - for { - ctx := utils.ContextFromGeneric(ctx, domain.Generic{}) - event, chanActive := <-watcher.ResultChan() - if !chanActive { - watcher.Stop() - break + eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL) + go func() { + for { + event, chanActive := <-watcher.ResultChan() + if !chanActive { + watcher.Stop() + eventQueue.Stop() + break + } + eventQueue.Enqueue(event) } + }() + for event := range eventQueue.ResultChan { + ctx := utils.ContextFromGeneric(ctx, domain.Generic{}) if event.Type == watch.Error { logger.L().Error("watch event failed", helpers.String("resource", c.res.Resource), helpers.Interface("event", event)) watcher.Stop() diff --git a/go.mod b/go.mod index a3a9b5b..666c2af 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/evanphx/json-patch v4.12.0+incompatible github.com/gobwas/ws v1.3.0 github.com/google/uuid v1.3.1 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566 github.com/kubescape/go-logger v0.0.21 github.com/kubescape/messaging v0.0.17 diff --git a/go.sum b/go.sum index be196c4..ab3663c 100644 --- a/go.sum +++ b/go.sum @@ -233,6 +233,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8 github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -263,8 +265,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566 h1:7mMyKAWh08UrxnAp4wTmenhYVUpj80YkkOrdif9FrQc= github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566/go.mod h1:ug9NFmmxT4DcQx3sgdLRzlLPWMKGHE/fpbcYUm5G5Qo= -github.com/kubescape/backend v0.0.13 h1:N+fH8giGGqvy3ff2li2AwG5guVduhdiPWyvZaZxrDCU= -github.com/kubescape/backend v0.0.13/go.mod h1:ug9NFmmxT4DcQx3sgdLRzlLPWMKGHE/fpbcYUm5G5Qo= github.com/kubescape/go-logger v0.0.21 h1:4ZRIEw3UGUH6BG/cH3yiqFipzQSfGAoCrxlsZuk37ys= github.com/kubescape/go-logger v0.0.21/go.mod h1:x3HBpZo3cMT/WIdy18BxvVVd5D0e/PWFVk/HiwBNu3g= github.com/kubescape/messaging v0.0.17 h1:8X+TX9ACDcT5zsbWqsXY9UTIugaIo8yjj95OC+hrUe4= diff --git a/utils/cooldownqueue.go b/utils/cooldownqueue.go new file mode 100644 index 0000000..f423f7e --- /dev/null +++ b/utils/cooldownqueue.go @@ -0,0 +1,69 @@ +package utils + +import ( + "time" + + lru "github.com/hashicorp/golang-lru/v2/expirable" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" +) + +const ( + // Default size for the cooldown queue + DefaultQueueSize = 512 + // Default TTL for events put in the queue + DefaultTTL = 5 * time.Second +) + +// CooldownQueue is a queue that lets clients put events into it with a cooldown +// +// When a client puts an event into a queue, it forwards the event to its +// output channel and starts a cooldown for this event. If a client attempts to +// put the same event into the queue while the cooldown is running, the queue +// will silently drop the event. When the cooldown resets and a client puts the +// same event into the queue, it will be forwarded to the output channel +type CooldownQueue struct { + seenEvents *lru.LRU[string, bool] + // inner channel for producing events + innerChan chan watch.Event + // public channel for reading events + ResultChan <-chan watch.Event +} + +// NewCooldownQueue returns a new Cooldown Queue +func NewCooldownQueue(size int, cooldown time.Duration) *CooldownQueue { + cache := lru.NewLRU[string, bool](size, nil, cooldown) + events := make(chan watch.Event) + return &CooldownQueue{ + seenEvents: cache, + innerChan: events, + ResultChan: events, + } +} + +// makeEventKey creates a unique key for an event from a watcher +func makeEventKey(e watch.Event) string { + object, ok := e.Object.(*unstructured.Unstructured) + if !ok { + return "" + } + eventKey := string(e.Type) + "-" + string(object.GetUID()) + return eventKey +} + +// Enqueue enqueues an event in the Cooldown Queue +func (q *CooldownQueue) Enqueue(e watch.Event) { + eventKey := makeEventKey(e) + _, exists := q.seenEvents.Get(eventKey) + if exists { + return + } + go func() { + q.innerChan <- e + }() + q.seenEvents.Add(eventKey, true) +} + +func (q *CooldownQueue) Stop() { + close(q.innerChan) +}