From 5f689e73fc7c26875436ef89adfc940aff6f5ea7 Mon Sep 17 00:00:00 2001 From: jixiang <2623210647@qq.com> Date: Sun, 3 Dec 2023 20:12:03 +0800 Subject: [PATCH] resolve some conversations and rearrange the code --- pkg/synchromanager/clustersynchro_manager.go | 55 +++++++++++++------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/pkg/synchromanager/clustersynchro_manager.go b/pkg/synchromanager/clustersynchro_manager.go index 717cd8c7e..f38315a88 100644 --- a/pkg/synchromanager/clustersynchro_manager.go +++ b/pkg/synchromanager/clustersynchro_manager.go @@ -38,7 +38,6 @@ import ( ) const ClusterSynchroControllerFinalizer = "clusterpedia.io/cluster-synchro-controller" -const ClusterShardingLabel = "clusterpedia.io/sharding-name" const defaultRetryNum = 5 @@ -188,10 +187,15 @@ func (manager *Manager) enqueue(obj interface{}) { return } - cluster := obj.(*clusterv1alpha2.PediaCluster) - if manager.shardingName != cluster.Spec.ShardingName && - (cluster.Status.ShardingName == nil || manager.shardingName != *cluster.Status.ShardingName) { - return + if cluster, ok := obj.(*clusterv1alpha2.PediaCluster); ok { + if manager.shardingName != cluster.Spec.ShardingName && + (cluster.Status.ShardingName == nil || manager.shardingName != *cluster.Status.ShardingName) { + return + } + if cluster.Spec.ShardingName == manager.shardingName && cluster.Status.ShardingName != nil && + *cluster.Status.ShardingName != manager.shardingName { + return + } } manager.queue.Add(key) @@ -267,6 +271,17 @@ func (manager *Manager) processNextCluster() (continued bool) { // if err returned is not nil, cluster will be requeued func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) controller.Result { + if cluster.Status.ShardingName == nil && cluster.Spec.ShardingName != manager.shardingName { + return controller.NoRequeueResult + } + + if cluster.Status.ShardingName != nil && cluster.Status.ShardingName != &manager.shardingName { + return controller.NoRequeueResult + } + // After the above filteringļ¼Œ The cluster will be in the following stateļ¼š + // 1. spec.sharding == manager.shardingName and status.sharding == nil + // 2. spec.sharding == manager.shardingName and status != nil and status.sharding == manager.shardingName + // 3. spec.sharding != manager.shardingName and status != nil and status.sharding == manager.shardingName if !cluster.DeletionTimestamp.IsZero() { klog.InfoS("remove cluster", "cluster", cluster.Name) if err := manager.removeCluster(cluster.Name); err != nil { @@ -297,23 +312,14 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) } } - if cluster.Spec.ShardingName == manager.shardingName { - if cluster.Status.ShardingName != nil && *cluster.Status.ShardingName != manager.shardingName { - return controller.NoRequeueResult - } + if cluster.Spec.ShardingName != manager.shardingName { + return manager.reconcileShardingUnmanaged(cluster) } manager.synchrolock.RLock() synchro := manager.synchros[cluster.Name] manager.synchrolock.RUnlock() - if synchro != nil { - if cluster.Spec.ShardingName != manager.shardingName && - cluster.Status.ShardingName != nil && cluster.Spec.ShardingName != *cluster.Status.ShardingName { - return manager.reconcileShardingUnmanaged(cluster) - } - } - config, err := buildClusterConfig(cluster) if err != nil { klog.ErrorS(err, "Failed to build cluster config", "cluster", cluster.Name) @@ -430,8 +436,8 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) func (manager *Manager) reconcileShardingUnmanaged(cluster *clusterv1alpha2.PediaCluster) controller.Result { if *cluster.Status.ShardingName == manager.shardingName { - if err := manager.removeCluster(cluster.Name); err != nil { - klog.ErrorS(err, "Failed to remove cluster", cluster.Name) + if err := manager.stopClusterSynchro(cluster.Name); err != nil { + klog.ErrorS(err, "Failed to stop cluster synchro", "cluster", cluster.Name) return controller.RequeueResult(defaultRetryNum) } } @@ -445,6 +451,19 @@ func (manager *Manager) reconcileShardingUnmanaged(cluster *clusterv1alpha2.Pedi return controller.NoRequeueResult } +func (manager *Manager) stopClusterSynchro(name string) error { + manager.synchrolock.Lock() + synchro := manager.synchros[name] + delete(manager.synchros, name) + manager.synchrolock.Unlock() + + if synchro != nil { + synchro.Shutdown(false) + } + + return nil +} + func (manager *Manager) removeCluster(name string) error { manager.synchrolock.Lock() synchro := manager.synchros[name]