Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: evictions issue in tx client #3830

Open
wants to merge 34 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
a2e490c
initial commit
ninabarbakadze Aug 28, 2024
faa460c
refactor: evictions and nonce adjustment works
ninabarbakadze Aug 30, 2024
c98a337
works!
ninabarbakadze Sep 2, 2024
3dd6abb
style: fmt imported and not used
ninabarbakadze Sep 2, 2024
9aa7af6
final cleanup
ninabarbakadze Sep 2, 2024
6d2c828
style: nits
ninabarbakadze Sep 2, 2024
ee0e7b2
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 2, 2024
e050bc0
style: resolve lint
ninabarbakadze Sep 2, 2024
500e5be
refactor: nonces work
ninabarbakadze Sep 3, 2024
6eb72e0
chore: delete unused mock
ninabarbakadze Sep 3, 2024
4d6b196
fix: typo
ninabarbakadze Sep 3, 2024
6e02de3
style: lint
ninabarbakadze Sep 3, 2024
bd3f5e0
fix: failing test
ninabarbakadze Sep 3, 2024
4046a59
refactor: cleanup further
ninabarbakadze Sep 3, 2024
7832fdc
refactor: naming
ninabarbakadze Sep 3, 2024
437e4a7
refactor: extract a function to make it more readable
ninabarbakadze Sep 3, 2024
1c7a7b3
refactor: more naming stuff
ninabarbakadze Sep 3, 2024
f3c0af3
fix: deadlock
ninabarbakadze Sep 3, 2024
330c525
Update pkg/user/tx_client.go
ninabarbakadze Sep 5, 2024
1e0353b
refactor: tests and more
ninabarbakadze Sep 10, 2024
1e35d86
test: remove separate eviction test
ninabarbakadze Sep 10, 2024
9ac1e5c
style: nits
ninabarbakadze Sep 10, 2024
088aeda
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
ba4a7cc
test: make it more readable
ninabarbakadze Sep 18, 2024
0fc939f
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
046ce94
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
918089b
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30985ae
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
30c1dc0
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c2b72fb
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
c676a97
Update pkg/user/tx_client.go
ninabarbakadze Sep 18, 2024
1e1de7f
Update pkg/user/tx_client.go
ninabarbakadze Sep 20, 2024
def40f2
style: fix lint errors
ninabarbakadze Sep 20, 2024
54ff169
Merge branch 'main' into nina/evictions-bug
ninabarbakadze Sep 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions pkg/user/pruning_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package user

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestPruningInTxTracker(t *testing.T) {
txClient := &TxClient{
txTracker: make(map[string]txInfo),
}
numTransactions := 10

// Add 10 transactions to the tracker that are 10 minutes old
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems incorrect. It looks like the implementation below adds 5 transactions that are 10 minutes old and 5 transactions that are 5 minutes old.

Suggested change
// Add 10 transactions to the tracker that are 10 minutes old

var txsToBePruned int
var txsNotReadyToBePruned int
for i := 0; i < numTransactions; i++ {
// 5 transactions will be pruned
if i%2 == 0 {
txClient.txTracker["tx"+fmt.Sprint(i)] = txInfo{
signer: "signer" + fmt.Sprint(i),
sequence: uint64(i),
timeStamp: time.Now().
Add(-10 * time.Minute),
}
txsToBePruned++
} else {
txClient.txTracker["tx"+fmt.Sprint(i)] = txInfo{
signer: "signer" + fmt.Sprint(i),
sequence: uint64(i),
timeStamp: time.Now().
Add(-5 * time.Minute),
}
txsNotReadyToBePruned++
}
}

txTrackerBeforePruning := len(txClient.txTracker)

// All transactions were indexed
require.Equal(t, numTransactions, len(txClient.txTracker))
txClient.pruneTxTracker()
// Prunes the transactions that are 10 minutes old
// 5 transactions will be pruned
require.Equal(t, txTrackerBeforePruning-txsToBePruned, txsToBePruned)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the assertion by swapping the expected and actual values.

The assertion on line 48 compares the wrong values. It should compare the difference between the tracker size before pruning and the number of transactions to be pruned with the actual size of the tracker after pruning.

Swap the expected and actual values in the assertion:

-require.Equal(t, txTrackerBeforePruning-txsToBePruned, txsToBePruned)
+require.Equal(t, txsToBePruned, txTrackerBeforePruning-txsToBePruned)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
require.Equal(t, txTrackerBeforePruning-txsToBePruned, txsToBePruned)
require.Equal(t, txsToBePruned, txTrackerBeforePruning-txsToBePruned)

// 5 transactions will not be pruned
require.Equal(t, len(txClient.txTracker), txsNotReadyToBePruned)
}
198 changes: 100 additions & 98 deletions pkg/user/tx_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/celestiaorg/go-square/v2/share"
blobtx "github.com/celestiaorg/go-square/v2/tx"
"github.com/cosmos/cosmos-sdk/client"
nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
"github.com/cosmos/cosmos-sdk/client/grpc/tmservice"
Expand All @@ -27,20 +26,28 @@ import (

"github.com/celestiaorg/celestia-app/v3/app"
"github.com/celestiaorg/celestia-app/v3/app/encoding"
apperrors "github.com/celestiaorg/celestia-app/v3/app/errors"
"github.com/celestiaorg/celestia-app/v3/app/grpc/tx"
"github.com/celestiaorg/celestia-app/v3/pkg/appconsts"
"github.com/celestiaorg/celestia-app/v3/x/blob/types"
"github.com/celestiaorg/celestia-app/v3/x/minfee"
)

