From 70d9cf589f46afae545b2e45f9c04fb9a366c5dc Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Fri, 9 Aug 2024 14:32:13 +0545 Subject: [PATCH 1/6] feat: made blocks configuratble and use the block-rpc for most cases decreasing the api usage --- relayer/chains/wasm/provider.go | 2 ++ relayer/chains/wasm/wasm_chain_processor.go | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index fb600ed9f..0b1e6b2a8 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -54,6 +54,8 @@ type WasmProviderConfig struct { ChainID string `json:"chain-id" yaml:"chain-id"` RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` BlockRPCAddr string `json:"block-rpc-addr" yaml:"block-rpc-addr"` + BlockRPCMinDelta int `json:"block-rpc-delta" yaml:"block-rpc-delta"` + BlockRPCRefreshTime int `json:"block-rpc-refresh-time" yaml:"block-rpc-refresh-time"` AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 805142aed..c30e8bdfc 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -286,7 +286,12 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint ccp.log.Debug("Entering Wasm main query loop") if ccp.chainProvider.rangeSupport { - inSyncNumBlocksThreshold = 10 + inSyncNumBlocksThreshold = 15 + defaultQueryLoopTime := 7 + if ccp.chainProvider.PCfg.BlockRPCRefreshTime > 0 { + defaultQueryLoopTime = ccp.chainProvider.PCfg.BlockRPCRefreshTime + } + persistence.minQueryLoopDuration = time.Duration(defaultQueryLoopTime) * time.Second } ticker := time.NewTicker(persistence.minQueryLoopDuration) defer ticker.Stop() @@ -461,7 +466,14 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer var blocks []int64 heighttoSync := syncUpHeight() delta := persistence.latestHeight - persistence.latestQueriedBlock - if ccp.chainProvider.rangeSupport && delta > 20 { + minDelta := 10 + if ccp.chainProvider.PCfg.BlockRPCMinDelta > 0 { + minDelta = ccp.chainProvider.PCfg.BlockRPCMinDelta + } + if ccp.chainProvider.rangeSupport && delta > int64(minDelta) { + ccp.log.Debug("Fetching range block", + zap.Any("last_height", persistence.latestQueriedBlock), + zap.Any("delta", delta)) status, err := ccp.chainProvider.BlockRPCClient.Status(ctx) if err != nil { return nil From 7ce2a03f3667cddea37887a83b25ee459c1f52e1 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Fri, 9 Aug 2024 16:03:27 +0545 Subject: [PATCH 2/6] feat: added error logs --- relayer/chains/wasm/wasm_chain_processor.go | 1 + 1 file changed, 1 insertion(+) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index c30e8bdfc..c5573c377 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -476,6 +476,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer zap.Any("delta", delta)) status, err := ccp.chainProvider.BlockRPCClient.Status(ctx) if err != nil { + ccp.log.Warn("Error occurred fetching block status") return nil } if persistence.latestQueriedBlock > status.SyncInfo.LatestBlockHeight && From 1b952ce4ce3ce1f4a8d57b4b7788bc993beb1aa4 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Fri, 9 Aug 2024 18:28:12 +0545 Subject: [PATCH 3/6] feat: fixed logs --- relayer/chains/wasm/wasm_chain_processor.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index c5573c377..e99360a84 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -476,7 +476,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer zap.Any("delta", delta)) status, err := ccp.chainProvider.BlockRPCClient.Status(ctx) if err != nil { - ccp.log.Warn("Error occurred fetching block status") + ccp.log.Warn("Error occurred fetching block status", zap.Error(err)) return nil } if persistence.latestQueriedBlock > status.SyncInfo.LatestBlockHeight && @@ -491,7 +491,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer } blocks, err = ccp.getBlocksToProcess(ctx, persistence.latestQueriedBlock+1) if err != nil { - ccp.log.Info("error occurred getting blocks") + ccp.log.Warn("error occurred getting blocks", zap.Error(err)) return nil } maxBlock := findMaxBlock(blocks) From 17a41efebf61656ce83fdd2b21f1bfcfabba44f4 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Sun, 11 Aug 2024 22:58:01 +0545 Subject: [PATCH 4/6] fix: fixed chain rpc lagging issue --- relayer/chains/wasm/wasm_chain_processor.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index e99360a84..7ec5caa5d 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -466,23 +466,22 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer var blocks []int64 heighttoSync := syncUpHeight() delta := persistence.latestHeight - persistence.latestQueriedBlock - minDelta := 10 + minDelta := 7 if ccp.chainProvider.PCfg.BlockRPCMinDelta > 0 { minDelta = ccp.chainProvider.PCfg.BlockRPCMinDelta } if ccp.chainProvider.rangeSupport && delta > int64(minDelta) { - ccp.log.Debug("Fetching range block", - zap.Any("last_height", persistence.latestQueriedBlock), - zap.Any("delta", delta)) status, err := ccp.chainProvider.BlockRPCClient.Status(ctx) if err != nil { ccp.log.Warn("Error occurred fetching block status", zap.Error(err)) return nil } - if persistence.latestQueriedBlock > status.SyncInfo.LatestBlockHeight && - persistence.latestHeight > status.SyncInfo.LatestBlockHeight { - persistence.latestHeight = status.SyncInfo.LatestBlockHeight - } + ccp.log.Debug("Fetching range block", + zap.Int64("last_height", persistence.latestQueriedBlock), + zap.Int64("latest_height", status.SyncInfo.LatestBlockHeight), + zap.Int64("delta", delta)) + persistence.latestHeight = status.SyncInfo.LatestBlockHeight + heighttoSync = syncUpHeight() if (persistence.latestQueriedBlock + 1) >= persistence.latestHeight { return nil } From d180adea0ab6f84660174333fe70b772a67571b8 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Sun, 11 Aug 2024 23:26:31 +0545 Subject: [PATCH 5/6] fix: fixed retry issue in case of mempool full --- relayer/chains/wasm/tx.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index c47d4772c..c0cad0080 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -52,7 +52,7 @@ var ( rtyAttNum = uint(5) rtyAtt = retry.Attempts(rtyAttNum) rtyDel = retry.Delay(time.Millisecond * 400) - specialDel = retry.Delay(time.Second * 30) + specialDel = retry.Delay(time.Second * 10) rtyErr = retry.LastErrorOnly(true) numRegex = regexp.MustCompile("[0-9]+") defaultBroadcastWaitTimeout = 10 * time.Minute @@ -841,8 +841,9 @@ func (ap *WasmProvider) SendMessagesToMempool( if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { ap.handleAccountSequenceMismatchError(err) } + return err } - return err + return nil }, retry.Context(ctx), retry.Attempts(0), specialDel, rtyErr); err != nil { ap.log.Error("Failed to update client", zap.Any("Message", msg)) return err From 24949d464c8b735c726bc7bd4a8d941ed0e3c608 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Mon, 12 Aug 2024 08:49:38 +0545 Subject: [PATCH 6/6] fix: reset lastqueried block if provider returns smaller value --- relayer/chains/wasm/wasm_chain_processor.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 7ec5caa5d..fc2fb19ee 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -482,6 +482,13 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer zap.Int64("delta", delta)) persistence.latestHeight = status.SyncInfo.LatestBlockHeight heighttoSync = syncUpHeight() + if persistence.latestQueriedBlock > status.SyncInfo.LatestBlockHeight { + ccp.log.Debug("resetting range block", + zap.Int64("last_height", persistence.latestQueriedBlock), + zap.Int64("latest_height", status.SyncInfo.LatestBlockHeight)) + persistence.latestQueriedBlock = status.SyncInfo.LatestBlockHeight + return nil + } if (persistence.latestQueriedBlock + 1) >= persistence.latestHeight { return nil }