Skip to content

Commit

Permalink
Internal/controller: refactor error handling in declarative flow (#261)
Browse files Browse the repository at this point in the history
  • Loading branch information
datdao authored Jan 22, 2025
1 parent 02bb557 commit b4d554b
Showing 1 changed file with 38 additions and 92 deletions.
130 changes: 38 additions & 92 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
defer func() {
// At the end of reconcile, update the status of the resource base on the error
if err != nil {
r.recordErrEvent(res, err)
}
if err := r.Status().Update(ctx, res); err != nil {
log.Error(err, "failed to update resource status")
}
Expand All @@ -130,13 +126,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
data, err := r.extractData(ctx, res)
if err != nil {
res.SetNotReady("ReadSchema", err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "ReadSchema")
}
if data.hasTargets() {
res.SetNotReady("ReadSchema", "Multiple targets are not supported")
return ctrl.Result{}, nil
return r.resultErr(res, errors.New("multiple targets are not supported"), "ReadSchema")
}
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
Expand All @@ -151,17 +144,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// spin up a dev-db and get the connection string.
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
return r.resultErr(res, err, "GettingDevDB")
}
}
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config.
wd, err := atlasexec.NewWorkingDir(opts...)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "CreatingWorkingDir")
}
defer wd.Close()
// This function will be used to edit and re-render the atlas.hcl file in the working directory.
Expand All @@ -176,9 +166,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
cli, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
res.SetNotReady("CreatingAtlasClient", err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "CreatingAtlasClient")
}
// Calculate the hash of the current schema.
hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Expand All @@ -188,12 +176,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Vars: data.Vars,
})
if err != nil {
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
if isConnectionErr(err) {
err = transient(err)
}
return result(err)
return r.resultErr(res, err, "CalculatingHash")
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
Expand All @@ -211,24 +194,17 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
case errors.Is(err, atlasexec.ErrRequireLogin):
log.Info("the resource is not connected to Atlas Cloud")
if data.Config != nil {
err = errors.New("login is required to use custom atlas.hcl config")
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "WhoAmI")
}
case err != nil:
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
var reports []*atlasexec.SchemaApply
shouldLint, err := data.shouldLint()
if err != nil {
res.SetNotReady("LintPolicyError", err.Error())
r.recordErrEvent(res, err)
return result(err)
return r.resultErr(res, err, "LintPolicyError")
}
switch desiredURL := data.targetURL(); {
// The resource is connected to Atlas Cloud.
Expand Down Expand Up @@ -265,14 +241,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Vars: data.Vars,
})
if err != nil {
reason, msg := "SchemaPush", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "SchemaPush")
}
log.Info("schema is a file, pushing the schema to Atlas Cloud")
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Expand All @@ -283,14 +252,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Vars: data.Vars,
})
if err != nil {
reason, msg := "SchemaPush", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "SchemaPush")
}
desiredURL = state.URL
}
Expand All @@ -314,19 +276,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
case err != nil:
reason, msg := "SchemaPlan", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, reason)
default:
log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL)
res.Status.PlanURL = plan.File.URL
res.Status.PlanLink = plan.File.Link
reason, msg := "ApprovalPending", "Schema plan is waiting for approval"
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeNormal, reason, msg)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
Expand All @@ -340,27 +296,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
Vars: data.Vars,
}); {
case err != nil:
reason, msg := "ListingPlans", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "ListingPlans")
// There are multiple pending plans. This is an unexpected state.
case len(plans) > 1:
planURLs := make([]string, 0, len(plans))
for _, p := range plans {
planURLs = append(planURLs, p.URL)
}
log.Info("multiple schema plans found", "plans", planURLs)
reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", "))
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
err = errors.New(msg)
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, fmt.Errorf("multiple schema plans found: %s", strings.Join(planURLs, ", ")), "ListingPlans")
// There are no pending plans, but Atlas has been asked to review the changes ALWAYS.
case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways:
// Create a plan for the pending changes.
Expand Down Expand Up @@ -406,14 +350,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Don't requeue destructive errors.
return ctrl.Result{}, nil
case err != nil:
reason, msg := "VerifyingFirstRun", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "VerifyingFirstRun")
}
// Revert the destructive linting policy back to the original value.
if err = editAtlasHCL(func(m *managedData) {
Expand All @@ -431,14 +368,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Run the linting policy.
case shouldLint:
if err = r.lint(ctx, wd, data, nil); err != nil {
reason, msg := "LintPolicyError", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "LintPolicyError")
}
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Expand All @@ -458,13 +388,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
})
}
if err != nil {
res.SetNotReady("ApplyingSchema", err.Error())
r.recorder.Event(res, corev1.EventTypeWarning, "ApplyingSchema", err.Error())
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return r.resultCLIErr(res, err, "ApplyingSchema")
}
s := dbv1alpha1.AtlasSchemaStatus{
LastApplied: time.Now().Unix(),
Expand Down Expand Up @@ -618,6 +542,28 @@ func (r *AtlasSchemaReconciler) recordErrEvent(res *dbv1alpha1.AtlasSchema, err
r.recorder.Event(res, corev1.EventTypeWarning, reason, strings.TrimSpace(err.Error()))
}

func (r *AtlasSchemaReconciler) resultErr(
res *dbv1alpha1.AtlasSchema, err error, reason string,
) (ctrl.Result, error) {
if isConnectionErr(err) {
err = transient(err)
}
res.SetNotReady(reason, err.Error())
r.recordErrEvent(res, err)
return result(err)
}

func (r *AtlasSchemaReconciler) resultCLIErr(
res *dbv1alpha1.AtlasSchema, err error, reason string,
) (ctrl.Result, error) {
if !isSQLErr(err) {
err = transient(err)
}
res.SetNotReady(reason, err.Error())
r.recordErrEvent(res, err)
return result(err)
}

// ShouldLint returns true if the linting policy is set to error.
func (d *managedData) shouldLint() (bool, error) {
p := d.Policy
Expand Down

0 comments on commit b4d554b

Please sign in to comment.