diff --git a/abci/types/types.go b/abci/types/types.go index b40865524..1ecb0a9f3 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -256,4 +256,9 @@ type ResponseCheckTxV2 struct { IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true ExpireTxHandler ExpireTxHandler + + // helper properties for prioritization in mempool + EVMNonce uint64 + EVMSenderAddress string + IsEVM bool } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b76e1f007..7bccb5479 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -292,10 +292,13 @@ func (txmp *TxMempool) CheckTx( } wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, + tx: tx, + hash: txHash, + timestamp: time.Now().UTC(), + height: txmp.height, + evmNonce: res.EVMNonce, + evmAddress: res.EVMSenderAddress, + isEVM: res.IsEVM, expiredCallback: func(removeFromCache bool) { txmp.metrics.ExpiredTxs.Add(1) if removeFromCache { diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index d90a12277..dd4874417 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -43,6 +43,42 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a sender string ) + if strings.HasPrefix(string(req.Tx), "evm") { + // format is evm-sender-0=account=priority=nonce + // split into respective vars + parts := bytes.Split(req.Tx, []byte("=")) + sender = string(parts[0]) + account := string(parts[1]) + v, err := strconv.ParseInt(string(parts[2]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 100, + GasWanted: 1, + }}, nil + } + nonce, err := strconv.ParseInt(string(parts[3]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 101, + GasWanted: 1, + }}, nil + } + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: v, + Code: code.CodeTypeOK, + GasWanted: 1, + }, + EVMNonce: uint64(nonce), + EVMSenderAddress: account, + IsEVM: true, + }, nil + } + // infer the priority from the raw transaction value (sender=key=value) parts := bytes.Split(req.Tx, []byte("=")) if len(parts) == 3 { @@ -64,7 +100,6 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a GasWanted: 1, }}, nil } - return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Sender: sender, @@ -412,6 +447,63 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0})) } +func TestTxMempool_Prioritization(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(t, client, 100) + peerID := uint16(1) + + address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + + // Generate transactions with different priorities + // there are two formats to comply with the above mocked CheckTX + // EVM: evm-sender=account=priority=nonce + // Non-EVM: sender=peer=priority + txs := [][]byte{ + []byte(fmt.Sprintf("sender-0-1=peer=%d", 9)), + []byte(fmt.Sprintf("sender-1-1=peer=%d", 8)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)), + []byte(fmt.Sprintf("sender-2-1=peer=%d", 5)), + []byte(fmt.Sprintf("sender-3-1=peer=%d", 4)), + } + + // copy the slice of txs and shuffle the order randomly + txsCopy := make([][]byte, len(txs)) + copy(txsCopy, txs) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + rng.Shuffle(len(txsCopy), func(i, j int) { + txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i] + }) + + for i := range txsCopy { + require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID})) + } + + // Reap the transactions + reapedTxs := txmp.ReapMaxTxs(len(txs)) + // Check if the reaped transactions are in the correct order of their priorities + for _, tx := range txs { + fmt.Printf("expected: %s\n", string(tx)) + } + fmt.Println("**************") + for _, reapedTx := range reapedTxs { + fmt.Printf("received: %s\n", string(reapedTx)) + } + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i], []byte(reapedTx)) + } +} + func TestTxMempool_CheckTxSamePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index ad3a347a3..3886da0cf 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -12,13 +12,40 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx + mtx sync.RWMutex + txs []*WrappedTx + evmQueue map[string][]*WrappedTx +} + +func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { + // Using BinarySearch to find the appropriate index to insert tx + i := binarySearch(queue, tx) + + // Make room for new value and add it + queue = append(queue, nil) + copy(queue[i+1:], queue[i:]) + queue[i] = tx + return queue +} + +// binarySearch finds the index at which tx should be inserted in queue +func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { + low, high := 0, len(queue) + for low < high { + mid := low + (high-low)/2 + if queue[mid].evmNonce < tx.evmNonce { + low = mid + 1 + } else { + high = mid + } + } + return low } func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), + txs: make([]*WrappedTx, 0), + evmQueue: make(map[string][]*WrappedTx), } heap.Init(pq) @@ -68,13 +95,46 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6 return nil } +// requires read lock +func (pq *TxPriorityQueue) numQueuedUnsafe() int { + var result int + for _, queue := range pq.evmQueue { + result += len(queue) + } + // first items in queue are also in heap, subtract one + return result - len(pq.evmQueue) +} + // NumTxs returns the number of transactions in the priority queue. It is // thread safe. func (pq *TxPriorityQueue) NumTxs() int { pq.mtx.RLock() defer pq.mtx.RUnlock() - return len(pq.txs) + return len(pq.txs) + pq.numQueuedUnsafe() +} + +func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { + if queue, ok := pq.evmQueue[tx.evmAddress]; ok { + for i, t := range queue { + if t.evmNonce == tx.evmNonce { + pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) + if len(pq.evmQueue[tx.evmAddress]) == 0 { + delete(pq.evmQueue, tx.evmAddress) + } + break + } + } + } +} + +func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) { + for i, t := range pq.txs { + if t == tx { + return i, true + } + } + return 0, false } // RemoveTx removes a specific transaction from the priority queue. @@ -82,30 +142,71 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) + if idx, ok := pq.findTxIndexUnsafe(tx); ok { + heap.Remove(pq, idx) + } + + if tx.isEVM { + pq.removeQueuedEvmTxUnsafe(tx) } } +func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { + if !tx.isEVM { + heap.Push(pq, tx) + return + } + + queue, exists := pq.evmQueue[tx.evmAddress] + if !exists { + pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx} + heap.Push(pq, tx) + return + } + + first := queue[0] + if tx.evmNonce < first.evmNonce { + if idx, ok := pq.findTxIndexUnsafe(first); ok { + heap.Remove(pq, idx) + } + heap.Push(pq, tx) + } + + pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx) +} + // PushTx adds a valid transaction to the priority queue. It is thread safe. func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() + pq.pushTxUnsafe(tx) +} + +func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { + x := heap.Pop(pq) + if x == nil { + return nil + } - heap.Push(pq, tx) + tx := x.(*WrappedTx) + + if !tx.isEVM { + return tx + } + + pq.removeQueuedEvmTxUnsafe(tx) + if len(pq.evmQueue[tx.evmAddress]) > 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } + + return tx } // PopTx removes the top priority transaction from the queue. It is thread safe. func (pq *TxPriorityQueue) PopTx() *WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() - - x := heap.Pop(pq) - if x != nil { - return x.(*WrappedTx) - } - - return nil + return pq.popTxUnsafe() } // dequeue up to `max` transactions and reenqueue while locked @@ -113,7 +214,7 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() - numTxs := len(pq.txs) + numTxs := len(pq.txs) + pq.numQueuedUnsafe() if max < 0 { max = numTxs } @@ -121,15 +222,16 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { cap := tmmath.MinInt(numTxs, max) res := make([]*WrappedTx, 0, cap) for i := 0; i < cap; i++ { - popped := heap.Pop(pq) + popped := pq.popTxUnsafe() if popped == nil { break } - res = append(res, popped.(*WrappedTx)) + + res = append(res, popped) } for _, tx := range res { - heap.Push(pq, tx) + pq.pushTxUnsafe(tx) } return res } diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index ddc84806d..bd7dda126 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -1,6 +1,7 @@ package mempool import ( + "fmt" "math/rand" "sort" "sync" @@ -10,6 +11,127 @@ import ( "github.com/stretchr/testify/require" ) +// TxTestCase represents a single test case for the TxPriorityQueue +type TxTestCase struct { + name string + inputTxs []*WrappedTx // Input transactions + expectedOutput []int64 // Expected order of transaction IDs +} + +func TestTxPriorityQueue_ReapHalf(t *testing.T) { + pq := NewTxPriorityQueue() + + // Generate transactions with different priorities and nonces + txs := make([]*WrappedTx, 100) + for i := range txs { + txs[i] = &WrappedTx{ + tx: []byte(fmt.Sprintf("tx-%d", i)), + priority: int64(i), + } + + // Push the transaction + pq.PushTx(txs[i]) + } + + //reverse sort txs by priority + sort.Slice(txs, func(i, j int) bool { + return txs[i].priority > txs[j].priority + }) + + // Reap half of the transactions + reapedTxs := pq.PeekTxs(len(txs) / 2) + + // Check if the reaped transactions are in the correct order of their priorities and nonces + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i].priority, reapedTx.priority) + } +} + +func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { + testCases := []TxTestCase{ + { + name: "PriorityWithEVMAndNonEVM", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "2", isEVM: false, priority: 9}, + {sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8}, + {sender: "6", isEVM: false, priority: 6}, + {sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7}, + }, + { + name: "IdenticalPrioritiesAndNoncesDifferentAddresses", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 5}, + {sender: "2", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 5}, + {sender: "3", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{1, 2, 3}, + }, + { + name: "InterleavedEVAndNonEVMTransactions", + inputTxs: []*WrappedTx{ + {sender: "7", isEVM: false, priority: 15}, + {sender: "8", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 20}, + {sender: "9", isEVM: false, priority: 10}, + {sender: "10", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 20}, + }, + expectedOutput: []int64{8, 10, 7, 9}, + }, + { + name: "SameAddressPriorityDifferentNonces", + inputTxs: []*WrappedTx{ + {sender: "11", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 10}, + {sender: "12", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "13", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 10}, + }, + expectedOutput: []int64{12, 13, 11}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pq := NewTxPriorityQueue() + now := time.Now() + + // Add input transactions to the queue and set timestamp to order inserted + for i, tx := range tc.inputTxs { + tx.timestamp = now.Add(time.Duration(i) * time.Second) + pq.PushTx(tx) + } + + results := pq.PeekTxs(len(tc.inputTxs)) + // Validate the order of transactions + require.Len(t, results, len(tc.expectedOutput)) + for i, expectedTxID := range tc.expectedOutput { + tx := results[i] + require.Equal(t, fmt.Sprintf("%d", expectedTxID), tx.sender) + } + }) + } +} + +func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) { + pq := NewTxPriorityQueue() + address := "0x123" + + // Insert transactions with the same address but different nonces and priorities + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15}) + + // Pop transactions and verify they are in the correct order of nonce + tx1 := pq.PopTx() + require.Equal(t, uint64(1), tx1.evmNonce) + tx2 := pq.PopTx() + require.Equal(t, uint64(2), tx2.evmNonce) + tx3 := pq.PopTx() + require.Equal(t, uint64(3), tx3.evmNonce) +} + func TestTxPriorityQueue(t *testing.T) { pq := NewTxPriorityQueue() numTxs := 1000 diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index be1d9290f..411065db8 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -67,6 +67,11 @@ type WrappedTx struct { // this is the callback that can be called when a transaction expires expiredCallback func(removeFromCache bool) + + // evm properties that aid in prioritization + evmAddress string + evmNonce uint64 + isEVM bool } func (wtx *WrappedTx) Size() int {