Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into sebastian/prod-2439-r…
Browse files Browse the repository at this point in the history
…etry-w-no-terraform-state-if-a-413-error-code-is-returned
  • Loading branch information
floreks committed Jul 24, 2024
2 parents 9bb3756 + b964814 commit 5f849a7
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 21 deletions.
1 change: 1 addition & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ jobs:
uses: helm/[email protected]
- run: kind get clusters
- run: go test -v -race ./pkg/cache/... -tags="e2e"
- run: go test -v -race ./pkg/applier/filters/... -tags="e2e"
6 changes: 5 additions & 1 deletion cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,18 @@ import (
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

// Timing constants for the agent command, declared once in a single block.
// NOTE(review): the consumers of these constants are outside the visible
// hunk — confirm where pollInterval and jitter are used.
const (
	// pollInterval is a 30 second interval.
	pollInterval = time.Second * 30
	// jitter is a 15 second duration.
	jitter = time.Second * 15
)

func runAgent(config *rest.Config, ctx context.Context, k8sClient ctrclient.Client) (*controller.ControllerManager, *service.ServiceReconciler, *pipelinegates.GateReconciler) {
mgr, err := controller.NewControllerManager(
ctx,
args.MaxConcurrentReconciles(),
args.ProcessingTimeout(),
args.RefreshInterval(),
args.RefreshJitter(),
lo.ToPtr(true),
args.ConsoleUrl(),
args.DeployToken(),
Expand Down
16 changes: 15 additions & 1 deletion cmd/agent/args/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
defaultRefreshInterval = "2m"
defaultRefreshIntervalDuration = 2 * time.Minute

defaultRefreshJitter = "15s"
defaultRefreshJitterDuration = 15 * time.Second

defaultResourceCacheTTL = "1h"
defaultResourceCacheTTLDuration = time.Hour

Expand Down Expand Up @@ -59,6 +62,7 @@ var (
argMetricsAddr = flag.String("metrics-bind-address", defaultMetricsAddress, "The address the metric endpoint binds to.")
argProcessingTimeout = flag.String("processing-timeout", defaultProcessingTimeout, "Maximum amount of time to spend trying to process queue item.")
argRefreshInterval = flag.String("refresh-interval", defaultRefreshInterval, "Refresh interval duration.")
argRefreshJitter = flag.String("refresh-jitter", defaultRefreshJitter, "Refresh jitter.")
argResourceCacheTTL = flag.String("resource-cache-ttl", defaultResourceCacheTTL, "The time to live of each resource cache entry.")
argManifestCacheTTL = flag.String("manifest-cache-ttl", defaultManifestCacheTTL, "The time to live of service manifests in cache entry.")
argRestoreNamespace = flag.String("restore-namespace", defaultRestoreNamespace, "The namespace where Velero restores are located.")
Expand Down Expand Up @@ -158,13 +162,23 @@ func ProcessingTimeout() time.Duration {
// RefreshInterval returns the value of the refresh-interval flag parsed as a
// time.Duration. When the flag value cannot be parsed, the error is logged
// once and defaultRefreshIntervalDuration is returned instead.
func RefreshInterval() time.Duration {
	duration, err := time.ParseDuration(*argRefreshInterval)
	if err != nil {
		// Report the value of the flag that actually failed to parse.
		klog.ErrorS(err, "Could not parse refresh-interval", "value", *argRefreshInterval, "default", defaultRefreshIntervalDuration)
		return defaultRefreshIntervalDuration
	}

	return duration
}

// RefreshJitter reports the duration configured through the refresh-jitter
// flag. A flag value that cannot be parsed is logged and replaced by
// defaultRefreshJitterDuration.
func RefreshJitter() time.Duration {
	parsed, parseErr := time.ParseDuration(*argRefreshJitter)
	if parseErr == nil {
		return parsed
	}

	klog.ErrorS(parseErr, "Could not parse refresh-jitter", "value", *argRefreshJitter, "default", defaultRefreshJitterDuration)
	return defaultRefreshJitterDuration
}

func ResourceCacheTTL() time.Duration {
duration, err := time.ParseDuration(*argResourceCacheTTL)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/argorollout_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/pluralsh/deployment-operator/pkg/controller/service"
)

var requeueRollout = ctrl.Result{RequeueAfter: time.Second * 5}
// requeueArgoRolloutAfter is the base delay (5 seconds) that
// ArgoRolloutReconciler.Reconcile passes to requeue when it returns a
// requeue result.
const requeueArgoRolloutAfter = time.Second * 5

// ArgoRolloutReconciler reconciles a Argo Rollout custom resource.
type ArgoRolloutReconciler struct {
Expand Down Expand Up @@ -75,7 +75,7 @@ func (r *ArgoRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if rollout.Status.Phase == rolloutv1alpha1.RolloutPhasePaused {
// wait until the agent will change component status
if !hasPausedRolloutComponent(service) {
return requeueRollout, nil
return requeue(requeueArgoRolloutAfter, jitter), nil
}

rolloutIf := r.ArgoClientSet.ArgoprojV1alpha1().Rollouts(rollout.Namespace)
Expand All @@ -96,7 +96,7 @@ func (r *ArgoRolloutReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if rollbackResponse == http.StatusOK {
return ctrl.Result{}, r.rollback(rolloutIf, rollout)
}
return requeueRollout, nil
return requeue(requeueArgoRolloutAfter, jitter), nil
}
return ctrl.Result{}, nil
}
Expand Down
10 changes: 5 additions & 5 deletions internal/controller/pipelinegate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (r *PipelineGateReconciler) Reconcile(ctx context.Context, req ctrl.Request
return ctrl.Result{}, nil
}

return requeue, nil
return requeue(requeueAfter, jitter), nil
}

func (r *PipelineGateReconciler) cleanUpGate(ctx context.Context, crGate *v1alpha1.PipelineGate) error {
Expand Down Expand Up @@ -183,28 +183,28 @@ func (r *PipelineGateReconciler) reconcilePendingRunningGate(ctx context.Context
gate.Status.SetState(console.GateStateClosed)
gate.Status.JobRef = nil
log.V(1).Info("Job aborted.", "JobName", job.Name, "JobNamespace", job.Namespace)
return requeue, nil
return requeue(requeueAfter, jitter), nil
}

// check job status
if hasFailed(reconciledJob) {
// if the job is failed, then we need to update the gate state to closed, unless it's a rerun
log.V(2).Info("Job failed.", "JobName", job.Name, "JobNamespace", job.Namespace)
gate.Status.SetState(console.GateStateClosed)
return requeue, nil
return requeue(requeueAfter, jitter), nil
}
if hasSucceeded(reconciledJob) {
// if the job is complete, then we need to update the gate state to open, unless it's a rerun
log.V(1).Info("Job succeeded.", "JobName", job.Name, "JobNamespace", job.Namespace)
gate.Status.SetState(console.GateStateOpen)
return requeue, nil
return requeue(requeueAfter, jitter), nil
}

if err := r.updateJob(ctx, reconciledJob, job); err != nil {
return ctrl.Result{}, err
}

return requeue, nil
return requeue(requeueAfter, jitter), nil
}

func (r *PipelineGateReconciler) updateJob(ctx context.Context, reconciledJob *batchv1.Job, newJob *batchv1.Job) error {
Expand Down
17 changes: 17 additions & 0 deletions internal/controller/requeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package controller

import (
"math/rand"
"time"

ctrl "sigs.k8s.io/controller-runtime"
)

const (
// jitter is the 15 second jitter value that reconcilers in this package
// pass to requeue as the bound for the random extra delay.
jitter = time.Second * 15
// requeueAfter is the 30 second base delay that reconcilers in this
// package pass to requeue.
requeueAfter = time.Second * 30
)

// requeue builds a ctrl.Result asking for the request to be requeued after
// the base delay plus a random extra delay in the range [0, jitter).
//
// A jitter of zero or less adds no extra delay: rand.Int63n panics when its
// argument is not positive, so it is only called for a positive jitter.
func requeue(after, jitter time.Duration) ctrl.Result {
	if jitter <= 0 {
		return ctrl.Result{RequeueAfter: after}
	}

	return ctrl.Result{RequeueAfter: after + time.Duration(rand.Int63n(int64(jitter)))}
}
12 changes: 4 additions & 8 deletions internal/controller/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ const (
restoreNameKey = "name"
)

var (
requeue = ctrl.Result{RequeueAfter: time.Second * 30}
)

// Reconcile Velero Restore custom resources to ensure that Console stays in sync with Kubernetes cluster.
func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
Expand All @@ -56,7 +52,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Error(err, "Unable to create config map")
return ctrl.Result{}, err
}
return requeue, nil
return requeue(requeueAfter, jitter), nil
}
return ctrl.Result{}, err
}
Expand All @@ -73,13 +69,13 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}
}
return requeue, nil
return requeue(requeueAfter, jitter), nil
}

configMap := &corev1.ConfigMap{}
if err := r.Get(ctx, types.NamespacedName{Name: service.RestoreConfigMapName, Namespace: restore.Namespace}, configMap); err != nil {
if apierrors.IsNotFound(err) {
return requeue, nil
return requeue(requeueAfter, jitter), nil
}
return ctrl.Result{}, err
}
Expand All @@ -91,7 +87,7 @@ func (r *RestoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

return requeue, nil
return requeue(requeueAfter, jitter), nil
}

func CreateConfigMap(ctx context.Context, client k8sClient.Client, restore *velerov1.Restore) error {
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/stackrunjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (r *StackRunJobReconciler) Reconcile(ctx context.Context, req ctrl.Request)
})
return ctrl.Result{}, err
}
return requeue, nil
return requeue(requeueAfter, jitter), nil
}

