fix(federated-informer-manager): fix cluster deletion
limhawjia committed Jul 27, 2023
1 parent e10bbe8 commit 513a40d
Showing 3 changed files with 32 additions and 28 deletions.
24 changes: 13 additions & 11 deletions pkg/util/informermanager/federatedinformermanager.go
@@ -170,7 +170,7 @@ func (m *federatedInformerManager) worker(ctx context.Context) {
 		return
 	}
 
-	err, needReenqueue, delay := m.processCluster(ctx, cluster)
+	needReenqueue, delay, err := m.processCluster(ctx, cluster)
 	if err != nil {
 		if needReenqueue {
 			logger.Error(err, "Failed to process FederatedCluster, will retry")
@@ -191,15 +191,15 @@ func (m *federatedInformerManager) worker(ctx context.Context) {
 func (m *federatedInformerManager) processCluster(
 	ctx context.Context,
 	cluster *fedcorev1a1.FederatedCluster,
-) (err error, needReenqueue bool, delay time.Duration) {
+) (needReenqueue bool, reenqueueDelay time.Duration, err error) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 
 	clusterName := cluster.Name
 
 	connectionHash, err := m.clientHelper.ConnectionHash(cluster)
 	if err != nil {
-		return fmt.Errorf("failed to get connection hash for cluster %s: %w", clusterName, err), true, 0
+		return true, 0, fmt.Errorf("failed to get connection hash for cluster %s: %w", clusterName, err)
 	}
 	if oldConnectionHash, exists := m.connectionMap[clusterName]; exists {
 		if !bytes.Equal(oldConnectionHash, connectionHash) {
@@ -208,22 +208,22 @@ func (m *federatedInformerManager) processCluster(
 			// reenqueue.
 			// Note: updating of cluster connection details, however, is still not a supported use case.
 			err := m.processClusterDeletionUnlocked(ctx, clusterName)
-			return err, true, 0
+			return true, 0, err
 		}
 	} else {
 		clusterRestConfig, err := m.clientHelper.RestConfigGetter(cluster)
 		if err != nil {
-			return fmt.Errorf("failed to get rest config for cluster %s: %w", clusterName, err), true, 0
+			return true, 0, fmt.Errorf("failed to get rest config for cluster %s: %w", clusterName, err)
 		}
 
 		clusterDynamicClient, err := m.dynamicClientGetter(cluster, clusterRestConfig)
 		if err != nil {
-			return fmt.Errorf("failed to get dynamic client for cluster %s: %w", clusterName, err), true, 0
+			return true, 0, fmt.Errorf("failed to get dynamic client for cluster %s: %w", clusterName, err)
 		}
 
 		clusterKubeClient, err := m.kubeClientGetter(cluster, clusterRestConfig)
 		if err != nil {
-			return fmt.Errorf("failed to get kubernetes client for cluster %s: %w", clusterName, err), true, 0
+			return true, 0, fmt.Errorf("failed to get kubernetes client for cluster %s: %w", clusterName, err)
 		}
 
 		manager := NewInformerManager(
@@ -244,7 +244,7 @@ func (m *federatedInformerManager) processCluster(
 	for _, generator := range m.eventHandlerGenerators {
 		if err := manager.AddEventHandlerGenerator(generator); err != nil {
 			cancel()
-			return fmt.Errorf("failed to initialized InformerManager for cluster %s: %w", clusterName, err), true, 0
+			return true, 0, fmt.Errorf("failed to initialized InformerManager for cluster %s: %w", clusterName, err)
 		}
 	}
 
@@ -272,11 +272,11 @@ func (m *federatedInformerManager) processCluster(
 			m.initialClusters.Delete(cluster.Name)
 		} else {
 			klog.FromContext(ctx).V(3).Info("Waiting for InformerManager sync")
-			return nil, true, 100 * time.Millisecond
+			return true, 100 * time.Millisecond, nil
 		}
 	}
 
-	return nil, false, 0
+	return false, 0, nil
 }
 
 func (m *federatedInformerManager) processClusterDeletion(ctx context.Context, clusterName string) error {
@@ -287,13 +287,15 @@ func (m *federatedInformerManager) processClusterDeletion(ctx context.Context, clusterName string) error {
 
 func (m *federatedInformerManager) processClusterDeletionUnlocked(ctx context.Context, clusterName string) error {
 	delete(m.connectionMap, clusterName)
+	delete(m.kubeClients, clusterName)
 	delete(m.dynamicClients, clusterName)
 
 	if cancel, ok := m.clusterCancelFuncs[clusterName]; ok {
-		klog.FromContext(ctx).V(2).Info("Stopping InformerManager for FederatedCluster")
+		klog.FromContext(ctx).V(2).Info("Stopping InformerManager and SharedInformerFactory for FederatedCluster")
 		cancel()
 	}
 	delete(m.informerManagers, clusterName)
+	delete(m.informerFactories, clusterName)
 	delete(m.clusterCancelFuncs, clusterName)
 
 	m.initialClusters.Delete(clusterName)
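The fix itself is the last hunk: before this commit, processClusterDeletionUnlocked left stale entries behind in the per-cluster kube-client and informer-factory maps, so a cluster deleted and recreated under the same name could pick up state from its previous incarnation. A condensed sketch of the teardown pattern, using hypothetical type and field names rather than kubeadmiral's actual definitions:

package main

import (
	"context"
	"fmt"
	"sync"
)

type clusterCache struct {
	mu          sync.Mutex
	connections map[string][]byte             // connection hash per cluster
	clients     map[string]any                // kube/dynamic clients per cluster
	cancelFuncs map[string]context.CancelFunc // stops a cluster's informers
}

// deleteCluster drops every piece of per-cluster state under one lock and
// cancels the informers' context, so a cluster recreated under the same name
// starts from a clean slate. Missing any one map leaks stale clients or
// informers - the bug class this commit fixes.
func (c *clusterCache) deleteCluster(name string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.connections, name)
	delete(c.clients, name)
	if cancel, ok := c.cancelFuncs[name]; ok {
		cancel() // stop background informers before dropping the reference
	}
	delete(c.cancelFuncs, name)
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	c := &clusterCache{
		connections: map[string][]byte{"cluster-1": []byte("hash")},
		clients:     map[string]any{"cluster-1": struct{}{}},
		cancelFuncs: map[string]context.CancelFunc{"cluster-1": cancel},
	}
	c.deleteCluster("cluster-1")
	fmt.Println(ctx.Err()) // context.Canceled: informers were stopped
}

Holding the lock for the whole teardown keeps a concurrent processCluster from observing a half-deleted cluster.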
16 changes: 8 additions & 8 deletions pkg/util/informermanager/informermanager.go
@@ -141,7 +141,7 @@ func (m *informerManager) worker(ctx context.Context) {
 		return
 	}
 
-	err, needReenqueue, delay := m.processFTC(ctx, ftc)
+	needReenqueue, delay, err := m.processFTC(ctx, ftc)
 	if err != nil {
 		if needReenqueue {
 			logger.Error(err, "Failed to process FederatedTypeConfig, will retry")
@@ -162,7 +162,7 @@ func (m *informerManager) worker(ctx context.Context) {
 func (m *informerManager) processFTC(
 	ctx context.Context,
 	ftc *fedcorev1a1.FederatedTypeConfig,
-) (err error, needReenqueue bool, reenqueueDelay time.Duration) {
+) (needReenqueue bool, reenqueueDelay time.Duration, err error) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 
@@ -183,14 +183,14 @@ func (m *informerManager) processFTC(
 			// time and we missed processing the deletion. We simply process the ftc deletion and reenqueue. Note:
 			// updating of ftc source types, however, is still not a supported use case.
 			err := m.processFTCDeletionUnlocked(ctx, ftcName)
-			return err, true, 0
+			return true, 0, err
 		}
 
 		informer = m.informers[ftcName]
 	} else {
 		if err := m.gvkMapping.Add(ftcName, gvk); err != nil {
 			// There must be another ftc with the same source type GVK.
-			return fmt.Errorf("source type is already referenced by another FederatedTypeConfig: %w", err), false, 0
+			return false, 0, fmt.Errorf("source type is already referenced by another FederatedTypeConfig: %w", err)
 		}
 
 		logger.V(2).Info("Starting new informer for FederatedTypeConfig")
@@ -220,7 +220,7 @@ func (m *informerManager) processFTC(
 
 	if !informer.Informer().HasSynced() {
 		logger.V(3).Info("Informer for FederatedTypeConfig not synced, will not register event handlers yet")
-		return nil, true, 100 * time.Millisecond
+		return true, 100 * time.Millisecond, nil
 	}
 
 	registrations := m.eventHandlerRegistrations[ftcName]
@@ -237,7 +237,7 @@ func (m *informerManager) processFTC(
 
 		if oldRegistration := registrations[generator]; oldRegistration != nil {
 			if err := informer.Informer().RemoveEventHandler(oldRegistration); err != nil {
-				return fmt.Errorf("failed to unregister event handler: %w", err), true, 0
+				return true, 0, fmt.Errorf("failed to unregister event handler: %w", err)
 			}
 			delete(registrations, generator)
 		}
@@ -246,15 +246,15 @@ func (m *informerManager) processFTC(
 		if handler := generator.Generator(ftc); handler != nil {
 			newRegistration, err := informer.Informer().AddEventHandler(handler)
 			if err != nil {
-				return fmt.Errorf("failed to register event handler: %w", err), true, 0
+				return true, 0, fmt.Errorf("failed to register event handler: %w", err)
 			}
 			registrations[generator] = newRegistration
 		}
 
 		lastAppliedFTCs[generator] = ftc
 	}
 
-	return nil, false, 0
+	return false, 0, nil
 }
 
 func (m *informerManager) processFTCDeletion(ctx context.Context, ftcName string) error {
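Alongside the fix, both files reorder the multi-value returns so the error comes last, matching Go convention. A minimal, self-contained sketch of the resulting worker contract, with illustrative names rather than kubeadmiral's:

package main

import (
	"fmt"
	"time"
)

// process reports whether the item should be requeued, after what delay, and
// any error encountered - with the error in the idiomatic final position.
func process(item string) (needReenqueue bool, reenqueueDelay time.Duration, err error) {
	if item == "not-synced" {
		return true, 100 * time.Millisecond, nil // retry shortly, no error
	}
	if item == "bad" {
		return true, 0, fmt.Errorf("failed to process %s", item)
	}
	return false, 0, nil // done, do not requeue
}

func main() {
	for _, item := range []string{"ok", "not-synced", "bad"} {
		needReenqueue, delay, err := process(item)
		fmt.Printf("%s: requeue=%v delay=%v err=%v\n", item, needReenqueue, delay, err)
	}
}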
20 changes: 11 additions & 9 deletions pkg/util/informermanager/podinformer.go
@@ -36,15 +36,17 @@ func addPodInformer(ctx context.Context,
 	podListerSemaphore *semaphore.Weighted,
 	enablePodPruning bool,
 ) {
-	informer.InformerFor(&corev1.Pod{}, func(k kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
-		return cache.NewSharedIndexInformer(
-			podListerWatcher(ctx, client, podListerSemaphore, enablePodPruning),
-			&corev1.Pod{},
-			resyncPeriod,
-			cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
-		)
-	})
-
+	informer.InformerFor(
+		&corev1.Pod{},
+		func(k kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
+			return cache.NewSharedIndexInformer(
+				podListerWatcher(ctx, client, podListerSemaphore, enablePodPruning),
+				&corev1.Pod{},
+				resyncPeriod,
+				cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+			)
+		},
+	)
 }
 
 func podListerWatcher(
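The podinformer change is purely cosmetic: the same InformerFor call reflowed with one argument per line. For context, a sketch of how client-go's SharedInformerFactory.InformerFor substitutes a custom constructor for a type; the plain ListWatch below is a stand-in for kubeadmiral's podListerWatcher, and the function name is hypothetical:

package podinformersketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// registerCustomPodInformer mirrors the shape of addPodInformer above:
// InformerFor registers a custom constructor for pods, and any later lookup
// of the pod informer through this factory reuses it in place of the default.
func registerCustomPodInformer(factory informers.SharedInformerFactory) {
	factory.InformerFor(
		&corev1.Pod{},
		func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
			// Plain list/watch against the API server; kubeadmiral substitutes
			// its semaphore-gated, pruning podListerWatcher here instead.
			lw := cache.NewListWatchFromClient(
				c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.Everything(),
			)
			return cache.NewSharedIndexInformer(
				lw,
				&corev1.Pod{},
				resyncPeriod,
				cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
			)
		},
	)
}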
