diff --git a/pkg/controllers/federate/controller.go b/pkg/controllers/federate/controller.go index 4db4b65c..31f849d5 100644 --- a/pkg/controllers/federate/controller.go +++ b/pkg/controllers/federate/controller.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -74,8 +75,9 @@ type FederateController struct { fedClient fedclient.Interface dynamicClient dynamic.Interface - worker worker.ReconcileWorker[workerKey] - eventRecorder record.EventRecorder + worker worker.ReconcileWorker[workerKey] + cacheSyncRateLimiter workqueue.RateLimiter + eventRecorder record.EventRecorder metrics stats.Metrics logger klog.Logger @@ -103,6 +105,7 @@ func NewFederateController( clusterFedObjectInformer: clusterFedObjectInformer, fedClient: fedClient, dynamicClient: dynamicClient, + cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second), metrics: metrics, logger: logger.WithValues("controller", FederateControllerName), } @@ -267,9 +270,10 @@ func (c *FederateController) reconcile(ctx context.Context, key workerKey) (stat logger.V(3).Info("Lister for source type not yet synced, will reenqueue") return worker.Result{ Success: true, - RequeueAfter: pointer.Duration(100 * time.Millisecond), + RequeueAfter: pointer.Duration(c.cacheSyncRateLimiter.When(key)), } } + c.cacheSyncRateLimiter.Forget(key) sourceUns, err := lister.Get(key.QualifiedName().String()) if err != nil && apierrors.IsNotFound(err) {