Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: evictions issue in tx client #3830

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a2e490c
initial commit
ninabarbakadze Aug 28, 2024
faa460c
refactor: evictions and nonce adjusment works
ninabarbakadze Aug 30, 2024
c98a337
works!
ninabarbakadze Sep 2, 2024
3dd6abb
style: fmt imported and not used
ninabarbakadze Sep 2, 2024
9aa7af6
final cleaup
ninabarbakadze Sep 2, 2024
6d2c828
style: nits
ninabarbakadze Sep 2, 2024
ee0e7b2
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 2, 2024
e050bc0
style: resolve lint
ninabarbakadze Sep 2, 2024
500e5be
refactor: nonces work
ninabarbakadze Sep 3, 2024
6eb72e0
chore: delete unused mock
ninabarbakadze Sep 3, 2024
4d6b196
fix: typo
ninabarbakadze Sep 3, 2024
6e02de3
style: lint
ninabarbakadze Sep 3, 2024
bd3f5e0
fix: failing test
ninabarbakadze Sep 3, 2024
4046a59
refactor: cleanup further
ninabarbakadze Sep 3, 2024
7832fdc
refactor: naming
ninabarbakadze Sep 3, 2024
437e4a7
refactor: extract a function to make it more readable
ninabarbakadze Sep 3, 2024
1c7a7b3
refactor: more naming stuff
ninabarbakadze Sep 3, 2024
f3c0af3
fix: deadlock
ninabarbakadze Sep 3, 2024
330c525
Update pkg/user/tx_client.go
ninabarbakadze Sep 5, 2024
1e0353b
refactor: tests and more
ninabarbakadze Sep 10, 2024
1e35d86
test: remove separate eviciton test
ninabarbakadze Sep 10, 2024
9ac1e5c
style: nits
ninabarbakadze Sep 10, 2024
088aeda
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
ba4a7cc
test: make it more readable
ninabarbakadze Sep 18, 2024
0fc939f
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
046ce94
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
918089b
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30985ae
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30c1dc0
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c2b72fb
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c676a97
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
1e1de7f
Update pkg/user/tx_client.go
ninabarbakadze Sep 20, 2024
def40f2
style: fix lint erros
ninabarbakadze Sep 20, 2024
54ff169
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions pkg/user/mocks/tx.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 49 additions & 74 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/celestiaorg/go-square/v2/share"
blobtx "github.com/celestiaorg/go-square/v2/tx"
"github.com/cosmos/cosmos-sdk/client"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
Expand All @@ -27,7 +26,6 @@ import (

"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v3/app/errors"
"github.com/celestiaorg/celestia-app/v3/app/grpc/tx"
"github.com/celestiaorg/celestia-app/v3/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v3/x/blob/types"
Expand All @@ -41,6 +39,13 @@ const (

type Option func(client *TxClient)

// PoolTxInfo is a struct that holds the nonce and the signer of a transaction
// in the local mempool.
type PoolTxInfo struct {
Nonce uint64
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
Signer string
}

// TxResponse is a response from the chain after
// a transaction has been submitted.
type TxResponse struct {
Expand Down Expand Up @@ -120,6 +125,12 @@ func WithDefaultAccount(name string) Option {
}
}

func WithTxService(node tx.TxClient) Option {
return func(c *TxClient) {
c.txService = node
}
}

// TxClient is an abstraction for building, signing, and broadcasting Celestia transactions
// It supports multiple accounts. If none is specified, it will
// try use the default account.
Expand All @@ -137,6 +148,9 @@ type TxClient struct {
defaultGasPrice float64
defaultAccount string
defaultAddress sdktypes.AccAddress
txPool map[string]PoolTxInfo
// txService is the client API for Tx service.
txService tx.TxClient
}

// NewTxClient returns a new signer using the provided keyring
Expand Down Expand Up @@ -169,6 +183,8 @@ func NewTxClient(
defaultGasPrice: appconsts.DefaultMinGasPrice,
defaultAccount: records[0].Name,
defaultAddress: addr,
txPool: make(map[string]PoolTxInfo),
txService: tx.NewTxClient(conn),
}

for _, opt := range options {
Expand Down Expand Up @@ -368,102 +384,40 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return nil, err
}
if resp.TxResponse.Code != abci.CodeTypeOK {
if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) {
// query the account to update the sequence number on-chain for the account
_, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address)
if err != nil {
return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog)
}
if err := client.signer.SetSequence(signer, seqNum); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
return client.retryBroadcastingTx(ctx, txBytes)
}
broadcastTxErr := &BroadcastTxError{
TxHash: resp.TxResponse.TxHash,
Code: resp.TxResponse.Code,
ErrorLog: resp.TxResponse.RawLog,
}
return resp.TxResponse, broadcastTxErr
return nil, broadcastTxErr
}

