Skip to content

Commit

Permalink
Merge pull request #1 from everFinance/dev/s3-store
Browse files Browse the repository at this point in the history
Dev/s3 store
  • Loading branch information
kevin-zhangzh authored Sep 27, 2022
2 parents bb05d7b + 95adde4 commit ac8c186
Show file tree
Hide file tree
Showing 20 changed files with 900 additions and 297 deletions.
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
## Turing
Upload the data to the arweave network in the specified order and download it from the arweave network in the order of upload.

### Installation
```bash
go get github.com/everFinance/turing
```
### Implements
turing implements the above functionality through the rollup & tracker service

Expand Down
3 changes: 2 additions & 1 deletion common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (txs Transactions) MarshalFromOnChain() ([]byte, error) {
lightTxs := make(Transactions, 0, len(txs))
for _, tx := range txs {
lightTxs = append(lightTxs, &Transaction{
TxData: tx.TxData,
TxData: tx.TxData,
ParentId: tx.ParentId,
})
}
return lightTxs.Marshal()
Expand Down
45 changes: 45 additions & 0 deletions example/rollup-bolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/rollup"
"github.com/everFinance/turing/store/schema"
"time"
)

func main() {
tags := []types.Tag{
{Name: "App", Value: "turing-test"},
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
suggestLastArTxId := ""
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arWalletKeyPath := "./k9s.json"
rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, schema.Config{})
rol.Run(2*time.Minute, 999)
feedData(rol.AddTx())
}

func feedData(ch chan<- []byte) {
ticker := time.NewTicker(30 * time.Second)
var cnt int64
for {
select {
case <-ticker.C:
tx := &ts.Tx{
Name: fmt.Sprintf("test-%v", cnt),
Timestamp: time.Now().UnixMilli(),
}
data, err := json.Marshal(tx)
if err != nil {
panic(err)
}
cnt += 1
ch <- data
}
}
}
52 changes: 52 additions & 0 deletions example/rollup-s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/rollup"
"github.com/everFinance/turing/store/schema"
"time"
)

func main() {
tags := []types.Tag{
{Name: "App", Value: "turing-s3-test"},
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
suggestLastArTxId := ""
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arWalletKeyPath := "./k9s.json"
cfg := schema.Config{
UseS3: true,
AccKey: "",
SecretKey: "MOPfuebKVsSTZK7XGq/",
BktPrefix: "turing",
Region: "ap-northeast-1",
}
rol := rollup.New(suggestLastArTxId, arNode, "", arWalletKeyPath, arOwner, tags, cfg)
rol.Run(2*time.Minute, 999)
feedDataS3(rol.AddTx())
}

func feedDataS3(ch chan<- []byte) {
ticker := time.NewTicker(30 * time.Second)
var cnt int64
for {
select {
case <-ticker.C:
tx := &ts.Tx{
Name: fmt.Sprintf("test-S3-%v", cnt),
Timestamp: time.Now().UnixMilli(),
}
data, err := json.Marshal(tx)
if err != nil {
panic(err)
}
cnt += 1
ch <- data
}
}
}
6 changes: 6 additions & 0 deletions example/schema/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package schema

type Tx struct {
Name string `json:"name"`
Timestamp int64 `json:"timestamp"`
}
33 changes: 33 additions & 0 deletions example/tracker-bolt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"encoding/json"
"fmt"
"github.com/everFinance/goar/types"
ts "github.com/everFinance/turing/example/schema"
"github.com/everFinance/turing/store/schema"

"github.com/everFinance/turing/tracker"
)

func main() {
tags := []types.Tag{
{Name: "Owner", Value: "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"},
}
arOwner := "k9sXK8x5lMxxM-PbDZ13tCeZi6rOtlll5a6_rrc2oGM"
arNode := "https://arweave.net"
arseed := ""
cursor := uint64(0)
dbCfg := schema.Config{}
tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg)
tr.Run(cursor)
for {
comTx := <-tr.SubscribeTx()
tx := &ts.Tx{}
err := json.Unmarshal(comTx.Data, tx)
if err != nil {
panic(err)
}
fmt.Println(tx)
}
}
31 changes: 31 additions & 0 deletions example/tracker-s3.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package main

