Skip to content

Commit

Permalink
K8SPXC-1256: Ignore stopped channel if we're going to delete it (#1526)
Browse files Browse the repository at this point in the history
* K8SPXC-1256: Ignore stopped channel if we're going to delete it

* Set LOG_LEVEL to DEBUG in make deploy

---------

Co-authored-by: Viacheslav Sarzhan <[email protected]>
  • Loading branch information
egegunes and hors authored Dec 7, 2023
1 parent 8d3514a commit c509998
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ uninstall: manifests ## Uninstall CRDs, rbac

.PHONY: deploy
deploy: ## Deploy operator
yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml | kubectl apply -f -
yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml \
| yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="DISABLE_TELEMETRY") | .value) = "true"' - \
| yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="LOG_LEVEL") | .value) = "DEBUG"' - \
| kubectl apply -f -

undeploy: ## Undeploy operator
kubectl delete -f $(DEPLOYDIR)/operator.yaml
Expand Down
21 changes: 16 additions & 5 deletions pkg/controller/pxc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context
return nil
}

err = removeOutdatedChannels(primaryDB, cr.Spec.PXC.ReplicationChannels)
err = removeOutdatedChannels(ctx, primaryDB, cr.Spec.PXC.ReplicationChannels)
if err != nil {
return errors.Wrap(err, "remove outdated replication channels")
}

err = checkReadonlyStatus(cr.Spec.PXC.ReplicationChannels, podList, cr, r.client)
err = checkReadonlyStatus(ctx, cr.Spec.PXC.ReplicationChannels, podList, cr, r.client)
if err != nil {
return errors.Wrap(err, "failed to ensure cluster readonly status")
}
Expand All @@ -195,11 +195,13 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context
if err != nil {
return errors.Wrapf(err, "failed to connect to pod %s", pod.Name)
}
log.V(1).Info("Stop replication on pod", "pod", pod.Name)
err = db.StopAllReplication()
db.Close()
if err != nil {
return errors.Wrapf(err, "stop replication on pod %s", pod.Name)
}
log.V(1).Info("Remove replication label from pod", "pod", pod.Name)
delete(pod.Labels, replicationPodLabel)
err = r.client.Update(context.TODO(), &pod)
if err != nil {
Expand Down Expand Up @@ -268,7 +270,9 @@ func handleReplicaPasswordChange(db queries.Database, newPass string) error {
return nil
}

func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error {
func checkReadonlyStatus(ctx context.Context, channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error {
log := logf.FromContext(ctx)

isReplica := false
if len(channels) > 0 {
isReplica = !channels[0].IsSource
Expand All @@ -286,14 +290,17 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c
}

if isReplica && readonly || (!isReplica && !readonly) {
log.V(1).Info("Read only status is correct", "pod", pod.Name, "isReplica", isReplica, "readonly", readonly)
continue
}

if isReplica && !readonly {
log.Info("Replica is not readonly. Enabling readonly mode", "pod", pod.Name)
err = db.EnableReadonly()
}

if !isReplica && readonly {
log.Info("Primary is readonly. Disabling readonly mode", "pod", pod.Name)
err = db.DisableReadonly()
}
if err != nil {
Expand All @@ -304,7 +311,9 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c
return nil
}

func removeOutdatedChannels(db queries.Database, currentChannels []api.ReplicationChannel) error {
func removeOutdatedChannels(ctx context.Context, db queries.Database, currentChannels []api.ReplicationChannel) error {
log := logf.FromContext(ctx)

dbChannels, err := db.CurrentReplicationChannels()
if err != nil {
return errors.Wrap(err, "get current replication channels")
Expand All @@ -330,8 +339,9 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati
}

for channelToRemove := range toRemove {
log.Info("Remove outdated replication channel", "channel", channelToRemove)
err = db.StopReplication(channelToRemove)
if err != nil {
if err != nil && !strings.Contains(err.Error(), "Error 3074") { // Error 3074: ER_REPLICA_CHANNEL_DOES_NOT_EXIST
return errors.Wrapf(err, "stop replication for channel %s", channelToRemove)
}

Expand All @@ -340,6 +350,7 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati
return errors.Wrapf(err, "get src list for outdated channel %s", channelToRemove)
}
for _, v := range srcList {
log.V(1).Info("Remove outdated replication source", "channel", channelToRemove, "host", v.Host)
err = db.DeleteReplicationSource(channelToRemove, v.Host, v.Port)
if err != nil {
return errors.Wrapf(err, "delete replication source for outdated channel %s", channelToRemove)
Expand Down

0 comments on commit c509998

Please sign in to comment.