Skip to content

Commit

Permalink
resolved merge conflicts between fix-branch and main
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaozhuang committed Dec 16, 2024
1 parent ad4a3c6 commit 1eadb63
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 114 deletions.
42 changes: 20 additions & 22 deletions api/v1beta2/redisreplication_types.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,31 @@
package v1beta2

import (
common "github.com/OT-CONTAINER-KIT/redis-operator/api"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type RedisReplicationSpec struct {
Size *int32 `json:"clusterSize"`
KubernetesConfig KubernetesConfig `json:"kubernetesConfig"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
RedisConfig *RedisConfig `json:"redisConfig,omitempty"`
Storage *Storage `json:"storage,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"`
SecurityContext *corev1.SecurityContext `json:"securityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Affinity *corev1.Affinity `json:"affinity,omitempty"`
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
PodDisruptionBudget *common.RedisPodDisruptionBudget `json:"pdb,omitempty"`
ACL *ACLConfig `json:"acl,omitempty"`
ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty" protobuf:"bytes,11,opt,name=readinessProbe"`
LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty" protobuf:"bytes,12,opt,name=livenessProbe"`
InitContainer *InitContainer `json:"initContainer,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty" protobuf:"varint,4,opt,name=terminationGracePeriodSeconds"`
EnvVars *[]corev1.EnvVar `json:"env,omitempty"`
Size *int32 `json:"clusterSize"`
KubernetesConfig KubernetesConfig `json:"kubernetesConfig"`
RedisExporter *RedisExporter `json:"redisExporter,omitempty"`
RedisConfig *RedisConfig `json:"redisConfig,omitempty"`
Storage *Storage `json:"storage,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
PodSecurityContext *corev1.PodSecurityContext `json:"podSecurityContext,omitempty"`
SecurityContext *corev1.SecurityContext `json:"securityContext,omitempty"`
PriorityClassName string `json:"priorityClassName,omitempty"`
Affinity *corev1.Affinity `json:"affinity,omitempty"`
Tolerations *[]corev1.Toleration `json:"tolerations,omitempty"`
TLS *TLSConfig `json:"TLS,omitempty"`
ACL *ACLConfig `json:"acl,omitempty"`
ReadinessProbe *corev1.Probe `json:"readinessProbe,omitempty" protobuf:"bytes,11,opt,name=readinessProbe"`
LivenessProbe *corev1.Probe `json:"livenessProbe,omitempty" protobuf:"bytes,12,opt,name=livenessProbe"`
InitContainer *InitContainer `json:"initContainer,omitempty"`
Sidecars *[]Sidecar `json:"sidecars,omitempty"`
ServiceAccountName *string `json:"serviceAccountName,omitempty"`
TerminationGracePeriodSeconds *int64 `json:"terminationGracePeriodSeconds,omitempty" protobuf:"varint,4,opt,name=terminationGracePeriodSeconds"`
EnvVars *[]corev1.EnvVar `json:"env,omitempty"`
}

func (cr *RedisReplicationSpec) GetReplicationCounts(t string) int32 {
Expand Down
5 changes: 0 additions & 5 deletions api/v1beta2/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 0 additions & 13 deletions config/crd/bases/redis.redis.opstreelabs.in_redisreplications.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6240,19 +6240,6 @@ spec:
additionalProperties:
type: string
type: object
pdb:
description: RedisPodDisruptionBudget configure a PodDisruptionBudget
on the resource (leader/follower)
properties:
enabled:
type: boolean
maxUnavailable:
format: int32
type: integer
minAvailable:
format: int32
type: integer
type: object
podSecurityContext:
description: |-
PodSecurityContext holds pod-level security attributes and common container settings.
Expand Down
12 changes: 6 additions & 6 deletions pkg/controllers/rediscluster/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
}

// Check if the cluster is downscaled
if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, r.Log, instance, "leader"); leaderReplicas < leaderCount {
if leaderCount := k8sutils.CheckRedisNodeCount(ctx, r.K8sClient, instance, "leader"); leaderReplicas < leaderCount {
if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
return intctrlutil.Reconciled()
}

reqLogger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
logger.Info("Redis cluster is downscaling...", "Current.LeaderReplicas", leaderCount, "Desired.LeaderReplicas", leaderReplicas)
for shardIdx := leaderCount - 1; shardIdx >= leaderReplicas; shardIdx-- {
logger.Info("Remove the shard", "Shard.Index", shardIdx)
// Imp if the last index of leader sts is not leader make it then
Expand All @@ -87,10 +87,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// lastLeaderPod is slaving right now Make it the master Pod
// We have to bring a manual failover here to make it a leaderPod
// clusterFailover should also include the clusterReplicate since we have to map the followers to new leader
reqLogger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
if err := k8sutils.ClusterFailover(ctx, r.K8sClient, r.Log, instance); err != nil {
reqLogger.Error(err, "Failed to initiate cluster failover")
return intctrlutil.RequeueWithError(err, reqLogger, "")
logger.Info("Cluster Failover is initiated", "Shard.Index", shardIdx)
if err := k8sutils.ClusterFailover(ctx, r.K8sClient, instance); err != nil {

Check failure on line 91 in pkg/controllers/rediscluster/rediscluster_controller.go

View workflow job for this annotation

GitHub Actions / lint

shadow: declaration of "err" shadows declaration at line 51 (govet)
logger.Error(err, "Failed to initiate cluster failover")
return intctrlutil.RequeueWithError(ctx, err, "")
}
}
// Step 1 Remove the Follower Node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
{typ: "finalizer", rec: r.reconcileFinalizer},
{typ: "statefulset", rec: r.reconcileStatefulSet},
{typ: "service", rec: r.reconcileService},
{typ: "poddisruptionbudget", rec: r.reconcilePDB},
{typ: "redis", rec: r.reconcileRedis},
{typ: "status", rec: r.reconcileStatus},
}
Expand Down Expand Up @@ -132,13 +131,6 @@ func (r *Reconciler) reconcileAnnotation(ctx context.Context, instance *redisv1b
return intctrlutil.Reconciled()
}

func (r *Reconciler) reconcilePDB(ctx context.Context, instance *redisv1beta2.RedisReplication) (ctrl.Result, error) {
if err := k8sutils.ReconcileReplicationPodDisruptionBudget(ctx, instance, instance.Spec.PodDisruptionBudget, r.K8sClient); err != nil {
return intctrlutil.RequeueAfter(ctx, time.Second*60, "")
}
return intctrlutil.Reconciled()
}

func (r *Reconciler) reconcileStatefulSet(ctx context.Context, instance *redisv1beta2.RedisReplication) (ctrl.Result, error) {
if err := k8sutils.CreateReplicationRedis(ctx, instance, r.K8sClient); err != nil {
return intctrlutil.RequeueAfter(ctx, time.Second*60, "")
Expand Down
12 changes: 6 additions & 6 deletions pkg/k8sutils/cluster-scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,8 @@ func verifyLeaderPodInfo(ctx context.Context, redisClient *redis.Client, podName
return false
}

func ClusterFailover(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) error {
slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, logger, cr, "leader"))-1)
func ClusterFailover(ctx context.Context, client kubernetes.Interface, cr *redisv1beta2.RedisCluster) error {
slavePodName := cr.Name + "-leader-" + strconv.Itoa(int(CheckRedisNodeCount(ctx, client, cr, "leader"))-1)
// cmd = redis-cli cluster failover -a <pass>
var cmd []string
pod := RedisDetails{
Expand All @@ -405,7 +405,7 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, logger lo
if *cr.Spec.ClusterVersion == "v7" {
cmd = append(cmd, getRedisHostname(pod, cr, "leader"))
} else {
cmd = append(cmd, getRedisServerIP(client, logger, pod))
cmd = append(cmd, getRedisServerIP(ctx, client, pod))
}
cmd = append(cmd, "-p")
cmd = append(cmd, strconv.Itoa(*cr.Spec.Port))
Expand All @@ -422,10 +422,10 @@ func ClusterFailover(ctx context.Context, client kubernetes.Interface, logger lo
cmd = append(cmd, getRedisTLSArgs(cr.Spec.TLS, slavePodName)...)
cmd = append(cmd, "cluster", "failover")

logger.V(1).Info("Redis cluster failover command is", "Command", cmd)
execOut, err := executeCommand1(client, logger, cr, cmd, slavePodName)
log.FromContext(ctx).V(1).Info("Redis cluster failover command is", "Command", cmd)
execOut, err := executeCommand1(ctx, client, cr, cmd, slavePodName)
if err != nil {
logger.Error(err, "Could not execute command", "Command", cmd, "Output", execOut)
log.FromContext(ctx).Error(err, "Could not execute command", "Command", cmd, "Output", execOut)
return err
}
return nil
Expand Down
59 changes: 5 additions & 54 deletions pkg/k8sutils/poddisruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func ReconcileRedisPodDisruptionBudget(ctx context.Context, cr *redisv1beta2.Red
return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
_, err := GetPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
Expand All @@ -48,29 +48,7 @@ func ReconcileSentinelPodDisruptionBudget(ctx context.Context, cr *redisv1beta2.
return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
log.FromContext(ctx).V(1).Info("Reconciliation Successful, no PodDisruptionBudget Found.")
// Its ok if its not found, as we're deleting anyway
return nil
}
return err
}
}

func ReconcileReplicationPodDisruptionBudget(ctx context.Context, cr *redisv1beta2.RedisReplication, pdbParams *commonapi.RedisPodDisruptionBudget, cl kubernetes.Interface) error {
pdbName := cr.ObjectMeta.Name + "-replication"
if pdbParams != nil && pdbParams.Enabled {
labels := getRedisLabels(cr.ObjectMeta.Name, replication, "replication", cr.GetObjectMeta().GetLabels())
annotations := generateStatefulSetsAnots(cr.ObjectMeta, cr.Spec.KubernetesConfig.IgnoreAnnotations)
pdbMeta := generateObjectMetaInformation(pdbName, cr.Namespace, labels, annotations)
pdbDef := generateReplicationPodDisruptionBudgetDef(ctx, cr, "replication", pdbMeta, pdbParams)
return CreateOrUpdatePodDisruptionBudget(ctx, pdbDef, cl)
} else {
// Check if one exists, and delete it.
_, err := getPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
_, err := GetPodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
if err == nil {
return deletePodDisruptionBudget(ctx, cr.Namespace, pdbName, cl)
} else if err != nil && errors.IsNotFound(err) {
Expand Down Expand Up @@ -109,33 +87,6 @@ func generatePodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisC
return pdbTemplate
}

// generatePodDisruptionBudgetDef will create a PodDisruptionBudget definition
func generateReplicationPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisReplication, role string, pdbMeta metav1.ObjectMeta, pdbParams *commonapi.RedisPodDisruptionBudget) *policyv1.PodDisruptionBudget {
lblSelector := LabelSelectors(map[string]string{
"app": fmt.Sprintf("%s-%s", cr.ObjectMeta.Name, role),
"role": role,
})
pdbTemplate := &policyv1.PodDisruptionBudget{
TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"),
ObjectMeta: pdbMeta,
Spec: policyv1.PodDisruptionBudgetSpec{
Selector: lblSelector,
},
}
if pdbParams.MinAvailable != nil {
pdbTemplate.Spec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *pdbParams.MinAvailable}
}
if pdbParams.MaxUnavailable != nil {
pdbTemplate.Spec.MaxUnavailable = &intstr.IntOrString{Type: intstr.Int, IntVal: *pdbParams.MaxUnavailable}
}
// If we don't have a value for either, assume quorum: (N/2)+1
if pdbTemplate.Spec.MaxUnavailable == nil && pdbTemplate.Spec.MinAvailable == nil {
pdbTemplate.Spec.MinAvailable = &intstr.IntOrString{Type: intstr.Int, IntVal: (*cr.Spec.Size / 2) + 1}
}
AddOwnerRefToObject(pdbTemplate, redisReplicationAsOwner(cr))
return pdbTemplate
}

// generatePodDisruptionBudgetDef will create a PodDisruptionBudget definition
func generateSentinelPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta2.RedisSentinel, role string, pdbMeta metav1.ObjectMeta, pdbParams *commonapi.RedisPodDisruptionBudget) *policyv1.PodDisruptionBudget {
lblSelector := LabelSelectors(map[string]string{
Expand Down Expand Up @@ -165,7 +116,7 @@ func generateSentinelPodDisruptionBudgetDef(ctx context.Context, cr *redisv1beta

// CreateOrUpdateService method will create or update Redis service
func CreateOrUpdatePodDisruptionBudget(ctx context.Context, pdbDef *policyv1.PodDisruptionBudget, cl kubernetes.Interface) error {
storedPDB, err := getPodDisruptionBudget(ctx, pdbDef.Namespace, pdbDef.Name, cl)
storedPDB, err := GetPodDisruptionBudget(ctx, pdbDef.Namespace, pdbDef.Name, cl)
if err != nil {
if err := patch.DefaultAnnotator.SetLastAppliedAnnotation(pdbDef); err != nil { //nolint
log.FromContext(ctx).Error(err, "Unable to patch redis PodDisruptionBudget with comparison object")
Expand Down Expand Up @@ -253,8 +204,8 @@ func deletePodDisruptionBudget(ctx context.Context, namespace string, pdbName st
return nil
}

// getPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes
func getPodDisruptionBudget(ctx context.Context, namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) {
// GetPodDisruptionBudget is a method to get PodDisruptionBudgets in Kubernetes
func GetPodDisruptionBudget(ctx context.Context, namespace string, pdb string, cl kubernetes.Interface) (*policyv1.PodDisruptionBudget, error) {
getOpts := metav1.GetOptions{
TypeMeta: generateMetaInformation("PodDisruptionBudget", "policy/v1"),
}
Expand Down

0 comments on commit 1eadb63

Please sign in to comment.