Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support starting a reconciliation task with cron schedule #87

Merged
merged 4 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 80 additions & 40 deletions adapters/backend/v1/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kubescape/synchronizer/domain"
"github.com/kubescape/synchronizer/messaging"
"github.com/kubescape/synchronizer/utils"
"github.com/robfig/cron/v3"
)

type Adapter struct {
Expand Down Expand Up @@ -191,59 +192,98 @@ func (b *Adapter) Batch(ctx context.Context, kind domain.Kind, batchType domain.
return client.Batch(ctx, kind, batchType, items)
}

type ReconciliationTask struct {
ctx context.Context
cfg *config.ReconciliationTaskConfig
adapter *Adapter
}

// startReconciliationPeriodicTask starts a periodic task that sends reconciliation request messages to connected clients
// every configurable minutes (interval). If interval is 0 (not set), the task is disabled.
// when cron schedule is set, the task will be executed according to the cron schedule.
// intervalFromConnection is the minimum interval time in minutes from the connection time that the reconciliation task will be sent.
func (a *Adapter) startReconciliationPeriodicTask(mainCtx context.Context, cfg *config.ReconciliationTaskConfig) {
if cfg == nil || cfg.TaskIntervalSeconds == 0 || cfg.IntervalFromConnectionSeconds == 0 {
if cfg == nil || (cfg.TaskIntervalSeconds == 0 && cfg.CronSchedule == "") || cfg.IntervalFromConnectionSeconds == 0 {
logger.L().Warning("reconciliation task is disabled (intervals are not set)")
return
}

go func() {
logger.L().Info("starting reconciliation periodic task",
helpers.Int("TaskIntervalSeconds", cfg.TaskIntervalSeconds),
helpers.Int("IntervalFromConnectionSeconds", cfg.IntervalFromConnectionSeconds))
ticker := time.NewTicker(time.Duration(cfg.TaskIntervalSeconds) * time.Second)
for {
select {
case <-mainCtx.Done():
ticker.Stop()
task := ReconciliationTask{
ctx: mainCtx,
cfg: cfg,
adapter: a,
}

if cfg.CronSchedule != "" {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would instantiate another ticker instead of rewriting a new branch of code.
Please take a look at https://github.com/krayzpipes/cronticker/blob/main/cronticker/ticker.go (maybe we can just copy/paste this ticker and use the other cron package for our needs)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amirmalka PTAL at my proposal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we release?

logger.L().Info("starting reconciliation periodic task with cron schedule",
helpers.String("CronSchedule", task.cfg.CronSchedule),
helpers.Int("IntervalFromConnectionSeconds", task.cfg.IntervalFromConnectionSeconds))
cronJob := cron.New()
cronID, err := cronJob.AddFunc(cfg.CronSchedule, task.Run)
if err != nil {
logger.L().Fatal("failed to add cron job", helpers.Error(err))
}
logger.L().Info("cron job added", helpers.Int("cronID", int(cronID)))
cronJob.Start()
go func() {
for range mainCtx.Done() {
logger.L().Info("stopping ReconciliationTask cron job", helpers.Int("cronID", int(cronID)))
closeCtx := cronJob.Stop()
<-closeCtx.Done()
logger.L().Info("ReconciliationTask cron job stopped", helpers.Int("cronID", int(cronID)))
return
case <-ticker.C:
a.connMapMutex.Lock()
logger.L().Info("running reconciliation task for connected clients", helpers.Int("clients", a.clientsMap.Len()))
for connId, clientId := range a.connectionMap {
if time.Since(clientId.ConnectionTime) < time.Duration(cfg.IntervalFromConnectionSeconds)*time.Second {
logger.L().Info("skipping reconciliation request for client because it was connected recently", helpers.Interface("clientId", clientId.String()))
continue
}
}
}()
} else {
go func() {
logger.L().Info("starting reconciliation periodic task with interval",
helpers.Int("TaskIntervalSeconds", task.cfg.TaskIntervalSeconds),
helpers.Int("IntervalFromConnectionSeconds", task.cfg.IntervalFromConnectionSeconds))
ticker := time.NewTicker(time.Duration(task.cfg.TaskIntervalSeconds) * time.Second)
for {
select {
case <-task.ctx.Done():
ticker.Stop()
return
case <-ticker.C:
task.Run()
}
}
}()
}
}

client, ok := a.clientsMap.Load(connId)
if !ok {
logger.L().Error("expected to find client for reconciliation in clients map", helpers.String("clientId", clientId.String()))
continue
}
func (task *ReconciliationTask) Run() {
task.adapter.connMapMutex.Lock()
logger.L().Info("running reconciliation task for connected clients", helpers.Int("clients", task.adapter.clientsMap.Len()))
for connId, clientId := range task.adapter.connectionMap {
if time.Since(clientId.ConnectionTime) < time.Duration(task.cfg.IntervalFromConnectionSeconds)*time.Second {
logger.L().Info("skipping reconciliation request for client because it was connected recently", helpers.Interface("clientId", clientId.String()))
continue
}

if !utils.IsBatchMessageSupported(clientId.Version) {
logger.L().Info("skipping reconciliation request for client because it does not support batch messages",
helpers.String("version", clientId.Version),
helpers.Interface("clientId", clientId.String()))
continue
}
client, ok := task.adapter.clientsMap.Load(connId)
if !ok {
logger.L().Error("expected to find client for reconciliation in clients map", helpers.String("clientId", clientId.String()))
continue
}

clientCtx := utils.ContextFromIdentifiers(mainCtx, clientId)
err := client.SendReconciliationRequestMessage(clientCtx)
if err != nil {
logger.L().Error("failed to send reconciliation request message", helpers.String("error", err.Error()))
} else {
logger.L().Info("sent reconciliation request message", helpers.Interface("clientId", clientId))
}
}
a.connMapMutex.Unlock()
}
if !utils.IsBatchMessageSupported(clientId.Version) {
logger.L().Info("skipping reconciliation request for client because it does not support batch messages",
helpers.String("version", clientId.Version),
helpers.Interface("clientId", clientId.String()))
continue
}
}()

clientCtx := utils.ContextFromIdentifiers(task.ctx, clientId)
err := client.SendReconciliationRequestMessage(clientCtx)
if err != nil {
logger.L().Error("failed to send reconciliation request message", helpers.String("error", err.Error()))
} else {
logger.L().Info("sent reconciliation request message", helpers.Interface("clientId", clientId))
}
}
task.adapter.connMapMutex.Unlock()
}

// startKeepalivePeriodicTask starts a periodic task that sends connected clients message every configurable minutes (interval).
Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ type PrometheusConfig struct {
}

type ReconciliationTaskConfig struct {
TaskIntervalSeconds int `mapstructure:"taskIntervalSeconds"`
IntervalFromConnectionSeconds int `mapstructure:"intervalFromConnectionSeconds"`
CronSchedule string `mapstructure:"cronSchedule"` // when this is set, taskIntervalSeconds is ignored
TaskIntervalSeconds int `mapstructure:"taskIntervalSeconds"`
IntervalFromConnectionSeconds int `mapstructure:"intervalFromConnectionSeconds"`
}

type KeepAliveTaskConfig struct {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/panjf2000/ants/v2 v2.9.1
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2
github.com/prometheus/client_golang v1.19.0
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/viper v1.18.2
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.30.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,8 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
Expand Down
1 change: 1 addition & 0 deletions tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ require (
github.com/prometheus/procfs v0.13.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/saferwall/pe v1.5.2 // indirect
github.com/sagikazarmark/locafero v0.4.0 // indirect
github.com/sagikazarmark/slog-shim v0.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,8 @@ github.com/rivo/uniseg v0.1.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
Expand Down
Loading