import (
"fmt"
"github.com/everFinance/goar/types"
"github.com/everFinance/turing/store/schema"
"github.com/everFinance/turing/tracker"
)

func main() {
tags := []types.Tag{
{Name: "Owner", Value: "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"},
}
arOwner := "uGx-QfBXSwABKxjha-00dI7vvfyqIYblY6Z5L6cyTFM"
arNode := "https://arweave.net"
arseed := ""
cursor := uint64(40)
dbCfg := schema.Config{
UseS3: true,
AccKey: "",
SecretKey: "MOPfueG+//",
BktPrefix: "turing",
Region: "ap-northeast-1",
}
tr := tracker.New(tags, arNode, arseed, arOwner, dbCfg)
tr.Run(cursor)
for {
tx := <-tr.SubscribeTx()
fmt.Println(tx.CursorId)
}
}
25 changes: 20 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,18 @@ module github.com/everFinance/turing

go 1.18

require (
github.com/aws/aws-sdk-go v1.27.0
github.com/everFinance/arseeding v1.0.3
github.com/everFinance/everpay-go v0.0.1
github.com/everFinance/goar v1.4.4
github.com/getsentry/sentry-go v0.11.0
github.com/go-co-op/gocron v1.11.0
github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac
github.com/stretchr/testify v1.8.0
go.etcd.io/bbolt v1.3.6
)

