Skip to content

Commit

Permalink
internal/controller: handle the case of Atlas returning multiple results
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao committed Dec 17, 2024
1 parent 7110c6d commit 151e1a6
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 27 deletions.
11 changes: 7 additions & 4 deletions internal/controller/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
log.Info("applying pending migrations", "count", len(status.Pending))
// There are pending migrations
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{
reports, err := c.MigrateApplySlice(ctx, &atlasexec.MigrateApplyParams{
Env: data.EnvName,
Context: &atlasexec.DeployRunContext{
TriggerType: atlasexec.TriggerTypeKubernetes,
Expand All @@ -372,12 +372,15 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
}
return err
}
if len(reports) != 1 {
return fmt.Errorf("unexpected number of reports: %d", len(reports))
}
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
LastApplied: reports[0].End.Unix(),
LastAppliedVersion: reports[0].Target,
})
r.recordApplied(res, report.Target)
r.recordApplied(res, reports[0].Target)
}
if data.Dir != nil {
// Compress the migration directory then store it in the secret
Expand Down
27 changes: 15 additions & 12 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
var report *atlasexec.SchemaApply
var reports []*atlasexec.SchemaApply
switch desiredURL := data.Desired.String(); {
// The resource is connected to Atlas Cloud.
case whoami != nil:
Expand All @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
repo := data.repoURL()
if repo == nil {
// No repository is set, apply the changes directly.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
break
}
createPlan := func() (ctrl.Result, error) {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Try to apply the schema changes with lint policies,
// if the changes are rejected by the review policy, create a plan
// for the pending changes.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
// TODO: Better error handling for rejected changes.
if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") {
log.Info("schema changes are rejected by the review policy, creating a new schema plan")
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}); err != nil {
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -409,15 +409,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
AutoApprove: true,
})
// No linting policy is set.
default:
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -433,20 +433,23 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
log.Info("schema changes are applied", "applied", len(report.Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize)
report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize)
s := dbv1alpha1.AtlasSchemaStatus{
LastApplied: time.Now().Unix(),
ObservedHash: hash,
}
if len(reports) != 1 {
return result(errors.New("unexpected number of reports"))
}
log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
reports[0].Changes.Applied = truncateSQL(reports[0].Changes.Applied, sqlLimitSize)
reports[0].Changes.Pending = truncateSQL(reports[0].Changes.Pending, sqlLimitSize)
// Set the plan URL if it exists.
if p := report.Plan; p != nil {
if p := reports[0].Plan; p != nil {
s.PlanLink = p.File.Link
s.PlanURL = p.File.URL
}
res.SetReady(s, report)
res.SetReady(s, reports[0])
r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema")
return ctrl.Result{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/atlasschema_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func (t *test) initDB(statement string) {
require.NoError(t, err)
cli, err := atlasexec.NewClient(wd.Path(), "atlas")
require.NoError(t, err)
_, err = cli.SchemaApply(context.Background(), &atlasexec.SchemaApplyParams{
_, err = cli.SchemaApplySlice(context.Background(), &atlasexec.SchemaApplyParams{
URL: t.dburl,
DevURL: "sqlite://file2/?mode=memory",
To: "file://./schema.sql",
Expand Down
9 changes: 4 additions & 5 deletions internal/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ type (
}
// AtlasExec is the interface for the atlas exec client.
AtlasExec interface {
// MigrateApply runs the `migrate apply` command and returns the successful runs.
MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error)
// MigrateApplySlice runs the `migrate apply` command and returns the successful runs.
MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error)
// MigrateDown runs the `migrate down` command.
MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error)
// MigrateLint runs the `migrate lint` command.
MigrateLint(context.Context, *atlasexec.MigrateLintParams) (*atlasexec.SummaryReport, error)
// MigrateStatus runs the `migrate status` command.
MigrateStatus(context.Context, *atlasexec.MigrateStatusParams) (*atlasexec.MigrateStatus, error)

// SchemaApply runs the `schema apply` command.
SchemaApply(context.Context, *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error)
// SchemaApplySlice runs the `schema apply` command and returns the successful runs.
SchemaApplySlice(context.Context, *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error)
// SchemaInspect runs the `schema inspect` command.
SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error)
// SchemaPush runs the `schema push` command.
Expand Down
7 changes: 5 additions & 2 deletions internal/controller/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD
if err != nil {
return err
}
plan, err := cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
plans, err := cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Vars: vars,
To: data.Desired.String(),
Expand All @@ -63,9 +63,12 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD
"unable to remove temporary directory", "dir", dir)
}
}()
if len(plans) != 1 {
return fmt.Errorf("unexpected number of schema plans: %d", len(plans))
}
dir, err := memDir(map[string]string{
"1.sql": current,
"2.sql": strings.Join(plan.Changes.Pending, ";\n"),
"2.sql": strings.Join(plans[0].Changes.Pending, ";\n"),
})
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions internal/controller/testhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *mockAtlasExec) WhoAmI(context.Context) (*atlasexec.WhoAmI, error) {
return m.whoami.res, m.whoami.err
}

// SchemaApply implements AtlasExec.
func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) {
return m.schemaApply.res, m.schemaApply.err
// SchemaAppleSlice implements AtlasExec.
func (m *mockAtlasExec) SchemaApplySlice(ctx context.Context, params *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) {
return []*atlasexec.SchemaApply{m.schemaApply.res}, m.schemaApply.err
}

// SchemaInspect implements AtlasExec.
Expand All @@ -97,6 +97,11 @@ func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyPar
return m.apply.res, m.apply.err
}

// MigrateApplySlice implements AtlasExec.
func (m *mockAtlasExec) MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) {
return []*atlasexec.MigrateApply{m.apply.res}, m.apply.err
}

// MigrateDown implements AtlasExec.
func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) {
return m.down.res, m.down.err
Expand Down

0 comments on commit 151e1a6

Please sign in to comment.