From c7862444070c7f0c3fcec611d450451cffa21d3e Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Tue, 22 Oct 2024 11:00:17 +0200 Subject: [PATCH] fix: run migration outside sql transactions --- internal/storage/driver/driver.go | 65 ++++++++++++++----------------- 1 file changed, 29 insertions(+), 36 deletions(-) diff --git a/internal/storage/driver/driver.go b/internal/storage/driver/driver.go index 42c28f719..f43df0b63 100644 --- a/internal/storage/driver/driver.go +++ b/internal/storage/driver/driver.go @@ -2,7 +2,7 @@ package driver import ( "context" - "database/sql" + "errors" "errors" "fmt" "github.com/formancehq/go-libs/v2/metadata" @@ -36,41 +36,26 @@ type Driver struct { func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Ledger) (*ledgerstore.Store, error) { - tx, err := db.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, fmt.Errorf("begin transaction: %w", err) - } - - b := bucket.New(tx, l.Bucket) + b := bucket.New(d.db, l.Bucket) if err := b.Migrate(ctx, d.tracer); err != nil { return nil, fmt.Errorf("migrating bucket: %w", err) } - ret, err := db.NewInsert(). + _, err := db.NewInsert(). Model(l). - Ignore(). Returning("id, added_at"). Exec(ctx) if err != nil { + if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) { + return nil, systemcontroller.ErrLedgerAlreadyExists + } return nil, postgres.ResolveError(err) } - affected, err := ret.RowsAffected() - if err != nil { - return nil, fmt.Errorf("creating ledger: %w", err) - } - if affected == 0 { - return nil, systemcontroller.ErrLedgerAlreadyExists - } - - if err := b.AddLedger(ctx, *l, tx); err != nil { + if err := b.AddLedger(ctx, *l, d.db); err != nil { return nil, fmt.Errorf("adding ledger to bucket: %w", err) } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("committing sql transaction to create ledger and schemas: %w", err) - } - return ledgerstore.New( d.db, b, @@ -82,29 +67,37 @@ func (d *Driver) createLedgerStore(ctx context.Context, db bun.IDB, l *ledger.Le func (d *Driver) CreateLedger(ctx context.Context, l *ledger.Ledger) (*ledgerstore.Store, error) { - // start a transaction because we will need to create the schema and apply ledger migrations - tx, err := d.db.BeginTx(ctx, &sql.TxOptions{}) - if err != nil { - return nil, fmt.Errorf("begin transaction: %w", err) - } - defer func() { - _ = tx.Rollback() - }() - if l.Metadata == nil { l.Metadata = metadata.Metadata{} } - store, err := d.createLedgerStore(ctx, tx, l) + b := bucket.New(d.db, l.Bucket) + if err := b.Migrate(ctx, d.tracer); err != nil { + return nil, fmt.Errorf("migrating bucket: %w", err) + } + + _, err := d.db.NewInsert(). + Model(l). + Returning("id, added_at"). + Exec(ctx) if err != nil { - return nil, err + if errors.Is(postgres.ResolveError(err), postgres.ErrConstraintsFailed{}) { + return nil, systemcontroller.ErrLedgerAlreadyExists + } + return nil, postgres.ResolveError(err) } - if err := tx.Commit(); err != nil { - return nil, fmt.Errorf("committing sql transaction to create ledger schema: %w", err) + if err := b.AddLedger(ctx, *l, d.db); err != nil { + return nil, fmt.Errorf("adding ledger to bucket: %w", err) } - return store, nil + return ledgerstore.New( + d.db, + b, + *l, + ledgerstore.WithMeter(d.meter), + ledgerstore.WithTracer(d.tracer), + ), nil } func (d *Driver) OpenLedger(ctx context.Context, name string) (*ledgerstore.Store, *ledger.Ledger, error) {