diff --git a/api.go b/api.go index c64eb9c..2b187b7 100644 --- a/api.go +++ b/api.go @@ -25,7 +25,7 @@ func (s *Arseeding) runAPI(port string) { } if !s.NoFee { - r.Use(LimiterMiddleware(30000, "M", s.config.GetIPWhiteList())) + r.Use(LimiterMiddleware(300000, "M", s.config.GetIPWhiteList())) } v1 := r.Group("/") { @@ -96,6 +96,7 @@ func (s *Arseeding) arseedInfo(c *gin.Context) { "Name": "Arseeding", "Version": "v1.0.19", "Documentation": "https://web3infra.dev", + "ConcurrentNum": s.config.Param.ChunkConcurrentNum, }) } diff --git a/arseeding.go b/arseeding.go index 19bf156..0ad6a8e 100644 --- a/arseeding.go +++ b/arseeding.go @@ -8,6 +8,7 @@ import ( "github.com/everFinance/everpay-go/common" paySdk "github.com/everFinance/everpay-go/sdk" "github.com/everFinance/goar" + "github.com/everFinance/goar/types" "github.com/gin-gonic/gin" "github.com/go-co-op/gocron" "sync" @@ -40,13 +41,14 @@ type Arseeding struct { bundlePerFeeMap map[string]schema.Fee // key: tokenSymbol, val: fee per chunk_size(256KB) paymentExpiredRange int64 // default 1 hour expectedRange int64 // default 50 block + customTags []types.Tag } func New( boltDirPath, mySqlDsn string, sqliteDir string, useSqlite bool, arWalletKeyPath string, arNode, payUrl string, noFee bool, enableManifest bool, useS3 bool, s3AccKey, s3SecretKey, s3BucketPrefix, s3Region, s3Endpoint string, - use4EVER bool, port string, + use4EVER bool, port string, customTags []types.Tag, ) *Arseeding { var err error KVDb := &Store{} @@ -109,6 +111,7 @@ func New( bundlePerFeeMap: make(map[string]schema.Fee), paymentExpiredRange: schema.DefaultPaymentExpiredRange, expectedRange: schema.DefaultExpectedRange, + customTags: customTags, } // init cache diff --git a/cmd/main.go b/cmd/main.go index a2ea01c..7148416 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,8 +1,10 @@ package main import ( + "encoding/json" "github.com/everFinance/arseeding" "github.com/everFinance/arseeding/common" + "github.com/everFinance/goar/types" "log" "os" "os/signal" @@ -36,6 +38,7 @@ func main() { &cli.BoolFlag{Name: "use_4ever", Value: false, Usage: "run with 4everland s3 service", EnvVars: []string{"USE_4EVER"}}, &cli.StringFlag{Name: "port", Value: ":8080", EnvVars: []string{"PORT"}}, + &cli.StringFlag{Name: "tags", Value: `{"Community":"PermaDAO","Website":"permadao.com"}`, EnvVars: []string{"TAGS"}}, }, Action: run, } @@ -50,11 +53,24 @@ func run(c *cli.Context) error { signals := make(chan os.Signal, 1) signal.Notify(signals, os.Interrupt, syscall.SIGTERM) + tagJs := c.String("tags") + tagsMap := make(map[string]string) + if err := json.Unmarshal([]byte(tagJs), &tagsMap); err != nil { + panic(err) + } + customTags := make([]types.Tag, 0) + for k, v := range tagsMap { + customTags = append(customTags, types.Tag{ + Name: k, + Value: v, + }) + } + s := arseeding.New( c.String("db_dir"), c.String("mysql"), c.String("sqlite_dir"), c.Bool("use_sqlite"), c.String("key_path"), c.String("ar_node"), c.String("pay"), c.Bool("no_fee"), c.Bool("manifest"), c.Bool("use_s3"), c.String("s3_acc_key"), c.String("s3_secret_key"), c.String("s3_prefix"), c.String("s3_region"), c.String("s3_endpoint"), - c.Bool("use_4ever"), c.String("port")) + c.Bool("use_4ever"), c.String("port"), customTags) s.Run(c.String("port"), c.Int("bundle_interval")) common.NewMetricServer() diff --git a/config/config.go b/config/config.go index 92d716d..12c1988 100644 --- a/config/config.go +++ b/config/config.go @@ -1,6 +1,7 @@ package config import ( + "github.com/everFinance/arseeding/config/schema" "github.com/go-co-op/gocron" "time" ) @@ -12,6 +13,7 @@ type Config struct { ipWhiteList map[string]struct{} apiKeyMap map[string]struct{} scheduler *gocron.Scheduler + Param schema.Param } func New(configDSN, sqliteDir string, useSqlite bool) *Config { @@ -29,11 +31,18 @@ func New(configDSN, sqliteDir string, useSqlite bool) *Config { if err != nil { panic(err) } + param, err := wdb.GetParam() + if err != nil { + panic(err) + } return &Config{ wdb: wdb, speedTxFee: fee.SpeedTxFee, bundleServeFee: fee.BundleServeFee, + ipWhiteList: make(map[string]struct{}), + apiKeyMap: make(map[string]struct{}), scheduler: gocron.NewScheduler(time.UTC), + Param: param, } } diff --git a/config/jobs.go b/config/jobs.go index b8b88a3..afe6e7f 100644 --- a/config/jobs.go +++ b/config/jobs.go @@ -4,6 +4,7 @@ func (c *Config) runJobs() { c.scheduler.Every(1).Minute().SingletonMode().Do(c.updateFee) c.scheduler.Every(1).Minute().SingletonMode().Do(c.updateIPWhiteList) c.scheduler.Every(5).Seconds().SingletonMode().Do(c.updateApiKey) + c.scheduler.Every(10).Seconds().SingletonMode().Do(c.updateParam) c.scheduler.StartAsync() } @@ -42,3 +43,11 @@ func (c *Config) updateApiKey() { } c.apiKeyMap = apiKeyMap } + +func (c *Config) updateParam() { + param, err := c.wdb.GetParam() + if err != nil { + return + } + c.Param = param +} diff --git a/config/schema/db.go b/config/schema/db.go index b88dba3..cd12340 100644 --- a/config/schema/db.go +++ b/config/schema/db.go @@ -15,3 +15,7 @@ type ApiKey struct { Key string Description string } + +type Param struct { + ChunkConcurrentNum int +} diff --git a/config/wdb.go b/config/wdb.go index f40fc0c..9511a5f 100644 --- a/config/wdb.go +++ b/config/wdb.go @@ -52,7 +52,8 @@ func NewSqliteDb(dbDir string) *Wdb { func (w *Wdb) Migrate() error { return w.Db.AutoMigrate(&schema.FeeConfig{}, &schema.IpRateWhitelist{}, - &schema.ApiKey{}) + &schema.ApiKey{}, + &schema.Param{}) } func (w *Wdb) Close() { @@ -85,3 +86,14 @@ func (w *Wdb) GetAllApiKey() ([]schema.ApiKey, error) { err := w.Db.Model(&schema.ApiKey{}).Find(&res).Error return res, err } + +func (w *Wdb) GetParam() (param schema.Param, err error) { + err = w.Db.First(¶m).Error + if err == gorm.ErrRecordNotFound { + param = schema.Param{ + ChunkConcurrentNum: 0, + } + return param, nil + } + return +} diff --git a/go.mod b/go.mod index 3c2c905..06f657f 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,11 @@ go 1.17 require ( github.com/aws/aws-sdk-go v1.27.0 github.com/everFinance/everpay-go v0.0.2 - github.com/everFinance/goar v1.4.6 + github.com/everFinance/goar v1.4.7 github.com/everFinance/goether v1.1.8 github.com/gin-gonic/gin v1.7.7 github.com/go-co-op/gocron v1.11.0 - github.com/panjf2000/ants/v2 v2.5.0 + github.com/panjf2000/ants/v2 v2.6.0 github.com/prometheus/client_golang v1.12.2 github.com/shopspring/decimal v1.2.0 github.com/stretchr/testify v1.8.0 diff --git a/jobs.go b/jobs.go index 4a18d7a..f10ab97 100644 --- a/jobs.go +++ b/jobs.go @@ -1,6 +1,7 @@ package arseeding import ( + "context" "encoding/json" "errors" "fmt" @@ -595,10 +596,15 @@ func (s *Arseeding) onChainBundleTx(itemIds []string) (arTx types.Transaction, o {Name: "Contract", Value: "VFr3Bk-uM-motpNNkkFg4lNW1BMmSfzqsVO551Ho4hA"}, } + if len(s.customTags) > 0 { + arTxtags = append(s.customTags, arTxtags...) + } + // speed arTx Fee price := calculatePrice(s.cache.GetFee(), int64(len(bundle.BundleBinary))) speedFactor := calculateFactor(price, s.config.GetSpeedFee()) - arTx, err = s.bundler.SendBundleTxSpeedUp(bundle.BundleBinary, arTxtags, speedFactor) + concurrentNum := s.config.Param.ChunkConcurrentNum + arTx, err = s.bundler.SendBundleTxSpeedUp(context.TODO(), concurrentNum, bundle.BundleBinary, arTxtags, speedFactor) if err != nil { log.Error("s.bundler.SendBundleTxSpeedUp(bundle.BundleBinary,arTxtags)", "err", err) return @@ -606,8 +612,8 @@ func (s *Arseeding) onChainBundleTx(itemIds []string) (arTx types.Transaction, o log.Info("send bundle arTx", "arTx", arTx.ID) // arseeding broadcast tx data - if err := s.arseedCli.SubmitTx(arTx); err != nil { - log.Error("s.arseedCli.SubmitTx(arTx)", "err", err, "arId", arTx.ID) + if err := s.arseedCli.SubmitTxConcurrent(context.TODO(), concurrentNum, arTx); err != nil { + log.Error("s.arseedCli.SubmitTxConcurrent(arTx)", "err", err, "arId", arTx.ID) } return } @@ -779,6 +785,8 @@ func arTxWatcher(arCli *goar.Client, arTxHash string) bool { if status.NumberOfConfirmations < 3 { log.Debug("arseeding send sequence tx must more than 2 block confirms", "txHash", arTxHash, "currentConfirms", status.NumberOfConfirmations) continue + } else { + return true } } return false diff --git a/sdk/client.go b/sdk/client.go index 8ff14e5..a0c93af 100644 --- a/sdk/client.go +++ b/sdk/client.go @@ -2,6 +2,7 @@ package sdk import ( "bytes" + "context" "errors" "fmt" "github.com/everFinance/arseeding/schema" @@ -31,6 +32,14 @@ func (a *ArSeedCli) SubmitTx(arTx types.Transaction) error { return uploader.Once() } +func (a *ArSeedCli) SubmitTxConcurrent(ctx context.Context, concurrentNum int, arTx types.Transaction) error { + uploader, err := goar.CreateUploader(a.ACli, &arTx, nil) + if err != nil { + return err + } + return uploader.ConcurrentOnce(ctx, concurrentNum) +} + func (a *ArSeedCli) BroadcastTxData(arId string) error { return a.postTask(schema.TaskTypeBroadcast, arId) } diff --git a/submit.go b/submit.go index a4b1241..4e41e60 100644 --- a/submit.go +++ b/submit.go @@ -15,7 +15,7 @@ func (s *Arseeding) SaveSubmitChunk(chunk types.GetChunk) error { // 1. verify chunk err, ok := verifyChunk(chunk) if err != nil || !ok { - log.Error("verifyChunk(chunk) failed", "err", err, "chunk", chunk) + log.Error("verifyChunk(chunk) failed", "err", err, "chunk.DataRoot", chunk.DataRoot) return fmt.Errorf("verifyChunk error:%v", err) } @@ -26,14 +26,14 @@ func (s *Arseeding) SaveSubmitChunk(chunk types.GetChunk) error { if !s.store.IsExistTxDataEndOffset(chunk.DataRoot, chunk.DataSize) { // add TxDataEndOffset if err := s.syncAddTxDataEndOffset(chunk.DataRoot, chunk.DataSize); err != nil { - log.Error("syncAddTxDataEndOffset(s.store,chunk.DataRoot,chunk.DataSize)", "err", err, "chunk", chunk) + log.Error("syncAddTxDataEndOffset(s.store,chunk.DataRoot,chunk.DataSize)", "err", err, "chunk.DataRoot", chunk.DataRoot) return err } } // 3. store chunk if err := storeChunk(chunk, s.store); err != nil { - log.Error("storeChunk(chunk,s.store)", "err", err, "chunk", chunk) + log.Error("storeChunk(chunk,s.store)", "err", err, "chunk.DataRoot", chunk.DataRoot) return err } @@ -76,7 +76,7 @@ func (s *Arseeding) SaveSubmitTx(arTx types.Transaction) error { // set chunks dataBy, err := utils.Base64Decode(arTx.Data) if err != nil { - log.Error("utils.Base64Decode(arTx.Data)", "err", err, "data", arTx.Data) + log.Error("utils.Base64Decode(arTx.Data)", "err", err, "arTx", arTx.ID) return err } if err := setTxDataChunks(arTx, dataBy, s.store); err != nil { @@ -219,7 +219,7 @@ func setTxDataChunks(arTx types.Transaction, txData []byte, db *Store) error { } if err := storeChunk(*chunk, db); err != nil { - log.Error("storeChunk(*chunk,s.store)", "err", err, "chunk", *chunk) + log.Error("storeChunk(*chunk,s.store)", "err", err) return err } } diff --git a/wdb.go b/wdb.go index 3c3b37c..2bc7e28 100644 --- a/wdb.go +++ b/wdb.go @@ -123,13 +123,13 @@ func (w *Wdb) UpdateOrderPay(id uint, everHash string, paymentStatus string, tx func (w *Wdb) GetNeedOnChainOrders() ([]schema.Order, error) { res := make([]schema.Order, 0) - err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, false).Limit(500).Find(&res).Error + err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, false).Limit(2000).Find(&res).Error return res, err } func (w *Wdb) GetNeedOnChainOrdersSorted() ([]schema.Order, error) { res := make([]schema.Order, 0) - err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, true).Limit(500).Find(&res).Error + err := w.Db.Model(&schema.Order{}).Where("payment_status = ? and on_chain_status = ? and sort = ?", schema.SuccPayment, schema.WaitOnChain, true).Limit(2000).Find(&res).Error return res, err }