if hasSucceeded(job) {
Expand Down
66 changes: 66 additions & 0 deletions pkg/applier/filters/cache_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//go:build e2e

package filters

import (
"context"
"time"

"github.com/pluralsh/deployment-operator/pkg/cache"
"github.com/pluralsh/deployment-operator/pkg/common"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// Specs for CacheFilter. cfg is the package-level REST config populated by
// the suite setup.
var _ = Describe("Test filters", func() {
	Context("Resource cache filter", func() {
		const (
			podName      = "test-filter"
			podNamespace = "default"
		)

		// Pod labelled with common.ManagedByLabel=common.AgentLabelValue,
		// used as the filter input.
		managedPod := v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name:      podName,
				Namespace: podNamespace,
				Labels:    map[string]string{common.ManagedByLabel: common.AgentLabelValue},
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{{Name: "test", Image: "test"}},
			},
		}

		It("check cache filter", func() {
			cache.Init(context.Background(), cfg, 100*time.Second)
			filter := CacheFilter{}

			content, convErr := runtime.DefaultUnstructuredConverter.ToUnstructured(&managedPod)
			Expect(convErr).ToNot(HaveOccurred())
			obj := unstructured.Unstructured{Object: content}

			// First pass: the filter is expected to return no error.
			Expect(filter.Filter(&obj)).ToNot(HaveOccurred())

			// Record both SHAs on the cache entry and store it back.
			resourceKey := cache.ResourceKeyFromUnstructured(&obj)
			entry, found := cache.GetResourceCache().GetCacheEntry(resourceKey.ObjectIdentifier())
			Expect(found).To(BeTrue())
			Expect(entry.SetSHA(obj, cache.ApplySHA)).ToNot(HaveOccurred())
			Expect(entry.SetSHA(obj, cache.ServerSHA)).ToNot(HaveOccurred())
			cache.GetResourceCache().SetCacheEntry(resourceKey.ObjectIdentifier(), entry)

			// Second pass: the filter is now expected to return an error,
			// i.e. the resource is filtered out.
			Expect(filter.Filter(&obj)).To(HaveOccurred())
		})

	})
})
65 changes: 65 additions & 0 deletions pkg/applier/filters/suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//go:build e2e

