Skip to content

Commit

Permalink
controllers: support migrate down command
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Apr 11, 2024
1 parent 2872681 commit 77796c4
Showing 1 changed file with 67 additions and 18 deletions.
85 changes: 67 additions & 18 deletions controllers/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"io/fs"
"net/url"
"strings"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -218,6 +219,13 @@ func (r *AtlasMigrationReconciler) watchRefs(res *dbv1alpha1.AtlasMigration) {
}
}

const (
StatePending = "PENDING_USER"
StateApproved = "APPROVED"
StateAborted = "ABORTED"
StateApplied = "APPLIED"
)

// Reconcile the given AtlasMigration resource.
func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName string) (_ *dbv1alpha1.AtlasMigrationStatus, _ error) {
c, err := atlas.NewClient(dir, r.execPath)
Expand All @@ -232,7 +240,46 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s
}
return nil, transient(err)
}
if len(status.Pending) == 0 {
// FIXME: Move this flag to the resource spec
allowedDowngraded := true
switch {
case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied):
// Migration is downgraded
if allowedDowngraded {
// The downgrade is allowed, apply the last migration version
last := status.Available[len(status.Available)-1]
run, err := c.MigrateDown(ctx, &atlas.MigrateDownParams{
Env: envName,
ToVersion: last.Version,
Context: &atlas.DeployRunContext{
TriggerType: atlas.TriggerTypeKubernetes,
TriggerVersion: dbv1alpha1.VersionFromContext(ctx),
},
})
if err != nil {
return nil, transient(err)
}
switch run.Status {
case StatePending:
// Migration is pending approval, requeue
return nil, &transientError{
err: fmt.Errorf("plan approval pending, review here: %s", run.URL),
after: 5 * time.Second,
}
case StateAborted:
// Migration is aborted, no need to reapply
return nil, fmt.Errorf("plan rejected, review here: %s", run.URL)
case StateApplied, StateApproved:
return &dbv1alpha1.AtlasMigrationStatus{
LastApplied: run.Start.Unix(),
LastAppliedVersion: run.Current,
}, nil
}
}
// The downgrade is not allowed, fall through to default behavior: No action
fallthrough
case len(status.Pending) == 0:
// No pending migrations
var lastApplied int64
if len(status.Applied) > 0 {
lastApplied = status.Applied[len(status.Applied)-1].ExecutedAt.Unix()
Expand All @@ -241,25 +288,27 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s
LastApplied: lastApplied,
LastAppliedVersion: status.Current,
}, nil
}
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlas.MigrateApplyParams{
Env: envName,
Context: &atlas.DeployRunContext{
TriggerType: atlas.TriggerTypeKubernetes,
TriggerVersion: dbv1alpha1.VersionFromContext(ctx),
},
})
if err != nil {
if !isSQLErr(err) {
err = transient(err)
default:
// There are pending migrations
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlas.MigrateApplyParams{
Env: envName,
Context: &atlas.DeployRunContext{
TriggerType: atlas.TriggerTypeKubernetes,
TriggerVersion: dbv1alpha1.VersionFromContext(ctx),
},
})
if err != nil {
if !isSQLErr(err) {
err = transient(err)
}
return nil, err
}
return nil, err
return &dbv1alpha1.AtlasMigrationStatus{
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
}, nil
}
return &dbv1alpha1.AtlasMigrationStatus{
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
}, nil
}

// Extract migration data from the given resource
Expand Down

0 comments on commit 77796c4

Please sign in to comment.