fix(cluster): reduce the impact of Redis cluster intermediate states (#1178)

Signed-off-by: xiaozhuang <[email protected]>
Co-authored-by: xiaozhuang <[email protected]>
xiaozhuang-a and xiaozhuang authored Dec 21, 2024
1 parent ea98aa7 commit 1db22ae
Showing 3 changed files with 48 additions and 35 deletions.
53 changes: 28 additions & 25 deletions pkg/controllers/rediscluster/rediscluster_controller.go
@@ -73,37 +73,40 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
     }
 
     // Check if the cluster is downscaled
-    if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount {
-        logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
-        for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
-            logger.Info("Remove the shard", "Shard.Index", shardIdx)
-            // Imp if the last index of leader sts is not leader make it then
-            // check whether the redis is leader or not ?
-            // if not true then make it leader pod
-            if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance)) {
-                // lastLeaderPod is slaving right now Make it the master Pod
-                // We have to bring a manual failover here to make it a leaderPod
-                // clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
-                logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
-                if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil {
-                    logger.Error(err, "Failed to initiate cluster failover")
-                    return intctrlutil.RequeueWithError(ctx, err, "")
-                }
-            }
-            // Step 1 Remove the Follower Node
-            k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance)
-            // Step 2 Reshard the Cluster
-            k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, true)
-        }
-        logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
-        // Step 3 Rebalance the cluster
-        k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
-        logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
-        return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+    if leaderCount := r.GetStatefulSetReplicas(ctx, instance.Namespace, instance.Name+"-leader"); leaderReplicas < leaderCount {
+        if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
+            return intctrlutil.Reconciled()
+        }
+
+        if masterCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); masterCount == leaderCount {
+            logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
+            for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
+                logger.Info("Remove the shard", "Shard.Index", shardIdx)
+                // Imp if the last index of leader sts is not leader make it then
+                // check whether the redis is leader or not ?
+                // if not true then make it leader pod
+                if !(k8sutils.VerifyLeaderPod(ctx, r.K8sClient, instance, shardIdx)) {
+                    // lastLeaderPod is slaving right now Make it the master Pod
+                    // We have to bring a manual failover here to make it a leaderPod
+                    // clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
+                    logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
+                    if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance, shardIdx); err != nil {
+                        logger.Error(err, "Failed to initiate cluster failover")
+                        return intctrlutil.RequeueWithError(ctx, err, "")
+                    }
+                }
+                // Step 1 Remove the Follower Node
+                k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance, shardIdx)
+                // Step 2 Reshard the Cluster
+                k8sutils.ReshardRedisCluster(ctx, r.K8sClient, instance, shardIdx, true)
+            }
+            logger.Info("Redis cluster is downscaled... Rebalancing the cluster")
+            // Step 3 Rebalance the cluster
+            k8sutils.RebalanceRedisCluster(ctx, r.K8sClient, instance)
+            logger.Info("Redis cluster is downscaled... Rebalancing the cluster is done")
+            return intctrlutil.RequeueAfter(ctx, time.Second*10, "")
+        } else {
+            logger.Info("masterCount is not equal to leader statefulset replicas,skip downscale", "masterCount", masterCount, "leaderReplicas", leaderReplicas)
+        }
     }
 
     // Mark the cluster status as initializing if there are no leader or follower nodes
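The hunk above gates the downscale path on three conditions before any shard is removed: the leader StatefulSet is declared larger than desired, both StatefulSets are ready, and the number of live Redis masters matches the StatefulSet replicas. A minimal sketch of that guard follows; shouldDownscale and its parameters are hypothetical names, the real reconciler reads the same values via GetStatefulSetReplicas, IsStatefulSetReady and CheckRedisNodeCount as shown in the diff.

package main

import "fmt"

// shouldDownscale mirrors the guard added in this commit: only reshard when the
// cluster is not in an intermediate state.
func shouldDownscale(desiredLeaders, stsLeaders, liveMasters int32, leaderReady, followerReady bool) bool {
	if desiredLeaders >= stsLeaders {
		return false // nothing to remove
	}
	if !leaderReady || !followerReady {
		return false // StatefulSets are still rolling out; reconcile again later
	}
	// If a master is missing or extra, the topology is still settling,
	// so the downscale is skipped instead of acting on stale state.
	return liveMasters == stsLeaders
}

func main() {
	fmt.Println(shouldDownscale(3, 4, 4, true, true)) // true: safe to remove shard 3
	fmt.Println(shouldDownscale(3, 4, 3, true, true)) // false: a master is not reachable yet, skip
}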
18 changes: 8 additions & 10 deletions pkg/k8sutils/cluster-scaling.go
@@ -15,12 +15,11 @@ import (
 // ReshardRedisCluster transfer the slots from the last node to the first node.
 //
 // NOTE: when all slot been transferred, the node become slave of the first master node.
-func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, remove bool) {
+func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32, remove bool) {
     redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0")
     defer redisClient.Close()
 
     var cmd []string
-    currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader")
 
     // Transfer Pod details
     transferPOD := RedisDetails{
@@ -29,7 +28,7 @@ func ReshardRedisCluster(ctx context.Context, client kubernetes.Interface, cr *r
     }
     // Remove POD details
     removePOD := RedisDetails{
-        PodName: cr.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
+        PodName: cr.Name + "-leader-" + strconv.Itoa(int(shardIdx)),
         Namespace: cr.Namespace,
     }
     cmd = []string{"redis-cli", "--cluster", "reshard"}
@@ -274,18 +273,17 @@ func getAttachedFollowerNodeIDs(ctx context.Context, redisClient *redis.Client,
 }
 
 // Remove redis follower node would remove all follower nodes of last leader node using redis-cli
-func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) {
+func RemoveRedisFollowerNodesFromCluster(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32) {
     var cmd []string
     redisClient := configureRedisClient(ctx, client, cr, cr.ObjectMeta.Name+"-leader-0")
     defer redisClient.Close()
-    currentRedisCount := CheckRedisNodeCount(ctx, client, cr, "leader")
 
     existingPod := RedisDetails{
         PodName: cr.ObjectMeta.Name + "-leader-0",
         Namespace: cr.Namespace,
     }
     lastLeaderPod := RedisDetails{
-        PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(currentRedisCount)-1),
+        PodName: cr.ObjectMeta.Name + "-leader-" + strconv.Itoa(int(shardIdx)),
         Namespace: cr.Namespace,
     }
 
@@ -365,8 +363,8 @@ func RemoveRedisNodeFromCluster(ctx context.Context, client kubernetes.Interface
 }
 
 // verifyLeaderPod return true if the pod is leader/master
-func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) bool {
-    podName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
+func VerifyLeaderPod(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, leadIndex int32) bool {
+    podName := cr.Name + "-leader-" + strconv.Itoa(int(leadIndex))
 
     redisClient := configureRedisClient(ctx, client, cr, podName)
     defer redisClient.Close()
@@ -391,8 +389,8 @@ func verifyLeaderPodInfo(ctx context.Context, redisClient *redis.Client, podName
     return false
 }
 
-func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) error {
-    slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
+func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster, shardIdx int32) error {
+    slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(shardIdx))
     // cmd = redis-cli cluster failover -a <pass>
     var cmd []string
     pod := RedisDetails{
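Each of these helpers previously re-derived the target pod index from the live master count (CheckRedisNodeCount) at call time, which could point at different pods within one downscale iteration if the cluster was mid-transition; they now receive the shard index from the reconciler. A small self-contained sketch of the resulting calling pattern; leaderPodName, "mycluster" and the replica numbers are made-up example values, not code from this repository.

package main

import (
	"fmt"
	"strconv"
)

// leaderPodName mirrors how the helpers above now address a shard: the caller
// passes the index instead of each helper recomputing it from the live master count.
func leaderPodName(clusterName string, shardIdx int32) string {
	return clusterName + "-leader-" + strconv.Itoa(int(shardIdx))
}

func main() {
	leaderCount, leaderReplicas := int32(4), int32(2)
	// The reconciler walks shards 3 and 2 and hands the same index to
	// ClusterFailover, RemoveRedisFollowerNodesFromCluster and ReshardRedisCluster.
	for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
		fmt.Println(leaderPodName("mycluster", shardIdx)) // mycluster-leader-3, then mycluster-leader-2
	}
}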
12 changes: 12 additions & 0 deletions pkg/k8sutils/statefulset.go
@@ -26,6 +26,7 @@ import (
 
 type StatefulSet interface {
     IsStatefulSetReady(ctx context.Context, namespace, name string) bool
+    GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32
 }
 
 type StatefulSetService struct {
@@ -76,6 +77,17 @@ func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace,
     return true
 }
 
+func (s *StatefulSetService) GetStatefulSetReplicas(ctx context.Context, namespace, name string) int32 {
+    sts, err := s.kubeClient.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
+    if err != nil {
+        return 0
+    }
+    if sts.Spec.Replicas == nil {
+        return 0
+    }
+    return *sts.Spec.Replicas
+}
+
 const (
     redisExporterContainer = "redis-exporter"
 )
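The new GetStatefulSetReplicas returns the declared spec.replicas of a StatefulSet and falls back to 0 when the object cannot be fetched or has no replica count set. A minimal sketch of the same behavior against a fake clientset; getReplicas is a hypothetical stand-in for the method, since the real one lives on StatefulSetService and is reached through the StatefulSet interface.

package main

import (
	"context"
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

// getReplicas reads spec.replicas and treats a missing StatefulSet or nil
// replicas as zero, matching the method added in this commit.
func getReplicas(ctx context.Context, c kubernetes.Interface, namespace, name string) int32 {
	sts, err := c.AppsV1().StatefulSets(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return 0
	}
	if sts.Spec.Replicas == nil {
		return 0
	}
	return *sts.Spec.Replicas
}

func main() {
	replicas := int32(3)
	c := fake.NewSimpleClientset(&appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{Name: "mycluster-leader", Namespace: "default"},
		Spec:       appsv1.StatefulSetSpec{Replicas: &replicas},
	})
	fmt.Println(getReplicas(context.TODO(), c, "default", "mycluster-leader")) // 3
	fmt.Println(getReplicas(context.TODO(), c, "default", "does-not-exist"))   // 0
}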
