Skip to content

Commit

Permalink
feat: lock schema under a specific migration number
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Oct 24, 2024
1 parent 62e0354 commit cbfaa80
Show file tree
Hide file tree
Showing 21 changed files with 199 additions and 153 deletions.
14 changes: 9 additions & 5 deletions internal/storage/bucket/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bytes"
"context"
_ "embed"
"errors"
"fmt"
"github.com/formancehq/go-libs/v2/migrations"
ledger "github.com/formancehq/ledger/internal"
Expand All @@ -13,6 +12,9 @@ import (
"text/template"
)

// stateless version (+1 regarding directory name, as migrations start from 1 in the lib)
const MinimalSchemaVersion = 12

type Bucket struct {
name string
db bun.IDB
Expand All @@ -23,11 +25,13 @@ func (b *Bucket) Migrate(ctx context.Context, tracer trace.Tracer) error {
}

func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) {
ret, err := GetMigrator(b.name).IsUpToDate(ctx, b.db)
if err != nil && errors.Is(err, migrations.ErrMissingVersionTable) {
return false, nil
migrator := GetMigrator(b.name)
lastVersion, err := migrator.GetLastVersion(ctx, b.db)
if err != nil {
return false, err
}
return ret, err

return lastVersion >= MinimalSchemaVersion, nil
}

func (b *Bucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ do $$
_date timestamp without time zone = (
select tstamp
from _system.goose_db_version
where version_id = 11
where version_id = 12
);
_count integer = (
select count(*)
Expand Down
21 changes: 2 additions & 19 deletions internal/storage/bucket/migrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestMigrations(t *testing.T) {
migrator := bucket.GetMigrator(bucketName)

_, err = bucket.WalkMigrations(func(entry fs.DirEntry) (*struct{}, error) {
before, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "tests_before.sql")
before, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "up_tests_before.sql")
if !errors.Is(err, fs.ErrNotExist) {
require.NoError(t, err)
}
Expand All @@ -50,7 +50,7 @@ func TestMigrations(t *testing.T) {
}
}

after, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "tests_after.sql")
after, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "up_tests_after.sql")
if !errors.Is(err, fs.ErrNotExist) {
require.NoError(t, err)
}
Expand All @@ -62,21 +62,4 @@ func TestMigrations(t *testing.T) {
return pointer.For(struct{}{}), nil
})
require.NoError(t, err)

//moves := make([]map[string]any, 0)
//err = db.NewSelect().
// ModelTableExpr(`"`+bucketName+`".moves`).
// Scan(ctx, &moves)
//require.NoError(t, err)
//
//rows, err := db.NewSelect().
// ModelTableExpr(`"`+bucketName+`".transactions`).
// Column("seq", "id", "post_commit_volumes", "ledger").
// Order("id desc").
// Where("ledger = 'ledger0'").
// Rows(ctx)
//require.NoError(t, err)
//
//data, _ := xsql.Pretty(rows)
//fmt.Println(data)
}
22 changes: 8 additions & 14 deletions internal/storage/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"github.com/formancehq/go-libs/v2/collectionutils"
"github.com/formancehq/go-libs/v2/metadata"
"github.com/formancehq/go-libs/v2/platform/postgres"
systemcontroller "github.com/formancehq/ledger/internal/controller/system"
Expand Down Expand Up @@ -225,22 +224,17 @@ func (d *Driver) UpgradeBucket(ctx context.Context, name string) error {

func (d *Driver) UpgradeAllBuckets(ctx context.Context) error {

buckets := collectionutils.Set[string]{}
err := bunpaginate.Iterate(ctx, ledgercontroller.NewListLedgersQuery(10),
func(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) {
return d.ListLedgers(ctx, q)
},
func(cursor *bunpaginate.Cursor[ledger.Ledger]) error {
for _, l := range cursor.Data {
buckets.Put(l.Bucket)
}
return nil
})
var buckets []string
err := d.db.NewSelect().
DistinctOn("bucket").
Model(&ledger.Ledger{}).
Column("bucket").
Scan(ctx, &buckets)
if err != nil {
return err
return fmt.Errorf("getting buckets: %w", err)
}

for _, bucketName := range collectionutils.Keys(buckets) {
for _, bucketName := range buckets {
b := bucket.New(d.db, bucketName)

logging.FromContext(ctx).Infof("Upgrading bucket '%s'", bucketName)
Expand Down
5 changes: 4 additions & 1 deletion pkg/testserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type Configuration struct {
Debug bool
OTLPConfig *OTLPConfig
ExperimentalFeatures bool
DisableAutoUpgrade bool
BulkMaxSize int
ExperimentalNumscriptRewrite bool
}
Expand All @@ -68,11 +69,13 @@ func (s *Server) Start() error {
args := []string{
"serve",
"--" + cmd.BindFlag, ":0",
"--" + cmd.AutoUpgradeFlag,
"--" + bunconnect.PostgresURIFlag, s.configuration.PostgresConfiguration.DatabaseSourceName,
"--" + bunconnect.PostgresMaxOpenConnsFlag, fmt.Sprint(s.configuration.PostgresConfiguration.MaxOpenConns),
"--" + bunconnect.PostgresConnMaxIdleTimeFlag, fmt.Sprint(s.configuration.PostgresConfiguration.ConnMaxIdleTime),
}
if !s.configuration.DisableAutoUpgrade {
args = append(args, "--"+cmd.AutoUpgradeFlag)
}
if s.configuration.ExperimentalFeatures {
args = append(
args,
Expand Down
Loading

0 comments on commit cbfaa80

Please sign in to comment.