Skip to content

Commit

Permalink
replace txs with the same sender key and higher priority
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenlanders committed Dec 28, 2023
1 parent 20b4487 commit 0673d5c
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 11 deletions.
31 changes: 20 additions & 11 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func (txmp *TxMempool) CheckTx(
if err == nil {
// only add new transaction if checkTx passes and is not pending
if !res.IsPendingTransaction {
err = txmp.addNewTransaction(wtx, res.ResponseCheckTx, txInfo)
err = txmp.addNewTransaction(wtx, res, txInfo)

if err != nil {
return err
Expand Down Expand Up @@ -535,7 +535,8 @@ func (txmp *TxMempool) Update(
//
// NOTE:
// - An explicit lock is NOT required.
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheckTx, txInfo TxInfo) error {
func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, resV2 *abci.ResponseCheckTxV2, txInfo TxInfo) error {
res := resV2.ResponseCheckTx
var err error
if txmp.postCheck != nil {
err = txmp.postCheck(wtx.tx, res)
Expand Down Expand Up @@ -572,15 +573,23 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
sender := res.Sender
priority := res.Priority

// if sender is set, then priority must be higher on the new tx to replace
// otherwise this rejects that transaction. At this time, only EVM transactions
// set the sender
if len(sender) > 0 {
if wtx := txmp.txStore.GetTxBySender(sender); wtx != nil {
txmp.logger.Error(
"rejected incoming good transaction; tx already exists for sender",
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"sender", sender,
)
txmp.metrics.RejectedTxs.Add(1)
return nil
if existingTx := txmp.txStore.GetTxBySender(sender); existingTx != nil {
if existingTx.priority >= priority {
txmp.logger.Error(
"rejected duplicate nonce transaction, priority must be higher to replace",
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"sender", sender,
)
txmp.metrics.RejectedTxs.Add(1)
return nil
}

// remove existing transaction and fall-through for replacement
txmp.removeTx(existingTx, true)
}
}

Expand Down Expand Up @@ -932,7 +941,7 @@ func (txmp *TxMempool) AppendCheckTxErr(existingLogs string, log string) string
func (txmp *TxMempool) handlePendingTransactions() {
accepted, rejected := txmp.pendingTxs.EvaluatePendingTransactions()
for _, tx := range accepted {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse.ResponseCheckTx, tx.txInfo); err != nil {
if err := txmp.addNewTransaction(tx.tx, tx.checkTxResponse, tx.txInfo); err != nil {
txmp.logger.Error(fmt.Sprintf("error adding pending transaction: %s", err))
}
}
Expand Down
24 changes: 24 additions & 0 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,30 @@ func (e *TestPeerEvictor) Errored(peerID types.NodeID, err error) {
e.evicting[peerID] = struct{}{}
}

func TestTxMempool_CheckTxReplacement(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 0)

// Create a transaction with a certain priority
tx1 := []byte("sender=key=1")
require.NoError(t, txmp.CheckTx(ctx, tx1, nil, TxInfo{SenderID: 0}))

// Create another transaction with the same sender but a higher priority
tx2 := []byte("sender=key=2")
require.NoError(t, txmp.CheckTx(ctx, tx2, nil, TxInfo{SenderID: 0}))

// Check that the new transaction replaced the old one
require.Equal(t, 1, txmp.Size())
}

func TestTxMempool_TxsAvailable(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down

0 comments on commit 0673d5c

Please sign in to comment.