From 5810771ca8d4b63b17c46d405c2488e37eacba43 Mon Sep 17 00:00:00 2001 From: Amir Malka Date: Wed, 1 Jan 2025 18:45:48 +0200 Subject: [PATCH] skip old OperatorCommands Signed-off-by: Amir Malka --- mainhandler/handlerequests.go | 2 + watcher/commandshandler.go | 83 +++++++++++++++++++++++++++++++++++ watcher/commandswatcher.go | 11 ++++- 3 files changed, 94 insertions(+), 2 deletions(-) create mode 100644 watcher/commandshandler.go diff --git a/mainhandler/handlerequests.go b/mainhandler/handlerequests.go index 96cbb11..a7abe36 100644 --- a/mainhandler/handlerequests.go +++ b/mainhandler/handlerequests.go @@ -153,6 +153,8 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) { commandWatchHandler := watcher.NewCommandWatchHandler(mainHandler.k8sAPI, mainHandler.config) registryCommandsHandler := watcher.NewRegistryCommandsHandler(ctx, mainHandler.k8sAPI, commandWatchHandler, mainHandler.config) go registryCommandsHandler.Start() + operatorCommandsHandler := watcher.NewOperatorCommandsHandler(ctx, mainHandler.eventWorkerPool, mainHandler.k8sAPI, commandWatchHandler, mainHandler.config) + go operatorCommandsHandler.Start() // wait for the kubevuln component to be ready logger.L().Info("Waiting for vuln scan to be ready") diff --git a/watcher/commandshandler.go b/watcher/commandshandler.go new file mode 100644 index 0000000..054399d --- /dev/null +++ b/watcher/commandshandler.go @@ -0,0 +1,83 @@ +package watcher + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/armosec/armoapi-go/apis" + "github.com/kubescape/backend/pkg/command/types/v1alpha1" + "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger/helpers" + "github.com/kubescape/k8s-interface/k8sinterface" + "github.com/kubescape/operator/config" + "github.com/kubescape/operator/utils" + "github.com/panjf2000/ants/v2" +) + +type OperatorCommandsHandler struct { + ctx context.Context + pool *ants.PoolWithFunc + + k8sAPI *k8sinterface.KubernetesApi + commands chan v1alpha1.OperatorCommand + commandsWatcher *CommandWatchHandler + config config.IConfig +} + +func NewOperatorCommandsHandler(ctx context.Context, pool *ants.PoolWithFunc, k8sAPI *k8sinterface.KubernetesApi, commandsWatcher *CommandWatchHandler, config config.IConfig) *OperatorCommandsHandler { + return &OperatorCommandsHandler{ + pool: pool, + ctx: ctx, + k8sAPI: k8sAPI, + commands: make(chan v1alpha1.OperatorCommand, 100), + commandsWatcher: commandsWatcher, + config: config, + } +} + +func (ch *OperatorCommandsHandler) Start() { + logger.L().Info("starting OperatorCommandsHandler") + ch.commandsWatcher.RegisterForCommands(ch.commands) + + for { + select { + case cmd := <-ch.commands: + if cmd.Spec.CommandType != "OperatorAPI" { + logger.L().Info("not generic command" + cmd.Spec.CommandType) + continue + } + ch.invokeCommand(ch.ctx, cmd) + case <-ch.ctx.Done(): + logger.L().Ctx(ch.ctx).Info("RegistryCommandsHandler: context done") + return + } + } +} + +func (ch *OperatorCommandsHandler) invokeCommand(ctx context.Context, opcmd v1alpha1.OperatorCommand) { + startedAt := time.Now() + var cmd apis.Command + + sessionObj := utils.NewSessionObj(ctx, ch.config, &cmd, "", opcmd.Spec.GUID) + sessionObj.SetOperatorCommandDetails(&utils.OperatorCommandDetails{ + Command: &opcmd, + StartedAt: startedAt, + Client: ch.k8sAPI, + }) + + err := json.Unmarshal(opcmd.Spec.Body, &cmd) + if err != nil { + sessionObj.SetOperatorCommandStatus(ctx, utils.WithError(err)) + return + } + l := utils.Job{} + l.SetContext(ctx) + l.SetObj(*sessionObj) + + // invoke the job - status will be updated in the job + if err := ch.pool.Invoke(l); err != nil { + logger.L().Ctx(ctx).Error("failed to invoke job", helpers.String("ID", cmd.GetID()), helpers.String("command", fmt.Sprintf("%v", cmd)), helpers.Error(err)) + } +} diff --git a/watcher/commandswatcher.go b/watcher/commandswatcher.go index e4f799d..14003f0 100644 --- a/watcher/commandswatcher.go +++ b/watcher/commandswatcher.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" + "time" + "github.com/cenkalti/backoff" mapset "github.com/deckarep/golang-set/v2" "github.com/kubescape/backend/pkg/command" "github.com/kubescape/backend/pkg/command/types/v1alpha1" "github.com/kubescape/operator/config" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "time" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/pager" @@ -22,6 +23,8 @@ import ( "k8s.io/apimachinery/pkg/watch" ) +const minOperatorCommandAge = 30 * time.Minute + type CommandWatchHandler struct { k8sAPI *k8sinterface.KubernetesApi eventQueue *CooldownQueue @@ -123,7 +126,11 @@ func (cwh *CommandWatchHandler) AddHandler(obj runtime.Object) { return } - logger.L().Info("Received command", helpers.String("command", cmd.Name), helpers.String("GUID", cmd.Spec.GUID)) + // Skip the command if it is older than the creation threshold + if cmd.CreationTimestamp.Time.Before(time.Now().Add(-minOperatorCommandAge)) { + logger.L().Info("Skipping old OperatorCommand", helpers.String("command", cmd.Name), helpers.String("GUID", cmd.Spec.GUID), helpers.String("CreationTimestamp", cmd.CreationTimestamp.String())) + return + } // Skip the command if it has already been processed. if cmd.Status.Completed {