Skip to content

Commit

Permalink
refactor: evictions and nonce adjusment works
Browse files Browse the repository at this point in the history
  • Loading branch information
ninabarbakadze committed Aug 30, 2024
1 parent 36cf18e commit 3e4e1d1
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 113 deletions.
97 changes: 97 additions & 0 deletions pkg/user/mocks/tx.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

158 changes: 49 additions & 109 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/celestiaorg/go-square/v2/share"
blobtx "github.com/celestiaorg/go-square/v2/tx"
"github.com/cosmos/cosmos-sdk/client"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
Expand All @@ -27,7 +26,6 @@ import (

"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v3/app/errors"
"github.com/celestiaorg/celestia-app/v3/app/grpc/tx"
"github.com/celestiaorg/celestia-app/v3/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v3/x/blob/types"
Expand All @@ -41,10 +39,9 @@ const (

type Option func(client *TxClient)

type TxPoolTx struct {
Timeout int64
Nonce uint64
Signer string
type PoolTxInfo struct {
Nonce uint64
Signer string
}

// TxResponse is a response from the chain after
Expand Down Expand Up @@ -126,6 +123,12 @@ func WithDefaultAccount(name string) Option {
}
}

func WithConsensusNode(node tx.TxClient) Option {
return func(c *TxClient) {
c.txClient = node
}
}

// TxClient is an abstraction for building, signing, and broadcasting Celestia transactions
// It supports multiple accounts. If none is specified, it will
// try use the default account.
Expand All @@ -143,7 +146,9 @@ type TxClient struct {
defaultGasPrice float64
defaultAccount string
defaultAddress sdktypes.AccAddress
txPool map[string]TxPoolTx
txPool map[string]PoolTxInfo
// txClient is the client API for Tx service.
txClient tx.TxClient
}

// NewTxClient returns a new signer using the provided keyring
Expand Down Expand Up @@ -176,6 +181,8 @@ func NewTxClient(
defaultGasPrice: appconsts.DefaultMinGasPrice,
defaultAccount: records[0].Name,
defaultAddress: addr,
txPool: make(map[string]PoolTxInfo),
txClient: tx.NewTxClient(conn),
}

for _, opt := range options {
Expand Down Expand Up @@ -380,40 +387,23 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return nil, err
}
if resp.TxResponse.Code != abci.CodeTypeOK {
if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) {
_, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address)
// query the account to update the sequence number on-chain for the account
if err != nil {
return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog)
}
if err := client.signer.SetSequence(signer, seqNum); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
return client.retryBroadcastingTx(ctx, txBytes)
}
broadcastTxErr := &BroadcastTxError{
TxHash: resp.TxResponse.TxHash,
Code: resp.TxResponse.Code,
ErrorLog: resp.TxResponse.RawLog,
}
// transaction failed
// check if the signer has other txs in the pool
for txHash, tx := range client.txPool {
if tx.Signer == signer {
// set the time for when the tx failed + 1 minute (they can be resubmitted after this time)
tx.Timeout = time.Now().Add(time.Minute).Unix()
// update the nonce of other txs in the pool
tx.Nonce = seqNum
client.txPool[txHash] = tx

}
}
// if yes, we need to adjust the nonce of the txs in the pool
// wait for them to be invalidated by their max heights
// and then resubmit the tx
return nil, broadcastTxErr
}

// add the broadcasted transaction to the local pool
fmt.Println(resp.TxResponse.TxHash, "TX HASH we are saving")
client.txPool[resp.TxResponse.TxHash] = PoolTxInfo{
Nonce: client.signer.accounts[signer].Sequence(),
Signer: signer,
}

fmt.Println("queried tx hash", client.txPool[resp.TxResponse.TxHash])

// after the transaction has been submitted, we can increment the
// sequence of the signer
if err := client.signer.IncrementSequence(signer); err != nil {
Expand All @@ -422,89 +412,15 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return resp.TxResponse, nil
}

func (client *TxClient) updateNonces(signer string, newSeqNum uint64) {
client.mtx.Lock()
defer client.mtx.Unlock()

latestNonce := newSeqNum
for txHash, tx := range client.txPool {
if tx.Signer == signer {
tx.Nonce = latestNonce
client.txPool[txHash] = tx
latestNonce++
}
}
}

// retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the
// new sequence number. It then calls `broadcastTx` and attempts to submit the transaction
func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) {
blobTx, isBlobTx, err := blobtx.UnmarshalBlobTx(txBytes)
if isBlobTx {
// only check the error if the bytes are supposed to be of type blob tx
if err != nil {
return nil, err
}
txBytes = blobTx.Tx
}
tx, err := client.signer.DecodeTx(txBytes)
if err != nil {
return nil, err
}

opts := make([]TxOption, 0)
if granter := tx.FeeGranter(); granter != nil {
opts = append(opts, SetFeeGranter(granter))
}
if payer := tx.FeePayer(); payer != nil {
opts = append(opts, SetFeePayer(payer))
}
if memo := tx.GetMemo(); memo != "" {
opts = append(opts, SetMemo(memo))
}
if fee := tx.GetFee(); fee != nil {
opts = append(opts, SetFee(fee.AmountOf(appconsts.BondDenom).Uint64()))
}
if gas := tx.GetGas(); gas > 0 {
opts = append(opts, SetGasLimit(gas))
}

txBuilder, err := client.signer.txBuilder(tx.GetMsgs(), opts...)
if err != nil {
return nil, err
}
signer, _, err := client.signer.signTransaction(txBuilder)
if err != nil {
return nil, fmt.Errorf("resigning transaction: %w", err)
}

newTxBytes, err := client.signer.EncodeTx(txBuilder.GetTx())
if err != nil {
return nil, err
}

// rewrap the blob tx if it was originally a blob tx
if isBlobTx {
newTxBytes, err = blobtx.MarshalBlobTx(newTxBytes, blobTx.Blobs...)
if err != nil {
return nil, err
}
}

return client.broadcastTx(ctx, newTxBytes, signer)
}

// ConfirmTx periodically pings the provided node for the commitment of a transaction by its
// hash. It will continually loop until the context is cancelled, the tx is found or an error
// is encountered.
func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxResponse, error) {
txClient := tx.NewTxClient(client.grpc)

pollTicker := time.NewTicker(client.pollTime)
defer pollTicker.Stop()

for {
resp, err := txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
resp, err := client.txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
if err != nil {
return nil, err
}
Expand All @@ -515,6 +431,7 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
delete(client.txPool, txHash)
return nil, ctx.Err()
case <-pollTicker.C:
continue
Expand All @@ -531,13 +448,29 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
delete(client.txPool, txHash)
return nil, executionErr
}
// remove it form the local pool
delete(client.txPool, txHash)
return txResponse, nil
case core.TxStatusEvicted:
fmt.Println("tx was evicted from the mempool")
// get transaction from the local pool
txPoolTx, exists := client.txPool[txHash]
fmt.Println("txPoolTx", txPoolTx)
if !exists {
return nil, fmt.Errorf("tx not found in tx client local pool: %s", txHash)
}
// set the signers sequence to the nonce of the tx
if err := client.signer.SetSequence(txPoolTx.Signer, txPoolTx.Nonce); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
delete(client.txPool, txHash)
return nil, fmt.Errorf("tx was evicted from the mempool")
default:
return nil, fmt.Errorf("unknown tx: %s", txHash)
delete(client.txPool, txHash)
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
}
Expand Down Expand Up @@ -617,6 +550,7 @@ func (client *TxClient) checkAccountLoaded(ctx context.Context, account string)
if err != nil {
return fmt.Errorf("retrieving address from keyring: %w", err)
}
// FIXME: have a less trusting way of getting the account number and sequence
accNum, sequence, err := QueryAccount(ctx, client.grpc, client.registry, addr)
if err != nil {
return fmt.Errorf("querying account %s: %w", account, err)
Expand Down Expand Up @@ -645,6 +579,12 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err
return record.Name, nil
}

// Method to get transaction info by hash
func (client *TxClient) GetTxInfo(hash string) (PoolTxInfo, bool) {
txInfo, exists := client.txPool[hash]
return txInfo, exists
}

// Signer exposes the tx clients underlying signer
func (client *TxClient) Signer() *Signer {
return client.signer
Expand Down
Loading

0 comments on commit 3e4e1d1

Please sign in to comment.