Skip to content

Commit

Permalink
Merge pull request #7 from kubescape/lru
Browse files Browse the repository at this point in the history
add lru cache to deduplicate events from watches
  • Loading branch information
matthyx authored Nov 16, 2023
2 parents 37af383 + 81aca45 commit 49bc851
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 8 deletions.
19 changes: 13 additions & 6 deletions adapters/incluster/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ func (c *Client) Start(ctx context.Context) error {
if err != nil {
logger.L().Fatal("unable to watch for resources", helpers.String("resource", c.res.Resource), helpers.Error(err))
}
for {
ctx := utils.ContextFromGeneric(ctx, domain.Generic{})
event, chanActive := <-watcher.ResultChan()
if !chanActive {
watcher.Stop()
break
eventQueue := utils.NewCooldownQueue(utils.DefaultQueueSize, utils.DefaultTTL)
go func() {
for {
event, chanActive := <-watcher.ResultChan()
if !chanActive {
watcher.Stop()
eventQueue.Stop()
break
}
eventQueue.Enqueue(event)
}
}()
for event := range eventQueue.ResultChan {
ctx := utils.ContextFromGeneric(ctx, domain.Generic{})
if event.Type == watch.Error {
logger.L().Error("watch event failed", helpers.String("resource", c.res.Resource), helpers.Interface("event", event))
watcher.Stop()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/evanphx/json-patch v4.12.0+incompatible
github.com/gobwas/ws v1.3.0
github.com/google/uuid v1.3.1
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566
github.com/kubescape/go-logger v0.0.21
github.com/kubescape/messaging v0.0.17
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
Expand Down Expand Up @@ -263,8 +265,6 @@ github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566 h1:7mMyKAWh08UrxnAp4wTmenhYVUpj80YkkOrdif9FrQc=
github.com/kubescape/backend v0.0.12-0.20231108114302-f08509d35566/go.mod h1:ug9NFmmxT4DcQx3sgdLRzlLPWMKGHE/fpbcYUm5G5Qo=
github.com/kubescape/backend v0.0.13 h1:N+fH8giGGqvy3ff2li2AwG5guVduhdiPWyvZaZxrDCU=
github.com/kubescape/backend v0.0.13/go.mod h1:ug9NFmmxT4DcQx3sgdLRzlLPWMKGHE/fpbcYUm5G5Qo=
github.com/kubescape/go-logger v0.0.21 h1:4ZRIEw3UGUH6BG/cH3yiqFipzQSfGAoCrxlsZuk37ys=
github.com/kubescape/go-logger v0.0.21/go.mod h1:x3HBpZo3cMT/WIdy18BxvVVd5D0e/PWFVk/HiwBNu3g=
github.com/kubescape/messaging v0.0.17 h1:8X+TX9ACDcT5zsbWqsXY9UTIugaIo8yjj95OC+hrUe4=
Expand Down
69 changes: 69 additions & 0 deletions utils/cooldownqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package utils

import (
"time"

lru "github.com/hashicorp/golang-lru/v2/expirable"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)

const (
// Default size for the cooldown queue
DefaultQueueSize = 512
// Default TTL for events put in the queue
DefaultTTL = 5 * time.Second
)

// CooldownQueue is a queue that lets clients put events into it with a cooldown
//
// When a client puts an event into a queue, it forwards the event to its
// output channel and starts a cooldown for this event. If a client attempts to
// put the same event into the queue while the cooldown is running, the queue
// will silently drop the event. When the cooldown resets and a client puts the
// same event into the queue, it will be forwarded to the output channel
type CooldownQueue struct {
seenEvents *lru.LRU[string, bool]
// inner channel for producing events
innerChan chan watch.Event
// public channel for reading events
ResultChan <-chan watch.Event
}

// NewCooldownQueue returns a new Cooldown Queue
func NewCooldownQueue(size int, cooldown time.Duration) *CooldownQueue {
cache := lru.NewLRU[string, bool](size, nil, cooldown)
events := make(chan watch.Event)
return &CooldownQueue{
seenEvents: cache,
innerChan: events,
ResultChan: events,
}
}

// makeEventKey creates a unique key for an event from a watcher
func makeEventKey(e watch.Event) string {
object, ok := e.Object.(*unstructured.Unstructured)
if !ok {
return ""
}
eventKey := string(e.Type) + "-" + string(object.GetUID())
return eventKey
}

// Enqueue enqueues an event in the Cooldown Queue
func (q *CooldownQueue) Enqueue(e watch.Event) {
eventKey := makeEventKey(e)
_, exists := q.seenEvents.Get(eventKey)
if exists {
return
}
go func() {
q.innerChan <- e
}()
q.seenEvents.Add(eventKey, true)
}

func (q *CooldownQueue) Stop() {
close(q.innerChan)
}

0 comments on commit 49bc851

Please sign in to comment.