internal/controller: generate hash of declarative flow by using "schema inspect"
datdao committed Jan 8, 2025
1 parent 1a4dd95 commit cb035b4
Showing 3 changed files with 52 additions and 64 deletions.
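Note on the change: the controller previously fingerprinted the desired state by SHA-256-hashing the raw manifest bytes; it now asks Atlas for the hash of the inspected schema by running "schema inspect" with a format template. A minimal sketch of that call through the atlasexec Go SDK follows — the working directory ".", the "atlas" binary on PATH, and the SQLite URL are illustrative placeholders, not part of this commit; the controller itself inspects "env://schema.src":

package main

import (
	"context"
	"fmt"
	"log"

	"ariga.io/atlas-go-sdk/atlasexec"
)

func main() {
	ctx := context.Background()
	// Assumes an "atlas" binary on PATH; "." as the working directory is a placeholder.
	client, err := atlasexec.NewClient(".", "atlas")
	if err != nil {
		log.Fatal(err)
	}
	// Same format template the commit uses: a base64url-encoded digest of the
	// schema as Atlas inspects it, not of the raw input bytes.
	hash, err := client.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
		URL:    "sqlite://file?mode=memory", // placeholder target
		Format: `{{ .Hash | base64url }}`,
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(hash)
}

Because the digest comes from the inspected (normalized) schema rather than the manifest bytes, it should stay stable across formatting-only edits, and the same value can drive both the ready-condition check and the "operator-plan-%.8s" push tag in the diff below.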
99 changes: 39 additions & 60 deletions internal/controller/atlasschema_controller.go
@@ -17,8 +17,6 @@ package controller
 import (
 	"bytes"
 	"context"
-	"crypto/sha256"
-	"encoding/hex"
 	"errors"
 	"fmt"
 	"io"
@@ -140,23 +138,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		res.SetNotReady("ReadSchema", "Multiple targets are not supported")
 		return ctrl.Result{}, nil
 	}
-	hash, err := data.hash()
-	if err != nil {
-		res.SetNotReady("CalculatingHash", err.Error())
-		r.recordErrEvent(res, err)
-		return result(err)
-	}
-	// We need to update the ready condition immediately before doing
-	// any heavy jobs if the hash is different from the last applied.
-	// This is to ensure that other tools know we are still applying the changes.
-	if res.IsReady() && res.IsHashModified(hash) {
-		res.SetNotReady("Reconciling", "current schema does not match last applied")
-		return ctrl.Result{Requeue: true}, nil
+	opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
+	if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
+		// Write the schema file to the working directory.
+		opts = append(opts, func(ce *atlasexec.WorkingDir) error {
+			_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
+			return err
+		})
 	}
-	// ====================================================
-	// Starting area to handle the heavy jobs.
-	// Below this line is the main logic of the controller.
-	// ====================================================
 	if !data.hasDevURL() && data.URL != nil {
 		// The user has not specified an URL for dev-db,
 		// spin up a dev-db and get the connection string.
@@ -166,17 +155,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 			return result(err)
 		}
 	}
-	opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
-	if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
-		// Write the schema file to the working directory.
-		opts = append(opts, func(ce *atlasexec.WorkingDir) error {
-			_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
-			return err
-		})
-	}
 	// Create a working directory for the Atlas CLI
 	// The working directory contains the atlas.hcl config.
 	wd, err := atlasexec.NewWorkingDir(opts...)
+	if err != nil {
+		res.SetNotReady("CreatingWorkingDir", err.Error())
+		r.recordErrEvent(res, err)
+		return result(err)
+	}
+	defer wd.Close()
 	// This function will be used to edit and re-render the atlas.hcl file in the working directory.
 	editAtlasHCL := func(fn func(m *managedData)) error {
 		fn(data)
@@ -187,18 +174,35 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 		_, err = wd.WriteFile("atlas.hcl", buf.Bytes())
 		return err
 	}
+	cli, err := r.atlasClient(wd.Path(), data.Cloud)
 	if err != nil {
-		res.SetNotReady("CreatingWorkingDir", err.Error())
+		res.SetNotReady("CreatingAtlasClient", err.Error())
 		r.recordErrEvent(res, err)
 		return result(err)
 	}
-	defer wd.Close()
-	cli, err := r.atlasClient(wd.Path(), data.Cloud)
+	// Calculate the hash of the current schema.
+	hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
+		Env:    data.EnvName,
+		URL:    "env://schema.src",
+		Format: `{{ .Hash | base64url }}`,
+		Vars:   data.Vars,
+	})
 	if err != nil {
-		res.SetNotReady("CreatingAtlasClient", err.Error())
+		res.SetNotReady("CalculatingHash", err.Error())
 		r.recordErrEvent(res, err)
 		return result(err)
 	}
+	// We need to update the ready condition immediately before doing
+	// any heavy jobs if the hash is different from the last applied.
+	// This is to ensure that other tools know we are still applying the changes.
+	if res.IsReady() && res.IsHashModified(hash) {
+		res.SetNotReady("Reconciling", "current schema does not match last applied")
+		return ctrl.Result{Requeue: true}, nil
+	}
+	// ====================================================
+	// Starting area to handle the heavy jobs.
+	// Below this line is the main logic of the controller.
+	// ====================================================
 	var whoami *atlasexec.WhoAmI
 	switch whoami, err = cli.WhoAmI(ctx); {
 	case errors.Is(err, atlasexec.ErrRequireLogin):
@@ -252,28 +256,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
 	// to modify or approve the changes.
 	if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile {
 		log.Info("schema is a file, pushing the schema to Atlas Cloud")
-		// Using hash of desired state as the tag for the schema.
-		// This ensures push is idempotent.
-		tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
-			Env:    data.EnvName,
-			URL:    desiredURL,
-			Format: `{{ .Hash | base64url }}`,
-			Vars:   data.Vars,
-		})
-		if err != nil {
-			reason, msg := "SchemaPush", err.Error()
-			res.SetNotReady(reason, msg)
-			r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
-			if !isSQLErr(err) {
-				err = transient(err)
-			}
-			r.recordErrEvent(res, err)
-			return result(err)
-		}
 		state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
 			Env:  data.EnvName,
 			Name: path.Join(repo.Host, repo.Path),
-			Tag:  fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)),
+			Tag:  fmt.Sprintf("operator-plan-%.8s", strings.ToLower(hash)),
 			URL:  []string{desiredURL},
 			Vars: data.Vars,
 		})
@@ -744,17 +730,6 @@ func (d *managedData) hasLintReview() bool {
 	return lint.Body().GetAttribute("review") != nil
 }
 
-// hash returns the sha256 hash of the desired.
-func (d *managedData) hash() (string, error) {
-	h := sha256.New()
-	if len(d.schema) > 0 {
-		h.Write([]byte(d.schema))
-	} else {
-		h.Write([]byte(d.Desired.String()))
-	}
-	return hex.EncodeToString(h.Sum(nil)), nil
-}
-
 func (d *managedData) repoURL() *url.URL {
 	switch {
 	// The user has provided the repository name.
@@ -857,6 +832,10 @@ func (d *managedData) asBlocks() []*hclwrite.Block {
 	env := hclwrite.NewBlock("env", []string{d.EnvName})
 	blocks = append(blocks, env)
 	envBody := env.Body()
+	if d.Desired != nil {
+		schema := envBody.AppendNewBlock("schema", nil).Body()
+		schema.SetAttributeValue("src", cty.StringVal(d.Desired.String()))
+	}
 	if d.URL != nil {
 		envBody.SetAttributeValue("url", cty.StringVal(d.URL.String()))
 	}
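For context on the asBlocks hunk above: the desired state is now rendered into atlas.hcl as a nested schema block, which is what the new "env://schema.src" inspect URL resolves to. A rough hclwrite sketch of that rendering, with placeholder values (cf. the test expectations below):

package main

import (
	"fmt"

	"github.com/hashicorp/hcl/v2/hclwrite"
	"github.com/zclconf/go-cty/cty"
)

func main() {
	f := hclwrite.NewEmptyFile()
	envBody := f.Body().AppendNewBlock("env", []string{"kubernetes"}).Body()
	// Mirrors the controller change: append a schema block whose src points
	// at the desired state ("file://schema.sql" is a placeholder).
	schema := envBody.AppendNewBlock("schema", nil).Body()
	schema.SetAttributeValue("src", cty.StringVal("file://schema.sql"))
	envBody.SetAttributeValue("url", cty.StringVal("mysql://root:password@localhost:3306/test"))
	fmt.Print(string(f.Bytes()))
	// Prints:
	// env "kubernetes" {
	//   schema {
	//     src = "file://schema.sql"
	//   }
	//   url = "mysql://root:password@localhost:3306/test"
	// }
}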
13 changes: 11 additions & 2 deletions internal/controller/atlasschema_controller_test.go
@@ -77,7 +77,7 @@ func TestReconcile_ReadyButDiff(t *testing.T) {
 		Spec: dbv1alpha1.AtlasSchemaSpec{
 			Schema: dbv1alpha1.Schema{SQL: "create table foo (id int primary key);"},
 			TargetSpec: dbv1alpha1.TargetSpec{
-				URL: "mysql://root:password@localhost:3306/test",
+				URL: "sqlite://file?mode=memory",
 			},
 		},
 		Status: dbv1alpha1.AtlasSchemaStatus{
@@ -442,7 +442,7 @@ func TestBadSQL(t *testing.T) {
 	cont := tt.cond()
 	require.EqualValues(t, schemaReadyCond, cont.Type)
 	require.EqualValues(t, metav1.ConditionFalse, cont.Status)
-	require.EqualValues(t, "LintPolicyError", cont.Reason)
+	require.EqualValues(t, "CalculatingHash", cont.Reason)
 	require.Contains(t, cont.Message, "executing statement:")
 }
 
@@ -500,6 +500,9 @@ func TestConfigTemplate(t *testing.T) {
 	err := data.render(&buf)
 	require.NoError(t, err)
 	expected := `env "kubernetes" {
+  schema {
+    src = "file://schema.sql"
+  }
   url = "mysql://root:password@localhost:3306/test"
   dev = "mysql://root:password@localhost:3306/dev"
   schemas = ["foo", "bar"]
@@ -573,6 +576,9 @@ env "kubernetes" {
   dev = "mysql://root:password@localhost:3306/dev"
   schemas = ["foo", "bar"]
   url = "mysql://root:password@localhost:3306/test"
+  schema {
+    src = "file://schema.sql"
+  }
 }
 `
 	require.EqualValues(t, expected, buf.String())
@@ -599,6 +605,9 @@ env {
   dev = "mysql://root:password@localhost:3306/dev"
   schemas = ["foo", "bar"]
   url = "mysql://root:password@localhost:3306/test"
+  schema {
+    src = "file://schema.sql"
+  }
 }`
 	require.EqualValues(t, expected, buf.String())
 }
4 changes: 2 additions & 2 deletions test/e2e/testscript/schema-policy-diff.txtar
@@ -11,7 +11,7 @@ kubectl-wait-ready -l app=mysql pods
 kubectl apply -f schema.yaml
 kubectl-wait-ready AtlasSchema/mysql
 kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
-stdout ddfe666707ddf5c2cc7625c2a0de89da51e54fc7caa6403db307146430d20d85
+stdout oAoRLC2AXyGha6pKDollSqBB5ovjB-qK78aAN9dkOow
 # Inspect the schema to ensure it's correct
 atlas schema inspect -u ${DB_URL}
 cmp stdout schema.hcl
@@ -20,7 +20,7 @@ kubectl patch -f schema.yaml --type merge --patch-file patch-remove-bio.yaml
 kubectl-wait-ready AtlasSchema/mysql
 # Ensure the schema is updated
 kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
-stdout fc62b9f189404d5e38053e5aae9e64bd71c7f854d7001d45c1583d65ac90223c
+stdout UmrKZN7GNsjWxLOq6VJ3vejqnvBQU9BeoDZlL_2LTKU
 # Inspect the schema again to ensure it still has the bio column
 atlas schema inspect -u ${DB_URL}
 cmp stdout schema.hcl
