Skip to content

Commit

Permalink
feat: denormalize addresses on transactions to improve read (#351)
Browse files Browse the repository at this point in the history
* wip: test to denormalize addresses on transactions to improve read performance

* fix: sql insert

* fix: extension init

* fix: sqlite support

* clean: some debug

* fix: remove insertTransaction segfault protection

Co-authored-by: Geoffrey Ragot <[email protected]>
Co-authored-by: Antoine Gelloz <[email protected]>
  • Loading branch information
3 people committed Oct 20, 2022
1 parent f103f3b commit a55756b
Show file tree
Hide file tree
Showing 7 changed files with 171 additions and 19 deletions.
2 changes: 2 additions & 0 deletions pkg/ledger/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func TestLedger_processTx(t *testing.T) {
Transaction: core.Transaction{
TransactionData: core.TransactionData{
Timestamp: now,
Postings: []core.Posting{{}},
},
ID: 0,
},
Expand Down Expand Up @@ -337,6 +338,7 @@ func TestLedger_processTx(t *testing.T) {
Transaction: core.Transaction{
TransactionData: core.TransactionData{
Timestamp: now,
Postings: []core.Posting{{}},
},
ID: 0,
},
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/sqlstorage/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (s *Store) CountTransactions(ctx context.Context, q ledger.TransactionsQuer
return 0, err
}

sb, _ := s.buildTransactionsQuery(q)
sb, _ := s.buildTransactionsQuery(Flavor(s.schema.Flavor()), q)
sqlq, args := sb.BuildWithFlavor(s.schema.Flavor())
sqlq = fmt.Sprintf(`SELECT count(*) FROM (%s) AS t`, sqlq)
err = executor.QueryRowContext(ctx, sqlq, args...).Scan(&count)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package _16_denormalize_addresses

import (
"context"
"testing"
"time"

"github.com/huandu/go-sqlbuilder"
"github.com/numary/ledger/pkg/ledgertesting"
"github.com/numary/ledger/pkg/storage/sqlstorage"
"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
)

// TestMigrate16 verifies migration 16, which denormalizes posting
// sources and destinations into dedicated text columns on the
// transactions table.
//
// It applies migrations 0-15, inserts one transaction with two postings
// using the pre-migration schema, runs migration 16, and asserts that
// the new "sources" and "destinations" columns were backfilled with
// ';'-separated address lists in posting order.
func TestMigrate16(t *testing.T) {
	driver, closeFunc, err := ledgertesting.StorageDriver()
	require.NoError(t, err)
	defer closeFunc()

	require.NoError(t, driver.Initialize(context.Background()))
	store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true)
	require.NoError(t, err)

	schema := store.Schema()

	migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS)
	require.NoError(t, err)

	// Bring the schema to the state just before migration 16.
	modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:16]...)
	require.NoError(t, err)
	require.True(t, modified)

	now := time.Now().UTC().Truncate(time.Second)

	ib := sqlbuilder.NewInsertBuilder()
	sqlq, args := ib.
		InsertInto(schema.Table("transactions")).
		Cols("id", "timestamp", "postings", "metadata").
		Values(0, now.Format(time.RFC3339), `[
			{"source": "world", "destination": "bank", "asset": "USD", "amount": 100},
			{"source": "bank", "destination": "user", "asset": "USD", "amount": 100}
		]`, "{}").
		BuildWithFlavor(schema.Flavor())
	_, err = schema.ExecContext(context.Background(), sqlq, args...)
	require.NoError(t, err)

	// Apply migration 16 itself and make sure it actually ran.
	modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[16])
	require.NoError(t, err)
	require.True(t, modified)

	sqlq, args = sqlbuilder.
		Select("sources", "destinations").
		From(schema.Table("transactions")).
		Where("id = 0").
		BuildWithFlavor(schema.Flavor())

	row := store.Schema().QueryRowContext(context.Background(), sqlq, args...)
	require.NoError(t, row.Err())
	var sources, destinations string
	// Fix: the Scan error is the value under test. Previously it was
	// passed as a msgAndArgs argument while the stale outer err was
	// re-checked, so a failing Scan went unnoticed.
	require.NoError(t, row.Scan(&sources, &destinations))
	require.Equal(t, "world;bank", sources)
	require.Equal(t, "bank;user", destinations)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--statement
