From d16fdea90e5cd26e78adfd64db7c9da00e6f15d4 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Mon, 23 Jan 2023 16:08:19 +0100 Subject: [PATCH] chore: execution path refactoring (#410) * chore: first * chore: add bench in CI * wip * wip * wip * wip * wip * wip * chore: refactoring of Execute * wip * wip * wip * wip * wip * wip * wip * wip * wip * CI * chore: first * chore: execute GetVolumes optim * chore: execute query optim * chore: improve bench accuracy * fix: Execute volumes * fix: CI setup-task token * chore: add post transactions batch benchmark * fix: CI * chore: script compilation caching optimization * chore: script variables for postings optimization * fix: to fix: world is not recognized as such in Numscript variables * chore: cleanup --- .github/workflows/benchmarks.yaml | 2 + .github/workflows/main.yml | 15 + .github/workflows/template_sdk-generate.yaml | 2 + .github/workflows/template_sdk-publish.yaml | 2 + .gitignore | 1 + Taskfile.yaml | 14 + go.mod | 4 +- go.sum | 5 + pkg/api/controllers/script_controller.go | 6 +- pkg/api/controllers/transaction_controller.go | 56 +- .../transaction_controller_test.go | 588 ++++++++++++------ pkg/bus/monitor.go | 11 +- pkg/bus/monitor_test.go | 3 +- pkg/core/numscript.go | 94 ++- pkg/core/numscript_test.go | 40 +- pkg/core/posting.go | 21 + pkg/core/volumes.go | 57 +- pkg/ledger/executor.go | 343 +++++++--- pkg/ledger/executor_test.go | 511 +++++++++++---- pkg/ledger/ledger.go | 116 ++-- pkg/ledger/ledger_test.go | 259 +++++--- pkg/ledger/monitor.go | 5 +- pkg/ledger/process.go | 162 ----- pkg/ledger/process_test.go | 82 +-- pkg/ledger/storage.go | 1 + pkg/ledger/volume_agg.go | 152 ++--- pkg/ledger/volume_agg_test.go | 431 ++++++------- pkg/storage/sqlstorage/aggregations.go | 75 +++ 28 files changed, 1848 insertions(+), 1210 deletions(-) delete mode 100644 pkg/ledger/process.go diff --git a/.github/workflows/benchmarks.yaml b/.github/workflows/benchmarks.yaml index bd2cce244..ec4a1ef34 100644 --- a/.github/workflows/benchmarks.yaml +++ b/.github/workflows/benchmarks.yaml @@ -8,6 +8,8 @@ jobs: steps: - name: Install task uses: arduino/setup-task@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - uses: actions/checkout@v3 - uses: actions/setup-go@v3 with: diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 690255354..e27e2e167 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -30,6 +30,21 @@ jobs: Test_sqlite: uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main + Bench: + runs-on: ubuntu-latest + steps: + - name: Install task + uses: arduino/setup-task@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version-file: 'go.mod' + cache: true + - name: Run bench + run: task install:perf bench:ledger + GoReleaserBuild: if: github.event_name != 'release' name: 'GoReleaser Build' diff --git a/.github/workflows/template_sdk-generate.yaml b/.github/workflows/template_sdk-generate.yaml index 7f6f94054..d38bdcc4d 100644 --- a/.github/workflows/template_sdk-generate.yaml +++ b/.github/workflows/template_sdk-generate.yaml @@ -29,5 +29,7 @@ jobs: uses: docker/setup-buildx-action@v1 - name: Install Task uses: arduino/setup-task@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Launch Generate run: task sdk:generate VERSION=main -- ${{ matrix.sdk }} diff --git a/.github/workflows/template_sdk-publish.yaml b/.github/workflows/template_sdk-publish.yaml index ae77245f6..e15dff219 100644 --- a/.github/workflows/template_sdk-publish.yaml +++ b/.github/workflows/template_sdk-publish.yaml @@ -39,6 +39,8 @@ jobs: uses: docker/setup-buildx-action@v1 - name: Install Task uses: arduino/setup-task@v1 + with: + repo-token: ${{ secrets.GITHUB_TOKEN }} - name: Launch Generate run: task sdk:generate VERSION=${{ inputs.VERSION }} -- ${{ matrix.sdk }} - uses: stefanzweifel/git-auto-commit-action@v4 diff --git a/.gitignore b/.gitignore index c0d6541aa..8a2bc512f 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ sdk/sdks .vscode .env sqlstorage.test +ledger.test diff --git a/Taskfile.yaml b/Taskfile.yaml index 2b925ffe1..5a7c3c3ac 100644 --- a/Taskfile.yaml +++ b/Taskfile.yaml @@ -85,6 +85,20 @@ tasks: NUMARY_STORAGE_DRIVER: "postgres" NUMARY_STORAGE_POSTGRES_CONN_STRING: "postgresql://ledger:ledger@127.0.0.1/ledger" + bench:ledger: + deps: [postgres] + cmds: + - mkdir -p {{.BENCH_RESULTS_DIR}} + - > + go test {{.TAGS}} ./pkg/ledger + -run=XXX -bench=BenchmarkLedger_{{.RUN}} -benchmem -benchtime=30s -timeout 1h + -cpuprofile {{.BENCH_CPU_PROFILE}} -memprofile {{.BENCH_MEM_PROFILE}} + | tee {{.BENCH_RESULTS_FILE}} + - benchstat {{.BENCH_RESULTS_FILE}} + env: + NUMARY_STORAGE_DRIVER: "postgres" + NUMARY_STORAGE_POSTGRES_CONN_STRING: "postgresql://ledger:ledger@127.0.0.1/ledger" + bench:cpu: cmds: - go tool pprof -http=":" {{.BENCH_CPU_PROFILE}} diff --git a/go.mod b/go.mod index f50220d78..68a7c5c8b 100755 --- a/go.mod +++ b/go.mod @@ -61,7 +61,7 @@ require ( github.com/docker/docker v20.10.7+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect - github.com/dustin/go-humanize v1.0.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/eapache/go-resiliency v1.3.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 // indirect github.com/eapache/queue v1.1.0 // indirect @@ -148,7 +148,7 @@ require ( go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/net v0.4.0 // indirect - golang.org/x/sys v0.3.0 // indirect + golang.org/x/sys v0.4.0 // indirect golang.org/x/text v0.5.0 // indirect google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect google.golang.org/grpc v1.51.0 // indirect diff --git a/go.sum b/go.sum index dd60d21c9..7dad05146 100755 --- a/go.sum +++ b/go.sum @@ -78,6 +78,7 @@ github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4r github.com/cenkalti/backoff/v4 v4.2.0 h1:HN5dHm3WBOgndBH6E8V0q2jIYIR3s9yglV8k/+MN3u4= github.com/cenkalti/backoff/v4 v4.2.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -130,6 +131,8 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= @@ -832,6 +835,8 @@ golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.4.0 h1:Zr2JFtRQNX3BCZ8YtxRE9hNJYC8J6I1MVbMg6owUp18= +golang.org/x/sys v0.4.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= diff --git a/pkg/api/controllers/script_controller.go b/pkg/api/controllers/script_controller.go index e1e9953e4..842d2e378 100644 --- a/pkg/api/controllers/script_controller.go +++ b/pkg/api/controllers/script_controller.go @@ -36,7 +36,7 @@ func (ctl *ScriptController) PostScript(c *gin.Context) { preview := ok && (strings.ToUpper(value) == "YES" || strings.ToUpper(value) == "TRUE" || value == "1") res := ScriptResponse{} - txs, err := l.(*ledger.Ledger).Execute(c.Request.Context(), preview, script) + execRes, err := l.(*ledger.Ledger).Execute(c.Request.Context(), false, preview, script) if err != nil { var ( code = apierrors.ErrInternal @@ -60,8 +60,8 @@ func (ctl *ScriptController) PostScript(c *gin.Context) { res.Details = apierrors.EncodeLink(message) } } - if len(txs) > 0 { - res.Transaction = &txs[0] + if len(execRes) > 0 { + res.Transaction = &execRes[0] } c.JSON(http.StatusOK, res) diff --git a/pkg/api/controllers/transaction_controller.go b/pkg/api/controllers/transaction_controller.go index a509b1ea7..c65e8d7e2 100644 --- a/pkg/api/controllers/transaction_controller.go +++ b/pkg/api/controllers/transaction_controller.go @@ -15,6 +15,7 @@ import ( "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" "github.com/numary/ledger/pkg/storage/sqlstorage" + "github.com/pkg/errors" ) type TransactionController struct{} @@ -179,30 +180,38 @@ func (ctl *TransactionController) PostTransaction(c *gin.Context) { return } - script := core.ScriptData{ - Script: payload.Script, - Timestamp: payload.Timestamp, - Reference: payload.Reference, - Metadata: payload.Metadata, - } + var res []core.ExpandedTransaction + var err error - if len(payload.Postings) > 0 { + if len(payload.Postings) > 0 && payload.Script.Plain != "" || + len(payload.Postings) == 0 && payload.Script.Plain == "" { + apierrors.ResponseError(c, ledger.NewValidationError( + "invalid payload: should contain either postings or script")) + return + } else if len(payload.Postings) > 0 { + if i, err := payload.Postings.Validate(); err != nil { + apierrors.ResponseError(c, ledger.NewValidationError(errors.Wrap(err, + fmt.Sprintf("invalid posting %d", i)).Error())) + return + } txData := core.TransactionData{ Postings: payload.Postings, Timestamp: payload.Timestamp, Reference: payload.Reference, Metadata: payload.Metadata, } - i, err := l.(*ledger.Ledger).ValidatePostings(c.Request.Context(), txData) - if err != nil { - apierrors.ResponseError(c, ledger.NewTransactionCommitError(i, err)) - return + res, err = l.(*ledger.Ledger).Execute(c.Request.Context(), + true, preview, core.TxsToScriptsData(txData)...) + } else { + script := core.ScriptData{ + Script: payload.Script, + Timestamp: payload.Timestamp, + Reference: payload.Reference, + Metadata: payload.Metadata, } - postingsScript := core.TxsToScriptsData(txData)[0] - script.Plain = postingsScript.Plain + script.Plain + res, err = l.(*ledger.Ledger).Execute(c.Request.Context(), + false, preview, script) } - - res, err := l.(*ledger.Ledger).Execute(c.Request.Context(), preview, script) if err != nil { apierrors.ResponseError(c, err) return @@ -291,13 +300,20 @@ func (ctl *TransactionController) PostTransactionsBatch(c *gin.Context) { return } - i, err := l.(*ledger.Ledger).ValidatePostings(c.Request.Context(), txs.Transactions...) - if err != nil { - apierrors.ResponseError(c, ledger.NewTransactionCommitError(i, err)) - return + for i, tx := range txs.Transactions { + if len(tx.Postings) == 0 { + apierrors.ResponseError(c, ledger.NewValidationError(errors.New(fmt.Sprintf( + "invalid transaction %d: no postings", i)).Error())) + return + } + if j, err := tx.Postings.Validate(); err != nil { + apierrors.ResponseError(c, ledger.NewValidationError(errors.Wrap(err, + fmt.Sprintf("invalid transaction %d: posting %d", i, j)).Error())) + return + } } - res, err := l.(*ledger.Ledger).Execute(c.Request.Context(), false, + res, err := l.(*ledger.Ledger).Execute(c.Request.Context(), true, false, core.TxsToScriptsData(txs.Transactions...)...) if err != nil { apierrors.ResponseError(c, err) diff --git a/pkg/api/controllers/transaction_controller_test.go b/pkg/api/controllers/transaction_controller_test.go index ea8ad4e57..ac5e0510f 100644 --- a/pkg/api/controllers/transaction_controller_test.go +++ b/pkg/api/controllers/transaction_controller_test.go @@ -37,21 +37,11 @@ func TestPostTransactions(t *testing.T) { expectedErr sharedapi.ErrorResponse } - var now = time.Now().Round(time.Second).UTC() + var timestamp1 = time.Now().Add(1 * time.Minute).Truncate(time.Second) + var timestamp2 = time.Now().Add(2 * time.Minute).Truncate(time.Second) + var timestamp3 = time.Now().Add(3 * time.Minute).Truncate(time.Second) testCases := []testCase{ - { - name: "no postings or script", - payload: []controllers.PostTransaction{ - {}, - }, - expectedStatusCode: http.StatusBadRequest, - expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrScriptNoScript, - ErrorMessage: "[NO_SCRIPT] no script to execute", - Details: apierrors.EncodeLink("no script to execute"), - }, - }, { name: "postings nominal", payload: []controllers.PostTransaction{ @@ -116,6 +106,95 @@ func TestPostTransactions(t *testing.T) { }}, }, }, + { + name: "script nominal", + payload: []controllers.PostTransaction{{ + Script: core.Script{ + Plain: ` + vars { + account $acc + } + send [COIN 100] ( + source = @world + destination = @centralbank + ) + send [COIN 100] ( + source = @centralbank + destination = $acc + )`, + Vars: map[string]json.RawMessage{ + "acc": json.RawMessage(`"users:001"`), + }, + }, + }}, + expectedStatusCode: http.StatusOK, + expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ + Data: &[]core.ExpandedTransaction{{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Destination: "centralbank", + Amount: core.NewMonetaryInt(100), + Asset: "COIN", + }, + { + Source: "centralbank", + Destination: "users:001", + Amount: core.NewMonetaryInt(100), + Asset: "COIN", + }, + }, + }, + }, + }}, + }, + }, + { + name: "script with set_account_meta", + payload: []controllers.PostTransaction{{ + Script: core.Script{ + Plain: ` + send [TOK 1000] ( + source = @world + destination = @bar + ) + set_account_meta(@bar, "foo", "bar") + `, + }, + }}, + expectedStatusCode: http.StatusOK, + expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ + Data: &[]core.ExpandedTransaction{{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Destination: "bar", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", + }, + }, + }, + }, + }}, + }, + }, + { + name: "no postings or script", + payload: []controllers.PostTransaction{ + {}, + }, + expectedStatusCode: http.StatusBadRequest, + expectedErr: sharedapi.ErrorResponse{ + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid payload: should contain either postings or script", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid payload: should contain either postings or script", + }, + }, { name: "postings negative amount", payload: []controllers.PostTransaction{ @@ -132,8 +211,10 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 0: negative amount", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid posting 0: negative amount", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid posting 0: negative amount", }, }, { @@ -152,8 +233,10 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 0: invalid asset", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid posting 0: invalid asset", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid posting 0: invalid asset", }, }, { @@ -172,8 +255,10 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 0: invalid asset", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid posting 0: invalid asset", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid posting 0: invalid asset", }, }, { @@ -192,8 +277,10 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 0: invalid destination address", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid posting 0: invalid destination address", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid posting 0: invalid destination address", }, }, { @@ -212,8 +299,11 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "processing tx 0: balance.insufficient.TOK", + ErrorCode: apierrors.ErrInsufficientFund, + ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", + Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorCodeDeprecated: apierrors.ErrInsufficientFund, + ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", }, }, { @@ -244,149 +334,10 @@ func TestPostTransactions(t *testing.T) { }, expectedStatusCode: http.StatusConflict, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrConflict, - ErrorMessage: "conflict error on reference", - }, - }, - { - name: "postings with specified timestamp", - payload: []controllers.PostTransaction{ - { - Postings: core.Postings{ - { - Source: "world", - Destination: "bar", - Amount: core.NewMonetaryInt(1000), - Asset: "TOK", - }, - }, - Timestamp: now, - }, - }, - expectedStatusCode: http.StatusOK, - expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ - Data: &[]core.ExpandedTransaction{{ - Transaction: core.Transaction{ - TransactionData: core.TransactionData{ - Postings: core.Postings{ - { - Source: "world", - Destination: "bar", - Amount: core.NewMonetaryInt(1000), - Asset: "TOK", - }, - }, - }, - }, - }}, - }, - }, - { - name: "postings with specified timestamp prior to last tx", - payload: []controllers.PostTransaction{ - { - Postings: core.Postings{ - { - Source: "world", - Destination: "bar", - Amount: core.NewMonetaryInt(1000), - Asset: "TOK", - }, - }, - Timestamp: now, - }, - { - Postings: core.Postings{ - { - Source: "world", - Destination: "bar", - Amount: core.NewMonetaryInt(1000), - Asset: "TOK", - }, - }, - Timestamp: now.Add(-time.Second), - }, - }, - expectedStatusCode: http.StatusBadRequest, - expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 0: cannot pass a date prior to the last transaction", - }, - }, - { - name: "script nominal", - payload: []controllers.PostTransaction{{ - Script: core.Script{ - Plain: ` - vars { - account $acc - } - send [COIN 100] ( - source = @world - destination = @centralbank - ) - send [COIN 100] ( - source = @centralbank - destination = $acc - )`, - Vars: map[string]json.RawMessage{ - "acc": json.RawMessage(`"users:001"`), - }, - }, - }}, - expectedStatusCode: http.StatusOK, - expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ - Data: &[]core.ExpandedTransaction{{ - Transaction: core.Transaction{ - TransactionData: core.TransactionData{ - Postings: core.Postings{ - { - Source: "world", - Destination: "centralbank", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", - }, - { - Source: "centralbank", - Destination: "users:001", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", - }, - }, - }, - }, - }}, - }, - }, - { - name: "script with set_account_meta", - payload: []controllers.PostTransaction{{ - Script: core.Script{ - Plain: ` - send [TOK 1000] ( - source = @world - destination = @bar - ) - set_account_meta(@bar, "foo", "bar") - `, - }, - }}, - expectedStatusCode: http.StatusOK, - expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ - Data: &[]core.ExpandedTransaction{{ - Transaction: core.Transaction{ - TransactionData: core.TransactionData{ - Postings: core.Postings{ - { - Source: "world", - Destination: "bar", - Amount: core.NewMonetaryInt(1000), - Asset: "TOK", - }, - }, - }, - }, - }}, + ErrorCode: apierrors.ErrConflict, + ErrorMessage: "conflict error on reference", + ErrorCodeDeprecated: apierrors.ErrConflict, + ErrorMessageDeprecated: "conflict error on reference", }, }, { @@ -402,9 +353,11 @@ func TestPostTransactions(t *testing.T) { }}, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", - Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorCode: apierrors.ErrInsufficientFund, + ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", + Details: apierrors.EncodeLink("account had insufficient funds"), + ErrorCodeDeprecated: apierrors.ErrInsufficientFund, + ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", }, }, { @@ -425,9 +378,11 @@ func TestPostTransactions(t *testing.T) { }}, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrScriptMetadataOverride, - ErrorMessage: "[METADATA_OVERRIDE] cannot override metadata from script", - Details: apierrors.EncodeLink("cannot override metadata from script"), + ErrorCode: apierrors.ErrScriptMetadataOverride, + ErrorMessage: "[METADATA_OVERRIDE] cannot override metadata from script", + Details: apierrors.EncodeLink("cannot override metadata from script"), + ErrorCodeDeprecated: apierrors.ErrScriptMetadataOverride, + ErrorMessageDeprecated: "[METADATA_OVERRIDE] cannot override metadata from script", }, }, { @@ -441,8 +396,10 @@ func TestPostTransactions(t *testing.T) { }}, expectedStatusCode: http.StatusBadRequest, expectedErr: sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrValidation, - ErrorMessage: "transaction has no postings", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "transaction has no postings", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "transaction has no postings", }, }, { @@ -491,6 +448,29 @@ func TestPostTransactions(t *testing.T) { )`, }}, }, + expectedStatusCode: http.StatusBadRequest, + expectedErr: sharedapi.ErrorResponse{ + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid payload: should contain either postings or script", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid payload: should contain either postings or script", + }, + }, + { + name: "postings with specified timestamp", + payload: []controllers.PostTransaction{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: "bar", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", + }, + }, + Timestamp: timestamp2, + }, + }, expectedStatusCode: http.StatusOK, expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ Data: &[]core.ExpandedTransaction{{ @@ -499,15 +479,40 @@ func TestPostTransactions(t *testing.T) { Postings: core.Postings{ { Source: "world", - Destination: "alice", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", + Destination: "bar", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", }, + }, + }, + }, + }}, + }, + }, + { + name: "script with specified timestamp", + payload: []controllers.PostTransaction{{ + Script: core.Script{ + Plain: ` + send [TOK 1000] ( + source = @world + destination = @bar + ) + `, + }, + Timestamp: timestamp3, + }}, + expectedStatusCode: http.StatusOK, + expectedRes: sharedapi.BaseResponse[[]core.ExpandedTransaction]{ + Data: &[]core.ExpandedTransaction{{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: core.Postings{ { Source: "world", - Destination: "bob", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", + Destination: "bar", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", }, }, }, @@ -515,6 +520,51 @@ func TestPostTransactions(t *testing.T) { }}, }, }, + { + name: "postings with specified timestamp prior to last tx", + payload: []controllers.PostTransaction{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: "bar", + Amount: core.NewMonetaryInt(1000), + Asset: "TOK", + }, + }, + Timestamp: timestamp1, + }, + }, + expectedStatusCode: http.StatusBadRequest, + expectedErr: sharedapi.ErrorResponse{ + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "cannot pass a timestamp prior to the last transaction:", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "cannot pass a timestamp prior to the last transaction:", + }, + }, + { + name: "script with specified timestamp prior to last tx", + payload: []controllers.PostTransaction{ + { + Script: core.Script{ + Plain: ` + send [COIN 100] ( + source = @world + destination = @bob + )`, + }, + Timestamp: timestamp1, + }, + }, + expectedStatusCode: http.StatusBadRequest, + expectedErr: sharedapi.ErrorResponse{ + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "cannot pass a timestamp prior to the last transaction:", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "cannot pass a timestamp prior to the last transaction:", + }, + }, } internal.RunTest(t, fx.Invoke(func(lc fx.Lifecycle, api *api.API) { @@ -529,7 +579,7 @@ func TestPostTransactions(t *testing.T) { require.True(t, ok) require.Len(t, txs, 1) if !tc.payload[i].Timestamp.IsZero() { - require.Equal(t, tc.payload[i].Timestamp, txs[0].Timestamp) + require.Equal(t, tc.payload[i].Timestamp.UTC(), txs[0].Timestamp) } } tcIndex := 0 @@ -543,7 +593,9 @@ func TestPostTransactions(t *testing.T) { actualErr := sharedapi.ErrorResponse{} if internal.Decode(t, rsp.Body, &actualErr) { require.Equal(t, tc.expectedErr.ErrorCode, actualErr.ErrorCode, actualErr.ErrorMessage) - require.Equal(t, tc.expectedErr.ErrorMessage, actualErr.ErrorMessage) + require.Contains(t, actualErr.ErrorMessage, tc.expectedErr.ErrorMessage) + require.Equal(t, tc.expectedErr.ErrorCodeDeprecated, actualErr.ErrorCodeDeprecated, actualErr.ErrorMessageDeprecated) + require.Contains(t, actualErr.ErrorMessageDeprecated, tc.expectedErr.ErrorMessageDeprecated) require.Equal(t, tc.expectedErr.Details, actualErr.Details) } } else { @@ -553,7 +605,7 @@ func TestPostTransactions(t *testing.T) { require.Equal(t, (*tc.expectedRes.Data)[0].Postings, txs[0].Postings) require.Equal(t, len((*tc.expectedRes.Data)[0].Metadata), len(txs[0].Metadata)) if !tc.payload[tcIndex].Timestamp.IsZero() { - require.Equal(t, tc.payload[tcIndex].Timestamp, txs[0].Timestamp) + require.Equal(t, tc.payload[tcIndex].Timestamp.UTC(), txs[0].Timestamp) } } }) @@ -739,11 +791,10 @@ func TestPostTransactionInvalidBody(t *testing.T) { err := sharedapi.ErrorResponse{} internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ - ErrorCode: apierrors.ErrScriptNoScript, - ErrorMessage: "[NO_SCRIPT] no script to execute", - Details: apierrors.EncodeLink("no script to execute"), - ErrorCodeDeprecated: apierrors.ErrScriptNoScript, - ErrorMessageDeprecated: "[NO_SCRIPT] no script to execute", + ErrorCode: apierrors.ErrValidation, + ErrorMessage: "invalid payload: should contain either postings or script", + ErrorCodeDeprecated: apierrors.ErrValidation, + ErrorMessageDeprecated: "invalid payload: should contain either postings or script", }, err) }) @@ -1792,9 +1843,9 @@ func TestRevertTransaction(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrNotFound, - ErrorMessage: "transaction not found", + ErrorMessage: "transaction 42 not found", ErrorCodeDeprecated: apierrors.ErrNotFound, - ErrorMessageDeprecated: "transaction not found", + ErrorMessageDeprecated: "transaction 42 not found", }, err) }) @@ -1806,9 +1857,9 @@ func TestRevertTransaction(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrValidation, - ErrorMessage: "transaction already reverted", + ErrorMessage: fmt.Sprintf("transaction %d already reverted", revertedTxID), ErrorCodeDeprecated: apierrors.ErrValidation, - ErrorMessageDeprecated: "transaction already reverted", + ErrorMessageDeprecated: fmt.Sprintf("transaction %d already reverted", revertedTxID), }, err) }) @@ -1894,9 +1945,9 @@ func TestPostTransactionsBatch(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrValidation, - ErrorMessage: "processing tx 1: transaction has no postings", + ErrorMessage: "invalid transaction 1: no postings", ErrorCodeDeprecated: apierrors.ErrValidation, - ErrorMessageDeprecated: "processing tx 1: transaction has no postings", + ErrorMessageDeprecated: "invalid transaction 1: no postings", }, err) }) @@ -1923,9 +1974,10 @@ func TestPostTransactionsBatch(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "processing tx 0: balance.insufficient.COIN", + ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", + Details: apierrors.EncodeLink("account had insufficient funds"), ErrorCodeDeprecated: apierrors.ErrInsufficientFund, - ErrorMessageDeprecated: "processing tx 0: balance.insufficient.COIN", + ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", }, err) }) @@ -1972,9 +2024,10 @@ func TestPostTransactionsBatch(t *testing.T) { internal.Decode(t, rsp.Body, &err) require.EqualValues(t, sharedapi.ErrorResponse{ ErrorCode: apierrors.ErrInsufficientFund, - ErrorMessage: "processing tx 1: balance.insufficient.GEM", + ErrorMessage: "[INSUFFICIENT_FUND] account had insufficient funds", + Details: apierrors.EncodeLink("account had insufficient funds"), ErrorCodeDeprecated: apierrors.ErrInsufficientFund, - ErrorMessageDeprecated: "processing tx 1: balance.insufficient.GEM", + ErrorMessageDeprecated: "[INSUFFICIENT_FUND] account had insufficient funds", }, err) }) @@ -1997,3 +2050,142 @@ func TestPostTransactionsBatch(t *testing.T) { }) })) } + +func TestPostTransactionsBatchComplex(t *testing.T) { + internal.RunTest(t, fx.Invoke(func(lc fx.Lifecycle, api *api.API, driver storage.Driver[ledger.Store]) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + + txs := []core.TransactionData{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: "payins:001", + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "payins:001", + Destination: "users:001:wallet", + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "world", + Destination: "teller", + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: "world", + Destination: "teller", + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "users:001:wallet", + Destination: "trades:001", + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: "trades:001", + Destination: "fiat:holdings", + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: "teller", + Destination: "trades:001", + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: "trades:001", + Destination: "users:001:wallet", + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "users:001:wallet", + Destination: "trades:001", + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: "trades:001", + Destination: "fiat:holdings", + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: "teller", + Destination: "trades:001", + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + { + Source: "trades:001", + Destination: "users:001:wallet", + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "users:001:wallet", + Destination: "users:001:withdrawals", + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "users:001:withdrawals", + Destination: "payouts:001", + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + } + + rsp := internal.PostTransactionBatch(t, api, core.Transactions{ + Transactions: txs, + }) + require.Equal(t, http.StatusOK, rsp.Result().StatusCode) + res, _ := internal.DecodeSingleResponse[[]core.ExpandedTransaction](t, rsp.Body) + require.Len(t, res, 7) + require.Equal(t, txs[0].Postings, res[0].Postings) + require.Equal(t, txs[1].Postings, res[1].Postings) + require.Equal(t, txs[2].Postings, res[2].Postings) + require.Equal(t, txs[3].Postings, res[3].Postings) + require.Equal(t, txs[4].Postings, res[4].Postings) + require.Equal(t, txs[5].Postings, res[5].Postings) + require.Equal(t, txs[6].Postings, res[6].Postings) + + return nil + }}) + })) +} diff --git a/pkg/bus/monitor.go b/pkg/bus/monitor.go index 23ca35e26..4faef2ff5 100644 --- a/pkg/bus/monitor.go +++ b/pkg/bus/monitor.go @@ -37,14 +37,15 @@ func LedgerMonitorModule() fx.Option { ) } -func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, res ledger.CommitResult) { +func (l *ledgerMonitor) CommittedTransactions(ctx context.Context, ledger string, txs ...core.ExpandedTransaction) { + postCommitVolumes := core.AggregatePostCommitVolumes(txs...) l.publish(ctx, EventTypeCommittedTransactions, newEventCommittedTransactions(CommittedTransactions{ Ledger: ledger, - Transactions: res.GeneratedTransactions, - Volumes: res.PostCommitVolumes, - PostCommitVolumes: res.PostCommitVolumes, - PreCommitVolumes: res.PreCommitVolumes, + Transactions: txs, + Volumes: postCommitVolumes, + PostCommitVolumes: postCommitVolumes, + PreCommitVolumes: core.AggregatePreCommitVolumes(txs...), })) } diff --git a/pkg/bus/monitor_test.go b/pkg/bus/monitor_test.go index 853705b1a..a7935a85f 100644 --- a/pkg/bus/monitor_test.go +++ b/pkg/bus/monitor_test.go @@ -8,7 +8,6 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" "github.com/formancehq/go-libs/publish" - "github.com/numary/ledger/pkg/ledger" "github.com/pborman/uuid" "github.com/stretchr/testify/assert" ) @@ -29,7 +28,7 @@ func TestMonitor(t *testing.T) { "*": "testing", }) m := newLedgerMonitor(p) - go m.CommittedTransactions(context.Background(), uuid.New(), ledger.CommitResult{}) + go m.CommittedTransactions(context.Background(), uuid.New()) select { case m := <-messages: diff --git a/pkg/core/numscript.go b/pkg/core/numscript.go index 92a75aa5d..530eb7b14 100644 --- a/pkg/core/numscript.go +++ b/pkg/core/numscript.go @@ -1,27 +1,103 @@ package core import ( + "encoding/json" + "fmt" "strings" ) +type variable struct { + name string + jsonVal json.RawMessage +} + func TxsToScriptsData(txsData ...TransactionData) []ScriptData { res := []ScriptData{} for _, txData := range txsData { sb := strings.Builder{} + monetaryToVars := map[string]variable{} + accountsToVars := map[string]variable{} + i := 0 + j := 0 + for _, p := range txData.Postings { + if _, ok := accountsToVars[p.Source]; !ok { + if p.Source != WORLD { + accountsToVars[p.Source] = variable{ + name: fmt.Sprintf("va%d", i), + jsonVal: json.RawMessage(`"` + p.Source + `"`), + } + i++ + } + } + if _, ok := accountsToVars[p.Destination]; !ok { + if p.Destination != WORLD { + accountsToVars[p.Destination] = variable{ + name: fmt.Sprintf("va%d", i), + jsonVal: json.RawMessage(`"` + p.Destination + `"`), + } + i++ + } + } + mon := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) + if _, ok := monetaryToVars[mon]; !ok { + monetaryToVars[mon] = variable{ + name: fmt.Sprintf("vm%d", j), + jsonVal: json.RawMessage( + `{"asset":"` + p.Asset + `","amount":` + p.Amount.String() + `}`), + } + j++ + } + } + + sb.WriteString("vars {\n") + for _, v := range accountsToVars { + sb.WriteString(fmt.Sprintf("\taccount $%s\n", v.name)) + } + for _, v := range monetaryToVars { + sb.WriteString(fmt.Sprintf("\tmonetary $%s\n", v.name)) + } + sb.WriteString("}\n") + for _, p := range txData.Postings { - sb.WriteString("send [") - sb.WriteString(p.Asset) - sb.WriteString(" ") - sb.WriteString(p.Amount.String()) - sb.WriteString("] (\n\tsource = @") - sb.WriteString(p.Source) - sb.WriteString("\n\tdestination = @") - sb.WriteString(p.Destination) - sb.WriteString("\n)\n") + m := fmt.Sprintf("[%s %s]", p.Amount.String(), p.Asset) + mon, ok := monetaryToVars[m] + if !ok { + panic(fmt.Sprintf("monetary %s not found", m)) + } + sb.WriteString(fmt.Sprintf("send $%s (\n", mon.name)) + if p.Source == WORLD { + sb.WriteString("\tsource = @world\n") + } else { + src, ok := accountsToVars[p.Source] + if !ok { + panic(fmt.Sprintf("source %s not found", p.Source)) + } + sb.WriteString(fmt.Sprintf("\tsource = $%s\n", src.name)) + } + if p.Destination == WORLD { + sb.WriteString("\tdestination = @world\n") + } else { + dest, ok := accountsToVars[p.Destination] + if !ok { + panic(fmt.Sprintf("destination %s not found", p.Destination)) + } + sb.WriteString(fmt.Sprintf("\tdestination = $%s\n", dest.name)) + } + sb.WriteString(")\n") + } + + vars := map[string]json.RawMessage{} + for _, v := range accountsToVars { + vars[v.name] = v.jsonVal + } + for _, v := range monetaryToVars { + vars[v.name] = v.jsonVal } + res = append(res, ScriptData{ Script: Script{ Plain: sb.String(), + Vars: vars, }, Timestamp: txData.Timestamp, Reference: txData.Reference, diff --git a/pkg/core/numscript_test.go b/pkg/core/numscript_test.go index 3270fc372..f948b7798 100644 --- a/pkg/core/numscript_test.go +++ b/pkg/core/numscript_test.go @@ -1,6 +1,7 @@ package core import ( + "encoding/json" "testing" "time" @@ -39,42 +40,11 @@ func TestTxsToScriptsData(t *testing.T) { output: []ScriptData{ { Script: Script{ - Plain: "send [EUR/2 100] (\n\tsource = @world\n\tdestination = @alice\n)\n", - }, - Reference: "ref", - Timestamp: ts, - Metadata: Metadata{"key": "val"}, - }, - }, - }, - { - name: "multiple postings", - input: []TransactionData{ - { - Postings: Postings{ - { - Source: "world", - Destination: "alice", - Asset: "EUR/2", - Amount: NewMonetaryInt(100), + Plain: "vars {\n\taccount $va0\n\tmonetary $vm0\n}\nsend $vm0 (\n\tsource = @world\n\tdestination = $va0\n)\n", + Vars: map[string]json.RawMessage{ + "va0": json.RawMessage(`"alice"`), + "vm0": json.RawMessage(`{"asset":"EUR/2","amount":100}`), }, - { - Source: "world", - Destination: "bob", - Asset: "USD/2", - Amount: NewMonetaryInt(1000), - }, - }, - Reference: "ref", - Timestamp: ts, - Metadata: Metadata{"key": "val"}, - }, - }, - output: []ScriptData{ - { - Script: Script{ - Plain: "send [EUR/2 100] (\n\tsource = @world\n\tdestination = @alice\n)\n" + - "send [USD/2 1000] (\n\tsource = @world\n\tdestination = @bob\n)\n", }, Reference: "ref", Timestamp: ts, diff --git a/pkg/core/posting.go b/pkg/core/posting.go index 4110b20e1..2c0a81090 100644 --- a/pkg/core/posting.go +++ b/pkg/core/posting.go @@ -4,6 +4,8 @@ import ( "database/sql/driver" "encoding/json" "regexp" + + "github.com/pkg/errors" ) type Posting struct { @@ -56,3 +58,22 @@ var addressRegexp = regexp.MustCompile(`^\w+(:\w+)*$`) func ValidateAddress(addr string) bool { return addressRegexp.Match([]byte(addr)) } + +func (p Postings) Validate() (int, error) { + for i, p := range p { + if p.Amount.Ltz() { + return i, errors.New("negative amount") + } + if !ValidateAddress(p.Source) { + return i, errors.New("invalid source address") + } + if !ValidateAddress(p.Destination) { + return i, errors.New("invalid destination address") + } + if !AssetIsValid(p.Asset) { + return i, errors.New("invalid asset") + } + } + + return 0, nil +} diff --git a/pkg/core/volumes.go b/pkg/core/volumes.go index 0888fbf04..eea5e5f15 100644 --- a/pkg/core/volumes.go +++ b/pkg/core/volumes.go @@ -44,6 +44,12 @@ func (v AssetsVolumes) Balances() AssetsBalances { type AccountsAssetsVolumes map[string]AssetsVolumes func (a AccountsAssetsVolumes) GetVolumes(account, asset string) Volumes { + if a == nil { + return Volumes{ + Input: NewMonetaryInt(0), + Output: NewMonetaryInt(0), + } + } if assetsVolumes, ok := a[account]; !ok { return Volumes{ Input: NewMonetaryInt(0), @@ -57,9 +63,12 @@ func (a AccountsAssetsVolumes) GetVolumes(account, asset string) Volumes { } } -func (a AccountsAssetsVolumes) SetVolumes(account, asset string, volumes Volumes) { - if assetsVolumes, ok := a[account]; !ok { - a[account] = map[string]Volumes{ +func (a *AccountsAssetsVolumes) SetVolumes(account, asset string, volumes Volumes) { + if *a == nil { + *a = AccountsAssetsVolumes{} + } + if assetsVolumes, ok := (*a)[account]; !ok { + (*a)[account] = map[string]Volumes{ asset: { Input: volumes.Input.OrZero(), Output: volumes.Output.OrZero(), @@ -73,9 +82,12 @@ func (a AccountsAssetsVolumes) SetVolumes(account, asset string, volumes Volumes } } -func (a AccountsAssetsVolumes) AddInput(account, asset string, input *MonetaryInt) { - if assetsVolumes, ok := a[account]; !ok { - a[account] = map[string]Volumes{ +func (a *AccountsAssetsVolumes) AddInput(account, asset string, input *MonetaryInt) { + if *a == nil { + *a = AccountsAssetsVolumes{} + } + if assetsVolumes, ok := (*a)[account]; !ok { + (*a)[account] = map[string]Volumes{ asset: { Input: input.OrZero(), Output: NewMonetaryInt(0), @@ -88,9 +100,12 @@ func (a AccountsAssetsVolumes) AddInput(account, asset string, input *MonetaryIn } } -func (a AccountsAssetsVolumes) AddOutput(account, asset string, output *MonetaryInt) { - if assetsVolumes, ok := a[account]; !ok { - a[account] = map[string]Volumes{ +func (a *AccountsAssetsVolumes) AddOutput(account, asset string, output *MonetaryInt) { + if *a == nil { + *a = AccountsAssetsVolumes{} + } + if assetsVolumes, ok := (*a)[account]; !ok { + (*a)[account] = map[string]Volumes{ asset: { Output: output.OrZero(), Input: NewMonetaryInt(0), @@ -104,11 +119,17 @@ func (a AccountsAssetsVolumes) AddOutput(account, asset string, output *Monetary } func (a AccountsAssetsVolumes) HasAccount(account string) bool { + if a == nil { + return false + } _, ok := a[account] return ok } func (a AccountsAssetsVolumes) HasAccountAndAsset(account, asset string) bool { + if a == nil { + return false + } volumesByAsset, ok := a[account] if !ok { return false @@ -139,6 +160,24 @@ func (a *AccountsAssetsVolumes) Scan(value interface{}) error { } } +func AggregatePreCommitVolumes(txs ...ExpandedTransaction) AccountsAssetsVolumes { + ret := AccountsAssetsVolumes{} + for i := 0; i < len(txs); i++ { + tx := txs[i] + for _, posting := range tx.Postings { + if !ret.HasAccountAndAsset(posting.Source, posting.Asset) { + ret.SetVolumes(posting.Source, posting.Asset, + tx.PreCommitVolumes.GetVolumes(posting.Source, posting.Asset)) + } + if !ret.HasAccountAndAsset(posting.Destination, posting.Asset) { + ret.SetVolumes(posting.Destination, posting.Asset, + tx.PreCommitVolumes.GetVolumes(posting.Destination, posting.Asset)) + } + } + } + return ret +} + func AggregatePostCommitVolumes(txs ...ExpandedTransaction) AccountsAssetsVolumes { ret := AccountsAssetsVolumes{} for i := len(txs) - 1; i >= 0; i-- { diff --git a/pkg/ledger/executor.go b/pkg/ledger/executor.go index a0cb4c3ab..150b31e69 100644 --- a/pkg/ledger/executor.go +++ b/pkg/ledger/executor.go @@ -2,161 +2,269 @@ package ledger import ( "context" + "crypto/sha256" "encoding/json" "fmt" + "time" machine "github.com/formancehq/machine/core" "github.com/formancehq/machine/script/compiler" "github.com/formancehq/machine/vm" + "github.com/formancehq/machine/vm/program" "github.com/numary/ledger/pkg/core" + "github.com/numary/ledger/pkg/storage" "github.com/pkg/errors" ) -func (l *Ledger) Execute(ctx context.Context, preview bool, scripts ...core.ScriptData) ([]core.ExpandedTransaction, error) { +func (l *Ledger) Execute(ctx context.Context, checkMapping, preview bool, scripts ...core.ScriptData) ([]core.ExpandedTransaction, error) { if len(scripts) == 0 { return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorNoScript, "no script to execute") } - txsData, addOps, err := l.ProcessScripts(ctx, scripts...) + addOps := new(core.AdditionalOperations) + + lastTx, err := l.store.GetLastTransaction(ctx) if err != nil { - return []core.ExpandedTransaction{}, err + return []core.ExpandedTransaction{}, errors.Wrap(err, + "could not get last transaction") } - for _, txData := range txsData { - if len(txData.Postings) == 0 { - return []core.ExpandedTransaction{}, - NewValidationError("transaction has no postings") - } + vAggr := NewVolumeAggregator(l) + txs := make([]core.ExpandedTransaction, 0) + var nextTxId uint64 + var lastTxTimestamp time.Time + if lastTx != nil { + nextTxId = lastTx.ID + 1 + lastTxTimestamp = lastTx.Timestamp } - - txs, err := l.Commit(ctx, preview, addOps, txsData...) - if err != nil { - return []core.ExpandedTransaction{}, err + contracts := make([]core.Contract, 0) + if checkMapping { + mapping, err := l.store.LoadMapping(ctx) + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "loading mapping") + } + if mapping != nil { + contracts = append(contracts, mapping.Contracts...) + } + contracts = append(contracts, DefaultContracts...) } - return txs, nil -} + usedReferences := make(map[string]struct{}) + accs := map[string]*core.AccountWithVolumes{} + for i, script := range scripts { + // Until v1.5.0, dates was stored as string using rfc3339 format + // So round the date to the second to keep the same behaviour + if script.Timestamp.IsZero() { + script.Timestamp = time.Now().UTC().Truncate(time.Second) + } else { + script.Timestamp = script.Timestamp.UTC() + } -func (l *Ledger) ProcessScripts(ctx context.Context, scripts ...core.ScriptData) ([]core.TransactionData, *core.AdditionalOperations, error) { - txsData := []core.TransactionData{} - addOps := new(core.AdditionalOperations) + past := false + if lastTx != nil && script.Timestamp.Before(lastTxTimestamp) { + past = true + } + if past && !l.allowPastTimestamps { + return []core.ExpandedTransaction{}, NewValidationError(fmt.Sprintf( + "cannot pass a timestamp prior to the last transaction: %s (passed) is %s before %s (last)", + script.Timestamp.Format(time.RFC3339Nano), + lastTxTimestamp.Sub(script.Timestamp), + lastTxTimestamp.Format(time.RFC3339Nano))) + } + lastTxTimestamp = script.Timestamp - for _, script := range scripts { if script.Reference != "" { + if _, ok := usedReferences[script.Reference]; ok { + return []core.ExpandedTransaction{}, NewConflictError() + } + usedReferences[script.Reference] = struct{}{} + txs, err := l.GetTransactions(ctx, *NewTransactionsQuery(). WithReferenceFilter(script.Reference)) if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, "GetTransactions") + return []core.ExpandedTransaction{}, errors.Wrap(err, "GetTransactions") } if len(txs.Data) > 0 { - return []core.TransactionData{}, nil, NewConflictError() + return []core.ExpandedTransaction{}, NewConflictError() } } if script.Plain == "" { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorNoScript, + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorNoScript, "no script to execute") } - p, err := compiler.Compile(script.Plain) - if err != nil { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - err.Error()) + h := sha256.New() + if _, err = h.Write([]byte(script.Plain)); err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, "hashing script") } + curr := h.Sum(nil) - m := vm.NewMachine(*p) + var m *vm.Machine + if cachedP, found := l.cache.Get(curr); found { + m = vm.NewMachine(cachedP.(program.Program)) + } else { + newP, err := compiler.Compile(script.Plain) + if err != nil { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + err.Error()) + } + l.cache.Set(curr, *newP, 1) + m = vm.NewMachine(*newP) + } if err = m.SetVarsFromJSON(script.Vars); err != nil { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, errors.Wrap(err, "could not set variables").Error()) } - { - ch, err := m.ResolveResources() - if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, - "could not resolve program resources") + resourcesChan, err := m.ResolveResources() + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "could not resolve program resources") + } + for req := range resourcesChan { + if req.Error != nil { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(req.Error, "could not resolve program resources").Error()) } - for req := range ch { - if req.Error != nil { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(req.Error, "could not resolve program resources").Error()) - } - account, err := l.GetAccount(ctx, req.Account) + if _, ok := accs[req.Account]; !ok { + accs[req.Account], err = l.GetAccount(ctx, req.Account) if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, + return []core.ExpandedTransaction{}, errors.Wrap(err, fmt.Sprintf("could not get account %q", req.Account)) } - if req.Key != "" { - entry, ok := account.Metadata[req.Key] - if !ok { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - fmt.Sprintf("missing key %v in metadata for account %v", req.Key, req.Account)) - } - data, err := json.Marshal(entry) - if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, "json.Marshal") - } - value, err := machine.NewValueFromTypedJSON(data) - if err != nil { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(err, fmt.Sprintf( - "invalid format for metadata at key %v for account %v", - req.Key, req.Account)).Error()) - } - req.Response <- *value - } else if req.Asset != "" { - amt := account.Balances[req.Asset].OrZero() - resp := machine.MonetaryInt(*amt) - req.Response <- &resp - } else { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(err, fmt.Sprintf("invalid ResourceRequest: %+v", req)).Error()) + } + if req.Key != "" { + entry, ok := accs[req.Account].Metadata[req.Key] + if !ok { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + fmt.Sprintf("missing key %v in metadata for account %v", req.Key, req.Account)) + } + data, err := json.Marshal(entry) + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Marshal") + } + value, err := machine.NewValueFromTypedJSON(data) + if err != nil { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(err, fmt.Sprintf( + "invalid format for metadata at key %v for account %v", + req.Key, req.Account)).Error()) } + req.Response <- *value + } else if req.Asset != "" { + amt := accs[req.Account].Balances[req.Asset].OrZero() + resp := machine.MonetaryInt(*amt) + req.Response <- &resp + } else { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(err, fmt.Sprintf("invalid ResourceRequest: %+v", req)).Error()) } } - { - ch, err := m.ResolveBalances() - if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, - "could not resolve balances") + balanceCh, err := m.ResolveBalances() + if err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "could not resolve balances") + } + for req := range balanceCh { + if req.Error != nil { + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorCompilationFailed, + errors.Wrap(req.Error, "could not resolve program balances").Error()) } - for req := range ch { - if req.Error != nil { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorCompilationFailed, - errors.Wrap(req.Error, "could not resolve program balances").Error()) - } - account, err := l.GetAccount(ctx, req.Account) + var amt *core.MonetaryInt + if _, ok := accs[req.Account]; !ok { + accs[req.Account], err = l.GetAccount(ctx, req.Account) if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, + return []core.ExpandedTransaction{}, errors.Wrap(err, fmt.Sprintf("could not get account %q", req.Account)) } - amt := account.Balances[req.Asset].OrZero() - resp := machine.MonetaryInt(*amt) - req.Response <- &resp } + amt = accs[req.Account].Balances[req.Asset].OrZero() + resp := machine.MonetaryInt(*amt) + req.Response <- &resp } exitCode, err := m.Execute() if err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, "script execution failed") + return []core.ExpandedTransaction{}, errors.Wrap(err, + "script execution failed") } if exitCode != vm.EXIT_OK { switch exitCode { case vm.EXIT_FAIL: - return []core.TransactionData{}, nil, errors.New("script exited with error code EXIT_FAIL") + return []core.ExpandedTransaction{}, errors.New( + "script exited with error code EXIT_FAIL") case vm.EXIT_FAIL_INVALID: - return []core.TransactionData{}, nil, errors.New("internal error: compiled script was invalid") + return []core.ExpandedTransaction{}, errors.New( + "internal error: compiled script was invalid") case vm.EXIT_FAIL_INSUFFICIENT_FUNDS: // TODO: If the machine can provide the asset which is failing // we should be able to use InsufficientFundError{} instead of error code - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorInsufficientFund, + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorInsufficientFund, "account had insufficient funds") default: - return []core.TransactionData{}, nil, errors.New("script execution failed") + return []core.ExpandedTransaction{}, errors.New( + "script execution failed") + } + } + + if len(m.Postings) == 0 { + return []core.ExpandedTransaction{}, + NewValidationError("transaction has no postings") + } + + txVolumeAggr := vAggr.NextTx() + postings := make([]core.Posting, len(m.Postings)) + for j, posting := range m.Postings { + amt := core.MonetaryInt(*posting.Amount) + if err := txVolumeAggr.Transfer(ctx, + posting.Source, posting.Destination, posting.Asset, &amt, accs); err != nil { + return []core.ExpandedTransaction{}, NewTransactionCommitError(i, err) + } + postings[j] = core.Posting{ + Source: posting.Source, + Destination: posting.Destination, + Amount: &amt, + Asset: posting.Asset, + } + } + + for account, volumes := range txVolumeAggr.PostCommitVolumes { + if _, ok := accs[account]; !ok { + accs[account], err = l.GetAccount(ctx, account) + if err != nil { + return []core.ExpandedTransaction{}, NewTransactionCommitError(i, + errors.Wrap(err, fmt.Sprintf("GetAccount '%s'", account))) + } + } + for asset, vol := range volumes { + accs[account].Volumes[asset] = vol + } + accs[account].Balances = accs[account].Volumes.Balances() + for asset, volume := range volumes { + if account == core.WORLD { + continue + } + + for _, contract := range contracts { + if contract.Match(account) { + if ok := contract.Expr.Eval(core.EvalContext{ + Variables: map[string]interface{}{ + "balance": volume.Balance(), + }, + Metadata: accs[account].Metadata, + Asset: asset, + }); !ok { + return []core.ExpandedTransaction{}, NewInsufficientFundError(asset) + } + break + } + } } } @@ -164,14 +272,14 @@ func (l *Ledger) ProcessScripts(ctx context.Context, scripts ...core.ScriptData) for k, v := range metadata { asMapAny := make(map[string]any) if err := json.Unmarshal(v.([]byte), &asMapAny); err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, "json.Unmarshal") + return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Unmarshal") } metadata[k] = asMapAny } for k, v := range script.Metadata { _, ok := metadata[k] if ok { - return []core.TransactionData{}, nil, NewScriptError(ScriptErrorMetadataOverride, + return []core.ExpandedTransaction{}, NewScriptError(ScriptErrorMetadataOverride, "cannot override metadata from script") } metadata[k] = v @@ -182,7 +290,7 @@ func (l *Ledger) ProcessScripts(ctx context.Context, scripts ...core.ScriptData) for k, v := range meta { asMapAny := make(map[string]any) if err := json.Unmarshal(v, &asMapAny); err != nil { - return []core.TransactionData{}, nil, errors.Wrap(err, "json.Unmarshal") + return []core.ExpandedTransaction{}, errors.Wrap(err, "json.Unmarshal") } if account[0] == '@' { account = account[1:] @@ -197,24 +305,55 @@ func (l *Ledger) ProcessScripts(ctx context.Context, scripts ...core.ScriptData) } } - postings := make([]core.Posting, len(m.Postings)) - for i, p := range m.Postings { - amt := core.MonetaryInt(*p.Amount) - postings[i] = core.Posting{ - Source: p.Source, - Destination: p.Destination, - Amount: &amt, - Asset: p.Asset, + tx := core.ExpandedTransaction{ + Transaction: core.Transaction{ + TransactionData: core.TransactionData{ + Postings: postings, + Reference: script.Reference, + Metadata: core.Metadata(metadata), + Timestamp: script.Timestamp, + }, + ID: nextTxId, + }, + PreCommitVolumes: txVolumeAggr.PreCommitVolumes, + PostCommitVolumes: txVolumeAggr.PostCommitVolumes, + } + lastTx = &tx + txs = append(txs, tx) + nextTxId++ + } + + if preview { + return txs, nil + } + + if err := l.store.Commit(ctx, txs...); err != nil { + switch { + case storage.IsErrorCode(err, storage.ConstraintFailed): + return []core.ExpandedTransaction{}, NewConflictError() + default: + return []core.ExpandedTransaction{}, errors.Wrap(err, + "committing transactions") + } + } + + if addOps != nil && addOps.SetAccountMeta != nil { + for addr, m := range addOps.SetAccountMeta { + if err := l.store.UpdateAccountMetadata(ctx, + addr, m, time.Now().Round(time.Second).UTC()); err != nil { + return []core.ExpandedTransaction{}, errors.Wrap(err, + "updating account metadata") } } + } - txsData = append(txsData, core.TransactionData{ - Postings: postings, - Reference: script.Reference, - Metadata: core.Metadata(metadata), - Timestamp: script.Timestamp, - }) + l.monitor.CommittedTransactions(ctx, l.store.Name(), txs...) + if addOps != nil && addOps.SetAccountMeta != nil { + for addr, m := range addOps.SetAccountMeta { + l.monitor.SavedMetadata(ctx, + l.store.Name(), core.MetaTargetTypeAccount, addr, m) + } } - return txsData, addOps, nil + return txs, nil } diff --git a/pkg/ledger/executor_test.go b/pkg/ledger/executor_test.go index bb45b6097..e435591fb 100644 --- a/pkg/ledger/executor_test.go +++ b/pkg/ledger/executor_test.go @@ -3,6 +3,8 @@ package ledger_test import ( "context" "encoding/json" + "fmt" + "strconv" "testing" "github.com/numary/ledger/pkg/api/apierrors" @@ -16,7 +18,7 @@ func TestNoScript(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { script := core.ScriptData{} - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) assert.IsType(t, &ledger.ScriptError{}, err) assert.Equal(t, ledger.ScriptErrorNoScript, err.(*ledger.ScriptError).Code) }) @@ -28,7 +30,7 @@ func TestCompilationError(t *testing.T) { Script: core.Script{Plain: "willnotcompile"}, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) assert.IsType(t, &ledger.ScriptError{}, err) assert.Equal(t, ledger.ScriptErrorCompilationFailed, err.(*ledger.ScriptError).Code) }) @@ -50,7 +52,7 @@ func TestSend(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) require.NoError(t, err) assertBalance(t, l, "user:001", @@ -75,7 +77,7 @@ func TestNoVariables(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) assert.Error(t, err) require.NoError(t, l.Close(context.Background())) @@ -105,7 +107,7 @@ func TestVariables(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) require.NoError(t, err) assertBalance(t, l, "user:042", @@ -130,20 +132,21 @@ func TestEnoughFunds(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - send [COIN 95] ( - source = @user:001 - destination = @world - )`, + send [COIN 95] ( + source = @user:001 + destination = @world + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), false, false, script) assert.NoError(t, err) }) } @@ -165,20 +168,21 @@ func TestNotEnoughFunds(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - send [COIN 105] ( - source = @user:002 - destination = @world - )`, + send [COIN 105] ( + source = @user:002 + destination = @world + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), false, false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrInsufficientFund)) }) } @@ -208,7 +212,7 @@ func TestMissingMetadata(t *testing.T) { }, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, ledger.ScriptErrorCompilationFailed)) }) } @@ -230,16 +234,17 @@ func TestMetadata(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) err = l.SaveMeta(context.Background(), core.MetaTargetTypeAccount, "sales:042", core.Metadata{ "seller": json.RawMessage(`{ - "type": "account", - "value": "users:053" - }`), + "type": "account", + "value": "users:053" + }`), }) require.NoError(t, err) @@ -247,27 +252,27 @@ func TestMetadata(t *testing.T) { "users:053", core.Metadata{ "commission": json.RawMessage(`{ - "type": "portion", - "value": "15.5%" - }`), + "type": "portion", + "value": "15.5%" + }`), }) require.NoError(t, err) plain := ` - vars { - account $sale - account $seller = meta($sale, "seller") - portion $commission = meta($seller, "commission") - } - - send [COIN *] ( - source = $sale - destination = { - remaining to $seller - $commission to @platform - } - ) - ` + vars { + account $sale + account $seller = meta($sale, "seller") + portion $commission = meta($seller, "commission") + } + + send [COIN *] ( + source = $sale + destination = { + remaining to $seller + $commission to @platform + } + ) + ` require.NoError(t, err) script := core.ScriptData{ @@ -279,13 +284,11 @@ func TestMetadata(t *testing.T) { }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), false, false, script) require.NoError(t, err) assertBalance(t, l, "sales:042", "COIN", core.NewMonetaryInt(0)) - assertBalance(t, l, "users:053", "COIN", core.NewMonetaryInt(85)) - assertBalance(t, l, "platform", "COIN", core.NewMonetaryInt(15)) }) } @@ -358,7 +361,7 @@ func TestSetTxMeta(t *testing.T) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) - _, err := l.Execute(context.Background(), false, tc.script) + _, err := l.Execute(context.Background(), false, false, tc.script) if tc.expectedErrorCode != "" { require.Error(t, err) @@ -394,7 +397,7 @@ func TestScriptSetReference(t *testing.T) { Reference: "tx_ref", } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) require.NoError(t, err) last, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -410,7 +413,7 @@ func TestScriptReferenceConflict(t *testing.T) { require.NoError(t, l.Close(ctx)) }(l, context.Background()) - _, err := l.Execute(context.Background(), false, + _, err := l.Execute(context.Background(), false, false, core.ScriptData{ Script: core.Script{ Plain: ` @@ -424,7 +427,7 @@ func TestScriptReferenceConflict(t *testing.T) { }) require.NoError(t, err) - _, err = l.Execute(context.Background(), false, + _, err = l.Execute(context.Background(), false, false, core.ScriptData{ Script: core.Script{ Plain: ` @@ -444,33 +447,46 @@ func TestScriptReferenceConflict(t *testing.T) { func TestSetAccountMeta(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { t.Run("valid", func(t *testing.T) { - _, addOps, err := l.ProcessScripts(context.Background(), core.ScriptData{ - Script: core.Script{Plain: ` - set_account_meta(@alice, "aaa", "string meta") - set_account_meta(@alice, "bbb", 42) - set_account_meta(@alice, "ccc", COIN) - set_account_meta(@alice, "ddd", [COIN 30]) - set_account_meta(@alice, "eee", @bob) - `, - }, - }) + res, err := l.Execute(context.Background(), + false, false, core.ScriptData{ + Script: core.Script{Plain: ` + send [USD/2 99] ( + source = @world + destination = @user:001 + ) + set_account_meta(@alice, "aaa", "string meta") + set_account_meta(@alice, "bbb", 42) + set_account_meta(@alice, "ccc", COIN) + set_account_meta(@alice, "ddd", [COIN 30]) + set_account_meta(@alice, "eee", @bob) + `}, + }) + require.NoError(t, err) + require.Equal(t, 1, len(res)) + + acc, err := l.GetAccount(context.Background(), "alice") require.NoError(t, err) - require.Equal(t, core.AccountsMeta{ - "alice": map[string]any{ - "aaa": map[string]any{"type": "string", "value": "string meta"}, - "bbb": map[string]any{"type": "number", "value": 42.}, - "ccc": map[string]any{"type": "asset", "value": "COIN"}, - "ddd": map[string]any{"type": "monetary", - "value": map[string]any{"asset": "COIN", "amount": 30.}}, - "eee": map[string]any{"type": "account", "value": "bob"}, - }, - }, addOps.SetAccountMeta) + require.Equal(t, core.Metadata{ + "aaa": map[string]any{"type": "string", "value": "string meta"}, + "bbb": map[string]any{"type": "number", "value": 42.}, + "ccc": map[string]any{"type": "asset", "value": "COIN"}, + "ddd": map[string]any{"type": "monetary", + "value": map[string]any{"asset": "COIN", "amount": 30.}}, + "eee": map[string]any{"type": "account", "value": "bob"}, + }, acc.Metadata) }) t.Run("invalid syntax", func(t *testing.T) { - _, _, err := l.ProcessScripts(context.Background(), core.ScriptData{ - Script: core.Script{Plain: `set_account_meta(@bob, "is")`}, - }) + _, err := l.Execute(context.Background(), false, false, + core.ScriptData{ + Script: core.Script{Plain: ` + send [USD/2 99] ( + source = @world + destination = @user:001 + ) + set_account_meta(@bob, "is") + `}, + }) require.True(t, ledger.IsScriptErrorWithCode(err, ledger.ScriptErrorCompilationFailed)) }) @@ -494,23 +510,25 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - vars { - monetary $bal = balance(@users:001, COIN) - } - send $bal ( - source = @users:001 - destination = @world - )`, + vars { + monetary $bal = balance(@users:001, COIN) + } + send $bal ( + source = @users:001 + destination = @world + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), + false, false, script) require.NoError(t, err) assertBalance(t, l, "world", "COIN", core.NewMonetaryInt(0)) assertBalance(t, l, "users:001", "COIN", core.NewMonetaryInt(0)) @@ -539,29 +557,31 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - vars { - monetary $initial = balance(@A, USD/2) - } - send [USD/2 100] ( - source = { - @A - @C - } - destination = { - max $initial to @B - remaining to @D - } - )`, + vars { + monetary $initial = balance(@A, USD/2) + } + send [USD/2 100] ( + source = { + @A + @C + } + destination = { + max $initial to @B + remaining to @D + } + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), + false, false, script) require.NoError(t, err) assertBalance(t, l, "B", "USD/2", core.NewMonetaryInt(40)) assertBalance(t, l, "D", "USD/2", core.NewMonetaryInt(60)) @@ -584,26 +604,28 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - vars { - monetary $bal = balance(@users:001, COIN) - } - send $bal ( - source = @users:001 - destination = @world - ) - send $bal ( - source = @users:001 - destination = @world - )`, + vars { + monetary $bal = balance(@users:001, COIN) + } + send $bal ( + source = @users:001 + destination = @world + ) + send $bal ( + source = @users:001 + destination = @world + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), + false, false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrInsufficientFund)) }) }) @@ -624,23 +646,24 @@ func TestMonetaryVariableBalance(t *testing.T) { }, }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) script := core.ScriptData{ Script: core.Script{ Plain: ` - vars { - monetary $bal = balance(@world, COIN) - } - send $bal ( - source = @users:001 - destination = @world - )`, + vars { + monetary $bal = balance(@world, COIN) + } + send $bal ( + source = @users:001 + destination = @world + )`, }, } - _, err = l.Execute(context.Background(), false, script) + _, err = l.Execute(context.Background(), false, false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, ledger.ScriptErrorCompilationFailed)) assert.ErrorContains(t, err, "must be non-negative") }) @@ -655,16 +678,16 @@ func TestMonetaryVariableBalance(t *testing.T) { script := core.ScriptData{ Script: core.Script{ Plain: ` - vars { - account $bal = balance(@users:001, COIN) - } - send $bal ( - source = @users:001 - destination = @world - )`, + vars { + account $bal = balance(@users:001, COIN) + } + send $bal ( + source = @users:001 + destination = @world + )`, }, } - _, err := l.Execute(context.Background(), false, script) + _, err := l.Execute(context.Background(), false, false, script) assert.True(t, ledger.IsScriptErrorWithCode(err, apierrors.ErrScriptCompilationFailed)) }) }) @@ -681,3 +704,245 @@ func assertBalance(t *testing.T, l *ledger.Ledger, account, asset string, amount amount, b, ) } + +var execRes []core.ExpandedTransaction + +func BenchmarkLedger_PostTransactions(b *testing.B) { + runOnLedger(func(l *ledger.Ledger) { + defer func(l *ledger.Ledger, ctx context.Context) { + require.NoError(b, l.Close(ctx)) + }(l, context.Background()) + + txData := core.TransactionData{} + for i := 0; i < 1000; i++ { + txData.Postings = append(txData.Postings, core.Posting{ + Source: "world", + Destination: "benchmarks:" + strconv.Itoa(i), + Asset: "COIN", + Amount: core.NewMonetaryInt(10), + }) + } + + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + _, err := txData.Postings.Validate() + require.NoError(b, err) + script := core.TxsToScriptsData(txData) + res, err = l.Execute(context.Background(), true, true, script...) + require.NoError(b, err) + require.Len(b, res, 1) + require.Len(b, res[0].Postings, 1000) + } + + execRes = res + require.Len(b, execRes, 1) + require.Len(b, execRes[0].Postings, 1000) + }) +} + +func newTxsData(i int) []core.TransactionData { + return []core.TransactionData{ + { + Postings: core.Postings{ + { + Source: "world", + Destination: fmt.Sprintf("payins:%d", i), + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("payins:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(10000), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: "world", + Destination: fmt.Sprintf("teller:%d", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: "world", + Destination: fmt.Sprintf("teller:%d", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("fiat:holdings:%d", i), + Amount: core.NewMonetaryInt(1500), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("teller:%d", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(350000), + Asset: "RBLX/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("fiat:holdings:%d", i), + Amount: core.NewMonetaryInt(4230), + Asset: "EUR/2", + }, + { + Source: fmt.Sprintf("teller:%d", i), + Destination: fmt.Sprintf("trades:%d", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + { + Source: fmt.Sprintf("trades:%d", i), + Destination: fmt.Sprintf("users:%d:wallet", i), + Amount: core.NewMonetaryInt(1840000), + Asset: "SNAP/6", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:wallet", i), + Destination: fmt.Sprintf("users:%d:withdrawals", i), + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + { + Postings: core.Postings{ + { + Source: fmt.Sprintf("users:%d:withdrawals", i), + Destination: fmt.Sprintf("payouts:%d", i), + Amount: core.NewMonetaryInt(2270), + Asset: "EUR/2", + }, + }, + }, + } +} + +func BenchmarkLedger_PostTransactionsBatch(b *testing.B) { + runOnLedger(func(l *ledger.Ledger) { + defer func(l *ledger.Ledger, ctx context.Context) { + require.NoError(b, l.Close(ctx)) + }(l, context.Background()) + + txsData := newTxsData(1) + + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + var err error + for _, txData := range txsData { + _, err := txData.Postings.Validate() + require.NoError(b, err) + } + script := core.TxsToScriptsData(txsData...) + res, err = l.Execute(context.Background(), true, true, script...) + require.NoError(b, err) + require.Len(b, res, 7) + require.Len(b, res[0].Postings, 1) + require.Len(b, res[1].Postings, 1) + require.Len(b, res[2].Postings, 2) + require.Len(b, res[3].Postings, 4) + require.Len(b, res[4].Postings, 4) + require.Len(b, res[5].Postings, 1) + require.Len(b, res[6].Postings, 1) + } + + execRes = res + require.Len(b, execRes, 7) + require.Len(b, execRes[0].Postings, 1) + require.Len(b, execRes[1].Postings, 1) + require.Len(b, execRes[2].Postings, 2) + require.Len(b, execRes[3].Postings, 4) + require.Len(b, execRes[4].Postings, 4) + require.Len(b, execRes[5].Postings, 1) + require.Len(b, execRes[6].Postings, 1) + }) +} + +func BenchmarkLedger_PostTransactionsBatch2(b *testing.B) { + runOnLedger(func(l *ledger.Ledger) { + defer func(l *ledger.Ledger, ctx context.Context) { + require.NoError(b, l.Close(ctx)) + }(l, context.Background()) + + b.ResetTimer() + + res := []core.ExpandedTransaction{} + + for n := 0; n < b.N; n++ { + b.StopTimer() + txsData := newTxsData(n) + b.StartTimer() + var err error + for _, txData := range txsData { + _, err := txData.Postings.Validate() + require.NoError(b, err) + } + script := core.TxsToScriptsData(txsData...) + res, err = l.Execute(context.Background(), true, true, script...) + require.NoError(b, err) + require.Len(b, res, 7) + require.Len(b, res[0].Postings, 1) + require.Len(b, res[1].Postings, 1) + require.Len(b, res[2].Postings, 2) + require.Len(b, res[3].Postings, 4) + require.Len(b, res[4].Postings, 4) + require.Len(b, res[5].Postings, 1) + require.Len(b, res[6].Postings, 1) + } + + execRes = res + require.Len(b, execRes, 7) + require.Len(b, execRes[0].Postings, 1) + require.Len(b, execRes[1].Postings, 1) + require.Len(b, execRes[2].Postings, 2) + require.Len(b, execRes[3].Postings, 4) + require.Len(b, execRes[4].Postings, 4) + require.Len(b, execRes[5].Postings, 1) + require.Len(b, execRes[6].Postings, 1) + }) +} diff --git a/pkg/ledger/ledger.go b/pkg/ledger/ledger.go index 7a700e5c4..283d73fda 100644 --- a/pkg/ledger/ledger.go +++ b/pkg/ledger/ledger.go @@ -5,9 +5,9 @@ import ( "fmt" "time" + "github.com/dgraph-io/ristretto" "github.com/formancehq/go-libs/api" "github.com/numary/ledger/pkg/core" - "github.com/numary/ledger/pkg/storage" "github.com/pkg/errors" ) @@ -30,6 +30,7 @@ type Ledger struct { store Store monitor Monitor allowPastTimestamps bool + cache *ristretto.Cache } type LedgerOption = func(*Ledger) @@ -38,14 +39,20 @@ func WithPastTimestamps(l *Ledger) { l.allowPastTimestamps = true } -func NewLedger( - store Store, - monitor Monitor, - options ...LedgerOption, -) (*Ledger, error) { +func NewLedger(store Store, monitor Monitor, options ...LedgerOption) (*Ledger, error) { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: 1e7, // number of keys to track frequency of (10M). + MaxCost: 1 << 30, // maximum cost of cache (1GB). + BufferItems: 64, // number of keys per Get buffer. + }) + if err != nil { + return nil, errors.Wrap(err, "creating ledger cache") + } + l := &Ledger{ store: store, monitor: monitor, + cache: cache, } for _, option := range options { @@ -59,6 +66,7 @@ func (l *Ledger) Close(ctx context.Context) error { if err := l.store.Close(ctx); err != nil { return errors.Wrap(err, "closing store") } + l.cache.Close() return nil } @@ -66,51 +74,6 @@ func (l *Ledger) GetLedgerStore() Store { return l.store } -type CommitResult struct { - PreCommitVolumes core.AccountsAssetsVolumes - PostCommitVolumes core.AccountsAssetsVolumes - GeneratedTransactions []core.ExpandedTransaction -} - -func (l *Ledger) Commit(ctx context.Context, preview bool, addOps *core.AdditionalOperations, txsData ...core.TransactionData) ([]core.ExpandedTransaction, error) { - commitRes, err := l.ProcessTxsData(ctx, txsData...) - if err != nil { - return []core.ExpandedTransaction{}, err - } - - if preview { - return commitRes.GeneratedTransactions, nil - } - - if err := l.store.Commit(ctx, commitRes.GeneratedTransactions...); err != nil { - switch { - case storage.IsErrorCode(err, storage.ConstraintFailed): - return []core.ExpandedTransaction{}, NewConflictError() - default: - return []core.ExpandedTransaction{}, err - } - } - - if addOps != nil && addOps.SetAccountMeta != nil { - for addr, m := range addOps.SetAccountMeta { - if err := l.store.UpdateAccountMetadata(ctx, - addr, m, time.Now().Round(time.Second).UTC()); err != nil { - return []core.ExpandedTransaction{}, err - } - } - } - - l.monitor.CommittedTransactions(ctx, l.store.Name(), commitRes) - if addOps != nil && addOps.SetAccountMeta != nil { - for addr, m := range addOps.SetAccountMeta { - l.monitor.SavedMetadata(ctx, - l.store.Name(), core.MetaTargetTypeAccount, addr, m) - } - } - - return commitRes.GeneratedTransactions, nil -} - func (l *Ledger) GetTransactions(ctx context.Context, q TransactionsQuery) (api.Cursor[core.ExpandedTransaction], error) { return l.store.GetTransactions(ctx, q) } @@ -147,39 +110,46 @@ func (l *Ledger) LoadMapping(ctx context.Context) (*core.Mapping, error) { func (l *Ledger) RevertTransaction(ctx context.Context, id uint64) (*core.ExpandedTransaction, error) { revertedTx, err := l.store.GetTransaction(ctx, id) if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf("getting transaction %d", id)) } if revertedTx == nil { - return nil, NewNotFoundError("transaction not found") + return nil, NewNotFoundError(fmt.Sprintf("transaction %d not found", id)) } if revertedTx.IsReverted() { - return nil, NewValidationError("transaction already reverted") + return nil, NewValidationError(fmt.Sprintf("transaction %d already reverted", id)) } rt := revertedTx.Reverse() rt.Metadata = core.Metadata{} rt.Metadata.MarkReverts(revertedTx.ID) - result, err := l.ProcessTxsData(ctx, rt) + txData := core.TransactionData{ + Postings: rt.Postings, + Timestamp: rt.Timestamp, + Reference: rt.Reference, + Metadata: rt.Metadata, + } + res, err := l.Execute(ctx, false, false, + core.TxsToScriptsData(txData)...) if err != nil { - return nil, err + return nil, errors.Wrap(err, fmt.Sprintf( + "executing revert script for transaction %d", id)) } - revert := result.GeneratedTransactions[0] + revertTx := res[0] - if err := l.store.Commit(ctx, revert); err != nil { - return nil, err - } - if err := l.store.UpdateTransactionMetadata(ctx, revertedTx.ID, core.RevertedMetadata(revert.ID), revert.Timestamp); err != nil { - return nil, err + if err := l.store.UpdateTransactionMetadata(ctx, + revertedTx.ID, core.RevertedMetadata(revertTx.ID), revertTx.Timestamp); err != nil { + return nil, errors.Wrap(err, fmt.Sprintf( + "updating transaction %d metadata while reverting", id)) } if revertedTx.Metadata == nil { revertedTx.Metadata = core.Metadata{} } - revertedTx.Metadata.Merge(core.RevertedMetadata(revert.ID)) + revertedTx.Metadata.Merge(core.RevertedMetadata(revertTx.ID)) - l.monitor.RevertedTransaction(ctx, l.store.Name(), revertedTx, &result.GeneratedTransactions[0]) - return &result.GeneratedTransactions[0], nil + l.monitor.RevertedTransaction(ctx, l.store.Name(), revertedTx, &revertTx) + return &revertTx, nil } func (l *Ledger) CountAccounts(ctx context.Context, a AccountsQuery) (uint64, error) { @@ -191,21 +161,7 @@ func (l *Ledger) GetAccounts(ctx context.Context, a AccountsQuery) (api.Cursor[c } func (l *Ledger) GetAccount(ctx context.Context, address string) (*core.AccountWithVolumes, error) { - account, err := l.store.GetAccount(ctx, address) - if err != nil { - return nil, err - } - - volumes, err := l.store.GetAssetsVolumes(ctx, address) - if err != nil { - return nil, err - } - - return &core.AccountWithVolumes{ - Account: *account, - Volumes: volumes, - Balances: volumes.Balances(), - }, nil + return l.store.GetAccountWithVolumes(ctx, address) } func (l *Ledger) GetBalances(ctx context.Context, q BalancesQuery) (api.Cursor[core.AccountsBalances], error) { diff --git a/pkg/ledger/ledger_test.go b/pkg/ledger/ledger_test.go index fe1523c36..25b210bc1 100644 --- a/pkg/ledger/ledger_test.go +++ b/pkg/ledger/ledger_test.go @@ -129,7 +129,8 @@ func TestTransaction(t *testing.T) { continue } - _, err := l.Commit(context.Background(), false, nil, batch...) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(batch...)...) require.NoError(t, err) batch = []core.TransactionData{} @@ -187,7 +188,8 @@ func TestTransactionBatchWithConflictingReference(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, batch...) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(batch...)...) assert.Error(t, err) assert.IsType(t, new(ledger.ConflictError), err) }) @@ -205,16 +207,116 @@ func TestTransactionBatchWithConflictingReference(t *testing.T) { }, Reference: "ref1", } - _, err := l.Commit(context.Background(), false, nil, txData) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(txData)...) require.NoError(t, err) - _, err = l.Commit(context.Background(), false, nil, txData) + _, err = l.Execute(context.Background(), + true, false, core.TxsToScriptsData(txData)...) assert.Error(t, err) assert.IsType(t, new(ledger.ConflictError), err) }) }) } +func TestTransactionBatchTimestamps(t *testing.T) { + runOnLedger(func(l *ledger.Ledger) { + timestamp1 := time.Now().UTC().Add(-10 * time.Second) + timestamp2 := time.Now().UTC().Add(-9 * time.Second) + timestamp3 := time.Now().UTC().Add(-8 * time.Second) + timestamp4 := time.Now().UTC().Add(-7 * time.Second) + t.Run("descending order should fail", func(t *testing.T) { + batch := []core.TransactionData{ + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp2, + }, + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp1, + }, + } + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(batch...)...) + require.True(t, ledger.IsValidationError(err), err) + require.ErrorContains(t, err, "cannot pass a timestamp prior to the last transaction") + }) + t.Run("ascending order should succeed", func(t *testing.T) { + batch := []core.TransactionData{ + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp2, + }, + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp3, + }, + } + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(batch...)...) + assert.NoError(t, err) + }) + t.Run("ascending order but before last inserted should fail", func(t *testing.T) { + batch := []core.TransactionData{ + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp1, + }, + { + Postings: []core.Posting{ + { + Source: core.WORLD, + Destination: "player", + Asset: "GEM", + Amount: core.NewMonetaryInt(1), + }, + }, + Timestamp: timestamp4, + }, + } + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(batch...)...) + require.True(t, ledger.IsValidationError(err)) + require.ErrorContains(t, err, "cannot pass a timestamp prior to the last transaction") + }) + }) +} + func TestTransactionExpectedVolumes(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { txsData := []core.TransactionData{ @@ -260,9 +362,12 @@ func TestTransactionExpectedVolumes(t *testing.T) { }, } - commitRes, err := l.ProcessTxsData(context.Background(), txsData...) + res, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(txsData...)...) assert.NoError(t, err) + postCommitVolumes := core.AggregatePostCommitVolumes(res...) + assert.Equal(t, 4, len(res)) assert.EqualValues(t, core.AccountsAssetsVolumes{ "world": core.AssetsVolumes{ "USD": { @@ -290,7 +395,7 @@ func TestTransactionExpectedVolumes(t *testing.T) { Output: core.NewMonetaryInt(0), }, }, - }, commitRes.PostCommitVolumes) + }, postCommitVolumes) }) } @@ -308,10 +413,12 @@ func TestReference(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) require.NoError(t, err) - _, err = l.Commit(context.Background(), false, nil, tx) + _, err = l.Execute(context.Background(), + true, false, core.TxsToScriptsData(tx)...) assert.Error(t, err) }) } @@ -342,16 +449,17 @@ func TestAccountMetadata(t *testing.T) { { // We have to create at least one transaction to retrieve an account from GetAccounts store method - _, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Postings: core.Postings{ - { - Source: "world", - Amount: core.NewMonetaryInt(100), - Asset: "USD", - Destination: "users:001", + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(core.TransactionData{ + Postings: core.Postings{ + { + Source: "world", + Amount: core.NewMonetaryInt(100), + Asset: "USD", + Destination: "users:001", + }, }, - }, - }) + })...) assert.NoError(t, err) acc, err := l.GetAccount(context.Background(), "users:001") @@ -368,16 +476,17 @@ func TestAccountMetadata(t *testing.T) { func TestTransactionMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Postings: []core.Posting{ - { - Source: "world", - Destination: "payments:001", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", + _, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(core.TransactionData{ + Postings: []core.Posting{ + { + Source: "world", + Destination: "payments:001", + Amount: core.NewMonetaryInt(100), + Asset: "COIN", + }, }, - }, - }) + })...) require.NoError(t, err) tx, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -406,19 +515,20 @@ func TestTransactionMetadata(t *testing.T) { func TestSaveTransactionMetadata(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Postings: []core.Posting{ - { - Source: "world", - Destination: "payments:001", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", + _, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(core.TransactionData{ + Postings: []core.Posting{ + { + Source: "world", + Destination: "payments:001", + Amount: core.NewMonetaryInt(100), + Asset: "COIN", + }, }, - }, - Metadata: core.Metadata{ - "a metadata": "a value", - }, - }) + Metadata: core.Metadata{ + "a metadata": "a value", + }, + })...) require.NoError(t, err) tx, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -434,17 +544,18 @@ func TestSaveTransactionMetadata(t *testing.T) { func TestGetTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - _, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Reference: "bar", - Postings: []core.Posting{ - { - Source: "world", - Destination: "payments:001", - Amount: core.NewMonetaryInt(100), - Asset: "COIN", + _, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(core.TransactionData{ + Reference: "bar", + Postings: []core.Posting{ + { + Source: "world", + Destination: "payments:001", + Amount: core.NewMonetaryInt(100), + Asset: "COIN", + }, }, - }, - }) + })...) require.NoError(t, err) last, err := l.GetLedgerStore().GetLastTransaction(context.Background()) @@ -470,7 +581,8 @@ func TestGetTransactions(t *testing.T) { }, } - _, err := l.Commit(context.Background(), false, nil, tx) + _, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(tx)...) require.NoError(t, err) res, err := l.GetTransactions(context.Background(), *ledger.NewTransactionsQuery()) @@ -484,17 +596,18 @@ func TestRevertTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { revertAmt := core.NewMonetaryInt(100) - res, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Reference: "foo", - Postings: []core.Posting{ - { - Source: "world", - Destination: "payments:001", - Amount: revertAmt, - Asset: "COIN", + res, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(core.TransactionData{ + Reference: "foo", + Postings: []core.Posting{ + { + Source: "world", + Destination: "payments:001", + Amount: revertAmt, + Asset: "COIN", + }, }, - }, - }) + })...) require.NoError(t, err) world, err := l.GetAccount(context.Background(), "world") @@ -539,20 +652,22 @@ func TestRevertTransaction(t *testing.T) { func TestVeryBigTransaction(t *testing.T) { runOnLedger(func(l *ledger.Ledger) { - amount, err := core.ParseMonetaryInt("199999999999999999992919191919192929292939847477171818284637291884661818183647392936472918836161728274766266161728493736383838") + amount, err := core.ParseMonetaryInt( + "199999999999999999992919191919192929292939847477171818284637291884661818183647392936472918836161728274766266161728493736383838") require.NoError(t, err) - txs, err := l.Commit(context.Background(), false, nil, core.TransactionData{ - Postings: []core.Posting{{ - Source: "world", - Destination: "bank", - Asset: "ETH/18", - Amount: amount, - }}, - }) + res, err := l.Execute(context.Background(), true, false, + core.TxsToScriptsData(core.TransactionData{ + Postings: []core.Posting{{ + Source: "world", + Destination: "bank", + Asset: "ETH/18", + Amount: amount, + }}, + })...) require.NoError(t, err) - txFromDB, err := l.GetTransaction(context.Background(), txs[0].ID) + txFromDB, err := l.GetTransaction(context.Background(), res[0].ID) require.NoError(t, err) require.Equal(t, txFromDB.Postings[0].Amount, amount) }) @@ -574,7 +689,8 @@ func BenchmarkTransaction1(b *testing.B) { }, }) - _, err := l.Commit(context.Background(), false, nil, txs...) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(txs...)...) require.NoError(b, err) } }) @@ -599,7 +715,8 @@ func BenchmarkTransaction_20_1k(b *testing.B) { }) } - _, err := l.Commit(context.Background(), false, nil, txs...) + _, err := l.Execute(context.Background(), + true, false, core.TxsToScriptsData(txs...)...) require.NoError(b, err) } } diff --git a/pkg/ledger/monitor.go b/pkg/ledger/monitor.go index 1eabbce41..be3c08099 100644 --- a/pkg/ledger/monitor.go +++ b/pkg/ledger/monitor.go @@ -7,7 +7,7 @@ import ( ) type Monitor interface { - CommittedTransactions(ctx context.Context, ledger string, res CommitResult) + CommittedTransactions(ctx context.Context, ledger string, res ...core.ExpandedTransaction) SavedMetadata(ctx context.Context, ledger, targetType, id string, metadata core.Metadata) UpdatedMapping(ctx context.Context, ledger string, mapping core.Mapping) RevertedTransaction(ctx context.Context, ledger string, reverted, revert *core.ExpandedTransaction) @@ -15,7 +15,8 @@ type Monitor interface { type noOpMonitor struct{} -func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, result CommitResult) {} +func (n noOpMonitor) CommittedTransactions(ctx context.Context, s string, res ...core.ExpandedTransaction) { +} func (n noOpMonitor) SavedMetadata(ctx context.Context, ledger string, targetType string, id string, metadata core.Metadata) { } func (n noOpMonitor) UpdatedMapping(ctx context.Context, s string, mapping core.Mapping) {} diff --git a/pkg/ledger/process.go b/pkg/ledger/process.go deleted file mode 100644 index f8b55f783..000000000 --- a/pkg/ledger/process.go +++ /dev/null @@ -1,162 +0,0 @@ -package ledger - -import ( - "context" - "fmt" - "time" - - "github.com/numary/ledger/pkg/core" - "github.com/pkg/errors" -) - -func (l *Ledger) ProcessTxsData(ctx context.Context, txsData ...core.TransactionData) (CommitResult, error) { - var nextTxId uint64 - lastTx, err := l.store.GetLastTransaction(ctx) - if err != nil { - return CommitResult{}, err - } - if lastTx != nil { - nextTxId = lastTx.ID + 1 - } - - volumeAggregator := NewVolumeAggregator(l.store) - - generatedTxs := make([]core.ExpandedTransaction, 0) - - usedReferences := make(map[string]struct{}) - for i, t := range txsData { - if t.Timestamp.IsZero() { - // Until v1.5.0, dates was stored as string using rfc3339 format - // So round the date to the second to keep the same behaviour - t.Timestamp = time.Now().UTC().Truncate(time.Second) - } - - if t.Reference != "" { - if _, ok := usedReferences[t.Reference]; ok { - return CommitResult{}, NewConflictError() - } - cursor, err := l.store.GetTransactions(ctx, *NewTransactionsQuery().WithReferenceFilter(t.Reference)) - if err != nil { - return CommitResult{}, err - } - if len(cursor.Data) > 0 { - return CommitResult{}, NewConflictError() - } - usedReferences[t.Reference] = struct{}{} - } - - txVolumeAggregator := volumeAggregator.NextTx() - - for _, p := range t.Postings { - if err := txVolumeAggregator.Transfer(ctx, p.Source, p.Destination, p.Asset, p.Amount); err != nil { - return CommitResult{}, NewTransactionCommitError(i, err) - } - } - - tx := core.ExpandedTransaction{ - Transaction: core.Transaction{ - TransactionData: t, - ID: nextTxId, - }, - PostCommitVolumes: txVolumeAggregator.PostCommitVolumes(), - PreCommitVolumes: txVolumeAggregator.PreCommitVolumes(), - } - lastTx = &tx - generatedTxs = append(generatedTxs, tx) - nextTxId++ - } - - return CommitResult{ - PreCommitVolumes: volumeAggregator.AggregatedPreCommitVolumes(), - PostCommitVolumes: volumeAggregator.AggregatedPostCommitVolumes(), - GeneratedTransactions: generatedTxs, - }, nil -} - -func (l *Ledger) ValidatePostings(ctx context.Context, txsData ...core.TransactionData) (int, error) { - lastTx, err := l.store.GetLastTransaction(ctx) - if err != nil { - return 0, errors.Wrap(err, "GetLastTransaction") - } - - mapping, err := l.store.LoadMapping(ctx) - if err != nil { - return 0, errors.Wrap(err, "loading mapping") - } - contracts := make([]core.Contract, 0) - if mapping != nil { - contracts = append(contracts, mapping.Contracts...) - } - contracts = append(contracts, DefaultContracts...) - - volumeAggregator := NewVolumeAggregator(l.store) - - for i, t := range txsData { - past := false - if !t.Timestamp.IsZero() { - if lastTx != nil && t.Timestamp.Before(lastTx.Timestamp) { - past = true - } - } - if len(t.Postings) == 0 { - return i, NewValidationError("transaction has no postings") - } - if past && !l.allowPastTimestamps { - return i, NewValidationError("cannot pass a date prior to the last transaction") - } - - txVolumeAggregator := volumeAggregator.NextTx() - - for _, p := range t.Postings { - if p.Amount.Ltz() { - return i, NewValidationError("negative amount") - } - if !core.ValidateAddress(p.Source) { - return i, NewValidationError("invalid source address") - } - if !core.ValidateAddress(p.Destination) { - return i, NewValidationError("invalid destination address") - } - if !core.AssetIsValid(p.Asset) { - return i, NewValidationError("invalid asset") - } - if err := txVolumeAggregator.Transfer(ctx, p.Source, p.Destination, p.Asset, p.Amount); err != nil { - return i, errors.Wrap(err, "transferring volumes") - } - } - - accounts := make(map[string]*core.Account, 0) - for addr, volumes := range txVolumeAggregator.PostCommitVolumes() { - for asset, volume := range volumes { - if addr == "world" { - continue - } - - expectedBalance := volume.Balance() - for _, contract := range contracts { - if contract.Match(addr) { - if _, ok := accounts[addr]; !ok { - account, err := l.store.GetAccount(ctx, addr) - if err != nil { - return i, errors.Wrap(err, fmt.Sprintf("GetAccount '%s'", addr)) - } - accounts[addr] = account - } - if ok := contract.Expr.Eval(core.EvalContext{ - Variables: map[string]interface{}{ - "balance": expectedBalance, - }, - Metadata: accounts[addr].Metadata, - Asset: asset, - }); !ok { - return i, NewInsufficientFundError(asset) - } - break - } - } - } - } - } - - return 0, nil -} diff --git a/pkg/ledger/process_test.go b/pkg/ledger/process_test.go index 332309432..9a9e477f2 100644 --- a/pkg/ledger/process_test.go +++ b/pkg/ledger/process_test.go @@ -131,14 +131,15 @@ func TestLedger_processTx(t *testing.T) { { Postings: postings, Timestamp: time.Now().UTC().Round(time.Second), + Metadata: core.Metadata{}, }, } - res, err := l.ProcessTxsData(context.Background(), txsData...) + res, err := l.Execute(context.Background(), true, true, + core.TxsToScriptsData(txsData...)...) assert.NoError(t, err) - assert.Equal(t, expectedPreCommitVol, res.PreCommitVolumes) - assert.Equal(t, expectedPostCommitVol, res.PostCommitVolumes) + assert.Equal(t, len(txsData), len(res)) expectedTxs := []core.ExpandedTransaction{{ Transaction: core.Transaction{ @@ -148,7 +149,12 @@ func TestLedger_processTx(t *testing.T) { PreCommitVolumes: expectedPreCommitVol, PostCommitVolumes: expectedPostCommitVol, }} - assert.Equal(t, expectedTxs, res.GeneratedTransactions) + assert.Equal(t, expectedTxs, res) + + preCommitVolumes := core.AggregatePreCommitVolumes(res...) + postCommitVolumes := core.AggregatePostCommitVolumes(res...) + assert.Equal(t, expectedPreCommitVol, preCommitVolumes) + assert.Equal(t, expectedPostCommitVol, postCommitVolumes) }) t.Run("multi transactions single postings", func(t *testing.T) { @@ -180,18 +186,18 @@ func TestLedger_processTx(t *testing.T) { }, } - res, err := l.ProcessTxsData(context.Background(), txsData...) - assert.NoError(t, err) - - assert.Equal(t, expectedPreCommitVol, res.PreCommitVolumes) - assert.Equal(t, expectedPostCommitVol, res.PostCommitVolumes) + res, err := l.Execute(context.Background(), true, true, + core.TxsToScriptsData(txsData...)...) + require.NoError(t, err) + require.Equal(t, len(txsData), len(res)) expectedTxs := []core.ExpandedTransaction{ { Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now, + Timestamp: now.UTC(), Postings: core.Postings{postings[0]}, + Metadata: core.Metadata{}, }, ID: 0, }, @@ -206,7 +212,8 @@ func TestLedger_processTx(t *testing.T) { Transaction: core.Transaction{ TransactionData: core.TransactionData{ Postings: core.Postings{postings[1]}, - Timestamp: now.Add(time.Second), + Timestamp: now.UTC().Add(time.Second), + Metadata: core.Metadata{}, }, ID: 1, }, @@ -222,8 +229,9 @@ func TestLedger_processTx(t *testing.T) { { Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now.Add(2 * time.Second), + Timestamp: now.UTC().Add(2 * time.Second), Postings: core.Postings{postings[2]}, + Metadata: core.Metadata{}, }, ID: 2, }, @@ -239,8 +247,9 @@ func TestLedger_processTx(t *testing.T) { { Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now.Add(3 * time.Second), + Timestamp: now.UTC().Add(3 * time.Second), Postings: core.Postings{postings[3]}, + Metadata: core.Metadata{}, }, ID: 3, }, @@ -256,8 +265,9 @@ func TestLedger_processTx(t *testing.T) { { Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now.Add(4 * time.Second), + Timestamp: now.UTC().Add(4 * time.Second), Postings: core.Postings{postings[4]}, + Metadata: core.Metadata{}, }, ID: 4, }, @@ -273,8 +283,9 @@ func TestLedger_processTx(t *testing.T) { { Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now.Add(5 * time.Second), + Timestamp: now.UTC().Add(5 * time.Second), Postings: core.Postings{postings[5]}, + Metadata: core.Metadata{}, }, ID: 5, }, @@ -288,19 +299,19 @@ func TestLedger_processTx(t *testing.T) { }, }, } + assert.Equal(t, expectedTxs, res) - assert.Equal(t, expectedTxs, res.GeneratedTransactions) + preCommitVolumes := core.AggregatePreCommitVolumes(res...) + postCommitVolumes := core.AggregatePostCommitVolumes(res...) + assert.Equal(t, expectedPreCommitVol, preCommitVolumes) + assert.Equal(t, expectedPostCommitVol, postCommitVolumes) }) }) - t.Run("no transactions", func(t *testing.T) { - result, err := l.ProcessTxsData(context.Background()) - assert.NoError(t, err) - assert.Equal(t, ledger.CommitResult{ - PreCommitVolumes: core.AccountsAssetsVolumes{}, - PostCommitVolumes: core.AccountsAssetsVolumes{}, - GeneratedTransactions: []core.ExpandedTransaction{}, - }, result) + t.Run("no script", func(t *testing.T) { + _, err := l.Execute(context.Background(), true, true, core.ScriptData{}) + assert.Error(t, err) + assert.ErrorContains(t, err, "no script to execute") }) }) @@ -310,25 +321,24 @@ func TestLedger_processTx(t *testing.T) { require.NoError(t, l.GetLedgerStore().Commit(context.Background(), core.ExpandedTransaction{ Transaction: core.Transaction{ TransactionData: core.TransactionData{ - Timestamp: now, + Timestamp: now.UTC(), Postings: []core.Posting{{}}, }, ID: 0, }, })) - _, err := l.ProcessTxsData(context.Background(), core.TransactionData{ - Postings: []core.Posting{{ - Source: "world", - Destination: "bank", - Amount: core.NewMonetaryInt(100), - Asset: "USD", - }}, - Timestamp: now.Add(-time.Second), - }) - + _, err := l.Execute(context.Background(), true, true, + core.TxsToScriptsData(core.TransactionData{ + Postings: []core.Posting{{ + Source: "world", + Destination: "bank", + Amount: core.NewMonetaryInt(100), + Asset: "USD", + }}, + Timestamp: now.UTC().Add(-time.Second), + })...) assert.NoError(t, err) - }) }, ledger.WithPastTimestamps) } diff --git a/pkg/ledger/storage.go b/pkg/ledger/storage.go index 36d648e2a..bee74fb8c 100644 --- a/pkg/ledger/storage.go +++ b/pkg/ledger/storage.go @@ -15,6 +15,7 @@ type Store interface { GetTransaction(ctx context.Context, txid uint64) (*core.ExpandedTransaction, error) GetAccount(ctx context.Context, accountAddress string) (*core.Account, error) GetAssetsVolumes(ctx context.Context, accountAddress string) (core.AssetsVolumes, error) + GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) GetVolumes(ctx context.Context, accountAddress, asset string) (core.Volumes, error) CountAccounts(context.Context, AccountsQuery) (uint64, error) GetAccounts(context.Context, AccountsQuery) (api.Cursor[core.Account], error) diff --git a/pkg/ledger/volume_agg.go b/pkg/ledger/volume_agg.go index c6eba78ac..58e430e86 100644 --- a/pkg/ledger/volume_agg.go +++ b/pkg/ledger/volume_agg.go @@ -6,90 +6,80 @@ import ( "github.com/numary/ledger/pkg/core" ) -type transactionVolumeAggregator struct { - agg *volumeAggregator - postVolumes core.AccountsAssetsVolumes - preVolumes core.AccountsAssetsVolumes - previousTx *transactionVolumeAggregator -} +type TxVolumeAggregator struct { + agg *VolumeAggregator + previousTx *TxVolumeAggregator -func (tva *transactionVolumeAggregator) PostCommitVolumes() core.AccountsAssetsVolumes { - return tva.postVolumes + PreCommitVolumes core.AccountsAssetsVolumes + PostCommitVolumes core.AccountsAssetsVolumes } -func (tva *transactionVolumeAggregator) PreCommitVolumes() core.AccountsAssetsVolumes { - return tva.preVolumes +func (tva *TxVolumeAggregator) FindInPreviousTxs(addr, asset string) *core.Volumes { + current := tva.previousTx + for current != nil { + if v, ok := current.PostCommitVolumes[addr][asset]; ok { + return &v + } + current = current.previousTx + } + return nil } -func (tva *transactionVolumeAggregator) Transfer( +func (tva *TxVolumeAggregator) Transfer( ctx context.Context, from, to, asset string, amount *core.MonetaryInt, + accs map[string]*core.AccountWithVolumes, ) error { - if tva.preVolumes == nil { - tva.preVolumes = core.AccountsAssetsVolumes{} - } - if tva.postVolumes == nil { - tva.postVolumes = core.AccountsAssetsVolumes{} - } for _, addr := range []string{from, to} { - if _, ok := tva.preVolumes[addr][asset]; !ok { - current := tva.previousTx - found := false - if _, ok := tva.preVolumes[addr]; !ok { - tva.preVolumes[addr] = core.AssetsVolumes{} - } - for current != nil { - if v, ok := current.postVolumes[addr][asset]; ok { - tva.preVolumes[addr][asset] = core.Volumes{ - Input: v.Input.OrZero(), - Output: v.Output.OrZero(), - } - found = true - break - } - current = current.previousTx - } - if !found { - v, err := tva.agg.store.GetVolumes(ctx, addr, asset) - if err != nil { - return err + if !tva.PreCommitVolumes.HasAccountAndAsset(addr, asset) { + previousVolumes := tva.FindInPreviousTxs(addr, asset) + if previousVolumes != nil { + tva.PreCommitVolumes.SetVolumes(addr, asset, *previousVolumes) + } else { + var vol core.Volumes + var ok1, ok2 bool + _, ok1 = accs[addr] + if ok1 { + _, ok2 = accs[addr].Volumes[asset] } - tva.preVolumes[addr][asset] = core.Volumes{ - Input: v.Input.OrZero(), - Output: v.Output.OrZero(), + if ok1 && ok2 { + vol = accs[addr].Volumes[asset] + } else { + acc, err := tva.agg.l.GetAccount(ctx, addr) + if err != nil { + return err + } + if accs[addr] == nil { + accs[addr] = acc + } + accs[addr].Volumes[asset] = acc.Volumes[asset] + vol = accs[addr].Volumes[asset] } + tva.PreCommitVolumes.SetVolumes(addr, asset, vol) } } - if _, ok := tva.postVolumes[addr][asset]; !ok { - if _, ok := tva.postVolumes[addr]; !ok { - tva.postVolumes[addr] = core.AssetsVolumes{} - } - tva.postVolumes[addr][asset] = tva.preVolumes[addr][asset] + if !tva.PostCommitVolumes.HasAccountAndAsset(addr, asset) { + tva.PostCommitVolumes.SetVolumes(addr, asset, tva.PreCommitVolumes.GetVolumes(addr, asset)) } } - v := tva.postVolumes[from][asset] - v.Output = v.Output.Add(amount) - tva.postVolumes[from][asset] = v - - v = tva.postVolumes[to][asset] - v.Input = v.Input.Add(amount) - tva.postVolumes[to][asset] = v + tva.PostCommitVolumes.AddOutput(from, asset, amount) + tva.PostCommitVolumes.AddInput(to, asset, amount) return nil } -type volumeAggregator struct { - store Store - txs []*transactionVolumeAggregator +type VolumeAggregator struct { + l *Ledger + txs []*TxVolumeAggregator } -func (agg *volumeAggregator) NextTx() *transactionVolumeAggregator { - var previousTx *transactionVolumeAggregator +func (agg *VolumeAggregator) NextTx() *TxVolumeAggregator { + var previousTx *TxVolumeAggregator if len(agg.txs) > 0 { previousTx = agg.txs[len(agg.txs)-1] } - tva := &transactionVolumeAggregator{ + tva := &TxVolumeAggregator{ agg: agg, previousTx: previousTx, } @@ -97,46 +87,8 @@ func (agg *volumeAggregator) NextTx() *transactionVolumeAggregator { return tva } -func (agg *volumeAggregator) AggregatedPostCommitVolumes() core.AccountsAssetsVolumes { - ret := core.AccountsAssetsVolumes{} - for i := len(agg.txs) - 1; i >= 0; i-- { - tx := agg.txs[i] - postVolumes := tx.PostCommitVolumes() - for account, volumes := range postVolumes { - for asset, volume := range volumes { - if _, ok := ret[account]; !ok { - ret[account] = core.AssetsVolumes{} - } - if _, ok := ret[account][asset]; !ok { - ret[account][asset] = volume - } - } - } - } - return ret -} - -func (agg *volumeAggregator) AggregatedPreCommitVolumes() core.AccountsAssetsVolumes { - ret := core.AccountsAssetsVolumes{} - for i := 0; i < len(agg.txs); i++ { - tx := agg.txs[i] - preVolumes := tx.PreCommitVolumes() - for account, volumes := range preVolumes { - for asset, volume := range volumes { - if _, ok := ret[account]; !ok { - ret[account] = core.AssetsVolumes{} - } - if _, ok := ret[account][asset]; !ok { - ret[account][asset] = volume - } - } - } - } - return ret -} - -func NewVolumeAggregator(store Store) *volumeAggregator { - return &volumeAggregator{ - store: store, +func NewVolumeAggregator(l *Ledger) *VolumeAggregator { + return &VolumeAggregator{ + l: l, } } diff --git a/pkg/ledger/volume_agg_test.go b/pkg/ledger/volume_agg_test.go index 9122cbcf8..853114c0b 100644 --- a/pkg/ledger/volume_agg_test.go +++ b/pkg/ledger/volume_agg_test.go @@ -6,265 +6,194 @@ import ( "github.com/numary/ledger/pkg/core" "github.com/numary/ledger/pkg/ledger" - "github.com/numary/ledger/pkg/storage" - "github.com/pborman/uuid" "github.com/stretchr/testify/require" - "go.uber.org/fx" ) func TestVolumeAggregator(t *testing.T) { - withContainer(fx.Invoke(func(lc fx.Lifecycle, storageDriver storage.Driver[ledger.Store]) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - name := uuid.New() + runOnLedger(func(l *ledger.Ledger) { + defer func(l *ledger.Ledger, ctx context.Context) { + require.NoError(t, l.Close(ctx)) + }(l, context.Background()) - store, _, err := storageDriver.GetLedgerStore(context.Background(), name, true) - if err != nil { - return err - } - - _, err = store.Initialize(context.Background()) - if err != nil { - return err - } - - tx1 := core.ExpandedTransaction{ - Transaction: core.Transaction{ - ID: 0, - TransactionData: core.TransactionData{ - Postings: []core.Posting{ - { - Source: "bob", - Destination: "zozo", - Amount: core.NewMonetaryInt(100), - Asset: "USD", - }, - }, - }, - }, - PreCommitVolumes: map[string]core.AssetsVolumes{ - "bob": { - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - "zozo": { - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - }, - PostCommitVolumes: map[string]core.AssetsVolumes{ - "bob": { - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(100), - }, - }, - "zozo": { - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - }, - } - - tx2 := core.ExpandedTransaction{ - Transaction: core.Transaction{ - ID: 1, - TransactionData: core.TransactionData{ - Postings: []core.Posting{ - { - Source: "zozo", - Destination: "alice", - Amount: core.NewMonetaryInt(100), - Asset: "USD", - }, - }, - }, - }, - PostCommitVolumes: map[string]core.AssetsVolumes{ - "alice": { - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - "zozo": { - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(100), - }, - }, - }, - PreCommitVolumes: map[string]core.AssetsVolumes{ - "alice": { - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - "zozo": { - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - }, - } - require.NoError(t, store.Commit(context.Background(), tx1, tx2)) - - volumeAggregator := ledger.NewVolumeAggregator(store) - firstTx := volumeAggregator.NextTx() - require.NoError(t, firstTx.Transfer(context.Background(), "bob", "alice", "USD", core.NewMonetaryInt(100))) - require.NoError(t, firstTx.Transfer(context.Background(), "bob", "zoro", "USD", core.NewMonetaryInt(50))) - - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(250), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(200), - Output: core.NewMonetaryInt(0), - }, - }, - "zoro": { - "USD": { - Input: core.NewMonetaryInt(50), - Output: core.NewMonetaryInt(0), - }, - }, - }, firstTx.PostCommitVolumes()) - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(100), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - "zoro": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - }, firstTx.PreCommitVolumes()) + tx1 := core.ExpandedTransaction{ + Transaction: core.Transaction{ + ID: 0, + TransactionData: core.TransactionData{ + Postings: []core.Posting{ + { + Source: "bob", + Destination: "zozo", + Amount: core.NewMonetaryInt(100), + Asset: "USD", + }, + }, + }, + }, + PreCommitVolumes: map[string]core.AssetsVolumes{ + "bob": { + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + }, + }, + "zozo": { + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + }, + }, + }, + PostCommitVolumes: map[string]core.AssetsVolumes{ + "bob": { + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(100), + }, + }, + "zozo": { + "USD": { + Input: core.NewMonetaryInt(100), + Output: core.NewMonetaryInt(0), + }, + }, + }, + } - secondTx := volumeAggregator.NextTx() - require.NoError(t, secondTx.Transfer(context.Background(), "alice", "fred", "USD", core.NewMonetaryInt(50))) - require.NoError(t, secondTx.Transfer(context.Background(), "bob", "fred", "USD", core.NewMonetaryInt(25))) - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(275), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(200), - Output: core.NewMonetaryInt(50), - }, - }, - "fred": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(75), - Output: core.NewMonetaryInt(0), - }, - }, - }, secondTx.PostCommitVolumes()) - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(250), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(200), - Output: core.NewMonetaryInt(0), - }, - }, - "fred": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - }, secondTx.PreCommitVolumes()) + tx2 := core.ExpandedTransaction{ + Transaction: core.Transaction{ + ID: 1, + TransactionData: core.TransactionData{ + Postings: []core.Posting{ + { + Source: "zozo", + Destination: "alice", + Amount: core.NewMonetaryInt(100), + Asset: "USD", + }, + }, + }, + }, + PostCommitVolumes: map[string]core.AssetsVolumes{ + "alice": { + "USD": { + Input: core.NewMonetaryInt(100), + Output: core.NewMonetaryInt(0), + }, + }, + "zozo": { + "USD": { + Input: core.NewMonetaryInt(100), + Output: core.NewMonetaryInt(100), + }, + }, + }, + PreCommitVolumes: map[string]core.AssetsVolumes{ + "alice": { + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + }, + }, + "zozo": { + "USD": { + Input: core.NewMonetaryInt(100), + Output: core.NewMonetaryInt(0), + }, + }, + }, + } + err := l.GetLedgerStore().Commit(context.Background(), tx1, tx2) + require.NoError(t, err) - aggregatedPostVolumes := volumeAggregator.AggregatedPostCommitVolumes() - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(275), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(200), - Output: core.NewMonetaryInt(50), - }, - }, - "fred": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(75), - Output: core.NewMonetaryInt(0), - }, - }, - "zoro": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(50), - Output: core.NewMonetaryInt(0), - }, - }, - }, aggregatedPostVolumes) + volumeAggregator := ledger.NewVolumeAggregator(l) + firstTx := volumeAggregator.NextTx() + accs := map[string]*core.AccountWithVolumes{} + require.NoError(t, firstTx.Transfer(context.Background(), "bob", "alice", "USD", core.NewMonetaryInt(100), accs)) + require.NoError(t, firstTx.Transfer(context.Background(), "bob", "zoro", "USD", core.NewMonetaryInt(50), accs)) - aggregatedPreVolumes := volumeAggregator.AggregatedPreCommitVolumes() - require.Equal(t, core.AccountsAssetsVolumes{ - "bob": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(100), - }, - }, - "alice": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(100), - Output: core.NewMonetaryInt(0), - }, - }, - "fred": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - "zoro": core.AssetsVolumes{ - "USD": { - Input: core.NewMonetaryInt(0), - Output: core.NewMonetaryInt(0), - }, - }, - }, aggregatedPreVolumes) + require.Equal(t, core.AccountsAssetsVolumes{ + "bob": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(250), + }, + }, + "alice": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(200), + Output: core.NewMonetaryInt(0), + }, + }, + "zoro": { + "USD": { + Input: core.NewMonetaryInt(50), + Output: core.NewMonetaryInt(0), + }, + }, + }, firstTx.PostCommitVolumes) + require.Equal(t, core.AccountsAssetsVolumes{ + "bob": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(100), + }, + }, + "alice": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(100), + Output: core.NewMonetaryInt(0), + }, + }, + "zoro": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + }, + }, + }, firstTx.PreCommitVolumes) - return nil + secondTx := volumeAggregator.NextTx() + require.NoError(t, secondTx.Transfer(context.Background(), "alice", "fred", "USD", core.NewMonetaryInt(50), accs)) + require.NoError(t, secondTx.Transfer(context.Background(), "bob", "fred", "USD", core.NewMonetaryInt(25), accs)) + require.Equal(t, core.AccountsAssetsVolumes{ + "bob": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(275), + }, + }, + "alice": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(200), + Output: core.NewMonetaryInt(50), + }, + }, + "fred": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(75), + Output: core.NewMonetaryInt(0), + }, + }, + }, secondTx.PostCommitVolumes) + require.Equal(t, core.AccountsAssetsVolumes{ + "bob": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(250), + }, + }, + "alice": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(200), + Output: core.NewMonetaryInt(0), + }, + }, + "fred": core.AssetsVolumes{ + "USD": { + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + }, }, - }) - })) + }, secondTx.PreCommitVolumes) + }) } diff --git a/pkg/storage/sqlstorage/aggregations.go b/pkg/storage/sqlstorage/aggregations.go index fa830e10b..ef72355fe 100644 --- a/pkg/storage/sqlstorage/aggregations.go +++ b/pkg/storage/sqlstorage/aggregations.go @@ -10,6 +10,81 @@ import ( "github.com/numary/ledger/pkg/ledger" ) +func (s *Store) GetAccountWithVolumes(ctx context.Context, account string) (*core.AccountWithVolumes, error) { + sb := sqlbuilder.NewSelectBuilder() + sb.Select("accounts.metadata", "volumes.asset", "volumes.input", "volumes.output") + sb.From(s.schema.Table("accounts")) + sb.JoinWithOption(sqlbuilder.LeftOuterJoin, + s.schema.Table("volumes"), + "accounts.address = volumes.account") + sb.Where(sb.E("accounts.address", account)) + + executor, err := s.executorProvider(ctx) + if err != nil { + return nil, err + } + + q, args := sb.BuildWithFlavor(s.schema.Flavor()) + rows, err := executor.QueryContext(ctx, q, args...) + if err != nil { + return nil, s.error(err) + } + defer rows.Close() + + acc := core.Account{ + Address: account, + Metadata: core.Metadata{}, + } + assetsVolumes := core.AssetsVolumes{} + + for rows.Next() { + var asset, inputStr, outputStr sql.NullString + if err := rows.Scan(&acc.Metadata, &asset, &inputStr, &outputStr); err != nil { + return nil, s.error(err) + } + + if asset.Valid { + assetsVolumes[asset.String] = core.Volumes{ + Input: core.NewMonetaryInt(0), + Output: core.NewMonetaryInt(0), + } + + if inputStr.Valid { + input, err := core.ParseMonetaryInt(inputStr.String) + if err != nil { + return nil, s.error(err) + } + assetsVolumes[asset.String] = core.Volumes{ + Input: input, + Output: assetsVolumes[asset.String].Output, + } + } + + if outputStr.Valid { + output, err := core.ParseMonetaryInt(outputStr.String) + if err != nil { + return nil, s.error(err) + } + assetsVolumes[asset.String] = core.Volumes{ + Input: assetsVolumes[asset.String].Input, + Output: output, + } + } + } + } + if err := rows.Err(); err != nil { + return nil, s.error(err) + } + + res := &core.AccountWithVolumes{ + Account: acc, + Volumes: assetsVolumes, + } + res.Balances = res.Volumes.Balances() + + return res, nil +} + func (s *Store) CountTransactions(ctx context.Context, q ledger.TransactionsQuery) (uint64, error) { var count uint64