Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve wasm block sync efficiency #226

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions relayer/chains/wasm/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/CosmWasm/wasmd/app"
provtypes "github.com/cometbft/cometbft/light/provider"
"github.com/cometbft/cometbft/rpc/client/http"
comettypes "github.com/cometbft/cometbft/types"
"golang.org/x/mod/semver"

Expand Down Expand Up @@ -278,6 +279,7 @@ type WasmProvider struct {
Keybase keyring.Keyring
KeyringOptions []keyring.Option
RPCClient rpcclient.Client
CometRPCClient *http.HTTP
QueryClient wasmtypes.QueryClient
LightProvider provtypes.Provider
Cdc Codec
Expand Down Expand Up @@ -351,6 +353,12 @@ func (ap *WasmProvider) Init(ctx context.Context) error {
}
ap.RPCClient = rpcClient

cometRPCClient, err := http.New(ap.PCfg.RPCAddr, "/websocket")
bcsainju marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
ap.CometRPCClient = cometRPCClient

lightprovider, err := prov.New(ap.PCfg.ChainID, ap.PCfg.RPCAddr)
if err != nil {
return err
Expand Down
89 changes: 89 additions & 0 deletions relayer/chains/wasm/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,3 +910,92 @@ func (ap *WasmProvider) QueryClientPrevConsensusStateHeight(ctx context.Context,
}
return clienttypes.Height{RevisionNumber: 0, RevisionHeight: uint64(heights[0])}, nil
}

type BlockInfo struct {
Height uint64
Messages []ibcMessage
}

type txSearchParam struct {
page int
perPage int
orderBy string
query string
prove bool
}

func (ap *WasmProvider) GetBlockInfoList(
ctx context.Context,
fromHeight, toHeight uint64,
) ([]BlockInfo, error) {
ibcHandlerAddr := ap.PCfg.IbcHandlerAddress

txsParam := txSearchParam{
query: fmt.Sprintf("tx.height>=%d AND tx.height<=%d AND wasm._contract_address='%s'", fromHeight, toHeight, ibcHandlerAddr),
page: 1,
perPage: 100,
orderBy: "asc",
}

txsResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy)
if err != nil {
return nil, err
}

txs := txsResult.Txs

if txsResult.TotalCount > txsParam.perPage {
totalPages := txsResult.TotalCount / txsParam.page
if txsResult.TotalCount%txsParam.page != 0 {
totalPages += 1
}
for i := 2; i <= totalPages; i++ {
txsParam.page = i
nextResult, err := ap.CometRPCClient.TxSearch(ctx, txsParam.query, txsParam.prove, &txsParam.page, &txsParam.perPage, txsParam.orderBy)
if err != nil {
return nil, err
}
txs = append(txs, nextResult.Txs...)
}
}

blockMessages := map[uint64][]ibcMessage{}
for _, txResult := range txs {
messages := ibcMessagesFromEvents(
bcsainju marked this conversation as resolved.
Show resolved Hide resolved
ap.log,
txResult.TxResult.Events,
ap.ChainId(),
uint64(txResult.Height),
ibcHandlerAddr,
ap.cometLegacyEncoding,
)
if len(messages) > 0 {
blockMessages[uint64(txResult.Height)] = append(blockMessages[uint64(txResult.Height)], messages...)
}
}

blockInfoList := []BlockInfo{}
for h := fromHeight; h <= toHeight; h++ {
if messages, ok := blockMessages[h]; ok {
blockInfoList = append(blockInfoList, BlockInfo{
Height: h,
Messages: messages,
})
}
}

if len(blockInfoList) == 0 {
blockInfoList = append(blockInfoList, BlockInfo{
Height: fromHeight,
Messages: []ibcMessage{},
})
if toHeight != fromHeight {
blockInfoList = append(blockInfoList, BlockInfo{
Height: toHeight,
Messages: []ibcMessage{},
})
}
}

return blockInfoList, nil
}
187 changes: 34 additions & 153 deletions relayer/chains/wasm/wasm_chain_processor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package wasm

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -18,7 +17,6 @@ import (
"github.com/cosmos/relayer/v2/relayer/provider"

ctypes "github.com/cometbft/cometbft/rpc/core/types"
"github.com/cometbft/cometbft/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -57,15 +55,9 @@ type WasmChainProcessor struct {
// parsed gas prices accepted by the chain (only used for metrics)
parsedGasPrices *sdk.DecCoins

verifier *Verifier

heightSnapshotChan chan struct{}
}