/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filters

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
deploymentsv1alpha1 "github.com/pluralsh/deployment-operator/api/v1alpha1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
)

// These tests use Ginkgo (BDD-style Go testing framework). Refer to
// http://onsi.github.io/ginkgo/ to learn more about Ginkgo.
var (
	// kClient is the client assigned in BeforeSuite.
	kClient client.Client
	// cfg is the REST config assigned in BeforeSuite.
	cfg *rest.Config
	// err receives the error from creating kClient in BeforeSuite.
	err error
)

// TestCacheFilter is the go test entry point for this package's Ginkgo specs:
// it registers Fail as the Gomega fail handler and runs the suite under the
// name "Resource Cache Filter Suite".
func TestCacheFilter(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Resource Cache Filter Suite")
}

// Suite setup: installs a development-mode logger that writes to
// GinkgoWriter, loads the REST config into cfg and builds kClient from it.
var _ = BeforeSuite(func() {
	devLogger := zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))
	logf.SetLogger(devLogger)

	By("bootstrapping test environment")
	cfg = ctrl.GetConfigOrDie()
	Expect(cfg).NotTo(BeNil())

	// Add the deployments v1alpha1 types to the scheme handed to the client.
	Expect(deploymentsv1alpha1.AddToScheme(scheme.Scheme)).NotTo(HaveOccurred())

	clientOpts := client.Options{Scheme: scheme.Scheme}
	kClient, err = client.New(cfg, clientOpts)
	Expect(err).NotTo(HaveOccurred())
	Expect(kClient).NotTo(BeNil())
})

