Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/controller: generate hash of declarative flow by using "schema inspect" #253

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 39 additions & 60 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ package controller
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -140,23 +138,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
res.SetNotReady("ReadSchema", "Multiple targets are not supported")
return ctrl.Result{}, nil
}
hash, err := data.hash()
if err != nil {
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
// This is to ensure that other tools know we are still applying the changes.
if res.IsReady() && res.IsHashModified(hash) {
res.SetNotReady("Reconciling", "current schema does not match last applied")
return ctrl.Result{Requeue: true}, nil
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
// Write the schema file to the working directory.
opts = append(opts, func(ce *atlasexec.WorkingDir) error {
_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
return err
})
}
// ====================================================
// Starting area to handle the heavy jobs.
// Below this line is the main logic of the controller.
// ====================================================
if !data.hasDevURL() && data.URL != nil {
// The user has not specified an URL for dev-db,
// spin up a dev-db and get the connection string.
Expand All @@ -166,17 +155,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return result(err)
}
}
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
if u := data.Desired; u != nil && u.Scheme == dbv1alpha1.SchemaTypeFile {
// Write the schema file to the working directory.
opts = append(opts, func(ce *atlasexec.WorkingDir) error {
_, err := ce.WriteFile(filepath.Join(u.Host, u.Path), data.schema)
return err
})
}
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config.
wd, err := atlasexec.NewWorkingDir(opts...)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
defer wd.Close()
// This function will be used to edit and re-render the atlas.hcl file in the working directory.
editAtlasHCL := func(fn func(m *managedData)) error {
fn(data)
Expand All @@ -187,18 +174,35 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
_, err = wd.WriteFile("atlas.hcl", buf.Bytes())
return err
}
cli, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
res.SetNotReady("CreatingAtlasClient", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
defer wd.Close()
cli, err := r.atlasClient(wd.Path(), data.Cloud)
// Calculate the hash of the current schema.
hash, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: "env://schema.src",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may not correct, the user can define the src in the env block.

env {
   src = "file://schema.hcl"
}

Format: `{{ .Hash | base64url }}`,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe just use the raw .Hash. base64url is used for schema's tag.

Vars: data.Vars,
})
if err != nil {
res.SetNotReady("CreatingAtlasClient", err.Error())
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
return result(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
// This is to ensure that other tools know we are still applying the changes.
if res.IsReady() && res.IsHashModified(hash) {
res.SetNotReady("Reconciling", "current schema does not match last applied")
return ctrl.Result{Requeue: true}, nil
}
// ====================================================
// Starting area to handle the heavy jobs.
// Below this line is the main logic of the controller.
// ====================================================
var whoami *atlasexec.WhoAmI
switch whoami, err = cli.WhoAmI(ctx); {
case errors.Is(err, atlasexec.ErrRequireLogin):
Expand Down Expand Up @@ -252,28 +256,10 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// to modify or approve the changes.
if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile {
log.Info("schema is a file, pushing the schema to Atlas Cloud")
// Using hash of desired state as the tag for the schema.
// This ensures push is idempotent.
tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
URL: desiredURL,
Format: `{{ .Hash | base64url }}`,
Vars: data.Vars,
})
if err != nil {
reason, msg := "SchemaPush", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
}
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Name: path.Join(repo.Host, repo.Path),
Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)),
Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(hash)),
URL: []string{desiredURL},
Vars: data.Vars,
})
Expand Down Expand Up @@ -744,17 +730,6 @@ func (d *managedData) hasLintReview() bool {
return lint.Body().GetAttribute("review") != nil
}

// hash returns the sha256 hash of the desired.
func (d *managedData) hash() (string, error) {
h := sha256.New()
if len(d.schema) > 0 {
h.Write([]byte(d.schema))
} else {
h.Write([]byte(d.Desired.String()))
}
return hex.EncodeToString(h.Sum(nil)), nil
}