alter table "VAR_LEDGER_NAME".transactions add column sources text;
--statement
alter table "VAR_LEDGER_NAME".transactions add column destinations text;
--statement
create index transactions_sources ON "VAR_LEDGER_NAME".transactions USING GIN (sources gin_trgm_ops);
--statement
create index transactions_destinations ON "VAR_LEDGER_NAME".transactions USING GIN (destinations gin_trgm_ops);
--statement
-- Backfill the denormalized address columns: one ';'-separated entry per
-- posting, in posting order. WITH ORDINALITY + ORDER BY is required
-- because string_agg's aggregation order is otherwise unspecified in
-- PostgreSQL, and readers (and the migration test) rely on posting order.
update "VAR_LEDGER_NAME".transactions
set sources = (
select string_agg(ele->>'source', ';' order by ord)
from "VAR_LEDGER_NAME".transactions sub
cross join lateral jsonb_array_elements(sub.postings) with ordinality as src(ele, ord)
where transactions.id = sub.id
), destinations = (
select string_agg(ele->>'destination', ';' order by ord)
from "VAR_LEDGER_NAME".transactions sub
cross join lateral jsonb_array_elements(sub.postings) with ordinality as dst(ele, ord)
where transactions.id = sub.id
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
--statement
alter table transactions add column sources text;
--statement
alter table transactions add column destinations text;
--statement
UPDATE transactions
SET sources = (
-- Backfill the denormalized source list: one ';'-separated entry per posting.
-- NOTE(review): group_concat ordering is formally unspecified in SQLite;
-- json_each scans array elements in index order so this follows posting
-- order in practice — confirm, since readers expect posting order.
select group_concat(json_extract(json_each.value, '$.source'), ';')
from transactions tx2, json_each(tx2.postings)
where transactions.id = tx2.id
), destinations = (
-- Same backfill for posting destinations; same ordering caveat as above.
select group_concat(json_extract(json_each.value, '$.destination'), ';')
from transactions tx2, json_each(tx2.postings)
where transactions.id = tx2.id
);
4 changes: 4 additions & 0 deletions pkg/storage/sqlstorage/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ func (p *postgresDB) Initialize(ctx context.Context) error {
if err != nil {
return err
}
_, err = p.db.ExecContext(ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm")
if err != nil {
return err
}
return nil
}

Expand Down
83 changes: 65 additions & 18 deletions pkg/storage/sqlstorage/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,45 @@ import (
"github.com/pkg/errors"
)

func (s *Store) buildTransactionsQuery(p ledger.TransactionsQuery) (*sqlbuilder.SelectBuilder, TxsPaginationToken) {
func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery) (*sqlbuilder.SelectBuilder, TxsPaginationToken) {
sb := sqlbuilder.NewSelectBuilder()
t := TxsPaginationToken{}

var (
destination = p.Filters.Destination
source = p.Filters.Source
account = p.Filters.Account
reference = p.Filters.Reference
startTime = p.Filters.StartTime
endTime = p.Filters.EndTime
metadata = p.Filters.Metadata
destination = p.Filters.Destination
source = p.Filters.Source
account = p.Filters.Account
reference = p.Filters.Reference
startTime = p.Filters.StartTime
endTime = p.Filters.EndTime
metadata = p.Filters.Metadata
regexOperator = "~"
)
if flavor == SQLite {
regexOperator = "REGEXP"
}

sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes")
sb.From(s.schema.Table("transactions"))
if account != "" {
arg := sb.Args.Add(account)
sb.Where(s.schema.Table("use_account") + "(postings, " + arg + ")")
r := fmt.Sprintf("(^|;)%s($|;)", account)
arg := sb.Args.Add(r)
sb.Where(sb.Or(
fmt.Sprintf("sources %s %s", regexOperator, arg),
fmt.Sprintf("destinations %s %s", regexOperator, arg),
))
t.AccountFilter = account
}
if source != "" {
arg := sb.Args.Add(source)
sb.Where(s.schema.Table("use_account_as_source") + "(postings, " + arg + ")")
r := fmt.Sprintf("(^|;)%s($|;)", source)
arg := sb.Args.Add(r)
sb.Where(fmt.Sprintf("sources %s %s", regexOperator, arg))
t.SourceFilter = source
}
if destination != "" {
arg := sb.Args.Add(destination)
sb.Where(s.schema.Table("use_account_as_destination") + "(postings, " + arg + ")")
r := fmt.Sprintf("(^|;)%s($|;)", destination)
arg := sb.Args.Add(r)
sb.Where(fmt.Sprintf("destinations %s %s", regexOperator, arg))
t.DestinationFilter = destination
}
if reference != "" {
Expand Down Expand Up @@ -80,7 +90,7 @@ func (s *Store) GetTransactions(ctx context.Context, q ledger.TransactionsQuery)
return sharedapi.Cursor[core.ExpandedTransaction]{Data: txs}, nil
}

sb, t := s.buildTransactionsQuery(q)
sb, t := s.buildTransactionsQuery(Flavor(s.schema.Flavor()), q)
sb.OrderBy("id").Desc()
if q.AfterTxID > 0 {
sb.Where(sb.LE("id", q.AfterTxID))
Expand Down Expand Up @@ -276,7 +286,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
case sqlbuilder.SQLite:
ib := sqlbuilder.NewInsertBuilder()
ib.InsertInto(s.schema.Table("transactions"))
ib.Cols("id", "timestamp", "reference", "postings", "metadata", "pre_commit_volumes", "post_commit_volumes")
ib.Cols("id", "timestamp", "reference", "postings", "metadata",
"pre_commit_volumes", "post_commit_volumes", "sources", "destinations")
for _, tx := range txs {
postingsData, err := json.Marshal(tx.Postings)
if err != nil {
Expand All @@ -300,6 +311,14 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
if err != nil {
panic(err)
}
sources := ""
destinations := ""
for _, p := range tx.Postings {
sources = fmt.Sprintf("%s;%s", sources, p.Source)
destinations = fmt.Sprintf("%s;%s", destinations, p.Destination)
}
sources = sources[1:]
destinations = destinations[1:]

var reference *string
if tx.Reference != "" {
Expand All @@ -308,7 +327,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
}

ib.Values(tx.ID, tx.Timestamp, reference, postingsData,
metadataData, preCommitVolumesData, postCommitVolumesData)
metadataData, preCommitVolumesData, postCommitVolumesData,
sources, destinations)
}
query, args = ib.BuildWithFlavor(s.schema.Flavor())
case sqlbuilder.PostgreSQL:
Expand All @@ -319,6 +339,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
metadataDataSet := make([]string, len(txs))
preCommitVolumesDataSet := make([]string, len(txs))
postCommitVolumesDataSet := make([]string, len(txs))
sources := make([]string, len(txs))
destinations := make([]string, len(txs))

for i, tx := range txs {
postingsData, err := json.Marshal(tx.Postings)
Expand All @@ -344,25 +366,50 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
panic(err)
}

computedSources := ""
for _, p := range tx.Postings {
computedSources = fmt.Sprintf("%s;%s", computedSources, p.Source)
}
computedSources = computedSources[1:] // Strip leading ;

computedDestinations := ""
for _, p := range tx.Postings {
computedDestinations = fmt.Sprintf("%s;%s", computedDestinations, p.Destination)
}
computedDestinations = computedDestinations[1:]

ids[i] = tx.ID
timestamps[i] = tx.Timestamp
postingDataSet[i] = string(postingsData)
metadataDataSet[i] = string(metadataData)
preCommitVolumesDataSet[i] = string(preCommitVolumesData)
postCommitVolumesDataSet[i] = string(postCommitVolumesData)
references[i] = nil
sources[i] = computedSources
destinations[i] = computedDestinations
if tx.Reference != "" {
cp := tx.Reference
references[i] = &cp
}
}

query = fmt.Sprintf(
`INSERT INTO "%s".transactions (id, timestamp, reference, postings, metadata, pre_commit_volumes, post_commit_volumes) (SELECT * FROM unnest($1::int[], $2::timestamp[], $3::varchar[], $4::jsonb[], $5::jsonb[], $6::jsonb[], $7::jsonb[]))`,
`INSERT INTO "%s".transactions (id, timestamp, reference, postings, metadata, pre_commit_volumes,
post_commit_volumes, sources, destinations) (SELECT * FROM unnest(
$1::int[],
$2::timestamp[],
$3::varchar[],
$4::jsonb[],
$5::jsonb[],
$6::jsonb[],
$7::jsonb[],
$8::varchar[],
$9::varchar[]))`,
s.schema.Name())
args = []interface{}{
ids, timestamps, references, postingDataSet,
metadataDataSet, preCommitVolumesDataSet, postCommitVolumesDataSet,
sources, destinations,
}
}

Expand Down

0 comments on commit a55756b

Please sign in to comment.