From 569e03087d5a6836a08df5db2ea92e8e31372d64 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 10 Jul 2024 17:12:34 +0545 Subject: [PATCH 01/18] fix: add BlockInfo type --- relayer/chains/wasm/types/types.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index f6085a42c..aa3969e59 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -4,6 +4,9 @@ import ( "encoding/hex" "encoding/json" "fmt" + + ctypes "github.com/cometbft/cometbft/rpc/core/types" + "github.com/cometbft/cometbft/types" ) type HexBytes string @@ -369,3 +372,8 @@ func NewPrevConsensusStateHeight(clientId string, height uint64) *GetPrevConsens }, } } + +type BlockInfo struct { + types.Block + Txs []*ctypes.ResultTx +} From 3c01832e1e2daeb61ecaf58fd914e0f471cf32dd Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 10 Jul 2024 17:17:00 +0545 Subject: [PATCH 02/18] fix: remove wasm verifier --- relayer/chains/wasm/wasm_chain_processor.go | 114 -------------------- 1 file changed, 114 deletions(-) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 0eb2712d9..79b10de17 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -1,7 +1,6 @@ package wasm import ( - "bytes" "context" "errors" "fmt" @@ -18,7 +17,6 @@ import ( "github.com/cosmos/relayer/v2/relayer/provider" ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cometbft/cometbft/types" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -57,15 +55,9 @@ type WasmChainProcessor struct { // parsed gas prices accepted by the chain (only used for metrics) parsedGasPrices *sdk.DecCoins - verifier *Verifier - heightSnapshotChan chan struct{} } -type Verifier struct { - Header *types.LightBlock -} - func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor { return &WasmChainProcessor{ log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())), @@ -256,19 +248,6 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock)) - _, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock) - if err != nil { - ccp.log.Error("Failed to get ibcHeader", - zap.Int64("height", persistence.latestQueriedBlock), - zap.Any("error", err), - ) - return err - } - - ccp.verifier = &Verifier{ - Header: lightBlock, - } - var eg errgroup.Group eg.Go(func() error { return ccp.initializeConnectionState(ctx) @@ -415,7 +394,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ { var eg errgroup.Group var blockRes *ctypes.ResultBlockResults - var lightBlock *types.LightBlock var h provider.IBCHeader i := i eg.Go(func() (err error) { @@ -424,12 +402,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i) return err }) - eg.Go(func() (err error) { - queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout) - defer cancelQueryCtx() - h, lightBlock, err = ccp.chainProvider.QueryLightBlock(queryCtx, i) - return err - }) if err := eg.Wait(); err != nil { ccp.log.Warn("Error querying block data", zap.Error(err)) @@ -445,11 +417,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer latestHeader = h - if err := ccp.Verify(ctx, lightBlock); err != nil { - ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err)) - return err - } - heightUint64 := uint64(i) ccp.latestBlock = provider.LatestBlock{ @@ -550,87 +517,6 @@ func (ccp *WasmChainProcessor) CurrentBlockHeight(ctx context.Context, persisten ccp.metrics.SetLatestHeight(ccp.chainProvider.ChainId(), persistence.latestHeight) } -func (ccp *WasmChainProcessor) Verify(ctx context.Context, untrusted *types.LightBlock) error { - - if untrusted.Height != ccp.verifier.Header.Height+1 { - return errors.New("headers must be adjacent in height") - } - - if err := verifyNewHeaderAndVals(untrusted.SignedHeader, - untrusted.ValidatorSet, - ccp.verifier.Header.SignedHeader, - time.Now(), 0); err != nil { - return fmt.Errorf("failed to verify Header: %v", err) - } - - if !bytes.Equal(untrusted.Header.ValidatorsHash, ccp.verifier.Header.NextValidatorsHash) { - err := fmt.Errorf("expected old header next validators (%X) to match those from new header (%X)", - ccp.verifier.Header.NextValidatorsHash, - untrusted.Header.ValidatorsHash, - ) - return err - } - - if !bytes.Equal(untrusted.Header.LastBlockID.Hash.Bytes(), ccp.verifier.Header.Commit.BlockID.Hash.Bytes()) { - err := fmt.Errorf("expected LastBlockId Hash (%X) of current header to match those from trusted Header BlockID hash (%X)", - ccp.verifier.Header.NextValidatorsHash, - untrusted.Header.ValidatorsHash, - ) - return err - } - - // Ensure that +2/3 of new validators signed correctly. - if err := untrusted.ValidatorSet.VerifyCommitLight(ccp.verifier.Header.ChainID, untrusted.Commit.BlockID, - untrusted.Header.Height, untrusted.Commit); err != nil { - return fmt.Errorf("invalid header: %v", err) - } - - ccp.verifier.Header = untrusted - return nil - -} - -func verifyNewHeaderAndVals( - untrustedHeader *types.SignedHeader, - untrustedVals *types.ValidatorSet, - trustedHeader *types.SignedHeader, - now time.Time, - maxClockDrift time.Duration) error { - - if err := untrustedHeader.ValidateBasic(trustedHeader.ChainID); err != nil { - return fmt.Errorf("untrustedHeader.ValidateBasic failed: %w", err) - } - - if untrustedHeader.Height <= trustedHeader.Height { - return fmt.Errorf("expected new header height %d to be greater than one of old header %d", - untrustedHeader.Height, - trustedHeader.Height) - } - - if !untrustedHeader.Time.After(trustedHeader.Time) { - return fmt.Errorf("expected new header time %v to be after old header time %v", - untrustedHeader.Time, - trustedHeader.Time) - } - - if !untrustedHeader.Time.Before(now.Add(maxClockDrift)) { - return fmt.Errorf("new header has a time from the future %v (now: %v; max clock drift: %v)", - untrustedHeader.Time, - now, - maxClockDrift) - } - - if !bytes.Equal(untrustedHeader.ValidatorsHash, untrustedVals.Hash()) { - return fmt.Errorf("expected new header validators (%X) to match those that were supplied (%X) at height %d", - untrustedHeader.ValidatorsHash, - untrustedVals.Hash(), - untrustedHeader.Height, - ) - } - - return nil -} - // func (ccp *WasmChainProcessor) CurrentRelayerBalance(ctx context.Context) { // // memoize the current gas prices to only show metrics for "interesting" denoms // if ccp.parsedGasPrices == nil { From 14d625798f7294f241d385620300b188234bf450 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 10 Jul 2024 21:21:00 +0545 Subject: [PATCH 03/18] feat: use batch query for block transactions --- relayer/chains/wasm/provider.go | 8 +++ relayer/chains/wasm/query.go | 80 +++++++++++++++++++++ relayer/chains/wasm/types/types.go | 8 --- relayer/chains/wasm/wasm_chain_processor.go | 57 +++++---------- 4 files changed, 105 insertions(+), 48 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 0fae2a4bd..bceec2628 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -11,6 +11,7 @@ import ( "github.com/CosmWasm/wasmd/app" provtypes "github.com/cometbft/cometbft/light/provider" + "github.com/cometbft/cometbft/rpc/client/http" comettypes "github.com/cometbft/cometbft/types" "golang.org/x/mod/semver" @@ -278,6 +279,7 @@ type WasmProvider struct { Keybase keyring.Keyring KeyringOptions []keyring.Option RPCClient rpcclient.Client + CometRPCClient *http.HTTP QueryClient wasmtypes.QueryClient LightProvider provtypes.Provider Cdc Codec @@ -351,6 +353,12 @@ func (ap *WasmProvider) Init(ctx context.Context) error { } ap.RPCClient = rpcClient + cometRPCClient, err := http.New(ap.PCfg.RPCAddr, "/websocket") + if err != nil { + return err + } + ap.CometRPCClient = cometRPCClient + lightprovider, err := prov.New(ap.PCfg.ChainID, ap.PCfg.RPCAddr) if err != nil { return err diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 546ab61a6..3c10a4448 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -910,3 +910,83 @@ func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, } return clienttypes.Height{RevisionNumber: 0, RevisionHeight: uint64(heights[0])}, nil } + +type BlockInfo struct { + IBCHeader provider.IBCHeader + Messages []ibcMessage +} + +type txSearchParam struct { + page int + perPage int + orderBy string + query string + prove bool +} + +func (ap *WasmProvider) GetBlockInfoList( + ctx context.Context, + fromHeight, toHeight uint64, +) ([]BlockInfo, error) { + ibcHandlerAddr := ap.PCfg.IbcHandlerAddress + + txsParam := txSearchParam{ + query: fmt.Sprintf("block.height >= %d AND block.height <= %d AND wasmMessage._contract_address = %s", fromHeight, toHeight, ibcHandlerAddr), + page: 1, + perPage: 100, + orderBy: "asc", + } + txsResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + if err != nil { + return nil, err + } + + txs := txsResult.Txs + + if txsResult.TotalCount > txsParam.perPage { + totalPages := txsResult.TotalCount / txsParam.page + if txsResult.TotalCount%txsParam.page != 0 { + totalPages += 1 + } + for i := 2; i <= totalPages; i++ { + txsParam.page = i + nextResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + if err != nil { + return nil, err + } + txs = append(txs, nextResult.Txs...) + } + } + + blockMessages := map[uint64][]ibcMessage{} + for _, txResult := range txs { + messages := ibcMessagesFromEvents( + ap.log, + txResult.TxResult.Events, + ap.ChainId(), + uint64(txResult.Height), + ibcHandlerAddr, + ap.cometLegacyEncoding, + ) + blockMessages[uint64(txResult.Height)] = append(blockMessages[uint64(txResult.Height)], messages...) + } + + blockInfoList := []BlockInfo{} + for h := fromHeight; h <= toHeight; h++ { + ibcHeader, err := ap.QueryIBCHeader(ctx, int64(h)) + if err != nil { + return nil, err + } + + bInfo := BlockInfo{ + IBCHeader: ibcHeader, + } + + if messages, ok := blockMessages[ibcHeader.Height()]; ok { + bInfo.Messages = messages + } + blockInfoList = append(blockInfoList, bInfo) + } + + return blockInfoList, nil +} diff --git a/relayer/chains/wasm/types/types.go b/relayer/chains/wasm/types/types.go index aa3969e59..f6085a42c 100644 --- a/relayer/chains/wasm/types/types.go +++ b/relayer/chains/wasm/types/types.go @@ -4,9 +4,6 @@ import ( "encoding/hex" "encoding/json" "fmt" - - ctypes "github.com/cometbft/cometbft/rpc/core/types" - "github.com/cometbft/cometbft/types" ) type HexBytes string @@ -372,8 +369,3 @@ func NewPrevConsensusStateHeight(clientId string, height uint64) *GetPrevConsens }, } } - -type BlockInfo struct { - types.Block - Txs []*ctypes.ResultTx -} diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 79b10de17..cde363b8e 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -384,64 +384,41 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer chainID := ccp.chainProvider.ChainId() var latestHeader provider.IBCHeader - syncUpHeight := func() int64 { + fromHeight := persistence.latestQueriedBlock + 1 + toHeight := func() int64 { if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { return persistence.latestQueriedBlock + MaxBlockFetch } return persistence.latestHeight - } - - for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ { - var eg errgroup.Group - var blockRes *ctypes.ResultBlockResults - var h provider.IBCHeader - i := i - eg.Go(func() (err error) { - queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout) - defer cancelQueryCtx() - blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i) - return err - }) + }() - if err := eg.Wait(); err != nil { - ccp.log.Warn("Error querying block data", zap.Error(err)) - break - } + blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) + if err != nil { + return err + } + for _, blockInfo := range blockInfos { ccp.log.Debug( "Queried block", - zap.Int64("height", i), + zap.Uint64("height", blockInfo.IBCHeader.Height()), zap.Int64("latest", persistence.latestHeight), - zap.Int64("delta", persistence.latestHeight-i), + zap.Int64("delta", persistence.latestHeight-int64(blockInfo.IBCHeader.Height())), ) - latestHeader = h - - heightUint64 := uint64(i) + ppChanged = true ccp.latestBlock = provider.LatestBlock{ - Height: heightUint64, + Height: blockInfo.IBCHeader.Height(), } - ibcHeaderCache[heightUint64] = latestHeader - ppChanged = true - - base64Encoded := ccp.chainProvider.cometLegacyEncoding + ibcHeaderCache[blockInfo.IBCHeader.Height()] = blockInfo.IBCHeader - for _, tx := range blockRes.TxsResults { - if tx.Code != 0 { - // tx was not successful - continue - } - messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded) - - for _, m := range messages { - ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64)) - ccp.handleMessage(ctx, m, ibcMessagesCache) - } + for _, m := range blockInfo.Messages { + ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", blockInfo.IBCHeader.Height())) + ccp.handleMessage(ctx, m, ibcMessagesCache) } - newLatestQueriedBlock = i + newLatestQueriedBlock = int64(blockInfo.IBCHeader.Height()) } if newLatestQueriedBlock == persistence.latestQueriedBlock { From 5c130a55c2891570b8b682ccd4848b3f6aabe563 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Thu, 11 Jul 2024 11:21:08 +0545 Subject: [PATCH 04/18] fix: ensure validator set in sent before updating wasm client --- relayer/chains/wasm/provider.go | 13 ++++++++----- relayer/chains/wasm/tx.go | 16 ++++++++++++++++ 2 files changed, 24 insertions(+), 5 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index bceec2628..d53234e7f 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -86,9 +86,9 @@ func NewWasmIBCHeader(header *itm.SignedHeader, validators *itm.ValidatorSet) Wa } } -func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHeader { +func itmValidatorSetFromLightBlock(lb *comettypes.LightBlock) *itm.ValidatorSet { vSets := make([]*itm.Validator, 0) - for _, v := range lightBlock.ValidatorSet.Validators { + for _, v := range lb.ValidatorSet.Validators { _v := &itm.Validator{ Address: v.Address, PubKey: &itm.PublicKey{ @@ -100,7 +100,12 @@ func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHe vSets = append(vSets, _v) } + return &itm.ValidatorSet{ + Validators: vSets, + } +} +func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHeader { signatures := make([]*itm.CommitSig, 0) for _, d := range lightBlock.Commit.Signatures { @@ -160,9 +165,7 @@ func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHe Signatures: signatures, }, }, - ValidatorSet: &itm.ValidatorSet{ - Validators: vSets, - }, + ValidatorSet: itmValidatorSetFromLightBlock(lightBlock), } } diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index 797f2401c..c4bbe0fb5 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -600,6 +600,22 @@ func (ap *WasmProvider) MsgUpdateClientHeader(latestHeader provider.IBCHeader, t return nil, fmt.Errorf("unsupported IBC header type, expected: TendermintIBCHeader, actual: %T", latestHeader) } + if latestWasmHeader.ValidatorSet == nil { + lb, err := ap.LightProvider.LightBlock(context.TODO(), latestWasmHeader.SignedHeader.Header.Height) + if err != nil { + return nil, err + } + latestWasmHeader.ValidatorSet = itmValidatorSetFromLightBlock(lb) + } + + if trustedWasmHeader.ValidatorSet == nil { + lb, err := ap.LightProvider.LightBlock(context.TODO(), trustedWasmHeader.SignedHeader.Header.Height) + if err != nil { + return nil, err + } + trustedWasmHeader.ValidatorSet = itmValidatorSetFromLightBlock(lb) + } + return &itm.TmHeader{ SignedHeader: latestWasmHeader.SignedHeader, ValidatorSet: latestWasmHeader.ValidatorSet, From a34ae2de044ecab5fd1a22fb4f4364928ad9d547 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Thu, 11 Jul 2024 14:16:10 +0545 Subject: [PATCH 05/18] fix: get ibc header for necessary height only in query cycle --- relayer/chains/wasm/provider.go | 13 ++++------ relayer/chains/wasm/query.go | 27 +++++++++------------ relayer/chains/wasm/tx.go | 16 ------------ relayer/chains/wasm/wasm_chain_processor.go | 25 ++++++++++++++----- 4 files changed, 35 insertions(+), 46 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index d53234e7f..bceec2628 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -86,9 +86,9 @@ func NewWasmIBCHeader(header *itm.SignedHeader, validators *itm.ValidatorSet) Wa } } -func itmValidatorSetFromLightBlock(lb *comettypes.LightBlock) *itm.ValidatorSet { +func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHeader { vSets := make([]*itm.Validator, 0) - for _, v := range lb.ValidatorSet.Validators { + for _, v := range lightBlock.ValidatorSet.Validators { _v := &itm.Validator{ Address: v.Address, PubKey: &itm.PublicKey{ @@ -100,12 +100,7 @@ func itmValidatorSetFromLightBlock(lb *comettypes.LightBlock) *itm.ValidatorSet vSets = append(vSets, _v) } - return &itm.ValidatorSet{ - Validators: vSets, - } -} -func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHeader { signatures := make([]*itm.CommitSig, 0) for _, d := range lightBlock.Commit.Signatures { @@ -165,7 +160,9 @@ func NewWasmIBCHeaderFromLightBlock(lightBlock *comettypes.LightBlock) WasmIBCHe Signatures: signatures, }, }, - ValidatorSet: itmValidatorSetFromLightBlock(lightBlock), + ValidatorSet: &itm.ValidatorSet{ + Validators: vSets, + }, } } diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 3c10a4448..20cf0a421 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -912,8 +912,8 @@ func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context, } type BlockInfo struct { - IBCHeader provider.IBCHeader - Messages []ibcMessage + Height uint64 + Messages []ibcMessage } type txSearchParam struct { @@ -931,7 +931,7 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr := ap.PCfg.IbcHandlerAddress txsParam := txSearchParam{ - query: fmt.Sprintf("block.height >= %d AND block.height <= %d AND wasmMessage._contract_address = %s", fromHeight, toHeight, ibcHandlerAddr), + query: fmt.Sprintf("block.height >= %d AND block.height <= %d AND wasm._contract_address = %s", fromHeight, toHeight, ibcHandlerAddr), page: 1, perPage: 100, orderBy: "asc", @@ -968,24 +968,19 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr, ap.cometLegacyEncoding, ) - blockMessages[uint64(txResult.Height)] = append(blockMessages[uint64(txResult.Height)], messages...) + if len(messages) > 0 { + blockMessages[uint64(txResult.Height)] = append(blockMessages[uint64(txResult.Height)], messages...) + } } blockInfoList := []BlockInfo{} for h := fromHeight; h <= toHeight; h++ { - ibcHeader, err := ap.QueryIBCHeader(ctx, int64(h)) - if err != nil { - return nil, err - } - - bInfo := BlockInfo{ - IBCHeader: ibcHeader, - } - - if messages, ok := blockMessages[ibcHeader.Height()]; ok { - bInfo.Messages = messages + if messages, ok := blockMessages[h]; ok { + blockInfoList = append(blockInfoList, BlockInfo{ + Height: h, + Messages: messages, + }) } - blockInfoList = append(blockInfoList, bInfo) } return blockInfoList, nil diff --git a/relayer/chains/wasm/tx.go b/relayer/chains/wasm/tx.go index c4bbe0fb5..797f2401c 100644 --- a/relayer/chains/wasm/tx.go +++ b/relayer/chains/wasm/tx.go @@ -600,22 +600,6 @@ func (ap *WasmProvider) MsgUpdateClientHeader(latestHeader provider.IBCHeader, t return nil, fmt.Errorf("unsupported IBC header type, expected: TendermintIBCHeader, actual: %T", latestHeader) } - if latestWasmHeader.ValidatorSet == nil { - lb, err := ap.LightProvider.LightBlock(context.TODO(), latestWasmHeader.SignedHeader.Header.Height) - if err != nil { - return nil, err - } - latestWasmHeader.ValidatorSet = itmValidatorSetFromLightBlock(lb) - } - - if trustedWasmHeader.ValidatorSet == nil { - lb, err := ap.LightProvider.LightBlock(context.TODO(), trustedWasmHeader.SignedHeader.Header.Height) - if err != nil { - return nil, err - } - trustedWasmHeader.ValidatorSet = itmValidatorSetFromLightBlock(lb) - } - return &itm.TmHeader{ SignedHeader: latestWasmHeader.SignedHeader, ValidatorSet: latestWasmHeader.ValidatorSet, diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index cde363b8e..3eaeb38c8 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -400,25 +400,30 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer for _, blockInfo := range blockInfos { ccp.log.Debug( "Queried block", - zap.Uint64("height", blockInfo.IBCHeader.Height()), + zap.Uint64("height", blockInfo.Height), zap.Int64("latest", persistence.latestHeight), - zap.Int64("delta", persistence.latestHeight-int64(blockInfo.IBCHeader.Height())), + zap.Int64("delta", persistence.latestHeight-int64(blockInfo.Height)), ) ppChanged = true ccp.latestBlock = provider.LatestBlock{ - Height: blockInfo.IBCHeader.Height(), + Height: blockInfo.Height, } - ibcHeaderCache[blockInfo.IBCHeader.Height()] = blockInfo.IBCHeader + ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) + if err != nil { + ccp.log.Error("failed to query ibc header", zap.Error(err), zap.Uint64("height", blockInfo.Height)) + } else { + ibcHeaderCache[blockInfo.Height] = ibcHeader + } for _, m := range blockInfo.Messages { - ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", blockInfo.IBCHeader.Height())) + ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", blockInfo.Height)) ccp.handleMessage(ctx, m, ibcMessagesCache) } - newLatestQueriedBlock = int64(blockInfo.IBCHeader.Height()) + newLatestQueriedBlock = int64(blockInfo.Height) } if newLatestQueriedBlock == persistence.latestQueriedBlock { @@ -446,6 +451,14 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer continue } + clientStateHeight := clientState.ConsensusHeight.RevisionHeight + ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(clientStateHeight)) + if err != nil { + ccp.log.Error("failed to query ibc header", zap.Error(err), zap.Uint64("height", clientStateHeight)) + } else { + ibcHeaderCache[clientStateHeight] = ibcHeader + } + pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ LatestBlock: ccp.latestBlock, LatestHeader: latestHeader, From 880403ab2e16294469a4c455e9e2f000b7d91fdd Mon Sep 17 00:00:00 2001 From: sherpalden Date: Thu, 11 Jul 2024 17:04:05 +0545 Subject: [PATCH 06/18] chore: test --- relayer/chains/wasm/query.go | 15 ++++++++++++++- relayer/chains/wasm/wasm_chain_processor.go | 17 +++++++++++++---- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 20cf0a421..332b34f44 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -931,7 +931,7 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr := ap.PCfg.IbcHandlerAddress txsParam := txSearchParam{ - query: fmt.Sprintf("block.height >= %d AND block.height <= %d AND wasm._contract_address = %s", fromHeight, toHeight, ibcHandlerAddr), + query: fmt.Sprintf("block.height >= %d AND block.height <= %d", fromHeight, toHeight), page: 1, perPage: 100, orderBy: "asc", @@ -943,6 +943,8 @@ func (ap *WasmProvider) GetBlockInfoList( txs := txsResult.Txs + fmt.Printf("\nTx Result: %+v query: %s\n", txs, txsParam.query) + if txsResult.TotalCount > txsParam.perPage { totalPages := txsResult.TotalCount / txsParam.page if txsResult.TotalCount%txsParam.page != 0 { @@ -983,5 +985,16 @@ func (ap *WasmProvider) GetBlockInfoList( } } + if len(blockInfoList) == 0 { + blockInfoList = append(blockInfoList, BlockInfo{ + Height: fromHeight, + Messages: []ibcMessage{}, + }) + blockInfoList = append(blockInfoList, BlockInfo{ + Height: toHeight, + Messages: []ibcMessage{}, + }) + } + return blockInfoList, nil } diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 3eaeb38c8..970e43f3b 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -366,7 +366,10 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer if (persistence.latestHeight - persistence.latestQueriedBlock) < inSyncNumBlocksThreshold { ccp.inSync = true firstTimeInSync = true - ccp.log.Info("Chain is in sync") + ccp.log.Info("Chain is in sync", + zap.Int64("latest_queried_block", persistence.latestQueriedBlock), + zap.Int64("latest_height", persistence.latestHeight), + ) } else { ccp.log.Info("Chain is not yet in sync", zap.Int64("latest_queried_block", persistence.latestQueriedBlock), @@ -392,9 +395,14 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer return persistence.latestHeight }() + if fromHeight > toHeight { + fromHeight = toHeight + } + blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) if err != nil { - return err + ccp.log.Error("failed to query block messages", zap.Error(err)) + return nil } for _, blockInfo := range blockInfos { @@ -413,7 +421,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) if err != nil { - ccp.log.Error("failed to query ibc header", zap.Error(err), zap.Uint64("height", blockInfo.Height)) + ccp.log.Error("failed to query ibc header ", zap.Error(err), zap.Uint64("height", blockInfo.Height)) } else { ibcHeaderCache[blockInfo.Height] = ibcHeader } @@ -424,6 +432,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer } newLatestQueriedBlock = int64(blockInfo.Height) + latestHeader = ibcHeader } if newLatestQueriedBlock == persistence.latestQueriedBlock { @@ -454,7 +463,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer clientStateHeight := clientState.ConsensusHeight.RevisionHeight ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(clientStateHeight)) if err != nil { - ccp.log.Error("failed to query ibc header", zap.Error(err), zap.Uint64("height", clientStateHeight)) + ccp.log.Error("failed to query ibc header", zap.String("client-id", clientID), zap.Error(err), zap.Uint64("height", clientStateHeight)) } else { ibcHeaderCache[clientStateHeight] = ibcHeader } From b3233532ff65be984bf9cf793d643382c64965c2 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Thu, 11 Jul 2024 17:23:22 +0545 Subject: [PATCH 07/18] chore: test --- relayer/chains/wasm/query.go | 12 +++++++----- relayer/chains/wasm/wasm_chain_processor.go | 8 -------- 2 files changed, 7 insertions(+), 13 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 332b34f44..9aef1fead 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -931,7 +931,7 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr := ap.PCfg.IbcHandlerAddress txsParam := txSearchParam{ - query: fmt.Sprintf("block.height >= %d AND block.height <= %d", fromHeight, toHeight), + query: fmt.Sprintf("block.height >= %d", fromHeight), page: 1, perPage: 100, orderBy: "asc", @@ -990,10 +990,12 @@ func (ap *WasmProvider) GetBlockInfoList( Height: fromHeight, Messages: []ibcMessage{}, }) - blockInfoList = append(blockInfoList, BlockInfo{ - Height: toHeight, - Messages: []ibcMessage{}, - }) + if toHeight != fromHeight { + blockInfoList = append(blockInfoList, BlockInfo{ + Height: toHeight, + Messages: []ibcMessage{}, + }) + } } return blockInfoList, nil diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 970e43f3b..be35e33db 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -460,14 +460,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer continue } - clientStateHeight := clientState.ConsensusHeight.RevisionHeight - ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(clientStateHeight)) - if err != nil { - ccp.log.Error("failed to query ibc header", zap.String("client-id", clientID), zap.Error(err), zap.Uint64("height", clientStateHeight)) - } else { - ibcHeaderCache[clientStateHeight] = ibcHeader - } - pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ LatestBlock: ccp.latestBlock, LatestHeader: latestHeader, From c6f8cfb01a875968dcdb8487ea3204fc6f0edfed Mon Sep 17 00:00:00 2001 From: sherpalden Date: Thu, 11 Jul 2024 18:24:54 +0545 Subject: [PATCH 08/18] fix: wasm block sync fast --- relayer/chains/wasm/query.go | 5 ++--- relayer/chains/wasm/wasm_chain_processor.go | 4 ++++ 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 9aef1fead..d0a6d2950 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -931,11 +931,12 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr := ap.PCfg.IbcHandlerAddress txsParam := txSearchParam{ - query: fmt.Sprintf("block.height >= %d", fromHeight), + query: fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND wasm._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr), page: 1, perPage: 100, orderBy: "asc", } + txsResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) if err != nil { return nil, err @@ -943,8 +944,6 @@ func (ap *WasmProvider) GetBlockInfoList( txs := txsResult.Txs - fmt.Printf("\nTx Result: %+v query: %s\n", txs, txsParam.query) - if txsResult.TotalCount > txsParam.perPage { totalPages := txsResult.TotalCount / txsParam.page if txsResult.TotalCount%txsParam.page != 0 { diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index be35e33db..9855bc928 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -399,6 +399,10 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer fromHeight = toHeight } + if fromHeight <= persistence.latestQueriedBlock { + return nil + } + blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) if err != nil { ccp.log.Error("failed to query block messages", zap.Error(err)) From e6dd954e5a8cea583b91647f5badc3a6a0f325e7 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 11:45:35 +0545 Subject: [PATCH 09/18] fix: implement retry strategy --- relayer/chains/wasm/query.go | 30 ++++++++++------ relayer/chains/wasm/wasm_chain_processor.go | 39 ++++++++++++++------- 2 files changed, 47 insertions(+), 22 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index d0a6d2950..c557889a1 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -984,16 +984,26 @@ func (ap *WasmProvider) GetBlockInfoList( } } - if len(blockInfoList) == 0 { - blockInfoList = append(blockInfoList, BlockInfo{ - Height: fromHeight, - Messages: []ibcMessage{}, - }) - if toHeight != fromHeight { - blockInfoList = append(blockInfoList, BlockInfo{ - Height: toHeight, - Messages: []ibcMessage{}, - }) + if _, ok := blockMessages[fromHeight]; !ok { + blockInfoList = append( + []BlockInfo{ + { + Height: fromHeight, + Messages: []ibcMessage{}, + }, + }, blockInfoList..., + ) + } + + if fromHeight != toHeight { + if _, ok := blockMessages[toHeight]; !ok { + blockInfoList = append( + blockInfoList, + BlockInfo{ + Height: fromHeight, + Messages: []ibcMessage{}, + }, + ) } } diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 9855bc928..f8a9d8a36 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -399,13 +399,19 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer fromHeight = toHeight } + // avoid duplicate query if fromHeight <= persistence.latestQueriedBlock { return nil } blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) if err != nil { - ccp.log.Error("failed to query block messages", zap.Error(err)) + ccp.log.Error( + "failed to query block messages", + zap.Uint64("from-height", uint64(fromHeight)), + zap.Uint64("to-height", uint64(toHeight)), + zap.Error(err), + ) return nil } @@ -417,17 +423,21 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer zap.Int64("delta", persistence.latestHeight-int64(blockInfo.Height)), ) - ppChanged = true - - ccp.latestBlock = provider.LatestBlock{ - Height: blockInfo.Height, - } - - ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) - if err != nil { - ccp.log.Error("failed to query ibc header ", zap.Error(err), zap.Uint64("height", blockInfo.Height)) - } else { - ibcHeaderCache[blockInfo.Height] = ibcHeader + var ibcHeader provider.IBCHeader + if err := retry.Do(func() error { + ibcHeader, _, err = ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) + if err != nil { + return err + } + return nil + }, retry.Context(ctx), rtyAtt, retry.Delay(2*time.Second), rtyErr); err != nil { + ccp.log.Error( + "failed to query ibc header ", + zap.Error(err), + zap.Uint64("height", blockInfo.Height), + zap.Uint("total-attempts", rtyAttNum), + ) + return nil //exit and rerun the query cycle from current state } for _, m := range blockInfo.Messages { @@ -435,8 +445,13 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer ccp.handleMessage(ctx, m, ibcMessagesCache) } + ppChanged = true + ccp.latestBlock = provider.LatestBlock{ + Height: blockInfo.Height, + } newLatestQueriedBlock = int64(blockInfo.Height) latestHeader = ibcHeader + ibcHeaderCache[blockInfo.Height] = ibcHeader } if newLatestQueriedBlock == persistence.latestQueriedBlock { From e915b8890ab5d4c49bfc354c5c366078d1629be2 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 11:54:22 +0545 Subject: [PATCH 10/18] fix: remove unwanted ws rpc client --- relayer/chains/wasm/provider.go | 8 -------- relayer/chains/wasm/query.go | 4 ++-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index bceec2628..0fae2a4bd 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -11,7 +11,6 @@ import ( "github.com/CosmWasm/wasmd/app" provtypes "github.com/cometbft/cometbft/light/provider" - "github.com/cometbft/cometbft/rpc/client/http" comettypes "github.com/cometbft/cometbft/types" "golang.org/x/mod/semver" @@ -279,7 +278,6 @@ type WasmProvider struct { Keybase keyring.Keyring KeyringOptions []keyring.Option RPCClient rpcclient.Client - CometRPCClient *http.HTTP QueryClient wasmtypes.QueryClient LightProvider provtypes.Provider Cdc Codec @@ -353,12 +351,6 @@ func (ap *WasmProvider) Init(ctx context.Context) error { } ap.RPCClient = rpcClient - cometRPCClient, err := http.New(ap.PCfg.RPCAddr, "/websocket") - if err != nil { - return err - } - ap.CometRPCClient = cometRPCClient - lightprovider, err := prov.New(ap.PCfg.ChainID, ap.PCfg.RPCAddr) if err != nil { return err diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index c557889a1..119a82d98 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -937,7 +937,7 @@ func (ap *WasmProvider) GetBlockInfoList( orderBy: "asc", } - txsResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + txsResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) if err != nil { return nil, err } @@ -951,7 +951,7 @@ func (ap *WasmProvider) GetBlockInfoList( } for i := 2; i <= totalPages; i++ { txsParam.page = i - nextResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + nextResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) if err != nil { return nil, err } From 5afbebe2882e9a1b08f3d16430f39f4a7d79065b Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 11:59:06 +0545 Subject: [PATCH 11/18] fix: skip to process failed txn --- relayer/chains/wasm/query.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 119a82d98..dafe780bd 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -961,6 +961,9 @@ func (ap *WasmProvider) GetBlockInfoList( blockMessages := map[uint64][]ibcMessage{} for _, txResult := range txs { + if txResult.TxResult.Code != 0 { + continue // skip failed transaction + } messages := ibcMessagesFromEvents( ap.log, txResult.TxResult.Events, From a77a06de92a8e4a44d900e348fe0a32f80be4763 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 13:17:13 +0545 Subject: [PATCH 12/18] fix: unwanted and failing test --- relayer/chains/wasm/provider_test.go | 658 +-------------------------- 1 file changed, 1 insertion(+), 657 deletions(-) diff --git a/relayer/chains/wasm/provider_test.go b/relayer/chains/wasm/provider_test.go index 58bd1f74a..4ad06a5ae 100644 --- a/relayer/chains/wasm/provider_test.go +++ b/relayer/chains/wasm/provider_test.go @@ -72,27 +72,6 @@ func GetProvider(ctx context.Context, handlerAddr string, local bool) (provider. } -func TestGetAddress(t *testing.T) { - ctx := context.Background() - p, err := GetProvider(ctx, "archway14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9sy85n2u", false) - assert.NoError(t, err) - pArch := p.(*WasmProvider) - assert.NoError(t, err) - // prefix will be setup when querying a contract or doing a txn, not when provider is initialized - a := "cosmos1jpdcgkwv7wmwaqc6lyvd82dwhkxxfvpl53qrze" - addr, err := pArch.GetKeyAddress() - assert.NoError(t, err) - assert.Equal(t, a, addr.String()) - - // op, err := pArch.QueryBalance(ctx, "default") - // assert.NoError(t, err) - - // fmt.Println("balance", op) - // opx, err := pArch.ShowAddress("testWallet") - // assert.NoError(t, err) - // assert.Equal(t, addr, opx) -} - type HexBytes string func (hs HexBytes) Value() ([]byte, error) { @@ -120,245 +99,6 @@ func (m *SendPacket) MsgBytes() ([]byte, error) { return json.Marshal(m) } -// func TestTransaction(t *testing.T) { -// ctx := context.Background() -// contract := "archway1j2zsnnv7qpd6hqhrkg96c57wv9yff4y6amarcvsp5lkta2e4k5vstvt9j3" -// p, _ := GetProvider(ctx, contract) -// pArch := p.(*WasmProvider) -// pArch.Init(ctx) - -// key := "jptKey" - -// msg := &SendPacket{ -// Pkt: struct { -// Packet HexBytes "json:\"packet\"" -// Id string "json:\"id\"" -// }{ -// Packet: NewHexBytes([]byte("Hello")), -// Id: key, -// }, -// } - -// // msg, err := pArch.MsgSendPacketTemp(key) -// // assert.NoError(t, err) - -// callback := func(rtr *provider.RelayerTxResponse, err error) { -// if err != nil { -// return -// } -// } - -// err := pArch.SendMessagesToMempool(ctx, []provider.RelayerMessage{msg}, "memo", nil, callback) -// assert.NoError(t, err) - -// storageKey := fmt.Sprintf("0007%x%s", []byte("packets"), key) -// _, err = pArch.QueryWasmProof(ctx, []byte(storageKey), 1932589) -// assert.NoError(t, err) - -// } - -// func TestTxnResult(t *testing.T) { -// hash := "A7FAA098E4671ABDB9C3557B4E94F5C208939804B4CE64BF066669EC75313151" -// b, e := hex.DecodeString(hash) -// assert.NoError(t, e) - -// ctx := context.Background() -// p, err := GetProvider(ctx, "archway21", true) -// assert.NoError(t, err) -// pArch, ok := p.(*WasmProvider) -// assert.True(t, ok) - -// a := make(chan provider.RelayerTxResponse, 10) - -// callback := func(rtr *provider.RelayerTxResponse, err error) { -// fmt.Printf("Tx Response:: %+v\n ", rtr) -// if err == nil { -// a <- *rtr -// } -// return -// } - -// pArch.waitForTx(ctx, b, nil, time.Minute*10, callback) -// brakHere: -// for { -// select { -// case <-a: -// { -// fmt.Println("response received") -// break brakHere -// } -// } - -// } - -// } - -// func TestClientState(t *testing.T) { - -// ctx := context.Background() -// contractAddr := "archway1vguuxez2h5ekltfj9gjd62fs5k4rl2zy5hfrncasykzw08rezpfsa4aasz" -// p, err := GetProvider(ctx, contractAddr, true) -// assert.NoError(t, err) - -// archP := p.(*WasmProvider) - -// clientId := "iconclient-0" - -// iconM, err := archP.QueryClientStateContract(ctx, clientId) -// assert.NoError(t, err) -// fmt.Printf("%+v", iconM) -// } - -// func TestTxCall(t *testing.T) { - -// ctx := context.Background() - -// p, _ := GetProvider(ctx, "", false) -// pArch := p.(*WasmProvider) - -// // cl, _ := client.NewClientFromNode("http://localhost:26657") -// cl, _ := client.NewClientFromNode("https://rpc.constantine-2.archway.tech:443") - -// addr, err := pArch.GetKeyAddress() -// assert.NoError(t, err) - -// encodingConfig := app.MakeEncodingConfig() -// cliCtx := client.Context{}. -// WithClient(cl). -// WithFromName(pArch.PCfg.Key). -// WithFromAddress(addr). -// WithTxConfig(encodingConfig.TxConfig). -// WithSkipConfirmation(true). -// WithBroadcastMode("sync") - -// ///////////////////////////////////////////////// -// ///////////////////// EXECUTION ///////////////// -// ///////////////////////////////////////////////// - -// // pktData := []byte("hello_world") - -// // type SendPacketParams struct { -// // Packet HexBytes `json:"packet"` -// // Id string `json:"id"` -// // } -// // type SendPacket struct { -// // Pkt SendPacketParams `json:"send_packet"` -// // } - -// // sendPkt := SendPacket{ -// // Pkt: SendPacketParams{ -// // Packet: NewHexBytes(pktData), -// // Id: "345", -// // }, -// // } - -// // dB, err := json.Marshal(sendPkt) -// // assert.NoError(t, err) - -// // msg := &wasmtypes.MsgExecuteContract{ -// // Sender: addr.String(), -// // Contract: contract, -// // Msg: dB, -// // } - -// // a := pArch.TxFactory() -// // factory, err := pArch.PrepareFactory(a) -// // assert.NoError(t, err) - -// // tx.GenerateOrBroadcastTxWithFactory(cliCtx, factory, msg) - -// ///////////////////////////////////////////////// -// /////////////////////// QUERY /////////////////// -// ///////////////////////////////////////////////// - -// type GetPacket struct { -// GetPacket struct { -// Id string `json:"id"` -// } `json:"get_packet"` -// } - -// type PacketOutput struct { -// Packet []byte `json:"packet"` -// } - -// // _param := GetPacket{ -// // GetPacket: struct { -// // Id string "json:\"id\"" -// // }{ -// // Id: "100", -// // }, -// // } - -// // type GetAllPacket struct { -// // GetAllPacket interface{} `json:"get_packet"` -// // } - -// cs := types.GetClientState{ -// ClientState: struct { -// ClientId string "json:\"client_id\"" -// }{ -// ClientId: "iconclient-0", -// }, -// } - -// param, _ := json.Marshal(cs) - -// queryCLient := wasmtypes.NewQueryClient(cliCtx) -// contractState, err := queryCLient.SmartContractState(ctx, &wasmtypes.QuerySmartContractStateRequest{ -// Address: archway_mock_address, -// QueryData: param, -// }) - -// assert.NoError(t, err) -// e := contractState.Data -// var i icon_types.ClientState -// err = json.Unmarshal(e, &i) -// fmt.Printf("data is %s \n", e) -// assert.NoError(t, err) -// fmt.Printf("data is %+v \n", i) - -// } - -// func TestCreateClient(t *testing.T) { - -// ctx := context.Background() -// ap, err := GetProvider(ctx, "archway1hpufl3l8g44aaz3qsqw886sjanhhu73ul6tllxuw3pqlhxzq9e4sqcz9uv", true) //"archway1g4w5f2l25dav7h4mc0mzeute5859wa9hgmavancmprfldqun6ppqsn0zma") -// assert.NoError(t, err) - -// networkId := 1 -// height := 59 -// ip := GetIconProvider(networkId) - -// btpHeader, err := ip.GetBtpHeader(int64(height)) -// assert.NoError(t, err) - -// header := icon.NewIconIBCHeader(btpHeader, nil, int64(height)) - -// clS, err := ip.NewClientState("07-tendermint", header, 100, 100, true, true) -// assert.NoError(t, err) - -// msg, err := ap.MsgCreateClient(clS, header.ConsensusState()) -// assert.NoError(t, err) - -// call := make(chan bool) - -// callback := func(rtr *provider.RelayerTxResponse, err error) { -// assert.NoError(t, err) -// fmt.Printf("Tx Response:: %+v\n ", rtr) -// call <- true -// } - -// err = ap.SendMessagesToMempool(ctx, []provider.RelayerMessage{msg}, "memo", nil, callback) -// assert.NoError(t, err) -// namedLoop: -// for { -// select { -// case <-call: -// break namedLoop -// } -// } -// } - func TestSerializeAny(t *testing.T) { d := clienttypes.Height{ @@ -402,127 +142,6 @@ func GetIconProvider(network_id int) *icon.IconProvider { return iconProvider } -// func TestCreateClient(t *testing.T) { - -// ctx := context.Background() -// ap, err := GetProvider(ctx, "archway1maqs3qvslrjaq8xz9402shucnr4wzdujty8lr7ux5z5rnj989lwsmssrzk", true) -// assert.NoError(t, err) - -// archwayP, ok := ap.(*WasmProvider) -// if !ok { -// assert.Fail(t, "failed to convert to archwayP") -// } - -// networkId := 2 -// height := 307 -// ip := GetIconProvider(networkId) - -// btpHeader, err := ip.GetBtpHeader(int64(height)) -// assert.NoError(t, err) - -// header := icon.NewIconIBCHeader(btpHeader, nil, int64(height)) -// fmt.Println(header.Height()) - -// clS, err := ip.NewClientState("07-tendermint", header, 100, 100, true, true) -// assert.NoError(t, err) - -// msg, err := archwayP.MsgCreateClient(clS, header.ConsensusState()) -// if err != nil { -// assert.Fail(t, err.Error()) -// fmt.Println("error in unexpected place ") -// return -// } - -// fmt.Printf("the value is %s \n", msg) - -// callback := func(rtr *provider.RelayerTxResponse, err error) { -// if err != nil { -// return -// } -// } - -// err = archwayP.SendMessagesToMempool(ctx, []provider.RelayerMessage{msg}, "memo", nil, callback) -// time.Sleep(2 * 1000) -// assert.NoError(t, err) - -// } - -// func TestGetClientState(t *testing.T) { -// ctx := context.Background() -// ap, err := GetProvider(ctx, "", false) -// assert.NoError(t, err) - -// archwayP, ok := ap.(*WasmProvider) -// if !ok { -// assert.Fail(t, "failed to convert to archwayP") -// } - -// state, err := archwayP.QueryClientStateContract(ctx, "iconclient-0") -// assert.NoError(t, err) -// fmt.Printf("ClentState %+v \n", state) - -// } - -// func TestDataDecode(t *testing.T) { - -// d := []byte{10, 32, 47, 105, 99, 111, 110, 46, 108, 105, 103, 104, 116, 99, 108, 105, 101, 110, 116, 46, 118, 49, 46, 67, 108, 105, 101, 110, 116, 83, 116, 97, 116, 101, 18, 32, 127, 98, 36, 134, 45, 9, 198, 30, 199, 185, 205, 28, 128, 214, 203, 138, 15, 65, 45, 70, 134, 139, 202, 40, 61, 44, 97, 169, 50, 7, 225, 18} -// // d := "103247105991111104610810510310411699108105101110116461184946671081051011101168311697116101183212798361344591983019918520528128214203138156545701341392024061449716950722518" -// // b, err := hex.DecodeString(d) -// // assert.NoError(t, err) - -// ctx := context.Background() -// ap, err := GetProvider(ctx, "archway123", false) -// assert.NoError(t, err) -// archwayP, _ := ap.(*WasmProvider) - -// var iconee exported.ClientState -// err = archwayP.Cdc.Marshaler.UnmarshalInterface(d, &iconee) -// assert.NoError(t, err) -// fmt.Println(iconee.GetLatestHeight()) -// } - -// func TestArchwayLightHeader(t *testing.T) { -// ctx := context.Background() -// apx, err := GetProvider(ctx, "abcd", true) -// assert.NoError(t, err) - -// ap := apx.(*WasmProvider) - -// tsHeight := 34055 -// cl := "07-tendermint-0" - -// trustedIbcHeader, err := ap.QueryIBCHeader(ctx, int64(tsHeight)) - -// latestBlockHeader, err := ap.QueryIBCHeader(ctx, 34060) - -// trustedHeight := clienttypes.Height{ -// RevisionHeight: uint64(tsHeight), -// RevisionNumber: 0, -// } - -// msg, err := ap.MsgUpdateClientHeader(latestBlockHeader, trustedHeight, trustedIbcHeader) -// assert.NoError(t, err) - -// iconP := GetIconProvider(2) - -// updateMessage, err := iconP.MsgUpdateClient(cl, msg) -// assert.NoError(t, err) -// fmt.Printf("%x \n ", updateMessage) - -// // err = iconP.SendMessagesToMempool(ctx, []provider.RelayerMessage{updateMessage}, "", ctx, nil) -// // assert.Error(t, err) -// } - -// func TestGetConsensusState(t *testing.T) { -// iconP := GetIconProvider(2) - -// ctx := context.Background() - -// op, err := iconP.QueryClientConsensusState(ctx, 200, "07-tendermint-34", clienttypes.NewHeight(0, 31600)) -// assert.NoError(t, err) -// fmt.Println(op) -// } - func TestProtoMarshal(t *testing.T) { codec := MakeCodec(ModuleBasics, []string{}) @@ -548,244 +167,6 @@ func TestDecodeProto(t *testing.T) { } -// goloop rpc sendtx call \ -// --uri http://localhost:9082/api/v3 \ -// --nid 3 \ -// --step_limit 1000000000\ -// --to cxc327ce659d52f63f727c8d9e9503b8b9cced75f2 \ -// --method updateClient \ -// --raw "{\"params\":{\"msg\":{\"clientId\":\"07-tendermint-0\",\"clientMessage\":\"0x0acc040a8f030a02080b12086c6f63616c6e6574188c8a02220b08bbdbb6a30610fcabd1512a480a20c50b8ec7e3a350b8f4952a23f98e94ca97960d969397625584b33ab2cf2ea45612240801122019bc479a78d29fe86494b54203583cabe3e01edf199f191dfa8f5ea8518b4add3220a7edff3caabb2dd4bfe9ac7c9dd2cd9049ec4332e0e7cf72ed8a78f6e5ea790a3a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8554220d299a92ba9789927c2f4e6bf0db8be41300d33aec0b3d53a37c0d7cf6035e3204a20d299a92ba9789927c2f4e6bf0db8be41300d33aec0b3d53a37c0d7cf6035e3205220048091bc7ddc283f77bfbf91d73c44da58c3df8a9cbc867405d8b7f3daada22f5a206e24d5124a854616c3689c32b2bd77278f45e7b12de588c712d4937f03106b156220e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b8556a20e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b85572146e5f3463982f10e154de349f627857b7acf5218912b701088c8a021a480a20a068896ce65463eb64ff67db82244d084fda128dc09bb2033d78ed3ee636f224122408011220ffc44c3d4d8af8084358d13e3e3174ef0c22b13be985e9fe00f929231929b4892267080212146e5f3463982f10e154de349f627857b7acf521891a0b08c0dbb6a30610f7b2c56722404bef8bdcbf7863684d589abe2af674ff3d418ce4516a4b209e1b1011f6124b0c2eaa15a22ee848e9b609f3357876f50170c9692a59ac4b0fca12311ef373a502123e0a3c0a146e5f3463982f10e154de349f627857b7acf5218912220a208636f208b278b8527a22c7bad921c39ece0594dc95ad213435140be2beaf380e180a18878a02223e0a3c0a146e5f3463982f10e154de349f627857b7acf5218912220a208636f208b278b8527a22c7bad921c39ece0594dc95ad213435140be2beaf380e180a\"}}}"\ \ -// --key_store /Users/viveksharmapoudel/keystore/godWallet.json\ -// --key_password gochain - -// func TestBtpHeader(t *testing.T) { -// a := GetIconProvider(2) -// h, err := a.GetBtpHeader(1125) -// assert.NoError(t, err) -// fmt.Printf("%x \n ", h.PrevNetworkSectionHash) -// } - -// func TestArchwayEvent(t *testing.T) { - -// ctx := context.Background() -// pro, err := GetProvider(ctx, "archway17ymdtz48qey0lpha8erch8hghj37ag4dn0qqyyrtseymvgw6lfnqgmtsrj", true) -// assert.NoError(t, err) - -// archwayP := pro.(*WasmProvider) - -// height := int64(6343) -// blockRes, err := archwayP.RPCClient.BlockResults(ctx, &height) -// assert.NoError(t, err) - -// for _, tx := range blockRes.TxsResults { -// if tx.Code != 0 { -// // tx was not successful -// continue -// } - -// // fmt.Println(tx.Events) -// messages := ibcMessagesFromEvents(archwayP.log, tx.Events, archwayP.ChainId(), uint64(height), archwayP.PCfg.IbcHandlerAddress, true) - -// assert.Equal(t, len(messages), 2) - -// for _, m := range messages { - -// fmt.Println(m.eventType) -// fmt.Printf("message is %+v \n", m.info) - -// } -// } - -// } - -// func TestDecodeMerkleProof(t *testing.T) { - -// v := common.MustHexStrToBytes("0x0ac90612c6060a7b03ade4a5f5803a439835c636395a8d648dee57b2fc90d98dc17fa887159b69638b30303062363336663665366536353633373436393666366537336230326433663334643963373863663565353734396637373131373861386361663034653731303432636366336636396165656430356230383066333336373712f5020a4503ade4a5f5803a439835c636395a8d648dee57b2fc90d98dc17fa887159b69638b0016636c69656e745f696d706c656d656e746174696f6e7369636f6e636c69656e742d3012442261726368776179316e633574617461667636657971376c6c6b7232677635306666396532326d6e66373071676a6c763733376b746d74346573777271676a33336736221a0b0801180120012a03000238222b0801120402043e201a2120a30ef45adecacce36447237e218f8cf3ad48357e82cae6aeea7df465573854cb22290801122504083e20fd6187d3aeb814e2a15d3987fd093b63aae12f9a04ba1c871e8a06c9f85a710b2022290801122508163e2064cfaa6db5902310f5d7255b7e8733f455699291f73d3988a17dc47348d63323202229080112250a2a3e2090d36297ce6f62cdb1110921e2482c20cd630e2817648bb0b95a42ce9fe081a420222b080112040c403e201a2120a8753c7dfe3f41e2bb9936721c8e0547e2c74a46a6d25c3f144d784204ceb86e1ace020a2e03ade4a5f5803a439835c636395a8d648dee57b2fc90d98dc17fa887159b69638b636f6e74726163745f696e666f12367b22636f6e7472616374223a226372617465732e696f3a63772d6962632d636f7265222c2276657273696f6e223a22302e312e30227d1a0b0801180120012a0300021422290801122502043e205a76cca2d1f3103d95080d98bf27abb862829151eb227f6be56d3dc8990d47182022290801122504083e20fd6187d3aeb814e2a15d3987fd093b63aae12f9a04ba1c871e8a06c9f85a710b2022290801122508163e2064cfaa6db5902310f5d7255b7e8733f455699291f73d3988a17dc47348d63323202229080112250a2a3e2090d36297ce6f62cdb1110921e2482c20cd630e2817648bb0b95a42ce9fe081a420222b080112040c403e201a2120a8753c7dfe3f41e2bb9936721c8e0547e2c74a46a6d25c3f144d784204ceb86e0a84010a81010a047761736d122031710a6b9c07bb7f1d7816f5b76f65d48e53ea30ad6d8138322f31374e8733321a090801180120012a0100222508011221011107704879ce264af2b8ca54a7ad461538067d296f22b7de0482e4fdf43314b922250801122101efb0c2cf8ed06dea231b3f0f26942e24623f13012e6297b343e7e1afc3863d6d") - -// var op commitmenttypes.MerkleProof -// err := proto.Unmarshal(v, &op) -// assert.NoError(t, err) - -// for i, v := range op.Proofs { -// fmt.Printf("index %d \n ", i) -// fmt.Printf("existence proof %x : \n ", v.GetExist()) -// fmt.Printf("Non-existence proof %x : \n", v.GetNonexist()) - -// } - -// // err = op.VerifyMembership([]*ics23.ProofSpec{ics23.IavlSpec, ics23.TendermintSpec}, root, path, result.Response.Value) -// assert.NoError(t, err) - -// } - -// func TestCommitmentKey(t *testing.T) { -// fmt.Printf("%x \n ", common.GetConnectionCommitmentKey("connection-0")) - -// } - -// func TestGetProofTendermint(t *testing.T) { - -// ctx := context.Background() -// contractAddr := "archway17p9rzwnnfxcjp32un9ug7yhhzgtkhvl9jfksztgw5uh69wac2pgssf05p7" -// pro, err := GetProvider(ctx, contractAddr, true) -// assert.NoError(t, err) - -// archwayP := pro.(*WasmProvider) - -// connectionKey := common.GetConnectionCommitmentKey("connection-3") - -// connStorageKey := fmt.Sprintf("%s%x", getKey(STORAGEKEY__Commitments), connectionKey) -// hexStrkey, err := hex.DecodeString(connStorageKey) -// assert.NoError(t, err) -// fmt.Printf("the main key is %x \n ", hexStrkey) - -// proofConnBytes, err := archwayP.QueryWasmProof(ctx, hexStrkey, int64(2273)) - -// var op icn.MerkleProof -// err = proto.Unmarshal(proofConnBytes, &op) -// assert.NoError(t, err) -// for ind, xx := range op.Proofs { -// fmt.Println("index ", ind) -// fmt.Printf("Get Exist %x \n", xx.GetExist()) -// fmt.Printf("non ExistP %x \n", xx.GetNonexist()) -// } - -// } - -// func TestVerifyMembership(t *testing.T) { - -// ctx := context.Background() -// contractAddr := "archway10qt8wg0n7z740ssvf3urmvgtjhxpyp74hxqvqt7z226gykuus7eqzla6h5" -// pro, err := GetProvider(ctx, contractAddr, true) -// assert.NoError(t, err) -// archwayP := pro.(*WasmProvider) -// height := 410 - -// ibcAddr, err := sdk.AccAddressFromBech32(archwayP.PCfg.IbcHandlerAddress) -// assert.NoError(t, err) - -// connectionKey := common.GetConnectionCommitmentKey("connection-0") -// fmt.Printf("commitment key is %x \n ", connectionKey) - -// // map_key := []byte("state") -// map_key := []byte("commitments") -// keyV := fmt.Sprintf("03%x000b%x%x", ibcAddr.Bytes(), map_key, connectionKey) -// // keyV := fmt.Sprintf("03%x00077374617465", ibcAddr.Bytes()) -// key, _ := hex.DecodeString(keyV) -// fmt.Printf("contract Address %x \n ", ibcAddr.Bytes()) - -// key1, err := hex.DecodeString(fmt.Sprintf("%s%x", archwayP.CommitmentPrefix().KeyPrefix, common.GetConnectionCommitmentKey("connection-0"))) -// assert.NoError(t, err) - -// fmt.Printf("%x", key1) -// assert.Equal(t, key, key1) - -// req := abci.RequestQuery{ -// Path: fmt.Sprintf("store/wasm/key"), -// Data: key, -// Prove: true, -// Height: int64(height), -// } - -// ibcH, err := archwayP.QueryIBCHeader(ctx, req.Height+1) -// assert.NoError(t, err) -// header := ibcH.(WasmIBCHeader) - -// path := commitmenttypes.MerklePath{KeyPath: []string{ -// "wasm", -// string(key), -// }} - -// fmt.Println("path is ", string(key)) - -// opts := rpcclient.ABCIQueryOptions{ -// Height: req.Height, -// Prove: req.Prove, -// } -// result, err := archwayP.RPCClient.ABCIQueryWithOptions(ctx, req.Path, req.Data, opts) -// assert.NoError(t, err) - -// root := commitmenttypes.MerkleRoot{Hash: header.SignedHeader.Header.AppHash} - -// rootMarshalled, _ := proto.Marshal(&root) - -// fmt.Printf("proto marshalled root %x \n", rootMarshalled) - -// proof, err := commitmenttypes.ConvertProofs(result.Response.ProofOps) -// assert.NoError(t, err) - -// fmt.Printf("value %x \n ", result.Response.Value) -// proofProtoMarshal, err := proto.Marshal(&proof) -// fmt.Printf("proof %x \n ", proofProtoMarshal) - -// err = proof.VerifyMembership([]*ics23.ProofSpec{ics23.IavlSpec, ics23.TendermintSpec}, root, path, result.Response.Value) -// assert.NoError(t, err) -// if err != nil { -// fmt.Println("failed to verify Memebership ", err) -// } -// } - -// func TestVerifyMembershipTestCC(t *testing.T) { - -// ctx := context.Background() -// contractAddr := "archway1999u8suptza3rtxwk7lspve02m406xe7l622erg3np3aq05gawxsk8g4pd" //START CONTRACT -// // contractAddr := "archway10qt8wg0n7z740ssvf3urmvgtjhxpyp74hxqvqt7z226gykuus7eqzla6h5" -// pro, err := GetProvider(ctx, contractAddr, true) -// assert.NoError(t, err) -// archwayP := pro.(*WasmProvider) - -// ibcAddr, err := sdk.AccAddressFromBech32(archwayP.PCfg.IbcHandlerAddress) -// assert.NoError(t, err) - -// // map_key := []byte("state") -// map_key := []byte("test_maphello") -// keyV := fmt.Sprintf("03%x0008%x", ibcAddr.Bytes(), map_key) -// // keyV := fmt.Sprintf("03%x00077374617465", ibcAddr.Bytes()) -// key, _ := hex.DecodeString(keyV) -// fmt.Printf("contract Address %x \n ", ibcAddr.Bytes()) - -// fmt.Printf("the main key is : %x \n", map_key) - -// req := abci.RequestQuery{ -// Path: fmt.Sprintf("store/wasm/key"), -// Data: key, -// Prove: true, -// Height: 17084, -// } - -// path := commitmenttypes.MerklePath{KeyPath: []string{ -// "wasm", -// string(key), -// }} - -// opts := rpcclient.ABCIQueryOptions{ -// Height: req.Height, -// Prove: req.Prove, -// } -// result, err := archwayP.RPCClient.ABCIQueryWithOptions(ctx, req.Path, req.Data, opts) -// assert.NoError(t, err) - -// rootB, _ := hex.DecodeString("7526C2B51C1FDCCD86BD4FAB4F0AF762242C50B321829B11D04E81B52DB83BBF") -// root := commitmenttypes.MerkleRoot{Hash: rootB} - -// rootMarshalled, _ := proto.Marshal(&root) - -// fmt.Printf("proto marshalled root %x \n", rootMarshalled) - -// proof, err := commitmenttypes.ConvertProofs(result.Response.ProofOps) -// assert.NoError(t, err) - -// fmt.Println("value \n ", result.Response.Value) -// proofProtoMarshal, err := proto.Marshal(&proof) -// fmt.Printf("proof %x \n ", proofProtoMarshal) - -// err = proof.VerifyMembership([]*ics23.ProofSpec{ics23.IavlSpec, ics23.TendermintSpec}, root, path, result.Response.Value) -// assert.NoError(t, err) -// if err != nil { -// fmt.Println("failed to verify Memebership ", err) -// } -// } - func TestGenRoot(t *testing.T) { rootB, _ := hex.DecodeString("99306EBA529FB6416B0984146B97C9C76386F226E9541A47197FA7ADA530EDA3") @@ -797,48 +178,11 @@ func TestGenRoot(t *testing.T) { } -func TestStringToHex(t *testing.T) { - - // type YY struct { - // Req []byte - // } - - // b := "5b332c3234322c3232362c3136322c38322c3231392c3131382c3231302c3130302c3139352c35312c3130382c33302c35382c3130372c37362c3130312c3232332c332c37322c3231312c32372c302c31302c3135372c3135362c3235312c3234312c3235342c38382c3233332c3230375d" - // y, _ := hex.DecodeString(b) - - // x := []byte(fmt.Sprintf(`{"req":%s}`, y)) - // var a YY - // err := json.Unmarshal(x, &a) - // assert.NoError(t, err) - // fmt.Printf("%x", a.Req) - - var byteArray []byte - str := "[3,242,226,162,82,219,118,210,100,195,51,108,30,58,107,76,101,223,3,72,211,27,0,10,157,156,251,241,254,88,233,207]" - - err := json.Unmarshal([]byte(str), &byteArray) - if err != nil { - fmt.Println(err) - } - - fmt.Printf("%x \n", byteArray) - -} - func TestProtoUnmarshal(t *testing.T) { val, _ := hex.DecodeString("080210021a110a046d6f636b12096368616e6e656c2d30220c636f6e6e656374696f6e2d302a0769637332302d31") var channelS chantypes.Channel err := proto.Unmarshal(val, &channelS) assert.NoError(t, err) assert.Equal(t, channelS.State, chantypes.State(2)) -} - -// func TestCommitmentPrefix(t *testing.T) { -// ctx := context.Background() -// p, _ := GetProvider(ctx, "archway13we0myxwzlpx8l5ark8elw5gj5d59dl6cjkzmt80c5q5cv5rt54quagxpp", true) - -// archwayP := p.(*WasmProvider) - -// _, err := archwayP.GetCommitmentPrefixFromContract(ctx) -// assert.NoError(t, err) -// } +} From 5507cc267b9f04e9d879bbc254493f2b50de0319 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 15:43:54 +0545 Subject: [PATCH 13/18] fix: run query cycle only when required --- relayer/chains/wasm/query.go | 2 +- relayer/chains/wasm/wasm_chain_processor.go | 56 +++++++-------------- 2 files changed, 18 insertions(+), 40 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index dafe780bd..4143e6099 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -1003,7 +1003,7 @@ func (ap *WasmProvider) GetBlockInfoList( blockInfoList = append( blockInfoList, BlockInfo{ - Height: fromHeight, + Height: toHeight, Messages: []ibcMessage{}, }, ) diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index f8a9d8a36..01f11d6da 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -264,17 +264,27 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint ticker := time.NewTicker(persistence.minQueryLoopDuration) defer ticker.Stop() + latestHeight := persistence.latestHeight + for { - if err := ccp.queryCycle(ctx, &persistence); err != nil { - return err - } select { case <-ctx.Done(): return nil case <-ccp.heightSnapshotChan: ccp.SnapshotHeight(ccp.getHeightToSave(persistence.latestHeight)) case <-ticker.C: - ticker.Reset(persistence.minQueryLoopDuration) + status, err := ccp.chainProvider.QueryStatus(ctx) + if err != nil { + ccp.log.Error("failed to query node status", zap.Error(err)) + } else { + latestHeight = status.SyncInfo.LatestBlockHeight + } + if latestHeight > persistence.latestQueriedBlock { + persistence.latestHeight = latestHeight + if err := ccp.queryCycle(ctx, &persistence); err != nil { + return err + } + } } } } @@ -338,23 +348,6 @@ func (ccp *WasmChainProcessor) initializeChannelState(ctx context.Context) error } func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence) error { - status, err := ccp.nodeStatusWithRetry(ctx) - if err != nil { - // don't want to cause WasmChainProcessor to quit here, can retry again next cycle. - ccp.log.Error( - "Failed to query node status after max attempts", - zap.Uint("attempts", latestHeightQueryRetries), - zap.Error(err), - ) - - // TODO: Save height when node status is false? - // ccp.SnapshotHeight(ccp.getHeightToSave(status.SyncInfo.LatestBlockHeight)) - return nil - } - - persistence.latestHeight = status.SyncInfo.LatestBlockHeight - // ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version) - if ccp.metrics != nil { ccp.CollectMetrics(ctx, persistence) } @@ -388,20 +381,9 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer var latestHeader provider.IBCHeader fromHeight := persistence.latestQueriedBlock + 1 - toHeight := func() int64 { - if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { - return persistence.latestQueriedBlock + MaxBlockFetch - } - return persistence.latestHeight - }() - - if fromHeight > toHeight { - fromHeight = toHeight - } - - // avoid duplicate query - if fromHeight <= persistence.latestQueriedBlock { - return nil + toHeight := persistence.latestHeight + if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { + toHeight = persistence.latestQueriedBlock + MaxBlockFetch } blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) @@ -454,10 +436,6 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer ibcHeaderCache[blockInfo.Height] = ibcHeader } - if newLatestQueriedBlock == persistence.latestQueriedBlock { - return nil - } - if !ppChanged { if firstTimeInSync { for _, pp := range ccp.pathProcessors { From 1d74ee88044713d02a63d32e807a2aa7ec3580af Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 17:03:49 +0545 Subject: [PATCH 14/18] fix: failed docker build --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5721a5afa..5d8cf7a49 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,7 +4,7 @@ RUN apk add --update --no-cache make git musl-dev gcc binutils-gold cargo ARG BUILDPLATFORM=arm64 ARG TARGETPLATFORM=arm64 -ARG COSMWASM_VERSION=1.2.3 +ARG COSMWASM_VERSION=1.5.0 RUN wget https://github.com/CosmWasm/wasmvm/releases/download/v${COSMWASM_VERSION}/libwasmvm_muslc.aarch64.a -O /usr/lib/libwasmvm.aarch64.a && \ wget https://github.com/CosmWasm/wasmvm/releases/download/v${COSMWASM_VERSION}/libwasmvm_muslc.x86_64.a -O /usr/lib/libwasmvm.x86_64.a @@ -50,7 +50,7 @@ RUN ln sh pwd && \ rm ln rm # Install chain binaries -COPY --from=build-env /bin/rly /bin +COPY --from=build-env /go/bin/rly /bin # Install trusted CA certificates COPY --from=busybox-min /etc/ssl/cert.pem /etc/ssl/cert.pem From ae7173a9218df965b5dfbd2b92fd9d415dcf2350 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Fri, 12 Jul 2024 18:49:03 +0545 Subject: [PATCH 15/18] fix: event filter --- relayer/chains/wasm/query.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 4143e6099..68ac916c5 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -931,7 +931,7 @@ func (ap *WasmProvider) GetBlockInfoList( ibcHandlerAddr := ap.PCfg.IbcHandlerAddress txsParam := txSearchParam{ - query: fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND wasm._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr), + query: fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND execute._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr), page: 1, perPage: 100, orderBy: "asc", From 0c8170f1aab2b68efe765a056d2d92c2063fa6fa Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 17 Jul 2024 17:09:37 +0545 Subject: [PATCH 16/18] fix: add test for tx_search --- relayer/chains/wasm/provider_test.go | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/relayer/chains/wasm/provider_test.go b/relayer/chains/wasm/provider_test.go index 4ad06a5ae..3a7ebd992 100644 --- a/relayer/chains/wasm/provider_test.go +++ b/relayer/chains/wasm/provider_test.go @@ -7,6 +7,9 @@ import ( "fmt" "path/filepath" "testing" + "time" + + cosmosclient "github.com/cosmos/cosmos-sdk/client" codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/gogoproto/proto" @@ -186,3 +189,28 @@ func TestProtoUnmarshal(t *testing.T) { assert.Equal(t, channelS.State, chantypes.State(2)) } + +func TestTxSearch(t *testing.T) { + rpcNode := "https://1rpc.io:443/inj-rpc" + rpc, err := cosmosclient.NewClientFromNode(rpcNode) + assert.NoError(t, err) + + prove := true + page := 1 + perPage := 100 + orderBy := "asc" + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) + defer cancel() + res, err := rpc.TxSearch( + ctx, + "tx.height>=78328133 AND tx.height<=78328139", + prove, + &page, + &perPage, + orderBy, + ) + assert.NoError(t, err) + + fmt.Printf("\nTx Results: %+v\n", res.Txs) +} From 95e0d5e4265e1e4481bdd2c799a050267ef31a93 Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 17 Jul 2024 18:23:48 +0545 Subject: [PATCH 17/18] fix: fetch batch txns based on config flag --- relayer/chains/wasm/provider.go | 1 + relayer/chains/wasm/query.go | 82 ++++++++++++++++++++++----------- 2 files changed, 57 insertions(+), 26 deletions(-) diff --git a/relayer/chains/wasm/provider.go b/relayer/chains/wasm/provider.go index 0fae2a4bd..7d014d755 100644 --- a/relayer/chains/wasm/provider.go +++ b/relayer/chains/wasm/provider.go @@ -71,6 +71,7 @@ type WasmProviderConfig struct { FirstRetryBlockAfter uint64 `json:"first-retry-block-after" yaml:"first-retry-block-after"` StartHeight uint64 `json:"start-height" yaml:"start-height"` BlockInterval uint64 `json:"block-interval" yaml:"block-interval"` + HeightRangeTxSearch bool `json:"height-range-tx-search" yaml:"height-range-tx-search"` } type WasmIBCHeader struct { diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index 68ac916c5..c7c275ce9 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -14,6 +14,7 @@ import ( "github.com/avast/retry-go/v4" abci "github.com/cometbft/cometbft/abci/types" rpcclient "github.com/cometbft/cometbft/rpc/client" + coretypes "github.com/cometbft/cometbft/rpc/core/types" tmtypes "github.com/cometbft/cometbft/types" "github.com/cosmos/cosmos-sdk/codec" sdk "github.com/cosmos/cosmos-sdk/types" @@ -928,35 +929,21 @@ func (ap *WasmProvider) GetBlockInfoList( ctx context.Context, fromHeight, toHeight uint64, ) ([]BlockInfo, error) { - ibcHandlerAddr := ap.PCfg.IbcHandlerAddress - - txsParam := txSearchParam{ - query: fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND execute._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr), - page: 1, - perPage: 100, - orderBy: "asc", - } - - txsResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) - if err != nil { - return nil, err - } - - txs := txsResult.Txs - - if txsResult.TotalCount > txsParam.perPage { - totalPages := txsResult.TotalCount / txsParam.page - if txsResult.TotalCount%txsParam.page != 0 { - totalPages += 1 - } - for i := 2; i <= totalPages; i++ { - txsParam.page = i - nextResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + txs := []*coretypes.ResultTx{} + if !ap.PCfg.HeightRangeTxSearch { + for h := fromHeight; h <= toHeight; h++ { + txList, err := ap.FetchTxs(ctx, h, h) if err != nil { return nil, err } - txs = append(txs, nextResult.Txs...) + txs = append(txs, txList...) } + } else { + txList, err := ap.FetchTxs(ctx, fromHeight, toHeight) + if err != nil { + return nil, err + } + txs = append(txs, txList...) } blockMessages := map[uint64][]ibcMessage{} @@ -969,7 +956,7 @@ func (ap *WasmProvider) GetBlockInfoList( txResult.TxResult.Events, ap.ChainId(), uint64(txResult.Height), - ibcHandlerAddr, + ap.PCfg.IbcHandlerAddress, ap.cometLegacyEncoding, ) if len(messages) > 0 { @@ -1012,3 +999,46 @@ func (ap *WasmProvider) GetBlockInfoList( return blockInfoList, nil } + +func (ap *WasmProvider) FetchTxs( + ctx context.Context, + fromHeight, toHeight uint64, +) ([]*coretypes.ResultTx, error) { + ibcHandlerAddr := ap.PCfg.IbcHandlerAddress + + query := fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND execute._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr) + if fromHeight == toHeight { + query = fmt.Sprintf("tx.height=%d AND execute._contract_address='%s'", fromHeight, ibcHandlerAddr) + } + + txsParam := txSearchParam{ + query: query, + page: 1, + perPage: 100, + orderBy: "asc", + } + + txsResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + if err != nil { + return nil, err + } + + txs := txsResult.Txs + + if txsResult.TotalCount > txsParam.perPage { + totalPages := txsResult.TotalCount / txsParam.page + if txsResult.TotalCount%txsParam.page != 0 { + totalPages += 1 + } + for i := 2; i <= totalPages; i++ { + txsParam.page = i + nextResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) + if err != nil { + return nil, err + } + txs = append(txs, nextResult.Txs...) + } + } + + return txs, nil +} From b370df3c6e7810a35dcb1c8fdbe1edad651025aa Mon Sep 17 00:00:00 2001 From: sherpalden Date: Wed, 31 Jul 2024 14:49:04 +0545 Subject: [PATCH 18/18] test: push to save --- relayer/chains/wasm/query.go | 76 +++++---- relayer/chains/wasm/wasm_chain_processor.go | 172 ++++++++++++++++++-- 2 files changed, 203 insertions(+), 45 deletions(-) diff --git a/relayer/chains/wasm/query.go b/relayer/chains/wasm/query.go index c7c275ce9..667302fc3 100644 --- a/relayer/chains/wasm/query.go +++ b/relayer/chains/wasm/query.go @@ -925,27 +925,10 @@ type txSearchParam struct { prove bool } -func (ap *WasmProvider) GetBlockInfoList( - ctx context.Context, +func (ap *WasmProvider) processBlocksTxs( fromHeight, toHeight uint64, -) ([]BlockInfo, error) { - txs := []*coretypes.ResultTx{} - if !ap.PCfg.HeightRangeTxSearch { - for h := fromHeight; h <= toHeight; h++ { - txList, err := ap.FetchTxs(ctx, h, h) - if err != nil { - return nil, err - } - txs = append(txs, txList...) - } - } else { - txList, err := ap.FetchTxs(ctx, fromHeight, toHeight) - if err != nil { - return nil, err - } - txs = append(txs, txList...) - } - + txs []*coretypes.ResultTx, +) []BlockInfo { blockMessages := map[uint64][]ibcMessage{} for _, txResult := range txs { if txResult.TxResult.Code != 0 { @@ -997,18 +980,51 @@ func (ap *WasmProvider) GetBlockInfoList( } } - return blockInfoList, nil + return blockInfoList } -func (ap *WasmProvider) FetchTxs( +func (ap *WasmProvider) GetBlockInfoStream(ctx context.Context, fromHeight uint64) <-chan []BlockInfo { + blockInfoStream := make(chan []BlockInfo) + go func() { + defer close(blockInfoStream) + ticker := time.NewTicker(5 * time.Second) + startHeight := fromHeight + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + txs, endHeight, err := ap.fetchTxs(ctx, startHeight) + if err != nil { + ap.log.Error("failed to fetch txns", zap.Uint64("start-height", startHeight), zap.Error(err)) + break + } + blockInfolist := ap.processBlocksTxs(startHeight, endHeight, txs) + blockInfoStream <- blockInfolist + startHeight = endHeight + 1 + } + } + }() + return blockInfoStream +} + +func (ap *WasmProvider) fetchTxs( ctx context.Context, - fromHeight, toHeight uint64, -) ([]*coretypes.ResultTx, error) { + startHeight uint64, +) ([]*coretypes.ResultTx, uint64, error) { ibcHandlerAddr := ap.PCfg.IbcHandlerAddress + query := fmt.Sprintf("tx.height=%d AND execute._contract_address='%s'", startHeight, ibcHandlerAddr) - query := fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND execute._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr) - if fromHeight == toHeight { - query = fmt.Sprintf("tx.height=%d AND execute._contract_address='%s'", fromHeight, ibcHandlerAddr) + endHeight := startHeight + if ap.PCfg.HeightRangeTxSearch { + query = fmt.Sprintf("tx.height>=%d AND execute._contract_address='%s'", startHeight, ibcHandlerAddr) + + status, err := ap.QueryStatus(ctx) + if err != nil { + return nil, endHeight, fmt.Errorf("failed to query latest block height: %w", err) + } else { + endHeight = uint64(status.SyncInfo.LatestBlockHeight) + } } txsParam := txSearchParam{ @@ -1020,7 +1036,7 @@ func (ap *WasmProvider) FetchTxs( txsResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) if err != nil { - return nil, err + return nil, endHeight, err } txs := txsResult.Txs @@ -1034,11 +1050,11 @@ func (ap *WasmProvider) FetchTxs( txsParam.page = i nextResult, err := ap.RPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy) if err != nil { - return nil, err + return nil, endHeight, err } txs = append(txs, nextResult.Txs...) } } - return txs, nil + return txs, endHeight, nil } diff --git a/relayer/chains/wasm/wasm_chain_processor.go b/relayer/chains/wasm/wasm_chain_processor.go index 01f11d6da..8ae927a70 100644 --- a/relayer/chains/wasm/wasm_chain_processor.go +++ b/relayer/chains/wasm/wasm_chain_processor.go @@ -289,6 +289,51 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint } } +func (ccp *WasmChainProcessor) RunV1(ctx context.Context, _ uint64) error { + var eg errgroup.Group + eg.Go(func() error { + return ccp.initializeConnectionState(ctx) + }) + eg.Go(func() error { + return ccp.initializeChannelState(ctx) + }) + if err := eg.Wait(); err != nil { + return err + } + + lastQueriedHeight := ccp.StartFromHeight(ctx) + + status, err := ccp.nodeStatusWithRetry(ctx) + if err != nil { + return fmt.Errorf("failed to query node status for latest height: %w", err) + } + latestHeight := status.SyncInfo.LatestBlockHeight + + var startHeight uint64 + if lastQueriedHeight > 0 && lastQueriedHeight < latestHeight { + startHeight = uint64(lastQueriedHeight) + } else { + startHeight = uint64(latestHeight) + } + + ccp.log.Info("started block query", zap.Uint64("start-height", startHeight)) + + blockInfoStream := ccp.chainProvider.GetBlockInfoStream(ctx, startHeight) + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case blockInfoList := <-blockInfoStream: + status, err := ccp.nodeStatusWithRetry(ctx) + if err != nil { + return fmt.Errorf("failed to query node status for latest height: %w", err) + } + ccp.handleNewBlocks(ctx, blockInfoList, uint64(status.SyncInfo.LatestBlockHeight)) + } + } +} + // initializeConnectionState will bootstrap the connectionStateCache with the open connection state. func (ccp *WasmChainProcessor) initializeConnectionState(ctx context.Context) error { // ctx, cancel := context.WithTimeout(ctx, queryTimeout) @@ -347,6 +392,100 @@ func (ccp *WasmChainProcessor) initializeChannelState(ctx context.Context) error return nil } +func (ccp *WasmChainProcessor) handleNewBlocks(ctx context.Context, blockInfoList []BlockInfo, latestHeight uint64) error { + var latestHeader provider.IBCHeader + ibcHeaderCache := make(processor.IBCHeaderCache) + ibcMessagesCache := processor.NewIBCMessagesCache() + + for _, blockInfo := range blockInfoList { + ccp.log.Debug( + "Queried block", + zap.Uint64("height", blockInfo.Height), + zap.Uint64("latest", latestHeight), + zap.Uint64("delta", latestHeight-blockInfo.Height), + ) + + var ibcHeader provider.IBCHeader + if err := retry.Do(func() error { + var err error + ibcHeader, _, err = ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) + if err != nil { + return err + } + return nil + }, retry.Context(ctx), rtyAtt, retry.Delay(2*time.Second), rtyErr); err != nil { + ccp.log.Error( + "failed to query ibc header ", + zap.Error(err), + zap.Uint64("height", blockInfo.Height), + zap.Uint("total-attempts", rtyAttNum), + ) + return nil //exit and rerun the query cycle from current state + } + + for _, m := range blockInfo.Messages { + ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", blockInfo.Height)) + ccp.handleMessage(ctx, m, ibcMessagesCache) + } + + ccp.latestBlock = provider.LatestBlock{ + Height: blockInfo.Height, + } + latestHeader = ibcHeader + ibcHeaderCache[blockInfo.Height] = ibcHeader + } + + lastQueriedHeight := blockInfoList[len(blockInfoList)-1].Height + firstTimeInSync := false + if !ccp.inSync { + if (latestHeight - lastQueriedHeight) < inSyncNumBlocksThreshold { + ccp.inSync = true + firstTimeInSync = true + ccp.log.Info("Chain is in sync", + zap.Uint64("last-queried-block", lastQueriedHeight), + zap.Uint64("latest-height", latestHeight), + ) + } else { + ccp.log.Info("Chain is not yet in sync", + zap.Uint64("last-queried-block", lastQueriedHeight), + zap.Uint64("latest-height", latestHeight), + ) + } + } + + if firstTimeInSync { + for _, pp := range ccp.pathProcessors { + pp.ProcessBacklogIfReady() + } + } + + chainID := ccp.chainProvider.ChainId() + for _, pp := range ccp.pathProcessors { + clientID := pp.RelevantClientID(chainID) + clientState, err := ccp.clientState(ctx, clientID) + if err != nil { + ccp.log.Error("Error fetching client state", + zap.String("client_id", clientID), + zap.Error(err), + ) + continue + } + + pp.HandleNewData(chainID, processor.ChainProcessorCacheData{ + LatestBlock: ccp.latestBlock, + LatestHeader: latestHeader, + IBCMessagesCache: ibcMessagesCache.Clone(), + InSync: ccp.inSync, + ClientState: clientState, + ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID), + ChannelStateCache: ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients), + IBCHeaderCache: ibcHeaderCache.Clone(), + }) + } + + return nil +} + func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *queryCyclePersistence) error { if ccp.metrics != nil { ccp.CollectMetrics(ctx, persistence) @@ -380,22 +519,24 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer chainID := ccp.chainProvider.ChainId() var latestHeader provider.IBCHeader - fromHeight := persistence.latestQueriedBlock + 1 - toHeight := persistence.latestHeight - if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { - toHeight = persistence.latestQueriedBlock + MaxBlockFetch - } + // fromHeight := persistence.latestQueriedBlock + 1 + // toHeight := persistence.latestHeight + // if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch { + // toHeight = persistence.latestQueriedBlock + MaxBlockFetch + // } - blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) - if err != nil { - ccp.log.Error( - "failed to query block messages", - zap.Uint64("from-height", uint64(fromHeight)), - zap.Uint64("to-height", uint64(toHeight)), - zap.Error(err), - ) - return nil - } + // blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight)) + // if err != nil { + // ccp.log.Error( + // "failed to query block messages", + // zap.Uint64("from-height", uint64(fromHeight)), + // zap.Uint64("to-height", uint64(toHeight)), + // zap.Error(err), + // ) + // return nil + // } + + blockInfos := []BlockInfo{} for _, blockInfo := range blockInfos { ccp.log.Debug( @@ -407,6 +548,7 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer var ibcHeader provider.IBCHeader if err := retry.Do(func() error { + var err error ibcHeader, _, err = ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height)) if err != nil { return err