Skip to content

Commit

Permalink
Merge pull request #403 from icon-project/fix/handle_cb_err
Browse files Browse the repository at this point in the history
fix: properly handle callback error while message routing
  • Loading branch information
sherpalden authored Oct 22, 2024
2 parents b9c0efd + 009bdfd commit e3dd880
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 87 deletions.
44 changes: 19 additions & 25 deletions relayer/chains/evm/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package evm

import (
"context"
"encoding/hex"
"fmt"
"strings"

Expand All @@ -27,26 +28,28 @@ func (p *Provider) Route(ctx context.Context, message *providerTypes.Message, ca
p.routerMutex.Lock()

p.log.Info("starting to route message",
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req_id", message.ReqID),
zap.String("src", message.Src),
zap.String("event_type", message.EventType))
zap.String("event_type", message.EventType),
zap.String("data", hex.EncodeToString(message.Data)),
)

opts, err := p.GetTransationOpts(ctx)
if err != nil {
p.routerMutex.Unlock()
return fmt.Errorf("routing failed: %w", err)
return fmt.Errorf("failed to get transaction options: %w", err)
}

messageKey := message.MessageKey()

tx, err := p.SendTransaction(ctx, opts, message)
p.routerMutex.Unlock()
if err != nil {
return fmt.Errorf("routing failed: %w", err)
return fmt.Errorf("failed to send transaction: %w", err)
}
p.log.Info("transaction sent", zap.String("tx_hash", tx.Hash().String()), zap.Any("message", messageKey))
return p.WaitForTxResult(ctx, tx, messageKey, callback)

p.WaitForTxResult(ctx, tx, message.MessageKey(), callback)
return nil
}

