From 16dbd6c49baddd163421b063ab9d1a6d2bedea0b Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Tue, 17 Sep 2024 13:23:55 +0200 Subject: [PATCH 1/3] use same cooldownqueue as node-agent and synchronizer Signed-off-by: Matthias Bertschy --- continuousscanning/cooldownqueue.go | 81 ----------------- continuousscanning/service.go | 14 +-- continuousscanning/service_test.go | 4 +- main.go | 3 +- mainhandler/handlerequests.go | 6 +- watcher/cooldownqueue.go | 54 +++++------ watcher/cooldownqueue_test.go | 136 +++++++++++++++++----------- watcher/podwatcher_test.go | 6 +- 8 files changed, 122 insertions(+), 182 deletions(-) delete mode 100644 continuousscanning/cooldownqueue.go diff --git a/continuousscanning/cooldownqueue.go b/continuousscanning/cooldownqueue.go deleted file mode 100644 index 72e3b10..0000000 --- a/continuousscanning/cooldownqueue.go +++ /dev/null @@ -1,81 +0,0 @@ -package continuousscanning - -import ( - "context" - "time" - - lru "github.com/hashicorp/golang-lru/v2/expirable" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/watch" - - "github.com/kubescape/go-logger" - "github.com/kubescape/go-logger/helpers" -) - -const ( - // Default size for the cooldown queue - DefaultQueueSize = 512 - // Default TTL for events put in the queue - DefaultTTL = 5 * time.Second -) - -// cooldownQueue is a queue that lets clients put events into it with a cooldown -// -// When a client puts an event into a queue, it forwards the event to its -// output channel and starts a cooldown for this event. If a client attempts to -// put the same event into the queue while the cooldown is running, the queue -// will silently drop the event. When the cooldown resets and a client puts the -// same event into the queue, it will be forwarded to the output channel -type cooldownQueue struct { - seenEvents *lru.LRU[string, bool] - // inner channel for producing events - innerChan chan watch.Event - // public channel for reading events - ResultChan <-chan watch.Event -} - -// NewCooldownQueue returns a new Cooldown Queue -func NewCooldownQueue(size int, cooldown time.Duration) *cooldownQueue { - lru := lru.NewLRU[string, bool](size, nil, cooldown) - c := make(chan watch.Event) - - return &cooldownQueue{ - seenEvents: lru, - innerChan: c, - ResultChan: c, - } - -} - -func makeEventKey(e watch.Event) string { - object, ok := e.Object.(*unstructured.Unstructured) - if !ok { - return "" - } - - eventKey := string(e.Type) + "-" + string(object.GetUID()) - return eventKey -} - -// Enqueue enqueues an event in the Cooldown Queue -func (q *cooldownQueue) Enqueue(ctx context.Context, e watch.Event) { - eventKey := makeEventKey(e) - logger.L().Debug("Adding event to queue", helpers.String("eventKey", eventKey)) - - _, exists := q.seenEvents.Get(eventKey) - if exists { - logger.L().Debug("key exists, dropping event", helpers.Interface("eventKey", eventKey)) - return - } - - go func() { - logger.L().Debug("pushing event", helpers.Interface("eventKey", eventKey)) - q.innerChan <- e - logger.L().Debug("pushed event", helpers.Interface("event", eventKey)) - }() - q.seenEvents.Add(eventKey, true) -} - -func (q *cooldownQueue) Stop(ctx context.Context) { - close(q.innerChan) -} diff --git a/continuousscanning/service.go b/continuousscanning/service.go index bdad22d..d4ac942 100644 --- a/continuousscanning/service.go +++ b/continuousscanning/service.go @@ -2,8 +2,8 @@ package continuousscanning import ( "context" - "time" + "github.com/kubescape/operator/watcher" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" @@ -19,7 +19,7 @@ type ContinuousScanningService struct { workDone chan struct{} k8sdynamic dynamic.Interface eventHandlers []EventHandler - eventQueue *cooldownQueue + eventQueue *watcher.CooldownQueue } func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.Command { @@ -34,8 +34,8 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C wp.Run(ctx, resourceEventsCh) logger.L().Info("ran watch pool") - go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *cooldownQueue) { - defer out.Stop(ctx) + go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *watcher.CooldownQueue) { + defer out.Stop() for { select { @@ -44,7 +44,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C "got event from channel", helpers.Interface("event", e), ) - out.Enqueue(ctx, e) + out.Enqueue(e) case <-shutdownCh: return } @@ -100,9 +100,9 @@ func (s *ContinuousScanningService) Stop() { <-s.workDone } -func NewContinuousScanningService(client dynamic.Interface, tl TargetLoader, queueSize int, sameEventCooldown time.Duration, h ...EventHandler) *ContinuousScanningService { +func NewContinuousScanningService(client dynamic.Interface, tl TargetLoader, h ...EventHandler) *ContinuousScanningService { doneCh := make(chan struct{}) - eventQueue := NewCooldownQueue(queueSize, sameEventCooldown) + eventQueue := watcher.NewCooldownQueue() workDone := make(chan struct{}) return &ContinuousScanningService{ diff --git a/continuousscanning/service_test.go b/continuousscanning/service_test.go index ea26b70..13b2979 100644 --- a/continuousscanning/service_test.go +++ b/continuousscanning/service_test.go @@ -151,7 +151,7 @@ func TestAddEventHandler(t *testing.T) { tl := NewTargetLoader(f) // We use the spy handler later to verify if it's been called spyH := &spyHandler{called: false, wg: resourcesCreatedWg, mx: &sync.RWMutex{}} - css := NewContinuousScanningService(dynClient, tl, DefaultQueueSize, DefaultTTL, spyH) + css := NewContinuousScanningService(dynClient, tl, spyH) css.Launch(ctx) // Create Pods to be listened @@ -265,7 +265,7 @@ func TestContinuousScanningService(t *testing.T) { triggeringHandler := NewTriggeringHandler(wp, operatorConfig) stubFetcher := &stubFetcher{podMatchRules} loader := NewTargetLoader(stubFetcher) - css := NewContinuousScanningService(dynClient, loader, DefaultQueueSize, DefaultTTL, triggeringHandler) + css := NewContinuousScanningService(dynClient, loader, triggeringHandler) css.Launch(ctx) // Create Pods to be listened diff --git a/main.go b/main.go index e3ffc72..deb8682 100644 --- a/main.go +++ b/main.go @@ -22,7 +22,6 @@ import ( rulebindingcachev1 "github.com/kubescape/operator/admission/rulebinding/cache" "github.com/kubescape/operator/admission/webhook" "github.com/kubescape/operator/config" - cs "github.com/kubescape/operator/continuousscanning" "github.com/kubescape/operator/mainhandler" "github.com/kubescape/operator/notificationhandler" "github.com/kubescape/operator/objectcache" @@ -150,7 +149,7 @@ func main() { if operatorConfig.ContinuousScanEnabled() { go func(mh *mainhandler.MainHandler) { - err := mh.SetupContinuousScanning(ctx, cs.DefaultQueueSize, cfg.EventDeduplicationInterval) + err := mh.SetupContinuousScanning(ctx) logger.L().Info("set up cont scanning service") if err != nil { logger.L().Ctx(ctx).Fatal(err.Error(), helpers.Error(err)) diff --git 
a/mainhandler/handlerequests.go b/mainhandler/handlerequests.go index 79a68c9..a2fb451 100644 --- a/mainhandler/handlerequests.go +++ b/mainhandler/handlerequests.go @@ -114,7 +114,7 @@ func NewActionHandler(config config.IConfig, k8sAPI *k8sinterface.KubernetesApi, } // SetupContinuousScanning sets up the continuous cluster scanning function -func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context, queueSize int, eventCooldown time.Duration) error { +func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context) error { ksStorageClient, err := kssc.NewForConfig(k8sinterface.GetK8sConfig()) if err != nil { logger.L().Ctx(ctx).Fatal(fmt.Sprintf("Unable to initialize the storage client: %v", err)) @@ -133,7 +133,7 @@ func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context, que loader := cs.NewTargetLoader(fetcher) dynClient := mainHandler.k8sAPI.DynamicClient - svc := cs.NewContinuousScanningService(dynClient, loader, queueSize, eventCooldown, triggeringHandler, deletingHandler) + svc := cs.NewContinuousScanningService(dynClient, loader, triggeringHandler, deletingHandler) svc.Launch(ctx) return nil @@ -150,7 +150,7 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) { if err != nil { logger.L().Ctx(ctx).Fatal(fmt.Sprintf("Unable to initialize the storage client: %v", err)) } - eventQueue := watcher.NewCooldownQueue(watcher.DefaultQueueSize, watcher.DefaultTTL) + eventQueue := watcher.NewCooldownQueue() watchHandler := watcher.NewWatchHandler(ctx, mainHandler.config, mainHandler.k8sAPI, ksStorageClient, eventQueue) // wait for the kubevuln component to be ready diff --git a/watcher/cooldownqueue.go b/watcher/cooldownqueue.go index 6f79d62..d4e759b 100644 --- a/watcher/cooldownqueue.go +++ b/watcher/cooldownqueue.go @@ -1,40 +1,42 @@ package watcher import ( + "strings" "time" - lru "github.com/hashicorp/golang-lru/v2/expirable" - v1 "k8s.io/api/core/v1" + "istio.io/pkg/cache" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) const ( - // Default size for the cooldown queue - DefaultQueueSize = 512 - // Default TTL for events put in the queue - DefaultTTL = 1 * time.Second + defaultExpiration = 5 * time.Second + evictionInterval = 1 * time.Second ) // CooldownQueue is a queue that lets clients put events into it with a cooldown // -// When a client puts an event into a queue, it forwards the event to its -// output channel and starts a cooldown for this event. If a client attempts to -// put the same event into the queue while the cooldown is running, the queue -// will silently drop the event. When the cooldown resets and a client puts the -// same event into the queue, it will be forwarded to the output channel +// When a client puts an event into a queue, it waits for a cooldown period before +// the event is forwarded to the consumer. If and event for the same key is put into the queue +// again before the cooldown period is over, the event is overridden and the cooldown period is reset. 
type CooldownQueue struct { - seenEvents *lru.LRU[string, bool] - innerChan chan watch.Event - ResultChan <-chan watch.Event closed bool + seenEvents cache.ExpiringCache + // inner channel for producing events + innerChan chan watch.Event + // public channel for reading events + ResultChan <-chan watch.Event } // NewCooldownQueue returns a new Cooldown Queue -func NewCooldownQueue(size int, cooldown time.Duration) *CooldownQueue { - cache := lru.NewLRU[string, bool](size, nil, cooldown) +func NewCooldownQueue() *CooldownQueue { events := make(chan watch.Event) + callback := func(key, value any) { + events <- value.(watch.Event) + } + c := cache.NewTTLWithCallback(defaultExpiration, evictionInterval, callback) return &CooldownQueue{ - seenEvents: cache, + seenEvents: c, innerChan: events, ResultChan: events, } @@ -42,12 +44,9 @@ func NewCooldownQueue(size int, cooldown time.Duration) *CooldownQueue { // makeEventKey creates a unique key for an event from a watcher func makeEventKey(e watch.Event) string { - object, ok := e.Object.(*v1.Pod) - if !ok { - return "" - } - eventKey := string(e.Type) + "-" + string(object.GetUID()) - return eventKey + gvk := e.Object.GetObjectKind().GroupVersionKind() + meta := e.Object.(metav1.Object) + return strings.Join([]string{gvk.Group, gvk.Version, gvk.Kind, meta.GetNamespace(), meta.GetName()}, "/") } func (q *CooldownQueue) Closed() bool { @@ -60,14 +59,7 @@ func (q *CooldownQueue) Enqueue(e watch.Event) { return } eventKey := makeEventKey(e) - _, exists := q.seenEvents.Get(eventKey) - if exists { - return - } - go func() { - q.innerChan <- e - }() - q.seenEvents.Add(eventKey, true) + q.seenEvents.Set(eventKey, e) } func (q *CooldownQueue) Stop() { diff --git a/watcher/cooldownqueue_test.go b/watcher/cooldownqueue_test.go index 8c4b85c..bbe5ea6 100644 --- a/watcher/cooldownqueue_test.go +++ b/watcher/cooldownqueue_test.go @@ -1,68 +1,98 @@ package watcher import ( + "sort" "testing" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "github.com/stretchr/testify/assert" - core1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/watch" ) -func TestCooldownQueue_Stop(t *testing.T) { - queue := NewCooldownQueue(1, 1*time.Second) - - // Assert that the queue is not closed - assert.False(t, queue.Closed(), "expected queue to be open") - assert.False(t, queue.closed, "expected queue to be closed") - - // Stop the queue - queue.Stop() - - // Wait for a short period to allow the queue to stop - time.Sleep(100 * time.Millisecond) - - // Assert that the queue is closed - assert.True(t, queue.closed, "expected queue to be closed") - assert.True(t, queue.Closed(), "expected queue to be closed") -} +var ( + configmap = unstructured.Unstructured{Object: map[string]interface{}{"kind": "ConfigMap", "metadata": map[string]interface{}{"uid": "748ad4a8-e5ff-44da-ba94-309992c97820"}}} + deployment = unstructured.Unstructured{Object: map[string]interface{}{"kind": "Deployment", "metadata": map[string]interface{}{"uid": "6b1a0c50-277f-4aa1-a4f9-9fc278ce4fe2"}}} + pod = unstructured.Unstructured{Object: map[string]interface{}{"kind": "Pod", "metadata": map[string]interface{}{"uid": "aa5e3e8f-2da5-4c38-93c0-210d3280d10f"}}} + deploymentAdded = watch.Event{Type: watch.Added, Object: &deployment} + podAdded = watch.Event{Type: watch.Added, Object: &pod} + podModified = watch.Event{Type: watch.Modified, Object: &pod} +) func TestCooldownQueue_Enqueue(t *testing.T) { - queue := NewCooldownQueue(1, 1*time.Second) - - // Enqueue 
an event - event := watch.Event{Type: watch.Added, Object: &core1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "test", Name: "test", Namespace: "test"}}} - queue.Enqueue(event) - - // Wait for a short period to allow the event to be processed - time.Sleep(100 * time.Millisecond) - - // Assert that the event was processed - assert.Equal(t, event, <-queue.innerChan) - - // Enqueue the same event again - queue.Enqueue(event) - - // Wait for a short period to allow the event to be processed - time.Sleep(100 * time.Millisecond) - - // Assert that the event was not processed again - select { - case <-queue.innerChan: - assert.Fail(t, "event should not have been processed again") - default: - // Event was not processed again, as expected + tests := []struct { + name string + inEvents []watch.Event + outEvents []watch.Event + }{ + { + name: "add pod", + inEvents: []watch.Event{deploymentAdded, podAdded, podModified, podModified, podModified}, + outEvents: []watch.Event{deploymentAdded, podModified}, + }, } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := NewCooldownQueue() + go func() { + time.Sleep(10 * time.Second) + q.Stop() + }() + for _, e := range tt.inEvents { + time.Sleep(50 * time.Millisecond) // need to sleep to preserve order since the insertion is async + q.Enqueue(e) + } + var outEvents []watch.Event + for e := range q.ResultChan { + outEvents = append(outEvents, e) + } + // sort outEvents to make the comparison easier + sort.Slice(outEvents, func(i, j int) bool { + uidI := outEvents[i].Object.(*unstructured.Unstructured).GetUID() + uidJ := outEvents[j].Object.(*unstructured.Unstructured).GetUID() + return uidI < uidJ + }) + assert.Equal(t, tt.outEvents, outEvents) + }) + } +} - // Enqueue a different event - anotherEvent := watch.Event{Type: watch.Modified, Object: &core1.Pod{ObjectMeta: metav1.ObjectMeta{UID: "test-2", Name: "test", Namespace: "test"}}} - queue.Enqueue(anotherEvent) - - // Wait for a short period to allow the event to be processed - time.Sleep(100 * time.Millisecond) - - // Assert that the event was processed - assert.Equal(t, anotherEvent, <-queue.innerChan) +// key is only based on the UID of the object +func Test_makeEventKey(t *testing.T) { + tests := []struct { + name string + e watch.Event + want string + }{ + { + name: "add pod", + e: watch.Event{ + Type: watch.Added, + Object: &pod, + }, + want: "//Pod//", + }, + { + name: "delete deployment", + e: watch.Event{ + Type: watch.Deleted, + Object: &deployment, + }, + want: "//Deployment//", + }, + { + name: "modify configmap", + e: watch.Event{ + Type: watch.Modified, + Object: &configmap, + }, + want: "//ConfigMap//", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := makeEventKey(tt.e) + assert.Equal(t, tt.want, got) + }) + } } diff --git a/watcher/podwatcher_test.go b/watcher/podwatcher_test.go index 92a6d87..1f55917 100644 --- a/watcher/podwatcher_test.go +++ b/watcher/podwatcher_test.go @@ -112,7 +112,7 @@ func TestPodWatch(t *testing.T) { storageClient := kssfake.NewSimpleClientset() k8sAPI.DynamicClient = dynClient - eventQueue := NewCooldownQueue(DefaultQueueSize, DefaultTTL) + eventQueue := NewCooldownQueue() wh := NewWatchHandler(ctx, operatorConfig, k8sAPI, storageClient, eventQueue) resourcesCreatedWg := &sync.WaitGroup{} @@ -370,7 +370,7 @@ func Test_handlePodWatcher(t *testing.T) { storageClient := kssfake.NewSimpleClientset() k8sAPI.DynamicClient = dynClient - eventQueue := NewCooldownQueue(DefaultQueueSize, DefaultTTL) + eventQueue := 
NewCooldownQueue() wh := NewWatchHandler(ctx, operatorConfig, k8sAPI, storageClient, eventQueue) resourcesCreatedWg := &sync.WaitGroup{} @@ -451,7 +451,7 @@ func Test_listPods(t *testing.T) { k8sClient := k8sfake.NewSimpleClientset() k8sAPI := utils.NewK8sInterfaceFake(k8sClient) storageClient := kssfake.NewSimpleClientset() - eventQueue := NewCooldownQueue(DefaultQueueSize, DefaultTTL) + eventQueue := NewCooldownQueue() wh := NewWatchHandler(ctx, operatorConfig, k8sAPI, storageClient, eventQueue) resourcesCreatedWg := &sync.WaitGroup{} From 5b9046a1ec486f31c9bd4ace719a2fc6bb56fbd0 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Tue, 17 Sep 2024 16:13:10 +0200 Subject: [PATCH 2/3] fix data race with mutexes Signed-off-by: Matthias Bertschy --- watcher/cooldownqueue.go | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/watcher/cooldownqueue.go b/watcher/cooldownqueue.go index d4e759b..c64caa4 100644 --- a/watcher/cooldownqueue.go +++ b/watcher/cooldownqueue.go @@ -2,6 +2,7 @@ package watcher import ( "strings" + "sync" "time" "istio.io/pkg/cache" @@ -17,25 +18,29 @@ const ( // CooldownQueue is a queue that lets clients put events into it with a cooldown // // When a client puts an event into a queue, it waits for a cooldown period before -// the event is forwarded to the consumer. If and event for the same key is put into the queue +// the event is forwarded to the consumer. If an event for the same key is put into the queue // again before the cooldown period is over, the event is overridden and the cooldown period is reset. type CooldownQueue struct { closed bool + mu sync.Mutex // mutex for closed + chanMu *sync.Mutex // mutex for innerChan seenEvents cache.ExpiringCache - // inner channel for producing events - innerChan chan watch.Event - // public channel for reading events + innerChan chan watch.Event ResultChan <-chan watch.Event } // NewCooldownQueue returns a new Cooldown Queue func NewCooldownQueue() *CooldownQueue { events := make(chan watch.Event) + chanMu := sync.Mutex{} callback := func(key, value any) { + chanMu.Lock() + defer chanMu.Unlock() events <- value.(watch.Event) } c := cache.NewTTLWithCallback(defaultExpiration, evictionInterval, callback) return &CooldownQueue{ + chanMu: &chanMu, seenEvents: c, innerChan: events, ResultChan: events, @@ -50,11 +55,15 @@ func makeEventKey(e watch.Event) string { } func (q *CooldownQueue) Closed() bool { + q.mu.Lock() + defer q.mu.Unlock() return q.closed } // Enqueue enqueues an event in the Cooldown Queue func (q *CooldownQueue) Enqueue(e watch.Event) { + q.mu.Lock() + defer q.mu.Unlock() if q.closed { return } @@ -63,6 +72,10 @@ func (q *CooldownQueue) Enqueue(e watch.Event) { } func (q *CooldownQueue) Stop() { + q.chanMu.Lock() + defer q.chanMu.Unlock() + q.mu.Lock() + defer q.mu.Unlock() q.closed = true close(q.innerChan) } From 73bd4e85abbc340141a542aebebb5a8abdb15744 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Sun, 22 Sep 2024 22:30:49 +0200 Subject: [PATCH 3/3] fix scan image trigger command Signed-off-by: Matthias Bertschy --- mainhandler/handlerequests.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/mainhandler/handlerequests.go b/mainhandler/handlerequests.go index a2fb451..00c0564 100644 --- a/mainhandler/handlerequests.go +++ b/mainhandler/handlerequests.go @@ -433,16 +433,16 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con CommandName: apis.TypeScanImages, Args: map[string]interface{}{ 
utils.ArgsContainerData: containerData, - utils.ArgsPod: &pod, + utils.ArgsPod: pod, }, } // send specific command to the channel newSessionObj := utils.NewSessionObj(ctx, mainHandler.config, cmd, "Websocket", sessionObj.Reporter.GetJobID(), "", 1) - logger.L().Info("triggering", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID)) + logger.L().Info("triggering scan image", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID)) if err := mainHandler.HandleSingleRequest(ctx, newSessionObj); err != nil { - logger.L().Info("failed to complete action", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID)) + logger.L().Info("failed to complete action", helpers.Error(err), helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID)) newSessionObj.Reporter.SendError(err, mainHandler.sendReport, true) continue }
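
For context, a minimal, self-contained sketch of how the reworked watcher.CooldownQueue is consumed after this series, assuming only the API and constants visible in the hunks above (NewCooldownQueue with no arguments, Enqueue, ResultChan, Stop, a fixed 5s expiration and 1s eviction interval). The Pod object, timings and printed output are illustrative assumptions, not code from the repository.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/watch"

	"github.com/kubescape/operator/watcher"
)

func main() {
	// After this series, NewCooldownQueue takes no arguments: the expiration
	// (5s) and eviction interval (1s) are constants inside the watcher package.
	q := watcher.NewCooldownQueue()

	pod := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "Pod",
		"metadata":   map[string]interface{}{"name": "nginx", "namespace": "default"},
	}}

	// A burst of events for the same object collapses to one: each Enqueue
	// overwrites the cached entry for the key "/v1/Pod/default/nginx", and only
	// the last event is emitted once its TTL expires.
	q.Enqueue(watch.Event{Type: watch.Added, Object: pod})
	q.Enqueue(watch.Event{Type: watch.Modified, Object: pod})
	q.Enqueue(watch.Event{Type: watch.Modified, Object: pod})

	// Stop the queue after the cooldown has had time to fire, so ResultChan
	// closes and the range below terminates (same pattern as the updated test).
	go func() {
		time.Sleep(10 * time.Second)
		q.Stop()
	}()

	for e := range q.ResultChan {
		fmt.Printf("received %s for %s\n", e.Type, e.Object.GetObjectKind().GroupVersionKind().Kind)
	}
}

Unlike the deleted continuousscanning queue, which forwarded the first event immediately and silently dropped duplicates while the cooldown was running, the shared implementation delays and coalesces events per key, so the consumer always sees the most recent state of the object.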