Skip to content

Commit

Permalink
Merge pull request #58 from everFinance/feature/submit-chunk-concurrent
Browse files Browse the repository at this point in the history
Feature/submit chunk concurrent
  • Loading branch information
zyjblockchain authored Dec 7, 2022
2 parents 7299974 + 2427466 commit 2b7fc9b
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 16 deletions.
3 changes: 2 additions & 1 deletion api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (s *Arseeding) runAPI(port string) {
}

if !s.NoFee {
r.Use(LimiterMiddleware(30000, "M", s.config.GetIPWhiteList()))
r.Use(LimiterMiddleware(300000, "M", s.config.GetIPWhiteList()))
}
v1 := r.Group("/")
{
Expand Down Expand Up @@ -96,6 +96,7 @@ func (s *Arseeding) arseedInfo(c *gin.Context) {
"Name": "Arseeding",
"Version": "v1.0.19",
"Documentation": "https://web3infra.dev",
"ConcurrentNum": s.config.Param.ChunkConcurrentNum,
})
}

Expand Down
5 changes: 4 additions & 1 deletion arseeding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/everFinance/everpay-go/common"
paySdk "github.com/everFinance/everpay-go/sdk"
"github.com/everFinance/goar"
"github.com/everFinance/goar/types"
"github.com/gin-gonic/gin"
"github.com/go-co-op/gocron"
"sync"
Expand Down Expand Up @@ -40,13 +41,14 @@ type Arseeding struct {
bundlePerFeeMap map[string]schema.Fee // key: tokenSymbol, val: fee per chunk_size(256KB)
paymentExpiredRange int64 // default 1 hour
expectedRange int64 // default 50 block
customTags []types.Tag
}

func New(
boltDirPath, mySqlDsn string, sqliteDir string, useSqlite bool,
arWalletKeyPath string, arNode, payUrl string, noFee bool, enableManifest bool,
useS3 bool, s3AccKey, s3SecretKey, s3BucketPrefix, s3Region, s3Endpoint string,
use4EVER bool, port string,
use4EVER bool, port string, customTags []types.Tag,
) *Arseeding {
var err error
KVDb := &Store{}
Expand Down Expand Up @@ -109,6 +111,7 @@ func New(
bundlePerFeeMap: make(map[string]schema.Fee),
paymentExpiredRange: schema.DefaultPaymentExpiredRange,
expectedRange: schema.DefaultExpectedRange,
customTags: customTags,
}

// init cache
Expand Down
18 changes: 17 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"encoding/json"
"github.com/everFinance/arseeding"
"github.com/everFinance/arseeding/common"
"github.com/everFinance/goar/types"
"log"
"os"
"os/signal"
Expand Down Expand Up @@ -36,6 +38,7 @@ func main() {
&cli.BoolFlag{Name: "use_4ever", Value: false, Usage: "run with 4everland s3 service", EnvVars: []string{"USE_4EVER"}},

&cli.StringFlag{Name: "port", Value: ":8080", EnvVars: []string{"PORT"}},
&cli.StringFlag{Name: "tags", Value: `{"Community":"PermaDAO","Website":"permadao.com"}`, EnvVars: []string{"TAGS"}},
},
Action: run,
}
Expand All @@ -50,11 +53,24 @@ func run(c *cli.Context) error {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

tagJs := c.String("tags")
tagsMap := make(map[string]string)
if err := json.Unmarshal([]byte(tagJs), &tagsMap); err != nil {
panic(err)
}
customTags := make([]types.Tag, 0)
for k, v := range tagsMap {
customTags = append(customTags, types.Tag{
Name: k,
Value: v,
})
}

s := arseeding.New(
c.String("db_dir"), c.String("mysql"), c.String("sqlite_dir"), c.Bool("use_sqlite"),
c.String("key_path"), c.String("ar_node"), c.String("pay"), c.Bool("no_fee"), c.Bool("manifest"),
c.Bool("use_s3"), c.String("s3_acc_key"), c.String("s3_secret_key"), c.String("s3_prefix"), c.String("s3_region"), c.String("s3_endpoint"),
c.Bool("use_4ever"), c.String("port"))
c.Bool("use_4ever"), c.String("port"), customTags)
s.Run(c.String("port"), c.Int("bundle_interval"))

common.NewMetricServer()
Expand Down
9 changes: 9 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"github.com/everFinance/arseeding/config/schema"
"github.com/go-co-op/gocron"
"time"
)
Expand All @@ -12,6 +13,7 @@ type Config struct {
ipWhiteList map[string]struct{}
apiKeyMap map[string]struct{}
scheduler *gocron.Scheduler
Param schema.Param
}

