From c7ccfe37f72f7005ba8fbc8458a6e213a30f6e13 Mon Sep 17 00:00:00 2001 From: kevin-zhangzh <913455507@qq.com> Date: Thu, 15 Sep 2022 17:09:20 +0800 Subject: [PATCH 1/3] update(): s3store --- README.md | 5 +- example/rollup-bolt.go | 20 +++ example/rollup-s3.go | 1 + example/tracker-bolt.go | 25 +++ example/tracker-s3.go | 31 ++++ go.mod | 25 ++- rollup/rollup.go | 43 +++-- store/kv.go | 303 +++++++++++++---------------------- store/kv_test.go | 119 +++++++++----- store/rawdb/bolt.db.go | 117 ++++++++++++++ store/rawdb/database.go | 19 +++ store/rawdb/s3.go | 140 ++++++++++++++++ store/schema.go | 40 ----- store/schema/db.go | 67 ++++++++ store/{ => schema}/errors.go | 2 +- tracker/jobs.go | 4 +- tracker/tracker.go | 29 +++- 17 files changed, 694 insertions(+), 296 deletions(-) create mode 100644 example/rollup-bolt.go create mode 100644 example/rollup-s3.go create mode 100644 example/tracker-bolt.go create mode 100644 example/tracker-s3.go create mode 100644 store/rawdb/bolt.db.go create mode 100644 store/rawdb/database.go create mode 100644 store/rawdb/s3.go delete mode 100644 store/schema.go create mode 100644 store/schema/db.go rename store/{ => schema}/errors.go (80%) diff --git a/README.md b/README.md index 098e8f6..1364391 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ ## Turing Upload the data to the arweave network in the specified order and download it from the arweave network in the order of upload. - +### Installation +```bash +go get github.com/everFinance/turing +``` ### Implements turing implements the above functionality through the rollup & tracker service diff --git a/example/rollup-bolt.go b/example/rollup-bolt.go new file mode 100644 index 0000000..ab0fd5e --- /dev/null +++ b/example/rollup-bolt.go @@ -0,0 +1,20 @@ +package main + +import ( + "github.com/everFinance/goar/types" + "github.com/everFinance/turing/rollup" + "github.com/everFinance/turing/store/schema" + "time" +) + +func main() { + tags := []types.Tag{ + {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + } + suggestLastArTxId := "" + arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arNode := "https://arweave.net" + arWalletKeyPath := "./key.json" + rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, schema.Config{}) + rol.Run(5*time.Second, 999) +} diff --git a/example/rollup-s3.go b/example/rollup-s3.go new file mode 100644 index 0000000..06ab7d0 --- /dev/null +++ b/example/rollup-s3.go @@ -0,0 +1 @@ +package main diff --git a/example/tracker-bolt.go b/example/tracker-bolt.go new file mode 100644 index 0000000..1f90fe9 --- /dev/null +++ b/example/tracker-bolt.go @@ -0,0 +1,25 @@ +package main + +import ( + "fmt" + "github.com/everFinance/goar/types" + "github.com/everFinance/turing/store/schema" + "github.com/everFinance/turing/tracker" +) + +func main() { + tags := []types.Tag{ + {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + } + arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arNode := "https://arweave.net" + arseed := "" + cursor := uint64(7) + dbCfg := schema.Config{} + tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg) + tr.Run(cursor) + for { + tx := <-tr.SubscribeTx() + fmt.Println(tx.CursorId) + } +} diff --git a/example/tracker-s3.go b/example/tracker-s3.go new file mode 100644 index 0000000..b0dfa96 --- /dev/null +++ b/example/tracker-s3.go @@ -0,0 +1,31 @@ +package main + +import ( + "fmt" + "github.com/everFinance/goar/types" + "github.com/everFinance/turing/store/schema" + "github.com/everFinance/turing/tracker" +) + +func main() { + tags := []types.Tag{ + {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + } + arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arNode := "https://arweave.net" + arseed := "" + cursor := uint64(40) + dbCfg := schema.Config{ + UseS3: true, + AccKey: "AKIATZSGGOHI72GMNSO7", + SecretKey: "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", + BktPrefix: "turing", + Region: "ap-northeast-1", + } + tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg) + tr.Run(cursor) + for { + tx := <-tr.SubscribeTx() + fmt.Println(tx.CursorId) + } +} diff --git a/go.mod b/go.mod index 6f55590..0e7f768 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,18 @@ module github.com/everFinance/turing go 1.18 +require ( + github.com/aws/aws-sdk-go v1.27.0 + github.com/everFinance/arseeding v1.0.3 + github.com/everFinance/everpay-go v0.0.1 + github.com/everFinance/goar v1.4.4 + github.com/getsentry/sentry-go v0.11.0 + github.com/go-co-op/gocron v1.11.0 + github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac + github.com/stretchr/testify v1.8.0 + go.etcd.io/bbolt v1.3.6 +) + require ( github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect github.com/VictoriaMetrics/fastcache v1.6.0 // indirect @@ -12,9 +24,7 @@ require ( github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea // indirect github.com/edsrzf/mmap-go v1.0.0 // indirect github.com/ethereum/go-ethereum v1.10.12 // indirect - github.com/everFinance/arseeding v1.0.3 // indirect github.com/everFinance/ethrpc v1.0.4 // indirect - github.com/everFinance/goar v1.4.4 // indirect github.com/everFinance/goether v1.1.7 // indirect github.com/everFinance/gojwk v1.0.0 // indirect github.com/everFinance/ttcrsa v1.1.3 // indirect @@ -30,10 +40,10 @@ require ( github.com/holiman/bloomfilter/v2 v2.0.3 // indirect github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.2 // indirect - github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac // indirect github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.4 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/json-iterator/go v1.1.11 // indirect github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559 // indirect github.com/mattn/go-colorable v0.1.11 // indirect @@ -47,20 +57,25 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/tsdb v0.7.1 // indirect github.com/rjeczalik/notify v0.9.1 // indirect + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect github.com/shopspring/decimal v1.2.0 // indirect github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 // indirect - github.com/stretchr/objx v0.4.0 // indirect - github.com/stretchr/testify v1.8.0 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect + github.com/tidwall/gjson v1.14.1 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/tidwall/sjson v1.2.4 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef // indirect golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect + golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect golang.org/x/text v0.3.7 // indirect google.golang.org/protobuf v1.26.0 // indirect + gopkg.in/h2non/gentleman.v2 v2.0.5 // indirect gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gorm.io/datatypes v1.0.1 // indirect diff --git a/rollup/rollup.go b/rollup/rollup.go index e06b97c..2b40e77 100644 --- a/rollup/rollup.go +++ b/rollup/rollup.go @@ -17,6 +17,7 @@ import ( arseeding "github.com/everFinance/arseeding/sdk" "github.com/everFinance/turing/rollup/txpool" "github.com/everFinance/turing/store" + "github.com/everFinance/turing/store/schema" ) var log = types.NewLog(types.RollupLogMode) @@ -63,21 +64,36 @@ func initTxPool(kv *store.Store, lastPendTokenTxHash string) (*txpool.TxPool, er return pool, nil } +func initDbConfig(cfg *schema.Config) { + if len(cfg.DbPath) == 0 { + cfg.DbPath = schema.BoltDirPath + } + if len(cfg.DbFileName) == 0 { + cfg.DbFileName = schema.RollupDBFileName + } + cfg.Bkt = []string{ + schema.AllTokenTxBucket, + schema.PoolTxIndex, + schema.ConstantBucket, + } +} + // New suggestLastArTxId use rollup last on chain tx id, if server bolt.db is clear, restart server need config this args -func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath string, owner string, tags []arTypes.Tag, dbDirPath string) *Rollup { - if len(dbDirPath) == 0 { - dbDirPath = store.StoreDirPath +func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath string, owner string, tags []arTypes.Tag, dbConfig schema.Config) *Rollup { + initDbConfig(&dbConfig) + var err error + kv := &store.Store{} + if dbConfig.UseS3 { + kv, err = store.NewS3Store(dbConfig) + } else { + kv, err = store.NewBoltStore(dbConfig) } - kv, err := store.NewKvStore( - dbDirPath, store.RollupDBFileName, - store.AllTokenTxBucket, store.PoolTxIndex, store.ConstantBucket, - ) if err != nil { panic(err) } // get constant from kv - lastArTxId, err := kv.GetConstant(store.LastArTxIdKey) + lastArTxId, err := kv.GetConstant(schema.LastArTxIdKey) if err != nil { panic(err) } @@ -90,11 +106,11 @@ func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath } } - lastOnChainTokenTxHash, err := kv.GetConstant(store.LastOnChainTokenTxHashKey) + lastOnChainTokenTxHash, err := kv.GetConstant(schema.LastOnChainTokenTxHashKey) if err != nil { panic(err) } - lastAddPoolTokenTxHash, err := kv.GetConstant(store.LastAddPoolTokenTxIdKey) + lastAddPoolTokenTxHash, err := kv.GetConstant(schema.LastAddPoolTokenTxIdKey) if err != nil { panic(err) } @@ -216,7 +232,7 @@ func (rol *Rollup) listenTokenTxToPool() { panic(err) } else { // update LastAddPoolTokenTxIdKey - if err := rol.store.UpdateConstant(store.LastAddPoolTokenTxIdKey, []byte(txHash)); err != nil { + if err := rol.store.UpdateConstant(schema.LastAddPoolTokenTxIdKey, []byte(txHash)); err != nil { panic(err) } rol.lastAddPoolTokenTxHash = txHash @@ -270,17 +286,18 @@ func (rol *Rollup) sealTxOnChain(timeInterval time.Duration, maxOfRollup int) { // on chain success and modify some status // 1. modify db constants 'lastArTxId' - if err := rol.store.UpdateConstant(store.LastArTxIdKey, []byte(arId)); err != nil { + if err := rol.store.UpdateConstant(schema.LastArTxIdKey, []byte(arId)); err != nil { log.Error("modify lastArTxIdKey error", "error", err, "newValue", arId) panic(err) } // 2. modify db constants 'lastOnChainTokenTxHash' - if err := rol.store.UpdateConstant(store.LastOnChainTokenTxHashKey, []byte(txs[len(txs)-1].TxId)); err != nil { + if err := rol.store.UpdateConstant(schema.LastOnChainTokenTxHashKey, []byte(txs[len(txs)-1].TxId)); err != nil { log.Error("modify lastOnChainTokenTxHashKey error", "error", err, "newValue", txs[len(txs)-1].TxId) panic(err) } // 3. delete poolTxIndex bucket txs index if err := rol.store.BatchDelPoolTokenTxId(txs); err != nil { + // maybe not need panic panic(err) } // 4. change lastTxHash diff --git a/store/kv.go b/store/kv.go index 372bd4e..bdccb60 100644 --- a/store/kv.go +++ b/store/kv.go @@ -4,115 +4,71 @@ import ( "encoding/binary" "encoding/json" "errors" - "fmt" + "github.com/everFinance/goar/utils" + "github.com/everFinance/turing/store/rawdb" + "github.com/everFinance/turing/store/schema" types "github.com/everFinance/turing/common" - bolt "go.etcd.io/bbolt" - "os" - "path" - "time" ) // init log mode var log = types.NewLog(types.StoreLogMode) type Store struct { - Db *bolt.DB - databasePath string + KVDb rawdb.KeyValueDB } -func NewKvStore(dirPath string, dbFileName string, bucketNames ...[]byte) (*Store, error) { - if err := os.MkdirAll(dirPath, os.ModePerm); err != nil { - return nil, err - } - dataFile := path.Join(dirPath, dbFileName) - boltDB, err := bolt.Open(dataFile, 0660, &bolt.Options{Timeout: 1 * time.Second, InitialMmapSize: 10e6}) +func NewBoltStore(cfg schema.Config) (*Store, error) { + Db, err := rawdb.NewBoltDB(cfg) if err != nil { - if err == bolt.ErrTimeout { - return nil, errors.New("cannot obtain database lock,database may be in use by another process") - } - return nil, err - } - boltDB.AllocSize = boltAllocSize - kv := &Store{ - Db: boltDB, - databasePath: dirPath, - } - // create bucket - if len(bucketNames) == 0 { - bucketNames = append(bucketNames, AllTokenTxBucket, PoolTxIndex, ConstantBucket, AllSyncedTokenTxBucket) - } - if err := kv.Db.Update(func(tx *bolt.Tx) error { - return createBuckets(tx, bucketNames...) - }); err != nil { - return nil, err + panic(err) } - return kv, nil -} - -// DatabasePath at which this database writes files. -func (kv *Store) DatabasePath() string { - return kv.databasePath + return &Store{ + KVDb: Db, + }, nil } -func createBuckets(tx *bolt.Tx, buckets ...[]byte) error { - for _, bucket := range buckets { - if _, err := tx.CreateBucketIfNotExists(bucket); err != nil { - return err - } +func NewS3Store(cfg schema.Config) (*Store, error) { + Db, err := rawdb.NewS3DB(cfg) + if err != nil { + panic(err) } - return nil + return &Store{KVDb: Db}, nil } +// DatabasePath at which this database writes files. +// func (kv *Store) DatabasePath() string { +// return kv.databasePath +// } + // Close closes the underlying BoltDB database. func (kv *Store) Close() error { - return kv.Db.Close() + return kv.KVDb.Close() } -func (kv *Store) ClearDB(dbFileName string) error { - if _, err := os.Stat(kv.databasePath); os.IsNotExist(err) { - return nil - } - if err := os.Remove(path.Join(kv.databasePath, dbFileName)); err != nil { - return fmt.Errorf("could not remove database file. error: %v", err) - } - return nil +func (kv *Store) ClearDB() error { + return kv.KVDb.Clear() } // GetConstant -func (kv *Store) GetConstant(key []byte) (value string, err error) { - err = kv.Db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(ConstantBucket) - val1 := bkt.Get(key) - if len(val1) == 0 { - value = "" - } else { - value = string(val1) - } - return nil - }) +func (kv *Store) GetConstant(key string) (value string, err error) { + val, err := kv.KVDb.Get(schema.ConstantBucket, key) + if err == schema.ErrNotExist { + return "", nil + } + value = string(val) return } // UpdateConstant -func (kv *Store) UpdateConstant(key []byte, val []byte) error { - err := kv.Db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(ConstantBucket).Put(key, val) - }) - return err +func (kv *Store) UpdateConstant(key string, val []byte) error { + return kv.KVDb.Put(schema.ConstantBucket, key, val) } // LoadPoolTxFromDb func (kv *Store) LoadPoolTxFromDb() (types.Transactions, error) { - poolHashArr := make([][]byte, 0) - err := kv.Db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(PoolTxIndex) - return bkt.ForEach(func(k, v []byte) error { - poolHashArr = append(poolHashArr, k) - return nil - }) - }) + poolHashArr, err := kv.KVDb.GetAllKey(schema.PoolTxIndex) log.Info("store load pool tx", "tx number", len(poolHashArr)) if err != nil { @@ -120,53 +76,48 @@ func (kv *Store) LoadPoolTxFromDb() (types.Transactions, error) { } poolTxs := make(types.Transactions, 0, len(poolHashArr)) - err = kv.Db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(AllTokenTxBucket) - for _, txHash := range poolHashArr { - val := bkt.Get(txHash) - if len(val) == 0 { - panic("can not get pending tx from allTokenTxBucket") - } else { - ttx := &types.Transaction{} - err = json.Unmarshal(val, ttx) - if err != nil { - return err - } - poolTxs = append(poolTxs, ttx) + for _, txHash := range poolHashArr { + val, err := kv.KVDb.Get(schema.AllTokenTxBucket, txHash) + if err != nil { + return nil, err + } + if len(val) == 0 { + panic("can not get pending tx from allTokenTxBucket") + } else { + ttx := &types.Transaction{} + err = json.Unmarshal(val, ttx) + if err != nil { + return nil, err } + poolTxs = append(poolTxs, ttx) } - return nil - }) - if err != nil { - return nil, err } return poolTxs, nil } func (kv *Store) GetLastOnChainTx() (onChainTx *types.Transaction, err error) { - txHash, err := kv.GetConstant(LastOnChainTokenTxHashKey) + txHash, err := kv.GetConstant(schema.LastOnChainTokenTxHashKey) if err != nil { return nil, err } if txHash == "" { - return nil, ErrNotExist + return nil, schema.ErrNotExist } // get tokenTx from AllTokenTxBucket - err = kv.Db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(AllTokenTxBucket) - val := bkt.Get([]byte(txHash)) - if len(val) == 0 { - return errors.New("not found last onchain tokenTx from rollup AllTokenTxBucket db") - } - tt := types.Transaction{} - err = json.Unmarshal(val, &tt) - if err != nil { - return err - } - onChainTx = &tt - return nil - }) + val, err := kv.KVDb.Get(schema.AllTokenTxBucket, txHash) + if err != nil { + return + } + if len(val) == 0 { + return nil, errors.New("not found last onchain tokenTx from rollup AllTokenTxBucket db") + } + tt := types.Transaction{} + err = json.Unmarshal(val, &tt) + if err != nil { + return + } + onChainTx = &tt return } @@ -176,118 +127,94 @@ func (kv *Store) PutTokenTx(tokenTx *types.Transaction) error { if err != nil { return err } - err = kv.Db.Update(func(tx *bolt.Tx) error { - allTxBkt := tx.Bucket(AllTokenTxBucket) - if err := allTxBkt.Put([]byte(tokenTx.TxId), val); err != nil { - return err - } - return nil - }) - return err + return kv.KVDb.Put(schema.AllTokenTxBucket, tokenTx.TxId, val) } // put tx id func (kv *Store) PutPoolTokenTxId(txId string) error { - err := kv.Db.Update(func(tx *bolt.Tx) error { - txIndexBkt := tx.Bucket(PoolTxIndex) - if err := txIndexBkt.Put([]byte(txId), []byte("0x01")); err != nil { - return err - } - return nil - }) - return err + return kv.KVDb.Put(schema.PoolTxIndex, txId, []byte("0x01")) } // BatchDelPoolTokenTxId -func (kv *Store) BatchDelPoolTokenTxId(txs types.Transactions) error { - err := kv.Db.Update(func(tx *bolt.Tx) error { - // delete PoolTxIndex - bkt := tx.Bucket(PoolTxIndex) - for _, t := range txs { - if err := bkt.Delete([]byte(t.TxId)); err != nil { - log.Error("delete pooltxIndex error", "error", err, "txs", txs) - return err - } - } - return nil - }) - return err +func (kv *Store) BatchDelPoolTokenTxId(txs types.Transactions) (err error) { + for _, tx := range txs { + err = kv.KVDb.Delete(schema.PoolTxIndex, tx.TxId) + return + } + return nil } // itob returns an 64-byte big endian representation of v. -func itob(v uint64) []byte { +func itob(v uint64) string { b := make([]byte, 64) binary.BigEndian.PutUint64(b, v) - return b + return utils.Base64Encode(b) } -func btoi(b []byte) uint64 { +func btoi(base64Str string) uint64 { + b, err := utils.Base64Decode(base64Str) + if err != nil { + panic(err) + } return binary.BigEndian.Uint64(b) } // PutSubscribeTx -func (kv *Store) PutSubscribeTx(subTx types.SubscribeTransaction) (uint64, error) { +func (kv *Store) PutSubscribeTx(subTx types.SubscribeTransaction) (cursor uint64, err error) { value, err := subTx.Marshal() if err != nil { log.Error("json marshal subscribe transaction error", "err", err) return 0, err } - var cursor uint64 - err = kv.Db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(AllSyncedTokenTxBucket) - // use monotonically incrementing, for sort tx - id, err := bkt.NextSequence() - if err != nil { - return err - } - cursor = id - key := itob(id) - if err := bkt.Put(key, value); err != nil { - log.Error("store subscribe transaction error", "err", err) - return err - } - return nil - }) - - return cursor, err + seqNumBy, err := kv.GetConstant(schema.SeqNum) + if err != nil { + return + } + if len(seqNumBy) == 0 { + seqNumBy = itob(uint64(1)) + } + cursor = btoi(seqNumBy) + err = kv.KVDb.Put(schema.AllSyncedTokenTxBucket, seqNumBy, value) + if err != nil { + return + } + // seqNum += 1 + err = kv.KVDb.Put(schema.ConstantBucket, schema.SeqNum, []byte(itob(cursor+1))) + if err != nil { + _ = kv.KVDb.Delete(schema.AllSyncedTokenTxBucket, seqNumBy) + } + return } // UpdateLastProcessArTxId func (kv *Store) UpdateLastProcessArTxId(id string) error { - err := kv.Db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(ConstantBucket) - if err := bkt.Put(LastProcessArTxIdKey, []byte(id)); err != nil { - return err - } - return nil - }) - return err + return kv.KVDb.Put(schema.ConstantBucket, schema.LastProcessArTxIdKey, []byte(id)) } // LoadSubscribeTxs -func (kv *Store) LoadSubscribeTxsToStream(cursor uint64, txChan chan<- types.SubscribeTransaction) error { - err := kv.Db.View(func(tx *bolt.Tx) error { - bkt := tx.Bucket(AllSyncedTokenTxBucket) - - return bkt.ForEach(func(k, v []byte) error { - if btoi(k) <= cursor { - return nil - } - tx := &types.SubscribeTransaction{} - err := tx.Unmarshal(v) - if err != nil { - return err - } - tx.CursorId = btoi(k) - txChan <- *tx - return nil - }) - }) +func (kv *Store) LoadSubscribeTxsToStream(cursor uint64, txChan chan<- types.SubscribeTransaction) (err error) { + keys, err := kv.KVDb.GetAllKey(schema.AllSyncedTokenTxBucket) if err != nil { - return err + return } - return nil + for _, key := range keys { + if btoi(key) <= cursor { + continue + } + val, err := kv.KVDb.Get(schema.AllSyncedTokenTxBucket, key) + if err != nil { + return err + } + tx := &types.SubscribeTransaction{} + err = tx.Unmarshal(val) + if err != nil { + return err + } + tx.CursorId = btoi(key) + txChan <- *tx + } + return } // // LoadRollupEverTxs load tx from rollup.db diff --git a/store/kv_test.go b/store/kv_test.go index 5f40d2e..f9cdc4e 100644 --- a/store/kv_test.go +++ b/store/kv_test.go @@ -1,59 +1,96 @@ package store import ( + "github.com/everFinance/turing/store/schema" "github.com/stretchr/testify/assert" - bolt "go.etcd.io/bbolt" "testing" ) -func TestNewKvStore(t *testing.T) { - dirPath := "./" - dbName := "test.Db" - kv, err := NewKvStore(dirPath, dbName, AllTokenTxBucket, PoolTxIndex, ConstantBucket) +func TestNewBoltStore(t *testing.T) { + dirPath := "../bolt" + dbName := "tracker.db" + kv, err := NewBoltStore(schema.Config{DbPath: dirPath, DbFileName: dbName}) assert.NoError(t, err) - k1 := []byte("key01") - v1 := []byte("value01") - err = kv.Db.Update(func(tx *bolt.Tx) error { - bkt := tx.Bucket(AllTokenTxBucket) - err = bkt.Put(k1, v1) - return err - }) + err = kv.UpdateConstant(schema.LastProcessArTxIdKey, []byte("f8-DEvvlAY7qPRRaHm96Sc7b1EcssK8J0IfOmQaUU2c")) assert.NoError(t, err) +} - var val []byte - kv.Db.View(func(tx *bolt.Tx) error { - val = tx.Bucket(AllTokenTxBucket).Get(k1) - return nil - }) - assert.Equal(t, val, v1) - - err = kv.Db.Update(func(tx *bolt.Tx) error { - return tx.Bucket(AllTokenTxBucket).Delete(k1) - }) +func TestNewS3Store(t *testing.T) { + dbCfg := schema.Config{ + UseS3: true, + AccKey: "AKIATZSGGOHI72GMNSO7", + SecretKey: "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", + BktPrefix: "turing", + Region: "ap-northeast-1", + } + kv, err := NewS3Store(dbCfg) assert.NoError(t, err) - - kv.Db.View(func(tx *bolt.Tx) error { - val := tx.Bucket(AllTokenTxBucket).Get(k1) - assert.Nil(t, val) - return nil - }) - - err = kv.Db.View(func(tx *bolt.Tx) error { - return tx.Bucket(AllTokenTxBucket).ForEach(func(k, v []byte) error { - t.Log(k, v) - return nil - }) - }) + err = kv.UpdateConstant(schema.LastProcessArTxIdKey, []byte("f8-DEvvlAY7qPRRaHm96Sc7b1EcssK8J0IfOmQaUU2c")) assert.NoError(t, err) - err = kv.Db.View(func(tx *bolt.Tx) error { - return tx.Bucket(PoolTxIndex).ForEach(func(k, v []byte) error { - t.Log(string(k), string(v)) - return nil - }) - }) +} +func TestClearS3(t *testing.T) { + dbCfg := schema.Config{ + UseS3: true, + AccKey: "AKIATZSGGOHI72GMNSO7", + SecretKey: "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", + BktPrefix: "turing", + Region: "ap-northeast-1", + } + kv, err := NewS3Store(dbCfg) + assert.NoError(t, err) + err = kv.ClearDB() + assert.NoError(t, err) } +// func TestNewKvStore(t *testing.T) { +// dirPath := "./" +// dbName := "test.Db" +// kv, err := NewKvStore(dirPath, dbName, AllTokenTxBucket, PoolTxIndex, ConstantBucket) +// assert.NoError(t, err) +// k1 := []byte("key01") +// v1 := []byte("value01") +// err = kv.Db.Update(func(tx *bolt.Tx) error { +// bkt := tx.Bucket(AllTokenTxBucket) +// err = bkt.Put(k1, v1) +// return err +// }) +// assert.NoError(t, err) +// +// var val []byte +// kv.Db.View(func(tx *bolt.Tx) error { +// val = tx.Bucket(AllTokenTxBucket).Get(k1) +// return nil +// }) +// assert.Equal(t, val, v1) +// +// err = kv.Db.Update(func(tx *bolt.Tx) error { +// return tx.Bucket(AllTokenTxBucket).Delete(k1) +// }) +// assert.NoError(t, err) +// +// kv.Db.View(func(tx *bolt.Tx) error { +// val := tx.Bucket(AllTokenTxBucket).Get(k1) +// assert.Nil(t, val) +// return nil +// }) +// +// err = kv.Db.View(func(tx *bolt.Tx) error { +// return tx.Bucket(AllTokenTxBucket).ForEach(func(k, v []byte) error { +// t.Log(k, v) +// return nil +// }) +// }) +// assert.NoError(t, err) +// err = kv.Db.View(func(tx *bolt.Tx) error { +// return tx.Bucket(PoolTxIndex).ForEach(func(k, v []byte) error { +// t.Log(string(k), string(v)) +// return nil +// }) +// }) +// +// } + // func TestStore_LoadSubscribeTxsToStream(t *testing.T) { // dirPath := "./" // dbName := "test.Db" diff --git a/store/rawdb/bolt.db.go b/store/rawdb/bolt.db.go new file mode 100644 index 0000000..6370cff --- /dev/null +++ b/store/rawdb/bolt.db.go @@ -0,0 +1,117 @@ +package rawdb + +import ( + "errors" + "fmt" + "github.com/everFinance/turing/store/schema" + bolt "go.etcd.io/bbolt" + "os" + "path" + "time" +) + +const ( + boltAllocSize = 8 * 1024 * 1024 + boltName = "seed.db" +) + +type BoltDB struct { + Db *bolt.DB + DbPath string + DbFile string +} + +func NewBoltDB(cfg schema.Config) (*BoltDB, error) { + if len(cfg.DbPath) == 0 { + return nil, errors.New("boltDb dir path can not null") + } + if err := os.MkdirAll(cfg.DbPath, os.ModePerm); err != nil { + return nil, err + } + if len(cfg.DbFileName) == 0 { + cfg.DbFileName = boltName + } + Db, err := bolt.Open(path.Join(cfg.DbPath, cfg.DbFileName), 0660, &bolt.Options{Timeout: 2 * time.Second, InitialMmapSize: 10e6}) + if err != nil { + if err == bolt.ErrTimeout { + return nil, errors.New("cannot obtain database lock, database may be in use by another process") + } + return nil, err + } + Db.AllocSize = boltAllocSize + boltDB := &BoltDB{ + Db: Db, + DbPath: cfg.DbPath, + DbFile: cfg.DbFileName, + } + if err := boltDB.Db.Update(func(tx *bolt.Tx) error { + return createBuckets(tx, cfg.Bkt) + }); err != nil { + return nil, err + } + return boltDB, nil +} + +func (s *BoltDB) Put(bucket, key string, value []byte) (err error) { + err = s.Db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket([]byte(bucket)) + return bkt.Put([]byte(key), value) + }) + return +} + +func (s *BoltDB) Get(bucket, key string) (data []byte, err error) { + err = s.Db.View(func(tx *bolt.Tx) error { + data = tx.Bucket([]byte(bucket)).Get([]byte(key)) + if data == nil { + err = schema.ErrNotExist + return err + } + return nil + }) + return +} + +func (s *BoltDB) GetAllKey(bucket string) (keys []string, err error) { + keys = make([]string, 0) + err = s.Db.View(func(tx *bolt.Tx) error { + return tx.Bucket([]byte(bucket)).ForEach(func(k, v []byte) error { + keys = append(keys, string(k)) + return nil + }) + }) + return +} + +func (s *BoltDB) Delete(bucket, key string) (err error) { + err = s.Db.Update(func(tx *bolt.Tx) error { + return tx.Bucket([]byte(bucket)).Delete([]byte(key)) + }) + return +} + +func (s *BoltDB) Close() (err error) { + return s.Db.Close() +} + +func (s *BoltDB) Clear() (err error) { + if _, err := os.Stat(s.DbPath); os.IsNotExist(err) { + return nil + } + if err := os.Remove(path.Join(s.DbPath, s.DbFile)); err != nil { + return fmt.Errorf("could not remove database file. error: %v", err) + } + return nil +} + +func createBuckets(tx *bolt.Tx, buckets []string) error { + if len(buckets) == 0 { + buckets = append(buckets, schema.AllBkt...) + } + for _, bucket := range buckets { + if _, err := tx.CreateBucketIfNotExists([]byte(bucket)); err != nil { + return err + } + } + return nil +} diff --git a/store/rawdb/database.go b/store/rawdb/database.go new file mode 100644 index 0000000..ed39909 --- /dev/null +++ b/store/rawdb/database.go @@ -0,0 +1,19 @@ +package rawdb + +import "github.com/everFinance/everpay-go/common" + +var log = common.NewLog("turing") + +type KeyValueDB interface { + Put(bucket, key string, value []byte) (err error) + + Get(bucket, key string) (data []byte, err error) + + GetAllKey(bucket string) (keys []string, err error) + + Delete(bucket, key string) (err error) + + Close() (err error) + + Clear() (err error) +} diff --git a/store/rawdb/s3.go b/store/rawdb/s3.go new file mode 100644 index 0000000..d7e84f3 --- /dev/null +++ b/store/rawdb/s3.go @@ -0,0 +1,140 @@ +package rawdb + +import ( + "bytes" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" + "github.com/aws/aws-sdk-go/service/s3/s3iface" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/everFinance/turing/store/schema" + "strings" +) + +type S3DB struct { + downloader s3manager.Downloader + uploader s3manager.Uploader + s3Api s3iface.S3API + bucketPrefix string + bkts []string +} + +func NewS3DB(cfg schema.Config) (*S3DB, error) { + mySession := session.Must(session.NewSession()) + cred := credentials.NewStaticCredentials(cfg.AccKey, cfg.SecretKey, "") + cfgs := aws.NewConfig().WithRegion(cfg.Region).WithCredentials(cred) + if cfg.Use4EVER { + cfgs.WithEndpoint("https://endpoint.4everland.co") // inject 4everland endpoint + } + s3Api := s3.New(mySession, cfgs) + bkts, err := createS3Bucket(s3Api, cfg.BktPrefix, cfg.Bkt) + if err != nil { + return nil, err + } + + log.Info("run with s3 success") + return &S3DB{ + downloader: s3manager.Downloader{ + S3: s3Api, + }, + uploader: s3manager.Uploader{ + S3: s3Api, + }, + s3Api: s3Api, + bucketPrefix: cfg.BktPrefix, + bkts: bkts, + }, nil +} + +func (s *S3DB) Put(bucket, key string, value []byte) (err error) { + bkt := getS3Bucket(s.bucketPrefix, bucket) + uploadInfo := &s3manager.UploadInput{ + Bucket: aws.String(bkt), + Key: aws.String(key), + Body: bytes.NewReader(value), + } + _, err = s.uploader.Upload(uploadInfo) + return +} + +func (s *S3DB) Get(bucket, key string) (data []byte, err error) { + bkt := getS3Bucket(s.bucketPrefix, bucket) + downloadInfo := &s3.GetObjectInput{ + Bucket: aws.String(bkt), + Key: aws.String(key), + } + buf := aws.NewWriteAtBuffer([]byte{}) + n, err := s.downloader.Download(buf, downloadInfo) + if n == 0 { + return nil, schema.ErrNotExist + } + data = buf.Bytes() + return +} + +func (s *S3DB) GetAllKey(bucket string) (keys []string, err error) { + bkt := getS3Bucket(s.bucketPrefix, bucket) + resp, err := s.s3Api.ListObjectsV2(&s3.ListObjectsV2Input{Bucket: aws.String(bkt)}) + if err != nil { + return + } + keys = make([]string, 0) + for _, item := range resp.Contents { + keys = append(keys, *item.Key) + } + return +} + +func (s *S3DB) Delete(bucket, key string) (err error) { + bkt := getS3Bucket(s.bucketPrefix, bucket) + _, err = s.s3Api.DeleteObject(&s3.DeleteObjectInput{Bucket: aws.String(bkt), Key: aws.String(key)}) + return +} + +func (s *S3DB) Close() (err error) { + return +} + +func (s *S3DB) Clear() (err error) { + // delete all keys for each bucket + for _, bkt := range s.bkts { + keys, err := s.GetAllKey(bkt) + if err != nil { + return err + } + for _, key := range keys { + err = s.Delete(bkt, key) + if err != nil { + return err + } + } + } + // delete bucket + for _, bkt := range s.bkts { + s3bkt := getS3Bucket(s.bucketPrefix, bkt) + _, err = s.s3Api.DeleteBucket(&s3.DeleteBucketInput{Bucket: aws.String(s3bkt)}) + if err != nil { + return + } + } + return +} + +func createS3Bucket(svc s3iface.S3API, prefix string, bucketNames []string) ([]string, error) { + if len(bucketNames) == 0 { + bucketNames = append(bucketNames, schema.AllBkt...) + } + for _, bucketName := range bucketNames { + s3Bkt := getS3Bucket(prefix, bucketName) // s3 bucket name only accept lower case + _, err := svc.CreateBucket(&s3.CreateBucketInput{Bucket: aws.String(s3Bkt)}) + if err != nil && !strings.Contains(err.Error(), "BucketAlreadyOwnedByYou") { + return nil, err + } + } + return bucketNames, nil +} + +func getS3Bucket(prefix, bktName string) string { + return strings.ToLower(prefix + "-" + bktName) +} diff --git a/store/schema.go b/store/schema.go deleted file mode 100644 index 0961911..0000000 --- a/store/schema.go +++ /dev/null @@ -1,40 +0,0 @@ -package store - -// --------------------- common ------------------------------ -const ( - boltAllocSize = 8 * 1024 * 1024 - StoreDirPath = "./bolt" -) - -var ( - ConstantBucket = []byte("constant-bucket") // store some constant value -) - -// --------------------- about rollup server -------------------- -const ( - RollupDBFileName = "rollup.db" -) - -var ( - // bucket name - AllTokenTxBucket = []byte("all-token-transaction-bucket") // store all token tx - PoolTxIndex = []byte("pool-transaction-index-bucket") // store pending token tx - - // key - LastOnChainTokenTxHashKey = []byte("LastOnChainTokenTxHashKey") - LastArTxIdKey = []byte("LastArTxIdKey") - LastAddPoolTokenTxIdKey = []byte("LastAddPoolTokenTxIdKey") -) - -// --------------------- about tracker server ------------------- -const ( - TrackerDBFileName = "tracker.db" -) - -var ( - // bucket name - AllSyncedTokenTxBucket = []byte("all-synced-token-tx-bucket") // store all token tx - - // key - LastProcessArTxIdKey = []byte("LastProcessArTxId") // process ar tx id -) diff --git a/store/schema/db.go b/store/schema/db.go new file mode 100644 index 0000000..bccaff9 --- /dev/null +++ b/store/schema/db.go @@ -0,0 +1,67 @@ +package schema + +// --------------------- common ------------------------------ +const ( + BoltAllocSize = 8 * 1024 * 1024 + BoltDirPath = "./bolt" +) + +var ( + ConstantBucket = "constant-bucket" // store some constant value + + // key + SeqNum = "sequence-number" +) + +// --------------------- about rollup server -------------------- +const ( + RollupDBFileName = "rollup.db" +) + +var ( + // bucket name + AllTokenTxBucket = "all-token-transaction-bucket" // store all token tx + PoolTxIndex = "pool-transaction-index-bucket" // store pending token tx + + // key + LastOnChainTokenTxHashKey = "LastOnChainTokenTxHashKey" + LastArTxIdKey = "LastArTxIdKey" + LastAddPoolTokenTxIdKey = "LastAddPoolTokenTxIdKey" +) + +// --------------------- about tracker server ------------------- +const ( + TrackerDBFileName = "tracker.db" +) + +var ( + // bucket name + AllSyncedTokenTxBucket = "all-synced-token-tx-bucket" // store all token tx + + // key + LastProcessArTxIdKey = "LastProcessArTxId" // process ar tx id +) + +var ( + AllBkt = []string{ + ConstantBucket, + AllTokenTxBucket, + PoolTxIndex, + AllSyncedTokenTxBucket, + } +) + +type Config struct { + Bkt []string + // use s3 or 4ever + UseS3 bool + Use4EVER bool + AccKey string + SecretKey string + Region string + BktPrefix string + + // bolt + DbPath string + DbFileName string +} diff --git a/store/errors.go b/store/schema/errors.go similarity index 80% rename from store/errors.go rename to store/schema/errors.go index 3aa57ad..cbaf58e 100644 --- a/store/errors.go +++ b/store/schema/errors.go @@ -1,4 +1,4 @@ -package store +package schema import "errors" diff --git a/tracker/jobs.go b/tracker/jobs.go index 170d565..3a48bd9 100644 --- a/tracker/jobs.go +++ b/tracker/jobs.go @@ -3,7 +3,7 @@ package tracker import ( "github.com/everFinance/turing/common" types "github.com/everFinance/turing/common" - "github.com/everFinance/turing/store" + "github.com/everFinance/turing/store/schema" "time" ) @@ -25,7 +25,7 @@ func (t *Tracker) jobTxsPullFromChain() { // get all unprocessed txs // null value is return "" - lastTxID, err := t.store.GetConstant(store.LastProcessArTxIdKey) + lastTxID, err := t.store.GetConstant(schema.LastProcessArTxIdKey) if err != nil { log.Error("can not get lastTxID", "err", err) return diff --git a/tracker/tracker.go b/tracker/tracker.go index 2f219c5..b2d6238 100644 --- a/tracker/tracker.go +++ b/tracker/tracker.go @@ -2,6 +2,7 @@ package tracker import ( arseeding "github.com/everFinance/arseeding/sdk" + "github.com/everFinance/turing/store/schema" "sync" "time" @@ -33,11 +34,29 @@ type Tracker struct { once sync.Once } -func New(tags []types.Tag, arNode, arSeedingUrl string, arOwner string, dbDirPath string) *Tracker { - if len(dbDirPath) == 0 { - dbDirPath = store.StoreDirPath +func initDbConfig(cfg *schema.Config) { + if len(cfg.DbPath) == 0 { + cfg.DbPath = schema.BoltDirPath + } + if len(cfg.DbFileName) == 0 { + cfg.DbFileName = schema.TrackerDBFileName + } + cfg.Bkt = []string{ + schema.AllSyncedTokenTxBucket, + schema.ConstantBucket, + } +} + +func New(tags []types.Tag, arNode, arSeedingUrl string, arOwner string, dbConfig schema.Config) *Tracker { + + initDbConfig(&dbConfig) + var err error + kv := &store.Store{} + if dbConfig.UseS3 { + kv, err = store.NewS3Store(dbConfig) + } else { + kv, err = store.NewBoltStore(dbConfig) } - kv, err := store.NewKvStore(dbDirPath, store.TrackerDBFileName, store.ConstantBucket, store.AllSyncedTokenTxBucket) if err != nil { panic(err) } @@ -83,7 +102,7 @@ func (t *Tracker) SubscribeTx() <-chan common.SubscribeTransaction { } func (t *Tracker) ProcessedArTxId() (string, error) { - return t.store.GetConstant(store.LastProcessArTxIdKey) + return t.store.GetConstant(schema.LastProcessArTxIdKey) } func (t *Tracker) Addr() string { From 748689cf41652d71071d82d902eae3da6444bfe2 Mon Sep 17 00:00:00 2001 From: kevin-zhangzh <913455507@qq.com> Date: Thu, 15 Sep 2022 17:42:19 +0800 Subject: [PATCH 2/3] update(): parentId onchain --- common/types.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/common/types.go b/common/types.go index ce1fcca..483b34f 100644 --- a/common/types.go +++ b/common/types.go @@ -112,7 +112,8 @@ func (txs Transactions) MarshalFromOnChain() ([]byte, error) { lightTxs := make(Transactions, 0, len(txs)) for _, tx := range txs { lightTxs = append(lightTxs, &Transaction{ - TxData: tx.TxData, + TxData: tx.TxData, + ParentId: tx.ParentId, }) } return lightTxs.Marshal() From 95adde44ea01b0879e0fe5d5690a2cd41e79954b Mon Sep 17 00:00:00 2001 From: kevin-zhangzh <913455507@qq.com> Date: Fri, 16 Sep 2022 19:18:10 +0800 Subject: [PATCH 3/3] test(): rollup&tracker --- example/rollup-bolt.go | 33 +++++++++-- example/rollup-s3.go | 51 +++++++++++++++++ example/schema/types.go | 6 ++ example/tracker-bolt.go | 18 ++++-- example/tracker-s3.go | 4 +- store/kv.go | 7 ++- store/rawdb/database_test.go | 107 +++++++++++++++++++++++++++++++++++ store/rawdb/s3.go | 6 ++ 8 files changed, 218 insertions(+), 14 deletions(-) create mode 100644 example/schema/types.go create mode 100644 store/rawdb/database_test.go diff --git a/example/rollup-bolt.go b/example/rollup-bolt.go index ab0fd5e..157491c 100644 --- a/example/rollup-bolt.go +++ b/example/rollup-bolt.go @@ -1,7 +1,10 @@ package main import ( + "encoding/json" + "fmt" "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" "github.com/everFinance/turing/rollup" "github.com/everFinance/turing/store/schema" "time" @@ -9,12 +12,34 @@ import ( func main() { tags := []types.Tag{ - {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + {Name: "App", Value: "turing-test"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, } suggestLastArTxId := "" - arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" arNode := "https://arweave.net" - arWalletKeyPath := "./key.json" + arWalletKeyPath := "./k9s.json" rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, schema.Config{}) - rol.Run(5*time.Second, 999) + rol.Run(2*time.Minute, 999) + feedData(rol.AddTx()) +} + +func feedData(ch chan<- []byte) { + ticker := time.NewTicker(30 * time.Second) + var cnt int64 + for { + select { + case <-ticker.C: + tx := &ts.Tx{ + Name: fmt.Sprintf("test-%v", cnt), + Timestamp: time.Now().UnixMilli(), + } + data, err := json.Marshal(tx) + if err != nil { + panic(err) + } + cnt += 1 + ch <- data + } + } } diff --git a/example/rollup-s3.go b/example/rollup-s3.go index 06ab7d0..32d7862 100644 --- a/example/rollup-s3.go +++ b/example/rollup-s3.go @@ -1 +1,52 @@ package main + +import ( + "encoding/json" + "fmt" + "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" + "github.com/everFinance/turing/rollup" + "github.com/everFinance/turing/store/schema" + "time" +) + +func main() { + tags := []types.Tag{ + {Name: "App", Value: "turing-s3-test"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, + } + suggestLastArTxId := "" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" + arNode := "https://arweave.net" + arWalletKeyPath := "./k9s.json" + cfg := schema.Config{ + UseS3: true, + AccKey: "", + SecretKey: "MOPfuebKVsSTZK7XGq/", + BktPrefix: "turing", + Region: "ap-northeast-1", + } + rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, cfg) + rol.Run(2*time.Minute, 999) + feedDataS3(rol.AddTx()) +} + +func feedDataS3(ch chan<- []byte) { + ticker := time.NewTicker(30 * time.Second) + var cnt int64 + for { + select { + case <-ticker.C: + tx := &ts.Tx{ + Name: fmt.Sprintf("test-S3-%v", cnt), + Timestamp: time.Now().UnixMilli(), + } + data, err := json.Marshal(tx) + if err != nil { + panic(err) + } + cnt += 1 + ch <- data + } + } +} diff --git a/example/schema/types.go b/example/schema/types.go new file mode 100644 index 0000000..674ccb8 --- /dev/null +++ b/example/schema/types.go @@ -0,0 +1,6 @@ +package schema + +type Tx struct { + Name string `json:"name"` + Timestamp int64 `json:"timestamp"` +} diff --git a/example/tracker-bolt.go b/example/tracker-bolt.go index 1f90fe9..f7bab59 100644 --- a/example/tracker-bolt.go +++ b/example/tracker-bolt.go @@ -1,25 +1,33 @@ package main import ( + "encoding/json" "fmt" "github.com/everFinance/goar/types" + ts "github.com/everFinance/turing/example/schema" "github.com/everFinance/turing/store/schema" + "github.com/everFinance/turing/tracker" ) func main() { tags := []types.Tag{ - {Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"}, + {Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"}, } - arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM" + arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM" arNode := "https://arweave.net" arseed := "" - cursor := uint64(7) + cursor := uint64(0) dbCfg := schema.Config{} tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg) tr.Run(cursor) for { - tx := <-tr.SubscribeTx() - fmt.Println(tx.CursorId) + comTx := <-tr.SubscribeTx() + tx := &ts.Tx{} + err := json.Unmarshal(comTx.Data, tx) + if err != nil { + panic(err) + } + fmt.Println(tx) } } diff --git a/example/tracker-s3.go b/example/tracker-s3.go index b0dfa96..7c81f25 100644 --- a/example/tracker-s3.go +++ b/example/tracker-s3.go @@ -17,8 +17,8 @@ func main() { cursor := uint64(40) dbCfg := schema.Config{ UseS3: true, - AccKey: "AKIATZSGGOHI72GMNSO7", - SecretKey: "MOPfueG+mRNHQHoz9GdTq6/CwyybKVsSTZK7XGq/", + AccKey: "", + SecretKey: "MOPfueG+//", BktPrefix: "turing", Region: "ap-northeast-1", } diff --git a/store/kv.go b/store/kv.go index bdccb60..40ff995 100644 --- a/store/kv.go +++ b/store/kv.go @@ -5,10 +5,9 @@ import ( "encoding/json" "errors" "github.com/everFinance/goar/utils" + types "github.com/everFinance/turing/common" "github.com/everFinance/turing/store/rawdb" "github.com/everFinance/turing/store/schema" - - types "github.com/everFinance/turing/common" ) // init log mode @@ -139,7 +138,9 @@ func (kv *Store) PutPoolTokenTxId(txId string) error { func (kv *Store) BatchDelPoolTokenTxId(txs types.Transactions) (err error) { for _, tx := range txs { err = kv.KVDb.Delete(schema.PoolTxIndex, tx.TxId) - return + if err != nil { + return + } } return nil } diff --git a/store/rawdb/database_test.go b/store/rawdb/database_test.go new file mode 100644 index 0000000..8c43845 --- /dev/null +++ b/store/rawdb/database_test.go @@ -0,0 +1,107 @@ +package rawdb + +// func TestBoltDB(t *testing.T) { +// dataPath := "./tmp/seed.db" +// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema +// keyNum := 100 +// // prepare key&val to test +// keys := make([]string, keyNum) +// values := make([][]byte, keyNum) +// for i := 0; i < keyNum; i++ { +// key := fmt.Sprintf("key%d", i) +// keys[i] = key +// val := fmt.Sprintf("v%d", i) +// values[i] = []byte(val) +// } +// assert.Equal(t, keyNum, len(keys)) +// // create a bolt db +// boltDb, err := NewBoltDB() +// assert.NoError(t, err) +// +// // test Put & Get +// for i := 0; i < keyNum; i++ { +// err = boltDb.Put(bktName, keys[i], values[i]) +// assert.NoError(t, err) +// } +// +// for i := 0; i < keyNum; i++ { +// val, err := boltDb.Get(bktName, keys[i]) +// assert.NoError(t, err) +// assert.Equal(t, values[i], val) +// } +// +// // test GetAllKey from a bucket +// allKeys, err := boltDb.GetAllKey(bktName) +// // GetAllKey return order may different from keys +// sort.Strings(allKeys) +// sort.Strings(keys) +// assert.NoError(t, err) +// assert.Equal(t, keys, allKeys) +// +// // test Delete +// for i := 0; i < keyNum; i++ { +// err = boltDb.Delete(bktName, keys[i]) +// assert.NoError(t, err) +// } +// for i := 0; i < keyNum; i++ { +// _, err = boltDb.Get(bktName, keys[i]) +// assert.Equal(t, err, schema.ErrNotExist) +// } +// } + +// func TestS3DB(t *testing.T) { +// +// bktName := schema.ConstantsBucket // cne be replaced by any bucket in schema +// keyNum := 10 +// // prepare key&val to test +// keys := make([]string, keyNum) +// values := make([][]byte, keyNum) +// for i := 0; i < keyNum; i++ { +// key := fmt.Sprintf("key%d", i) +// keys[i] = key +// val := fmt.Sprintf("v%d", i) +// values[i] = []byte(val) +// } +// assert.Equal(t, keyNum, len(keys)) +// // info that s3 needed +// accKey := "AKIATZSGGOHIV4QTYNH5" // your aws IAM access key +// secretKey := "uw3gKyHIZlaBx8vnCA/BSdNdH+Fi2j4ACoPJawOy" // your aws IAM secret key +// prefix := "arseed" // create empty bucket +// Region := "ap-northeast-1" +// // create S3DB +// s, err := NewS3DB(accKey, secretKey, Region, prefix) +// // if the bucket exist try a complex prefix, because the bucket name is unique in a specific aws region +// assert.NoError(t, err) +// +// // test Put & Get +// for i := 0; i < keyNum; i++ { +// err = s.Put(bktName, keys[i], values[i]) +// assert.NoError(t, err) +// } +// +// for i := 0; i < keyNum; i++ { +// val, err := s.Get(bktName, keys[i]) +// assert.NoError(t, err) +// assert.Equal(t, values[i], val) +// } +// +// // test GetAllKey from a bucket +// allKeys, err := s.GetAllKey(bktName) +// assert.NoError(t, err) +// // GetAllKey return order may different from keys +// sort.Strings(allKeys) +// sort.Strings(keys) +// if len(allKeys) == len(keys) { // maybe s3 bucket not empty before test +// assert.Equal(t, keys, allKeys) +// } +// +// // test Delete +// for i := 0; i < keyNum; i++ { +// err = s.Delete(bktName, keys[i]) +// assert.NoError(t, err) +// } +// for i := 0; i < keyNum; i++ { +// _, err = s.Get(bktName, keys[i]) +// assert.Equal(t, err, schema.ErrNotExist) +// } +// } diff --git a/store/rawdb/s3.go b/store/rawdb/s3.go index d7e84f3..08a03f7 100644 --- a/store/rawdb/s3.go +++ b/store/rawdb/s3.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/everFinance/turing/store/schema" "strings" + "time" ) type S3DB struct { @@ -54,7 +55,12 @@ func (s *S3DB) Put(bucket, key string, value []byte) (err error) { Key: aws.String(key), Body: bytes.NewReader(value), } + var retry uint64 _, err = s.uploader.Upload(uploadInfo) + for err != nil && retry < 5 { + time.Sleep(time.Duration(retry) * time.Second) + _, err = s.uploader.Upload(uploadInfo) + } return }