diff --git a/CHANGELOG.md b/CHANGELOG.md index a1c73455..8eeb98fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,24 @@ All notable changes to this project will be documented in this file. +## [1.5.1] - 2024-08-12 + +### Changed + +- RPC retries and exponential backoffs. +- Websocket healthcheck timout from 1 min to 10 second. +- Initiate startup tasks early. +- EVM websocket healthcheck uses latest query, previosly genesis block query. + +### Fixed + +- Addressed wasm response format changes. +- WASM chain healthcheck frequency. +- WASM sdk context lockup issue. +- WASM duplicated block when batching requests. +- Recovery start from genesis block when databse is empty. +- Docker build issues. + ## [1.5.0-rc1] - 2024-08-03 ### Added diff --git a/relayer/chain_runtime_test.go b/relayer/chain_runtime_test.go index 2d363e38..08f57ffa 100644 --- a/relayer/chain_runtime_test.go +++ b/relayer/chain_runtime_test.go @@ -45,6 +45,7 @@ func TestChainRuntime(t *testing.T) { t.Run("clear messages", func(t *testing.T) { runtime.clearMessageFromCache([]*types.MessageKey{m1.MessageKey()}) assert.Equal(t, len(runtime.MessageCache.Messages), len(info.Messages)-1) - assert.Equal(t, runtime.MessageCache.Messages[*m2.MessageKey()], types.NewRouteMessage(m2)) + rtMsg, _ := runtime.MessageCache.Get(m2.MessageKey()) + assert.Equal(t, rtMsg, types.NewRouteMessage(m2)) }) } diff --git a/relayer/chains/icon/provider.go b/relayer/chains/icon/provider.go index 77db3f3f..c35c8d72 100644 --- a/relayer/chains/icon/provider.go +++ b/relayer/chains/icon/provider.go @@ -224,7 +224,6 @@ func (p *Provider) SetFee(ctx context.Context, networkID string, msgFee, resFee } txr, err := p.client.WaitForResults(ctx, &types.TransactionHashParam{Hash: types.NewHexBytes(txHash)}) if err != nil { - fmt.Println("SetFee: WaitForResults: %v", err) return fmt.Errorf("SetFee: WaitForResults: %v", err) } if txr.Status != types.NewHexInt(1) { diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index cfcc474c..87bba6d8 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -18,6 +18,7 @@ import ( "github.com/icon-project/centralized-relay/relayer/kms" "github.com/icon-project/centralized-relay/relayer/provider" relayTypes "github.com/icon-project/centralized-relay/relayer/types" + "github.com/icon-project/centralized-relay/utils/retry" jsoniter "github.com/json-iterator/go" "go.uber.org/zap" ) @@ -152,7 +153,7 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx relayTypes.Last latestHeight, err = p.QueryLatestHeight(ctx) if err != nil { p.logger.Error("failed to get latest block height", zap.Error(err)) - pollHeightTicker.Reset(time.Second * 2) + pollHeightTicker.Reset(time.Second * 3) } } } @@ -560,6 +561,8 @@ func (p *Provider) getBlockInfoStream(ctx context.Context, done <-chan bool, hei return blockInfoStream } +// fetchBlockMessages fetches block messages from the chain +// TODO: optimize this function func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.HeightRange) ([]*relayTypes.BlockInfo, error) { perPage := 25 searchParam := types.TxSearchParam{ @@ -571,42 +574,111 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei var ( wg sync.WaitGroup messages coreTypes.ResultTxSearch - messagesChan = make(chan *coreTypes.ResultTxSearch) - errorChan = make(chan error) + messagesChan = make(chan *coreTypes.ResultTxSearch, len(p.eventList)) + errorChan = make(chan error, len(p.eventList)) ) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + for _, event := range p.eventList { wg.Add(1) - go func(wg *sync.WaitGroup, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { + go func(wg *sync.WaitGroup, event sdkTypes.Event, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) { defer wg.Done() - searchParam.Events = append(searchParam.Events, event) - res, err := p.client.TxSearch(ctx, searchParam) + + var ( + localSearchParam = searchParam + localMessages = new(coreTypes.ResultTxSearch) + zapFields = []zap.Field{ + zap.Uint64("start_height", localSearchParam.StartHeight), + zap.Uint64("end_height", localSearchParam.EndHeight), + zap.String("event", event.Type), + } + ) + localSearchParam.Events = append(localSearchParam.Events, event) + + err := retry.Retry(ctx, p.logger, func() error { + p.logger.Info("fetching block messages", zapFields...) + msgs, err := p.client.TxSearch(ctx, localSearchParam) + if err == nil { + p.logger.Info("fetched block messages", zapFields...) + localMessages = msgs + } + return err + }, zapFields) if err != nil { - errorChan <- err + select { + case errorChan <- err: + default: + } + cancel() return } - if res.TotalCount > perPage { - for i := 2; i <= int(res.TotalCount/perPage)+1; i++ { - searchParam.Page = &i - resNext, err := p.client.TxSearch(ctx, searchParam) + + if localMessages.TotalCount > perPage { + totalPages := (localMessages.TotalCount + perPage - 1) / perPage + for i := 2; i <= totalPages; i++ { + select { + case <-ctx.Done(): + return + default: + } + p.logger.Info("fetching block messages", append(zapFields, zap.Int("page", i))...) + localSearchParam.Page = &i + err := retry.Retry(ctx, p.logger, func() error { + resNext, err := p.client.TxSearch(ctx, localSearchParam) + if err == nil { + localMessages.Txs = append(localMessages.Txs, resNext.Txs...) + } + return err + }, append(zapFields, zap.Int("page", i))) if err != nil { - errorChan <- err + select { + case errorChan <- err: + default: + } + cancel() return } - res.Txs = append(res.Txs, resNext.Txs...) } } - messagesChan <- res - }(&wg, searchParam, messagesChan, errorChan) + messagesChan <- localMessages + }(&wg, event, searchParam, messagesChan, errorChan) + } + + go func() { + wg.Wait() + close(messagesChan) + close(errorChan) + }() + + var errors []error + for { select { - case msgs := <-messagesChan: - messages.Txs = append(messages.Txs, msgs.Txs...) - messages.TotalCount += msgs.TotalCount - case err := <-errorChan: - p.logger.Error("failed to fetch block messages", zap.Error(err)) + case msgs, ok := <-messagesChan: + if !ok { + messagesChan = nil + } else { + messages.Txs = append(messages.Txs, msgs.Txs...) + messages.TotalCount += msgs.TotalCount + } + case err, ok := <-errorChan: + if !ok { + errorChan = nil + } else { + errors = append(errors, err) + } + } + if messagesChan == nil && errorChan == nil { + break } } - wg.Wait() + + if len(errors) > 0 { + p.logger.Error("Errors occurred while fetching block messages", zap.Errors("errors", errors)) + return nil, fmt.Errorf("errors occurred while fetching block messages: %v", errors) + } + return p.getMessagesFromTxList(messages.Txs) } @@ -676,19 +748,17 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT heightStream := p.getHeightStream(done, fromHeight, toHeight) for heightRange := range heightStream { - blockInfo, err := p.fetchBlockMessages(ctx, heightRange) + err := retry.Retry(ctx, p.logger, func() error { + blockInfo, err := p.fetchBlockMessages(ctx, heightRange) + if err == nil { + for _, block := range blockInfo { + blockInfoChan <- block + } + } + return err + }, []zap.Field{zap.Uint64("from_height", heightRange.Start), zap.Uint64("to_height", heightRange.End)}) if err != nil { p.logger.Error("failed to fetch block messages", zap.Error(err)) - continue - } - p.logger.Info("Fetched block messages", zap.Uint64("from", heightRange.Start), zap.Uint64("to", heightRange.End)) - var messages []*relayTypes.Message - for _, block := range blockInfo { - messages = append(messages, block.Messages...) - } - blockInfoChan <- &relayTypes.BlockInfo{ - Height: heightRange.End, - Messages: messages, } } return toHeight + 1 diff --git a/relayer/chains/wasm/types/const.go b/relayer/chains/wasm/types/const.go index 6615aa14..f2a4dec4 100644 --- a/relayer/chains/wasm/types/const.go +++ b/relayer/chains/wasm/types/const.go @@ -1,6 +1,8 @@ package types import ( + "time" + "github.com/cosmos/cosmos-sdk/crypto/hd" "github.com/cosmos/cosmos-sdk/crypto/keyring" "github.com/cosmos/relayer/v2/relayer/codecs/injective" @@ -15,4 +17,9 @@ const ( var ( SupportedAlgorithms = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1} SupportedAlgorithmsLedger = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1} + + // Default parameters for RPC + RPCMaxRetryAttempts uint8 = 5 + BaseRPCRetryDelay = 3 * time.Second + MaxRPCRetryDelay = 5 * time.Minute ) diff --git a/relayer/relay.go b/relayer/relay.go index 5177d102..b98c69d2 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -31,7 +31,6 @@ var ( // main start loop func (r *Relayer) Start(ctx context.Context, flushInterval time.Duration, fresh bool) (chan error, error) { errorChan := make(chan error, 1) - // once flush completes then only start processing if fresh { // flush all the packet and then continue @@ -235,11 +234,11 @@ func (r *Relayer) getActiveMessagesFromStore(nId string, maxMessages uint) ([]*t func (r *Relayer) processMessages(ctx context.Context) { for _, src := range r.chains { - for key, message := range src.MessageCache.Messages { + for _, message := range src.MessageCache.Messages { dst, err := r.FindChainRuntime(message.Dst) if err != nil { r.log.Error("dst chain nid not found", zap.String("nid", message.Dst)) - r.ClearMessages(ctx, []*types.MessageKey{&key}, src) + r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src) continue } @@ -251,7 +250,8 @@ func (r *Relayer) processMessages(ctx context.Context) { message.ToggleProcessing() // if message reached delete the message - messageReceived, err := dst.Provider.MessageReceived(ctx, &key) + + messageReceived, err := dst.Provider.MessageReceived(ctx, message.MessageKey()) if err != nil { dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()), zap.Error(err)) message.ToggleProcessing() @@ -261,7 +261,7 @@ func (r *Relayer) processMessages(ctx context.Context) { // if message is received we can remove the message from db if messageReceived { dst.log.Info("message already received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64())) - r.ClearMessages(ctx, []*types.MessageKey{&key}, src) + r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src) continue } go r.RouteMessage(ctx, message, dst, src) diff --git a/relayer/types/types.go b/relayer/types/types.go index f6d7ed17..a66a79c2 100644 --- a/relayer/types/types.go +++ b/relayer/types/types.go @@ -157,13 +157,13 @@ func NewMessagekeyWithMessageHeight(key *MessageKey, height uint64) *MessageKeyW } type MessageCache struct { - Messages map[MessageKey]*RouteMessage + Messages map[string]*RouteMessage *sync.RWMutex } func NewMessageCache() *MessageCache { return &MessageCache{ - Messages: make(map[MessageKey]*RouteMessage), + Messages: make(map[string]*RouteMessage), RWMutex: new(sync.RWMutex), } } @@ -171,7 +171,10 @@ func NewMessageCache() *MessageCache { func (m *MessageCache) Add(r *RouteMessage) { m.Lock() defer m.Unlock() - m.Messages[*r.MessageKey()] = r + cacheKey := m.GetCacheKey(r.MessageKey()) + if !m.HasCacheKey(cacheKey) { + m.Messages[cacheKey] = r + } } func (m *MessageCache) Len() int { @@ -181,17 +184,28 @@ func (m *MessageCache) Len() int { func (m *MessageCache) Remove(key *MessageKey) { m.Lock() defer m.Unlock() - delete(m.Messages, *key) + cacheKey := m.GetCacheKey(key) + delete(m.Messages, cacheKey) } // Get returns the message from the cache func (m *MessageCache) Get(key *MessageKey) (*RouteMessage, bool) { m.RLock() defer m.RUnlock() - msg, ok := m.Messages[*key] + cacheKey := m.GetCacheKey(key) + msg, ok := m.Messages[cacheKey] return msg, ok } +func (m *MessageCache) GetCacheKey(key *MessageKey) string { + return key.Src + "-" + key.Dst + "-" + key.EventType + "-" + key.Sn.String() +} + +func (m *MessageCache) HasCacheKey(cacheKey string) bool { + _, ok := m.Messages[cacheKey] + return ok +} + type Coin struct { Denom string Amount uint64 diff --git a/utils/retry/retry.go b/utils/retry/retry.go new file mode 100644 index 00000000..8a5221b0 --- /dev/null +++ b/utils/retry/retry.go @@ -0,0 +1,51 @@ +package retry + +import ( + "context" + "math" + "time" + + "go.uber.org/zap" +) + +const ( + BaseRPCRetryDelay = time.Second + MaxRPCRetryDelay = 30 * time.Second + RPCMaxRetryAttempts = 5 + RetryPower = 3 +) + +func Retry(ctx context.Context, logger *zap.Logger, operation func() error, zapFields []zap.Field) error { + var retryCount uint8 + for retryCount < RPCMaxRetryAttempts { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + err := operation() + if err == nil { + return nil + } + + retryCount++ + if retryCount >= RPCMaxRetryAttempts { + logger.Error("operation failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...) + return err + } + + delay := time.Duration(math.Pow(RetryPower, float64(retryCount))) * BaseRPCRetryDelay + if delay > MaxRPCRetryDelay { + delay = MaxRPCRetryDelay + } + logger.Warn("operation failed, retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...) + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(delay): + } + } + return nil +} diff --git a/utils/retry/retry_test.go b/utils/retry/retry_test.go new file mode 100644 index 00000000..385e83da --- /dev/null +++ b/utils/retry/retry_test.go @@ -0,0 +1,105 @@ +package retry + +import ( + "context" + "errors" + "testing" + + "go.uber.org/zap" +) + +func TestRetry(t *testing.T) { + ctx := context.Background() + logger := zap.NewNop() + + // Test case 1: Operation succeeds on the first attempt + attempts := 0 + operation := func() error { + attempts++ + return nil + } + err := Retry(ctx, logger, operation, nil) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + if attempts != 1 { + t.Errorf("Expected 1 attempt, got: %d", attempts) + } + + // Test case 2: Operation fails on all attempts + attempts = 0 + expectedErr := errors.New("operation failed") + operation = func() error { + attempts++ + return expectedErr + } + err = Retry(ctx, logger, operation, nil) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) + } + if attempts != RPCMaxRetryAttempts { + t.Errorf("Expected %d attempts, got: %d", RPCMaxRetryAttempts, attempts) + } + + // Test case 3: Operation succeeds on the third attempt + attempts = 0 + operation = func() error { + attempts++ + if attempts < 3 { + return errors.New("temporary failure") + } + return nil + } + err = Retry(ctx, logger, operation, nil) + if err != nil { + t.Errorf("Expected no error, got: %v", err) + } + if attempts != 3 { + t.Errorf("Expected 3 attempts, got: %d", attempts) + } + + // Test case 4: Context canceled + ctx, cancel := context.WithCancel(ctx) + cancel() + attempts = 0 + operation = func() error { + attempts++ + return errors.New("operation failed") + } + err = Retry(ctx, logger, operation, nil) + if err != ctx.Err() { + t.Errorf("Expected error: %v, got: %v", ctx.Err(), err) + } + if attempts != 0 { + t.Errorf("Expected 0 attempts, got: %d", attempts) + } + + // Test case 5: Operation fails on the first attempt + ctx = context.Background() + attempts = 0 + expectedErr = errors.New("operation failed on the first attempt") + operation = func() error { + attempts++ + return expectedErr + } + err = Retry(ctx, logger, operation, nil) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) + } + + // Test case 6: Operation fails on all attempts + ctx = context.Background() + attempts = 0 + expectedErr = errors.New("operation failed on all attempts") + operation = func() error { + attempts++ + return expectedErr + } + err = Retry(ctx, logger, operation, nil) + if err != expectedErr { + t.Errorf("Expected error: %v, got: %v", expectedErr, err) + } + if attempts != RPCMaxRetryAttempts { + t.Errorf("Expected %d attempts, got: %d", RPCMaxRetryAttempts, attempts) + } +}