Skip to content

Commit

Permalink
Merge branch 'main' into feature/sui-integration
Browse files Browse the repository at this point in the history
  • Loading branch information
debendraoli authored Aug 12, 2024
2 parents f68fef2 + 133c656 commit 23cbb91
Show file tree
Hide file tree
Showing 9 changed files with 309 additions and 44 deletions.
18 changes: 18 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,24 @@

All notable changes to this project will be documented in this file.

## [1.5.1] - 2024-08-12

### Changed

- RPC retries and exponential backoffs.
- Websocket healthcheck timout from 1 min to 10 second.
- Initiate startup tasks early.
- EVM websocket healthcheck uses latest query, previosly genesis block query.

### Fixed

- Addressed wasm response format changes.
- WASM chain healthcheck frequency.
- WASM sdk context lockup issue.
- WASM duplicated block when batching requests.
- Recovery start from genesis block when databse is empty.
- Docker build issues.

## [1.5.0-rc1] - 2024-08-03

### Added
Expand Down
3 changes: 2 additions & 1 deletion relayer/chain_runtime_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func TestChainRuntime(t *testing.T) {
t.Run("clear messages", func(t *testing.T) {
runtime.clearMessageFromCache([]*types.MessageKey{m1.MessageKey()})
assert.Equal(t, len(runtime.MessageCache.Messages), len(info.Messages)-1)
assert.Equal(t, runtime.MessageCache.Messages[*m2.MessageKey()], types.NewRouteMessage(m2))
rtMsg, _ := runtime.MessageCache.Get(m2.MessageKey())
assert.Equal(t, rtMsg, types.NewRouteMessage(m2))
})
}
1 change: 0 additions & 1 deletion relayer/chains/icon/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ func (p *Provider) SetFee(ctx context.Context, networkID string, msgFee, resFee
}
txr, err := p.client.WaitForResults(ctx, &types.TransactionHashParam{Hash: types.NewHexBytes(txHash)})
if err != nil {
fmt.Println("SetFee: WaitForResults: %v", err)
return fmt.Errorf("SetFee: WaitForResults: %v", err)
}
if txr.Status != types.NewHexInt(1) {
Expand Down
134 changes: 102 additions & 32 deletions relayer/chains/wasm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/icon-project/centralized-relay/relayer/kms"
"github.com/icon-project/centralized-relay/relayer/provider"
relayTypes "github.com/icon-project/centralized-relay/relayer/types"
"github.com/icon-project/centralized-relay/utils/retry"
jsoniter "github.com/json-iterator/go"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -152,7 +153,7 @@ func (p *Provider) Listener(ctx context.Context, lastProcessedTx relayTypes.Last
latestHeight, err = p.QueryLatestHeight(ctx)
if err != nil {
p.logger.Error("failed to get latest block height", zap.Error(err))
pollHeightTicker.Reset(time.Second * 2)
pollHeightTicker.Reset(time.Second * 3)
}
}
}
Expand Down Expand Up @@ -560,6 +561,8 @@ func (p *Provider) getBlockInfoStream(ctx context.Context, done <-chan bool, hei
return blockInfoStream
}

