Skip to content

Commit

Permalink
fix(informer-manager): adjust queue backoffs
Browse files Browse the repository at this point in the history
  • Loading branch information
limhawjia committed Aug 3, 2023
1 parent df3e9cd commit 415674a
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 28 deletions.
29 changes: 16 additions & 13 deletions pkg/util/informermanager/federatedinformermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,9 @@ func NewFederatedInformerManager(
clusterCancelFuncs: map[string]context.CancelFunc{},
informerManagers: map[string]InformerManager{},
informerFactories: map[string]informers.SharedInformerFactory{},
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
queue: workqueue.NewRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
),
podListerSemaphore: semaphore.NewWeighted(3), // TODO: make this configurable
initialClusters: sets.New[string](),
podEventHandlers: []*ResourceEventHandlerWithClusterFuncs{},
Expand Down Expand Up @@ -175,7 +177,7 @@ func (m *federatedInformerManager) worker(ctx context.Context) {
return
}

needReenqueue, delay, err := m.processCluster(ctx, cluster)
needReenqueue, err := m.processCluster(ctx, cluster)
if err != nil {
if needReenqueue {
logger.Error(err, "Failed to process FederatedCluster, will retry")
Expand All @@ -187,24 +189,25 @@ func (m *federatedInformerManager) worker(ctx context.Context) {
return
}

m.queue.Forget(key)
if needReenqueue {
m.queue.AddAfter(key, delay)
m.queue.AddRateLimited(key)
} else {
m.queue.Forget(key)
}
}

func (m *federatedInformerManager) processCluster(
ctx context.Context,
cluster *fedcorev1a1.FederatedCluster,
) (needReenqueue bool, reenqueueDelay time.Duration, err error) {
) (needReenqueue bool, err error) {
m.lock.Lock()
defer m.lock.Unlock()

clusterName := cluster.Name

connectionHash, err := m.clientHelper.ConnectionHash(cluster)
if err != nil {
return true, 0, fmt.Errorf("failed to get connection hash for cluster %s: %w", clusterName, err)
return true, fmt.Errorf("failed to get connection hash for cluster %s: %w", clusterName, err)
}
if oldConnectionHash, exists := m.connectionMap[clusterName]; exists {
if !bytes.Equal(oldConnectionHash, connectionHash) {
Expand All @@ -213,22 +216,22 @@ func (m *federatedInformerManager) processCluster(
// reenqueue.
// Note: updating of cluster connection details, however, is still not a supported use case.
err := m.processClusterDeletionUnlocked(ctx, clusterName)
return true, 0, err
return true, err
}
} else {
clusterRestConfig, err := m.clientHelper.RestConfigGetter(cluster)
if err != nil {
return true, 0, fmt.Errorf("failed to get rest config for cluster %s: %w", clusterName, err)
return true, fmt.Errorf("failed to get rest config for cluster %s: %w", clusterName, err)
}

clusterDynamicClient, err := m.dynamicClientGetter(cluster, clusterRestConfig)
if err != nil {
return true, 0, fmt.Errorf("failed to get dynamic client for cluster %s: %w", clusterName, err)
return true, fmt.Errorf("failed to get dynamic client for cluster %s: %w", clusterName, err)
}

clusterKubeClient, err := m.kubeClientGetter(cluster, clusterRestConfig)
if err != nil {
return true, 0, fmt.Errorf("failed to get kubernetes client for cluster %s: %w", clusterName, err)
return true, fmt.Errorf("failed to get kubernetes client for cluster %s: %w", clusterName, err)
}

manager := NewInformerManager(
Expand All @@ -249,7 +252,7 @@ func (m *federatedInformerManager) processCluster(
for _, generator := range m.eventHandlerGenerators {
if err := manager.AddEventHandlerGenerator(generator); err != nil {
cancel()
return true, 0, fmt.Errorf("failed to initialized InformerManager for cluster %s: %w", clusterName, err)
return true, fmt.Errorf("failed to initialized InformerManager for cluster %s: %w", clusterName, err)
}
}

Expand Down Expand Up @@ -278,7 +281,7 @@ func (m *federatedInformerManager) processCluster(
m.initialClusters.Delete(cluster.Name)
} else {
klog.FromContext(ctx).V(3).Info("Waiting for InformerManager sync")
return true, 100 * time.Millisecond, nil
return true, nil
}
}

Expand All @@ -293,7 +296,7 @@ func (m *federatedInformerManager) processCluster(
}
}

return false, 0, nil
return false, nil
}

func (m *federatedInformerManager) processClusterDeletion(ctx context.Context, clusterName string) error {
Expand Down
35 changes: 20 additions & 15 deletions pkg/util/informermanager/informermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func NewInformerManager(
informerCancelFuncs: map[string]context.CancelFunc{},
eventHandlerRegistrations: map[string]map[*EventHandlerGenerator]cache.ResourceEventHandlerRegistration{},
lastAppliedFTCsCache: map[string]map[*EventHandlerGenerator]*fedcorev1a1.FederatedTypeConfig{},
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter()),
initialFTCs: sets.New[string](),
queue: workqueue.NewRateLimitingQueue(
workqueue.NewItemExponentialFailureRateLimiter(100*time.Millisecond, 10*time.Second),
),
initialFTCs: sets.New[string](),
}

ftcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -141,7 +143,7 @@ func (m *informerManager) worker(ctx context.Context) {
return
}

needReenqueue, delay, err := m.processFTC(ctx, ftc)
needReenqueue, err := m.processFTC(ctx, ftc)
if err != nil {
if needReenqueue {
logger.Error(err, "Failed to process FederatedTypeConfig, will retry")
Expand All @@ -153,16 +155,17 @@ func (m *informerManager) worker(ctx context.Context) {
return
}

m.queue.Forget(key)
if needReenqueue {
m.queue.AddAfter(key, delay)
m.queue.AddRateLimited(key)
} else {
m.queue.Forget(key)
}
}

func (m *informerManager) processFTC(
ctx context.Context,
ftc *fedcorev1a1.FederatedTypeConfig,
) (needReenqueue bool, reenqueueDelay time.Duration, err error) {
) (needReenqueue bool, err error) {
m.lock.Lock()
defer m.lock.Unlock()

Expand All @@ -183,17 +186,17 @@ func (m *informerManager) processFTC(
// time and we missed processing the deletion. We simply process the ftc deletion and reenqueue. Note:
// updating of ftc source types, however, is still not a supported use case.
err := m.processFTCDeletionUnlocked(ctx, ftcName)
return true, 0, err
return true, err
}

informer = m.informers[ftcName]
} else {
if err := m.gvkMapping.Add(ftcName, gvk); err != nil {
// There must be another ftc with the same source type GVK.
return false, 0, fmt.Errorf("source type is already referenced by another FederatedTypeConfig: %w", err)
return false, fmt.Errorf("source type is already referenced by another FederatedTypeConfig: %w", err)
}

logger.V(2).Info("Starting new informer for FederatedTypeConfig")
logger.V(2).Info("Starting new informer for FederatedTypeConfig's source type")

informer = dynamicinformer.NewFilteredDynamicInformer(
m.client,
Expand All @@ -219,14 +222,16 @@ func (m *informerManager) processFTC(
m.initialFTCs.Delete(ftcName)

if !informer.Informer().HasSynced() {
logger.V(3).Info("Informer for FederatedTypeConfig not synced, will not register event handlers yet")
return true, 100 * time.Millisecond, nil
logger.V(3).Info(
"Informer for FederatedTypeConfig's source type not synced, will not register event handlers yet",
)
return true, nil
}

registrations := m.eventHandlerRegistrations[ftcName]
lastAppliedFTCs := m.lastAppliedFTCsCache[ftcName]

logger.V(2).Info("Registering event handlers for FederatedTypeConfig")
logger.V(2).Info("Registering event handlers for FederatedTypeConfig's source type")

for _, generator := range m.eventHandlerGenerators {
lastApplied := lastAppliedFTCs[generator]
Expand All @@ -237,7 +242,7 @@ func (m *informerManager) processFTC(

if oldRegistration := registrations[generator]; oldRegistration != nil {
if err := informer.Informer().RemoveEventHandler(oldRegistration); err != nil {
return true, 0, fmt.Errorf("failed to unregister event handler: %w", err)
return true, fmt.Errorf("failed to unregister event handler: %w", err)
}
delete(registrations, generator)
}
Expand All @@ -246,15 +251,15 @@ func (m *informerManager) processFTC(
if handler := generator.Generator(ftc); handler != nil {
newRegistration, err := informer.Informer().AddEventHandler(handler)
if err != nil {
return true, 0, fmt.Errorf("failed to register event handler: %w", err)
return true, fmt.Errorf("failed to register event handler: %w", err)
}
registrations[generator] = newRegistration
}

lastAppliedFTCs[generator] = ftc
}

return false, 0, nil
return false, nil
}

func (m *informerManager) processFTCDeletion(ctx context.Context, ftcName string) error {
Expand Down

0 comments on commit 415674a

Please sign in to comment.