Skip to content

Commit

Permalink
fix: performances
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag committed Dec 18, 2024
1 parent 1c9802f commit 3bafdbd
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 53 deletions.
130 changes: 78 additions & 52 deletions internal/storage/ledger/resource_aggregated_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ledger

import (
"errors"
"fmt"
ledger "github.com/formancehq/ledger/internal"
ledgercontroller "github.com/formancehq/ledger/internal/controller/ledger"
"github.com/formancehq/ledger/pkg/features"
Expand All @@ -22,22 +23,32 @@ func (h aggregatedBalancesResourceRepositoryHandler) filters() []filter {
},
{
name: "metadata",
validators: []propertyValidator{
acceptOperators("$exists"),
matchers: []func(string) bool{
func(key string) bool {
return key == "metadata" || metadataRegex.Match([]byte(key))
},
},
},
{
name: `metadata\[.*]`,
validators: []propertyValidator{
acceptOperators("$match"),
propertyValidatorFunc(func(l ledger.Ledger, operator string, key string, value any) error {
if key == "metadata" {
if operator != "$exists" {
return fmt.Errorf("unsupported operator %s for metadata", operator)
}
return nil
}
if operator != "$match" {
return fmt.Errorf("unsupported operator %s for metadata", operator)
}
return nil
}),
},
},
}
}

func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store, query repositoryHandlerBuildContext[ledgercontroller.GetAggregatedVolumesOptions]) (*bun.SelectQuery, error) {

if query.PIT != nil && !query.PIT.IsZero() {
if query.UsePIT() {
ret := store.db.NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("moves")).
DistinctOn("accounts_address, asset").
Expand All @@ -48,75 +59,90 @@ func (h aggregatedBalancesResourceRepositoryHandler) buildDataset(store *Store,
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistory)
}

return ret.
ret = ret.
ColumnExpr("first_value(post_commit_volumes) over (partition by (accounts_address, asset) order by seq desc) as volumes").
Where("insertion_date <= ?", query.PIT), nil
Where("insertion_date <= ?", query.PIT)
} else {
if !store.ledger.HasFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes, "SYNC") {
return nil, ledgercontroller.NewErrMissingFeature(features.FeatureMovesHistoryPostCommitEffectiveVolumes)
}

return ret.
ret = ret.
ColumnExpr("first_value(post_commit_effective_volumes) over (partition by (accounts_address, asset) order by effective_date desc, seq desc) as volumes").
Where("effective_date <= ?", query.PIT), nil
Where("effective_date <= ?", query.PIT)
}

if query.useFilter("address", isPartialAddress) {
subQuery := store.db.NewSelect().
TableExpr(store.GetPrefixedRelationName("accounts")).
Column("address_array").
Where("accounts.address = accounts_address").
Where("ledger = ?", store.ledger.Name)

ret = ret.
ColumnExpr("accounts.address_array as accounts_address_array").
Join(`join lateral (?) accounts on true`, subQuery)
}

if query.useFilter("metadata") {
subQuery := store.db.NewSelect().
DistinctOn("accounts_address").
ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")).
ColumnExpr("first_value(metadata) over (partition by accounts_address order by revision desc) as metadata").
Where("ledger = ?", store.ledger.Name).
Where("accounts_metadata.accounts_address = moves.accounts_address").
Where("date <= ?", query.PIT)

ret = ret.
Join(`left join lateral (?) accounts_metadata on true`, subQuery).
Column("metadata")
}

