Skip to content

Commit

Permalink
feat(federate): add backoff for reenqueue on unsynced cache
Browse files Browse the repository at this point in the history
  • Loading branch information
limhawjia committed Aug 3, 2023
1 parent 415674a commit bbb2c0b
Showing 1 changed file with 7 additions and 3 deletions.
10 changes: 7 additions & 3 deletions pkg/controllers/federate/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

Expand Down Expand Up @@ -74,8 +75,9 @@ type FederateController struct {
fedClient fedclient.Interface
dynamicClient dynamic.Interface

worker worker.ReconcileWorker[workerKey]
eventRecorder record.EventRecorder
worker worker.ReconcileWorker[workerKey]
cacheSyncRateLimiter workqueue.RateLimiter
eventRecorder record.EventRecorder

metrics stats.Metrics
logger klog.Logger
Expand Down Expand Up @@ -103,6 +105,7 @@ func NewFederateController(
clusterFedObjectInformer: clusterFedObjectInformer,
fedClient: fedClient,
dynamicClient: dynamicClient,
cacheSyncRateLimiter: workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
metrics: metrics,
logger: logger.WithValues("controller", FederateControllerName),
}
Expand Down Expand Up @@ -267,9 +270,10 @@ func (c *FederateController) reconcile(ctx context.Context, key workerKey) (stat
logger.V(3).Info("Lister for source type not yet synced, will reenqueue")
return worker.Result{
Success: true,
RequeueAfter: pointer.Duration(100 * time.Millisecond),
RequeueAfter: pointer.Duration(c.cacheSyncRateLimiter.When(key)),
}
}
c.cacheSyncRateLimiter.Forget(key)

sourceUns, err := lister.Get(key.QualifiedName().String())
if err != nil && apierrors.IsNotFound(err) {
Expand Down

0 comments on commit bbb2c0b

Please sign in to comment.