func New(configDSN, sqliteDir string, useSqlite bool) *Config {
Expand All @@ -29,11 +31,18 @@ func New(configDSN, sqliteDir string, useSqlite bool) *Config {
if err != nil {
panic(err)
}
param, err := wdb.GetParam()
if err != nil {
panic(err)
}
return &Config{
wdb: wdb,
speedTxFee: fee.SpeedTxFee,
bundleServeFee: fee.BundleServeFee,
ipWhiteList: make(map[string]struct{}),
apiKeyMap: make(map[string]struct{}),
scheduler: gocron.NewScheduler(time.UTC),
Param: param,
}
}

Expand Down
9 changes: 9 additions & 0 deletions config/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ func (c *Config) runJobs() {
c.scheduler.Every(1).Minute().SingletonMode().Do(c.updateFee)
c.scheduler.Every(1).Minute().SingletonMode().Do(c.updateIPWhiteList)
c.scheduler.Every(5).Seconds().SingletonMode().Do(c.updateApiKey)
c.scheduler.Every(10).Seconds().SingletonMode().Do(c.updateParam)

c.scheduler.StartAsync()
}
Expand Down Expand Up @@ -42,3 +43,11 @@ func (c *Config) updateApiKey() {
}
c.apiKeyMap = apiKeyMap
}

func (c *Config) updateParam() {
param, err := c.wdb.GetParam()
if err != nil {
return
}
c.Param = param
}
4 changes: 4 additions & 0 deletions config/schema/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,7 @@ type ApiKey struct {
Key string
Description string
}

type Param struct {
ChunkConcurrentNum int
}
14 changes: 13 additions & 1 deletion config/wdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ func NewSqliteDb(dbDir string) *Wdb {
func (w *Wdb) Migrate() error {
return w.Db.AutoMigrate(&schema.FeeConfig{},
&schema.IpRateWhitelist{},
&schema.ApiKey{})
&schema.ApiKey{},
&schema.Param{})
}

