Skip to content

Commit

Permalink
Standby create replica methods (#533)
Browse files Browse the repository at this point in the history
* Patch create_replica_methods

* Add datadir option

* poc implementation of STANDBY_* envs

* * Add SYANDBY_WITH_WALG env
* Reconfigure bootstrapping, not replica imaging

* Renaming

* * More pointers
* enable s3 for creating replicas as well

* Set additional envs

* Refactor

* Remove unused code

* Test without modifying the bootstrap/create_replica_methods

* Cleanup

* Cleanup

* Refactoring

* Tweak WALG for standby bootstrapping as well

* Add feature flag
  • Loading branch information
eberlep authored Nov 28, 2023
1 parent 20491dd commit 5c04c8a
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 0 deletions.
8 changes: 8 additions & 0 deletions api/v1/postgres_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,14 @@ func (p *Postgres) IsReplicationPrimary() bool {
return false
}

func (p *Postgres) IsReplicationTarget() bool {
if p.Spec.PostgresConnection != nil && p.Spec.PostgresConnection.ReplicationPrimary == false {
// sth is configured and we are not the leader
return true
}
return false
}

// enableAuditLogs configures this postgres instances audit logging
func enableAuditLogs(parameters map[string]string) {
// default values: bg_mon,pg_stat_statements,pgextwlist,pg_auth_mon,set_user,timescaledb,pg_cron,pg_stat_kcache
Expand Down
94 changes: 94 additions & 0 deletions controllers/postgres_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type PostgresReconciler struct {
EnableRandomStorageEncryptionSecret bool
EnableWalGEncryption bool
PostgresletFullname string
EnableBootstrapStandbyFromS3 bool
}

// Reconcile is the entry point for postgres reconciliation.
Expand Down Expand Up @@ -493,6 +494,12 @@ func (r *PostgresReconciler) updatePodEnvironmentConfigMap(ctx context.Context,
"CLONE_WALG_DOWNLOAD_CONCURRENCY": downloadConcurrency,
}

if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
data["STANDBY_WALG_UPLOAD_DISK_CONCURRENCY"] = uploadDiskConcurrency
data["STANDBY_WALG_UPLOAD_CONCURRENCY"] = uploadConcurrency
data["STANDBY_WALG_DOWNLOAD_CONCURRENCY"] = downloadConcurrency
}

cm := &corev1.ConfigMap{}
ns := types.NamespacedName{
Name: operatormanager.PodEnvCMName,
Expand Down Expand Up @@ -559,6 +566,19 @@ func (r *PostgresReconciler) updatePodEnvironmentSecret(ctx context.Context, p *
} else {
delete(data, "CLONE_WALG_LIBSODIUM_KEY")
}

// we also need that (hopefully identical) key to bootstrap from files in S3
if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
data["STANDBY_WALG_LIBSODIUM_KEY"] = k
}
}

// add STANDBY_* variables for bootstrapping from S3
if r.EnableBootstrapStandbyFromS3 && p.IsReplicationTarget() {
standbyEnvs := r.getStandbyEnvs(ctx, p)
for name, value := range standbyEnvs {
data[name] = value
}
}

var s *corev1.Secret
Expand All @@ -579,6 +599,80 @@ func (r *PostgresReconciler) updatePodEnvironmentSecret(ctx context.Context, p *
return nil
}

