From c509998a4c1fd0e1f22f2d0e749ee632c08e2e6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ege=20G=C3=BCne=C5=9F?= Date: Thu, 7 Dec 2023 20:05:35 +0300 Subject: [PATCH] K8SPXC-1256: Ignore stopped channel if we're going to delete it (#1526) * K8SPXC-1256: Ignore stopped channel if we're going to delete it * Set LOG_LEVEL to DEBUG in make deploy --------- Co-authored-by: Viacheslav Sarzhan --- Makefile | 5 ++++- pkg/controller/pxc/replication.go | 21 ++++++++++++++++----- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index ced14f8d52..258086195d 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,10 @@ uninstall: manifests ## Uninstall CRDs, rbac .PHONY: deploy deploy: ## Deploy operator - yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml | kubectl apply -f - + yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml \ + | yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="DISABLE_TELEMETRY") | .value) = "true"' - \ + | yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="LOG_LEVEL") | .value) = "DEBUG"' - \ + | kubectl apply -f - undeploy: ## Undeploy operator kubectl delete -f $(DEPLOYDIR)/operator.yaml diff --git a/pkg/controller/pxc/replication.go b/pkg/controller/pxc/replication.go index f9dbc6c5a8..1f91786d8f 100644 --- a/pkg/controller/pxc/replication.go +++ b/pkg/controller/pxc/replication.go @@ -167,12 +167,12 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context return nil } - err = removeOutdatedChannels(primaryDB, cr.Spec.PXC.ReplicationChannels) + err = removeOutdatedChannels(ctx, primaryDB, cr.Spec.PXC.ReplicationChannels) if err != nil { return errors.Wrap(err, "remove outdated replication channels") } - err = checkReadonlyStatus(cr.Spec.PXC.ReplicationChannels, podList, cr, r.client) + err = checkReadonlyStatus(ctx, cr.Spec.PXC.ReplicationChannels, podList, cr, r.client) if err != nil { return errors.Wrap(err, "failed to ensure cluster readonly status") } @@ -195,11 +195,13 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context if err != nil { return errors.Wrapf(err, "failed to connect to pod %s", pod.Name) } + log.V(1).Info("Stop replication on pod", "pod", pod.Name) err = db.StopAllReplication() db.Close() if err != nil { return errors.Wrapf(err, "stop replication on pod %s", pod.Name) } + log.V(1).Info("Remove replication label from pod", "pod", pod.Name) delete(pod.Labels, replicationPodLabel) err = r.client.Update(context.TODO(), &pod) if err != nil { @@ -268,7 +270,9 @@ func handleReplicaPasswordChange(db queries.Database, newPass string) error { return nil } -func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error { +func checkReadonlyStatus(ctx context.Context, channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error { + log := logf.FromContext(ctx) + isReplica := false if len(channels) > 0 { isReplica = !channels[0].IsSource @@ -286,14 +290,17 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c } if isReplica && readonly || (!isReplica && !readonly) { + log.V(1).Info("Read only status is correct", "pod", pod.Name, "isReplica", isReplica, "readonly", readonly) continue } if isReplica && !readonly { + log.Info("Replica is not readonly. Enabling readonly mode", "pod", pod.Name) err = db.EnableReadonly() } if !isReplica && readonly { + log.Info("Primary is readonly. Disabling readonly mode", "pod", pod.Name) err = db.DisableReadonly() } if err != nil { @@ -304,7 +311,9 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c return nil } -func removeOutdatedChannels(db queries.Database, currentChannels []api.ReplicationChannel) error { +func removeOutdatedChannels(ctx context.Context, db queries.Database, currentChannels []api.ReplicationChannel) error { + log := logf.FromContext(ctx) + dbChannels, err := db.CurrentReplicationChannels() if err != nil { return errors.Wrap(err, "get current replication channels") @@ -330,8 +339,9 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati } for channelToRemove := range toRemove { + log.Info("Remove outdated replication channel", "channel", channelToRemove) err = db.StopReplication(channelToRemove) - if err != nil { + if err != nil && !strings.Contains(err.Error(), "Error 3074") { // Error 3074: ER_REPLICA_CHANNEL_DOES_NOT_EXIST return errors.Wrapf(err, "stop replication for channel %s", channelToRemove) } @@ -340,6 +350,7 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati return errors.Wrapf(err, "get src list for outdated channel %s", channelToRemove) } for _, v := range srcList { + log.V(1).Info("Remove outdated replication source", "channel", channelToRemove, "host", v.Host) err = db.DeleteReplicationSource(channelToRemove, v.Host, v.Port) if err != nil { return errors.Wrapf(err, "delete replication source for outdated channel %s", channelToRemove)