Skip to content

Commit

Permalink
controller: support devdb for atlas-migration
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Apr 11, 2024
1 parent 77796c4 commit cd82719
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 41 deletions.
41 changes: 35 additions & 6 deletions controllers/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ type (
configMapWatcher *watch.ResourceWatcher
secretWatcher *watch.ResourceWatcher
recorder record.EventRecorder
devDB *devDBReconciler
}
// migrationData is the data used to render the HCL template
// that will be used for Atlas CLI
migrationData struct {
EnvName string
URL *url.URL
DevURL string
Dir fs.FS
Cloud *cloud
RevisionsSchema string
Expand All @@ -93,13 +95,15 @@ type (
)

func NewAtlasMigrationReconciler(mgr Manager, execPath string) *AtlasMigrationReconciler {
r := mgr.GetEventRecorderFor("atlasmigration-controller")
return &AtlasMigrationReconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
execPath: execPath,
configMapWatcher: watch.New(),
secretWatcher: watch.New(),
recorder: mgr.GetEventRecorderFor("atlasmigration-controller"),
recorder: r,
devDB: newDevDB(mgr, r, true),
}
}

Expand All @@ -113,6 +117,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
if err = r.Get(ctx, req.NamespacedName, res); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}

defer func() {
// At the end of reconcile, update the status of the resource base on the error
if err != nil {
Expand All @@ -123,6 +128,10 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
// After updating the status, watch the dependent resources
r.watchRefs(res)
// Clean up any resources created by the controller after the reconciler is successful.
if res.IsReady() {
r.devDB.cleanUp(ctx, res)
}
}()
// When the resource is first created, create the "Ready" condition.
if len(res.Status.Conditions) == 0 {
Expand Down Expand Up @@ -156,6 +165,15 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
// TODO(giautm): Create DevDB and run linter for new migration
// files before applying it to the target database.

if data.DevURL == "" {
// The user has not specified an URL for dev-db,
// spin up a dev-db and get the connection string.
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
}
}
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config
// and the migrations directory (if any)
Expand All @@ -170,7 +188,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
defer wd.Close()
// Reconcile given resource
status, err := r.reconcile(ctx, wd.Path(), data.EnvName)
status, err := r.reconcile(ctx, wd.Path(), data.Cloud.RemoteDir, data.EnvName)
if err != nil {
res.SetNotReady("Migrating", strings.TrimSpace(err.Error()))
r.recordErrEvent(res, err)
Expand Down Expand Up @@ -227,11 +245,13 @@ const (
)

// Reconcile the given AtlasMigration resource.
func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName string) (_ *dbv1alpha1.AtlasMigrationStatus, _ error) {
c, err := atlas.NewClient(dir, r.execPath)
func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, wd string, dir *dbv1alpha1.Remote, envName string) (_ *dbv1alpha1.AtlasMigrationStatus, _ error) {
log := ctrl.Log.WithName("atlas_migration.reconcile")
c, err := atlas.NewClient(wd, r.execPath)
if err != nil {
return nil, err
}
log.Info("reconciling migration", "env", envName)
// Check if there are any pending migration files
status, err := c.MigrateStatus(ctx, &atlas.MigrateStatusParams{Env: envName})
if err != nil {
Expand All @@ -244,18 +264,25 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s
allowedDowngraded := true
switch {
case len(status.Pending) == 0 && len(status.Applied) > 0 && len(status.Available) < len(status.Applied):
log.Info("downgrading migration detected", "allowed", allowedDowngraded)
// Migration is downgraded
if allowedDowngraded {
// The downgrade is allowed, apply the last migration version
last := status.Available[len(status.Available)-1]
run, err := c.MigrateDown(ctx, &atlas.MigrateDownParams{
log.Info("downgrading to the last available version", "version", last.Version)
params := &atlas.MigrateDownParams{
Env: envName,
ToVersion: last.Version,
Context: &atlas.DeployRunContext{
TriggerType: atlas.TriggerTypeKubernetes,
TriggerVersion: dbv1alpha1.VersionFromContext(ctx),
},
})
}
if dir != nil {
// Use the latest tag of the remote directory
params.DirURL = fmt.Sprintf("atlas://%s", dir.Name)
}
run, err := c.MigrateDown(ctx, params)
if err != nil {
return nil, transient(err)
}
Expand All @@ -279,6 +306,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s
// The downgrade is not allowed, fall through to default behavior: No action
fallthrough
case len(status.Pending) == 0:
log.Info("no pending migrations")
// No pending migrations
var lastApplied int64
if len(status.Applied) > 0 {
Expand All @@ -289,6 +317,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, dir, envName s
LastAppliedVersion: status.Current,
}, nil
default:
log.Info("applying pending migrations", "count", len(status.Pending))
// There are pending migrations
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlas.MigrateApplyParams{
Expand Down
30 changes: 21 additions & 9 deletions controllers/atlasmigration_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestReconcile_reconcile(t *testing.T) {
defer func() {
require.NoError(t, wd.Close())
}()
status, err := tt.r.reconcile(context.Background(), wd.Path(), "test")
status, err := tt.r.reconcile(context.Background(), wd.Path(), nil, "test")
require.NoError(t, err)
require.EqualValues(t, "20230412003626", status.LastAppliedVersion)
}
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestReconcile_reconcile_upToDate(t *testing.T) {
defer func() {
require.NoError(t, wd.Close())
}()
status, err := tt.r.reconcile(context.Background(), wd.Path(), "test")
status, err := tt.r.reconcile(context.Background(), wd.Path(), nil, "test")
require.NoError(t, err)
require.EqualValues(t, "20230412003626", status.LastAppliedVersion)
}
Expand Down Expand Up @@ -495,7 +495,7 @@ func TestReconcile_reconcile_baseline(t *testing.T) {
defer func() {
require.NoError(t, wd.Close())
}()
status, err := tt.r.reconcile(context.Background(), wd.Path(), "test")
status, err := tt.r.reconcile(context.Background(), wd.Path(), nil, "test")
require.NoError(t, err)
require.EqualValues(t, "20230412003628", status.LastAppliedVersion)
cli, err := atlasexec.NewClient(wd.Path(), tt.r.execPath)
Expand Down Expand Up @@ -684,7 +684,7 @@ func TestAtlasMigrationReconciler_Credentials(t *testing.T) {
require.NoError(tt, err)
require.EqualValues(tt, reconcile.Result{}, c)
ev := tt.events()
require.Len(t, ev, 1)
require.Len(t, ev, 2)
require.Equal(t, "Normal Applied Version 20230412003626 applied", ev[0])
}

Expand All @@ -710,13 +710,15 @@ func TestDefaultTemplate(t *testing.T) {
Dir: mapFS(map[string]string{
"1.sql": "CREATE TABLE foo (id INT PRIMARY KEY);",
}),
DevURL: "sqlite://dev/?mode=memory",
}
var fileContent bytes.Buffer
require.NoError(t, migrate.render(&fileContent))
require.EqualValues(t, `
env {
name = atlas.env
url = "sqlite://file2/?mode=memory"
url = "sqlite://file2/?mode=memory"
dev = "sqlite://dev/?mode=memory"
migration {
dir = "file://migrations"
}
Expand All @@ -726,6 +728,7 @@ env {
func TestBaselineTemplate(t *testing.T) {
migrate := &migrationData{
URL: must(url.Parse("sqlite://file2/?mode=memory")),
DevURL: "sqlite://dev/?mode=memory",
Dir: mapFS(map[string]string{}),
Baseline: "20230412003626",
}
Expand All @@ -734,7 +737,8 @@ func TestBaselineTemplate(t *testing.T) {
require.EqualValues(t, `
env {
name = atlas.env
url = "sqlite://file2/?mode=memory"
url = "sqlite://file2/?mode=memory"
dev = "sqlite://dev/?mode=memory"
migration {
dir = "file://migrations"
baseline = "20230412003626"
Expand All @@ -744,7 +748,8 @@ env {

func TestCloudTemplate(t *testing.T) {
migrate := &migrationData{
URL: must(url.Parse("sqlite://file2/?mode=memory")),
URL: must(url.Parse("sqlite://file2/?mode=memory")),
DevURL: "sqlite://dev/?mode=memory",
Cloud: &cloud{
URL: "https://atlasgo.io/",
Project: "my-project",
Expand All @@ -767,7 +772,8 @@ atlas {
}
env {
name = atlas.env
url = "sqlite://file2/?mode=memory"
url = "sqlite://file2/?mode=memory"
dev = "sqlite://dev/?mode=memory"
migration {
dir = "atlas://my-remote-dir?tag=my-remote-tag"
}
Expand Down Expand Up @@ -869,6 +875,7 @@ func newMigrationTest(t *testing.T) *migrationTest {
m := &mockClient{
state: map[client.ObjectKey]client.Object{},
}
r := record.NewFakeRecorder(100)
return &migrationTest{
T: t,
k8s: m,
Expand All @@ -877,7 +884,12 @@ func newMigrationTest(t *testing.T) *migrationTest {
scheme: scheme,
secretWatcher: watch.New(),
configMapWatcher: watch.New(),
recorder: record.NewFakeRecorder(100),
recorder: r,
devDB: &devDBReconciler{
client: m,

Check failure on line 889 in controllers/atlasmigration_controller_test.go

View workflow job for this annotation

GitHub Actions / test

unknown field client in struct literal of type devDBReconciler
scheme: scheme,
recorder: r,
},
},
}
}
Expand Down
11 changes: 6 additions & 5 deletions controllers/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type (
configMapWatcher *watch.ResourceWatcher
secretWatcher *watch.ResourceWatcher
recorder record.EventRecorder
prewarmDevDB bool
devDB *devDBReconciler
}
// managedData contains information about the managed database and its desired state.
managedData struct {
Expand All @@ -78,14 +78,15 @@ type (
const sqlLimitSize = 1024

func NewAtlasSchemaReconciler(mgr Manager, execPath string, prewarmDevDB bool) *AtlasSchemaReconciler {
r := mgr.GetEventRecorderFor("atlasschema-controller")
return &AtlasSchemaReconciler{
Client: mgr.GetClient(),
scheme: mgr.GetScheme(),
execPath: execPath,
configMapWatcher: watch.New(),
secretWatcher: watch.New(),
recorder: mgr.GetEventRecorderFor("atlasschema-controller"),
prewarmDevDB: prewarmDevDB,
recorder: r,
devDB: newDevDB(mgr, r, prewarmDevDB),
}
}

Expand All @@ -111,7 +112,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.watchRefs(res)
// Clean up any resources created by the controller after the reconciler is successful.
if res.IsReady() {
r.cleanUp(ctx, res)
r.devDB.cleanUp(ctx, res)
}
}()
// When the resource is first created, create the "Ready" condition.
Expand Down Expand Up @@ -145,7 +146,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if data.DevURL == "" {
// The user has not specified an URL for dev-db,
// spin up a dev-db and get the connection string.
data.DevURL, err = r.devURL(ctx, res, *data.URL)
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
Expand Down
8 changes: 7 additions & 1 deletion controllers/atlasschema_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,7 @@ func newTest(t *testing.T) *test {
}
execPath, err := exec.LookPath("atlas")
require.NoError(t, err)
r := record.NewFakeRecorder(100)
return &test{
T: t,
k8s: m,
Expand All @@ -576,7 +577,12 @@ func newTest(t *testing.T) *test {
execPath: execPath,
configMapWatcher: watch.New(),
secretWatcher: watch.New(),
recorder: record.NewFakeRecorder(100),
recorder: r,
devDB: &devDBReconciler{
client: m,

Check failure on line 582 in controllers/atlasschema_controller_test.go

View workflow job for this annotation

GitHub Actions / test

unknown field client in struct literal of type devDBReconciler
scheme: scheme,
recorder: r,
},
},
}
}
Expand Down
Loading

0 comments on commit cd82719

Please sign in to comment.