diff --git a/pkg/storage/sqlstorage/migrates/14-update-timestamp-column-type/any_test.go b/pkg/storage/sqlstorage/migrates/14-update-timestamp-column-type/any_test.go
deleted file mode 100644
index 94978b05b..000000000
--- a/pkg/storage/sqlstorage/migrates/14-update-timestamp-column-type/any_test.go
+++ /dev/null
@@ -1,52 +0,0 @@
-package _14_update_timestamp_column_type
-
-import (
-    "context"
-    "testing"
-    "time"
-
-    "github.com/huandu/go-sqlbuilder"
-    "github.com/numary/ledger/pkg/ledgertesting"
-    "github.com/numary/ledger/pkg/storage/sqlstorage"
-    "github.com/pborman/uuid"
-    "github.com/stretchr/testify/require"
-)
-
-func TestMigrate14(t *testing.T) {
-    driver, closeFunc, err := ledgertesting.StorageDriver()
-    require.NoError(t, err)
-    defer closeFunc()
-
-    require.NoError(t, driver.Initialize(context.Background()))
-    store, _, err := driver.GetLedgerStore(context.Background(), uuid.New(), true)
-    require.NoError(t, err)
-
-    schema := store.Schema()
-
-    migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS)
-    require.NoError(t, err)
-
-    modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:14]...)
-    require.NoError(t, err)
-    require.True(t, modified)
-
-    now := time.Now().UTC().Truncate(time.Second)
-
-    ib := sqlbuilder.NewInsertBuilder()
-    sqlq, args := ib.
-        InsertInto(schema.Table("transactions")).
-        Cols("id", "timestamp", "postings", "metadata").
-        Values(0, now.Format(time.RFC3339), `[{"source": "world", "destination": "bank", "asset": "USD", "amount": 100}]`, "{}").
-        BuildWithFlavor(schema.Flavor())
-    _, err = schema.ExecContext(context.Background(), sqlq, args...)
-    require.NoError(t, err)
-
-    modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[14])
-    require.NoError(t, err)
-    require.True(t, modified)
-
-    tx, err := store.GetTransaction(context.Background(), 0)
-    require.NoError(t, err)
-    require.Equal(t, now, tx.Timestamp)
-    require.Len(t, tx.Postings, 1)
-}
diff --git a/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go b/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go
similarity index 67%
rename from pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go
rename to pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go
index ee4dbf147..e124dcf89 100644
--- a/pkg/storage/sqlstorage/migrates/16-denormalize-addresses/any_test.go
+++ b/pkg/storage/sqlstorage/migrates/17-optimized-segments/any_test.go
@@ -1,4 +1,4 @@
-package _16_denormalize_addresses
+package _17_optimized_segments
 
 import (
     "context"
@@ -12,7 +12,11 @@ import (
     "github.com/stretchr/testify/require"
 )
 
-func TestMigrate16(t *testing.T) {
+func TestMigrate17(t *testing.T) {
+    if ledgertesting.StorageDriverName() != "postgres" {
+        t.Skip()
+    }
+
     driver, closeFunc, err := ledgertesting.StorageDriver()
     require.NoError(t, err)
     defer closeFunc()
@@ -26,7 +30,7 @@ func TestMigrate16(t *testing.T) {
     migrations, err := sqlstorage.CollectMigrationFiles(sqlstorage.MigrationsFS)
     require.NoError(t, err)
 
-    modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:16]...)
+    modified, err := sqlstorage.Migrate(context.Background(), schema, migrations[0:17]...)
     require.NoError(t, err)
     require.True(t, modified)
 
@@ -37,27 +41,31 @@ func TestMigrate16(t *testing.T) {
         InsertInto(schema.Table("transactions")).
         Cols("id", "timestamp", "postings", "metadata").
         Values(0, now.Format(time.RFC3339), `[
-            {"source": "world", "destination": "bank", "asset": "USD", "amount": 100},
-            {"source": "bank", "destination": "user", "asset": "USD", "amount": 100}
+            {"source": "world", "destination": "users:001", "asset": "USD", "amount": 100}
         ]`, "{}").
         BuildWithFlavor(schema.Flavor())
     _, err = schema.ExecContext(context.Background(), sqlq, args...)
     require.NoError(t, err)
 
-    modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[16])
+    modified, err = sqlstorage.Migrate(context.Background(), schema, migrations[17])
     require.NoError(t, err)
     require.True(t, modified)
 
     sqlq, args = sqlbuilder.
-        Select("sources", "destinations").
-        From(schema.Table("transactions")).
-        Where("id = 0").
+        Select("txid", "posting_index", "source", "destination").
+        From(schema.Table("postings")).
+        Where("txid = 0").
         BuildWithFlavor(schema.Flavor())
     row := store.Schema().QueryRowContext(context.Background(), sqlq, args...)
     require.NoError(t, row.Err())
-    var sources, destinations string
-    require.NoError(t, err, row.Scan(&sources, &destinations))
-    require.Equal(t, "world;bank", sources)
-    require.Equal(t, "bank;user", destinations)
+
+    var txid uint64
+    var postingIndex int
+    var source, destination string
+    require.NoError(t, row.Scan(&txid, &postingIndex, &source, &destination))
+    require.Equal(t, uint64(0), txid)
+    require.Equal(t, 0, postingIndex)
+    require.Equal(t, `["world"]`, source)
+    require.Equal(t, `["users", "001"]`, destination)
 }
diff --git a/pkg/storage/sqlstorage/migrates/17-optimized-segments/postgres.sql b/pkg/storage/sqlstorage/migrates/17-optimized-segments/postgres.sql
new file mode 100644
index 000000000..300545b42
--- /dev/null
+++ b/pkg/storage/sqlstorage/migrates/17-optimized-segments/postgres.sql
@@ -0,0 +1,36 @@
+--statement
+drop trigger if exists log_entry on "VAR_LEDGER_NAME".log;
+--statement
+drop trigger if exists volumes_changed on "VAR_LEDGER_NAME".log;
+
+--statement
+alter table "VAR_LEDGER_NAME".transactions drop column if exists sources;
+--statement
+alter table "VAR_LEDGER_NAME".transactions drop column if exists destinations;
+--statement
+drop index if exists transactions_sources;
+--statement
+drop index if exists transactions_destinations;
+
+--statement
+create table if not exists "VAR_LEDGER_NAME".postings (
+    txid bigint,
+    posting_index integer,
+    source jsonb,
+    destination jsonb
+);
+
+--statement
+create index postings_src on "VAR_LEDGER_NAME".postings using GIN(source);
+--statement
+create index postings_dest on "VAR_LEDGER_NAME".postings using GIN(destination);
+--statement
+create index postings_txid on "VAR_LEDGER_NAME".postings (txid asc);
+
+--statement
+insert into "VAR_LEDGER_NAME".postings(txid, posting_index, source, destination)
+select txs.id as txid, i - 1 as posting_index,
+    array_to_json(string_to_array(t.posting->>'source', ':'))::jsonb as source,
+    array_to_json(string_to_array(t.posting->>'destination', ':'))::jsonb as destination
+from "VAR_LEDGER_NAME".transactions txs left join lateral jsonb_array_elements(txs.postings)
+with ordinality as t(posting, i) on true;
diff --git a/pkg/storage/sqlstorage/transactions.go b/pkg/storage/sqlstorage/transactions.go
index 734ad601f..f730ed5d0 100644
--- a/pkg/storage/sqlstorage/transactions.go
+++ b/pkg/storage/sqlstorage/transactions.go
@@ -6,6 +6,7 @@ import (
     "encoding/base64"
     "encoding/json"
     "fmt"
+    "regexp"
     "strings"
     "time"
 
@@ -17,46 +18,96 @@ import (
     "github.com/pkg/errors"
 )
 
+// this regexp is used to distinguish between deprecated regex queries for
+// source, destination and account params and the new wildcard query
+// which allows segmented address pattern matching, e.g. "foo:bar:*"
+var addressQueryRegexp = regexp.MustCompile(`^(\w+|\*|\.\*)(:(\w+|\*|\.\*))*$`)
+
 func (s *Store) buildTransactionsQuery(flavor Flavor, p ledger.TransactionsQuery) (*sqlbuilder.SelectBuilder, TxsPaginationToken) {
     sb := sqlbuilder.NewSelectBuilder()
     t := TxsPaginationToken{}
 
     var (
-        destination   = p.Filters.Destination
-        source        = p.Filters.Source
-        account       = p.Filters.Account
-        reference     = p.Filters.Reference
-        startTime     = p.Filters.StartTime
-        endTime       = p.Filters.EndTime
-        metadata      = p.Filters.Metadata
-        regexOperator = "~"
+        destination = p.Filters.Destination
+        source      = p.Filters.Source
+        account     = p.Filters.Account
+        reference   = p.Filters.Reference
+        startTime   = p.Filters.StartTime
+        endTime     = p.Filters.EndTime
+        metadata    = p.Filters.Metadata
     )
-    if flavor == SQLite {
-        regexOperator = "REGEXP"
-    }
 
-    sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes")
+    sb.Select("id", "timestamp", "reference", "metadata", "postings", "pre_commit_volumes", "post_commit_volumes").
+        Distinct()
     sb.From(s.schema.Table("transactions"))
-    if account != "" {
-        r := fmt.Sprintf("(^|;)%s($|;)", account)
-        arg := sb.Args.Add(r)
-        sb.Where(sb.Or(
-            fmt.Sprintf("sources %s %s", regexOperator, arg),
-            fmt.Sprintf("destinations %s %s", regexOperator, arg),
+    if (source != "" || destination != "" || account != "") && flavor == PostgreSQL {
+        // new wildcard handling
+        sb.Join(fmt.Sprintf(
+            "%s postings on postings.txid = %s.id",
+            s.schema.Table("postings"),
+            s.schema.Table("transactions"),
         ))
-        t.AccountFilter = account
     }
     if source != "" {
-        r := fmt.Sprintf("(^|;)%s($|;)", source)
-        arg := sb.Args.Add(r)
-        sb.Where(fmt.Sprintf("sources %s %s", regexOperator, arg))
-        t.SourceFilter = source
+        if !addressQueryRegexp.MatchString(source) || flavor == SQLite {
+            // deprecated regex handling
+            arg := sb.Args.Add(source)
+            sb.Where(s.schema.Table("use_account_as_source") + "(postings, " + arg + ")")
+            t.SourceFilter = source
+        } else {
+            // new wildcard handling
+            src := strings.Split(source, ":")
+            sb.Where(fmt.Sprintf("jsonb_array_length(postings.source) = %d", len(src)))
+
+            for i, segment := range src {
+                if segment == ".*" || segment == "*" || segment == "" {
+                    continue
+                }
+
+                arg := sb.Args.Add(segment)
+                sb.Where(fmt.Sprintf("postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg))
+            }
+        }
     }
     if destination != "" {
-        r := fmt.Sprintf("(^|;)%s($|;)", destination)
-        arg := sb.Args.Add(r)
-        sb.Where(fmt.Sprintf("destinations %s %s", regexOperator, arg))
-        t.DestinationFilter = destination
+        if !addressQueryRegexp.MatchString(destination) || flavor == SQLite {
+            // deprecated regex handling
+            arg := sb.Args.Add(destination)
+            sb.Where(s.schema.Table("use_account_as_destination") + "(postings, " + arg + ")")
+            t.DestinationFilter = destination
+        } else {
+            // new wildcard handling
+            dst := strings.Split(destination, ":")
+            sb.Where(fmt.Sprintf("jsonb_array_length(postings.destination) = %d", len(dst)))
+            for i, segment := range dst {
+                if segment == ".*" || segment == "*" || segment == "" {
+                    continue
+                }
+
+                arg := sb.Args.Add(segment)
+                sb.Where(fmt.Sprintf("postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath", i, arg))
+            }
+        }
+    }
+    if account != "" {
+        if !addressQueryRegexp.MatchString(account) || flavor == SQLite {
+            // deprecated regex handling
+            arg := sb.Args.Add(account)
+            sb.Where(s.schema.Table("use_account") + "(postings, " + arg + ")")
+            t.AccountFilter = account
+        } else {
+            // new wildcard handling
+            dst := strings.Split(account, ":")
+            sb.Where(fmt.Sprintf("(jsonb_array_length(postings.destination) = %d OR jsonb_array_length(postings.source) = %d)", len(dst), len(dst)))
+            for i, segment := range dst {
+                if segment == ".*" || segment == "*" || segment == "" {
+                    continue
+                }
+
+                arg := sb.Args.Add(segment)
+                sb.Where(fmt.Sprintf("(postings.source @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath OR postings.destination @@ ('$[%d] == \"' || %s::text || '\"')::jsonpath)", i, arg, i, arg))
+            }
+        }
     }
     if reference != "" {
         sb.Where(sb.E("reference", reference))
@@ -218,7 +269,6 @@ func (s *Store) GetTransaction(ctx context.Context, txId uint64) (*core.Expanded
         }
         return nil, err
     }
 
-    tx.Timestamp = tx.Timestamp.UTC()
     tx.Reference = ref.String
@@ -269,7 +319,6 @@ func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransacti
         }
         return nil, err
     }
 
-    tx.Timestamp = tx.Timestamp.UTC()
     tx.Reference = ref.String
@@ -277,17 +326,21 @@ func (s *Store) GetLastTransaction(ctx context.Context) (*core.ExpandedTransacti
 }
 
 func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTransaction) error {
-    var (
-        query string
-        args  []interface{}
-    )
+    var queryTxs string
+    var argsTxs []any
+
+    executor, err := s.executorProvider(ctx)
+    if err != nil {
+        return err
+    }
 
     switch s.Schema().Flavor() {
     case sqlbuilder.SQLite:
-        ib := sqlbuilder.NewInsertBuilder()
-        ib.InsertInto(s.schema.Table("transactions"))
-        ib.Cols("id", "timestamp", "reference", "postings", "metadata",
-            "pre_commit_volumes", "post_commit_volumes", "sources", "destinations")
+        ibTxs := sqlbuilder.NewInsertBuilder()
+        ibTxs.InsertInto(s.schema.Table("transactions"))
+        ibTxs.Cols("id", "timestamp", "reference", "postings", "metadata",
+            "pre_commit_volumes", "post_commit_volumes")
+
         for _, tx := range txs {
             postingsData, err := json.Marshal(tx.Postings)
             if err != nil {
@@ -311,14 +364,6 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
             if err != nil {
                 panic(err)
             }
-            sources := ""
-            destinations := ""
-            for _, p := range tx.Postings {
-                sources = fmt.Sprintf("%s;%s", sources, p.Source)
-                destinations = fmt.Sprintf("%s;%s", destinations, p.Destination)
-            }
-            sources = sources[1:]
-            destinations = destinations[1:]
 
             var reference *string
             if tx.Reference != "" {
@@ -326,21 +371,25 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
                 reference = &cp
             }
 
-            ib.Values(tx.ID, tx.Timestamp, reference, postingsData,
-                metadataData, preCommitVolumesData, postCommitVolumesData,
-                sources, destinations)
+            ibTxs.Values(tx.ID, tx.Timestamp, reference, postingsData,
+                metadataData, preCommitVolumesData, postCommitVolumesData)
         }
-        query, args = ib.BuildWithFlavor(s.schema.Flavor())
+
+        queryTxs, argsTxs = ibTxs.BuildWithFlavor(s.schema.Flavor())
+
     case sqlbuilder.PostgreSQL:
-        ids := make([]uint64, len(txs))
+        txIds := make([]uint64, len(txs))
         timestamps := make([]time.Time, len(txs))
         references := make([]*string, len(txs))
         postingDataSet := make([]string, len(txs))
         metadataDataSet := make([]string, len(txs))
         preCommitVolumesDataSet := make([]string, len(txs))
         postCommitVolumesDataSet := make([]string, len(txs))
-        sources := make([]string, len(txs))
-        destinations := make([]string, len(txs))
+
+        postingTxIds := []uint64{}
+        postingIndices := []int{}
+        sources := []string{}
+        destinations := []string{}
 
         for i, tx := range txs {
             postingsData, err := json.Marshal(tx.Postings)
@@ -366,64 +415,78 @@ func (s *Store) insertTransactions(ctx context.Context, txs ...core.ExpandedTran
                 panic(err)
             }
 
-            computedSources := ""
-            for _, p := range tx.Postings {
-                computedSources = fmt.Sprintf("%s;%s", computedSources, p.Source)
-            }
-            computedSources = computedSources[1:] // Strip leading ;
-
-            computedDestinations := ""
-            for _, p := range tx.Postings {
-                computedDestinations = fmt.Sprintf("%s;%s", computedDestinations, p.Destination)
-            }
-            computedDestinations = computedDestinations[1:]
-
-            ids[i] = tx.ID
+            txIds[i] = tx.ID
             timestamps[i] = tx.Timestamp
             postingDataSet[i] = string(postingsData)
             metadataDataSet[i] = string(metadataData)
             preCommitVolumesDataSet[i] = string(preCommitVolumesData)
             postCommitVolumesDataSet[i] = string(postCommitVolumesData)
             references[i] = nil
-            sources[i] = computedSources
-            destinations[i] = computedDestinations
 
             if tx.Reference != "" {
                 cp := tx.Reference
                 references[i] = &cp
             }
+
+            for i, p := range tx.Postings {
+                sourcesBy, err := json.Marshal(strings.Split(p.Source, ":"))
+                if err != nil {
+                    panic(err)
+                }
+                destinationsBy, err := json.Marshal(strings.Split(p.Destination, ":"))
+                if err != nil {
+                    panic(err)
+                }
+                postingTxIds = append(postingTxIds, tx.ID)
+                postingIndices = append(postingIndices, i)
+                sources = append(sources, string(sourcesBy))
+                destinations = append(destinations, string(destinationsBy))
+            }
         }
 
-        query = fmt.Sprintf(
-            `INSERT INTO "%s".transactions (id, timestamp, reference, postings, metadata, pre_commit_volumes,
-            post_commit_volumes, sources, destinations) (SELECT * FROM unnest(
+        queryTxs = fmt.Sprintf(
+            `INSERT INTO "%s".transactions (id, timestamp, reference,
+            postings, metadata,
+            pre_commit_volumes,
+            post_commit_volumes) (SELECT * FROM unnest(
                 $1::int[],
                 $2::timestamp[],
                 $3::varchar[],
                 $4::jsonb[],
                 $5::jsonb[],
                 $6::jsonb[],
-                $7::jsonb[],
-                $8::varchar[],
-                $9::varchar[]))`,
+                $7::jsonb[]))`,
             s.schema.Name())
-        args = []interface{}{
-            ids, timestamps, references, postingDataSet,
-            metadataDataSet, preCommitVolumesDataSet, postCommitVolumesDataSet,
-            sources, destinations,
+        argsTxs = []any{
+            txIds, timestamps, references,
+            postingDataSet, metadataDataSet,
+            preCommitVolumesDataSet, postCommitVolumesDataSet,
         }
-    }
 
-    sharedlogging.GetLogger(ctx).Debugf("ExecContext: %s %s", query, args)
+        queryPostings := fmt.Sprintf(
+            `INSERT INTO "%s".postings (txid, posting_index,
+            source, destination) (SELECT * FROM unnest(
+                $1::int[],
+                $2::int[],
+                $3::jsonb[],
+                $4::jsonb[]))`,
+            s.schema.Name())
+        argsPostings := []any{
+            postingTxIds, postingIndices, sources, destinations,
+        }
 
-    executor, err := s.executorProvider(ctx)
-    if err != nil {
-        return err
+        sharedlogging.GetLogger(ctx).Debugf("ExecContext: %s %s", queryPostings, argsPostings)
+        _, err = executor.ExecContext(ctx, queryPostings, argsPostings...)
+        if err != nil {
+            return s.error(err)
+        }
     }
 
-    _, err = executor.ExecContext(ctx, query, args...)
+    sharedlogging.GetLogger(ctx).Debugf("ExecContext: %s %s", queryTxs, argsTxs)
+    _, err = executor.ExecContext(ctx, queryTxs, argsTxs...)
     if err != nil {
         return s.error(err)
     }
+
    return nil
 }
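
Note on the generated query shape: for a Postgres-backed store, buildTransactionsQuery now joins transactions to the new postings table and compiles each fixed segment of an address filter into a jsonpath predicate, skipping "*" and ".*" segments. The SQL below is an illustrative sketch only, with the bound parameters inlined as literals and a hypothetical schema name "ledger0"; the real statement binds each segment positionally through sb.Args.Add.

-- Illustrative only: approximate SQL for the filter destination = "users:*:pending"
-- on a hypothetical schema "ledger0". Segment 1 is a wildcard, so only
-- segments 0 and 2 are constrained; the length check pins the segment count.
SELECT DISTINCT id, timestamp, reference, metadata, postings,
                pre_commit_volumes, post_commit_volumes
FROM "ledger0".transactions
JOIN "ledger0".postings postings ON postings.txid = "ledger0".transactions.id
WHERE jsonb_array_length(postings.destination) = 3
  AND postings.destination @@ ('$[0] == "' || 'users'::text || '"')::jsonpath
  AND postings.destination @@ ('$[2] == "' || 'pending'::text || '"')::jsonpath;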
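The backfill statement at the end of 17-optimized-segments/postgres.sql emits one postings row per posting, splitting each address on ":" into a jsonb array of segments; the rewritten TestMigrate17 asserts exactly this shape. A worked example of the conversion, with a literal address standing in for t.posting->>'destination':

-- Illustrative only: how the backfill derives a segment array from an address.
select array_to_json(string_to_array('users:001', ':'))::jsonb;
-- => ["users", "001"]

-- So a pre-migration transactions row with id = 0 and the single posting
-- {"source": "world", "destination": "users:001", ...} backfills to:
--   txid | posting_index | source    | destination
--      0 |             0 | ["world"] | ["users", "001"]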
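On the write path, insertTransactions batches the postings insert by handing Postgres parallel arrays that unnest expands row-wise, one row per array position. A minimal sketch of that statement with literal arrays standing in for the bound $1..$4 parameters (the schema name "ledger0" is again hypothetical):

-- Illustrative only: two postings of transaction 0 expand into two rows.
INSERT INTO "ledger0".postings (txid, posting_index, source, destination)
(SELECT * FROM unnest(
    ARRAY[0, 0]::int[],
    ARRAY[0, 1]::int[],
    ARRAY['["world"]', '["bank"]']::jsonb[],
    ARRAY['["bank"]', '["users", "001"]']::jsonb[]));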