func (d *managedData) repoURL() *url.URL {
switch {
// The user has provided the repository name.
Expand Down Expand Up @@ -857,6 +832,10 @@ func (d *managedData) asBlocks() []*hclwrite.Block {
env := hclwrite.NewBlock("env", []string{d.EnvName})
blocks = append(blocks, env)
envBody := env.Body()
if d.Desired != nil {
schema := envBody.AppendNewBlock("schema", nil).Body()
schema.SetAttributeValue("src", cty.StringVal(d.Desired.String()))
Comment on lines +836 to +837
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also set this schema.src to file://schema.hcl or file://schema.sql

}
if d.URL != nil {
envBody.SetAttributeValue("url", cty.StringVal(d.URL.String()))
}
Expand Down
13 changes: 11 additions & 2 deletions internal/controller/atlasschema_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestReconcile_ReadyButDiff(t *testing.T) {
Spec: dbv1alpha1.AtlasSchemaSpec{
Schema: dbv1alpha1.Schema{SQL: "create table foo (id int primary key);"},
TargetSpec: dbv1alpha1.TargetSpec{
URL: "mysql://root:password@localhost:3306/test",
URL: "sqlite://file?mode=memory",
},
},
Status: dbv1alpha1.AtlasSchemaStatus{
Expand Down Expand Up @@ -442,7 +442,7 @@ func TestBadSQL(t *testing.T) {
cont := tt.cond()
require.EqualValues(t, schemaReadyCond, cont.Type)
require.EqualValues(t, metav1.ConditionFalse, cont.Status)
require.EqualValues(t, "LintPolicyError", cont.Reason)
require.EqualValues(t, "CalculatingHash", cont.Reason)
require.Contains(t, cont.Message, "executing statement:")
}

Expand Down Expand Up @@ -500,6 +500,9 @@ func TestConfigTemplate(t *testing.T) {
err := data.render(&buf)
require.NoError(t, err)
expected := `env "kubernetes" {
schema {
src = "file://schema.sql"
}
url = "mysql://root:password@localhost:3306/test"
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
Expand Down Expand Up @@ -573,6 +576,9 @@ env "kubernetes" {
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
url = "mysql://root:password@localhost:3306/test"
schema {
src = "file://schema.sql"
}
}
`
require.EqualValues(t, expected, buf.String())
Expand All @@ -599,6 +605,9 @@ env {
dev = "mysql://root:password@localhost:3306/dev"
schemas = ["foo", "bar"]
url = "mysql://root:password@localhost:3306/test"
schema {
src = "file://schema.sql"
}
}`
require.EqualValues(t, expected, buf.String())
}
Expand Down
4 changes: 2 additions & 2 deletions test/e2e/testscript/schema-policy-diff.txtar
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ kubectl-wait-ready -l app=mysql pods
kubectl apply -f schema.yaml
kubectl-wait-ready AtlasSchema/mysql
kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
stdout ddfe666707ddf5c2cc7625c2a0de89da51e54fc7caa6403db307146430d20d85
stdout oAoRLC2AXyGha6pKDollSqBB5ovjB-qK78aAN9dkOow
# Inspect the schema to ensure it's correct
atlas schema inspect -u ${DB_URL}
cmp stdout schema.hcl
Expand All @@ -20,7 +20,7 @@ kubectl patch -f schema.yaml --type merge --patch-file patch-remove-bio.yaml
kubectl-wait-ready AtlasSchema/mysql
# Ensure the schema is updated
kubectl get -o jsonpath --template='{.status.observed_hash}' AtlasSchema/mysql
stdout fc62b9f189404d5e38053e5aae9e64bd71c7f854d7001d45c1583d65ac90223c
stdout UmrKZN7GNsjWxLOq6VJ3vejqnvBQU9BeoDZlL_2LTKU
# Inspect the schema again to ensure it still has the bio column
atlas schema inspect -u ${DB_URL}
cmp stdout schema.hcl
Expand Down
Loading