Skip to content

Commit

Permalink
Proper Configuration of Single Instance DBs after Deletion of Sync Standbies (#577)
Browse files Browse the repository at this point in the history

* Double check config

* Logging

* Especially double check when bootstrapping

* Check for new value

* Revert "Especially double check when bootstrapping"

This reverts commit f1ca8db.

* Especially double check when bootstrapping

* Revert "Check for new value"

This reverts commit 4ecb5f5.

* Fetch and use the actual postgres cluster name as application_name. This resolves some issues when using sync replication.

* Always fetch application_name when needed

* make linter happy

* Simplify

* logging

* Fix typos

* Logging

* Also patch patroni in single instance mode
(to ensure proper cleanup after a sync standby has been deleted)

* Debug logging

* More precise nil

* On a related note, do not requeue

* Some more debug logging

* Additional check

* Logging

* Back to default nil

* Check return code

* Additional nil check

* Make (updated) linter happy

* Revert "Additional nil check"

This reverts commit 273e30c.

* Fix check and improve func name

* Reapply "Additional nil check"

This reverts commit 6ff4738.
  • Loading branch information
eberlep authored Aug 27, 2024
1 parent 66e2742 commit de4dd9d
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 20 deletions.
10 changes: 5 additions & 5 deletions api/v1/postgres_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ func (p *Postgres) ToSharedSvcLB(lbIP string, lbPort int32, enableStandbyLeaderS
"cluster-name": p.ToPeripheralResourceName(),
"team": p.generateTeamID(),
}
if p.IsReplicationPrimary() {
if p.IsReplicationPrimaryOrStandalone() {
lb.Spec.Selector[SpiloRoleLabelName] = SpiloRoleLabelValueMaster
} else {
if enableStandbyLeaderSelector {
Expand Down Expand Up @@ -478,7 +478,7 @@ func (p *Postgres) ToDedicatedSvcLB(lbIP string, lbPort int32, standbyClustersSo
"cluster-name": p.ToPeripheralResourceName(),
"team": p.generateTeamID(),
}
if p.IsReplicationPrimary() {
if p.IsReplicationPrimaryOrStandalone() {
lb.Spec.Selector[SpiloRoleLabelName] = SpiloRoleLabelValueMaster
} else {
// select the first pod in the statefulset
Expand Down Expand Up @@ -769,7 +769,7 @@ func (p *Postgres) ToUnstructuredZalandoPostgresql(z *zalando.Postgresql, c *cor
}

// Enable replication (using unstructured json)
if p.IsReplicationPrimary() {
if p.IsReplicationPrimaryOrStandalone() {
// delete field
z.Spec.StandbyCluster = nil
} else {
Expand Down Expand Up @@ -929,7 +929,7 @@ func setSharedBufferSize(parameters map[string]string, shmSize string) {
}
}

func (p *Postgres) IsReplicationPrimary() bool {
func (p *Postgres) IsReplicationPrimaryOrStandalone() bool {
if p.Spec.PostgresConnection == nil || p.Spec.PostgresConnection.ReplicationPrimary {
// nothing is configured, or we are the leader. nothing to do.
return true
Expand All @@ -938,7 +938,7 @@ func (p *Postgres) IsReplicationPrimary() bool {
}

func (p *Postgres) IsReplicationTarget() bool {
if p.Spec.PostgresConnection != nil && p.Spec.PostgresConnection.ReplicationPrimary == false {
if p.Spec.PostgresConnection != nil && !p.Spec.PostgresConnection.ReplicationPrimary {
// sth is configured and we are not the leader
return true
}
Expand Down
45 changes: 32 additions & 13 deletions controllers/postgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,7 +886,7 @@ func (r *PostgresReconciler) ensurePostgresSecrets(log logr.Logger, ctx context.
}

func (r *PostgresReconciler) ensureStandbySecrets(log logr.Logger, ctx context.Context, instance *pg.Postgres) error {
if instance.IsReplicationPrimary() {
if instance.IsReplicationPrimaryOrStandalone() {
// nothing is configured, or we are the leader. nothing to do.
return nil
}
Expand Down Expand Up @@ -1024,11 +1024,6 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
const requeueAfterReconcile = true
const allDone = false

// If there is no connected postgres, no need to tinker with patroni directly
if instance.Spec.PostgresConnection == nil {
return allDone, nil
}

log.V(debugLogLevel).Info("Checking replication config from Patroni API")

// Get the leader pod
Expand All @@ -1045,6 +1040,12 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
}
leaderIP := leaderPods.Items[0].Status.PodIP

// If there is no connected postgres, we still need to possibly clean up a former synchronous primary
if instance.Spec.PostgresConnection == nil {
log.V(debugLogLevel).Info("single instance, updating with empty config and requeing")
return allDone, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
}

var resp *PatroniConfig
resp, err = r.httpGetPatroniConfig(log, ctx, leaderIP)
if err != nil {
Expand All @@ -1056,7 +1057,7 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
return requeueAfterReconcile, nil
}

if instance.IsReplicationPrimary() {
if instance.IsReplicationPrimaryOrStandalone() {
if resp.StandbyCluster != nil {
log.V(debugLogLevel).Info("standby_cluster mismatch, requeing", "response", resp)
return requeueAfterReconcile, nil
Expand All @@ -1076,7 +1077,11 @@ func (r *PostgresReconciler) checkAndUpdatePatroniReplicationConfig(log logr.Log
} else {
synchronousStandbyApplicationName = pointer.String(s.ToPeripheralResourceName())
}
if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != *synchronousStandbyApplicationName {
// compare the actual value with the expected value
if synchronousStandbyApplicationName == nil {
log.V(debugLogLevel).Info("could not fetch synchronous_nodes_additional, disabling sync replication and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, nil)
} else if resp.SynchronousNodesAdditional == nil || *resp.SynchronousNodesAdditional != *synchronousStandbyApplicationName {
log.V(debugLogLevel).Info("synchronous_nodes_additional mismatch, updating and requeing", "response", resp)
return requeueAfterReconcile, r.httpPatchPatroni(log, ctx, instance, leaderIP, synchronousStandbyApplicationName)
}
Expand Down Expand Up @@ -1179,7 +1184,9 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte

log.V(debugLogLevel).Info("Preparing request")
var request PatroniConfig
if instance.IsReplicationPrimary() {
if instance.Spec.PostgresConnection == nil {
// use empty config
} else if instance.IsReplicationPrimaryOrStandalone() {
request = PatroniConfig{
StandbyCluster: nil,
}
Expand All @@ -1206,7 +1213,6 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
request.SynchronousNodesAdditional = nil
}
} else {
// TODO check values first
request = PatroniConfig{
StandbyCluster: &PatroniStandbyCluster{
CreateReplicaMethods: []string{"basebackup_fast_xlog"},
Expand Down Expand Up @@ -1241,8 +1247,21 @@ func (r *PostgresReconciler) httpPatchPatroni(log logr.Logger, ctx context.Conte
}
defer resp.Body.Close()

if resp.StatusCode/100 != 2 {
err = fmt.Errorf("received unexpected return code %d", resp.StatusCode)
log.Error(err, "could not perform PATCH request")
return err
}

log.V(debugLogLevel).Info("Performed request")

// fake error when standbyApplicationName is required but not provided
if instance.Spec.PostgresConnection != nil && instance.IsReplicationPrimaryOrStandalone() && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil {
return fmt.Errorf("missing application_name of synchronous standby, disable synchronous replication")
}

// fake error when standbyApplicationName is required but not provided
if instance.IsReplicationPrimary() && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil {
if instance.Spec.PostgresConnection != nil && instance.Spec.PostgresConnection.SynchronousReplication && synchronousStandbyApplicationName == nil {
return fmt.Errorf("missing application_name of synchronous standby, disable synchronous replication")
}

Expand Down Expand Up @@ -1487,7 +1506,7 @@ func (r *PostgresReconciler) createOrUpdateExporterSidecarServices(log logr.Logg
pes.Spec.Ports = []corev1.ServicePort{
{
Name: postgresExporterServicePortName,
Port: int32(exporterServicePort),
Port: int32(exporterServicePort), //nolint
Protocol: corev1.ProtocolTCP,
TargetPort: intstr.FromInt(int(exporterServiceTargetPort)),
},
Expand Down Expand Up @@ -1752,7 +1771,7 @@ func (r *PostgresReconciler) ensureInitDBJob(log logr.Logger, ctx context.Contex
cm.Data = map[string]string{}

// only execute SQL when encountering a **new** database, not for standbies or clones
if instance.IsReplicationPrimary() && instance.Spec.PostgresRestore == nil {
if instance.IsReplicationPrimaryOrStandalone() && instance.Spec.PostgresRestore == nil {
// try to fetch the global initjob configmap
cns := types.NamespacedName{
Namespace: r.PostgresletNamespace,
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ func main() {

var lbMgrOpts lbmanager.Options = lbmanager.Options{
LBIP: lbIP,
PortRangeStart: int32(portRangeStart),
PortRangeSize: int32(portRangeSize),
PortRangeStart: int32(portRangeStart), // nolint
PortRangeSize: int32(portRangeSize), // nolint
EnableStandbyLeaderSelector: enableStandbyLeaderSelector,
EnableLegacyStandbySelector: enableLegacyStandbySelector,
StandbyClustersSourceRanges: standbyClusterSourceRanges,
Expand Down

0 comments on commit de4dd9d

Please sign in to comment.