Skip to content

Commit

Permalink
resolve some conversations and rearrange the code
Browse files Browse the repository at this point in the history
  • Loading branch information
jxustc committed Dec 3, 2023
1 parent 41b00a2 commit 5f689e7
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions pkg/synchromanager/clustersynchro_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
)

const ClusterSynchroControllerFinalizer = "clusterpedia.io/cluster-synchro-controller"
const ClusterShardingLabel = "clusterpedia.io/sharding-name"

const defaultRetryNum = 5

Expand Down Expand Up @@ -188,10 +187,15 @@ func (manager *Manager) enqueue(obj interface{}) {
return
}

cluster := obj.(*clusterv1alpha2.PediaCluster)
if manager.shardingName != cluster.Spec.ShardingName &&
(cluster.Status.ShardingName == nil || manager.shardingName != *cluster.Status.ShardingName) {
return
if cluster, ok := obj.(*clusterv1alpha2.PediaCluster); ok {
if manager.shardingName != cluster.Spec.ShardingName &&
(cluster.Status.ShardingName == nil || manager.shardingName != *cluster.Status.ShardingName) {
return
}
if cluster.Spec.ShardingName == manager.shardingName && cluster.Status.ShardingName != nil &&
*cluster.Status.ShardingName != manager.shardingName {
return
}
}

manager.queue.Add(key)
Expand Down Expand Up @@ -267,6 +271,17 @@ func (manager *Manager) processNextCluster() (continued bool) {

// if err returned is not nil, cluster will be requeued
func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) controller.Result {
if cluster.Status.ShardingName == nil && cluster.Spec.ShardingName != manager.shardingName {
return controller.NoRequeueResult
}

if cluster.Status.ShardingName != nil && cluster.Status.ShardingName != &manager.shardingName {
return controller.NoRequeueResult
}
// After the above filtering, The cluster will be in the following state:
// 1. spec.sharding == manager.shardingName and status.sharding == nil
// 2. spec.sharding == manager.shardingName and status != nil and status.sharding == manager.shardingName
// 3. spec.sharding != manager.shardingName and status != nil and status.sharding == manager.shardingName
if !cluster.DeletionTimestamp.IsZero() {
klog.InfoS("remove cluster", "cluster", cluster.Name)
if err := manager.removeCluster(cluster.Name); err != nil {
Expand Down Expand Up @@ -297,23 +312,14 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
}
}

if cluster.Spec.ShardingName == manager.shardingName {
if cluster.Status.ShardingName != nil && *cluster.Status.ShardingName != manager.shardingName {
return controller.NoRequeueResult
}
if cluster.Spec.ShardingName != manager.shardingName {
return manager.reconcileShardingUnmanaged(cluster)
}

manager.synchrolock.RLock()
synchro := manager.synchros[cluster.Name]
manager.synchrolock.RUnlock()

if synchro != nil {
if cluster.Spec.ShardingName != manager.shardingName &&
cluster.Status.ShardingName != nil && cluster.Spec.ShardingName != *cluster.Status.ShardingName {
return manager.reconcileShardingUnmanaged(cluster)
}
}

config, err := buildClusterConfig(cluster)
if err != nil {
klog.ErrorS(err, "Failed to build cluster config", "cluster", cluster.Name)
Expand Down Expand Up @@ -430,8 +436,8 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)

func (manager *Manager) reconcileShardingUnmanaged(cluster *clusterv1alpha2.PediaCluster) controller.Result {
if *cluster.Status.ShardingName == manager.shardingName {
if err := manager.removeCluster(cluster.Name); err != nil {
klog.ErrorS(err, "Failed to remove cluster", cluster.Name)
if err := manager.stopClusterSynchro(cluster.Name); err != nil {
klog.ErrorS(err, "Failed to stop cluster synchro", "cluster", cluster.Name)
return controller.RequeueResult(defaultRetryNum)
}
}
Expand All @@ -445,6 +451,19 @@ func (manager *Manager) reconcileShardingUnmanaged(cluster *clusterv1alpha2.Pedi
return controller.NoRequeueResult
}

func (manager *Manager) stopClusterSynchro(name string) error {
manager.synchrolock.Lock()
synchro := manager.synchros[name]
delete(manager.synchros, name)
manager.synchrolock.Unlock()

if synchro != nil {
synchro.Shutdown(false)
}

return nil
}

func (manager *Manager) removeCluster(name string) error {
manager.synchrolock.Lock()
synchro := manager.synchros[name]
Expand Down

0 comments on commit 5f689e7

Please sign in to comment.