Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

K8SPXC-1256: Ignore stopped channel if we're going to delete it #1526

Merged
merged 3 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ uninstall: manifests ## Uninstall CRDs, rbac

.PHONY: deploy
deploy: ## Deploy operator
yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml | kubectl apply -f -
yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .image) = "$(IMAGE)"' $(DEPLOYDIR)/operator.yaml \
| yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="DISABLE_TELEMETRY") | .value) = "true"' - \
| yq eval '(.spec.template.spec.containers[] | select(.name=="$(NAME)") | .env[] | select(.name=="LOG_LEVEL") | .value) = "DEBUG"' - \
| kubectl apply -f -

undeploy: ## Undeploy operator
kubectl delete -f $(DEPLOYDIR)/operator.yaml
Expand Down
21 changes: 16 additions & 5 deletions pkg/controller/pxc/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context
return nil
}

err = removeOutdatedChannels(primaryDB, cr.Spec.PXC.ReplicationChannels)
err = removeOutdatedChannels(ctx, primaryDB, cr.Spec.PXC.ReplicationChannels)
if err != nil {
return errors.Wrap(err, "remove outdated replication channels")
}

err = checkReadonlyStatus(cr.Spec.PXC.ReplicationChannels, podList, cr, r.client)
err = checkReadonlyStatus(ctx, cr.Spec.PXC.ReplicationChannels, podList, cr, r.client)
if err != nil {
return errors.Wrap(err, "failed to ensure cluster readonly status")
}
Expand All @@ -195,11 +195,13 @@ func (r *ReconcilePerconaXtraDBCluster) reconcileReplication(ctx context.Context
if err != nil {
return errors.Wrapf(err, "failed to connect to pod %s", pod.Name)
}
log.V(1).Info("Stop replication on pod", "pod", pod.Name)
err = db.StopAllReplication()
db.Close()
if err != nil {
return errors.Wrapf(err, "stop replication on pod %s", pod.Name)
}
log.V(1).Info("Remove replication label from pod", "pod", pod.Name)
delete(pod.Labels, replicationPodLabel)
err = r.client.Update(context.TODO(), &pod)
if err != nil {
Expand Down Expand Up @@ -268,7 +270,9 @@ func handleReplicaPasswordChange(db queries.Database, newPass string) error {
return nil
}

func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error {
func checkReadonlyStatus(ctx context.Context, channels []api.ReplicationChannel, pods []corev1.Pod, cr *api.PerconaXtraDBCluster, client client.Client) error {
log := logf.FromContext(ctx)

isReplica := false
if len(channels) > 0 {
isReplica = !channels[0].IsSource
Expand All @@ -286,14 +290,17 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c
}

if isReplica && readonly || (!isReplica && !readonly) {
log.V(1).Info("Read only status is correct", "pod", pod.Name, "isReplica", isReplica, "readonly", readonly)
continue
}

if isReplica && !readonly {
log.Info("Replica is not readonly. Enabling readonly mode", "pod", pod.Name)
err = db.EnableReadonly()
}

if !isReplica && readonly {
log.Info("Primary is readonly. Disabling readonly mode", "pod", pod.Name)
err = db.DisableReadonly()
}
if err != nil {
Expand All @@ -304,7 +311,9 @@ func checkReadonlyStatus(channels []api.ReplicationChannel, pods []corev1.Pod, c
return nil
}

func removeOutdatedChannels(db queries.Database, currentChannels []api.ReplicationChannel) error {
func removeOutdatedChannels(ctx context.Context, db queries.Database, currentChannels []api.ReplicationChannel) error {
log := logf.FromContext(ctx)

dbChannels, err := db.CurrentReplicationChannels()
if err != nil {
return errors.Wrap(err, "get current replication channels")
Expand All @@ -330,8 +339,9 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati
}

for channelToRemove := range toRemove {
log.Info("Remove outdated replication channel", "channel", channelToRemove)
err = db.StopReplication(channelToRemove)
if err != nil {
if err != nil && !strings.Contains(err.Error(), "Error 3074") { // Error 3074: ER_REPLICA_CHANNEL_DOES_NOT_EXIST
return errors.Wrapf(err, "stop replication for channel %s", channelToRemove)
}

Expand All @@ -340,6 +350,7 @@ func removeOutdatedChannels(db queries.Database, currentChannels []api.Replicati
return errors.Wrapf(err, "get src list for outdated channel %s", channelToRemove)
}
for _, v := range srcList {
log.V(1).Info("Remove outdated replication source", "channel", channelToRemove, "host", v.Host)
err = db.DeleteReplicationSource(channelToRemove, v.Host, v.Port)
if err != nil {
return errors.Wrapf(err, "delete replication source for outdated channel %s", channelToRemove)
Expand Down
Loading