From 90c2b8174e4ac91d025bdff496dcb89eca382dd0 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Fri, 19 Jul 2024 11:30:31 +0545 Subject: [PATCH 01/18] rf: robust retries to counter rpc failures --- relayer/chains/wasm/provider.go | 115 ++++++++++++++++++++++++-------- 1 file changed, 86 insertions(+), 29 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 32af25e1..ec23234c 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -3,6 +3,7 @@ package wasm import ( "context" "fmt" + "math" "math/big" "runtime" "strings" @@ -567,41 +568,85 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei wg sync.WaitGroup messages coreTypes.ResultTxSearch messagesChan = make(chan *coreTypes.ResultTxSearch) - errorChan = make(chan error) + errorChan = make(chan error, len(p.eventList)) + sema = make(chan struct{}, len(p.eventList)) ) for _, event := range p.eventList { wg.Add(1) - go func(wg *sync.WaitGroup, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { + go func(event sdkTypes.Event) { defer wg.Done() - searchParam.Events = append(searchParam.Events, event) - res, err := p.client.TxSearch(ctx, searchParam) + sema <- struct{}{} + defer func() { <-sema }() + + localSearchParam := searchParam + localSearchParam.Events = append(localSearchParam.Events, event) + + localMessages := new(coreTypes.ResultTxSearch) + var err error + retryCount := 0 + for retryCount < 3 { + localMessages, err = p.client.TxSearch(ctx, localSearchParam) + if err == nil { + break + } + retryCount++ + time.Sleep((time.Second * 3) * time.Duration(retryCount*retryCount)) + } + if err != nil { errorChan <- err return } - if res.TotalCount > perPage { - for i := 2; i <= int(res.TotalCount/perPage)+1; i++ { - searchParam.Page = &i - resNext, err := p.client.TxSearch(ctx, searchParam) + + if localMessages.TotalCount > perPage { + for i := 2; i <= int(localMessages.TotalCount/perPage)+1; i++ { + localSearchParam.Page = &i + resNext, err := p.client.TxSearch(ctx, localSearchParam) if err != nil { errorChan <- err return } - res.Txs = append(res.Txs, resNext.Txs...) + localMessages.Txs = append(localMessages.Txs, resNext.Txs...) } } - messagesChan <- res - }(&wg, searchParam, messagesChan, errorChan) + messagesChan <- localMessages + }(event) + } + + go func() { + wg.Wait() + close(messagesChan) + close(errorChan) + }() + + var errors []error + for { select { - case msgs := <-messagesChan: - messages.Txs = append(messages.Txs, msgs.Txs...) - messages.TotalCount += msgs.TotalCount - case err := <-errorChan: - p.logger.Error("failed to fetch block messages", zap.Error(err)) + case msgs, ok := <-messagesChan: + if !ok { + messagesChan = nil + } else { + messages.Txs = append(messages.Txs, msgs.Txs...) + messages.TotalCount += msgs.TotalCount + } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + } else { + errors = append(errors, err) + } + } + if messagesChan == nil && errorChan == nil { + break } } - wg.Wait() + + if len(errors) > 0 { + p.logger.Error("Errors occurred while fetching block messages", zap.Errors("errors", errors)) + return nil, fmt.Errorf("errors occurred while fetching block messages: %v", errors) + } + return p.getMessagesFromTxList(messages.Txs) } @@ -686,18 +731,30 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT go func(wg *sync.WaitGroup, heightStream <-chan *types.HeightRange) { defer wg.Done() for heightRange := range heightStream { - blockInfo, err := p.fetchBlockMessages(ctx, heightRange) - if err != nil { - p.logger.Error("failed to fetch block messages", zap.Error(err)) - continue - } - var messages []*relayTypes.Message - for _, block := range blockInfo { - messages = append(messages, block.Messages...) - } - blockInfoChan <- &relayTypes.BlockInfo{ - Height: heightRange.End, - Messages: messages, + var ( + attempts int + maxAttempts = 5 + baseDelay = 3 * time.Second + maxDelay = 60 * time.Second + ) + for { + blockInfo, err := p.fetchBlockMessages(ctx, heightRange) + if err == nil { + for _, block := range blockInfo { + blockInfoChan <- block + } + } + attempts++ + if attempts >= maxAttempts { + p.logger.Error("fetchBlockMessages failed", zap.Int("attempt", attempts), zap.Error(err)) + continue + } + delay := time.Duration(math.Pow(2, float64(attempts))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Int("attempt", attempts), zap.Duration("retrying_in", delay), zap.Error(err)) + time.Sleep(delay) } } }(wg, heightStream) From 6fd87f18ae04dda9792881339e34bad1d90131ad Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Tue, 23 Jul 2024 14:53:21 +0545 Subject: [PATCH 02/18] fix: used string as cache key and modified other required functions accordingly --- relayer/chain_runtime_test.go | 5 ++++- relayer/chains/icon/provider.go | 1 - relayer/relay.go | 10 +++++----- relayer/types/types.go | 24 +++++++++++++++++++----- 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/relayer/chain_runtime_test.go b/relayer/chain_runtime_test.go index 2d363e38..80f8c3f5 100644 --- a/relayer/chain_runtime_test.go +++ b/relayer/chain_runtime_test.go @@ -2,6 +2,7 @@ package relayer import ( "context" + "fmt" "math/big" "testing" "time" @@ -43,8 +44,10 @@ func TestChainRuntime(t *testing.T) { }) t.Run("clear messages", func(t *testing.T) { + fmt.Println(runtime.MessageCache) runtime.clearMessageFromCache([]*types.MessageKey{m1.MessageKey()}) assert.Equal(t, len(runtime.MessageCache.Messages), len(info.Messages)-1) - assert.Equal(t, runtime.MessageCache.Messages[*m2.MessageKey()], types.NewRouteMessage(m2)) + rtMsg, _ := runtime.MessageCache.Get(m2.MessageKey()) + assert.Equal(t, rtMsg, types.NewRouteMessage(m2)) }) } diff --git a/relayer/chains/icon/provider.go b/relayer/chains/icon/provider.go index 77db3f3f..c35c8d72 100644 --- a/relayer/chains/icon/provider.go +++ b/relayer/chains/icon/provider.go @@ -224,7 +224,6 @@ func (p *Provider) SetFee(ctx context.Context, networkID string, msgFee, resFee } txr, err := p.client.WaitForResults(ctx, &types.TransactionHashParam{Hash: types.NewHexBytes(txHash)}) if err != nil { - fmt.Println("SetFee: WaitForResults: %v", err) return fmt.Errorf("SetFee: WaitForResults: %v", err) } if txr.Status != types.NewHexInt(1) { diff --git a/relayer/relay.go b/relayer/relay.go index 4e281e47..9c437faf 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -29,7 +29,6 @@ var ( // main start loop func (r *Relayer) Start(ctx context.Context, flushInterval time.Duration, fresh bool) (chan error, error) { errorChan := make(chan error, 1) - // once flush completes then only start processing if fresh { // flush all the packet and then continue @@ -213,11 +212,11 @@ func (r *Relayer) getActiveMessagesFromStore(nId string, maxMessages uint) ([]*t func (r *Relayer) processMessages(ctx context.Context) { for _, src := range r.chains { - for key, message := range src.MessageCache.Messages { + for _, message := range src.MessageCache.Messages { dst, err := r.FindChainRuntime(message.Dst) if err != nil { r.log.Error("dst chain nid not found", zap.String("nid", message.Dst)) - r.ClearMessages(ctx, []*types.MessageKey{&key}, src) + r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src) continue } @@ -229,7 +228,8 @@ func (r *Relayer) processMessages(ctx context.Context) { message.ToggleProcessing() // if message reached delete the message - messageReceived, err := dst.Provider.MessageReceived(ctx, &key) + + messageReceived, err := dst.Provider.MessageReceived(ctx, message.MessageKey()) if err != nil { dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()), zap.Error(err)) message.ToggleProcessing() @@ -239,7 +239,7 @@ func (r *Relayer) processMessages(ctx context.Context) { // if message is received we can remove the message from db if messageReceived { dst.log.Info("message already received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64())) - r.ClearMessages(ctx, []*types.MessageKey{&key}, src) + r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src) continue } go r.RouteMessage(ctx, message, dst, src) diff --git a/relayer/types/types.go b/relayer/types/types.go index 158c05d7..9593629e 100644 --- a/relayer/types/types.go +++ b/relayer/types/types.go @@ -154,13 +154,13 @@ func NewMessagekeyWithMessageHeight(key *MessageKey, height uint64) *MessageKeyW } type MessageCache struct { - Messages map[MessageKey]*RouteMessage + Messages map[string]*RouteMessage *sync.RWMutex } func NewMessageCache() *MessageCache { return &MessageCache{ - Messages: make(map[MessageKey]*RouteMessage), + Messages: make(map[string]*RouteMessage), RWMutex: new(sync.RWMutex), } } @@ -168,7 +168,10 @@ func NewMessageCache() *MessageCache { func (m *MessageCache) Add(r *RouteMessage) { m.Lock() defer m.Unlock() - m.Messages[*r.MessageKey()] = r + cacheKey := m.GetCacheKey(r.MessageKey()) + if !m.HasCacheKey(cacheKey) { + m.Messages[cacheKey] = r + } } func (m *MessageCache) Len() int { @@ -178,17 +181,28 @@ func (m *MessageCache) Len() int { func (m *MessageCache) Remove(key *MessageKey) { m.Lock() defer m.Unlock() - delete(m.Messages, *key) + cacheKey := m.GetCacheKey(key) + delete(m.Messages, cacheKey) } // Get returns the message from the cache func (m *MessageCache) Get(key *MessageKey) (*RouteMessage, bool) { m.RLock() defer m.RUnlock() - msg, ok := m.Messages[*key] + cacheKey := m.GetCacheKey(key) + msg, ok := m.Messages[cacheKey] return msg, ok } +func (m *MessageCache) GetCacheKey(key *MessageKey) string { + return key.Src + "-" + key.Dst + "-" + key.Sn.String() +} + +func (m *MessageCache) HasCacheKey(cacheKey string) bool { + _, ok := m.Messages[cacheKey] + return ok +} + type Coin struct { Denom string Amount uint64 From 3d1a7102bf89541a6a502436b75ef9f750af30c5 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Tue, 23 Jul 2024 15:00:38 +0545 Subject: [PATCH 03/18] fix: used src,eventType and Sn as cache key --- relayer/types/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/types/types.go b/relayer/types/types.go index 9593629e..cb21d0d8 100644 --- a/relayer/types/types.go +++ b/relayer/types/types.go @@ -195,7 +195,7 @@ func (m *MessageCache) Get(key *MessageKey) (*RouteMessage, bool) { } func (m *MessageCache) GetCacheKey(key *MessageKey) string { - return key.Src + "-" + key.Dst + "-" + key.Sn.String() + return key.Src + "-" + key.EventType + "-" + key.Sn.String() } func (m *MessageCache) HasCacheKey(cacheKey string) bool { From 593eb04d2ff738f8725efa5ffd532c81a3e5433b Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Sat, 27 Jul 2024 11:56:07 +0545 Subject: [PATCH 04/18] fix: exits for successfull and maximum attempts --- relayer/chains/wasm/provider.go | 26 ++++++++++++++------------ relayer/chains/wasm/types/const.go | 7 +++++++ 2 files changed, 21 insertions(+), 12 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index ec23234c..e8f8155a 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -147,7 +147,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn } default: if startHeight < latestHeight { - p.logger.Debug("Query started", zap.Uint64("from-height", startHeight), zap.Uint64("to-height", latestHeight)) + p.logger.Info("Query started", zap.Uint64("from-height", startHeight), zap.Uint64("to-height", latestHeight)) startHeight = p.runBlockQuery(ctx, blockInfoChan, startHeight, latestHeight) } } @@ -568,16 +568,13 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei wg sync.WaitGroup messages coreTypes.ResultTxSearch messagesChan = make(chan *coreTypes.ResultTxSearch) - errorChan = make(chan error, len(p.eventList)) - sema = make(chan struct{}, len(p.eventList)) + errorChan = make(chan error) ) for _, event := range p.eventList { wg.Add(1) go func(event sdkTypes.Event) { defer wg.Done() - sema <- struct{}{} - defer func() { <-sema }() localSearchParam := searchParam localSearchParam.Events = append(localSearchParam.Events, event) @@ -585,13 +582,15 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei localMessages := new(coreTypes.ResultTxSearch) var err error retryCount := 0 - for retryCount < 3 { + for retryCount < types.RPCMaxRetryAttempts { + p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) localMessages, err = p.client.TxSearch(ctx, localSearchParam) if err == nil { + p.logger.Info("fetched block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) break } retryCount++ - time.Sleep((time.Second * 3) * time.Duration(retryCount*retryCount)) + time.Sleep(types.BaseRPCRetryDelay * time.Duration(retryCount*retryCount)) } if err != nil { @@ -601,9 +600,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei if localMessages.TotalCount > perPage { for i := 2; i <= int(localMessages.TotalCount/perPage)+1; i++ { + p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i)) localSearchParam.Page = &i resNext, err := p.client.TxSearch(ctx, localSearchParam) if err != nil { + p.logger.Error("failed to fetch block messages with page", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i), zap.Error(err)) errorChan <- err return } @@ -733,9 +734,9 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT for heightRange := range heightStream { var ( attempts int - maxAttempts = 5 - baseDelay = 3 * time.Second - maxDelay = 60 * time.Second + maxAttempts = types.RPCMaxRetryAttempts + baseDelay = types.BaseRPCRetryDelay + maxDelay = types.MaxRPCRetryDelay ) for { blockInfo, err := p.fetchBlockMessages(ctx, heightRange) @@ -743,17 +744,18 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT for _, block := range blockInfo { blockInfoChan <- block } + break } attempts++ if attempts >= maxAttempts { p.logger.Error("fetchBlockMessages failed", zap.Int("attempt", attempts), zap.Error(err)) - continue + break } delay := time.Duration(math.Pow(2, float64(attempts))) * baseDelay if delay > maxDelay { delay = maxDelay } - p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Int("attempt", attempts), zap.Duration("retrying_in", delay), zap.Error(err)) + p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Int("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", heightRange.Start), zap.Uint64("end_height", heightRange.End), zap.Error(err)) time.Sleep(delay) } } diff --git a/relayer/chains/wasm/types/const.go b/relayer/chains/wasm/types/const.go index 6615aa14..7aa03814 100644 --- a/relayer/chains/wasm/types/const.go +++ b/relayer/chains/wasm/types/const.go @@ -1,6 +1,8 @@ package types import ( + "time" + "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/relayer/v2/relayer/codecs/injective" @@ -15,4 +17,9 @@ const ( var ( SupportedAlgorithms = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1} SupportedAlgorithmsLedger = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1} + + // Default parameters for RPC + RPCMaxRetryAttempts = 5 + BaseRPCRetryDelay = 3 * time.Second + MaxRPCRetryDelay = 60 * time.Second ) From 800240d92af11a25e731a03b1ba7598e5f4a2781 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Sat, 27 Jul 2024 14:47:51 +0545 Subject: [PATCH 05/18] rf: exponential retry for pagination --- relayer/chains/wasm/provider.go | 69 +++++++++++++++++++----------- relayer/chains/wasm/types/const.go | 6 +-- 2 files changed, 46 insertions(+), 29 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index e8f8155a..7c70d7ad 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -556,6 +556,8 @@ func (p *Provider) getBlockInfoStream(ctx context.Context, done <-chan bool, hei return blockInfoStream } +// fetchBlockMessages fetches block messages from the chain +// TODO: remove retry deplicated code func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.HeightRange) ([]*relayTypes.BlockInfo, error) { perPage := 25 searchParam := types.TxSearchParam{ @@ -576,26 +578,33 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei go func(event sdkTypes.Event) { defer wg.Done() - localSearchParam := searchParam + var ( + localSearchParam = searchParam + localMessages = new(coreTypes.ResultTxSearch) + retryCount uint8 + ) localSearchParam.Events = append(localSearchParam.Events, event) - localMessages := new(coreTypes.ResultTxSearch) - var err error - retryCount := 0 for retryCount < types.RPCMaxRetryAttempts { p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) - localMessages, err = p.client.TxSearch(ctx, localSearchParam) + msgs, err := p.client.TxSearch(ctx, localSearchParam) if err == nil { p.logger.Info("fetched block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) + localMessages = msgs break } retryCount++ - time.Sleep(types.BaseRPCRetryDelay * time.Duration(retryCount*retryCount)) - } - - if err != nil { - errorChan <- err - return + delay := time.Duration(math.Pow(2, float64(retryCount))) * types.BaseRPCRetryDelay + if delay > types.MaxRPCRetryDelay { + delay = types.MaxRPCRetryDelay + } + if retryCount >= types.RPCMaxRetryAttempts { + p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", retryCount), zap.Error(err)) + errorChan <- err + break + } + p.logger.Error("failed to fetch block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + time.Sleep(delay) } if localMessages.TotalCount > perPage { @@ -604,9 +613,22 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei localSearchParam.Page = &i resNext, err := p.client.TxSearch(ctx, localSearchParam) if err != nil { - p.logger.Error("failed to fetch block messages with page", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i), zap.Error(err)) - errorChan <- err - return + var attempts uint8 + for attempts < types.RPCMaxRetryAttempts { + p.logger.Error("failed to fetch block messages with page", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i), zap.Error(err)) + attempts++ + delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay + if delay > types.MaxRPCRetryDelay { + delay = types.MaxRPCRetryDelay + } + if attempts >= types.RPCMaxRetryAttempts { + p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) + errorChan <- err + break + } + p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay)) + time.Sleep(delay) + } } localMessages.Txs = append(localMessages.Txs, resNext.Txs...) } @@ -732,12 +754,7 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT go func(wg *sync.WaitGroup, heightStream <-chan *types.HeightRange) { defer wg.Done() for heightRange := range heightStream { - var ( - attempts int - maxAttempts = types.RPCMaxRetryAttempts - baseDelay = types.BaseRPCRetryDelay - maxDelay = types.MaxRPCRetryDelay - ) + var attempts uint8 for { blockInfo, err := p.fetchBlockMessages(ctx, heightRange) if err == nil { @@ -747,15 +764,15 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT break } attempts++ - if attempts >= maxAttempts { - p.logger.Error("fetchBlockMessages failed", zap.Int("attempt", attempts), zap.Error(err)) + if attempts >= types.RPCMaxRetryAttempts { + p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", attempts), zap.Error(err)) break } - delay := time.Duration(math.Pow(2, float64(attempts))) * baseDelay - if delay > maxDelay { - delay = maxDelay + delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay + if delay > types.MaxRPCRetryDelay { + delay = types.MaxRPCRetryDelay } - p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Int("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", heightRange.Start), zap.Uint64("end_height", heightRange.End), zap.Error(err)) + p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", heightRange.Start), zap.Uint64("end_height", heightRange.End), zap.Error(err)) time.Sleep(delay) } } diff --git a/relayer/chains/wasm/types/const.go b/relayer/chains/wasm/types/const.go index 7aa03814..f2a4dec4 100644 --- a/relayer/chains/wasm/types/const.go +++ b/relayer/chains/wasm/types/const.go @@ -19,7 +19,7 @@ var ( SupportedAlgorithmsLedger = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1} // Default parameters for RPC - RPCMaxRetryAttempts = 5 - BaseRPCRetryDelay = 3 * time.Second - MaxRPCRetryDelay = 60 * time.Second + RPCMaxRetryAttempts uint8 = 5 + BaseRPCRetryDelay = 3 * time.Second + MaxRPCRetryDelay = 5 * time.Minute ) From efea30ea81d1fee29bae5dacbf7e34ae601ff983 Mon Sep 17 00:00:00 2001 From: "Biru C. Sainju" Date: Fri, 2 Aug 2024 08:50:14 +0545 Subject: [PATCH 06/18] fix: added all keys as previous --- relayer/chain_runtime_test.go | 2 -- relayer/types/types.go | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/relayer/chain_runtime_test.go b/relayer/chain_runtime_test.go index 80f8c3f5..08f57ffa 100644 --- a/relayer/chain_runtime_test.go +++ b/relayer/chain_runtime_test.go @@ -2,7 +2,6 @@ package relayer import ( "context" - "fmt" "math/big" "testing" "time" @@ -44,7 +43,6 @@ func TestChainRuntime(t *testing.T) { }) t.Run("clear messages", func(t *testing.T) { - fmt.Println(runtime.MessageCache) runtime.clearMessageFromCache([]*types.MessageKey{m1.MessageKey()}) assert.Equal(t, len(runtime.MessageCache.Messages), len(info.Messages)-1) rtMsg, _ := runtime.MessageCache.Get(m2.MessageKey()) diff --git a/relayer/types/types.go b/relayer/types/types.go index cb21d0d8..b054a7e1 100644 --- a/relayer/types/types.go +++ b/relayer/types/types.go @@ -195,7 +195,7 @@ func (m *MessageCache) Get(key *MessageKey) (*RouteMessage, bool) { } func (m *MessageCache) GetCacheKey(key *MessageKey) string { - return key.Src + "-" + key.EventType + "-" + key.Sn.String() + return key.Src + "-" + key.Dst + "-" + key.EventType + "-" + key.Sn.String() } func (m *MessageCache) HasCacheKey(cacheKey string) bool { From d5b086bd65397756b11fd7a92e7e087bf34346c9 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Tue, 6 Aug 2024 13:27:47 +0545 Subject: [PATCH 07/18] add: more logs, fix: pagination logic and early exit for error --- relayer/chains/wasm/provider.go | 71 ++++++++++++++---------------- relayer/chains/wasm/types/types.go | 33 ++++++++++++++ 2 files changed, 67 insertions(+), 37 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 1027c852..fbb2d412 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -573,9 +573,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei wg sync.WaitGroup messages coreTypes.ResultTxSearch messagesChan = make(chan *coreTypes.ResultTxSearch) - errorChan = make(chan error) ) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for _, event := range p.eventList { wg.Add(1) go func(event sdkTypes.Event) { @@ -589,10 +591,16 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei localSearchParam.Events = append(localSearchParam.Events, event) for retryCount < types.RPCMaxRetryAttempts { - p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) + select { + case <-ctx.Done(): + return + default: + } + + p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.String("event", event.Type)) msgs, err := p.client.TxSearch(ctx, localSearchParam) if err == nil { - p.logger.Info("fetched block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) + p.logger.Info("fetched block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.String("event", event.Type), zap.Int("total_count", msgs.TotalCount)) localMessages = msgs break } @@ -602,22 +610,34 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei delay = types.MaxRPCRetryDelay } if retryCount >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", retryCount), zap.Error(err)) - errorChan <- err - break + p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", retryCount), zap.String("event", event.Type), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + cancel() + return } - p.logger.Error("failed to fetch block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + p.logger.Warn("failed to fetch block messages. retrying...", zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.String("event", event.Type), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) time.Sleep(delay) } if localMessages.TotalCount > perPage { - for i := 2; i <= int(localMessages.TotalCount/perPage)+1; i++ { + totalPages := (localMessages.TotalCount + perPage - 1) / perPage + for i := 2; i <= totalPages; i++ { + select { + case <-ctx.Done(): + return + default: + } p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i)) localSearchParam.Page = &i resNext, err := p.client.TxSearch(ctx, localSearchParam) if err != nil { var attempts uint8 for attempts < types.RPCMaxRetryAttempts { + select { + case <-ctx.Done(): + return + default: + } + p.logger.Error("failed to fetch block messages with page", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i), zap.Error(err)) attempts++ delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay @@ -626,10 +646,10 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei } if attempts >= types.RPCMaxRetryAttempts { p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) - errorChan <- err - break + cancel() + return } - p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay)) + p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) time.Sleep(delay) } } @@ -643,34 +663,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei go func() { wg.Wait() close(messagesChan) - close(errorChan) }() - var errors []error - for { - select { - case msgs, ok := <-messagesChan: - if !ok { - messagesChan = nil - } else { - messages.Txs = append(messages.Txs, msgs.Txs...) - messages.TotalCount += msgs.TotalCount - } - case err, ok := <-errorChan: - if !ok { - errorChan = nil - } else { - errors = append(errors, err) - } - } - if messagesChan == nil && errorChan == nil { - break - } - } - - if len(errors) > 0 { - p.logger.Error("Errors occurred while fetching block messages", zap.Errors("errors", errors)) - return nil, fmt.Errorf("errors occurred while fetching block messages: %v", errors) + for msgs := range messagesChan { + messages.Txs = append(messages.Txs, msgs.Txs...) + messages.TotalCount += msgs.TotalCount } return p.getMessagesFromTxList(messages.Txs) diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index 2872313a..de08a73c 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -1,12 +1,16 @@ package types import ( + "context" "encoding/hex" "fmt" + "math" "strings" + "time" "github.com/cosmos/cosmos-sdk/types" relayerTypes "github.com/icon-project/centralized-relay/relayer/types" + "go.uber.org/zap" ) type TxSearchParam struct { @@ -114,3 +118,32 @@ type HeightRange struct { Start uint64 End uint64 } + +func Retry(ctx context.Context, maxAttempts uint8, baseDelay, maxDelay time.Duration, logger *zap.Logger, operation func() error) error { + var attempts uint8 + for attempts < maxAttempts { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + err := operation() + if err == nil { + return nil + } + + attempts++ + delay := time.Duration(math.Pow(2, float64(attempts))) * baseDelay + if delay > maxDelay { + delay = maxDelay + } + if attempts >= maxAttempts { + logger.Error("operation failed", zap.Uint8("attempt", attempts), zap.Error(err)) + return err + } + logger.Warn("operation failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay)) + time.Sleep(delay) + } + return fmt.Errorf("operation failed after %d attempts", maxAttempts) +} From beae952227b6ae3ce80c17d26ce55ee9d70b5155 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Tue, 6 Aug 2024 14:30:39 +0545 Subject: [PATCH 08/18] rf: reusable logs fields, rf: retry delay position --- relayer/chains/wasm/provider.go | 38 ++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index be4b6bfd..3e7ff2cf 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -587,6 +587,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei localSearchParam = searchParam localMessages = new(coreTypes.ResultTxSearch) retryCount uint8 + zapFields = []zap.Field{ + zap.Uint64("start_height", localSearchParam.StartHeight), + zap.Uint64("end_height", localSearchParam.EndHeight), + zap.String("event", event.Type), + } ) localSearchParam.Events = append(localSearchParam.Events, event) @@ -597,24 +602,24 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei default: } - p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.String("event", event.Type)) + p.logger.Info("fetching block messages", zapFields...) msgs, err := p.client.TxSearch(ctx, localSearchParam) if err == nil { - p.logger.Info("fetched block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.String("event", event.Type), zap.Int("total_count", msgs.TotalCount)) + p.logger.Info("fetched block messages", zapFields...) localMessages = msgs break } retryCount++ - delay := time.Duration(math.Pow(2, float64(retryCount))) * types.BaseRPCRetryDelay - if delay > types.MaxRPCRetryDelay { - delay = types.MaxRPCRetryDelay - } if retryCount >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", retryCount), zap.String("event", event.Type), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) cancel() return } - p.logger.Warn("failed to fetch block messages. retrying...", zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.String("event", event.Type), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + delay := time.Duration(math.Pow(2, float64(retryCount))) * types.BaseRPCRetryDelay + if delay > types.MaxRPCRetryDelay { + delay = types.MaxRPCRetryDelay + } + p.logger.Warn("failed to fetch block messages. retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...) time.Sleep(delay) } @@ -626,7 +631,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei return default: } - p.logger.Info("fetching block messages", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i)) + p.logger.Info("fetching block messages", append(zapFields, zap.Int("page", i))...) localSearchParam.Page = &i resNext, err := p.client.TxSearch(ctx, localSearchParam) if err != nil { @@ -637,19 +642,18 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei return default: } - - p.logger.Error("failed to fetch block messages with page", zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Int("page", i), zap.Error(err)) + p.logger.Warn("failed to fetch block messages with page", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) attempts++ - delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay - if delay > types.MaxRPCRetryDelay { - delay = types.MaxRPCRetryDelay - } if attempts >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight)) + p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) cancel() return } - p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), zap.Error(err)) + delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay + if delay > types.MaxRPCRetryDelay { + delay = types.MaxRPCRetryDelay + } + p.logger.Warn("fetchBlockMessages failed, retrying...", append(zapFields, zap.Duration("retrying_in", delay), zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) time.Sleep(delay) } } From c75df010b320ce80ee9bea64afc1d285c1c632d1 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Tue, 6 Aug 2024 14:33:17 +0545 Subject: [PATCH 09/18] rm: unused method for retry --- relayer/chains/wasm/types/types.go | 33 ------------------------------ 1 file changed, 33 deletions(-) diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index 2280de3b..2ba0d45b 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -1,18 +1,14 @@ package types import ( - "context" "encoding/hex" "fmt" - "math" "strings" - "time" abiTypes "github.com/cometbft/cometbft/abci/types" "github.com/cosmos/cosmos-sdk/types" relayerTypes "github.com/icon-project/centralized-relay/relayer/types" - "go.uber.org/zap" ) type TxSearchParam struct { @@ -106,32 +102,3 @@ type HeightRange struct { Start uint64 End uint64 } - -func Retry(ctx context.Context, maxAttempts uint8, baseDelay, maxDelay time.Duration, logger *zap.Logger, operation func() error) error { - var attempts uint8 - for attempts < maxAttempts { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - - err := operation() - if err == nil { - return nil - } - - attempts++ - delay := time.Duration(math.Pow(2, float64(attempts))) * baseDelay - if delay > maxDelay { - delay = maxDelay - } - if attempts >= maxAttempts { - logger.Error("operation failed", zap.Uint8("attempt", attempts), zap.Error(err)) - return err - } - logger.Warn("operation failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay)) - time.Sleep(delay) - } - return fmt.Errorf("operation failed after %d attempts", maxAttempts) -} From be916059ca5187c17606f1ce4abfdd0f48bb9fcf Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Tue, 6 Aug 2024 17:17:56 +0545 Subject: [PATCH 10/18] rf: use conditional loop --- relayer/chains/wasm/provider.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 3e7ff2cf..21275d9a 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -744,7 +744,7 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT for heightRange := range heightStream { var attempts uint8 - for { + for attempts < types.RPCMaxRetryAttempts { blockInfo, err := p.fetchBlockMessages(ctx, heightRange) if err == nil { for _, block := range blockInfo { From 84ca35cae1366b662ebcbc7c5f379464f519ad1f Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Wed, 7 Aug 2024 12:33:55 +0545 Subject: [PATCH 11/18] revert: original errorChan logic --- relayer/chains/wasm/provider.go | 32 +++++++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 21275d9a..752cb197 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -573,6 +573,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei wg sync.WaitGroup messages coreTypes.ResultTxSearch messagesChan = make(chan *coreTypes.ResultTxSearch) + errorChan = make(chan error, len(p.eventList)) ) ctx, cancel := context.WithCancel(ctx) @@ -612,6 +613,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei retryCount++ if retryCount >= types.RPCMaxRetryAttempts { p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) + errorChan <- err cancel() return } @@ -646,6 +648,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei attempts++ if attempts >= types.RPCMaxRetryAttempts { p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) + errorChan <- err cancel() return } @@ -667,11 +670,34 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei go func() { wg.Wait() close(messagesChan) + close(errorChan) }() - for msgs := range messagesChan { - messages.Txs = append(messages.Txs, msgs.Txs...) - messages.TotalCount += msgs.TotalCount + var errors []error + for { + select { + case msgs, ok := <-messagesChan: + if !ok { + messagesChan = nil + } else { + messages.Txs = append(messages.Txs, msgs.Txs...) + messages.TotalCount += msgs.TotalCount + } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + } else { + errors = append(errors, err) + } + } + if messagesChan == nil && errorChan == nil { + break + } + } + + if len(errors) > 0 { + p.logger.Error("Errors occurred while fetching block messages", zap.Errors("errors", errors)) + return nil, fmt.Errorf("errors occurred while fetching block messages: %v", errors) } return p.getMessagesFromTxList(messages.Txs) From 915511edf738b12ea486c67f3855c1ec368610a9 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Fri, 9 Aug 2024 11:57:30 +0545 Subject: [PATCH 12/18] fix: don't wait for other threads --- relayer/chains/wasm/provider.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 752cb197..3d93a085 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -613,7 +613,10 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei retryCount++ if retryCount >= types.RPCMaxRetryAttempts { p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) - errorChan <- err + select { + case errorChan <- err: + default: + } cancel() return } @@ -648,7 +651,10 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei attempts++ if attempts >= types.RPCMaxRetryAttempts { p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) - errorChan <- err + select { + case errorChan <- err: + default: + } cancel() return } From e1386aed74f3628fffef9b574c16e81ee777e9e1 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Fri, 9 Aug 2024 12:06:30 +0545 Subject: [PATCH 13/18] fix: use smart delay --- relayer/chains/wasm/provider.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 3d93a085..72590233 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -625,7 +625,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei delay = types.MaxRPCRetryDelay } p.logger.Warn("failed to fetch block messages. retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...) - time.Sleep(delay) + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } } if localMessages.TotalCount > perPage { @@ -663,7 +667,11 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei delay = types.MaxRPCRetryDelay } p.logger.Warn("fetchBlockMessages failed, retrying...", append(zapFields, zap.Duration("retrying_in", delay), zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) - time.Sleep(delay) + select { + case <-ctx.Done(): + return + case <-time.After(delay): + } } } localMessages.Txs = append(localMessages.Txs, resNext.Txs...) From d6d623cb5e8b09b8fdaa6dd93d608cb08d4567ea Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Fri, 9 Aug 2024 16:37:48 +0545 Subject: [PATCH 14/18] revert: original method args. for easy refactoring --- relayer/chains/wasm/provider.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 72590233..24f9dcbf 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -581,7 +581,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei for _, event := range p.eventList { wg.Add(1) - go func(event sdkTypes.Event) { + go func(wg *sync.WaitGroup, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { defer wg.Done() var ( @@ -678,7 +678,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei } } messagesChan <- localMessages - }(event) + }(&wg, searchParam, messagesChan, errorChan) } go func() { From 6e0283dfb8f78ecf98ec6fc84df212e71ce12911 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Mon, 12 Aug 2024 12:43:34 +0545 Subject: [PATCH 15/18] feat: retry util --- utils/retry/retry.go | 51 +++++++++++++++++++++ utils/retry/retry_test.go | 93 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 144 insertions(+) create mode 100644 utils/retry/retry.go create mode 100644 utils/retry/retry_test.go diff --git a/utils/retry/retry.go b/utils/retry/retry.go new file mode 100644 index 00000000..8a5221b0 --- /dev/null +++ b/utils/retry/retry.go @@ -0,0 +1,51 @@ +package retry + +import ( + "context" + "math" + "time" + + "go.uber.org/zap" +) + +const ( + BaseRPCRetryDelay = time.Second + MaxRPCRetryDelay = 30 * time.Second + RPCMaxRetryAttempts = 5 + RetryPower = 3 +) + +func Retry(ctx context.Context, logger *zap.Logger, operation func() error, zapFields []zap.Field) error { + var retryCount uint8 + for retryCount < RPCMaxRetryAttempts { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + err := operation() + if err == nil { + return nil + } + + retryCount++ + if retryCount >= RPCMaxRetryAttempts { + logger.Error("operation failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) + return err + } + + delay := time.Duration(math.Pow(RetryPower, float64(retryCount))) * BaseRPCRetryDelay + if delay > MaxRPCRetryDelay { + delay = MaxRPCRetryDelay + } + logger.Warn("operation failed, retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + return nil +} diff --git a/utils/retry/retry_test.go b/utils/retry/retry_test.go new file mode 100644 index 00000000..90b3a193 --- /dev/null +++ b/utils/retry/retry_test.go @@ -0,0 +1,93 @@ +package retry + +import ( + "context" + "errors" + "testing" + "time" + + "go.uber.org/zap" +) + +func TestRetry(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + + // Test case 1: Operation succeeds on the first attempt + attempts := 0 + operation := func() error { + attempts++ + return nil + } + err := Retry(ctx, logger, operation, nil) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + if attempts != 1 { + t.Errorf("Expected 1 attempt, got: %d", attempts) + } + + // Test case 2: Operation fails on all attempts + attempts = 0 + expectedErr := errors.New("operation failed") + operation = func() error { + attempts++ + return expectedErr + } + err = Retry(ctx, logger, operation, nil) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) + } + if attempts != RPCMaxRetryAttempts { + t.Errorf("Expected %d attempts, got: %d", RPCMaxRetryAttempts, attempts) + } + + // Test case 3: Operation succeeds on the third attempt + attempts = 0 + operation = func() error { + attempts++ + if attempts < 3 { + return errors.New("temporary failure") + } + return nil + } + err = Retry(ctx, logger, operation, nil) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + if attempts != 3 { + t.Errorf("Expected 3 attempts, got: %d", attempts) + } + + // Test case 4: Context canceled + ctx, cancel := context.WithCancel(ctx) + cancel() + attempts = 0 + operation = func() error { + attempts++ + return errors.New("operation failed") + } + err = Retry(ctx, logger, operation, nil) + if err != ctx.Err() { + t.Errorf("Expected error: %v, got: %v", ctx.Err(), err) + } + if attempts != 0 { + t.Errorf("Expected 0 attempts, got: %d", attempts) + } + + // Test case 5: Operation takes longer than MaxRPCRetryDelay + ctx = context.Background() + attempts = 0 + operation = func() error { + attempts++ + time.Sleep(MaxRPCRetryDelay + time.Second) + return errors.New("operation failed") + } + err = Retry(ctx, logger, operation, nil) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + if attempts != 1 { + t.Errorf("Expected 1 attempt, got: %d", attempts) + } +} From 0b9cd622313afad287e8257eafc2f5285fca6f02 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Mon, 12 Aug 2024 12:43:57 +0545 Subject: [PATCH 16/18] rf: use retry util --- relayer/chains/wasm/provider.go | 106 +++++++++----------------------- 1 file changed, 30 insertions(+), 76 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 24f9dcbf..320c871d 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -3,7 +3,6 @@ package wasm import ( "context" "fmt" - "math" "math/big" "runtime" "strings" @@ -19,6 +18,7 @@ import ( "github.com/icon-project/centralized-relay/relayer/kms" "github.com/icon-project/centralized-relay/relayer/provider" relayTypes "github.com/icon-project/centralized-relay/relayer/types" + "github.com/icon-project/centralized-relay/utils/retry" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" ) @@ -560,7 +560,7 @@ func (p *Provider) getBlockInfoStream(ctx context.Context, done <-chan bool, hei } // fetchBlockMessages fetches block messages from the chain -// TODO: remove retry deplicated code +// TODO: optimize this function func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.HeightRange) ([]*relayTypes.BlockInfo, error) { perPage := 25 searchParam := types.TxSearchParam{ @@ -572,7 +572,7 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei var ( wg sync.WaitGroup messages coreTypes.ResultTxSearch - messagesChan = make(chan *coreTypes.ResultTxSearch) + messagesChan = make(chan *coreTypes.ResultTxSearch, len(p.eventList)) errorChan = make(chan error, len(p.eventList)) ) @@ -581,13 +581,12 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei for _, event := range p.eventList { wg.Add(1) - go func(wg *sync.WaitGroup, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { + go func(wg *sync.WaitGroup, event sdkTypes.Event, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { defer wg.Done() var ( localSearchParam = searchParam localMessages = new(coreTypes.ResultTxSearch) - retryCount uint8 zapFields = []zap.Field{ zap.Uint64("start_height", localSearchParam.StartHeight), zap.Uint64("end_height", localSearchParam.EndHeight), @@ -596,40 +595,22 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei ) localSearchParam.Events = append(localSearchParam.Events, event) - for retryCount < types.RPCMaxRetryAttempts { - select { - case <-ctx.Done(): - return - default: - } - + err := retry.Retry(ctx, p.logger, func() error { p.logger.Info("fetching block messages", zapFields...) msgs, err := p.client.TxSearch(ctx, localSearchParam) if err == nil { p.logger.Info("fetched block messages", zapFields...) localMessages = msgs - break } - retryCount++ - if retryCount >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) - select { - case errorChan <- err: - default: - } - cancel() - return - } - delay := time.Duration(math.Pow(2, float64(retryCount))) * types.BaseRPCRetryDelay - if delay > types.MaxRPCRetryDelay { - delay = types.MaxRPCRetryDelay - } - p.logger.Warn("failed to fetch block messages. retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...) + return err + }, zapFields) + if err != nil { select { - case <-ctx.Done(): - return - case <-time.After(delay): + case errorChan <- err: + default: } + cancel() + return } if localMessages.TotalCount > perPage { @@ -642,43 +623,25 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei } p.logger.Info("fetching block messages", append(zapFields, zap.Int("page", i))...) localSearchParam.Page = &i - resNext, err := p.client.TxSearch(ctx, localSearchParam) + err := retry.Retry(ctx, p.logger, func() error { + resNext, err := p.client.TxSearch(ctx, localSearchParam) + if err == nil { + localMessages.Txs = append(localMessages.Txs, resNext.Txs...) + } + return err + }, append(zapFields, zap.Int("page", i))) if err != nil { - var attempts uint8 - for attempts < types.RPCMaxRetryAttempts { - select { - case <-ctx.Done(): - return - default: - } - p.logger.Warn("failed to fetch block messages with page", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) - attempts++ - if attempts >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", append(zapFields, zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) - select { - case errorChan <- err: - default: - } - cancel() - return - } - delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay - if delay > types.MaxRPCRetryDelay { - delay = types.MaxRPCRetryDelay - } - p.logger.Warn("fetchBlockMessages failed, retrying...", append(zapFields, zap.Duration("retrying_in", delay), zap.Int("page", i), zap.Uint8("attempt", attempts), zap.Error(err))...) - select { - case <-ctx.Done(): - return - case <-time.After(delay): - } + select { + case errorChan <- err: + default: } + cancel() + return } - localMessages.Txs = append(localMessages.Txs, resNext.Txs...) } } messagesChan <- localMessages - }(&wg, searchParam, messagesChan, errorChan) + }(&wg, event, searchParam, messagesChan, errorChan) } go func() { @@ -783,26 +746,17 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT heightStream := p.getHeightStream(done, fromHeight, toHeight) for heightRange := range heightStream { - var attempts uint8 - for attempts < types.RPCMaxRetryAttempts { + err := retry.Retry(ctx, p.logger, func() error { blockInfo, err := p.fetchBlockMessages(ctx, heightRange) if err == nil { for _, block := range blockInfo { blockInfoChan <- block } - break - } - attempts++ - if attempts >= types.RPCMaxRetryAttempts { - p.logger.Error("fetchBlockMessages failed", zap.Uint8("attempt", attempts), zap.Error(err)) - break } - delay := time.Duration(math.Pow(2, float64(attempts))) * types.BaseRPCRetryDelay - if delay > types.MaxRPCRetryDelay { - delay = types.MaxRPCRetryDelay - } - p.logger.Warn("fetchBlockMessages failed, retrying...", zap.Uint8("attempt", attempts), zap.Duration("retrying_in", delay), zap.Uint64("start_height", heightRange.Start), zap.Uint64("end_height", heightRange.End), zap.Error(err)) - time.Sleep(delay) + return err + }, []zap.Field{zap.Uint64("from_height", heightRange.Start), zap.Uint64("to_height", heightRange.End)}) + if err != nil { + p.logger.Error("failed to fetch block messages", zap.Error(err)) } } return toHeight + 1 From 248d10cb916ebf4447f6e4b8b4f053cab5ed6f9d Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Mon, 12 Aug 2024 13:38:54 +0545 Subject: [PATCH 17/18] add: more test cases for retry logic --- utils/retry/retry_test.go | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/utils/retry/retry_test.go b/utils/retry/retry_test.go index 90b3a193..385e83da 100644 --- a/utils/retry/retry_test.go +++ b/utils/retry/retry_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "testing" - "time" "go.uber.org/zap" ) @@ -75,19 +74,32 @@ func TestRetry(t *testing.T) { t.Errorf("Expected 0 attempts, got: %d", attempts) } - // Test case 5: Operation takes longer than MaxRPCRetryDelay + // Test case 5: Operation fails on the first attempt ctx = context.Background() attempts = 0 + expectedErr = errors.New("operation failed on the first attempt") operation = func() error { attempts++ - time.Sleep(MaxRPCRetryDelay + time.Second) - return errors.New("operation failed") + return expectedErr } err = Retry(ctx, logger, operation, nil) - if err != nil { - t.Errorf("Expected no error, got: %v", err) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) } - if attempts != 1 { - t.Errorf("Expected 1 attempt, got: %d", attempts) + + // Test case 6: Operation fails on all attempts + ctx = context.Background() + attempts = 0 + expectedErr = errors.New("operation failed on all attempts") + operation = func() error { + attempts++ + return expectedErr + } + err = Retry(ctx, logger, operation, nil) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) + } + if attempts != RPCMaxRetryAttempts { + t.Errorf("Expected %d attempts, got: %d", RPCMaxRetryAttempts, attempts) } } From 2d5a8aa00afc2b59fdd80d1dfb2e9946ae235a3f Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Mon, 12 Aug 2024 14:09:06 +0545 Subject: [PATCH 18/18] add(doc): add changelogs --- CHANGELOG.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index a1c73455..8eeb98fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ All notable changes to this project will be documented in this file. +## [1.5.1] - 2024-08-12 + +### Changed + +- RPC retries and exponential backoffs. +- Websocket healthcheck timout from 1 min to 10 second. +- Initiate startup tasks early. +- EVM websocket healthcheck uses latest query, previosly genesis block query. + +### Fixed + +- Addressed wasm response format changes. +- WASM chain healthcheck frequency. +- WASM sdk context lockup issue. +- WASM duplicated block when batching requests. +- Recovery start from genesis block when databse is empty. +- Docker build issues. + ## [1.5.0-rc1] - 2024-08-03 ### Added