type Verifier struct {
Header *types.LightBlock
}

func NewWasmChainProcessor(log *zap.Logger, provider *WasmProvider, metrics *processor.PrometheusMetrics, heightSnapshot chan struct{}) *WasmChainProcessor {
return &WasmChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
Expand Down Expand Up @@ -256,19 +248,6 @@ func (ccp *WasmChainProcessor) Run(ctx context.Context, initialBlockHistory uint

ccp.log.Info("Start to query from height ", zap.Int64("height", latestQueriedBlock))

_, lightBlock, err := ccp.chainProvider.QueryLightBlock(ctx, persistence.latestQueriedBlock)
if err != nil {
ccp.log.Error("Failed to get ibcHeader",
zap.Int64("height", persistence.latestQueriedBlock),
zap.Any("error", err),
)
return err
}

ccp.verifier = &Verifier{
Header: lightBlock,
}

var eg errgroup.Group
eg.Go(func() error {
return ccp.initializeConnectionState(ctx)
Expand Down Expand Up @@ -387,7 +366,10 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
if (persistence.latestHeight - persistence.latestQueriedBlock) < inSyncNumBlocksThreshold {
ccp.inSync = true
firstTimeInSync = true
ccp.log.Info("Chain is in sync")
ccp.log.Info("Chain is in sync",
zap.Int64("latest_queried_block", persistence.latestQueriedBlock),
zap.Int64("latest_height", persistence.latestHeight),
)
} else {
ccp.log.Info("Chain is not yet in sync",
zap.Int64("latest_queried_block", persistence.latestQueriedBlock),
Expand All @@ -405,76 +387,56 @@ func (ccp *WasmChainProcessor) queryCycle(ctx context.Context, persistence *quer
chainID := ccp.chainProvider.ChainId()
var latestHeader provider.IBCHeader

syncUpHeight := func() int64 {
fromHeight := persistence.latestQueriedBlock + 1
toHeight := func() int64 {
if persistence.latestHeight-persistence.latestQueriedBlock > MaxBlockFetch {
return persistence.latestQueriedBlock + MaxBlockFetch
}
return persistence.latestHeight
}()

if fromHeight > toHeight {
fromHeight = toHeight
}

for i := persistence.latestQueriedBlock + 1; i <= syncUpHeight(); i++ {
var eg errgroup.Group
var blockRes *ctypes.ResultBlockResults
var lightBlock *types.LightBlock
var h provider.IBCHeader
i := i
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, blockResultsQueryTimeout)
defer cancelQueryCtx()
blockRes, err = ccp.chainProvider.RPCClient.BlockResults(queryCtx, &i)
return err
})
eg.Go(func() (err error) {
queryCtx, cancelQueryCtx := context.WithTimeout(ctx, queryTimeout)
defer cancelQueryCtx()
h, lightBlock, err = ccp.chainProvider.QueryLightBlock(queryCtx, i)
return err
})
if fromHeight <= persistence.latestQueriedBlock {
return nil
}

if err := eg.Wait(); err != nil {
ccp.log.Warn("Error querying block data", zap.Error(err))
break
}
blockInfos, err := ccp.chainProvider.GetBlockInfoList(ctx, uint64(fromHeight), uint64(toHeight))
bcsainju marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
ccp.log.Error("failed to query block messages", zap.Error(err))
return nil
}

for _, blockInfo := range blockInfos {
ccp.log.Debug(
"Queried block",
zap.Int64("height", i),
zap.Uint64("height", blockInfo.Height),
zap.Int64("latest", persistence.latestHeight),
zap.Int64("delta", persistence.latestHeight-i),
zap.Int64("delta", persistence.latestHeight-int64(blockInfo.Height)),
)

latestHeader = h

if err := ccp.Verify(ctx, lightBlock); err != nil {
ccp.log.Warn("Failed to verify block", zap.Int64("height", blockRes.Height), zap.Error(err))
return err
}

heightUint64 := uint64(i)
ppChanged = true

ccp.latestBlock = provider.LatestBlock{
Height: heightUint64,
Height: blockInfo.Height,
}

ibcHeaderCache[heightUint64] = latestHeader
ppChanged = true

base64Encoded := ccp.chainProvider.cometLegacyEncoding

for _, tx := range blockRes.TxsResults {
if tx.Code != 0 {
// tx was not successful
continue
}
messages := ibcMessagesFromEvents(ccp.log, tx.Events, chainID, heightUint64, ccp.chainProvider.PCfg.IbcHandlerAddress, base64Encoded)
ibcHeader, _, err := ccp.chainProvider.QueryLightBlock(ctx, int64(blockInfo.Height))
if err != nil {
ccp.log.Error("failed to query ibc header ", zap.Error(err), zap.Uint64("height", blockInfo.Height))
} else {
ibcHeaderCache[blockInfo.Height] = ibcHeader
}

for _, m := range messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", heightUint64))
ccp.handleMessage(ctx, m, ibcMessagesCache)
}
for _, m := range blockInfo.Messages {
ccp.log.Info("Detected eventlog", zap.String("eventlog", m.eventType), zap.Uint64("height", blockInfo.Height))
ccp.handleMessage(ctx, m, ibcMessagesCache)
}

newLatestQueriedBlock = i
newLatestQueriedBlock = int64(blockInfo.Height)
latestHeader = ibcHeader
}

if newLatestQueriedBlock == persistence.latestQueriedBlock {
Expand Down Expand Up @@ -550,87 +512,6 @@ func (ccp *WasmChainProcessor) CurrentBlockHeight(ctx context.Context, persisten
ccp.metrics.SetLatestHeight(ccp.chainProvider.ChainId(), persistence.latestHeight)
}

func (ccp *WasmChainProcessor) Verify(ctx context.Context, untrusted *types.LightBlock) error {

if untrusted.Height != ccp.verifier.Header.Height+1 {
return errors.New("headers must be adjacent in height")
}

if err := verifyNewHeaderAndVals(untrusted.SignedHeader,
untrusted.ValidatorSet,
ccp.verifier.Header.SignedHeader,
time.Now(), 0); err != nil {
return fmt.Errorf("failed to verify Header: %v", err)
}

if !bytes.Equal(untrusted.Header.ValidatorsHash, ccp.verifier.Header.NextValidatorsHash) {
err := fmt.Errorf("expected old header next validators (%X) to match those from new header (%X)",
ccp.verifier.Header.NextValidatorsHash,
untrusted.Header.ValidatorsHash,
)
return err
}

if !bytes.Equal(untrusted.Header.LastBlockID.Hash.Bytes(), ccp.verifier.Header.Commit.BlockID.Hash.Bytes()) {
err := fmt.Errorf("expected LastBlockId Hash (%X) of current header to match those from trusted Header BlockID hash (%X)",
ccp.verifier.Header.NextValidatorsHash,
untrusted.Header.ValidatorsHash,
)
return err
}

// Ensure that +2/3 of new validators signed correctly.
if err := untrusted.ValidatorSet.VerifyCommitLight(ccp.verifier.Header.ChainID, untrusted.Commit.BlockID,
untrusted.Header.Height, untrusted.Commit); err != nil {
return fmt.Errorf("invalid header: %v", err)
}

ccp.verifier.Header = untrusted
return nil

}

func verifyNewHeaderAndVals(
untrustedHeader *types.SignedHeader,
untrustedVals *types.ValidatorSet,
trustedHeader *types.SignedHeader,
now time.Time,
maxClockDrift time.Duration) error {

if err := untrustedHeader.ValidateBasic(trustedHeader.ChainID); err != nil {
return fmt.Errorf("untrustedHeader.ValidateBasic failed: %w", err)
}

if untrustedHeader.Height <= trustedHeader.Height {
return fmt.Errorf("expected new header height %d to be greater than one of old header %d",
untrustedHeader.Height,
trustedHeader.Height)
}

if !untrustedHeader.Time.After(trustedHeader.Time) {
return fmt.Errorf("expected new header time %v to be after old header time %v",
untrustedHeader.Time,
trustedHeader.Time)
}

if !untrustedHeader.Time.Before(now.Add(maxClockDrift)) {
return fmt.Errorf("new header has a time from the future %v (now: %v; max clock drift: %v)",
untrustedHeader.Time,
now,
maxClockDrift)
}

if !bytes.Equal(untrustedHeader.ValidatorsHash, untrustedVals.Hash()) {
return fmt.Errorf("expected new header validators (%X) to match those that were supplied (%X) at height %d",
untrustedHeader.ValidatorsHash,
untrustedVals.Hash(),
untrustedHeader.Height,
)
}

return nil
}

// func (ccp *WasmChainProcessor) CurrentRelayerBalance(ctx context.Context) {
// // memoize the current gas prices to only show metrics for "interesting" denoms
// if ccp.parsedGasPrices == nil {
Expand Down
Loading