func (w *Wdb) Close() {
Expand Down Expand Up @@ -85,3 +86,14 @@ func (w *Wdb) GetAllApiKey() ([]schema.ApiKey, error) {
err := w.Db.Model(&schema.ApiKey{}).Find(&res).Error
return res, err
}

func (w *Wdb) GetParam() (param schema.Param, err error) {
err = w.Db.First(&param).Error
if err == gorm.ErrRecordNotFound {
param = schema.Param{
ChunkConcurrentNum: 0,
}
return param, nil
}
return
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ go 1.17
require (
github.com/aws/aws-sdk-go v1.27.0
github.com/everFinance/everpay-go v0.0.2
github.com/everFinance/goar v1.4.6
github.com/everFinance/goar v1.4.7
github.com/everFinance/goether v1.1.8
github.com/gin-gonic/gin v1.7.7
github.com/go-co-op/gocron v1.11.0
github.com/panjf2000/ants/v2 v2.5.0
github.com/panjf2000/ants/v2 v2.6.0
github.com/prometheus/client_golang v1.12.2
github.com/shopspring/decimal v1.2.0
github.com/stretchr/testify v1.8.0
Expand Down
14 changes: 11 additions & 3 deletions jobs.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package arseeding

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -595,19 +596,24 @@ func (s *Arseeding) onChainBundleTx(itemIds []string) (arTx types.Transaction, o
{Name: "Contract", Value: "VFr3Bk-uM-motpNNkkFg4lNW1BMmSfzqsVO551Ho4hA"},
}

if len(s.customTags) > 0 {
arTxtags = append(s.customTags, arTxtags...)
}

// speed arTx Fee
price := calculatePrice(s.cache.GetFee(), int64(len(bundle.BundleBinary)))
speedFactor := calculateFactor(price, s.config.GetSpeedFee())
arTx, err = s.bundler.SendBundleTxSpeedUp(bundle.BundleBinary, arTxtags, speedFactor)
concurrentNum := s.config.Param.ChunkConcurrentNum
arTx, err = s.bundler.SendBundleTxSpeedUp(context.TODO(), concurrentNum, bundle.BundleBinary, arTxtags, speedFactor)
if err != nil {
log.Error("s.bundler.SendBundleTxSpeedUp(bundle.BundleBinary,arTxtags)", "err", err)
return
}
log.Info("send bundle arTx", "arTx", arTx.ID)

// arseeding broadcast tx data
if err := s.arseedCli.SubmitTx(arTx); err != nil {
log.Error("s.arseedCli.SubmitTx(arTx)", "err", err, "arId", arTx.ID)
if err := s.arseedCli.SubmitTxConcurrent(context.TODO(), concurrentNum, arTx); err != nil {
log.Error("s.arseedCli.SubmitTxConcurrent(arTx)", "err", err, "arId", arTx.ID)
}
return
}
Expand Down Expand Up @@ -779,6 +785,8 @@ func arTxWatcher(arCli *goar.Client, arTxHash string) bool {
if status.NumberOfConfirmations < 3 {
log.Debug("arseeding send sequence tx must more than 2 block confirms", "txHash", arTxHash, "currentConfirms", status.NumberOfConfirmations)
continue
} else {
return true
}
}
return false
Expand Down
9 changes: 9 additions & 0 deletions sdk/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sdk

import (
"bytes"
"context"
"errors"
"fmt"
"github.com/everFinance/arseeding/schema"
Expand Down Expand Up @@ -31,6 +32,14 @@ func (a *ArSeedCli) SubmitTx(arTx types.Transaction) error {
return uploader.Once()
}

func (a *ArSeedCli) SubmitTxConcurrent(ctx context.Context, concurrentNum int, arTx types.Transaction) error {
uploader, err := goar.CreateUploader(a.ACli, &arTx, nil)
if err != nil {
return err
}
return uploader.ConcurrentOnce(ctx, concurrentNum)
}

func (a *ArSeedCli) BroadcastTxData(arId string) error {
return a.postTask(schema.TaskTypeBroadcast, arId)
}
Expand Down
10 changes: 5 additions & 5 deletions submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func (s *Arseeding) SaveSubmitChunk(chunk types.GetChunk) error {
// 1. verify chunk
err, ok := verifyChunk(chunk)
if err != nil || !ok {
log.Error("verifyChunk(chunk) failed", "err", err, "chunk", chunk)
log.Error("verifyChunk(chunk) failed", "err", err, "chunk.DataRoot", chunk.DataRoot)
return fmt.Errorf("verifyChunk error:%v", err)
}

Expand All @@ -26,14 +26,14 @@ func (s *Arseeding) SaveSubmitChunk(chunk types.GetChunk) error {
if !s.store.IsExistTxDataEndOffset(chunk.DataRoot, chunk.DataSize) {
// add TxDataEndOffset
if err := s.syncAddTxDataEndOffset(chunk.DataRoot, chunk.DataSize); err != nil {
log.Error("syncAddTxDataEndOffset(s.store,chunk.DataRoot,chunk.DataSize)", "err", err, "chunk", chunk)
log.Error("syncAddTxDataEndOffset(s.store,chunk.DataRoot,chunk.DataSize)", "err", err, "chunk.DataRoot", chunk.DataRoot)
return err
}
}

// 3. store chunk
if err := storeChunk(chunk, s.store); err != nil {
log.Error("storeChunk(chunk,s.store)", "err", err, "chunk", chunk)
log.Error("storeChunk(chunk,s.store)", "err", err, "chunk.DataRoot", chunk.DataRoot)
return err
}

Expand Down Expand Up @@ -76,7 +76,7 @@ func (s *Arseeding) SaveSubmitTx(arTx types.Transaction) error {
// set chunks
dataBy, err := utils.Base64Decode(arTx.Data)
if err != nil {
log.Error("utils.Base64Decode(arTx.Data)", "err", err, "data", arTx.Data)
log.Error("utils.Base64Decode(arTx.Data)", "err", err, "arTx", arTx.ID)
return err
}
if err := setTxDataChunks(arTx, dataBy, s.store); err != nil {
Expand Down Expand Up @@ -219,7 +219,7 @@ func setTxDataChunks(arTx types.Transaction, txData []byte, db *Store) error {
}

if err := storeChunk(*chunk, db); err != nil {
log.Error("storeChunk(*chunk,s.store)", "err", err, "chunk", *chunk)
log.Error("storeChunk(*chunk,s.store)", "err", err)
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions wdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,13 @@ func (w *Wdb) UpdateOrderPay(id uint, everHash string, paymentStatus string, tx

func (w *Wdb) GetNeedOnChainOrders() ([]schema.Order, error) {
res := make([]schema.Order, 0)
err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, false).Limit(500).Find(&res).Error
err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, false).Limit(2000).Find(&res).Error
return res, err
}

func (w *Wdb) GetNeedOnChainOrdersSorted() ([]schema.Order, error) {
res := make([]schema.Order, 0)
err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, true).Limit(500).Find(&res).Error
err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, true).Limit(2000).Find(&res).Error
return res, err
}

Expand Down

0 comments on commit 2b7fc9b

Please sign in to comment.