From b964814097adbe7772897a4e8052b8bd2e066d22 Mon Sep 17 00:00:00 2001
From: Lukasz Zajaczkowski
Date: Wed, 24 Jul 2024 09:36:21 +0200
Subject: [PATCH] add jitter (#243)

* add jitter

* add cache filter e2e test
---
 .github/workflows/e2e.yaml                    |  1 +
 cmd/agent/agent.go                            |  6 +-
 cmd/agent/args/args.go                        | 16 ++++-
 internal/controller/argorollout_controller.go |  6 +-
 .../controller/pipelinegate_controller.go     | 10 +--
 internal/controller/requeue.go                | 17 +++++
 internal/controller/restore_controller.go     | 12 ++--
 internal/controller/stackrunjob_controller.go |  2 +-
 pkg/applier/filters/cache_filter_test.go      | 66 +++++++++++++++++++
 pkg/applier/filters/suite_test.go             | 65 ++++++++++++++++++
 pkg/controller/controller_manager.go          |  9 ++-
 11 files changed, 189 insertions(+), 21 deletions(-)
 create mode 100644 internal/controller/requeue.go
 create mode 100644 pkg/applier/filters/cache_filter_test.go
 create mode 100644 pkg/applier/filters/suite_test.go

diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml
index b67215e4..79d2b18f 100644
--- a/.github/workflows/e2e.yaml
+++ b/.github/workflows/e2e.yaml
@@ -19,3 +19,4 @@ jobs:
         uses: helm/kind-action@v1.10.0
       - run: kind get clusters
       - run: go test -v -race ./pkg/cache/... -tags="e2e"
+      - run: go test -v -race ./pkg/applier/filters/... -tags="e2e"
diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go
index 75e7456c..cbf45ee2 100644
--- a/cmd/agent/agent.go
+++ b/cmd/agent/agent.go
@@ -20,7 +20,10 @@ import (
 	"github.com/pluralsh/deployment-operator/pkg/controller/service"
 )
 
-const pollInterval = time.Second * 30
+const (
+	pollInterval = time.Second * 30
+	jitter       = time.Second * 15
+)
 
 func runAgent(config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) {
 	mgr, err := controller.NewControllerManager(
@@ -28,6 +31,7 @@ func runAgent(config *rest.Config, ctx context.Context, k8sClient ctrclient.Clie
 		args.MaxConcurrentReconciles(),
 		args.ProcessingTimeout(),
 		args.RefreshInterval(),
+		args.RefreshJitter(),
 		lo.ToPtr(true),
 		args.ConsoleUrl(),
 		args.DeployToken(),
diff --git a/cmd/agent/args/args.go b/cmd/agent/args/args.go
index bf54b017..f98fcd88 100644
--- a/cmd/agent/args/args.go
+++ b/cmd/agent/args/args.go
@@ -29,6 +29,9 @@ const (
 	defaultRefreshInterval         = "2m"
 	defaultRefreshIntervalDuration = 2 * time.Minute
 
+	defaultRefreshJitter         = "15s"
+	defaultRefreshJitterDuration = 15 * time.Second
+
 	defaultResourceCacheTTL         = "1h"
 	defaultResourceCacheTTLDuration = time.Hour
 
@@ -59,6 +62,7 @@ var (
 	argMetricsAddr       = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
 	argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
 	argRefreshInterval   = flag.String("refresh-interval", defaultRefreshInterval, "Refresh interval duration.")
+	argRefreshJitter     = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
 	argResourceCacheTTL  = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
 	argManifestCacheTTL  = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
 	argRestoreNamespace  = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
@@ -158,13 +162,23 @@ func ProcessingTimeout() time.Duration {
 func RefreshInterval() time.Duration {
 	duration, err := time.ParseDuration(*argRefreshInterval)
 	if err != nil {
-		klog.ErrorS(err, "Could not parse refresh-interval", "value", *argProcessingTimeout, "default", defaultRefreshIntervalDuration)
+		klog.ErrorS(err, "Could not parse refresh-interval", "value", *argRefreshInterval, "default", defaultRefreshIntervalDuration)
 		return defaultRefreshIntervalDuration
 	}
 
 	return duration
 }
 
+func RefreshJitter() time.Duration {
+	jitter, err := time.ParseDuration(*argRefreshJitter)
+	if err != nil {
+		klog.ErrorS(err, "Could not parse refresh-jitter", "value", *argRefreshJitter, "default", defaultRefreshJitterDuration)
+		return defaultRefreshJitterDuration
+	}
+
+	return jitter
+}
+
 func ResourceCacheTTL() time.Duration {
 	duration, err := time.ParseDuration(*argResourceCacheTTL)
 	if err != nil {
diff --git a/internal/controller/argorollout_controller.go b/internal/controller/argorollout_controller.go
index 7f2964b1..526cdbe5 100644
--- a/internal/controller/argorollout_controller.go
+++ b/internal/controller/argorollout_controller.go
@@ -27,7 +27,7 @@ import (
 	"github.com/pluralsh/deployment-operator/pkg/controller/service"
 )
 
-var requeueRollout = ctrl.Result{RequeueAfter: time.Second * 5}
+const requeueArgoRolloutAfter = time.Second * 5
 
 // ArgoRolloutReconciler reconciles a Argo Rollout custom resource.
 type ArgoRolloutReconciler struct {
@@ -75,7 +75,7 @@ func (r *ArgoRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	if rollout.Status.Phase == rolloutv1alpha1.RolloutPhasePaused {
 		// wait until the agent will change component status
 		if !hasPausedRolloutComponent(service) {
-			return requeueRollout, nil
+			return requeue(requeueArgoRolloutAfter, jitter), nil
 		}
 
 		rolloutIf := r.ArgoClientSet.ArgoprojV1alpha1().Rollouts(rollout.Namespace)
@@ -96,7 +96,7 @@ func (r *ArgoRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		if rollbackResponse == http.StatusOK {
 			return ctrl.Result{}, r.rollback(rolloutIf, rollout)
 		}
-		return requeueRollout, nil
+		return requeue(requeueArgoRolloutAfter, jitter), nil
 	}
 	return ctrl.Result{}, nil
 }
diff --git a/internal/controller/pipelinegate_controller.go b/internal/controller/pipelinegate_controller.go
index 4c9c3eb4..46636984 100644
--- a/internal/controller/pipelinegate_controller.go
+++ b/internal/controller/pipelinegate_controller.go
@@ -115,7 +115,7 @@ func (r *PipelineGateReconciler) Reconcile(ctx context.Context, req ctrl.Request
 		return ctrl.Result{}, nil
 	}
 
-	return requeue, nil
+	return requeue(requeueAfter, jitter), nil
 }
 
 func (r *PipelineGateReconciler) cleanUpGate(ctx context.Context, crGate *v1alpha1.PipelineGate) error {
@@ -183,7 +183,7 @@ func (r *PipelineGateReconciler) reconcilePendingRunningGate(ctx context.Context
 			gate.Status.SetState(console.GateStateClosed)
 			gate.Status.JobRef = nil
 			log.V(1).Info("Job aborted.", "JobName", job.Name, "JobNamespace", job.Namespace)
-			return requeue, nil
+			return requeue(requeueAfter, jitter), nil
 		}
 
 		// check job status
@@ -191,20 +191,20 @@ func (r *PipelineGateReconciler) reconcilePendingRunningGate(ctx context.Context
 		// if the job is failed, then we need to update the gate state to closed, unless it's a rerun
 		log.V(2).Info("Job failed.", "JobName", job.Name, "JobNamespace", job.Namespace)
 		gate.Status.SetState(console.GateStateClosed)
-		return requeue, nil
+		return requeue(requeueAfter, jitter), nil
 	}
 
 	if hasSucceeded(reconciledJob) {
 		// if the job is complete, then we need to update the gate state to open, unless it's a rerun
 		log.V(1).Info("Job succeeded.", "JobName", job.Name, "JobNamespace", job.Namespace)
 		gate.Status.SetState(console.GateStateOpen)
-		return requeue, nil
+		return requeue(requeueAfter, jitter), nil
 	}
 
 	if err := r.updateJob(ctx, reconciledJob, job); err != nil {
 		return ctrl.Result{}, err
 	}
-	return requeue, nil
+	return requeue(requeueAfter, jitter), nil
 }
 
 func (r *PipelineGateReconciler) updateJob(ctx context.Context, reconciledJob *batchv1.Job, newJob *batchv1.Job) error {
diff --git a/internal/controller/requeue.go b/internal/controller/requeue.go
new file mode 100644
index 00000000..5c3a16d3
--- /dev/null
+++ b/internal/controller/requeue.go
@@ -0,0 +1,17 @@
+package controller
+
+import (
+	"math/rand"
+	"time"
+
+	ctrl "sigs.k8s.io/controller-runtime"
+)
+
+const (
+	jitter       = time.Second * 15
+	requeueAfter = time.Second * 30
+)
+
+func requeue(after, jitter time.Duration) ctrl.Result {
+	return ctrl.Result{RequeueAfter: after + time.Duration(rand.Int63n(int64(jitter)))}
+}
diff --git a/internal/controller/restore_controller.go b/internal/controller/restore_controller.go
index 85c520c4..6b442640 100644
--- a/internal/controller/restore_controller.go
+++ b/internal/controller/restore_controller.go
@@ -29,10 +29,6 @@ const (
 	restoreNameKey = "name"
 )
 
-var (
-	requeue = ctrl.Result{RequeueAfter: time.Second * 30}
-)
-
 // Reconcile Velero Restore custom resources to ensure that Console stays in sync with Kubernetes cluster.
 func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	logger := log.FromContext(ctx)
@@ -56,7 +52,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				logger.Error(err, "Unable to create config map")
 				return ctrl.Result{}, err
 			}
-			return requeue, nil
+			return requeue(requeueAfter, jitter), nil
 		}
 		return ctrl.Result{}, err
 	}
@@ -73,13 +69,13 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 				return ctrl.Result{}, err
 			}
 		}
-		return requeue, nil
+		return requeue(requeueAfter, jitter), nil
 	}
 
 	configMap := &corev1.ConfigMap{}
 	if err := r.Get(ctx, types.NamespacedName{Name: service.RestoreConfigMapName, Namespace: restore.Namespace}, configMap); err != nil {
 		if apierrors.IsNotFound(err) {
-			return requeue, nil
+			return requeue(requeueAfter, jitter), nil
 		}
 		return ctrl.Result{}, err
 	}
@@ -91,7 +87,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
 		}
 	}
 
-	return requeue, nil
+	return requeue(requeueAfter, jitter), nil
 }
 
 func CreateConfigMap(ctx context.Context, client k8sClient.Client, restore *velerov1.Restore) error {
diff --git a/internal/controller/stackrunjob_controller.go b/internal/controller/stackrunjob_controller.go
index 637e3f42..a75b01fb 100644
--- a/internal/controller/stackrunjob_controller.go
+++ b/internal/controller/stackrunjob_controller.go
@@ -81,7 +81,7 @@ func (r *StackRunJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			})
 			return ctrl.Result{}, err
 		}
-		return requeue, nil
+		return requeue(requeueAfter, jitter), nil
 	}
 
 	if hasSucceeded(job) {
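Note on the helper above: every call site replaces a fixed ctrl.Result{RequeueAfter: d} with requeue(d, jitter), i.e. the base delay plus a uniformly random offset in [0, jitter). A minimal standalone sketch of that arithmetic, illustrative only — the main harness is hypothetical and it returns a plain time.Duration rather than a ctrl.Result:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// requeueAfter mirrors the patch's requeue helper: base delay plus a
// uniformly random offset in [0, jitter). Note that rand.Int63n panics
// when its argument is <= 0, so jitter must be a positive duration
// (the patch always passes the 15s package constant).
func requeueAfter(after, jitter time.Duration) time.Duration {
	return after + time.Duration(rand.Int63n(int64(jitter)))
}

func main() {
	// Ten reconcilers that previously all fired at exactly 30s now land
	// anywhere in [30s, 45s), de-synchronizing load on the API server.
	for i := 0; i < 10; i++ {
		fmt.Println(requeueAfter(30*time.Second, 15*time.Second))
	}
}
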
"k8s.io/apimachinery/pkg/runtime" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Test filters", func() { + Context("Resource cache filter", func() { + const ( + resourceName = "test-filter" + namespace = "default" + ) + pod := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: resourceName, + Namespace: namespace, + Labels: map[string]string{ + common.ManagedByLabel: common.AgentLabelValue, + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "test", + Image: "test", + }, + }, + }, + } + + It("check cache filter", func() { + cache.Init(context.Background(), cfg, 100*time.Second) + cacheFilter := CacheFilter{} + res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod) + Expect(err).ToNot(HaveOccurred()) + unstructuredPod := unstructured.Unstructured{Object: res} + // first iteration + Expect(cacheFilter.Filter(&unstructuredPod)).ToNot(HaveOccurred()) + + // update cache + key := cache.ResourceKeyFromUnstructured(&unstructuredPod) + sha, ok := cache.GetResourceCache().GetCacheEntry(key.ObjectIdentifier()) + Expect(ok).To(BeTrue()) + Expect(sha.SetSHA(unstructuredPod, cache.ApplySHA)).ToNot(HaveOccurred()) + Expect(sha.SetSHA(unstructuredPod, cache.ServerSHA)).ToNot(HaveOccurred()) + cache.GetResourceCache().SetCacheEntry(key.ObjectIdentifier(), sha) + + // should filter out + Expect(cacheFilter.Filter(&unstructuredPod)).To(HaveOccurred()) + }) + + }) +}) diff --git a/pkg/applier/filters/suite_test.go b/pkg/applier/filters/suite_test.go new file mode 100644 index 00000000..b889eab1 --- /dev/null +++ b/pkg/applier/filters/suite_test.go @@ -0,0 +1,65 @@ +//go:build e2e + +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +// These tests use Ginkgo (BDD-style Go testing framework). Refer to +// http://onsi.github.io/ginkgo/ to learn more about Ginkgo. 
diff --git a/pkg/controller/controller_manager.go b/pkg/controller/controller_manager.go
index 17687364..b7e01b11 100644
--- a/pkg/controller/controller_manager.go
+++ b/pkg/controller/controller_manager.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"context"
 	"errors"
+	"math/rand"
 	"time"
 
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -27,6 +28,8 @@ type ControllerManager struct {
 
 	Refresh time.Duration
 
+	Jitter time.Duration
+
 	// started is true if the ControllerManager has been Started
 	started bool
 
@@ -38,7 +41,7 @@ type ControllerManager struct {
 }
 
 func NewControllerManager(ctx context.Context, maxConcurrentReconciles int, cacheSyncTimeout time.Duration,
-	refresh time.Duration, recoverPanic *bool, consoleUrl, deployToken, clusterId string) (*ControllerManager, error) {
+	refresh, jitter time.Duration, recoverPanic *bool, consoleUrl, deployToken, clusterId string) (*ControllerManager, error) {
 
 	socket, err := websocket.New(clusterId, consoleUrl, deployToken)
 	if err != nil {
@@ -54,6 +57,7 @@ func NewControllerManager(ctx context.Context, maxConcurrentReconciles int, cach
 		CacheSyncTimeout: cacheSyncTimeout,
 		RecoverPanic:     recoverPanic,
 		Refresh:          refresh,
+		Jitter:           jitter,
 		started:          false,
 		ctx:              ctx,
 		client:           client.New(consoleUrl, deployToken),
@@ -78,7 +82,7 @@ func (cm *ControllerManager) Start() error {
 
 	for _, ctrl := range cm.Controllers {
 		controller := ctrl
-
+		jitterValue := time.Duration(rand.Int63n(int64(cm.Jitter)))
 		cm.Socket.AddPublisher(controller.Do.GetPublisher())
 
 		go func() {
@@ -89,6 +93,7 @@ func (cm *ControllerManager) Start() error {
 			if controllerPollInterval := controller.Do.GetPollInterval(); controllerPollInterval > 0 {
 				pollInterval = controllerPollInterval
 			}
+			pollInterval += jitterValue
 			_ = wait.PollUntilContextCancel(context.Background(), pollInterval, true, func(_ context.Context) (done bool, err error) {
 				return controller.Do.Poll(cm.ctx)
 			})
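The manager applies jitter differently from the reconcilers: in Start() each controller draws jitterValue once and keeps it for the lifetime of its poll loop, so controllers are staggered relative to each other while each one still polls at a constant period. A self-contained sketch of that behavior — the controller names and the loop body are illustrative, using the patch's defaults of a 2m refresh interval and 15s jitter:

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
	const (
		refresh = 2 * time.Minute  // --refresh-interval default
		jitter  = 15 * time.Second // --refresh-jitter default
	)

	// As in ControllerManager.Start: one random offset per controller,
	// drawn once, then folded into the poll interval used from then on.
	for _, name := range []string{"service", "gate", "stack run"} {
		jitterValue := time.Duration(rand.Int63n(int64(jitter)))
		pollInterval := refresh + jitterValue
		fmt.Printf("%s controller polls every %v\n", name, pollInterval)
	}
}

Drawing the offset once keeps each controller's period stable, so two controllers stay staggered indefinitely, whereas the requeue helper re-rolls its offset on every reconcile; both approaches avoid the thundering-herd pattern of synchronized wake-ups against the API server.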