From f69c6a3cdb071cac6d82ec6eb07a7b77dcbfe13e Mon Sep 17 00:00:00 2001 From: altitude Date: Mon, 21 Jun 2021 19:45:24 +0200 Subject: [PATCH] batch support --- api/http.go | 2 +- ledger/ledger.go | 42 ++++++++++++---------- ledger/ledger_test.go | 80 ++++++++++++++++++++++++++++++----------- storage/sqlite/store.go | 48 +++++++++++++------------ storage/storage.go | 2 +- 5 files changed, 110 insertions(+), 64 deletions(-) diff --git a/api/http.go b/api/http.go index 7cf3cc5e2..6e0e170d9 100644 --- a/api/http.go +++ b/api/http.go @@ -50,7 +50,7 @@ func NewHttpAPI(lc fx.Lifecycle, l *ledger.Ledger) *HttpAPI { var t core.Transaction c.ShouldBind(&t) - err := l.Commit(t) + err := l.Commit([]core.Transaction{t}) c.JSON(200, gin.H{ "ok": err == nil, diff --git a/ledger/ledger.go b/ledger/ledger.go index 9335ba957..97b229f91 100644 --- a/ledger/ledger.go +++ b/ledger/ledger.go @@ -55,16 +55,13 @@ func (l *Ledger) Close() { l.store.Close() } -func (l *Ledger) Commit(t core.Transaction) error { +func (l *Ledger) Commit(ts []core.Transaction) error { l.Lock() defer l.Unlock() count, _ := l.store.CountTransactions() - t.ID = count - - if t.Timestamp == "" { - t.Timestamp = time.Now().Format(time.RFC3339) - } + rf := map[string]map[string]int64{} + timestamp := time.Now().Format(time.RFC3339) if l._last == nil { last, err := l.GetLastTransaction() @@ -76,22 +73,29 @@ func (l *Ledger) Commit(t core.Transaction) error { l._last = &last } - t.Hash = core.Hash(l._last, &t) + last := l._last - rf := map[string]map[string]int64{} + for i := range ts { - for _, p := range t.Postings { - if _, ok := rf[p.Source]; !ok { - rf[p.Source] = map[string]int64{} - } + ts[i].ID = count + int64(i) + ts[i].Timestamp = timestamp - rf[p.Source][p.Asset] += p.Amount + ts[i].Hash = core.Hash(last, &ts[i]) + last = &ts[i] - if _, ok := rf[p.Destination]; !ok { - rf[p.Destination] = map[string]int64{} - } + for _, p := range ts[i].Postings { + if _, ok := rf[p.Source]; !ok { + rf[p.Source] = map[string]int64{} + } + + rf[p.Source][p.Asset] += p.Amount - rf[p.Destination][p.Asset] -= p.Amount + if _, ok := rf[p.Destination]; !ok { + rf[p.Destination] = map[string]int64{} + } + + rf[p.Destination][p.Asset] -= p.Amount + } } for addr := range rf { @@ -131,9 +135,9 @@ func (l *Ledger) Commit(t core.Transaction) error { } } - err := l.store.AppendTransaction(t) + err := l.store.SaveTransactions(ts) - l._last = &t + l._last = &ts[len(ts)-1] return err } diff --git a/ledger/ledger_test.go b/ledger/ledger_test.go index dd9fca60d..793f92c5f 100644 --- a/ledger/ledger_test.go +++ b/ledger/ledger_test.go @@ -14,6 +14,9 @@ import ( func with(f func(l *Ledger)) { fx.New( + fx.Option( + fx.NopLogger, + ), fx.Provide( func() config.Config { c := config.DefaultConfig() @@ -38,20 +41,17 @@ func TestMain(m *testing.M) { func TestTransaction(t *testing.T) { with(func(l *Ledger) { + testsize := 1e4 total := 0 + batch := []core.Transaction{} - testsize := 1e5 - for i := 0; i < int(testsize); i++ { - if i%int(testsize/10) == 0 && i > 0 { - fmt.Println(i) - } - + for i := 1; i <= int(testsize); i++ { user := fmt.Sprintf("users:%03d", 1+rand.Intn(100)) amount := 1 + rand.Intn(100) amount = 100 total += amount - err := l.Commit(core.Transaction{ + batch = append(batch, core.Transaction{ Postings: []core.Posting{ { Source: "world", @@ -68,10 +68,19 @@ func TestTransaction(t *testing.T) { }, }) + if i%int(1e3) != 0 { + continue + } + + fmt.Println(i) + + err := l.Commit(batch) + if err != nil { - fmt.Println(err) t.Error(err) } + + batch = []core.Transaction{} } world, err := l.GetAccount("world") @@ -95,13 +104,15 @@ func TestTransaction(t *testing.T) { func TestBalance(t *testing.T) { with(func(l *Ledger) { - err := l.Commit(core.Transaction{ - Postings: []core.Posting{ - { - Source: "empty_wallet", - Destination: "world", - Amount: 1, - Asset: "COIN", + err := l.Commit([]core.Transaction{ + { + Postings: []core.Posting{ + { + Source: "empty_wallet", + Destination: "world", + Amount: 1, + Asset: "COIN", + }, }, }, }) @@ -128,13 +139,13 @@ func TestReference(t *testing.T) { }, } - err := l.Commit(tx) + err := l.Commit([]core.Transaction{tx}) if err != nil { t.Error(err) } - err = l.Commit(tx) + err = l.Commit([]core.Transaction{tx}) if err == nil { t.Fail() @@ -152,10 +163,12 @@ func TestLast(t *testing.T) { }) } -func BenchmarkLedger(b *testing.B) { +func BenchmarkTransaction1(b *testing.B) { with(func(l *Ledger) { - for i := 0; i < b.N; i++ { - l.Commit(core.Transaction{ + for n := 0; n < b.N; n++ { + txs := []core.Transaction{} + + txs = append(txs, core.Transaction{ Postings: []core.Posting{ { Source: "world", @@ -165,6 +178,33 @@ func BenchmarkLedger(b *testing.B) { }, }, }) + + l.Commit(txs) + } + }) +} + +func BenchmarkTransaction_20_1k(b *testing.B) { + with(func(l *Ledger) { + for n := 0; n < b.N; n++ { + for i := 0; i < 20; i++ { + txs := []core.Transaction{} + + for j := 0; j < 1e3; j++ { + txs = append(txs, core.Transaction{ + Postings: []core.Posting{ + { + Source: "world", + Destination: "benchmark", + Asset: "COIN", + Amount: 10, + }, + }, + }) + } + + l.Commit(txs) + } } }) } diff --git a/storage/sqlite/store.go b/storage/sqlite/store.go index 3ee7f3841..3ca4e5b1d 100644 --- a/storage/sqlite/store.go +++ b/storage/sqlite/store.go @@ -92,48 +92,50 @@ func (s *SQLiteStore) Close() { fmt.Println("db closed") } -func (s *SQLiteStore) AppendTransaction(t core.Transaction) error { +func (s *SQLiteStore) SaveTransactions(ts []core.Transaction) error { tx, _ := s.db.Begin() - var ref *string + for _, t := range ts { + var ref *string - if t.Reference != "" { - ref = &t.Reference - } + if t.Reference != "" { + ref = &t.Reference + } - _, err := tx.Exec(` + _, err := tx.Exec(` INSERT INTO "transactions" ("id", "reference", "timestamp", "hash") VALUES ($1, $2, $3, $4) `, t.ID, ref, t.Timestamp, t.Hash) - if err != nil { - tx.Rollback() + if err != nil { + tx.Rollback() - return err - } + return err + } - for i, p := range t.Postings { - _, err := tx.Exec( - ` + for i, p := range t.Postings { + _, err := tx.Exec( + ` INSERT INTO "postings" ("id", "txid", "source", "destination", "amount", "asset") VALUES (:id, :txid, :source, :destination, :amount, :asset) `, - sql.Named("id", i), - sql.Named("txid", t.ID), - sql.Named("source", p.Source), - sql.Named("destination", p.Destination), - sql.Named("amount", p.Amount), - sql.Named("asset", p.Asset), - ) + sql.Named("id", i), + sql.Named("txid", t.ID), + sql.Named("source", p.Source), + sql.Named("destination", p.Destination), + sql.Named("amount", p.Amount), + sql.Named("asset", p.Asset), + ) - if err != nil { - tx.Rollback() + if err != nil { + tx.Rollback() - return err + return err + } } } diff --git a/storage/storage.go b/storage/storage.go index 2e7a82464..354979ad0 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -6,7 +6,7 @@ import ( ) type Store interface { - AppendTransaction(core.Transaction) error + SaveTransactions([]core.Transaction) error CountTransactions() (int64, error) FindTransactions(query.Query) (query.Cursor, error) AggregateBalances(string) (map[string]int64, error)