// Suite teardown hook: it only announces the teardown step; no cleanup is
// performed here.
var _ = AfterSuite(func() {
By("tearing down the test environment")

})
9 changes: 7 additions & 2 deletions pkg/controller/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package controller
import (
"context"
"errors"
"math/rand"
"time"

"k8s.io/apimachinery/pkg/util/wait"
Expand All @@ -27,6 +28,8 @@ type ControllerManager struct {

Refresh time.Duration

Jitter time.Duration

// started is true if the ControllerManager has been Started
started bool

Expand All @@ -38,7 +41,7 @@ type ControllerManager struct {
}

func NewControllerManager(ctx context.Context, maxConcurrentReconciles int, cacheSyncTimeout time.Duration,
refresh time.Duration, recoverPanic *bool, consoleUrl, deployToken, clusterId string) (*ControllerManager, error) {
refresh, jitter time.Duration, recoverPanic *bool, consoleUrl, deployToken, clusterId string) (*ControllerManager, error) {

socket, err := websocket.New(clusterId, consoleUrl, deployToken)
if err != nil {
Expand All @@ -54,6 +57,7 @@ func NewControllerManager(ctx context.Context, maxConcurrentReconciles int, cach
CacheSyncTimeout: cacheSyncTimeout,
RecoverPanic: recoverPanic,
Refresh: refresh,
Jitter: jitter,
started: false,
ctx: ctx,
client: client.New(consoleUrl, deployToken),
Expand All @@ -78,7 +82,7 @@ func (cm *ControllerManager) Start() error {

for _, ctrl := range cm.Controllers {
controller := ctrl

jitterValue := time.Duration(rand.Int63n(int64(cm.Jitter)))
cm.Socket.AddPublisher(controller.Do.GetPublisher())

go func() {
Expand All @@ -89,6 +93,7 @@ func (cm *ControllerManager) Start() error {
if controllerPollInterval := controller.Do.GetPollInterval(); controllerPollInterval > 0 {
pollInterval = controllerPollInterval
}
pollInterval += jitterValue
_ = wait.PollUntilContextCancel(context.Background(), pollInterval, true, func(_ context.Context) (done bool, err error) {
return controller.Do.Poll(cm.ctx)
})
Expand Down

0 comments on commit 5f849a7

Please sign in to comment.