diff --git a/internal/storage/bucket/bucket.go b/internal/storage/bucket/bucket.go index 866d5a2c2..c3f2c21cc 100644 --- a/internal/storage/bucket/bucket.go +++ b/internal/storage/bucket/bucket.go @@ -4,7 +4,6 @@ import ( "bytes" "context" _ "embed" - "errors" "fmt" "github.com/formancehq/go-libs/v2/migrations" ledger "github.com/formancehq/ledger/internal" @@ -13,6 +12,9 @@ import ( "text/template" ) +// stateless version (+1 regarding directory name, as migrations start from 1 in the lib) +const MinimalSchemaVersion = 12 + type Bucket struct { name string db bun.IDB @@ -23,11 +25,13 @@ func (b *Bucket) Migrate(ctx context.Context, tracer trace.Tracer) error { } func (b *Bucket) IsUpToDate(ctx context.Context) (bool, error) { - ret, err := GetMigrator(b.name).IsUpToDate(ctx, b.db) - if err != nil && errors.Is(err, migrations.ErrMissingVersionTable) { - return false, nil + migrator := GetMigrator(b.name) + lastVersion, err := migrator.GetLastVersion(ctx, b.db) + if err != nil { + return false, err } - return ret, err + + return lastVersion >= MinimalSchemaVersion, nil } func (b *Bucket) GetMigrationsInfo(ctx context.Context) ([]migrations.Info, error) { diff --git a/internal/storage/bucket/migrations/0-init-schema/tests_after.sql b/internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/0-init-schema/tests_after.sql rename to internal/storage/bucket/migrations/0-init-schema/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/12-moves-fill-transaction-id/tests_after.sql b/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/12-moves-fill-transaction-id/tests_after.sql rename to internal/storage/bucket/migrations/12-moves-fill-transaction-id/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/12-moves-fill-transaction-id/tests_before.sql b/internal/storage/bucket/migrations/12-moves-fill-transaction-id/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/12-moves-fill-transaction-id/tests_before.sql rename to internal/storage/bucket/migrations/12-moves-fill-transaction-id/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql index 79349e4e8..9fea51b23 100644 --- a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql +++ b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up.sql @@ -7,7 +7,7 @@ do $$ _date timestamp without time zone = ( select tstamp from _system.goose_db_version - where version_id = 11 + where version_id = 12 ); _count integer = ( select count(*) diff --git a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/tests_after.sql b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/13-transactions-fill-inserted-at/tests_after.sql rename to internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/tests_before.sql b/internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/13-transactions-fill-inserted-at/tests_before.sql rename to internal/storage/bucket/migrations/13-transactions-fill-inserted-at/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/14-transactions-fill-pcv/tests_after.sql b/internal/storage/bucket/migrations/14-transactions-fill-pcv/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/14-transactions-fill-pcv/tests_after.sql rename to internal/storage/bucket/migrations/14-transactions-fill-pcv/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/14-transactions-fill-pcv/tests_before.sql b/internal/storage/bucket/migrations/14-transactions-fill-pcv/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/14-transactions-fill-pcv/tests_before.sql rename to internal/storage/bucket/migrations/14-transactions-fill-pcv/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/tests_after.sql b/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/15-accounts-volumes-fill-history/tests_after.sql rename to internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/tests_before.sql b/internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/15-accounts-volumes-fill-history/tests_before.sql rename to internal/storage/bucket/migrations/15-accounts-volumes-fill-history/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/tests_after.sql b/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/tests_after.sql rename to internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/tests_before.sql b/internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/tests_before.sql rename to internal/storage/bucket/migrations/16-transactions-metadata-fill-transaction-id/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/tests_after.sql b/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/17-accounts-metadata-fill-address/tests_after.sql rename to internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/tests_before.sql b/internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/17-accounts-metadata-fill-address/tests_before.sql rename to internal/storage/bucket/migrations/17-accounts-metadata-fill-address/up_tests_before.sql diff --git a/internal/storage/bucket/migrations/18-logs-fill-memento/tests_after.sql b/internal/storage/bucket/migrations/18-logs-fill-memento/up_tests_after.sql similarity index 100% rename from internal/storage/bucket/migrations/18-logs-fill-memento/tests_after.sql rename to internal/storage/bucket/migrations/18-logs-fill-memento/up_tests_after.sql diff --git a/internal/storage/bucket/migrations/18-logs-fill-memento/tests_before.sql b/internal/storage/bucket/migrations/18-logs-fill-memento/up_tests_before.sql similarity index 100% rename from internal/storage/bucket/migrations/18-logs-fill-memento/tests_before.sql rename to internal/storage/bucket/migrations/18-logs-fill-memento/up_tests_before.sql diff --git a/internal/storage/bucket/migrations_test.go b/internal/storage/bucket/migrations_test.go index 02df0030b..c9071749e 100644 --- a/internal/storage/bucket/migrations_test.go +++ b/internal/storage/bucket/migrations_test.go @@ -35,7 +35,7 @@ func TestMigrations(t *testing.T) { migrator := bucket.GetMigrator(bucketName) _, err = bucket.WalkMigrations(func(entry fs.DirEntry) (*struct{}, error) { - before, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "tests_before.sql") + before, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "up_tests_before.sql") if !errors.Is(err, fs.ErrNotExist) { require.NoError(t, err) } @@ -50,7 +50,7 @@ func TestMigrations(t *testing.T) { } } - after, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "tests_after.sql") + after, err := bucket.TemplateSQLFile(bucketName, entry.Name(), "up_tests_after.sql") if !errors.Is(err, fs.ErrNotExist) { require.NoError(t, err) } @@ -62,21 +62,4 @@ func TestMigrations(t *testing.T) { return pointer.For(struct{}{}), nil }) require.NoError(t, err) - - //moves := make([]map[string]any, 0) - //err = db.NewSelect(). - // ModelTableExpr(`"`+bucketName+`".moves`). - // Scan(ctx, &moves) - //require.NoError(t, err) - // - //rows, err := db.NewSelect(). - // ModelTableExpr(`"`+bucketName+`".transactions`). - // Column("seq", "id", "post_commit_volumes", "ledger"). - // Order("id desc"). - // Where("ledger = 'ledger0'"). - // Rows(ctx) - //require.NoError(t, err) - // - //data, _ := xsql.Pretty(rows) - //fmt.Println(data) } diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index f2430cfb5..42c28f719 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -5,7 +5,6 @@ import ( "database/sql" "errors" "fmt" - "github.com/formancehq/go-libs/v2/collectionutils" "github.com/formancehq/go-libs/v2/metadata" "github.com/formancehq/go-libs/v2/platform/postgres" systemcontroller "github.com/formancehq/ledger/internal/controller/system" @@ -225,22 +224,17 @@ func (d *Driver) UpgradeBucket(ctx context.Context, name string) error { func (d *Driver) UpgradeAllBuckets(ctx context.Context) error { - buckets := collectionutils.Set[string]{} - err := bunpaginate.Iterate(ctx, ledgercontroller.NewListLedgersQuery(10), - func(ctx context.Context, q ledgercontroller.ListLedgersQuery) (*bunpaginate.Cursor[ledger.Ledger], error) { - return d.ListLedgers(ctx, q) - }, - func(cursor *bunpaginate.Cursor[ledger.Ledger]) error { - for _, l := range cursor.Data { - buckets.Put(l.Bucket) - } - return nil - }) + var buckets []string + err := d.db.NewSelect(). + DistinctOn("bucket"). + Model(&ledger.Ledger{}). + Column("bucket"). + Scan(ctx, &buckets) if err != nil { - return err + return fmt.Errorf("getting buckets: %w", err) } - for _, bucketName := range collectionutils.Keys(buckets) { + for _, bucketName := range buckets { b := bucket.New(d.db, bucketName) logging.FromContext(ctx).Infof("Upgrading bucket '%s'", bucketName) diff --git a/pkg/testserver/server.go b/pkg/testserver/server.go index 2162ccfc1..45b1046d0 100644 --- a/pkg/testserver/server.go +++ b/pkg/testserver/server.go @@ -45,6 +45,7 @@ type Configuration struct { Debug bool OTLPConfig *OTLPConfig ExperimentalFeatures bool + DisableAutoUpgrade bool BulkMaxSize int ExperimentalNumscriptRewrite bool } @@ -68,11 +69,13 @@ func (s *Server) Start() error { args := []string{ "serve", "--" + cmd.BindFlag, ":0", - "--" + cmd.AutoUpgradeFlag, "--" + bunconnect.PostgresURIFlag, s.configuration.PostgresConfiguration.DatabaseSourceName, "--" + bunconnect.PostgresMaxOpenConnsFlag, fmt.Sprint(s.configuration.PostgresConfiguration.MaxOpenConns), "--" + bunconnect.PostgresConnMaxIdleTimeFlag, fmt.Sprint(s.configuration.PostgresConfiguration.ConnMaxIdleTime), } + if !s.configuration.DisableAutoUpgrade { + args = append(args, "--"+cmd.AutoUpgradeFlag) + } if s.configuration.ExperimentalFeatures { args = append( args, diff --git a/test/e2e/lifecycle_test.go b/test/e2e/lifecycle_test.go index 33a38ebcb..10af57c8d 100644 --- a/test/e2e/lifecycle_test.go +++ b/test/e2e/lifecycle_test.go @@ -5,12 +5,15 @@ package test_suite import ( "context" "database/sql" + "github.com/formancehq/go-libs/v2/bun/bunconnect" "github.com/formancehq/go-libs/v2/logging" "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/go-libs/v2/time" ledger "github.com/formancehq/ledger/internal" "github.com/formancehq/ledger/pkg/client/models/components" "github.com/formancehq/ledger/pkg/client/models/operations" + "github.com/formancehq/ledger/internal/storage/bucket" + "github.com/formancehq/ledger/internal/storage/driver" ledgerevents "github.com/formancehq/ledger/pkg/events" . "github.com/formancehq/ledger/pkg/testserver" "github.com/google/uuid" @@ -27,139 +30,198 @@ var _ = Context("Ledger application lifecycle tests", func() { ctx = logging.TestingContext() ) - testServer := NewTestServer(func() Configuration { - return Configuration{ - PostgresConfiguration: db.GetValue().ConnectionOptions(), - Output: GinkgoWriter, - Debug: debug, - NatsURL: natsServer.GetValue().ClientURL(), - } - }) - var events chan *nats.Msg - BeforeEach(func() { - events = Subscribe(GinkgoT(), testServer.GetValue()) - }) - - When("starting the service", func() { - It("should be ok", func() { - info, err := testServer.GetValue().Client().Ledger.GetInfo(ctx) - Expect(err).NotTo(HaveOccurred()) - Expect(info.V2ConfigInfoResponse.Version).To(Equal("develop")) - }) - }) - When("restarting the service", func() { - BeforeEach(func(ctx context.Context) { - Expect(testServer.GetValue().Restart(ctx)).To(BeNil()) + Context("Pending transaction should be fully processed before stopping or restarting the server", func() { + testServer := NewTestServer(func() Configuration { + return Configuration{ + PostgresConfiguration: db.GetValue().ConnectionOptions(), + Output: GinkgoWriter, + Debug: debug, + NatsURL: natsServer.GetValue().ClientURL(), + } }) - It("should be ok", func() {}) - }) - When("having some in flight transactions on a ledger", func() { - var ( - sqlTx bun.Tx - countTransactions = 80 - serverRestartTimeout = 10 * time.Second - ) + var events chan *nats.Msg BeforeEach(func() { - err := CreateLedger(ctx, testServer.GetValue(), operations.V2CreateLedgerRequest{ - Ledger: "foo", + events = Subscribe(GinkgoT(), testServer.GetValue()) + }) + + When("starting the service", func() { + It("should be ok", func() { + info, err := testServer.GetValue().Client().Ledger.GetInfo(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(info.V2ConfigInfoResponse.Version).To(Equal("develop")) }) - Expect(err).ToNot(HaveOccurred()) + }) + When("restarting the service", func() { + BeforeEach(func(ctx context.Context) { + Expect(testServer.GetValue().Restart(ctx)).To(BeNil()) + }) + It("should be ok", func() {}) + }) + When("having some in flight transactions on a ledger", func() { + var ( + sqlTx bun.Tx + countTransactions = 80 + serverRestartTimeout = 10 * time.Second + ) + BeforeEach(func() { + err := CreateLedger(ctx, testServer.GetValue(), operations.V2CreateLedgerRequest{ + Ledger: "foo", + }) + Expect(err).ToNot(HaveOccurred()) - // lock logs table to block transactions creation requests - // the first tx will block on the log insertion - // the next transaction will block earlier on advisory lock acquirement for accounts - db := ConnectToDatabase(GinkgoT(), testServer.GetValue()) - sqlTx, err = db.BeginTx(ctx, &sql.TxOptions{}) - Expect(err).To(BeNil()) - DeferCleanup(func() { - _ = sqlTx.Rollback() + // lock logs table to block transactions creation requests + // the first tx will block on the log insertion + // the next transaction will block earlier on advisory lock acquirement for accounts + db := ConnectToDatabase(GinkgoT(), testServer.GetValue()) + sqlTx, err = db.BeginTx(ctx, &sql.TxOptions{}) + Expect(err).To(BeNil()) + DeferCleanup(func() { + _ = sqlTx.Rollback() + }) + + _, err = sqlTx.NewRaw("lock table _default.logs").Exec(ctx) + Expect(err).To(BeNil()) + + // Create transactions in go routines + for i := 0; i < countTransactions; i++ { + go func() { + defer GinkgoRecover() + + _, err := CreateTransaction(ctx, testServer.GetValue(), operations.V2CreateTransactionRequest{ + Ledger: "foo", + V2PostTransaction: components.V2PostTransaction{ + Postings: []components.V2Posting{{ + Amount: big.NewInt(100), + Asset: "USD", + Destination: "bank", + Source: "world", + }}, + }, + }) + Expect(err).To(BeNil()) + }() + } + + // check postgres locks + Eventually(func(g Gomega) int { + count, err := db.NewSelect(). + Table("pg_stat_activity"). + Where("state <> 'idle' and pid <> pg_backend_pid()"). + Where(`query like 'INSERT INTO "_default".accounts%'`). + Count(ctx) + g.Expect(err).To(BeNil()) + return count + }). + WithTimeout(10 * time.Second). + // Once all the transactions are in pending state, we should have one lock + // for the first tx, trying to write a new log. + // And, we should also have countTransactions-1 pending lock for the 'bank' account + Should(BeNumerically("==", countTransactions-1)) // -1 for the first one }) + When("restarting the service", func() { + BeforeEach(func() { + // We will restart the server in a separate gorouting + // the server should not restart until all pending transactions creation requests are fully completed + restarted := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer func() { + close(restarted) + }() + By("restart server", func() { + ctx, cancel := context.WithTimeout(ctx, serverRestartTimeout) + DeferCleanup(cancel) - _, err = sqlTx.NewRaw("lock table _default.logs").Exec(ctx) - Expect(err).To(BeNil()) + Expect(testServer.GetValue().Restart(ctx)).To(BeNil()) + }) + }() - // Create transactions in go routines - for i := 0; i < countTransactions; i++ { - go func() { - defer GinkgoRecover() + // Once the server is restarting, it should not accept any new connection + Eventually(func() error { + _, err := GetInfo(ctx, testServer.GetValue()) + return err + }).ShouldNot(BeNil()) - _, err := CreateTransaction(ctx, testServer.GetValue(), operations.V2CreateTransactionRequest{ - Ledger: "foo", - V2PostTransaction: components.V2PostTransaction{ - Postings: []components.V2Posting{{ - Amount: big.NewInt(100), - Asset: "USD", - Destination: "bank", - Source: "world", - }}, - }, + // by rollback sql transactions, we allow the blocked routines (which create transactions) to resume. + By("rollback tx", func() { + _ = sqlTx.Rollback() + }) + + Eventually(restarted). + WithTimeout(serverRestartTimeout). + Should(BeClosed()) + }) + It("in flight transactions should be correctly terminated before", func() { + transactions, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{ + Ledger: "foo", + PageSize: pointer.For(int64(countTransactions)), }) Expect(err).To(BeNil()) - }() - } + Expect(transactions.Data).To(HaveLen(countTransactions)) - // check postgres locks - Eventually(func(g Gomega) int { - count, err := db.NewSelect(). - Table("pg_stat_activity"). - Where("state <> 'idle' and pid <> pg_backend_pid()"). - Where(`query like 'INSERT INTO "_default".accounts%'`). - Count(ctx) - g.Expect(err).To(BeNil()) - return count - }). - WithTimeout(10 * time.Second). - // Once all the transactions are in pending state, we should have one lock - // for the first tx, trying to write a new log. - // And, we should also have countTransactions-1 pending lock for the 'bank' account - Should(BeNumerically("==", countTransactions-1)) // -1 for the first one + By("all events should have been properly sent", func() { + for range countTransactions { + Eventually(events).Should(Receive(Event(ledgerevents.EventTypeCommittedTransactions))) + } + }) + }) + }) }) - When("restarting the service", func() { - BeforeEach(func() { - // We will restart the server in a separate gorouting - // the server should not restart until all pending transactions creation requests are fully completed - restarted := make(chan struct{}) - go func() { - defer GinkgoRecover() - defer func() { - close(restarted) - }() - By("restart server", func() { - ctx, cancel := context.WithTimeout(ctx, serverRestartTimeout) - DeferCleanup(cancel) + }) - Expect(testServer.GetValue().Restart(ctx)).To(BeNil()) - }) - }() + Context("Ledger should respond correctly as well as the minimal schema version is respected", func() { + var ( + ledgerName = "default" + ) + BeforeEach(func() { + bunDB, err := bunconnect.OpenSQLDB(ctx, db.GetValue().ConnectionOptions()) + Expect(err).To(BeNil()) - // Once the server is restarting, it should not accept any new connection - Eventually(func() error { - _, err := GetInfo(ctx, testServer.GetValue()) - return err - }).ShouldNot(BeNil()) + Expect(driver.Migrate(ctx, bunDB)).To(BeNil()) - // by rollback sql transactions, we allow the blocked routines (which create transactions) to resume. - By("rollback tx", func() { - _ = sqlTx.Rollback() - }) + _, err = bunDB.NewInsert(). + Model(pointer.For(ledger.MustNewWithDefault(ledgerName))). + Exec(ctx) + Expect(err).To(BeNil()) - Eventually(restarted). - WithTimeout(serverRestartTimeout). - Should(BeClosed()) + migrator := bucket.GetMigrator(ledger.DefaultBucket) + for i := 0; i < bucket.MinimalSchemaVersion; i++ { + Expect(migrator.UpByOne(ctx, bunDB)).To(BeNil()) + } + }) + testServer := NewTestServer(func() Configuration { + return Configuration{ + PostgresConfiguration: db.GetValue().ConnectionOptions(), + Output: GinkgoWriter, + Debug: debug, + NatsURL: natsServer.GetValue().ClientURL(), + DisableAutoUpgrade: true, + } + }) + It("should be ok", func() { + By("we should be able to create a new transaction", func() { + _, err := CreateTransaction(ctx, testServer.GetValue(), operations.V2CreateTransactionRequest{ + Ledger: ledgerName, + V2PostTransaction: components.V2PostTransaction{ + Metadata: map[string]string{}, + Postings: []components.V2Posting{ + { + Amount: big.NewInt(100), + Asset: "USD", + Source: "world", + Destination: "alice", + }, + }, + }, + }) + Expect(err).To(BeNil()) }) - It("in flight transactions should be correctly terminated before", func() { + By("we should be able to list transactions", func() { transactions, err := ListTransactions(ctx, testServer.GetValue(), operations.V2ListTransactionsRequest{ - Ledger: "foo", - PageSize: pointer.For(int64(countTransactions)), + Ledger: ledgerName, }) Expect(err).To(BeNil()) - Expect(transactions.Data).To(HaveLen(countTransactions)) - - By("all events should have been properly sent", func() { - for range countTransactions { - Eventually(events).Should(Receive(Event(ledgerevents.EventTypeCommittedTransactions))) - } - }) + Expect(transactions.Data).To(HaveLen(1)) }) }) })