From 77796c4b31600fdf0164bdcbf7d948966b3b8ae4 Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Thu, 11 Apr 2024 11:20:34 +0700 Subject: [PATCH] controllers: support migrate down command --- controllers/atlasmigration_controller.go | 85 +++++++++++++++++++----- 1 file changed, 67 insertions(+), 18 deletions(-) diff --git a/controllers/atlasmigration_controller.go b/controllers/atlasmigration_controller.go index c2d6ee19..f085b519 100644 --- a/controllers/atlasmigration_controller.go +++ b/controllers/atlasmigration_controller.go @@ -39,6 +39,7 @@ import ( "io/fs" "net/url" "strings" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -218,6 +219,13 @@ func (r *AtlasMigrationReconciler) watchRefs(res *dbv1alpha1.AtlasMigration) { } } +const ( + StatePending = "PENDING_USER" + StateApproved = "APPROVED" + StateAborted = "ABORTED" + StateApplied = "APPLIED" +) + // Reconcile the given AtlasMigration resource. func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName string) (_ *dbv1alpha1.AtlasMigrationStatus, _ error) { c, err := atlas.NewClient(dir, r.execPath) @@ -232,7 +240,46 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s } return nil, transient(err) } - if len(status.Pending) == 0 { + // FIXME: Move this flag to the resource spec + allowedDowngraded := true + switch { + case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied): + // Migration is downgraded + if allowedDowngraded { + // The downgrade is allowed, apply the last migration version + last := status.Available[len(status.Available)-1] + run, err := c.MigrateDown(ctx, &atlas.MigrateDownParams{ + Env: envName, + ToVersion: last.Version, + Context: &atlas.DeployRunContext{ + TriggerType: atlas.TriggerTypeKubernetes, + TriggerVersion: dbv1alpha1.VersionFromContext(ctx), + }, + }) + if err != nil { + return nil, transient(err) + } + switch run.Status { + case StatePending: + // Migration is pending approval, requeue + return nil, &transientError{ + err: fmt.Errorf("plan approval pending, review here: %s", run.URL), + after: 5 * time.Second, + } + case StateAborted: + // Migration is aborted, no need to reapply + return nil, fmt.Errorf("plan rejected, review here: %s", run.URL) + case StateApplied, StateApproved: + return &dbv1alpha1.AtlasMigrationStatus{ + LastApplied: run.Start.Unix(), + LastAppliedVersion: run.Current, + }, nil + } + } + // The downgrade is not allowed, fall through to default behavior: No action + fallthrough + case len(status.Pending) == 0: + // No pending migrations var lastApplied int64 if len(status.Applied) > 0 { lastApplied = status.Applied[len(status.Applied)-1].ExecutedAt.Unix() @@ -241,25 +288,27 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s LastApplied: lastApplied, LastAppliedVersion: status.Current, }, nil - } - // Execute Atlas CLI migrate command - report, err := c.MigrateApply(ctx, &atlas.MigrateApplyParams{ - Env: envName, - Context: &atlas.DeployRunContext{ - TriggerType: atlas.TriggerTypeKubernetes, - TriggerVersion: dbv1alpha1.VersionFromContext(ctx), - }, - }) - if err != nil { - if !isSQLErr(err) { - err = transient(err) + default: + // There are pending migrations + // Execute Atlas CLI migrate command + report, err := c.MigrateApply(ctx, &atlas.MigrateApplyParams{ + Env: envName, + Context: &atlas.DeployRunContext{ + TriggerType: atlas.TriggerTypeKubernetes, + TriggerVersion: dbv1alpha1.VersionFromContext(ctx), + }, + }) + if err != nil { + if !isSQLErr(err) { + err = transient(err) + } + return nil, err } - return nil, err + return &dbv1alpha1.AtlasMigrationStatus{ + LastApplied: report.End.Unix(), + LastAppliedVersion: report.Target, + }, nil } - return &dbv1alpha1.AtlasMigrationStatus{ - LastApplied: report.End.Unix(), - LastAppliedVersion: report.Target, - }, nil } // Extract migration data from the given resource