// getStandbyEnvs Fetches all the required info from the remote primary postgres and fills all ENVS required for bootstrapping from S3
func (r *PostgresReconciler) getStandbyEnvs(ctx context.Context, p *pg.Postgres) map[string][]byte {
standbyEnvs := map[string][]byte{}

// fetch backup secret of primary
primary := &pg.Postgres{}
ns := types.NamespacedName{
Name: p.Spec.PostgresConnection.ConnectedPostgresID,
Namespace: p.Namespace,
}
if err := r.CtrlClient.Get(ctx, ns, primary); err != nil {
if apierrors.IsNotFound(err) {
// the instance was updated, but does not exist anymore -> do nothing, it was probably deleted
return standbyEnvs
}

r.recorder.Eventf(primary, "Warning", "Error", "failed to get referenced primary postgres: %v", err)
return standbyEnvs
}

if primary.Spec.BackupSecretRef == "" {
r.recorder.Eventf(primary, "Warning", "Error", "No backupSecretRef for primary postgres found, skipping configuration of wal_e bootstrapping")
return standbyEnvs
}

primaryBackupConfig, err := r.getBackupConfig(ctx, primary.Namespace, primary.Spec.BackupSecretRef)
if err != nil {
r.recorder.Eventf(primary, "Warning", "Error", "failed to get referenced primary backup config, skipping configuration of wal_e bootstrapping: %v", err)
return standbyEnvs
}
primaryS3url, err := url.Parse(primaryBackupConfig.S3Endpoint)
if err != nil {
r.recorder.Eventf(primary, "Warning", "Error", "error while parsing the s3 endpoint url in the backup secret: %w", err)
return standbyEnvs
}

// use the s3 endpoint as provided
primaryAwsEndpoint := primaryS3url.String()
// modify the scheme to 'https+path'
primaryS3url.Scheme = "https+path"
// use the modified s3 endpoint
primaryWalES3Endpoint := primaryS3url.String()
// region
primaryRegion := primaryBackupConfig.S3Region
// s3 prefix
primaryWalGS3Prefix := "s3://" + primaryBackupConfig.S3BucketName + "/" + primary.ToPeripheralResourceName()
// s3 server side encryption SSE is disabled, we use client side encryption
// see STANDBY_WALG_LIBSODIUM_KEY above
primaryWalgDisableSSE := "true"
// aws access key
primaryAwsAccessKeyID := primaryBackupConfig.S3AccessKey
// aws secret key
primaryAwsSecretAccessKey := primaryBackupConfig.S3SecretKey

// create updated content for pod environment configmap
// this is a bit confusing: those are used to bootstrap a remote standby, so they have to point to the primary!
standbyEnvs["STANDBY_AWS_ACCESS_KEY_ID"] = []byte(primaryAwsAccessKeyID)
standbyEnvs["STANDBY_AWS_SECRET_ACCESS_KEY"] = []byte(primaryAwsSecretAccessKey)
standbyEnvs["STANDBY_AWS_ENDPOINT"] = []byte(primaryAwsEndpoint)
standbyEnvs["STANDBY_AWS_S3_FORCE_PATH_STYLE"] = []byte("true")
standbyEnvs["STANDBY_AWS_REGION"] = []byte(primaryRegion)
standbyEnvs["STANDBY_AWS_WALG_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
standbyEnvs["STANDBY_USE_WALG_BACKUP"] = []byte("true")
standbyEnvs["STANDBY_USE_WALG_RESTORE"] = []byte("true")
standbyEnvs["STANDBY_WALE_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
standbyEnvs["STANDBY_WALG_DISABLE_S3_SSE"] = []byte(primaryWalgDisableSSE)
standbyEnvs["STANDBY_WALG_S3_ENDPOINT"] = []byte(primaryWalES3Endpoint)
standbyEnvs["STANDBY_WALG_S3_PREFIX"] = []byte(primaryWalGS3Prefix)
standbyEnvs["STANDBY_WALG_S3_SSE"] = []byte("")
standbyEnvs["STANDBY_WITH_WALG"] = []byte("true")

return standbyEnvs
}

func (r *PostgresReconciler) isManagedByUs(obj *pg.Postgres) bool {
if obj.Spec.PartitionID != r.PartitionID {
return false
Expand Down
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const (
enableRandomStorageEncrytionSecretFlg = "enable-random-storage-encryption-secret"
enableWalGEncryptionFlg = "enable-walg-encryption"
enableForceSharedIPFlg = "enable-force-shared-ip"
enableBootstrapStandbyFromS3Flg = "enable-bootsrtap-standby-from-s3"
)

var (
Expand Down Expand Up @@ -124,6 +125,7 @@ func main() {
enableRandomStorageEncrytionSecret bool
enableWalGEncryption bool
enableForceSharedIP bool
enableBootstrapStandbyFromS3 bool

portRangeStart int
portRangeSize int
Expand Down Expand Up @@ -260,6 +262,9 @@ func main() {
viper.SetDefault(enableForceSharedIPFlg, true) // TODO switch to false?
enableForceSharedIP = viper.GetBool(enableForceSharedIPFlg)

viper.SetDefault(enableBootstrapStandbyFromS3Flg, true)
enableBootstrapStandbyFromS3 = viper.GetBool(enableBootstrapStandbyFromS3Flg)

ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

ctrl.Log.Info("flag",
Expand Down Expand Up @@ -299,6 +304,7 @@ func main() {
postgresletFullnameFlg, postgresletFullname,
enableWalGEncryptionFlg, enableWalGEncryption,
enableForceSharedIPFlg, enableForceSharedIP,
enableBootstrapStandbyFromS3Flg, enableBootstrapStandbyFromS3,
)

svcClusterConf := ctrl.GetConfigOrDie()
Expand Down Expand Up @@ -406,6 +412,7 @@ func main() {
EnableRandomStorageEncryptionSecret: enableRandomStorageEncrytionSecret,
EnableWalGEncryption: enableWalGEncryption,
PostgresletFullname: postgresletFullname,
EnableBootstrapStandbyFromS3: enableBootstrapStandbyFromS3,
}).SetupWithManager(ctrlPlaneClusterMgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Postgres")
os.Exit(1)
Expand Down

0 comments on commit 5c04c8a

Please sign in to comment.