refactor: index submitted txs in tx client and improve nonce management #3830

Merged
merged 40 commits into from
Sep 25, 2024
Changes from 13 commits
a2e490c
initial commit
ninabarbakadze Aug 28, 2024
faa460c
refactor: evictions and nonce adjusment works
ninabarbakadze Aug 30, 2024
c98a337
works!
ninabarbakadze Sep 2, 2024
3dd6abb
style: fmt imported and not used
ninabarbakadze Sep 2, 2024
9aa7af6
final cleaup
ninabarbakadze Sep 2, 2024
6d2c828
style: nits
ninabarbakadze Sep 2, 2024
ee0e7b2
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 2, 2024
e050bc0
style: resolve lint
ninabarbakadze Sep 2, 2024
500e5be
refactor: nonces work
ninabarbakadze Sep 3, 2024
6eb72e0
chore: delete unused mock
ninabarbakadze Sep 3, 2024
4d6b196
fix: typo
ninabarbakadze Sep 3, 2024
6e02de3
style: lint
ninabarbakadze Sep 3, 2024
bd3f5e0
fix: failing test
ninabarbakadze Sep 3, 2024
4046a59
refactor: cleanup further
ninabarbakadze Sep 3, 2024
7832fdc
refactor: naming
ninabarbakadze Sep 3, 2024
437e4a7
refactor: extract a function to make it more readable
ninabarbakadze Sep 3, 2024
1c7a7b3
refactor: more naming stuff
ninabarbakadze Sep 3, 2024
f3c0af3
fix: deadlock
ninabarbakadze Sep 3, 2024
330c525
Update pkg/user/tx_client.go
ninabarbakadze Sep 5, 2024
1e0353b
refactor: tests and more
ninabarbakadze Sep 10, 2024
1e35d86
test: remove separate eviciton test
ninabarbakadze Sep 10, 2024
9ac1e5c
style: nits
ninabarbakadze Sep 10, 2024
088aeda
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
ba4a7cc
test: make it more readable
ninabarbakadze Sep 18, 2024
0fc939f
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
046ce94
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
918089b
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30985ae
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30c1dc0
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c2b72fb
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c676a97
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
1e1de7f
Update pkg/user/tx_client.go
ninabarbakadze Sep 20, 2024
def40f2
style: fix lint erros
ninabarbakadze Sep 20, 2024
54ff169
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 20, 2024
5b1f1d2
Merge branch 'main' into nina/evictions-bug
cmwaters Sep 23, 2024
a492e39
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 23, 2024
4d47ce9
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 24, 2024
58d6c23
Merge branch 'main' into nina/evictions-bug
evan-forbes Sep 25, 2024
d7ea9b9
test: skip testevictions
ninabarbakadze Sep 25, 2024
ac47d8f
test: fix the misleading comment
ninabarbakadze Sep 25, 2024
190 changes: 86 additions & 104 deletions pkg/user/tx_client.go
@@ -12,7 +12,6 @@ import (
"time"

"github.com/celestiaorg/go-square/v2/share"
blobtx "github.com/celestiaorg/go-square/v2/tx"
"github.com/cosmos/cosmos-sdk/client"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
@@ -27,7 +26,6 @@ import (

"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v3/app/errors"
"github.com/celestiaorg/celestia-app/v3/app/grpc/tx"
"github.com/celestiaorg/celestia-app/v3/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v3/x/blob/types"
@@ -41,6 +39,13 @@ const (

type Option func(client *TxClient)

// poolTxInfo is a struct that holds the nonce and the signer of a transaction
// in the local mempool.
type poolTxInfo struct {
Nonce uint64
Signer string
}

// TxResponse is a response from the chain after
// a transaction has been submitted.
type TxResponse struct {
@@ -120,6 +125,12 @@ func WithDefaultAccount(name string) Option {
}
}

func WithTxService(node tx.TxClient) Option {
return func(c *TxClient) {
c.txService = node
}
}

// TxClient is an abstraction for building, signing, and broadcasting Celestia transactions
// It supports multiple accounts. If none is specified, it will
// try use the default account.
@@ -137,6 +148,9 @@ type TxClient struct {
defaultGasPrice float64
defaultAccount string
defaultAddress sdktypes.AccAddress
txPool map[string]poolTxInfo
Member Author

[Question] do we want to have a way of periodically clearing the cache?

Member Author

@evan-forbes @cmwaters I'd appreciate your input here

Member

Ideally, it seems like we should never be in a scenario where this needs to be cleared. Either a tx gets included (and we keep trying until it does), or we handle each case where that does not happen and remove the tx at that point.

Member Author

The only time a tx doesn't get removed from this local cache is when the context gets cancelled while polling for the tx status, which means that tx was never confirmed. If a transaction never gets confirmed, it can linger in the cache until the process OOMs.

Member

@evan-forbes evan-forbes Sep 5, 2024

by periodically, do we mean a ttl?

Member Author

yes!

Contributor

The scenario that I see as more problematic is if a user decides to use BroadcastTx which creates the map entry and doesn't use ConfirmTx which cleans it up (because maybe they don't care if it lands or not).

I think we should garbage collect, not in a separate goroutine but within BroadcastTx.

I would give at least 10 minutes before we clean up.

// txService is the client API for Tx service.
txService tx.TxClient
}

// NewTxClient returns a new signer using the provided keyring
@@ -169,6 +183,8 @@ func NewTxClient(
defaultGasPrice: appconsts.DefaultMinGasPrice,
defaultAccount: records[0].Name,
defaultAddress: addr,
txPool: make(map[string]poolTxInfo),
txService: tx.NewTxClient(conn),
}

for _, opt := range options {
@@ -368,136 +384,93 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return nil, err
}
if resp.TxResponse.Code != abci.CodeTypeOK {
if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) {
// query the account to update the sequence number on-chain for the account
_, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address)
if err != nil {
return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog)
}
if err := client.signer.SetSequence(signer, seqNum); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
return client.retryBroadcastingTx(ctx, txBytes)
}
broadcastTxErr := &BroadcastTxError{
TxHash: resp.TxResponse.TxHash,
Code: resp.TxResponse.Code,
ErrorLog: resp.TxResponse.RawLog,
}
return resp.TxResponse, broadcastTxErr
return nil, broadcastTxErr
}

// save the nonce and signer of the transaction in the local pool
client.txPool[resp.TxResponse.TxHash] = poolTxInfo{
Nonce: client.signer.accounts[signer].Sequence(),
Signer: signer,
}

// after the transaction has been submitted, we can increment the
// sequence of the signer
if err := client.signer.IncrementSequence(signer); err != nil {
return nil, fmt.Errorf("increment sequencing: %w", err)
}
return resp.TxResponse, nil
}

// retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the
// new sequence number. It then calls `broadcastTx` and attempts to submit the transaction
func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) {
blobTx, isBlobTx, err := blobtx.UnmarshalBlobTx(txBytes)
if isBlobTx {
// only check the error if the bytes are supposed to be of type blob tx
if err != nil {
return nil, err
}
txBytes = blobTx.Tx
}
tx, err := client.signer.DecodeTx(txBytes)
if err != nil {
return nil, err
}

opts := make([]TxOption, 0)
if granter := tx.FeeGranter(); granter != nil {
opts = append(opts, SetFeeGranter(granter))
}
if payer := tx.FeePayer(); payer != nil {
opts = append(opts, SetFeePayer(payer))
}
if memo := tx.GetMemo(); memo != "" {
opts = append(opts, SetMemo(memo))
}
if fee := tx.GetFee(); fee != nil {
opts = append(opts, SetFee(fee.AmountOf(appconsts.BondDenom).Uint64()))
}
if gas := tx.GetGas(); gas > 0 {
opts = append(opts, SetGasLimit(gas))
}

txBuilder, err := client.signer.txBuilder(tx.GetMsgs(), opts...)
if err != nil {
return nil, err
}
signer, _, err := client.signer.signTransaction(txBuilder)
if err != nil {
return nil, fmt.Errorf("resigning transaction: %w", err)
}

newTxBytes, err := client.signer.EncodeTx(txBuilder.GetTx())
if err != nil {
return nil, err
}

// rewrap the blob tx if it was originally a blob tx
if isBlobTx {
newTxBytes, err = blobtx.MarshalBlobTx(newTxBytes, blobTx.Blobs...)
if err != nil {
return nil, err
}
}

return client.broadcastTx(ctx, newTxBytes, signer)
return resp.TxResponse, nil
}

// ConfirmTx periodically pings the provided node for the commitment of a transaction by its
// hash. It will continually loop until the context is cancelled, the tx is found or an error
// is encountered.
func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxResponse, error) {
txClient := tx.NewTxClient(client.grpc)

var evictFromTxPool bool
pollTicker := time.NewTicker(client.pollTime)
defer pollTicker.Stop()

defer func() {
pollTicker.Stop()

if evictFromTxPool {
client.mtx.Lock()
delete(client.txPool, txHash)
client.mtx.Unlock()
}
}()

for {
resp, err := txClient.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
resp, err := client.txService.TxStatus(ctx, &tx.TxStatusRequest{TxId: txHash})
if err != nil {
return nil, err
}

if resp != nil {
switch resp.Status {
case core.TxStatusPending:
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-pollTicker.C:
continue
}
case core.TxStatusCommitted:
txResponse := &TxResponse{
Height: resp.Height,
TxHash: txHash,
Code: resp.ExecutionCode,
}
if resp.ExecutionCode != abci.CodeTypeOK {
executionErr := &ExecutionError{
TxHash: txHash,
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
return nil, executionErr
switch resp.Status {
case core.TxStatusPending:
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-pollTicker.C:
continue
}
case core.TxStatusCommitted:
txResponse := &TxResponse{
Height: resp.Height,
TxHash: txHash,
Code: resp.ExecutionCode,
}
if resp.ExecutionCode != abci.CodeTypeOK {
executionErr := &ExecutionError{
TxHash: txHash,
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
return txResponse, nil
case core.TxStatusEvicted:
return nil, fmt.Errorf("tx was evicted from the mempool")
default:
return nil, fmt.Errorf("unknown tx: %s", txHash)
evictFromTxPool = true
return nil, executionErr
}
evictFromTxPool = true
return txResponse, nil
case core.TxStatusEvicted:
// Get transaction from the local pool
nonce, signer, exists := client.GetTxInfoFromLocalPool(txHash)
if !exists {
return nil, fmt.Errorf("tx not found in tx client local pool: %s", txHash)
}
// Set the signer's sequence to the nonce of the tx that was evicted
if err := client.signer.SetSequence(signer, nonce); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
evictFromTxPool = true
return nil, fmt.Errorf("tx was evicted from the mempool")
default:
evictFromTxPool = true
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
}
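
The eviction branch above relies on the nonce recorded at broadcast time. As a rough sketch of that bookkeeping, stripped of gRPC, signing, and locking: `pendingTx`, `miniClient`, `newMiniClient`, `broadcast`, and `onEvicted` are hypothetical names, and the sketch assumes a single goroutine.

```go
package main

import "fmt"

// pendingTx records which signer sent a tx and with which nonce.
type pendingTx struct {
	nonce  uint64
	signer string
}

// miniClient tracks the next nonce per signer and the txs still in flight.
type miniClient struct {
	sequence map[string]uint64    // signer -> next nonce to use
	pool     map[string]pendingTx // tx hash -> info saved at broadcast time
}

func newMiniClient() *miniClient {
	return &miniClient{
		sequence: make(map[string]uint64),
		pool:     make(map[string]pendingTx),
	}
}

// broadcast saves the nonce the tx was signed with, then advances the sequence.
func (c *miniClient) broadcast(hash, signer string) {
	c.pool[hash] = pendingTx{nonce: c.sequence[signer], signer: signer}
	c.sequence[signer]++
}

// onEvicted rolls the signer's sequence back to the evicted tx's nonce so the
// next transaction reuses it, then drops the entry from the local pool.
func (c *miniClient) onEvicted(hash string) error {
	info, ok := c.pool[hash]
	if !ok {
		return fmt.Errorf("tx not found in local pool: %s", hash)
	}
	c.sequence[info.signer] = info.nonce
	delete(c.pool, hash)
	return nil
}

func main() {
	c := newMiniClient()
	c.sequence["alice"] = 5
	c.broadcast("A", "alice")
	fmt.Println("after broadcast:", c.sequence["alice"])
	if err := c.onEvicted("A"); err != nil {
		fmt.Println("error:", err)
	}
	fmt.Println("after eviction:", c.sequence["alice"])
}
```

Without the rollback, the signer would keep a sequence one ahead of what the chain expects, and the next broadcast would fail with a nonce mismatch.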
@@ -576,6 +549,7 @@ func (client *TxClient) checkAccountLoaded(ctx context.Context, account string)
if err != nil {
return fmt.Errorf("retrieving address from keyring: %w", err)
}
// FIXME: have a less trusting way of getting the account number and sequence
accNum, sequence, err := QueryAccount(ctx, client.grpc, client.registry, addr)
if err != nil {
return fmt.Errorf("querying account %s: %w", account, err)
@@ -604,6 +578,14 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err
return record.Name, nil
}

// GetTxInfoFromLocalPool gets transaction info from the local pool by its hash (testing purposes only)
func (client *TxClient) GetTxInfoFromLocalPool(hash string) (nonce uint64, signer string, exists bool) {
client.mtx.Lock()
defer client.mtx.Unlock()
txInfo, exists := client.txPool[hash]
return txInfo.Nonce, txInfo.Signer, exists
}

// Signer exposes the tx client's underlying signer
func (client *TxClient) Signer() *Signer {
return client.signer