Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize memory with pagers #245

Merged
merged 5 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr-merged.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
BUILD_PLATFORM: linux/amd64,linux/arm64
GO_VERSION: "1.23"
REQUIRED_TESTS: '[
"vuln_scan",
"vuln_v2_views",
"vuln_scan_trigger_scan_public_registry",
"vuln_scan_trigger_scan_public_registry_excluded",
"vuln_scan_trigger_scan_private_quay_registry",
Expand Down
12 changes: 5 additions & 7 deletions continuousscanning/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/ptr"

armoapi "github.com/armosec/armoapi-go/apis"
Expand All @@ -27,7 +27,7 @@ import (
sets "github.com/deckarep/golang-set/v2"
)

var orphanableWorkloadTypes sets.Set[string] = sets.NewSet[string]("Pod", "ReplicaSet", "Job")
var orphanableWorkloadTypes = sets.NewSet[string]("Pod", "ReplicaSet", "Job")

type EventHandler interface {
Handle(ctx context.Context, e watch.Event) error
Expand Down Expand Up @@ -97,7 +97,7 @@ func unstructuredToScanObject(uObject *unstructured.Unstructured) (*objectsenvel
}

func triggerScanFor(ctx context.Context, uObject *unstructured.Unstructured, isDelete bool, wp *ants.PoolWithFunc, clusterConfig config.IConfig) error {
logger.L().Ctx(ctx).Info(
logger.L().Info(
"triggering scan",
helpers.String("kind", uObject.GetKind()),
helpers.String("name", uObject.GetName()),
Expand All @@ -122,7 +122,6 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error {
if e.Type != watch.Added && e.Type != watch.Modified {
return nil
}
isDelete := false

uObject, err := eventToUnstructured(e)
if err != nil {
Expand All @@ -134,7 +133,7 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error {
return nil
}

return triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig)
return triggerScanFor(ctx, uObject, false, h.wp, h.clusterConfig)
}

func NewTriggeringHandler(wp *ants.PoolWithFunc, clusterConfig config.IConfig) EventHandler {
Expand Down Expand Up @@ -220,7 +219,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error
if e.Type != watch.Deleted {
return nil
}
isDelete := true

uObject, err := eventToUnstructured(e)
if err != nil {
Expand All @@ -237,6 +235,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error
logger.L().Ctx(ctx).Error("failed to delete CRDs", helpers.Error(err))
}

err = triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig)
err = triggerScanFor(ctx, uObject, true, h.wp, h.clusterConfig)
return err
}
10 changes: 5 additions & 5 deletions continuousscanning/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"

armoapi "github.com/armosec/armoapi-go/apis"
Expand All @@ -29,18 +29,18 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C
resourceEventsCh := make(chan watch.Event, 100)

gvrs := s.tl.LoadGVRs(ctx)
logger.L().Ctx(ctx).Info("fetched gvrs", helpers.Interface("gvrs", gvrs))
logger.L().Info("fetched gvrs", helpers.Interface("gvrs", gvrs))
wp, _ := NewWatchPool(ctx, s.k8sdynamic, gvrs, listOpts)
wp.Run(ctx, resourceEventsCh)
logger.L().Ctx(ctx).Info("ran watch pool")
logger.L().Info("ran watch pool")

go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *cooldownQueue) {
defer out.Stop(ctx)

for {
select {
case e := <-resourceEventsCh:
logger.L().Ctx(ctx).Debug(
logger.L().Debug(
"got event from channel",
helpers.Interface("event", e),
)
Expand All @@ -57,7 +57,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C

func (s *ContinuousScanningService) work(ctx context.Context) {
for e := range s.eventQueue.ResultChan {
logger.L().Ctx(ctx).Debug(
logger.L().Debug(
"got an event to process",
helpers.Interface("event", e),
)
Expand Down
18 changes: 9 additions & 9 deletions continuousscanning/watchbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out
return ctx.Err()
default:
gvr := helpers.String("gvr", w.gvr.String())
logger.L().Ctx(ctx).Debug("creating watch for GVR", gvr)
watch, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts)
logger.L().Debug("creating watch for GVR", gvr)
watchFunc, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts)
if err != nil {
logger.L().Ctx(ctx).Warning(
"got error when creating a watch for gvr",
Expand All @@ -76,8 +76,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out
)
continue
}
logger.L().Ctx(ctx).Debug("watch created\n")
w.currWatch = watch
logger.L().Debug("watch created\n")
w.currWatch = watchFunc

// Watch is considered ready once it is successfully acquired
// Signal we are done only the first time because
Expand All @@ -97,7 +97,7 @@ type WatchPool struct {
}

func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) {
logger.L().Ctx(ctx).Info("Watch pool: starting")
logger.L().Info("Watch pool: starting")

wg := &sync.WaitGroup{}
for idx := range wp.pool {
Expand All @@ -106,17 +106,17 @@ func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) {
}
wg.Wait()

logger.L().Ctx(ctx).Info("Watch pool: started ok")
logger.L().Info("Watch pool: started ok")
}

func NewWatchPool(ctx context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) {
func NewWatchPool(_ context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) {
watches := make([]*SelfHealingWatch, len(gvrs))

for idx := range gvrs {
gvr := gvrs[idx]
watch := NewSelfHealingWatch(client, gvr, opts)
selfHealingWatch := NewSelfHealingWatch(client, gvr, opts)

watches[idx] = watch
watches[idx] = selfHealingWatch
}

pool := &WatchPool{pool: watches}
Expand Down
9 changes: 4 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/armosec/utils-k8s-go/probes"
beUtils "github.com/kubescape/backend/pkg/utils"
logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger"

"github.com/kubescape/operator/servicehandler"
)
Expand Down Expand Up @@ -107,12 +107,12 @@ func main() {
restclient.SetDefaultWarningHandler(restclient.NoWarnings{})

kubernetesCache := objectcache.NewKubernetesCache(k8sApi)

// Creating the ObjectCache using KubernetesCache
objectCache := objectcache.NewObjectCache(kubernetesCache)

if components.ServiceScanConfig.Enabled {
logger.L().Ctx(ctx).Info("service discovery enabeld and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String()))
logger.L().Info("service discovery enabled and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String()))
go servicehandler.DiscoveryServiceHandler(ctx, k8sApi, components.ServiceScanConfig.Interval)
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func main() {
if operatorConfig.ContinuousScanEnabled() {
go func(mh *mainhandler.MainHandler) {
err := mh.SetupContinuousScanning(ctx, cs.DefaultQueueSize, cfg.EventDeduplicationInterval)
logger.L().Ctx(ctx).Info("set up cont scanning service")
logger.L().Info("set up cont scanning service")
if err != nil {
logger.L().Ctx(ctx).Fatal(err.Error(), helpers.Error(err))
}
Expand All @@ -178,7 +178,6 @@ func main() {
ruleBindingNotify := make(chan rulebindingmanager.RuleBindingNotify, 100)
ruleBindingCache.AddNotifier(&ruleBindingNotify)


admissionController := webhook.New(addr, "/etc/certs/tls.crt", "/etc/certs/tls.key", runtime.NewScheme(), webhook.NewAdmissionValidator(k8sApi, objectCache, exporter, ruleBindingCache), ruleBindingCache)
// Start HTTP REST server for webhook
go func() {
Expand Down
Loading
Loading