@@ -56,6 +56,9 @@ const (
56
56
BackupConfigKey = "config"
57
57
// SharedBufferParameterKey defines the key under which the shared buffer size is stored in the parameters map. Defined by the postgres-operator/patroni
58
58
SharedBufferParameterKey = "shared_buffers"
59
+ // StandbyKey defines the key under which the standby configuration is stored in the CR. Defined by the postgres-operator/patroni
60
+ StandbyKey = "standby"
61
+ StandbyMethod = "streaming_host"
59
62
60
63
teamIDPrefix = "pg"
61
64
)
@@ -152,6 +155,9 @@ type PostgresSpec struct {
152
155
// BackupSecretRef reference to the secret where the backup credentials are stored
153
156
BackupSecretRef string `json:"backupSecretRef,omitempty"`
154
157
158
+ // PostgresConnection Connection info of a streaming host, independant of the current role (leader or standby)
159
+ PostgresConnection * PostgresConnection `json:"connection,omitempty"`
160
+
155
161
// AuditLogs enable or disable default audit logs
156
162
AuditLogs * bool `json:"auditLogs,omitempty"`
157
163
@@ -207,6 +213,22 @@ type PostgresList struct {
207
213
Items []Postgres `json:"items"`
208
214
}
209
215
216
+ // PostgresConnection A remote postgres instance this one is linked to, e.g. for standby purpouses.
217
+ type PostgresConnection struct {
218
+ // ConnectedPostgresID internal ID of the connected Postgres instance
219
+ ConnectedPostgresID string `json:"postgresID,omitempty"`
220
+ // ConnectionSecretName name of the internal secret used to connect to the remote postgres
221
+ ConnectionSecretName string `json:"secretName,omitempty"`
222
+ // ConnectionIP IP of the remote postgres
223
+ ConnectionIP string `json:"ip,omitempty"`
224
+ // ConnectionPort port of the remote postgres
225
+ ConnectionPort uint16 `json:"port,omitempty"`
226
+ // SynchronousReplication determines if async or sync replication is used for the standby postgres
227
+ SynchronousReplication bool `json:"synchronous,omitempty"`
228
+ // ReplicationPrimary determines if THIS side of the connection is the primary or the standby side
229
+ ReplicationPrimary bool `json:"localSideIsPrimary,omitempty"`
230
+ }
231
+
210
232
var SvcLoadBalancerLabel = map [string ]string {
211
233
ManagedByLabelName : ManagedByLabelValue ,
212
234
}
@@ -519,6 +541,22 @@ func (p *Postgres) ToUnstructuredZalandoPostgresql(z *zalando.Postgresql, c *cor
519
541
z .Spec .AllowedSourceRanges = p .Spec .AccessList .SourceRanges
520
542
}
521
543
544
+ // Enable replication (using unstructured json)
545
+ if p .IsReplicationPrimary () {
546
+ // delete field
547
+ z .Spec .StandbyCluster = nil
548
+ } else {
549
+ // overwrite connection info
550
+ z .Spec .StandbyCluster = & zalando.StandbyDescription {
551
+ StandbyMethod : StandbyMethod ,
552
+ StandbyHost : p .Spec .PostgresConnection .ConnectionIP ,
553
+ StandbyPort : strconv .FormatInt (int64 (p .Spec .PostgresConnection .ConnectionPort ), 10 ),
554
+ StandbySecretName : "standby." + p .ToPeripheralResourceName () + ".credentials" ,
555
+ S3WalPath : "" ,
556
+ StandbyApplicationName : p .ObjectMeta .Name ,
557
+ }
558
+ }
559
+
522
560
jsonZ , err := runtime .DefaultUnstructuredConverter .ToUnstructured (z )
523
561
if err != nil {
524
562
return nil , fmt .Errorf ("failed to convert to unstructured zalando postgresql: %w" , err )
@@ -656,6 +694,14 @@ func setSharedBufferSize(parameters map[string]string, shmSize string) {
656
694
}
657
695
}
658
696
697
+ func (p * Postgres ) IsReplicationPrimary () bool {
698
+ if p .Spec .PostgresConnection == nil || p .Spec .PostgresConnection .ReplicationPrimary {
699
+ // nothing is configured, or we are the leader. nothing to do.
700
+ return true
701
+ }
702
+ return false
703
+ }
704
+
659
705
// enableAuditLogs configures this postgres instances audit logging
660
706
func enableAuditLogs (parameters map [string ]string ) {
661
707
// default values: bg_mon,pg_stat_statements,pgextwlist,pg_auth_mon,set_user,timescaledb,pg_cron,pg_stat_kcache
@@ -676,3 +722,73 @@ func setPostgresParams(parameters map[string]string, providedParams map[string]s
676
722
parameters [k ] = v
677
723
}
678
724
}
725
+
726
+ func (p * Postgres ) ToStandbyClusterIngresCWNPName () string {
727
+ return p .ToPeripheralResourceName () + "-standby-ingress"
728
+ }
729
+ func (p * Postgres ) ToStandbyClusterEgresCWNPName () string {
730
+ return p .ToPeripheralResourceName () + "-standby-egress"
731
+ }
732
+
733
+ func (p * Postgres ) ToStandbyClusterIngressCWNP (sourceCIDRs []string ) (* firewall.ClusterwideNetworkPolicy , error ) {
734
+
735
+ standbyIngressCWNPName := p .ToStandbyClusterIngresCWNPName ()
736
+ standbyIngressCWNP := & firewall.ClusterwideNetworkPolicy {ObjectMeta : metav1.ObjectMeta {Name : standbyIngressCWNPName , Namespace : firewall .ClusterwideNetworkPolicyNamespace }}
737
+
738
+ //
739
+ // Create ingress rule from pre-configured CIDR to this DBs port
740
+ //
741
+ standbyClusterIngressIPBlocks := []networkingv1.IPBlock {}
742
+ for _ , cidr := range sourceCIDRs {
743
+ remoteServiceClusterCIDR , err := netaddr .ParseIPPrefix (cidr )
744
+ if err != nil {
745
+ return nil , fmt .Errorf ("unable to parse standby host ip %s: %w" , p .Spec .PostgresConnection .ConnectionIP , err )
746
+ }
747
+ standbyClusterIPs := networkingv1.IPBlock {
748
+ CIDR : remoteServiceClusterCIDR .String (),
749
+ }
750
+ standbyClusterIngressIPBlocks = append (standbyClusterIngressIPBlocks , standbyClusterIPs )
751
+ }
752
+
753
+ // Add Port to CWNP (if known)
754
+ tcp := corev1 .ProtocolTCP
755
+ ingressTargetPorts := []networkingv1.NetworkPolicyPort {}
756
+ if p .Status .Socket .Port != 0 {
757
+ portObj := intstr .FromInt (int (p .Status .Socket .Port ))
758
+ ingressTargetPorts = append (ingressTargetPorts , networkingv1.NetworkPolicyPort {Port : & portObj , Protocol : & tcp })
759
+ }
760
+ standbyIngressCWNP .Spec .Ingress = []firewall.IngressRule {
761
+ {Ports : ingressTargetPorts , From : standbyClusterIngressIPBlocks },
762
+ }
763
+
764
+ return standbyIngressCWNP , nil
765
+ }
766
+
767
+ func (p * Postgres ) ToStandbyClusterEgressCWNP () (* firewall.ClusterwideNetworkPolicy , error ) {
768
+ standbyClusterEgressIPBlocks := []networkingv1.IPBlock {}
769
+ if p .Spec .PostgresConnection .ConnectionIP != "" {
770
+ remoteServiceClusterCIDR , err := netaddr .ParseIPPrefix (p .Spec .PostgresConnection .ConnectionIP + "/32" )
771
+ if err != nil {
772
+ return nil , fmt .Errorf ("unable to parse standby host ip %s: %w" , p .Spec .PostgresConnection .ConnectionIP , err )
773
+ }
774
+ standbyClusterIPs := networkingv1.IPBlock {
775
+ CIDR : remoteServiceClusterCIDR .String (),
776
+ }
777
+ standbyClusterEgressIPBlocks = append (standbyClusterEgressIPBlocks , standbyClusterIPs )
778
+ }
779
+
780
+ tcp := corev1 .ProtocolTCP
781
+ standbyEgressCWNPName := p .ToStandbyClusterEgresCWNPName ()
782
+ standbyEgressCWNP := & firewall.ClusterwideNetworkPolicy {ObjectMeta : metav1.ObjectMeta {Name : standbyEgressCWNPName , Namespace : firewall .ClusterwideNetworkPolicyNamespace }}
783
+ // Add Port to CWNP
784
+ egressTargetPorts := []networkingv1.NetworkPolicyPort {}
785
+ if p .Spec .PostgresConnection .ConnectionPort != 0 {
786
+ portObj := intstr .FromInt (int (p .Spec .PostgresConnection .ConnectionPort ))
787
+ egressTargetPorts = append (egressTargetPorts , networkingv1.NetworkPolicyPort {Port : & portObj , Protocol : & tcp })
788
+ }
789
+ standbyEgressCWNP .Spec .Egress = []firewall.EgressRule {
790
+ {Ports : egressTargetPorts , To : standbyClusterEgressIPBlocks },
791
+ }
792
+
793
+ return standbyEgressCWNP , nil
794
+ }
0 commit comments