Skip to content

Commit

Permalink
fix: potential double dispense on some rare case
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Sep 26, 2024
1 parent 26f924d commit a8555fe
Show file tree
Hide file tree
Showing 4 changed files with 162 additions and 113 deletions.
45 changes: 28 additions & 17 deletions internal/storage/ledger/balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,6 @@ import (
"github.com/uptrace/bun"
)

type Balances struct {
bun.BaseModel `bun:"accounts_volumes"`

Ledger string `bun:"ledger,type:varchar"`
Account string `bun:"accounts_address,type:varchar"`
Asset string `bun:"asset,type:varchar"`
Balance *big.Int `bun:"balance,type:numeric"`
}

func (s *Store) selectAccountWithVolumes(date *time.Time, useInsertionDate bool, builder query.Builder) *bun.SelectQuery {

ret := s.db.NewSelect()
Expand Down Expand Up @@ -196,12 +187,32 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
}
}

balances := make([]Balances, 0)
accountsVolumes := make([]AccountsVolumes, 0)
for account, assets := range query {
for _, asset := range assets {
accountsVolumes = append(accountsVolumes, AccountsVolumes{
Ledger: s.ledger.Name,
Account: account,
Asset: asset,
Input: new(big.Int),
Output: new(big.Int),
})
}
}

err := s.db.NewSelect().
Model(&balances).
With(
"ins",
// try to insert volumes with 0 values
// this way, if the account has a 0 balance at this point, it will be locked also
s.db.NewInsert().
Model(&accountsVolumes).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
On("conflict do nothing"),
).
Model(&accountsVolumes).
ModelTableExpr(s.GetPrefixedRelationName("accounts_volumes")).
ColumnExpr("accounts_address, asset").
ColumnExpr("input - output as balance").
Column("accounts_address", "asset", "input", "output").
Where("("+strings.Join(conditions, ") OR (")+")", args...).
For("update").
// notes(gfyrag): keep order, it ensures consistent locking order and limit deadlocks
Expand All @@ -212,11 +223,11 @@ func (s *Store) GetBalances(ctx context.Context, query ledgercontroller.BalanceQ
}

ret := ledgercontroller.Balances{}
for _, balance := range balances {
if _, ok := ret[balance.Account]; !ok {
ret[balance.Account] = map[string]*big.Int{}
for _, volumes := range accountsVolumes {
if _, ok := ret[volumes.Account]; !ok {
ret[volumes.Account] = map[string]*big.Int{}
}
ret[balance.Account][balance.Asset] = balance.Balance
ret[volumes.Account][volumes.Asset] = new(big.Int).Sub(volumes.Input, volumes.Output)
}

// fill empty balances with 0 value
Expand Down
71 changes: 6 additions & 65 deletions internal/storage/ledger/balances_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@ func TestBalancesGet(t *testing.T) {
require.NoError(t, err)

_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Input: new(big.Int),
Output: big.NewInt(100),
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD",
Input: new(big.Int),
Output: big.NewInt(100),
})
require.NoError(t, err)

Expand Down Expand Up @@ -95,6 +95,7 @@ func TestBalancesGet(t *testing.T) {
})

t.Run("balance query with empty balance", func(t *testing.T) {

balances, err := store.GetBalances(ctx, ledgercontroller.BalanceQuery{
"world": {"USD"},
"not-existing": {"USD"},
Expand Down Expand Up @@ -253,63 +254,3 @@ func TestBalancesAggregates(t *testing.T) {
}, ret)
})
}

func TestUpdateBalances(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

world := &Account{
Ledger: store.ledger.Name,
Address: "world",
AddressArray: []string{"world"},
InsertionDate: time.Now(),
UpdatedAt: time.Now(),
FirstUsage: time.Now(),
}
_, err := store.upsertAccount(ctx, world)
require.NoError(t, err)

volumes, err := store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(0),
Output: big.NewInt(100),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(0, 100),
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Output: big.NewInt(0),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(50, 100),
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Output: big.NewInt(50),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(100, 150),
},
}, volumes)
}
31 changes: 0 additions & 31 deletions internal/storage/ledger/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,37 +300,6 @@ func TestTransactionsCommit(t *testing.T) {

store := newLedgerStore(t)

account1 := &Account{
Address: "account:1",
}
_, err := store.upsertAccount(ctx, account1)
require.NoError(t, err)

account2 := &Account{
Address: "account:2",
}
_, err = store.upsertAccount(ctx, account2)
require.NoError(t, err)

// todo: we should not need to update volumes to have a lock
_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "account:1",
Asset: "USD",
Input: big.NewInt(100),
Output: big.NewInt(0),
})
require.NoError(t, err)

_, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "account:2",
Asset: "USD",
Input: big.NewInt(100),
Output: big.NewInt(0),
})
require.NoError(t, err)

// Create a new sql transaction to commit a transaction from account:1 to account:2
// it will block until storeWithBlockingTx is commited or rollbacked.
txWithAccount1AsSource, err := store.db.BeginTx(ctx, &sql.TxOptions{})
Expand Down
128 changes: 128 additions & 0 deletions internal/storage/ledger/volumes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
package ledger

import (
"database/sql"
"github.com/formancehq/go-libs/platform/postgres"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/pkg/errors"
"math/big"
"testing"
libtime "time"

"github.com/formancehq/go-libs/time"

Expand Down Expand Up @@ -679,5 +683,129 @@ func TestAggGetVolumesWithBalances(t *testing.T) {
},
})
})
}

func TestUpdateVolumes(t *testing.T) {
t.Parallel()

t.Run("update volumes of same account sequentially", func(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

volumes, err := store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(0),
Output: big.NewInt(100),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(0, 100),
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Output: big.NewInt(0),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(50, 100),
},
}, volumes)

volumes, err = store.updateVolumes(ctx, AccountsVolumes{
Ledger: store.ledger.Name,
Account: "world",
Asset: "USD/2",
Input: big.NewInt(50),
Output: big.NewInt(50),
})
require.NoError(t, err)
require.Equal(t, ledger.PostCommitVolumes{
"world": {
"USD/2": ledger.NewVolumesInt64(100, 150),
},
}, volumes)
})

t.Run("get balance of not existing account should take a lock", func(t *testing.T) {
t.Parallel()

store := newLedgerStore(t)
ctx := logging.TestingContext()

sqlTx1, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{})
require.NoError(t, err)
t.Cleanup(func() {
_ = sqlTx1.Rollback()
})
storeTx1 := store.WithDB(sqlTx1)

sqlTx2, err := store.GetDB().BeginTx(ctx, &sql.TxOptions{})
require.NoError(t, err)
t.Cleanup(func() {
_ = sqlTx2.Rollback()
})
storeTx2 := store.WithDB(sqlTx2)

// at this stage, the accounts_volumes is empty
// taking balance of the 'world' account should force a lock
volumes, err := storeTx1.GetBalances(ctx, ledgercontroller.BalanceQuery{
"world": {"USD"},
})
require.NoError(t, err)
require.Equal(t, ledgercontroller.Balances{
"world": {
"USD": big.NewInt(0),
},
}, volumes)

// take an arbitrary lock on tx2
_, err = storeTx2.GetDB().NewRaw(`select pg_advisory_xact_lock(1)`).Exec(ctx)
require.NoError(t, err)

errChan := make(chan error, 2)
go func() {
// this call should lock since the lock iw owned by tx1
_, err := storeTx2.GetBalances(ctx, ledgercontroller.BalanceQuery{
"world": {"USD"},
})
errChan <- err
}()

go func() {
// take the same advisory lock for tx1 as tx2
// as tx1 hold a lock on the world balance, and tx2 is waiting for that balance
// it should trigger a deadlock
_, err = storeTx1.GetDB().NewRaw(`select pg_advisory_xact_lock(1)`).Exec(ctx)
errChan <- postgres.ResolveError(err)
}()

// either tx1 or tx2 should be cancelled by PG with a deadlock error
select {
case err := <-errChan:
if err == nil {
select {
case err = <-errChan:
if err == nil {
require.Fail(t, "should have a deadlock")
}
case <-libtime.After(2 * time.Second):
require.Fail(t, "transaction should have finished")
}
}
require.True(t, errors.Is(err, postgres.ErrDeadlockDetected))
case <-libtime.After(2 * time.Second):
require.Fail(t, "transaction should have finished")
}
})
}

0 comments on commit a8555fe

Please sign in to comment.