// after the transaction has been submitted, we can increment the
// sequence of the signer
if err := client.signer.IncrementSequence(signer); err != nil {
return nil, fmt.Errorf("increment sequencing: %w", err)
}
return resp.TxResponse, nil
}

// retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the
// new sequence number. It then calls `broadcastTx` and attempts to submit the transaction
func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) {
blobTx, isBlobTx, err := blobtx.UnmarshalBlobTx(txBytes)
if isBlobTx {
// only check the error if the bytes are supposed to be of type blob tx
if err != nil {
return nil, err
}
txBytes = blobTx.Tx
}
tx, err := client.signer.DecodeTx(txBytes)
if err != nil {
return nil, err
}

opts := make([]TxOption, 0)
if granter := tx.FeeGranter(); granter != nil {
opts = append(opts, SetFeeGranter(granter))
}
if payer := tx.FeePayer(); payer != nil {
opts = append(opts, SetFeePayer(payer))
}
if memo := tx.GetMemo(); memo != "" {
opts = append(opts, SetMemo(memo))
}
if fee := tx.GetFee(); fee != nil {
opts = append(opts, SetFee(fee.AmountOf(appconsts.BondDenom).Uint64()))
}
if gas := tx.GetGas(); gas > 0 {
opts = append(opts, SetGasLimit(gas))
}

txBuilder, err := client.signer.txBuilder(tx.GetMsgs(), opts...)
if err != nil {
return nil, err
}
signer, _, err := client.signer.signTransaction(txBuilder)
if err != nil {
return nil, fmt.Errorf("resigning transaction: %w", err)
}

newTxBytes, err := client.signer.EncodeTx(txBuilder.GetTx())
if err != nil {
return nil, err
// save the nonce and signer of the transaction in the local pool
client.txPool[resp.TxResponse.TxHash] = PoolTxInfo{
Nonce: client.signer.accounts[signer].Sequence(),
Signer: signer,
}

// rewrap the blob tx if it was originally a blob tx
if isBlobTx {
newTxBytes, err = blobtx.MarshalBlobTx(newTxBytes, blobTx.Blobs...)
if err != nil {
return nil, err
}
}

return client.broadcastTx(ctx, newTxBytes, signer)
return resp.TxResponse, nil
}

// ConfirmTx periodically pings the provided node for the commitment of a transaction by its
// hash. It will continually loop until the context is cancelled, the tx is found or an error
// is encountered.
func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxResponse, error) {
txClient := tx.NewTxClient(client.grpc)
client.mtx.Lock()
defer client.mtx.Unlock()

pollTicker := time.NewTicker(client.pollTime)
defer pollTicker.Stop()

for {
resp, err := txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
resp, err := client.txService.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
if err != nil {
return nil, err
}
Expand All @@ -474,6 +428,7 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
delete(client.txPool, txHash)
return nil, ctx.Err()
case <-pollTicker.C:
continue
Expand All @@ -490,13 +445,26 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
delete(client.txPool, txHash)
return nil, executionErr
}
delete(client.txPool, txHash)
return txResponse, nil
case core.TxStatusEvicted:
// Get transaction from the local pool
poolTx, exists := client.txPool[txHash]
if !exists {
return nil, fmt.Errorf("tx not found in tx client local pool: %s", txHash)
}
// Set the signers sequence to the nonce of the tx that was evicted
if err := client.signer.SetSequence(poolTx.Signer, poolTx.Nonce); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
delete(client.txPool, txHash)
return nil, fmt.Errorf("tx was evicted from the mempool")
default:
return nil, fmt.Errorf("unknown tx: %s", txHash)
delete(client.txPool, txHash)
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
}
Expand Down Expand Up @@ -576,6 +544,7 @@ func (client *TxClient) checkAccountLoaded(ctx context.Context, account string)
if err != nil {
return fmt.Errorf("retrieving address from keyring: %w", err)
}
// FIXME: have a less trusting way of getting the account number and sequence
accNum, sequence, err := QueryAccount(ctx, client.grpc, client.registry, addr)
if err != nil {
return fmt.Errorf("querying account %s: %w", account, err)
Expand Down Expand Up @@ -604,6 +573,12 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err
return record.Name, nil
}

// Method to get transaction info from the local pool by hash
func (client *TxClient) GetTxInfo(hash string) (PoolTxInfo, bool) {
txInfo, exists := client.txPool[hash]
return txInfo, exists
}

// Signer exposes the tx clients underlying signer
func (client *TxClient) Signer() *Signer {
return client.signer
Expand Down
Loading
Loading