From cb035b4b740581258328e9c83ba8a235630dc0f7 Mon Sep 17 00:00:00 2001 From: Dat Dao Date: Tue, 7 Jan 2025 22:41:53 +0700 Subject: [PATCH] internal/controller: generate hash of declarative flow by using "schema inspect" --- internal/controller/atlasschema_controller.go | 99 ++++++++----------- .../controller/atlasschema_controller_test.go | 13 ++- test/e2e/testscript/schema-policy-diff.txtar | 4 +- 3 files changed, 52 insertions(+), 64 deletions(-) diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index 420886a..d60f4b0 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -17,8 +17,6 @@ package controller import ( "bytes" "context" - "crypto/sha256" - "encoding/hex" "errors" "fmt" "io" @@ -140,23 +138,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) res.SetNotReady("ReadSchema", "Multiple targets are not supported") return ctrl.Result{}, nil } - hash, err := data.hash() - if err != nil { - res.SetNotReady("CalculatingHash", err.Error()) - r.recordErrEvent(res, err) - return result(err) - } - // We need to update the ready condition immediately before doing - // any heavy jobs if the hash is different from the last applied. - // This is to ensure that other tools know we are still applying the changes. - if res.IsReady() && res.IsHashModified(hash) { - res.SetNotReady("Reconciling", "current schema does not match last applied") - return ctrl.Result{Requeue: true}, nil + opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} + if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { + // Write the schema file to the working directory. + opts = append(opts, func(ce *atlasexec.WorkingDir) error { + _, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema) + return err + }) } - // ==================================================== - // Starting area to handle the heavy jobs. - // Below this line is the main logic of the controller. - // ==================================================== if !data.hasDevURL() && data.URL != nil { // The user has not specified an URL for dev-db, // spin up a dev-db and get the connection string. @@ -166,17 +155,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) return result(err) } } - opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} - if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile { - // Write the schema file to the working directory. - opts = append(opts, func(ce *atlasexec.WorkingDir) error { - _, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema) - return err - }) - } // Create a working directory for the Atlas CLI // The working directory contains the atlas.hcl config. wd, err := atlasexec.NewWorkingDir(opts...) + if err != nil { + res.SetNotReady("CreatingWorkingDir", err.Error()) + r.recordErrEvent(res, err) + return result(err) + } + defer wd.Close() // This function will be used to edit and re-render the atlas.hcl file in the working directory. editAtlasHCL := func(fn func(m *managedData)) error { fn(data) @@ -187,18 +174,35 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) _, err = wd.WriteFile("atlas.hcl", buf.Bytes()) return err } + cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { - res.SetNotReady("CreatingWorkingDir", err.Error()) + res.SetNotReady("CreatingAtlasClient", err.Error()) r.recordErrEvent(res, err) return result(err) } - defer wd.Close() - cli, err := r.atlasClient(wd.Path(), data.Cloud) + // Calculate the hash of the current schema. + hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ + Env: data.EnvName, + URL: "env://schema.src", + Format: `{{ .Hash | base64url }}`, + Vars: data.Vars, + }) if err != nil { - res.SetNotReady("CreatingAtlasClient", err.Error()) + res.SetNotReady("CalculatingHash", err.Error()) r.recordErrEvent(res, err) return result(err) } + // We need to update the ready condition immediately before doing + // any heavy jobs if the hash is different from the last applied. + // This is to ensure that other tools know we are still applying the changes. + if res.IsReady() && res.IsHashModified(hash) { + res.SetNotReady("Reconciling", "current schema does not match last applied") + return ctrl.Result{Requeue: true}, nil + } + // ==================================================== + // Starting area to handle the heavy jobs. + // Below this line is the main logic of the controller. + // ==================================================== var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { case errors.Is(err, atlasexec.ErrRequireLogin): @@ -252,28 +256,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) // to modify or approve the changes. if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile { log.Info("schema is a file, pushing the schema to Atlas Cloud") - // Using hash of desired state as the tag for the schema. - // This ensures push is idempotent. - tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{ - Env: data.EnvName, - URL: desiredURL, - Format: `{{ .Hash | base64url }}`, - Vars: data.Vars, - }) - if err != nil { - reason, msg := "SchemaPush", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) - } state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, Name: path.Join(repo.Host, repo.Path), - Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)), + Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(hash)), URL: []string{desiredURL}, Vars: data.Vars, }) @@ -744,17 +730,6 @@ func (d *managedData) hasLintReview() bool { return lint.Body().GetAttribute("review") != nil } -// hash returns the sha256 hash of the desired. -func (d *managedData) hash() (string, error) { - h := sha256.New() - if len(d.schema) > 0 { - h.Write([]byte(d.schema)) - } else { - h.Write([]byte(d.Desired.String())) - } - return hex.EncodeToString(h.Sum(nil)), nil -} - func (d *managedData) repoURL() *url.URL { switch { // The user has provided the repository name. @@ -857,6 +832,10 @@ func (d *managedData) asBlocks() []*hclwrite.Block { env := hclwrite.NewBlock("env", []string{d.EnvName}) blocks = append(blocks, env) envBody := env.Body() + if d.Desired != nil { + schema := envBody.AppendNewBlock("schema", nil).Body() + schema.SetAttributeValue("src", cty.StringVal(d.Desired.String())) + } if d.URL != nil { envBody.SetAttributeValue("url", cty.StringVal(d.URL.String())) } diff --git a/internal/controller/atlasschema_controller_test.go b/internal/controller/atlasschema_controller_test.go index 54ca3a7..d06e6c2 100644 --- a/internal/controller/atlasschema_controller_test.go +++ b/internal/controller/atlasschema_controller_test.go @@ -77,7 +77,7 @@ func TestReconcile_ReadyButDiff(t *testing.T) { Spec: dbv1alpha1.AtlasSchemaSpec{ Schema: dbv1alpha1.Schema{SQL: "create table foo (id int primary key);"}, TargetSpec: dbv1alpha1.TargetSpec{ - URL: "mysql://root:password@localhost:3306/test", + URL: "sqlite://file?mode=memory", }, }, Status: dbv1alpha1.AtlasSchemaStatus{ @@ -442,7 +442,7 @@ func TestBadSQL(t *testing.T) { cont := tt.cond() require.EqualValues(t, schemaReadyCond, cont.Type) require.EqualValues(t, metav1.ConditionFalse, cont.Status) - require.EqualValues(t, "LintPolicyError", cont.Reason) + require.EqualValues(t, "CalculatingHash", cont.Reason) require.Contains(t, cont.Message, "executing statement:") } @@ -500,6 +500,9 @@ func TestConfigTemplate(t *testing.T) { err := data.render(&buf) require.NoError(t, err) expected := `env "kubernetes" { + schema { + src = "file://schema.sql" + } url = "mysql://root:password@localhost:3306/test" dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] @@ -573,6 +576,9 @@ env "kubernetes" { dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] url = "mysql://root:password@localhost:3306/test" + schema { + src = "file://schema.sql" + } } ` require.EqualValues(t, expected, buf.String()) @@ -599,6 +605,9 @@ env { dev = "mysql://root:password@localhost:3306/dev" schemas = ["foo", "bar"] url = "mysql://root:password@localhost:3306/test" + schema { + src = "file://schema.sql" + } }` require.EqualValues(t, expected, buf.String()) } diff --git a/test/e2e/testscript/schema-policy-diff.txtar b/test/e2e/testscript/schema-policy-diff.txtar index 52b8217..c76269f 100644 --- a/test/e2e/testscript/schema-policy-diff.txtar +++ b/test/e2e/testscript/schema-policy-diff.txtar @@ -11,7 +11,7 @@ kubectl-wait-ready -l app=mysql pods kubectl apply -f schema.yaml kubectl-wait-ready AtlasSchema/mysql kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql -stdout ddfe666707ddf5c2cc7625c2a0de89da51e54fc7caa6403db307146430d20d85 +stdout oAoRLC2AXyGha6pKDollSqBB5ovjB-qK78aAN9dkOow # Inspect the schema to ensure it's correct atlas schema inspect -u ${DB_URL} cmp stdout schema.hcl @@ -20,7 +20,7 @@ kubectl patch -f schema.yaml --type merge --patch-file patch-remove-bio.yaml kubectl-wait-ready AtlasSchema/mysql # Ensure the schema is updated kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql -stdout fc62b9f189404d5e38053e5aae9e64bd71c7f854d7001d45c1583d65ac90223c +stdout UmrKZN7GNsjWxLOq6VJ3vejqnvBQU9BeoDZlL_2LTKU # Inspect the schema again to ensure it still has the bio column atlas schema inspect -u ${DB_URL} cmp stdout schema.hcl