const (
DefaultPollTime = 3 * time.Second
DefaultGasMultiplier float64 = 1.1
DefaultPollTime = 3 * time.Second
DefaultGasMultiplier float64 = 1.1
txTrackerPruningInterval = 10 * time.Minute
)

type Option func(client *TxClient)

// txInfo is a struct that holds the sequence and the signer of a transaction
// in the local tx pool.
type txInfo struct {
sequence uint64
signer string
timeStamp time.Time
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
}

// TxResponse is a response from the chain after
// a transaction has been submitted.
type TxResponse struct {
Expand Down Expand Up @@ -137,6 +144,9 @@ type TxClient struct {
defaultGasPrice float64
defaultAccount string
defaultAddress sdktypes.AccAddress
// txTracker maps the tx hash to the Sequence and signer of the transaction
// that was submitted to the chain
txTracker map[string]txInfo
}

// NewTxClient returns a new signer using the provided keyring
Expand Down Expand Up @@ -169,6 +179,7 @@ func NewTxClient(
defaultGasPrice: appconsts.DefaultMinGasPrice,
defaultAccount: records[0].Name,
defaultAddress: addr,
txTracker: make(map[string]txInfo),
}

for _, opt := range options {
Expand Down Expand Up @@ -302,6 +313,12 @@ func (client *TxClient) SubmitTx(ctx context.Context, msgs []sdktypes.Msg, opts
func (client *TxClient) BroadcastTx(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (*sdktypes.TxResponse, error) {
client.mtx.Lock()
defer client.mtx.Unlock()

// prune transactions that are older than 10 minutes
// pruning has to be done in broadcast, since users
// might not always call ConfirmTx().
client.pruneTxTracker()

account, err := client.getAccountNameFromMsgs(msgs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -368,23 +385,20 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return nil, err
}
if resp.TxResponse.Code != abci.CodeTypeOK {
if apperrors.IsNonceMismatchCode(resp.TxResponse.Code) {
// query the account to update the sequence number on-chain for the account
_, seqNum, err := QueryAccount(ctx, client.grpc, client.registry, client.signer.accounts[signer].address)
if err != nil {
return nil, fmt.Errorf("querying account for new sequence number: %w\noriginal tx response: %s", err, resp.TxResponse.RawLog)
}
if err := client.signer.SetSequence(signer, seqNum); err != nil {
return nil, fmt.Errorf("setting sequence: %w", err)
}
return client.retryBroadcastingTx(ctx, txBytes)
}
broadcastTxErr := &BroadcastTxError{
TxHash: resp.TxResponse.TxHash,
Code: resp.TxResponse.Code,
ErrorLog: resp.TxResponse.RawLog,
}
return resp.TxResponse, broadcastTxErr
return nil, broadcastTxErr
}

// save the sequence and signer of the transaction in the local pool
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
// before the sequence is incremented
client.txTracker[resp.TxResponse.TxHash] = txInfo{
sequence: client.signer.accounts[signer].Sequence(),
signer: signer,
timeStamp: time.Now(),
}

// after the transaction has been submitted, we can increment the
Expand All @@ -395,62 +409,13 @@ func (client *TxClient) broadcastTx(ctx context.Context, txBytes []byte, signer
return resp.TxResponse, nil
}

// retryBroadcastingTx creates a new transaction by copying over an existing transaction but creates a new signature with the
// new sequence number. It then calls `broadcastTx` and attempts to submit the transaction
func (client *TxClient) retryBroadcastingTx(ctx context.Context, txBytes []byte) (*sdktypes.TxResponse, error) {
blobTx, isBlobTx, err := blobtx.UnmarshalBlobTx(txBytes)
if isBlobTx {
// only check the error if the bytes are supposed to be of type blob tx
if err != nil {
return nil, err
// pruneTxTracker removes transactions from the local tx pool that are older than 10 minutes
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
func (client *TxClient) pruneTxTracker() {
for hash, txInfo := range client.txTracker {
if time.Since(txInfo.timeStamp) >= txTrackerPruningInterval {
delete(client.txTracker, hash)
}
txBytes = blobTx.Tx
}
tx, err := client.signer.DecodeTx(txBytes)
if err != nil {
return nil, err
}

opts := make([]TxOption, 0)
if granter := tx.FeeGranter(); granter != nil {
opts = append(opts, SetFeeGranter(granter))
}
if payer := tx.FeePayer(); payer != nil {
opts = append(opts, SetFeePayer(payer))
}
if memo := tx.GetMemo(); memo != "" {
opts = append(opts, SetMemo(memo))
}
if fee := tx.GetFee(); fee != nil {
opts = append(opts, SetFee(fee.AmountOf(appconsts.BondDenom).Uint64()))
}
if gas := tx.GetGas(); gas > 0 {
opts = append(opts, SetGasLimit(gas))
}

txBuilder, err := client.signer.txBuilder(tx.GetMsgs(), opts...)
if err != nil {
return nil, err
}
signer, _, err := client.signer.signTransaction(txBuilder)
if err != nil {
return nil, fmt.Errorf("resigning transaction: %w", err)
}

newTxBytes, err := client.signer.EncodeTx(txBuilder.GetTx())
if err != nil {
return nil, err
}

// rewrap the blob tx if it was originally a blob tx
if isBlobTx {
newTxBytes, err = blobtx.MarshalBlobTx(newTxBytes, blobTx.Blobs...)
if err != nil {
return nil, err
}
}

return client.broadcastTx(ctx, newTxBytes, signer)
}

// ConfirmTx periodically pings the provided node for the commitment of a transaction by its
Expand All @@ -468,40 +433,68 @@ func (client *TxClient) ConfirmTx(ctx context.Context, txHash string) (*TxRespon
return nil, err
}

if resp != nil {
switch resp.Status {
case core.TxStatusPending:
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-pollTicker.C:
continue
}
case core.TxStatusCommitted:
txResponse := &TxResponse{
Height: resp.Height,
TxHash: txHash,
Code: resp.ExecutionCode,
}
if resp.ExecutionCode != abci.CodeTypeOK {
executionErr := &ExecutionError{
TxHash: txHash,
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
return nil, executionErr
switch resp.Status {
case core.TxStatusPending:
// Continue polling if the transaction is still pending
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-pollTicker.C:
continue
}
case core.TxStatusCommitted:
txResponse := &TxResponse{
Height: resp.Height,
TxHash: txHash,
Code: resp.ExecutionCode,
}
if resp.ExecutionCode != abci.CodeTypeOK {
executionErr := &ExecutionError{
TxHash: txHash,
Code: resp.ExecutionCode,
ErrorLog: resp.Error,
}
return txResponse, nil
case core.TxStatusEvicted:
return nil, fmt.Errorf("tx was evicted from the mempool")
default:
return nil, fmt.Errorf("unknown tx: %s", txHash)
client.deleteFromTxTracker(txHash)
return nil, executionErr
}
client.deleteFromTxTracker(txHash)
return txResponse, nil
case core.TxStatusEvicted:
return nil, client.handleEvictions(txHash)
default:
client.deleteFromTxTracker(txHash)
return nil, fmt.Errorf("transaction with hash %s not found; it was likely rejected", txHash)
}
}
}

// handleEvictions handles the scenario where a transaction is evicted from the mempool.
// It removes the evicted transaction from the local tx pool without incrementing
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
// the signer's sequence.
func (client *TxClient) handleEvictions(txHash string) error {
client.mtx.Lock()
defer client.mtx.Unlock()
// Get transaction from the local pool
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
txInfo, exists := client.txTracker[txHash]
if !exists {
return fmt.Errorf("tx: %s not found in tx client local pool; likely failed during broadcast", txHash)
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
}
// The sequence should be rolled back to the sequence of the transaction that was evicted to be
// ready for resubmission. All transactions with a later nonce will be kicked by the node's tx pool.
if err := client.signer.SetSequence(txInfo.signer, txInfo.sequence); err != nil {
return fmt.Errorf("setting sequence: %w", err)
}
delete(client.txTracker, txHash)
return fmt.Errorf("tx was evicted from the mempool")
}

// deleteFromTxTracker safely deletes a transaction from the local tx pool.
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
func (client *TxClient) deleteFromTxTracker(txHash string) {
	// Hold the client mutex only for the duration of the map mutation; the
	// built-in delete is a no-op when txHash is not present, so no existence
	// check is needed before removing the entry.
	client.mtx.Lock()
	delete(client.txTracker, txHash)
	client.mtx.Unlock()
}

// EstimateGas simulates the transaction, calculating the amount of gas that was consumed during execution. The final
// result will be multiplied by gasMultiplier(that is set in TxClient)
func (client *TxClient) EstimateGas(ctx context.Context, msgs []sdktypes.Msg, opts ...TxOption) (uint64, error) {
Expand Down Expand Up @@ -576,6 +569,7 @@ func (client *TxClient) checkAccountLoaded(ctx context.Context, account string)
if err != nil {
return fmt.Errorf("retrieving address from keyring: %w", err)
}
// FIXME: have a less trusting way of getting the account number and sequence
accNum, sequence, err := QueryAccount(ctx, client.grpc, client.registry, addr)
if err != nil {
return fmt.Errorf("querying account %s: %w", account, err)
Expand Down Expand Up @@ -604,6 +598,14 @@ func (client *TxClient) getAccountNameFromMsgs(msgs []sdktypes.Msg) (string, err
return record.Name, nil
}

// GetTxFromTxTracker gets transaction info from the tx client's local tx pool by its hash
ninabarbakadze marked this conversation as resolved.
Show resolved Hide resolved
func (client *TxClient) GetTxFromTxTracker(hash string) (sequence uint64, signer string, exists bool) {
	client.mtx.Lock()
	defer client.mtx.Unlock()

	// Look the hash up once while the mutex is held.
	info, found := client.txTracker[hash]
	if !found {
		// Unknown hash: report zero values together with exists == false.
		return 0, "", false
	}
	return info.sequence, info.signer, true
}

// Signer exposes the tx client's underlying signer
func (client *TxClient) Signer() *Signer {
return client.signer
Expand Down
Loading
Loading