// fetchBlockMessages fetches block messages from the chain
// TODO: optimize this function
func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.HeightRange) ([]*relayTypes.BlockInfo, error) {
perPage := 25
searchParam := types.TxSearchParam{
Expand All @@ -571,42 +574,111 @@ func (p *Provider) fetchBlockMessages(ctx context.Context, heightInfo *types.Hei
var (
wg sync.WaitGroup
messages coreTypes.ResultTxSearch
messagesChan = make(chan *coreTypes.ResultTxSearch)
errorChan = make(chan error)
messagesChan = make(chan *coreTypes.ResultTxSearch, len(p.eventList))
errorChan = make(chan error, len(p.eventList))
)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

for _, event := range p.eventList {
wg.Add(1)
go func(wg *sync.WaitGroup, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) {
go func(wg *sync.WaitGroup, event sdkTypes.Event, searchParam types.TxSearchParam, messagesChan chan *coreTypes.ResultTxSearch, errorChan chan error) {
defer wg.Done()
searchParam.Events = append(searchParam.Events, event)
res, err := p.client.TxSearch(ctx, searchParam)

var (
localSearchParam = searchParam
localMessages = new(coreTypes.ResultTxSearch)
zapFields = []zap.Field{
zap.Uint64("start_height", localSearchParam.StartHeight),
zap.Uint64("end_height", localSearchParam.EndHeight),
zap.String("event", event.Type),
}
)
localSearchParam.Events = append(localSearchParam.Events, event)

err := retry.Retry(ctx, p.logger, func() error {
p.logger.Info("fetching block messages", zapFields...)
msgs, err := p.client.TxSearch(ctx, localSearchParam)
if err == nil {
p.logger.Info("fetched block messages", zapFields...)
localMessages = msgs
}
return err
}, zapFields)
if err != nil {
errorChan <- err
select {
case errorChan <- err:
default:
}
cancel()
return
}
if res.TotalCount > perPage {
for i := 2; i <= int(res.TotalCount/perPage)+1; i++ {
searchParam.Page = &i
resNext, err := p.client.TxSearch(ctx, searchParam)

if localMessages.TotalCount > perPage {
totalPages := (localMessages.TotalCount + perPage - 1) / perPage
for i := 2; i <= totalPages; i++ {
select {
case <-ctx.Done():
return
default:
}
p.logger.Info("fetching block messages", append(zapFields, zap.Int("page", i))...)
localSearchParam.Page = &i
err := retry.Retry(ctx, p.logger, func() error {
resNext, err := p.client.TxSearch(ctx, localSearchParam)
if err == nil {
localMessages.Txs = append(localMessages.Txs, resNext.Txs...)
}
return err
}, append(zapFields, zap.Int("page", i)))
if err != nil {
errorChan <- err
select {
case errorChan <- err:
default:
}
cancel()
return
}
res.Txs = append(res.Txs, resNext.Txs...)
}
}
messagesChan <- res
}(&wg, searchParam, messagesChan, errorChan)
messagesChan <- localMessages
}(&wg, event, searchParam, messagesChan, errorChan)
}

go func() {
wg.Wait()
close(messagesChan)
close(errorChan)
}()

var errors []error
for {
select {
case msgs := <-messagesChan:
messages.Txs = append(messages.Txs, msgs.Txs...)
messages.TotalCount += msgs.TotalCount
case err := <-errorChan:
p.logger.Error("failed to fetch block messages", zap.Error(err))
case msgs, ok := <-messagesChan:
if !ok {
messagesChan = nil
} else {
messages.Txs = append(messages.Txs, msgs.Txs...)
messages.TotalCount += msgs.TotalCount
}
case err, ok := <-errorChan:
if !ok {
errorChan = nil
} else {
errors = append(errors, err)
}
}
if messagesChan == nil && errorChan == nil {
break
}
}
wg.Wait()

if len(errors) > 0 {
p.logger.Error("Errors occurred while fetching block messages", zap.Errors("errors", errors))
return nil, fmt.Errorf("errors occurred while fetching block messages: %v", errors)
}

return p.getMessagesFromTxList(messages.Txs)
}

Expand Down Expand Up @@ -676,19 +748,17 @@ func (p *Provider) runBlockQuery(ctx context.Context, blockInfoChan chan *relayT
heightStream := p.getHeightStream(done, fromHeight, toHeight)

for heightRange := range heightStream {
blockInfo, err := p.fetchBlockMessages(ctx, heightRange)
err := retry.Retry(ctx, p.logger, func() error {
blockInfo, err := p.fetchBlockMessages(ctx, heightRange)
if err == nil {
for _, block := range blockInfo {
blockInfoChan <- block
}
}
return err
}, []zap.Field{zap.Uint64("from_height", heightRange.Start), zap.Uint64("to_height", heightRange.End)})
if err != nil {
p.logger.Error("failed to fetch block messages", zap.Error(err))
continue
}
p.logger.Info("Fetched block messages", zap.Uint64("from", heightRange.Start), zap.Uint64("to", heightRange.End))
var messages []*relayTypes.Message
for _, block := range blockInfo {
messages = append(messages, block.Messages...)
}
blockInfoChan <- &relayTypes.BlockInfo{
Height: heightRange.End,
Messages: messages,
}
}
return toHeight + 1
Expand Down
7 changes: 7 additions & 0 deletions relayer/chains/wasm/types/const.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package types

import (
"time"

"github.com/cosmos/cosmos-sdk/crypto/hd"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
"github.com/cosmos/relayer/v2/relayer/codecs/injective"
Expand All @@ -15,4 +17,9 @@ const (
var (
SupportedAlgorithms = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1}
SupportedAlgorithmsLedger = keyring.SigningAlgoList{hd.Secp256k1, injective.EthSecp256k1}

// Default parameters for RPC
RPCMaxRetryAttempts uint8 = 5
BaseRPCRetryDelay = 3 * time.Second
MaxRPCRetryDelay = 5 * time.Minute
)
10 changes: 5 additions & 5 deletions relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ var (
// main start loop
func (r *Relayer) Start(ctx context.Context, flushInterval time.Duration, fresh bool) (chan error, error) {
errorChan := make(chan error, 1)

// once flush completes then only start processing
if fresh {
// flush all the packet and then continue
Expand Down Expand Up @@ -235,11 +234,11 @@ func (r *Relayer) getActiveMessagesFromStore(nId string, maxMessages uint) ([]*t

func (r *Relayer) processMessages(ctx context.Context) {
for _, src := range r.chains {
for key, message := range src.MessageCache.Messages {
for _, message := range src.MessageCache.Messages {
dst, err := r.FindChainRuntime(message.Dst)
if err != nil {
r.log.Error("dst chain nid not found", zap.String("nid", message.Dst))
r.ClearMessages(ctx, []*types.MessageKey{&key}, src)
r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src)
continue
}

Expand All @@ -251,7 +250,8 @@ func (r *Relayer) processMessages(ctx context.Context) {
message.ToggleProcessing()

// if message reached delete the message
messageReceived, err := dst.Provider.MessageReceived(ctx, &key)

messageReceived, err := dst.Provider.MessageReceived(ctx, message.MessageKey())
if err != nil {
dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()), zap.Error(err))
message.ToggleProcessing()
Expand All @@ -261,7 +261,7 @@ func (r *Relayer) processMessages(ctx context.Context) {
// if message is received we can remove the message from db
if messageReceived {
dst.log.Info("message already received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()))
r.ClearMessages(ctx, []*types.MessageKey{&key}, src)
r.ClearMessages(ctx, []*types.MessageKey{message.MessageKey()}, src)
continue
}
go r.RouteMessage(ctx, message, dst, src)
Expand Down
24 changes: 19 additions & 5 deletions relayer/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,24 @@ func NewMessagekeyWithMessageHeight(key *MessageKey, height uint64) *MessageKeyW
}

type MessageCache struct {
Messages map[MessageKey]*RouteMessage
Messages map[string]*RouteMessage
*sync.RWMutex
}

func NewMessageCache() *MessageCache {
return &MessageCache{
Messages: make(map[MessageKey]*RouteMessage),
Messages: make(map[string]*RouteMessage),
RWMutex: new(sync.RWMutex),
}
}

func (m *MessageCache) Add(r *RouteMessage) {
m.Lock()
defer m.Unlock()
m.Messages[*r.MessageKey()] = r
cacheKey := m.GetCacheKey(r.MessageKey())
if !m.HasCacheKey(cacheKey) {
m.Messages[cacheKey] = r
}
}

func (m *MessageCache) Len() int {
Expand All @@ -181,17 +184,28 @@ func (m *MessageCache) Len() int {
func (m *MessageCache) Remove(key *MessageKey) {
m.Lock()
defer m.Unlock()
delete(m.Messages, *key)
cacheKey := m.GetCacheKey(key)
delete(m.Messages, cacheKey)
}

// Get returns the message from the cache
func (m *MessageCache) Get(key *MessageKey) (*RouteMessage, bool) {
m.RLock()
defer m.RUnlock()
msg, ok := m.Messages[*key]
cacheKey := m.GetCacheKey(key)
msg, ok := m.Messages[cacheKey]
return msg, ok
}

func (m *MessageCache) GetCacheKey(key *MessageKey) string {
return key.Src + "-" + key.Dst + "-" + key.EventType + "-" + key.Sn.String()
}

func (m *MessageCache) HasCacheKey(cacheKey string) bool {
_, ok := m.Messages[cacheKey]
return ok
}

type Coin struct {
Denom string
Amount uint64
Expand Down
51 changes: 51 additions & 0 deletions utils/retry/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package retry

import (
"context"
"math"
"time"

"go.uber.org/zap"
)

const (
BaseRPCRetryDelay = time.Second
MaxRPCRetryDelay = 30 * time.Second
RPCMaxRetryAttempts = 5
RetryPower = 3
)

func Retry(ctx context.Context, logger *zap.Logger, operation func() error, zapFields []zap.Field) error {
var retryCount uint8
for retryCount < RPCMaxRetryAttempts {
select {
case <-ctx.Done():
return ctx.Err()
default:
}

err := operation()
if err == nil {
return nil
}

retryCount++
if retryCount >= RPCMaxRetryAttempts {
logger.Error("operation failed", append(zapFields, zap.Uint8("attempt", retryCount), zap.Error(err))...)
return err
}

delay := time.Duration(math.Pow(RetryPower, float64(retryCount))) * BaseRPCRetryDelay
if delay > MaxRPCRetryDelay {
delay = MaxRPCRetryDelay
}
logger.Warn("operation failed, retrying...", append(zapFields, zap.Uint8("attempt", retryCount), zap.Duration("retrying_in", delay), zap.Error(err))...)

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(delay):
}
}
return nil
}
Loading

0 comments on commit 23cbb91

Please sign in to comment.