From a55756b46787d89036724ec7db2c966d8439c1af Mon Sep 17 00:00:00 2001 From: Maxence Maireaux Date: Wed, 19 Oct 2022 17:33:27 +0200 Subject: [PATCH] feat: denormalize addresses on transactions to improve read (#351) * wip: test to denormalize addresses on transactions to improve read performance * fix: sql insert * fix: extension init * fix: sqlite support * clean: some debug * fix: remove insertTransaction segfault protection Co-authored-by: Geoffrey Ragot Co-authored-by: Antoine Gelloz --- pkg/ledger/process_test.go | 2 + pkg/storage/sqlstorage/aggregations.go | 2 +- .../16-denormalize-addresses/any_test.go | 63 ++++++++++++++ .../16-denormalize-addresses/postgres.sql | 21 +++++ .../16-denormalize-addresses/sqlite.sql | 15 ++++ pkg/storage/sqlstorage/schema.go | 4 + pkg/storage/sqlstorage/transactions.go | 83 +++++++++++++++---- 7 files changed, 171 insertions(+), 19 deletions(-) create mode 100644 pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go create mode 100644 pkg/storage/sqlstorage/migrates/16-denormalize-addresses/postgres.sql create mode 100644 pkg/storage/sqlstorage/migrates/16-denormalize-addresses/sqlite.sql diff --git a/pkg/ledger/process_test.go b/pkg/ledger/process_test.go index 7177f6fa4..a7fc94518 100644 --- a/pkg/ledger/process_test.go +++ b/pkg/ledger/process_test.go @@ -309,6 +309,7 @@ func TestLedger_processTx(t *testing.T) { Transaction: core.Transaction{ TransactionData: core.TransactionData{ Timestamp: now, + Postings: []core.Posting{{}}, }, ID: 0, }, @@ -337,6 +338,7 @@ func TestLedger_processTx(t *testing.T) { Transaction: core.Transaction{ TransactionData: core.TransactionData{ Timestamp: now, + Postings: []core.Posting{{}}, }, ID: 0, }, diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index 3fa56f23a..fa830e10b 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -18,7 +18,7 @@ func (s *Store) CountTransactions(ctx 
context.Context, q ledger.TransactionsQuer return 0, err } - sb, _ := s.buildTransactionsQuery(q) + sb, _ := s.buildTransactionsQuery(Flavor(s.schema.Flavor()), q) sqlq, args := sb.BuildWithFlavor(s.schema.Flavor()) sqlq = fmt.Sprintf(`SELECT count(*) FROM (%s) AS t`, sqlq) err = executor.QueryRowContext(ctx, sqlq, args...).Scan(&count) diff --git a/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go new file mode 100644 index 000000000..ee4dbf147 --- /dev/null +++ b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go @@ -0,0 +1,63 @@ +package _16_denormalize_addresses + +import ( + "context" + "testing" + "time" + + "github.com/huandu/go-sqlbuilder" + "github.com/numary/ledger/pkg/ledgertesting" + "github.com/numary/ledger/pkg/storage/sqlstorage" + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" +) + +func TestMigrate16(t *testing.T) { + driver, closeFunc, err := ledgertesting.StorageDriver() + require.NoError(t, err) + defer closeFunc() + + require.NoError(t, driver.Initialize(context.Background())) + store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true) + require.NoError(t, err) + + schema := store.Schema() + + migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS) + require.NoError(t, err) + + modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:16]...) + require.NoError(t, err) + require.True(t, modified) + + now := time.Now().UTC().Truncate(time.Second) + + ib := sqlbuilder.NewInsertBuilder() + sqlq, args := ib. + InsertInto(schema.Table("transactions")). + Cols("id", "timestamp", "postings", "metadata"). + Values(0, now.Format(time.RFC3339), `[ + {"source": "world", "destination": "bank", "asset": "USD", "amount": 100}, + {"source": "bank", "destination": "user", "asset": "USD", "amount": 100} + ]`, "{}"). 
+ BuildWithFlavor(schema.Flavor()) + _, err = schema.ExecContext(context.Background(), sqlq, args...) + require.NoError(t, err) + + modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[16]) + require.NoError(t, err) + require.True(t, modified) + + sqlq, args = sqlbuilder. + Select("sources", "destinations"). + From(schema.Table("transactions")). + Where("id = 0"). + BuildWithFlavor(schema.Flavor()) + + row := store.Schema().QueryRowContext(context.Background(), sqlq, args...) + require.NoError(t, row.Err()) + var sources, destinations string + require.NoError(t, row.Scan(&sources, &destinations)) + require.Equal(t, "world;bank", sources) + require.Equal(t, "bank;user", destinations) +} diff --git a/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/postgres.sql b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/postgres.sql new file mode 100644 index 000000000..4eaa92eaa --- /dev/null +++ b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/postgres.sql @@ -0,0 +1,21 @@ +--statement +alter table "VAR_LEDGER_NAME".transactions add column sources text; +--statement +alter table "VAR_LEDGER_NAME".transactions add column destinations text; +--statement +create index transactions_sources ON "VAR_LEDGER_NAME".transactions USING GIN (sources gin_trgm_ops); +--statement +create index transactions_destinations ON "VAR_LEDGER_NAME".transactions USING GIN (destinations gin_trgm_ops); +--statement +update "VAR_LEDGER_NAME".transactions +set sources = ( + select string_agg(ele->>'source', ';') + from "VAR_LEDGER_NAME".transactions sub + cross join lateral jsonb_array_elements(postings) source(ele) + where transactions.id = sub.id +), destinations = ( + select string_agg(ele->>'destination', ';') + from "VAR_LEDGER_NAME".transactions sub + cross join lateral jsonb_array_elements(postings) source(ele) + where transactions.id = sub.id +); diff --git a/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/sqlite.sql
b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/sqlite.sql new file mode 100644 index 000000000..90db4a7e8 --- /dev/null +++ b/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/sqlite.sql @@ -0,0 +1,15 @@ +--statement +alter table transactions add column sources text; +--statement +alter table transactions add column destinations text; +--statement +UPDATE transactions +SET sources = ( + select group_concat(json_extract(json_each.value, '$.source'), ';') + from transactions tx2, json_each(tx2.postings) + where transactions.id = tx2.id +), destinations = ( + select group_concat(json_extract(json_each.value, '$.destination'), ';') + from transactions tx2, json_each(tx2.postings) + where transactions.id = tx2.id +); diff --git a/pkg/storage/sqlstorage/schema.go b/pkg/storage/sqlstorage/schema.go index f655198d4..d1bf8df7a 100644 --- a/pkg/storage/sqlstorage/schema.go +++ b/pkg/storage/sqlstorage/schema.go @@ -146,6 +146,10 @@ func (p *postgresDB) Initialize(ctx context.Context) error { if err != nil { return err } + _, err = p.db.ExecContext(ctx, "CREATE EXTENSION IF NOT EXISTS pg_trgm") + if err != nil { + return err + } return nil } diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go index 774f033cd..734ad601f 100644 --- a/pkg/storage/sqlstorage/transactions.go +++ b/pkg/storage/sqlstorage/transactions.go @@ -17,35 +17,45 @@ import ( "github.com/pkg/errors" ) -func (s *Store) buildTransactionsQuery(p ledger.TransactionsQuery) (*sqlbuilder.SelectBuilder, TxsPaginationToken) { +func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery) (*sqlbuilder.SelectBuilder, TxsPaginationToken) { sb := sqlbuilder.NewSelectBuilder() t := TxsPaginationToken{} var ( - destination = p.Filters.Destination - source = p.Filters.Source - account = p.Filters.Account - reference = p.Filters.Reference - startTime = p.Filters.StartTime - endTime = p.Filters.EndTime - metadata = p.Filters.Metadata + destination 
= p.Filters.Destination + source = p.Filters.Source + account = p.Filters.Account + reference = p.Filters.Reference + startTime = p.Filters.StartTime + endTime = p.Filters.EndTime + metadata = p.Filters.Metadata + regexOperator = "~" ) + if flavor == SQLite { + regexOperator = "REGEXP" + } sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes") sb.From(s.schema.Table("transactions")) if account != "" { - arg := sb.Args.Add(account) - sb.Where(s.schema.Table("use_account") + "(postings, " + arg + ")") + r := fmt.Sprintf("(^|;)%s($|;)", account) + arg := sb.Args.Add(r) + sb.Where(sb.Or( + fmt.Sprintf("sources %s %s", regexOperator, arg), + fmt.Sprintf("destinations %s %s", regexOperator, arg), + )) t.AccountFilter = account } if source != "" { - arg := sb.Args.Add(source) - sb.Where(s.schema.Table("use_account_as_source") + "(postings, " + arg + ")") + r := fmt.Sprintf("(^|;)%s($|;)", source) + arg := sb.Args.Add(r) + sb.Where(fmt.Sprintf("sources %s %s", regexOperator, arg)) t.SourceFilter = source } if destination != "" { - arg := sb.Args.Add(destination) - sb.Where(s.schema.Table("use_account_as_destination") + "(postings, " + arg + ")") + r := fmt.Sprintf("(^|;)%s($|;)", destination) + arg := sb.Args.Add(r) + sb.Where(fmt.Sprintf("destinations %s %s", regexOperator, arg)) t.DestinationFilter = destination } if reference != "" { @@ -80,7 +90,7 @@ func (s *Store) GetTransactions(ctx context.Context, q ledger.TransactionsQuery) return sharedapi.Cursor[core.ExpandedTransaction]{Data: txs}, nil } - sb, t := s.buildTransactionsQuery(q) + sb, t := s.buildTransactionsQuery(Flavor(s.schema.Flavor()), q) sb.OrderBy("id").Desc() if q.AfterTxID > 0 { sb.Where(sb.LE("id", q.AfterTxID)) @@ -276,7 +286,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran case sqlbuilder.SQLite: ib := sqlbuilder.NewInsertBuilder() ib.InsertInto(s.schema.Table("transactions")) - ib.Cols("id", "timestamp", 
"reference", "postings", "metadata", "pre_commit_volumes", "post_commit_volumes") + ib.Cols("id", "timestamp", "reference", "postings", "metadata", + "pre_commit_volumes", "post_commit_volumes", "sources", "destinations") for _, tx := range txs { postingsData, err := json.Marshal(tx.Postings) if err != nil { @@ -300,6 +311,14 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran if err != nil { panic(err) } + sources := "" + destinations := "" + for _, p := range tx.Postings { + sources = fmt.Sprintf("%s;%s", sources, p.Source) + destinations = fmt.Sprintf("%s;%s", destinations, p.Destination) + } + sources = sources[1:] + destinations = destinations[1:] var reference *string if tx.Reference != "" { @@ -308,7 +327,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran } ib.Values(tx.ID, tx.Timestamp, reference, postingsData, - metadataData, preCommitVolumesData, postCommitVolumesData) + metadataData, preCommitVolumesData, postCommitVolumesData, + sources, destinations) } query, args = ib.BuildWithFlavor(s.schema.Flavor()) case sqlbuilder.PostgreSQL: @@ -319,6 +339,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran metadataDataSet := make([]string, len(txs)) preCommitVolumesDataSet := make([]string, len(txs)) postCommitVolumesDataSet := make([]string, len(txs)) + sources := make([]string, len(txs)) + destinations := make([]string, len(txs)) for i, tx := range txs { postingsData, err := json.Marshal(tx.Postings) @@ -344,6 +366,18 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran panic(err) } + computedSources := "" + for _, p := range tx.Postings { + computedSources = fmt.Sprintf("%s;%s", computedSources, p.Source) + } + computedSources = computedSources[1:] // Strip leading ; + + computedDestinations := "" + for _, p := range tx.Postings { + computedDestinations = fmt.Sprintf("%s;%s", computedDestinations, p.Destination) + } + 
computedDestinations = computedDestinations[1:] + ids[i] = tx.ID timestamps[i] = tx.Timestamp postingDataSet[i] = string(postingsData) @@ -351,6 +385,8 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran preCommitVolumesDataSet[i] = string(preCommitVolumesData) postCommitVolumesDataSet[i] = string(postCommitVolumesData) references[i] = nil + sources[i] = computedSources + destinations[i] = computedDestinations if tx.Reference != "" { cp := tx.Reference references[i] = &cp @@ -358,11 +394,22 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran } query = fmt.Sprintf( - `INSERT INTO "%s".transactions (id, timestamp, reference, postings, metadata, pre_commit_volumes, post_commit_volumes) (SELECT * FROM unnest($1::int[], $2::timestamp[], $3::varchar[], $4::jsonb[], $5::jsonb[], $6::jsonb[], $7::jsonb[]))`, + `INSERT INTO "%s".transactions (id, timestamp, reference, postings, metadata, pre_commit_volumes, + post_commit_volumes, sources, destinations) (SELECT * FROM unnest( + $1::int[], + $2::timestamp[], + $3::varchar[], + $4::jsonb[], + $5::jsonb[], + $6::jsonb[], + $7::jsonb[], + $8::varchar[], + $9::varchar[]))`, s.schema.Name()) args = []interface{}{ ids, timestamps, references, postingDataSet, metadataDataSet, preCommitVolumesDataSet, postCommitVolumesDataSet, + sources, destinations, } }