return ret, nil
} else {
return store.db.NewSelect().
ret := store.db.NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Column("asset", "accounts_address").
ColumnExpr("(input, output)::"+store.GetPrefixedRelationName("volumes")+" as volumes").
Where("ledger = ?", store.ledger.Name), nil
Where("ledger = ?", store.ledger.Name)

if query.useFilter("metadata") || query.useFilter("address", isPartialAddress) {
subQuery := store.db.NewSelect().
TableExpr(store.GetPrefixedRelationName("accounts")).
Column("address").
Where("ledger = ?", store.ledger.Name).
Where("accounts.address = accounts_address")

if query.useFilter("address") {
subQuery = subQuery.ColumnExpr("address_array as accounts_address_array")
ret = ret.Column("accounts_address_array")
}
if query.useFilter("metadata") {
subQuery = subQuery.ColumnExpr("metadata")
ret = ret.Column("metadata")
}

ret = ret.
Join(`join lateral (?) accounts on true`, subQuery)
}

return ret, nil
}
}

func (h aggregatedBalancesResourceRepositoryHandler) resolveFilter(store *Store, query ledgercontroller.ResourceQuery[ledgercontroller.GetAggregatedVolumesOptions], operator, property string, value any) (string, []any, error) {
switch {
case property == "address":
address := value.(string)
if isPartialAddress(address) {
return store.db.NewSelect().
TableExpr(store.GetPrefixedRelationName("accounts")).
ColumnExpr("true").
Where(filterAccountAddress(address, "address")).
Where("address = dataset.accounts_address").
String(), []any{}, nil
}

return "accounts_address = ?", []any{address}, nil
return filterAccountAddress(value.(string), "accounts_address"), nil, nil
case metadataRegex.Match([]byte(property)) || property == "metadata":
var selectMetadata *bun.SelectQuery
if store.ledger.HasFeature(features.FeatureAccountMetadataHistory, "SYNC") && query.PIT != nil && !query.PIT.IsZero() {
selectMetadata = store.db.NewSelect().
DistinctOn("accounts_address").
ModelTableExpr(store.GetPrefixedRelationName("accounts_metadata")).
Where("accounts_address = dataset.accounts_address").
Order("accounts_address", "revision desc")

if query.PIT != nil && !query.PIT.IsZero() {
selectMetadata = selectMetadata.Where("date <= ?", query.PIT)
}
if property == "metadata" {
return "metadata -> ? is not null", []any{value}, nil
} else {
selectMetadata = store.db.NewSelect().
ModelTableExpr(store.GetPrefixedRelationName("accounts")).
Where("address = dataset.accounts_address")
}
selectMetadata = selectMetadata.
Where("ledger = ?", store.ledger.Name).
Column("metadata").
Limit(1)

switch {
case metadataRegex.Match([]byte(property)):
match := metadataRegex.FindAllStringSubmatch(property, 3)

return "(?) @> ?", []any{selectMetadata, map[string]any{
return "metadata @> ?", []any{map[string]any{
match[0][1]: value,
}}, nil

case property == "metadata":
return "(?) -> ? is not null", []any{selectMetadata, value}, nil
default:
panic("unreachable")
}
default:
return "", nil, ledgercontroller.NewErrInvalidQuery("unknown key '%s' when building query", property)
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/ledger/resource_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func (h volumesResourceHandler) buildDataset(store *Store, query repositoryHandl
needAddressSegments := query.useFilter("address", isPartialAddress)
if !query.UsePIT() && !query.UseOOT() {
selectVolumes = store.db.NewSelect().
DistinctOn("accounts_address, asset").
Column("accounts_address", "asset", "input", "output").
ColumnExpr("input - output as balance").
ModelTableExpr(store.GetPrefixedRelationName("accounts_volumes")).
Expand Down Expand Up @@ -188,6 +187,8 @@ func (h volumesResourceHandler) aggregate(
query ledgercontroller.ResourceQuery[ledgercontroller.GetVolumesOptions],
selectQuery *bun.SelectQuery,
) (*bun.SelectQuery, error) {
selectQuery = selectQuery.DistinctOn("accounts_address, asset")

if query.Opts.GroupLvl == 0 {
return store.db.NewSelect().
ModelTableExpr("(?) data", selectQuery).
Expand Down

0 comments on commit 3bafdbd

Please sign in to comment.