diff --git a/api/v1beta2/redisreplication_types.go b/api/v1beta2/redisreplication_types.go index 0f19ce3a1..8b2510cc8 100644 --- a/api/v1beta2/redisreplication_types.go +++ b/api/v1beta2/redisreplication_types.go @@ -1,33 +1,31 @@ package v1beta2 import ( - common "github.com/OT-CONTAINER-KIT/redis-operator/api" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type RedisReplicationSpec struct { - Size *int32 `json:"clusterSize"` - KubernetesConfig KubernetesConfig `json:"kubernetesConfig"` - RedisExporter *RedisExporter `json:"redisExporter,omitempty"` - RedisConfig *RedisConfig `json:"redisConfig,omitempty"` - Storage *Storage `json:"storage,omitempty"` - NodeSelector map[string]string `json:"nodeSelector,omitempty"` - PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"` - SecurityContext *corev1.SecurityContext `json:"securityContext,omitempty"` - PriorityClassName string `json:"priorityClassName,omitempty"` - Affinity *corev1.Affinity `json:"affinity,omitempty"` - Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"` - TLS *TLSConfig `json:"TLS,omitempty"` - PodDisruptionBudget *common.RedisPodDisruptionBudget `json:"pdb,omitempty"` - ACL *ACLConfig `json:"acl,omitempty"` - ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty" protobuf:"bytes,11,opt,name=readinessProbe"` - LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty" protobuf:"bytes,12,opt,name=livenessProbe"` - InitContainer *InitContainer `json:"initContainer,omitempty"` - Sidecars *[]Sidecar `json:"sidecars,omitempty"` - ServiceAccountName *string `json:"serviceAccountName,omitempty"` - TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty" protobuf:"varint,4,opt,name=terminationGracePeriodSeconds"` - EnvVars *[]corev1.EnvVar `json:"env,omitempty"` + Size *int32 `json:"clusterSize"` + KubernetesConfig KubernetesConfig `json:"kubernetesConfig"` + RedisExporter *RedisExporter `json:"redisExporter,omitempty"` + RedisConfig *RedisConfig `json:"redisConfig,omitempty"` + Storage *Storage `json:"storage,omitempty"` + NodeSelector map[string]string `json:"nodeSelector,omitempty"` + PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"` + SecurityContext *corev1.SecurityContext `json:"securityContext,omitempty"` + PriorityClassName string `json:"priorityClassName,omitempty"` + Affinity *corev1.Affinity `json:"affinity,omitempty"` + Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"` + TLS *TLSConfig `json:"TLS,omitempty"` + ACL *ACLConfig `json:"acl,omitempty"` + ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty" protobuf:"bytes,11,opt,name=readinessProbe"` + LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty" protobuf:"bytes,12,opt,name=livenessProbe"` + InitContainer *InitContainer `json:"initContainer,omitempty"` + Sidecars *[]Sidecar `json:"sidecars,omitempty"` + ServiceAccountName *string `json:"serviceAccountName,omitempty"` + TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty" protobuf:"varint,4,opt,name=terminationGracePeriodSeconds"` + EnvVars *[]corev1.EnvVar `json:"env,omitempty"` } func (cr *RedisReplicationSpec) GetReplicationCounts(t string) int32 { diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index 331905267..55493e1d9 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -593,11 +593,6 @@ func (in *RedisReplicationSpec) DeepCopyInto(out *RedisReplicationSpec) { *out = new(TLSConfig) (*in).DeepCopyInto(*out) } - if in.PodDisruptionBudget != nil { - in, out := &in.PodDisruptionBudget, &out.PodDisruptionBudget - *out = new(api.RedisPodDisruptionBudget) - (*in).DeepCopyInto(*out) - } if in.ACL != nil { in, out := &in.ACL, &out.ACL *out = new(ACLConfig) diff --git a/config/crd/bases/redis.redis.opstreelabs.in_redisreplications.yaml b/config/crd/bases/redis.redis.opstreelabs.in_redisreplications.yaml index 3dd86791c..38a26b931 100644 --- a/config/crd/bases/redis.redis.opstreelabs.in_redisreplications.yaml +++ b/config/crd/bases/redis.redis.opstreelabs.in_redisreplications.yaml @@ -6240,19 +6240,6 @@ spec: additionalProperties: type: string type: object - pdb: - description: RedisPodDisruptionBudget configure a PodDisruptionBudget - on the resource (leader/follower) - properties: - enabled: - type: boolean - maxUnavailable: - format: int32 - type: integer - minAvailable: - format: int32 - type: integer - type: object podSecurityContext: description: |- PodSecurityContext holds pod-level security attributes and common container settings. diff --git a/pkg/controllers/rediscluster/rediscluster_controller.go b/pkg/controllers/rediscluster/rediscluster_controller.go index 0c79a2a56..76d51f2be 100644 --- a/pkg/controllers/rediscluster/rediscluster_controller.go +++ b/pkg/controllers/rediscluster/rediscluster_controller.go @@ -73,6 +73,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // Check if the cluster is downscaled if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount { + if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) { + return intctrlutil.Reconciled() + } + logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas) for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- { logger.Info("Remove the shard", "Shard.Index", shardIdx) @@ -83,7 +87,11 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu // lastLeaderPod is slaving right now Make it the master Pod // We have to bring a manual failover here to make it a leaderPod // clusterFailover should also include the clusterReplicate since we have to map the followers to new leader - k8sutils.ClusterFailover(ctx, r.K8sClient, instance) + logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx) + if err = k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil { + logger.Error(err, "Failed to initiate cluster failover") + return intctrlutil.RequeueWithError(ctx, err, "") + } } // Step 1 Remove the Follower Node k8sutils.RemoveRedisFollowerNodesFromCluster(ctx, r.K8sClient, instance) diff --git a/pkg/controllers/redisreplication/redisreplication_controller.go b/pkg/controllers/redisreplication/redisreplication_controller.go index 0e3e0f433..cca13842c 100644 --- a/pkg/controllers/redisreplication/redisreplication_controller.go +++ b/pkg/controllers/redisreplication/redisreplication_controller.go @@ -46,7 +46,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu {typ: "finalizer", rec: r.reconcileFinalizer}, {typ: "statefulset", rec: r.reconcileStatefulSet}, {typ: "service", rec: r.reconcileService}, - {typ: "poddisruptionbudget", rec: r.reconcilePDB}, {typ: "redis", rec: r.reconcileRedis}, {typ: "status", rec: r.reconcileStatus}, } @@ -132,13 +131,6 @@ func (r *Reconciler) reconcileAnnotation(ctx context.Context, instance *redisv1b return intctrlutil.Reconciled() } -func (r *Reconciler) reconcilePDB(ctx context.Context, instance *redisv1beta2.RedisReplication) (ctrl.Result, error) { - if err := k8sutils.ReconcileReplicationPodDisruptionBudget(ctx, instance, instance.Spec.PodDisruptionBudget, r.K8sClient); err != nil { - return intctrlutil.RequeueAfter(ctx, time.Second*60, "") - } - return intctrlutil.Reconciled() -} - func (r *Reconciler) reconcileStatefulSet(ctx context.Context, instance *redisv1beta2.RedisReplication) (ctrl.Result, error) { if err := k8sutils.CreateReplicationRedis(ctx, instance, r.K8sClient); err != nil { return intctrlutil.RequeueAfter(ctx, time.Second*60, "") diff --git a/pkg/k8sutils/cluster-scaling.go b/pkg/k8sutils/cluster-scaling.go index b2bd5a0da..3e7cd7097 100644 --- a/pkg/k8sutils/cluster-scaling.go +++ b/pkg/k8sutils/cluster-scaling.go @@ -391,7 +391,7 @@ func verifyLeaderPodInfo(ctx context.Context, redisClient *redis.Client, podName return false } -func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) { +func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) error { slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1) // cmd = redis-cli cluster failover -a var cmd []string @@ -400,13 +400,15 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redis Namespace: cr.Namespace, } - cmd = []string{"redis-cli", "cluster", "failover"} + cmd = []string{"redis-cli", "-h"} if *cr.Spec.ClusterVersion == "v7" { - cmd = append(cmd, getRedisHostname(pod, cr, "leader")+fmt.Sprintf(":%d", *cr.Spec.Port)) + cmd = append(cmd, getRedisHostname(pod, cr, "leader")) } else { - cmd = append(cmd, getRedisServerAddress(ctx, client, pod, *cr.Spec.Port)) + cmd = append(cmd, getRedisServerIP(ctx, client, pod)) } + cmd = append(cmd, "-p") + cmd = append(cmd, strconv.Itoa(*cr.Spec.Port)) if cr.Spec.KubernetesConfig.ExistingPasswordSecret != nil { pass, err := getRedisPassword(ctx, client, cr.Namespace, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Name, *cr.Spec.KubernetesConfig.ExistingPasswordSecret.Key) @@ -418,7 +420,13 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redis } cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, slavePodName)...) + cmd = append(cmd, "cluster", "failover") log.FromContext(ctx).V(1).Info("Redis cluster failover command is", "Command", cmd) - executeCommand(ctx, client, cr, cmd, slavePodName) + execOut, err := executeCommand1(ctx, client, cr, cmd, slavePodName) + if err != nil { + log.FromContext(ctx).Error(err, "Could not execute command", "Command", cmd, "Output", execOut) + return err + } + return nil } diff --git a/pkg/k8sutils/poddisruption.go b/pkg/k8sutils/poddisruption.go index 69bad44cd..6dc97a970 100644 --- a/pkg/k8sutils/poddisruption.go +++ b/pkg/k8sutils/poddisruption.go @@ -26,7 +26,7 @@ func ReconcileRedisPodDisruptionBudget(ctx context.Context, cr *redisv1beta2.Red return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl) } else { // Check if one exists, and delete it. - _, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) + _, err := GetPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) if err == nil { return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) } else if err != nil && errors.IsNotFound(err) { @@ -48,29 +48,7 @@ func ReconcileSentinelPodDisruptionBudget(ctx context.Context, cr *redisv1beta2. return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl) } else { // Check if one exists, and delete it. - _, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) - if err == nil { - return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) - } else if err != nil && errors.IsNotFound(err) { - log.FromContext(ctx).V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.") - // Its ok if its not found, as we're deleting anyway - return nil - } - return err - } -} - -func ReconcileReplicationPodDisruptionBudget(ctx context.Context, cr *redisv1beta2.RedisReplication, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error { - pdbName := cr.ObjectMeta.Name + "-replication" - if pdbParams != nil && pdbParams.Enabled { - labels := getRedisLabels(cr.ObjectMeta.Name, replication, "replication", cr.GetObjectMeta().GetLabels()) - annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations) - pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations) - pdbDef := generateReplicationPodDisruptionBudgetDef(ctx, cr, "replication", pdbMeta, pdbParams) - return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl) - } else { - // Check if one exists, and delete it. - _, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) + _, err := GetPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) if err == nil { return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl) } else if err != nil && errors.IsNotFound(err) { @@ -109,33 +87,6 @@ func generatePodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisC return pdbTemplate } -// generatePodDisruptionBudgetDef will create a PodDisruptionBudget definition -func generateReplicationPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisReplication, role string, pdbMeta metav1.ObjectMeta, pdbParams *commonapi.RedisPodDisruptionBudget) *policyv1.PodDisruptionBudget { - lblSelector := LabelSelectors(map[string]string{ - "app": fmt.Sprintf("%s-%s", cr.ObjectMeta.Name, role), - "role": role, - }) - pdbTemplate := &policyv1.PodDisruptionBudget{ - TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"), - ObjectMeta: pdbMeta, - Spec: policyv1.PodDisruptionBudgetSpec{ - Selector: lblSelector, - }, - } - if pdbParams.MinAvailable != nil { - pdbTemplate.Spec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *pdbParams.MinAvailable} - } - if pdbParams.MaxUnavailable != nil { - pdbTemplate.Spec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *pdbParams.MaxUnavailable} - } - // If we don't have a value for either, assume quorum: (N/2)+1 - if pdbTemplate.Spec.MaxUnavailable == nil && pdbTemplate.Spec.MinAvailable == nil { - pdbTemplate.Spec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: (*cr.Spec.Size / 2) + 1} - } - AddOwnerRefToObject(pdbTemplate, redisReplicationAsOwner(cr)) - return pdbTemplate -} - // generatePodDisruptionBudgetDef will create a PodDisruptionBudget definition func generateSentinelPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisSentinel, role string, pdbMeta metav1.ObjectMeta, pdbParams *commonapi.RedisPodDisruptionBudget) *policyv1.PodDisruptionBudget { lblSelector := LabelSelectors(map[string]string{ @@ -165,7 +116,7 @@ func generateSentinelPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta // CreateOrUpdateService method will create or update Redis service func CreateOrUpdatePodDisruptionBudget(ctx context.Context, pdbDef *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error { - storedPDB, err := getPodDisruptionBudget(ctx, pdbDef.Namespace, pdbDef.Name, cl) + storedPDB, err := GetPodDisruptionBudget(ctx, pdbDef.Namespace, pdbDef.Name, cl) if err != nil { if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(pdbDef); err != nil { //nolint log.FromContext(ctx).Error(err, "Unable to patch redis PodDisruptionBudget with comparison object") @@ -253,8 +204,8 @@ func deletePodDisruptionBudget(ctx context.Context, namespace string, pdbName st return nil } -// getPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes -func getPodDisruptionBudget(ctx context.Context, namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) { +// GetPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes +func GetPodDisruptionBudget(ctx context.Context, namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) { getOpts := metav1.GetOptions{ TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"), }