func (p *Provider) SendTransaction(ctx context.Context, opts *bind.TransactOpts, message *providerTypes.Message) (*types.Transaction, error) {
Expand Down Expand Up @@ -116,35 +119,26 @@ func (p *Provider) SendTransaction(ctx context.Context, opts *bind.TransactOpts,
return tx, err
}

func (p *Provider) WaitForTxResult(ctx context.Context, tx *types.Transaction, m *providerTypes.MessageKey, callback providerTypes.TxResponseFunc) error {
if callback == nil {
// no point to wait for result if callback is nil
return nil
}

func (p *Provider) WaitForTxResult(ctx context.Context, tx *types.Transaction, m *providerTypes.MessageKey, callback providerTypes.TxResponseFunc) {
res := &providerTypes.TxResponse{
TxHash: tx.Hash().String(),
}

txReceipts, err := p.WaitForResults(ctx, tx)
if err != nil {
p.log.Error("failed to get tx result", zap.String("hash", res.TxHash), zap.Any("message", m), zap.Error(err))
callback(m, res, err)
return err
callback(m, res, fmt.Errorf("error waiting for tx result: %w", err))
return
}

res.Height = txReceipts.BlockNumber.Int64()

if txReceipts.Status != types.ReceiptStatusSuccessful {
err = fmt.Errorf("transaction failed to execute")
callback(m, res, err)
p.LogFailedTx(m, txReceipts, err)
return err
res.Code = providerTypes.Failed
callback(m, res, fmt.Errorf("transaction failed to execute: %+v", txReceipts.Logs))
} else {
res.Code = providerTypes.Success
callback(m, res, nil)
}
res.Code = providerTypes.Success
callback(m, res, nil)
p.LogSuccessTx(m, txReceipts)
return nil
}

func (p *Provider) LogSuccessTx(message *providerTypes.MessageKey, receipt *types.Receipt) {
Expand Down
40 changes: 19 additions & 21 deletions relayer/chains/icon/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package icon

import (
"context"
"encoding/hex"
"fmt"

"github.com/icon-project/centralized-relay/relayer/chains/icon/types"
Expand All @@ -13,10 +14,13 @@ import (

func (p *Provider) Route(ctx context.Context, message *providerTypes.Message, callback providerTypes.TxResponseFunc) error {
p.log.Info("starting to route message",
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req_id", message.ReqID),
zap.String("src", message.Src),
zap.String("event_type", message.EventType))
zap.String("event_type", message.EventType),
zap.String("data", hex.EncodeToString(message.Data)),
)

iconMessage, err := p.MakeIconMessage(message)
if err != nil {
Expand All @@ -28,7 +32,10 @@ func (p *Provider) Route(ctx context.Context, message *providerTypes.Message, ca
if err != nil {
return errors.Wrapf(err, "error occured while sending transaction")
}
return p.WaitForTxResult(ctx, txhash, messageKey, iconMessage.Method, callback)

p.WaitForTxResult(ctx, txhash, messageKey, iconMessage.Method, callback)

return nil
}

func (p *Provider) MakeIconMessage(message *providerTypes.Message) (*IconMessage, error) {
Expand Down Expand Up @@ -125,48 +132,39 @@ func (p *Provider) SendTransaction(ctx context.Context, msg *IconMessage) ([]byt
return txParam.TxHash.Value()
}

// TODO: review try to remove wait for Tx from packet-transfer and only use this for client and connection creation
func (p *Provider) WaitForTxResult(
ctx context.Context,
txHash []byte,
messageKey *providerTypes.MessageKey,
method string,
callback providerTypes.TxResponseFunc,
) error {
if callback == nil {
// no point to wait for result if callback is nil
return nil
}

) {
txhash := types.NewHexBytes(txHash)
res := &providerTypes.TxResponse{
TxHash: string(txhash),
}

txRes, err := p.client.WaitForResults(ctx, &types.TransactionHashParam{Hash: txhash})
if err != nil {
p.log.Error("get txn result failed", zap.String("txHash", string(txhash)), zap.String("method", method), zap.Error(err))
callback(messageKey, res, err)
return err
return
}

height, err := txRes.BlockHeight.Value()
if err != nil {
callback(messageKey, res, err)
return
}
// assign tx successful height

res.Height = height

if status, err := txRes.Status.Int(); status != 1 || err != nil {
err = fmt.Errorf("error: %s", err)
callback(messageKey, res, err)
p.LogFailedTx(method, txRes, err)
return err
res.Code = providerTypes.Failed
callback(messageKey, res, fmt.Errorf("transaction failed: %w", err))
} else {
res.Code = providerTypes.Success
callback(messageKey, res, nil)
}
res.Code = providerTypes.Success
callback(messageKey, res, nil)
p.LogSuccessTx(method, txRes)
return nil
}

func (p *Provider) LogSuccessTx(method string, result *types.TransactionResult) {
Expand Down
4 changes: 2 additions & 2 deletions relayer/chains/solana/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func (p *Provider) Route(ctx context.Context, message *relayertypes.Message, cal
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req-id", message.ReqID),
zap.String("event-type", message.EventType),
zap.Any("req_id", message.ReqID),
zap.String("event_type", message.EventType),
zap.String("data", hex.EncodeToString(message.Data)),
)

Expand Down
10 changes: 10 additions & 0 deletions relayer/chains/steller/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/base64"
"encoding/hex"
"fmt"
"strconv"

Expand All @@ -20,6 +21,15 @@ import (
)

func (p *Provider) Route(ctx context.Context, message *relayertypes.Message, callback relayertypes.TxResponseFunc) error {
p.log.Info("starting to route message",
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req_id", message.ReqID),
zap.String("event_type", message.EventType),
zap.String("data", hex.EncodeToString(message.Data)),
)

callArgs, err := p.newContractCallArgs(*message)
if err != nil {
return err
Expand Down
34 changes: 12 additions & 22 deletions relayer/chains/sui/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@ import (

func (p *Provider) Route(ctx context.Context, message *relayertypes.Message, callback relayertypes.TxResponseFunc) error {
p.log.Info("starting to route message",
zap.String("src", message.Src),
zap.String("dst", message.Dst),
zap.Any("sn", message.Sn),
zap.Any("req_id", message.ReqID),
zap.String("src", message.Src),
zap.String("event_type", message.EventType))
zap.String("event_type", message.EventType),
zap.String("data", hex.EncodeToString(message.Data)),
)

suiMessage, err := p.MakeSuiMessage(message)
if err != nil {
Expand All @@ -43,10 +46,12 @@ func (p *Provider) Route(ctx context.Context, message *relayertypes.Message, cal
}

txRes, err := p.SendTransaction(ctx, txBytes)
go p.executeRouteCallBack(txRes, message.MessageKey(), suiMessage.Method, callback, err)
if err != nil {
return errors.Wrapf(err, "error occured while sending transaction in sui")
}

p.executeRouteCallBack(txRes, message.MessageKey(), suiMessage.Method, callback, err)

return nil
}

Expand Down Expand Up @@ -283,7 +288,6 @@ func (p *Provider) executeRouteCallBack(txRes *types.SuiTransactionBlockResponse
err = fmt.Errorf("txn execution failed; received empty tx digest")
}
callback(messageKey, &relayertypes.TxResponse{}, err)
p.log.Error("failed to execute transaction", zap.Error(err), zap.String("method", method))
return
}

Expand All @@ -293,8 +297,7 @@ func (p *Provider) executeRouteCallBack(txRes *types.SuiTransactionBlockResponse

txnData, err := p.client.GetTransaction(context.Background(), txRes.Digest.String())
if err != nil {
callback(messageKey, res, err)
p.log.Error("failed to get transaction details after execution", zap.Error(err), zap.String("method", method), zap.String("tx_hash", txRes.Digest.String()))
callback(messageKey, res, fmt.Errorf("failed to get transaction after execution: %w", err))
return
}

Expand All @@ -303,35 +306,22 @@ func (p *Provider) executeRouteCallBack(txRes *types.SuiTransactionBlockResponse
time.Sleep(3 * time.Second) //time to wait until tx is included in some checkpoint
txnData, err = p.client.GetTransaction(context.Background(), txRes.Digest.String())
if err != nil {
callback(messageKey, res, err)
p.log.Error("failed to get transaction details due to nil checkpoint after execution", zap.Error(err), zap.String("method", method), zap.String("tx_hash", txRes.Digest.String()))
callback(messageKey, res, fmt.Errorf("failed to get transaction after execution due to nil checkpoint: %w", err))
return
}
}

// assign tx successful height
res.Height = txnData.Checkpoint.Int64()
success := txRes.Effects.Data.IsSuccess()
if !success {
res.Code = relayertypes.Failed
err = fmt.Errorf("error: %s", txRes.Effects.Data.V1.Status.Error)
callback(messageKey, res, err)
p.log.Info("failed transaction",
zap.Any("message-key", messageKey),
zap.String("method", method),
zap.String("tx_hash", txRes.Digest.String()),
zap.Int64("height", txnData.Checkpoint.Int64()),
zap.Error(err),
)
return
}

res.Code = relayertypes.Success
callback(messageKey, res, nil)
p.log.Info("successful transaction",
zap.Any("message-key", messageKey),
zap.String("method", method),
zap.String("tx_hash", txRes.Digest.String()),
zap.Int64("height", txnData.Checkpoint.Int64()),
)
}

func (p *Provider) QueryTransactionReceipt(ctx context.Context, txDigest string) (*relayertypes.Receipt, error) {
Expand Down
Loading

0 comments on commit e3dd880

Please sign in to comment.