Skip to content

Commit

Permalink
skip old OperatorCommands
Browse files Browse the repository at this point in the history
Signed-off-by: Amir Malka <[email protected]>
  • Loading branch information
amirmalka committed Jan 1, 2025
1 parent 69134a6 commit 5810771
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 2 deletions.
2 changes: 2 additions & 0 deletions mainhandler/handlerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) {
commandWatchHandler := watcher.NewCommandWatchHandler(mainHandler.k8sAPI, mainHandler.config)
registryCommandsHandler := watcher.NewRegistryCommandsHandler(ctx, mainHandler.k8sAPI, commandWatchHandler, mainHandler.config)
go registryCommandsHandler.Start()
operatorCommandsHandler := watcher.NewOperatorCommandsHandler(ctx, mainHandler.eventWorkerPool, mainHandler.k8sAPI, commandWatchHandler, mainHandler.config)
go operatorCommandsHandler.Start()

// wait for the kubevuln component to be ready
logger.L().Info("Waiting for vuln scan to be ready")
Expand Down
83 changes: 83 additions & 0 deletions watcher/commandshandler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package watcher

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/armosec/armoapi-go/apis"
"github.com/kubescape/backend/pkg/command/types/v1alpha1"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/k8s-interface/k8sinterface"
"github.com/kubescape/operator/config"
"github.com/kubescape/operator/utils"
"github.com/panjf2000/ants/v2"
)

type OperatorCommandsHandler struct {
ctx context.Context
pool *ants.PoolWithFunc

k8sAPI *k8sinterface.KubernetesApi
commands chan v1alpha1.OperatorCommand
commandsWatcher *CommandWatchHandler
config config.IConfig
}

func NewOperatorCommandsHandler(ctx context.Context, pool *ants.PoolWithFunc, k8sAPI *k8sinterface.KubernetesApi, commandsWatcher *CommandWatchHandler, config config.IConfig) *OperatorCommandsHandler {
return &OperatorCommandsHandler{
pool: pool,
ctx: ctx,
k8sAPI: k8sAPI,
commands: make(chan v1alpha1.OperatorCommand, 100),
commandsWatcher: commandsWatcher,
config: config,
}
}

func (ch *OperatorCommandsHandler) Start() {
logger.L().Info("starting OperatorCommandsHandler")
ch.commandsWatcher.RegisterForCommands(ch.commands)

for {
select {
case cmd := <-ch.commands:
if cmd.Spec.CommandType != "OperatorAPI" {
logger.L().Info("not generic command" + cmd.Spec.CommandType)
continue
}
ch.invokeCommand(ch.ctx, cmd)
case <-ch.ctx.Done():
logger.L().Ctx(ch.ctx).Info("RegistryCommandsHandler: context done")
return
}
}
}

func (ch *OperatorCommandsHandler) invokeCommand(ctx context.Context, opcmd v1alpha1.OperatorCommand) {
startedAt := time.Now()
var cmd apis.Command

sessionObj := utils.NewSessionObj(ctx, ch.config, &cmd, "", opcmd.Spec.GUID)
sessionObj.SetOperatorCommandDetails(&utils.OperatorCommandDetails{
Command: &opcmd,
StartedAt: startedAt,
Client: ch.k8sAPI,
})

err := json.Unmarshal(opcmd.Spec.Body, &cmd)
if err != nil {
sessionObj.SetOperatorCommandStatus(ctx, utils.WithError(err))
return
}
l := utils.Job{}
l.SetContext(ctx)
l.SetObj(*sessionObj)

// invoke the job - status will be updated in the job
if err := ch.pool.Invoke(l); err != nil {
logger.L().Ctx(ctx).Error("failed to invoke job", helpers.String("ID", cmd.GetID()), helpers.String("command", fmt.Sprintf("%v", cmd)), helpers.Error(err))
}
}
11 changes: 9 additions & 2 deletions watcher/commandswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/cenkalti/backoff"
mapset "github.com/deckarep/golang-set/v2"
"github.com/kubescape/backend/pkg/command"
"github.com/kubescape/backend/pkg/command/types/v1alpha1"
"github.com/kubescape/operator/config"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"time"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/pager"
Expand All @@ -22,6 +23,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
)

const minOperatorCommandAge = 30 * time.Minute

type CommandWatchHandler struct {
k8sAPI *k8sinterface.KubernetesApi
eventQueue *CooldownQueue
Expand Down Expand Up @@ -123,7 +126,11 @@ func (cwh *CommandWatchHandler) AddHandler(obj runtime.Object) {
return
}

logger.L().Info("Received command", helpers.String("command", cmd.Name), helpers.String("GUID", cmd.Spec.GUID))
// Skip the command if it is older than the creation threshold
if cmd.CreationTimestamp.Time.Before(time.Now().Add(-minOperatorCommandAge)) {
logger.L().Info("Skipping old OperatorCommand", helpers.String("command", cmd.Name), helpers.String("GUID", cmd.Spec.GUID), helpers.String("CreationTimestamp", cmd.CreationTimestamp.String()))
return
}

// Skip the command if it has already been processed.
if cmd.Status.Completed {
Expand Down

0 comments on commit 5810771

Please sign in to comment.