diff --git a/pkg/databaseclaim/databaseclaim.go b/pkg/databaseclaim/databaseclaim.go index 524c3848..7308ca6c 100644 --- a/pkg/databaseclaim/databaseclaim.go +++ b/pkg/databaseclaim/databaseclaim.go @@ -77,6 +77,7 @@ type input struct { MasterConnInfo v1.DatabaseClaimConnectionInfo TempSecret string DbHostIdentifier string + OldDbHostIdentifier string HostParams hostparams.HostParams EnableReplicationRole bool EnableSuperUser bool @@ -323,6 +324,7 @@ func (r *DatabaseClaimReconciler) setReqInfo(ctx context.Context, dbClaim *v1.Da } r.Input.DbHostIdentifier = r.getDynamicHostName(dbClaim) + r.Input.OldDbHostIdentifier = r.getOldDynamicHostName(dbClaim) } if r.Config.Viper.GetBool("supportSuperUserElevation") { r.Input.EnableSuperUser = *dbClaim.Spec.EnableSuperUser @@ -531,99 +533,121 @@ func (r *DatabaseClaimReconciler) postMigrationInProgress(ctx context.Context, d return ctrl.Result{RequeueAfter: time.Minute}, nil } -// Create, migrate or upgrade database +// executeDbClaimRequest manages the reconciliation logic for different database claim modes. func (r *DatabaseClaimReconciler) executeDbClaimRequest(ctx context.Context, dbClaim *v1.DatabaseClaim) (ctrl.Result, error) { - logr := log.FromContext(ctx).WithValues("databaseclaim", dbClaim.Namespace+"/"+dbClaim.Name) - if dbClaim.Status.ActiveDB.ConnectionInfo == nil { - dbClaim.Status.ActiveDB.ConnectionInfo = new(v1.DatabaseClaimConnectionInfo) - } - if dbClaim.Status.NewDB.ConnectionInfo == nil { - dbClaim.Status.NewDB.ConnectionInfo = new(v1.DatabaseClaimConnectionInfo) - } + // Ensure connection info is initialized. + ensureConnectionInfoInitialized(&dbClaim.Status.ActiveDB) + ensureConnectionInfoInitialized(&dbClaim.Status.NewDB) // FIXME: why is a per request value being set in a global variable? + // Set the mode based on the current context and dbClaim r.mode = r.getMode(ctx, dbClaim) - if r.mode == M_PostMigrationInProgress { + + switch r.mode { + case M_PostMigrationInProgress: return r.postMigrationInProgress(ctx, dbClaim) + case M_UseExistingDB: + return r.handleUseExistingDB(ctx, dbClaim, logr) + case M_MigrateExistingToNewDB: + return r.handleMigrateExistingToNewDB(ctx, dbClaim, logr) + case M_InitiateDBUpgrade: + return r.reconcileMigrateToNewDB(ctx, dbClaim) + case M_MigrationInProgress, M_UpgradeDBInProgress: + return r.reconcileMigrationInProgress(ctx, dbClaim) + case M_UseNewDB: + return r.handleUseNewDB(ctx, dbClaim, logr) + default: + logr.Info("unhandled mode") + return r.manageError(ctx, dbClaim, fmt.Errorf("unhandled mode")) } - if r.mode == M_UseExistingDB { - logr.Info("existing db reconcile started") - err := r.reconcileUseExistingDB(ctx, dbClaim) - if err != nil { - return r.manageError(ctx, dbClaim, err) - } - dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy() - dbClaim.Status.NewDB = v1.Status{ConnectionInfo: &v1.DatabaseClaimConnectionInfo{}} +} - logr.Info("existing db reconcile complete") - return r.manageSuccess(ctx, dbClaim) +// ensureConnectionInfoInitialized ensures that the ConnectionInfo is initialized. +func ensureConnectionInfoInitialized(status *v1.Status) { + if status.ConnectionInfo == nil { + status.ConnectionInfo = new(v1.DatabaseClaimConnectionInfo) } - if r.mode == M_MigrateExistingToNewDB { - logr.Info("migrate to new db reconcile started") - //check if existingDB has been already reconciled, else reconcileUseExisitngDB - existing_db_conn, err := v1.ParseUri(dbClaim.Spec.SourceDataFrom.Database.DSN) - if err != nil { - return r.manageError(ctx, dbClaim, err) - } - if (dbClaim.Status.ActiveDB.ConnectionInfo.DatabaseName != existing_db_conn.DatabaseName) || - (dbClaim.Status.ActiveDB.ConnectionInfo.Host != existing_db_conn.Host) { +} - logr.Info("existing db was not reconciled, calling reconcileUseExisitngDB before reconcileUseExisitngDB") +// handleUseExistingDB handles the M_UseExistingDB mode. +func (r *DatabaseClaimReconciler) handleUseExistingDB(ctx context.Context, dbClaim *v1.DatabaseClaim, logr logr.Logger) (ctrl.Result, error) { + logr.Info("existing db reconcile started") - err := r.reconcileUseExistingDB(ctx, dbClaim) - if err != nil { - return r.manageError(ctx, dbClaim, err) - } - dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy() - dbClaim.Status.NewDB = v1.Status{ConnectionInfo: &v1.DatabaseClaimConnectionInfo{}} - } - - return r.reconcileMigrateToNewDB(ctx, dbClaim) + if err := r.reconcileUseExistingDB(ctx, dbClaim); err != nil { + return r.manageError(ctx, dbClaim, err) } - if r.mode == M_InitiateDBUpgrade { - logr.Info("upgrade db initiated") - return r.reconcileMigrateToNewDB(ctx, dbClaim) + dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy() + dbClaim.Status.NewDB = v1.Status{ConnectionInfo: &v1.DatabaseClaimConnectionInfo{}} + logr.Info("existing db reconcile complete") + return r.manageSuccess(ctx, dbClaim) +} + +// handleMigrateExistingToNewDB handles the M_MigrateExistingToNewDB mode. +func (r *DatabaseClaimReconciler) handleMigrateExistingToNewDB(ctx context.Context, dbClaim *v1.DatabaseClaim, logr logr.Logger) (ctrl.Result, error) { + logr.Info("migrate to new db reconcile started") + + existingDBConn, err := v1.ParseUri(dbClaim.Spec.SourceDataFrom.Database.DSN) + if err != nil { + return r.manageError(ctx, dbClaim, err) } - if r.mode == M_MigrationInProgress || r.mode == M_UpgradeDBInProgress { - return r.reconcileMigrationInProgress(ctx, dbClaim) - } - if r.mode == M_UseNewDB { - logr.Info("Use new DB") - result, err := r.reconcileNewDB(ctx, dbClaim) - if err != nil { + if !isExistingDBReconciled(dbClaim, existingDBConn) { + logr.Info("existing db was not reconciled, calling reconcileUseExistingDB before reconcileUseExistingDB") + if err := r.reconcileUseExistingDB(ctx, dbClaim); err != nil { return r.manageError(ctx, dbClaim, err) } - if result.RequeueAfter > 0 { - logr.Info("requeuing request") - return result, nil - } - if r.Input.TempSecret != "" { - newDBConnInfo := dbClaim.Status.NewDB.ConnectionInfo.DeepCopy() - newDBConnInfo.Password = r.Input.TempSecret - if err := r.createOrUpdateSecret(ctx, dbClaim, newDBConnInfo); err != nil { - return r.manageError(ctx, dbClaim, err) - } - } dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy() - if r.Input.SharedDBHost { - dbClaim.Status.ActiveDB.DbState = v1.UsingSharedHost - } else { - dbClaim.Status.ActiveDB.DbState = v1.Ready - } dbClaim.Status.NewDB = v1.Status{ConnectionInfo: &v1.DatabaseClaimConnectionInfo{}} + } + + return r.reconcileMigrateToNewDB(ctx, dbClaim) +} + +// isExistingDBReconciled checks if the existing DB has been reconciled. +func isExistingDBReconciled(dbClaim *v1.DatabaseClaim, existingDBConn *v1.DatabaseClaimConnectionInfo) bool { + return dbClaim.Status.ActiveDB.ConnectionInfo.DatabaseName == existingDBConn.DatabaseName && + dbClaim.Status.ActiveDB.ConnectionInfo.Host == existingDBConn.Host +} + +// handleUseNewDB handles the M_UseNewDB mode. +func (r *DatabaseClaimReconciler) handleUseNewDB(ctx context.Context, dbClaim *v1.DatabaseClaim, logr logr.Logger) (ctrl.Result, error) { + logr.Info("use new db") + result, err := r.reconcileNewDB(ctx, dbClaim) + if err != nil { + return r.manageError(ctx, dbClaim, err) + } - return r.manageSuccess(ctx, dbClaim) + if result.RequeueAfter > 0 { + logr.Info("requeuing request") + return result, nil + } + + if r.Input.TempSecret != "" { + newDBConnInfo := dbClaim.Status.NewDB.ConnectionInfo.DeepCopy() + newDBConnInfo.Password = r.Input.TempSecret + if err := r.createOrUpdateSecret(ctx, dbClaim, newDBConnInfo); err != nil { + return r.manageError(ctx, dbClaim, err) + } } - logr.Info("unhandled mode") - return r.manageError(ctx, dbClaim, fmt.Errorf("unhandled mode")) + dbClaim.Status.ActiveDB = *dbClaim.Status.NewDB.DeepCopy() + dbClaim.Status.ActiveDB.DbState = getDBState(r.Input.SharedDBHost) + dbClaim.Status.NewDB = v1.Status{ConnectionInfo: &v1.DatabaseClaimConnectionInfo{}} + + return r.manageSuccess(ctx, dbClaim) +} +// getDBState returns the appropriate DB state based on whether the DB is shared. +func getDBState(sharedDBHost bool) v1.DbState { + if sharedDBHost { + return v1.UsingSharedHost + } + return v1.Ready } func (r *DatabaseClaimReconciler) reconcileUseExistingDB(ctx context.Context, dbClaim *v1.DatabaseClaim) error { @@ -775,10 +799,10 @@ func (r *DatabaseClaimReconciler) reconcileNewDB(ctx context.Context, dbClaim *v return ctrl.Result{}, nil } -func (r *DatabaseClaimReconciler) reconcileMigrateToNewDB(ctx context.Context, - dbClaim *v1.DatabaseClaim) (ctrl.Result, error) { +func (r *DatabaseClaimReconciler) reconcileMigrateToNewDB(ctx context.Context, dbClaim *v1.DatabaseClaim) (ctrl.Result, error) { logr := log.FromContext(ctx) + logr.Info("upgrade db initiated") if dbClaim.Status.MigrationState == "" { dbClaim.Status.MigrationState = pgctl.S_Initial.String() @@ -1578,6 +1602,22 @@ func (r *DatabaseClaimReconciler) getDynamicHostName(dbClaim *v1.DatabaseClaim) return prefix + r.Input.FragmentKey + suffix } +// getDynamicHostName is used to name the crossplane +// dbinstance CRs +func (r *DatabaseClaimReconciler) getOldDynamicHostName(dbClaim *v1.DatabaseClaim) string { + var prefix string + suffix := "-" + r.Input.HostParams.OldHash() + + if r.Config.DbIdentifierPrefix != "" { + prefix = r.Config.DbIdentifierPrefix + "-" + } + if r.Input.FragmentKey == "" { + return prefix + dbClaim.Name + suffix + } + + return prefix + r.Input.FragmentKey + suffix +} + func (r *DatabaseClaimReconciler) getParameterGroupName(dbClaim *v1.DatabaseClaim) string { hostName := r.getDynamicHostName(dbClaim) params := &r.Input.HostParams @@ -1970,7 +2010,6 @@ func (r *DatabaseClaimReconciler) managePostgresDBInstance(ctx context.Context, Name: r.getProviderConfig(), } restoreFromSource := defaultRestoreFromSource - dbInstance := &crossplanerds.DBInstance{} params := &r.Input.HostParams ms64 := int64(params.MinStorageGB) @@ -1987,16 +2026,31 @@ func (r *DatabaseClaimReconciler) managePostgresDBInstance(ctx context.Context, maxStorageVal = ¶ms.MaxStorageGB } - err = r.Client.Get(ctx, client.ObjectKey{ - Name: dbHostName, - }, dbInstance) + var dbInstance crossplanerds.DBInstance + oldKey := client.ObjectKey{ + Name: r.Input.OldDbHostIdentifier, + } + err = r.Client.Get(ctx, oldKey, &dbInstance) if err != nil { - if errors.IsNotFound(err) { + if !errors.IsNotFound(err) { + return false, fmt.Errorf("error getting db instance key: %s: %w", oldKey.Name, err) + } + + key := client.ObjectKey{ + Name: dbHostName, + } + err = r.Client.Get(ctx, key, &dbInstance) + if err != nil { + if !errors.IsNotFound(err) { + return false, fmt.Errorf("error getting db instance key: %s: %w", key.Name, err) + } + validationError := params.CheckEngineVersion() if validationError != nil { return false, validationError } - dbInstance = &crossplanerds.DBInstance{ + + dbInstance = crossplanerds.DBInstance{ ObjectMeta: metav1.ObjectMeta{ Name: dbHostName, // TODO - Figure out the proper labels for resource @@ -2048,6 +2102,7 @@ func (r *DatabaseClaimReconciler) managePostgresDBInstance(ctx context.Context, }, }, } + if r.mode == M_UseNewDB && dbClaim.Spec.RestoreFrom != "" { snapshotID := dbClaim.Spec.RestoreFrom dbInstance.Spec.ForProvider.CustomDBInstanceParameters.RestoreFrom = &crossplanerds.RestoreDBInstanceBackupConfiguration{ @@ -2065,14 +2120,12 @@ func (r *DatabaseClaimReconciler) managePostgresDBInstance(ctx context.Context, } //create DBInstance logr.Info("creating crossplane DBInstance resource", "DBInstance", dbInstance.Name) - if err := r.Client.Create(ctx, dbInstance); err != nil { + if err := r.Client.Create(ctx, &dbInstance); err != nil { return false, err } - } else { - //not errors.IsNotFound(err) { - return false, err } } + // Deletion is long running task check that is not being deleted. if !dbInstance.ObjectMeta.DeletionTimestamp.IsZero() { err = fmt.Errorf("can not create Cloud DB instance %s it is being deleted", dbHostName) @@ -2080,7 +2133,7 @@ func (r *DatabaseClaimReconciler) managePostgresDBInstance(ctx context.Context, return false, err } - _, err = r.updateDBInstance(ctx, dbClaim, dbInstance) + _, err = r.updateDBInstance(ctx, dbClaim, &dbInstance) if err != nil { return false, err } diff --git a/pkg/hostparams/hostparams.go b/pkg/hostparams/hostparams.go index dc400620..a5adfc41 100644 --- a/pkg/hostparams/hostparams.go +++ b/pkg/hostparams/hostparams.go @@ -60,7 +60,8 @@ type HostParams struct { } func (p *HostParams) String() string { - return fmt.Sprintf("%s-%s-%s", p.Engine, p.InstanceClass, p.EngineVersion) + index := 0 // TODO: this should comes from somewhere else. + return fmt.Sprintf("%s-%d", p.InstanceClass, index) } func (p *HostParams) Hash() string { @@ -68,6 +69,15 @@ func (p *HostParams) Hash() string { return fmt.Sprintf("%08x", crc32.Checksum([]byte(p.String()), crc32q)) } +func (p *HostParams) OldString() string { + return fmt.Sprintf("%s-%s-%s", p.Engine, p.InstanceClass, p.EngineVersion) +} + +func (p *HostParams) OldHash() string { + crc32q := crc32.MakeTable(0xD5828281) + return fmt.Sprintf("%08x", crc32.Checksum([]byte(p.OldString()), crc32q)) +} + func (p *HostParams) HasShapeChanged(activeShape string) bool { if p.isDefaultShape { // request is for a "" shape