require (
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/VictoriaMetrics/fastcache v1.6.0 // indirect
Expand All @@ -12,9 +24,7 @@ require (
github.com/deckarep/golang-set v0.0.0-20180603214616-504e848d77ea // indirect
github.com/edsrzf/mmap-go v1.0.0 // indirect
github.com/ethereum/go-ethereum v1.10.12 // indirect
github.com/everFinance/arseeding v1.0.3 // indirect
github.com/everFinance/ethrpc v1.0.4 // indirect
github.com/everFinance/goar v1.4.4 // indirect
github.com/everFinance/goether v1.1.7 // indirect
github.com/everFinance/gojwk v1.0.0 // indirect
github.com/everFinance/ttcrsa v1.1.3 // indirect
Expand All @@ -30,10 +40,10 @@ require (
github.com/holiman/bloomfilter/v2 v2.0.3 // indirect
github.com/holiman/uint256 v1.2.0 // indirect
github.com/huin/goupnp v1.0.2 // indirect
github.com/inconshreveable/log15 v0.0.0-20201112154412-8562bdadbbac // indirect
github.com/jackpal/go-nat-pmp v1.0.2-0.20160603034137-1fa385a6f458 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/karalabe/usb v0.0.0-20211005121534-4c5740d64559 // indirect
github.com/mattn/go-colorable v0.1.11 // indirect
Expand All @@ -47,20 +57,25 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/tsdb v0.7.1 // indirect
github.com/rjeczalik/notify v0.9.1 // indirect
github.com/robfig/cron/v3 v3.0.1 // indirect
github.com/shirou/gopsutil v3.21.4-0.20210419000835-c7a38de76ee5+incompatible // indirect
github.com/shopspring/decimal v1.2.0 // indirect
github.com/status-im/keycard-go v0.0.0-20190316090335-8537d3370df4 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
github.com/tidwall/gjson v1.14.1 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.0 // indirect
github.com/tidwall/sjson v1.2.4 // indirect
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
github.com/tyler-smith/go-bip39 v1.0.1-0.20181017060643-dbb3b84ba2ef // indirect
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect
golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.26.0 // indirect
gopkg.in/h2non/gentleman.v2 v2.0.5 // indirect
gopkg.in/natefinch/npipe.v2 v2.0.0-20160621034901-c1b8fa8bdcce // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gorm.io/datatypes v1.0.1 // indirect
Expand Down
43 changes: 30 additions & 13 deletions rollup/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
arseeding "github.com/everFinance/arseeding/sdk"
"github.com/everFinance/turing/rollup/txpool"
"github.com/everFinance/turing/store"
"github.com/everFinance/turing/store/schema"
)

var log = types.NewLog(types.RollupLogMode)
Expand Down Expand Up @@ -63,21 +64,36 @@ func initTxPool(kv *store.Store, lastPendTokenTxHash string) (*txpool.TxPool, er
return pool, nil
}

func initDbConfig(cfg *schema.Config) {
if len(cfg.DbPath) == 0 {
cfg.DbPath = schema.BoltDirPath
}
if len(cfg.DbFileName) == 0 {
cfg.DbFileName = schema.RollupDBFileName
}
cfg.Bkt = []string{
schema.AllTokenTxBucket,
schema.PoolTxIndex,
schema.ConstantBucket,
}
}

// New suggestLastArTxId use rollup last on chain tx id, if server bolt.db is clear, restart server need config this args
func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath string, owner string, tags []arTypes.Tag, dbDirPath string) *Rollup {
if len(dbDirPath) == 0 {
dbDirPath = store.StoreDirPath
func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath string, owner string, tags []arTypes.Tag, dbConfig schema.Config) *Rollup {
initDbConfig(&dbConfig)
var err error
kv := &store.Store{}
if dbConfig.UseS3 {
kv, err = store.NewS3Store(dbConfig)
} else {
kv, err = store.NewBoltStore(dbConfig)
}
kv, err := store.NewKvStore(
dbDirPath, store.RollupDBFileName,
store.AllTokenTxBucket, store.PoolTxIndex, store.ConstantBucket,
)
if err != nil {
panic(err)
}

// get constant from kv
lastArTxId, err := kv.GetConstant(store.LastArTxIdKey)
lastArTxId, err := kv.GetConstant(schema.LastArTxIdKey)
if err != nil {
panic(err)
}
Expand All @@ -90,11 +106,11 @@ func New(suggestLastArTxId string, arNode, arSeedingUrl string, arWalletKeyPath
}
}

lastOnChainTokenTxHash, err := kv.GetConstant(store.LastOnChainTokenTxHashKey)
lastOnChainTokenTxHash, err := kv.GetConstant(schema.LastOnChainTokenTxHashKey)
if err != nil {
panic(err)
}
lastAddPoolTokenTxHash, err := kv.GetConstant(store.LastAddPoolTokenTxIdKey)
lastAddPoolTokenTxHash, err := kv.GetConstant(schema.LastAddPoolTokenTxIdKey)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -216,7 +232,7 @@ func (rol *Rollup) listenTokenTxToPool() {
panic(err)
} else {
// update LastAddPoolTokenTxIdKey
if err := rol.store.UpdateConstant(store.LastAddPoolTokenTxIdKey, []byte(txHash)); err != nil {
if err := rol.store.UpdateConstant(schema.LastAddPoolTokenTxIdKey, []byte(txHash)); err != nil {
panic(err)
}
rol.lastAddPoolTokenTxHash = txHash
Expand Down Expand Up @@ -270,17 +286,18 @@ func (rol *Rollup) sealTxOnChain(timeInterval time.Duration, maxOfRollup int) {

// on chain success and modify some status
// 1. modify db constants 'lastArTxId'
if err := rol.store.UpdateConstant(store.LastArTxIdKey, []byte(arId)); err != nil {
if err := rol.store.UpdateConstant(schema.LastArTxIdKey, []byte(arId)); err != nil {
log.Error("modify lastArTxIdKey error", "error", err, "newValue", arId)
panic(err)
}
// 2. modify db constants 'lastOnChainTokenTxHash'
if err := rol.store.UpdateConstant(store.LastOnChainTokenTxHashKey, []byte(txs[len(txs)-1].TxId)); err != nil {
if err := rol.store.UpdateConstant(schema.LastOnChainTokenTxHashKey, []byte(txs[len(txs)-1].TxId)); err != nil {
log.Error("modify lastOnChainTokenTxHashKey error", "error", err, "newValue", txs[len(txs)-1].TxId)
panic(err)
}
// 3. delete poolTxIndex bucket txs index
if err := rol.store.BatchDelPoolTokenTxId(txs); err != nil {
// maybe not need panic
panic(err)
}
// 4. change lastTxHash
Expand Down
Loading

0 comments on commit ac8c186

Please sign in to comment.