Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch application_name back to default value #575

Merged
merged 16 commits into from
Aug 13, 2024
Merged
70 changes: 52 additions & 18 deletions controllers/postgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1062,14 +1062,28 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
return requeueAfterReconcile, nil
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != instance.Spec.PostgresConnection.ConnectedPostgresID {
// fetch the sync standby to determine the correct application_name of the instance
log.V(debugLogLevel).Info("fetching the referenced sync standby")
var synchronousStandbyApplicationName *string
s := &pg.Postgres{}
ns := types.NamespacedName{
Name: instance.Spec.PostgresConnection.ConnectedPostgresID,
Namespace: instance.Namespace,
}
if err := r.CtrlClient.Get(ctx, ns, s); err != nil {
r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err)
synchronousStandbyApplicationName = nil
} else {
synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName())
}
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != *synchronousStandbyApplicationName {
log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, synchronousStandbyApplicationName)
}
} else {
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
}

Expand All @@ -1078,25 +1092,25 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp)
return requeueAfterReconcile, nil
}
if resp.StandbyCluster.ApplicationName != instance.ObjectMeta.Name {
log.V(debugLogLevel).Info("application_name mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
}
if resp.StandbyCluster.CreateReplicaMethods == nil {
log.V(debugLogLevel).Info("create_replica_methods mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.Host != instance.Spec.PostgresConnection.ConnectionIP {
log.V(debugLogLevel).Info("host mismatch, updating and requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.Port != int(instance.Spec.PostgresConnection.ConnectionPort) {
log.V(debugLogLevel).Info("port mismatch, updating and requeing", "updating", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.StandbyCluster.ApplicationName != instance.ToPeripheralResourceName() {
log.V(debugLogLevel).Info("application_name mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
if resp.SynchronousNodesAdditional != nil {
log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}
}

Expand Down Expand Up @@ -1142,7 +1156,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
for _, pod := range pods.Items {
pod := pod // pin!
podIP := pod.Status.PodIP
if err := r.httpPatchPatroni(log, ctx, instance, podIP); err != nil {
if err := r.httpPatchPatroni(log, ctx, instance, podIP, nil); err != nil {
lastErr = err
log.Info("failed to update pod")
}
Expand All @@ -1155,7 +1169,7 @@ func (r *PostgresReconciler) updatePatroniReplicationConfigOnAllPods(log logr.Lo
return nil
}

func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string) error {
func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Context, instance *pg.Postgres, podIP string, synchronousStandbyApplicationName *string) error {
if podIP == "" {
return errors.New("podIP must not be empty")
}
Expand All @@ -1170,8 +1184,23 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
StandbyCluster: nil,
}
if instance.Spec.PostgresConnection.SynchronousReplication {
if synchronousStandbyApplicationName == nil {
// fetch the sync standby to determine the correct application_name of the instance
log.V(debugLogLevel).Info("unexpectetly having to fetch the referenced sync standby")
s := &pg.Postgres{}
ns := types.NamespacedName{
Name: instance.Spec.PostgresConnection.ConnectedPostgresID,
Namespace: instance.Namespace,
}
if err := r.CtrlClient.Get(ctx, ns, s); err != nil {
r.recorder.Eventf(s, "Warning", "Error", "failed to get referenced sync standby: %v", err)
synchronousStandbyApplicationName = nil
} else {
synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName())
}
}
// enable sync replication
request.SynchronousNodesAdditional = pointer.String(instance.Spec.PostgresConnection.ConnectedPostgresID)
request.SynchronousNodesAdditional = synchronousStandbyApplicationName
} else {
// disable sync replication
request.SynchronousNodesAdditional = nil
Expand All @@ -1183,7 +1212,7 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
CreateReplicaMethods: []string{"basebackup_fast_xlog"},
Host: instance.Spec.PostgresConnection.ConnectionIP,
Port: int(instance.Spec.PostgresConnection.ConnectionPort),
ApplicationName: instance.ObjectMeta.Name,
ApplicationName: instance.ToPeripheralResourceName(),
},
SynchronousNodesAdditional: nil,
}
Expand Down Expand Up @@ -1212,6 +1241,11 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
}
defer resp.Body.Close()

// fake error when standbyApplicationName is required but not provided
if instance.IsReplicationPrimary() && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil {
return fmt.Errorf("missing application_name of synchronous standby, disable synchronous replication")
}

return nil
}

Expand Down
Loading