Skip to content

Commit

Permalink
internal/controller: use hcl pkg to generate atlas.hcl for declarativ…
Browse files Browse the repository at this point in the history
…e flow (#239)
  • Loading branch information
datdao authored Dec 11, 2024
1 parent ca780b0 commit e4482e2
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 153 deletions.
66 changes: 64 additions & 2 deletions api/v1alpha1/atlasschema_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"path/filepath"
"strings"

"github.com/hashicorp/hcl/v2/hclwrite"
"github.com/zclconf/go-cty/cty"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -107,8 +109,8 @@ type (
// Lint defines the linting policies to apply before applying the schema.
Lint struct {
Destructive *CheckConfig `json:"destructive,omitempty"`
// Review defines the review policy to apply after linting the schema changes.
// +kubebuilder:default=ERROR
// Review defines the review policy to apply after linting the schema changes (default: "ERROR").
// Atlas Cloud login is required.
Review LintReview `json:"review,omitempty"`
}
// CheckConfig defines the configuration of a linting check.
Expand Down Expand Up @@ -266,3 +268,63 @@ func (s Schema) DesiredState(ctx context.Context, r client.Reader, ns string) (*
}
return nil, nil, fmt.Errorf("no desired state specified")
}

// AsBlock returns the HCL block representation of the diff.
func (d Diff) AsBlock() *hclwrite.Block {
blk := hclwrite.NewBlock("diff", nil)
body := blk.Body()
if v := d.ConcurrentIndex; v != nil {
b := body.AppendNewBlock("concurrent_index", nil).Body()
b.SetAttributeValue("create", cty.BoolVal(v.Create))
b.SetAttributeValue("drop", cty.BoolVal(v.Drop))
}
if v := d.Skip; v != nil {
b := body.AppendNewBlock("skip", nil).Body()
if v.AddSchema {
b.SetAttributeValue("add_schema", cty.BoolVal(v.AddSchema))
}
if v.DropSchema {
b.SetAttributeValue("drop_schema", cty.BoolVal(v.DropSchema))
}
if v.ModifySchema {
b.SetAttributeValue("modify_schema", cty.BoolVal(v.ModifySchema))
}
if v.AddTable {
b.SetAttributeValue("add_table", cty.BoolVal(v.AddTable))
}
if v.DropTable {
b.SetAttributeValue("drop_table", cty.BoolVal(v.DropTable))
}
if v.ModifyTable {
b.SetAttributeValue("modify_table", cty.BoolVal(v.ModifyTable))
}
if v.AddColumn {
b.SetAttributeValue("add_column", cty.BoolVal(v.AddColumn))
}
if v.DropColumn {
b.SetAttributeValue("drop_column", cty.BoolVal(v.DropColumn))
}
if v.ModifyColumn {
b.SetAttributeValue("modify_column", cty.BoolVal(v.ModifyColumn))
}
if v.AddIndex {
b.SetAttributeValue("add_index", cty.BoolVal(v.AddIndex))
}
if v.DropIndex {
b.SetAttributeValue("drop_index", cty.BoolVal(v.DropIndex))
}
if v.ModifyIndex {
b.SetAttributeValue("modify_index", cty.BoolVal(v.ModifyIndex))
}
if v.AddForeignKey {
b.SetAttributeValue("add_foreign_key", cty.BoolVal(v.AddForeignKey))
}
if v.DropForeignKey {
b.SetAttributeValue("drop_foreign_key", cty.BoolVal(v.DropForeignKey))
}
if v.ModifyForeignKey {
b.SetAttributeValue("modify_foreign_key", cty.BoolVal(v.ModifyForeignKey))
}
}
return blk
}
6 changes: 3 additions & 3 deletions charts/atlas-operator/templates/crds/crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -719,9 +719,9 @@ spec:
type: boolean
type: object
review:
default: ERROR
description: Review defines the review policy to apply after
linting the schema changes.
description: |-
Review defines the review policy to apply after linting the schema changes (default: "ERROR").
Atlas Cloud login is required.
enum:
- ALWAYS
- WARNING
Expand Down
6 changes: 3 additions & 3 deletions config/crd/bases/db.atlasgo.io_atlasschemas.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,9 @@ spec:
type: boolean
type: object
review:
default: ERROR
description: Review defines the review policy to apply after
linting the schema changes.
description: |-
Review defines the review policy to apply after linting the schema changes (default: "ERROR").
Atlas Cloud login is required.
enum:
- ALWAYS
- WARNING
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ toolchain go1.23.2
require (
ariga.io/atlas v0.28.1
ariga.io/atlas-go-sdk v0.6.4
github.com/hashicorp/hcl/v2 v2.18.1
github.com/rogpeppe/go-internal v1.13.1
github.com/stretchr/testify v1.9.0
github.com/zclconf/go-cty v1.14.4
golang.org/x/mod v0.21.0
k8s.io/api v0.31.0
k8s.io/apimachinery v0.31.1
Expand Down Expand Up @@ -42,7 +44,6 @@ require (
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/hcl/v2 v2.18.1 // indirect
github.com/imdario/mergo v0.3.6 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand All @@ -59,7 +60,6 @@ require (
github.com/prometheus/procfs v0.15.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/zclconf/go-cty v1.14.4 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
Expand Down
142 changes: 112 additions & 30 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
Expand All @@ -24,7 +25,6 @@ import (
"net/url"
"path"
"path/filepath"
"strconv"
"strings"
"time"

Expand All @@ -42,6 +42,8 @@ import (
"github.com/ariga/atlas-operator/api/v1alpha1"
dbv1alpha1 "github.com/ariga/atlas-operator/api/v1alpha1"
"github.com/ariga/atlas-operator/internal/controller/watch"
"github.com/hashicorp/hcl/v2/hclwrite"
"github.com/zclconf/go-cty/cty"
)

//+kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;update;delete;get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -167,6 +169,16 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Create a working directory for the Atlas CLI
// The working directory contains the atlas.hcl config.
wd, err := atlasexec.NewWorkingDir(opts...)
// This function will be used to edit and re-render the atlas.hcl file in the working directory.
editAtlasHCL := func(fn func(m *managedData)) error {
fn(data)
var buf bytes.Buffer
if err := data.render(&buf); err != nil {
return err
}
_, err = wd.WriteFile("atlas.hcl", buf.Bytes())
return err
}
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
Expand Down Expand Up @@ -194,21 +206,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
switch desiredURL := data.Desired.String(); {
// The resource is connected to Atlas Cloud.
case whoami != nil:
vars := atlasexec.Vars2{
"lint_destructive": "true",
"lint_review": dbv1alpha1.LintReviewError,
}
if p := data.Policy; p != nil && p.Lint != nil {
if d := p.Lint.Destructive; d != nil {
vars["lint_destructive"] = strconv.FormatBool(d.Error)
}
if r := p.Lint.Review; r != "" {
vars["lint_review"] = r
}
err = editAtlasHCL(func(m *managedData) {
m.enableDestructive(false)
m.setLintReview(dbv1alpha1.LintReviewError, false)
})
if err != nil {
return result(err)
}
params := &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Vars: vars,
To: desiredURL,
TxMode: string(data.TxMode),
}
Expand All @@ -229,7 +235,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// This ensures push is idempotent.
tag, err := cli.SchemaInspect(ctx, &atlasexec.SchemaInspectParams{
Env: data.EnvName,
Vars: vars,
URL: desiredURL,
Format: `{{ .Hash | base64url }}`,
})
Expand All @@ -245,7 +250,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Vars: vars,
Name: path.Join(repo.Host, repo.Path),
Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)),
URL: []string{desiredURL},
Expand All @@ -266,7 +270,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Create a new plan for the pending changes.
plan, err := cli.SchemaPlan(ctx, &atlasexec.SchemaPlanParams{
Env: data.EnvName,
Vars: vars,
Repo: repo.String(),
From: []string{"env://url"},
To: []string{desiredURL},
Expand Down Expand Up @@ -302,7 +305,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// List the schema plans to check if there are any plans.
switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{
Env: data.EnvName,
Vars: vars,
Repo: repo.String(),
From: []string{"env://url"},
To: []string{desiredURL},
Expand Down Expand Up @@ -330,7 +332,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
// There are no pending plans, but Atlas has been asked to review the changes ALWAYS.
case len(plans) == 0 && vars["lint_review"] == dbv1alpha1.LintReviewAlways:
case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways:
// Create a plan for the pending changes.
return createPlan()
// The plan is pending approval, show the plan to the user.
Expand Down Expand Up @@ -358,9 +360,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
// Verify the first run doesn't contain destructive changes.
case res.Status.LastApplied == 0:
err = r.lint(ctx, wd, data, atlasexec.Vars2{
"lint_destructive": "true",
})
// For the first run, force the destructive linting policy to true.
// Then revert it back to the original value after the linting is done.
if err = editAtlasHCL(func(m *managedData) {
m.enableDestructive(true)
}); err != nil {
return result(err)
}
err = r.lint(ctx, wd, data, nil)
switch d := (*destructiveErr)(nil); {
case errors.As(err, &d):
reason, msg := d.FirstRun()
Expand All @@ -378,6 +385,12 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
// Revert the destructive linting policy back to the original value.
if err = editAtlasHCL(func(m *managedData) {
m.Policy.Lint.Destructive.Error = false
}); err != nil {
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
Expand All @@ -386,13 +399,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
})
// Run the linting policy.
case data.shouldLint():
vars := atlasexec.Vars2{}
if p := data.Policy; p != nil && p.Lint != nil {
if d := p.Lint.Destructive; d != nil {
vars["lint_destructive"] = strconv.FormatBool(d.Error)
}
}
if err = r.lint(ctx, wd, data, vars); err != nil {
if err = r.lint(ctx, wd, data, nil); err != nil {
reason, msg := "LintPolicyError", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
Expand All @@ -404,7 +411,6 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Vars: vars,
To: desiredURL,
TxMode: string(data.TxMode),
AutoApprove: true,
Expand Down Expand Up @@ -598,7 +604,83 @@ func (d *managedData) render(w io.Writer) error {
if d.Desired == nil {
return errors.New("the desired state is not set")
}
return tmpl.ExecuteTemplate(w, "atlas_schema.tmpl", d)
f := hclwrite.NewFile()
fBody := f.Body()
for _, b := range d.asBlocks() {
fBody.AppendBlock(b)
}
if _, err := f.WriteTo(w); err != nil {
return err
}
return nil
}

// enableDestructive enables the linting policy for destructive changes.
// If the force is set to true, it will override the existing value.
func (d *managedData) enableDestructive(force bool) {
check := &dbv1alpha1.CheckConfig{Error: true}
destructive := &dbv1alpha1.Lint{Destructive: check}
switch {
case d.Policy == nil:
d.Policy = &dbv1alpha1.Policy{Lint: destructive}
case d.Policy.Lint == nil:
d.Policy.Lint = destructive
case d.Policy.Lint.Destructive == nil, force:
d.Policy.Lint.Destructive = check
}
}

// setLintReview sets the lint review policy.
// If the force is set to true, it will override the existing value.
func (d *managedData) setLintReview(v dbv1alpha1.LintReview, force bool) {
lint := &dbv1alpha1.Lint{Review: v}
switch {
case d.Policy == nil:
d.Policy = &dbv1alpha1.Policy{Lint: lint}
case d.Policy.Lint == nil:
d.Policy.Lint = lint
case d.Policy.Lint.Review == "", force:
d.Policy.Lint.Review = v
}
}

// asBlocks returns the HCL block for the environment configuration.
func (d *managedData) asBlocks() []*hclwrite.Block {
var blocks []*hclwrite.Block
env := hclwrite.NewBlock("env", []string{d.EnvName})
blocks = append(blocks, env)
envBody := env.Body()
if d.URL != nil {
envBody.SetAttributeValue("url", cty.StringVal(d.URL.String()))
}
if d.DevURL != "" {
envBody.SetAttributeValue("dev", cty.StringVal(d.DevURL))
}
if l := d.Schemas; len(l) > 0 {
envBody.SetAttributeValue("schemas", listStringVal(l))
}
if l := d.Exclude; len(l) > 0 {
envBody.SetAttributeValue("exclude", listStringVal(l))
}
if p := d.Policy; p != nil {
if d := p.Diff; d != nil {
envBody.AppendBlock(d.AsBlock())
}
if l := p.Lint; l != nil {
lint := envBody.AppendNewBlock("lint", nil).Body()
if v := l.Destructive; v != nil {
b := lint.AppendNewBlock("destructive", nil).Body()
b.SetAttributeValue("error", cty.BoolVal(v.Error))
}
if v := l.Review; v != "" {
lint.SetAttributeValue("review", cty.StringVal(string(v)))
}
}
}
if v := d.TxMode; v != "" {
envBody.SetAttributeValue("tx_mode", cty.StringVal(string(v)))
}
return blocks
}

func truncateSQL(s []string, size int) []string {
Expand Down
Loading

0 comments on commit e4482e2

Please sign in to comment.