From 35e61fdded1c36189c6e1212d973b4ee780328c0 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 17 Jan 2024 17:30:34 -0500 Subject: [PATCH 1/5] add mempool prioritization with evm nonce --- abci/types/types.go | 5 +++++ internal/mempool/mempool.go | 11 +++++++---- internal/mempool/priority_queue.go | 6 ++++++ internal/mempool/tx.go | 9 +++++++++ 4 files changed, 27 insertions(+), 4 deletions(-) diff --git a/abci/types/types.go b/abci/types/types.go index b40865524..1ecb0a9f3 100644 --- a/abci/types/types.go +++ b/abci/types/types.go @@ -256,4 +256,9 @@ type ResponseCheckTxV2 struct { IsPendingTransaction bool Checker PendingTxChecker // must not be nil if IsPendingTransaction is true ExpireTxHandler ExpireTxHandler + + // helper properties for prioritization in mempool + EVMNonce uint64 + EVMSenderAddress string + IsEVM bool } diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index b76e1f007..7bccb5479 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -292,10 +292,13 @@ func (txmp *TxMempool) CheckTx( } wtx := &WrappedTx{ - tx: tx, - hash: txHash, - timestamp: time.Now().UTC(), - height: txmp.height, + tx: tx, + hash: txHash, + timestamp: time.Now().UTC(), + height: txmp.height, + evmNonce: res.EVMNonce, + evmAddress: res.EVMSenderAddress, + isEVM: res.IsEVM, expiredCallback: func(removeFromCache bool) { txmp.metrics.ExpiredTxs.Add(1) if removeFromCache { diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index ad3a347a3..41272406c 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -167,6 +167,12 @@ func (pq *TxPriorityQueue) Len() int { // Less implements the Heap interface. It returns true if the transaction at // position i in the queue is of less priority than the transaction at position j. func (pq *TxPriorityQueue) Less(i, j int) bool { + // If the txs have the same evm address, then only consider nonce for sorting + if pq.txs[i].HasSameEVMAddress(pq.txs[j]) { + // The transaction with the lower nonce must always come first + return pq.txs[i].evmNonce < pq.txs[j].evmNonce + } + // If there exists two transactions with the same priority, consider the one // that we saw the earliest as the higher priority transaction. if pq.txs[i].priority == pq.txs[j].priority { diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index be1d9290f..0e7c2629d 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -67,6 +67,15 @@ type WrappedTx struct { // this is the callback that can be called when a transaction expires expiredCallback func(removeFromCache bool) + + // evm properties that aid in prioritization + evmAddress string + evmNonce uint64 + isEVM bool +} + +func (wtx *WrappedTx) HasSameEVMAddress(other *WrappedTx) bool { + return wtx.isEVM && other.isEVM && wtx.evmAddress == other.evmAddress } func (wtx *WrappedTx) Size() int { From fa52fcbaabc3119924d1d0e2bfdfe36daf1a540d Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Wed, 17 Jan 2024 23:51:56 -0500 Subject: [PATCH 2/5] fix priority stability --- internal/mempool/mempool_test.go | 94 +++++++++++++++++- internal/mempool/priority_queue.go | 88 +++++++++++++---- internal/mempool/priority_queue_test.go | 122 ++++++++++++++++++++++++ internal/mempool/tx.go | 4 - 4 files changed, 286 insertions(+), 22 deletions(-) diff --git a/internal/mempool/mempool_test.go b/internal/mempool/mempool_test.go index d90a12277..dd4874417 100644 --- a/internal/mempool/mempool_test.go +++ b/internal/mempool/mempool_test.go @@ -43,6 +43,42 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a sender string ) + if strings.HasPrefix(string(req.Tx), "evm") { + // format is evm-sender-0=account=priority=nonce + // split into respective vars + parts := bytes.Split(req.Tx, []byte("=")) + sender = string(parts[0]) + account := string(parts[1]) + v, err := strconv.ParseInt(string(parts[2]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 100, + GasWanted: 1, + }}, nil + } + nonce, err := strconv.ParseInt(string(parts[3]), 10, 64) + if err != nil { + // could not parse + return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: priority, + Code: 101, + GasWanted: 1, + }}, nil + } + return &abci.ResponseCheckTxV2{ + ResponseCheckTx: &abci.ResponseCheckTx{ + Priority: v, + Code: code.CodeTypeOK, + GasWanted: 1, + }, + EVMNonce: uint64(nonce), + EVMSenderAddress: account, + IsEVM: true, + }, nil + } + // infer the priority from the raw transaction value (sender=key=value) parts := bytes.Split(req.Tx, []byte("=")) if len(parts) == 3 { @@ -64,7 +100,6 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a GasWanted: 1, }}, nil } - return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{ Priority: priority, Sender: sender, @@ -412,6 +447,63 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) { require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0})) } +func TestTxMempool_Prioritization(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()}) + if err := client.Start(ctx); err != nil { + t.Fatal(err) + } + t.Cleanup(client.Wait) + + txmp := setup(t, client, 100) + peerID := uint16(1) + + address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA" + + // Generate transactions with different priorities + // there are two formats to comply with the above mocked CheckTX + // EVM: evm-sender=account=priority=nonce + // Non-EVM: sender=peer=priority + txs := [][]byte{ + []byte(fmt.Sprintf("sender-0-1=peer=%d", 9)), + []byte(fmt.Sprintf("sender-1-1=peer=%d", 8)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)), + []byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)), + []byte(fmt.Sprintf("sender-2-1=peer=%d", 5)), + []byte(fmt.Sprintf("sender-3-1=peer=%d", 4)), + } + + // copy the slice of txs and shuffle the order randomly + txsCopy := make([][]byte, len(txs)) + copy(txsCopy, txs) + rng := rand.New(rand.NewSource(time.Now().UnixNano())) + rng.Shuffle(len(txsCopy), func(i, j int) { + txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i] + }) + + for i := range txsCopy { + require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID})) + } + + // Reap the transactions + reapedTxs := txmp.ReapMaxTxs(len(txs)) + // Check if the reaped transactions are in the correct order of their priorities + for _, tx := range txs { + fmt.Printf("expected: %s\n", string(tx)) + } + fmt.Println("**************") + for _, reapedTx := range reapedTxs { + fmt.Printf("received: %s\n", string(reapedTx)) + } + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i], []byte(reapedTx)) + } +} + func TestTxMempool_CheckTxSamePeer(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 41272406c..85818dec8 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -12,13 +12,15 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx + mtx sync.RWMutex + txs []*WrappedTx + evmTxQueues map[string][]*WrappedTx // map[EVM address][]*WrappedTx } func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), + txs: make([]*WrappedTx, 0), + evmTxQueues: make(map[string][]*WrappedTx), } heap.Init(pq) @@ -91,20 +93,73 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() + pq.pushTxUnsafe(tx) +} + +func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { + if !tx.isEVM { + heap.Push(pq, tx) + return + } + + queue, exists := pq.evmTxQueues[tx.evmAddress] + if !exists || len(queue) == 0 { + pq.evmTxQueues[tx.evmAddress] = []*WrappedTx{tx} + heap.Push(pq, tx) + return + } + + var pushToHeap bool + if tx.evmNonce < queue[0].evmNonce { + // Remove the current lowest nonce transaction from the heap + heap.Remove(pq, queue[0].heapIndex) + pushToHeap = true + } + + queue = append(queue, tx) + sort.Slice(queue, func(i, j int) bool { + return queue[i].evmNonce < queue[j].evmNonce + }) + pq.evmTxQueues[tx.evmAddress] = queue + + if pushToHeap { + heap.Push(pq, queue[0]) + } - heap.Push(pq, tx) } // PopTx removes the top priority transaction from the queue. It is thread safe. func (pq *TxPriorityQueue) PopTx() *WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() + return pq.popTxUnsafe() +} +// popTxUnsafe this assumes a lock has been acquired +func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { x := heap.Pop(pq) if x != nil { - return x.(*WrappedTx) - } + wtx := x.(*WrappedTx) + // if not evm, return (no need to consider nonce) + if !wtx.isEVM { + return wtx + } + + // if evm, then we need to queue the next nonce if there is one + // this nonce will land in priority order + queue := pq.evmTxQueues[wtx.evmAddress] + if len(queue) > 0 { + queue = queue[1:] + if len(queue) > 0 { + heap.Push(pq, queue[0]) + pq.evmTxQueues[wtx.evmAddress] = queue + } else { + delete(pq.evmTxQueues, wtx.evmAddress) + } + } + return wtx + } return nil } @@ -114,22 +169,27 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { defer pq.mtx.Unlock() numTxs := len(pq.txs) + numQueued := 0 + for _, queue := range pq.evmTxQueues { + numQueued += len(queue) + } + if max < 0 { - max = numTxs + max = numTxs + numQueued } - cap := tmmath.MinInt(numTxs, max) + cap := tmmath.MinInt(numTxs+numQueued, max) res := make([]*WrappedTx, 0, cap) for i := 0; i < cap; i++ { - popped := heap.Pop(pq) + popped := pq.popTxUnsafe() if popped == nil { break } - res = append(res, popped.(*WrappedTx)) + res = append(res, popped) } for _, tx := range res { - heap.Push(pq, tx) + pq.pushTxUnsafe(tx) } return res } @@ -167,12 +227,6 @@ func (pq *TxPriorityQueue) Len() int { // Less implements the Heap interface. It returns true if the transaction at // position i in the queue is of less priority than the transaction at position j. func (pq *TxPriorityQueue) Less(i, j int) bool { - // If the txs have the same evm address, then only consider nonce for sorting - if pq.txs[i].HasSameEVMAddress(pq.txs[j]) { - // The transaction with the lower nonce must always come first - return pq.txs[i].evmNonce < pq.txs[j].evmNonce - } - // If there exists two transactions with the same priority, consider the one // that we saw the earliest as the higher priority transaction. if pq.txs[i].priority == pq.txs[j].priority { diff --git a/internal/mempool/priority_queue_test.go b/internal/mempool/priority_queue_test.go index ddc84806d..bd7dda126 100644 --- a/internal/mempool/priority_queue_test.go +++ b/internal/mempool/priority_queue_test.go @@ -1,6 +1,7 @@ package mempool import ( + "fmt" "math/rand" "sort" "sync" @@ -10,6 +11,127 @@ import ( "github.com/stretchr/testify/require" ) +// TxTestCase represents a single test case for the TxPriorityQueue +type TxTestCase struct { + name string + inputTxs []*WrappedTx // Input transactions + expectedOutput []int64 // Expected order of transaction IDs +} + +func TestTxPriorityQueue_ReapHalf(t *testing.T) { + pq := NewTxPriorityQueue() + + // Generate transactions with different priorities and nonces + txs := make([]*WrappedTx, 100) + for i := range txs { + txs[i] = &WrappedTx{ + tx: []byte(fmt.Sprintf("tx-%d", i)), + priority: int64(i), + } + + // Push the transaction + pq.PushTx(txs[i]) + } + + //reverse sort txs by priority + sort.Slice(txs, func(i, j int) bool { + return txs[i].priority > txs[j].priority + }) + + // Reap half of the transactions + reapedTxs := pq.PeekTxs(len(txs) / 2) + + // Check if the reaped transactions are in the correct order of their priorities and nonces + for i, reapedTx := range reapedTxs { + require.Equal(t, txs[i].priority, reapedTx.priority) + } +} + +func TestTxPriorityQueue_PriorityAndNonceOrdering(t *testing.T) { + testCases := []TxTestCase{ + { + name: "PriorityWithEVMAndNonEVM", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "2", isEVM: false, priority: 9}, + {sender: "4", isEVM: true, evmAddress: "0xabc", evmNonce: 0, priority: 9}, // Same EVM address as first, lower nonce + {sender: "5", isEVM: true, evmAddress: "0xdef", evmNonce: 1, priority: 7}, + {sender: "3", isEVM: true, evmAddress: "0xdef", evmNonce: 0, priority: 8}, + {sender: "6", isEVM: false, priority: 6}, + {sender: "7", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{2, 4, 1, 3, 5, 6, 7}, + }, + { + name: "IdenticalPrioritiesAndNoncesDifferentAddresses", + inputTxs: []*WrappedTx{ + {sender: "1", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 5}, + {sender: "2", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 5}, + {sender: "3", isEVM: true, evmAddress: "0xghi", evmNonce: 2, priority: 5}, + }, + expectedOutput: []int64{1, 2, 3}, + }, + { + name: "InterleavedEVAndNonEVMTransactions", + inputTxs: []*WrappedTx{ + {sender: "7", isEVM: false, priority: 15}, + {sender: "8", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 20}, + {sender: "9", isEVM: false, priority: 10}, + {sender: "10", isEVM: true, evmAddress: "0xdef", evmNonce: 2, priority: 20}, + }, + expectedOutput: []int64{8, 10, 7, 9}, + }, + { + name: "SameAddressPriorityDifferentNonces", + inputTxs: []*WrappedTx{ + {sender: "11", isEVM: true, evmAddress: "0xabc", evmNonce: 3, priority: 10}, + {sender: "12", isEVM: true, evmAddress: "0xabc", evmNonce: 1, priority: 10}, + {sender: "13", isEVM: true, evmAddress: "0xabc", evmNonce: 2, priority: 10}, + }, + expectedOutput: []int64{12, 13, 11}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + pq := NewTxPriorityQueue() + now := time.Now() + + // Add input transactions to the queue and set timestamp to order inserted + for i, tx := range tc.inputTxs { + tx.timestamp = now.Add(time.Duration(i) * time.Second) + pq.PushTx(tx) + } + + results := pq.PeekTxs(len(tc.inputTxs)) + // Validate the order of transactions + require.Len(t, results, len(tc.expectedOutput)) + for i, expectedTxID := range tc.expectedOutput { + tx := results[i] + require.Equal(t, fmt.Sprintf("%d", expectedTxID), tx.sender) + } + }) + } +} + +func TestTxPriorityQueue_SameAddressDifferentNonces(t *testing.T) { + pq := NewTxPriorityQueue() + address := "0x123" + + // Insert transactions with the same address but different nonces and priorities + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 2, priority: 10}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 1, priority: 5}) + pq.PushTx(&WrappedTx{isEVM: true, evmAddress: address, evmNonce: 3, priority: 15}) + + // Pop transactions and verify they are in the correct order of nonce + tx1 := pq.PopTx() + require.Equal(t, uint64(1), tx1.evmNonce) + tx2 := pq.PopTx() + require.Equal(t, uint64(2), tx2.evmNonce) + tx3 := pq.PopTx() + require.Equal(t, uint64(3), tx3.evmNonce) +} + func TestTxPriorityQueue(t *testing.T) { pq := NewTxPriorityQueue() numTxs := 1000 diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 0e7c2629d..411065db8 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -74,10 +74,6 @@ type WrappedTx struct { isEVM bool } -func (wtx *WrappedTx) HasSameEVMAddress(other *WrappedTx) bool { - return wtx.isEVM && other.isEVM && wtx.evmAddress == other.evmAddress -} - func (wtx *WrappedTx) Size() int { return len(wtx.tx) } From b6959148e70d11347efd35caf1b7466844e8de21 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 18 Jan 2024 01:07:37 -0500 Subject: [PATCH 3/5] index fixes --- internal/mempool/priority_queue.go | 142 ++++++++++++++++------------- 1 file changed, 81 insertions(+), 61 deletions(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 85818dec8..22751a4b7 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -12,15 +12,15 @@ var _ heap.Interface = (*TxPriorityQueue)(nil) // TxPriorityQueue defines a thread-safe priority queue for valid transactions. type TxPriorityQueue struct { - mtx sync.RWMutex - txs []*WrappedTx - evmTxQueues map[string][]*WrappedTx // map[EVM address][]*WrappedTx + mtx sync.RWMutex + txs []*WrappedTx + evmQueue map[string][]*WrappedTx } func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ - txs: make([]*WrappedTx, 0), - evmTxQueues: make(map[string][]*WrappedTx), + txs: make([]*WrappedTx, 0), + evmQueue: make(map[string][]*WrappedTx), } heap.Init(pq) @@ -70,13 +70,46 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6 return nil } +// requires read lock +func (pq *TxPriorityQueue) numQueuedUnsafe() int { + var result int + for _, queue := range pq.evmQueue { + result += len(queue) + } + // first items in queue are also in heap, subtract one + return result - len(pq.evmQueue) +} + // NumTxs returns the number of transactions in the priority queue. It is // thread safe. func (pq *TxPriorityQueue) NumTxs() int { pq.mtx.RLock() defer pq.mtx.RUnlock() - return len(pq.txs) + return len(pq.txs) + pq.numQueuedUnsafe() +} + +func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { + if queue, ok := pq.evmQueue[tx.evmAddress]; ok { + for i, t := range queue { + if t == tx { + pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) + if len(pq.evmQueue[tx.evmAddress]) == 0 { + delete(pq.evmQueue, tx.evmAddress) + } + break + } + } + } +} + +func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) { + for i, t := range pq.txs { + if t == tx { + return i, true + } + } + return 0, false } // RemoveTx removes a specific transaction from the priority queue. @@ -84,16 +117,13 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() - if tx.heapIndex < len(pq.txs) { - heap.Remove(pq, tx.heapIndex) + if idx, ok := pq.findTxIndexUnsafe(tx); ok { + heap.Remove(pq, idx) } -} -// PushTx adds a valid transaction to the priority queue. It is thread safe. -func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { - pq.mtx.Lock() - defer pq.mtx.Unlock() - pq.pushTxUnsafe(tx) + if tx.isEVM { + pq.removeQueuedEvmTxUnsafe(tx) + } } func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { @@ -102,65 +132,59 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { return } - queue, exists := pq.evmTxQueues[tx.evmAddress] - if !exists || len(queue) == 0 { - pq.evmTxQueues[tx.evmAddress] = []*WrappedTx{tx} + queue, exists := pq.evmQueue[tx.evmAddress] + if !exists { + pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx} heap.Push(pq, tx) return } - var pushToHeap bool - if tx.evmNonce < queue[0].evmNonce { - // Remove the current lowest nonce transaction from the heap - heap.Remove(pq, queue[0].heapIndex) - pushToHeap = true + first := queue[0] + if tx.evmNonce < first.evmNonce { + if idx, ok := pq.findTxIndexUnsafe(first); ok { + heap.Remove(pq, idx) + } + heap.Push(pq, tx) } - queue = append(queue, tx) sort.Slice(queue, func(i, j int) bool { return queue[i].evmNonce < queue[j].evmNonce }) - pq.evmTxQueues[tx.evmAddress] = queue - - if pushToHeap { - heap.Push(pq, queue[0]) - } - + pq.evmQueue[tx.evmAddress] = queue } -// PopTx removes the top priority transaction from the queue. It is thread safe. -func (pq *TxPriorityQueue) PopTx() *WrappedTx { +// PushTx adds a valid transaction to the priority queue. It is thread safe. +func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) { pq.mtx.Lock() defer pq.mtx.Unlock() - return pq.popTxUnsafe() + pq.pushTxUnsafe(tx) } -// popTxUnsafe this assumes a lock has been acquired func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx { x := heap.Pop(pq) - if x != nil { - wtx := x.(*WrappedTx) + if x == nil { + return nil + } - // if not evm, return (no need to consider nonce) - if !wtx.isEVM { - return wtx - } + tx := x.(*WrappedTx) - // if evm, then we need to queue the next nonce if there is one - // this nonce will land in priority order - queue := pq.evmTxQueues[wtx.evmAddress] - if len(queue) > 0 { - queue = queue[1:] - if len(queue) > 0 { - heap.Push(pq, queue[0]) - pq.evmTxQueues[wtx.evmAddress] = queue - } else { - delete(pq.evmTxQueues, wtx.evmAddress) - } - } - return wtx + if !tx.isEVM { + return tx } - return nil + + pq.removeQueuedEvmTxUnsafe(tx) + if len(pq.evmQueue[tx.evmAddress]) > 0 { + heap.Push(pq, pq.evmQueue[tx.evmAddress][0]) + } + + return tx +} + +// PopTx removes the top priority transaction from the queue. It is thread safe. +func (pq *TxPriorityQueue) PopTx() *WrappedTx { + pq.mtx.Lock() + defer pq.mtx.Unlock() + return pq.popTxUnsafe() } // dequeue up to `max` transactions and reenqueue while locked @@ -168,23 +192,19 @@ func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx { pq.mtx.Lock() defer pq.mtx.Unlock() - numTxs := len(pq.txs) - numQueued := 0 - for _, queue := range pq.evmTxQueues { - numQueued += len(queue) - } - + numTxs := len(pq.txs) + pq.numQueuedUnsafe() if max < 0 { - max = numTxs + numQueued + max = numTxs } - cap := tmmath.MinInt(numTxs+numQueued, max) + cap := tmmath.MinInt(numTxs, max) res := make([]*WrappedTx, 0, cap) for i := 0; i < cap; i++ { popped := pq.popTxUnsafe() if popped == nil { break } + res = append(res, popped) } From c3238d8331236049b5709d74adc76a8a1bd85d27 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 18 Jan 2024 09:00:30 -0500 Subject: [PATCH 4/5] replace with binary search insert --- internal/mempool/priority_queue.go | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 22751a4b7..55215e60d 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -2,6 +2,7 @@ package mempool import ( "container/heap" + "slices" "sort" "sync" @@ -17,6 +18,25 @@ type TxPriorityQueue struct { evmQueue map[string][]*WrappedTx } +func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { + // Using BinarySearch to find the appropriate index to insert tx + i, _ := slices.BinarySearchFunc(queue, tx, func(a, b *WrappedTx) int { + if a.evmNonce < b.evmNonce { + return -1 + } + if a.evmNonce > b.evmNonce { + return 1 + } + return 0 + }) + + // Make room for new value and add it + queue = append(queue, nil) + copy(queue[i+1:], queue[i:]) + queue[i] = tx + return queue +} + func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ txs: make([]*WrappedTx, 0), @@ -92,7 +112,7 @@ func (pq *TxPriorityQueue) NumTxs() int { func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) { if queue, ok := pq.evmQueue[tx.evmAddress]; ok { for i, t := range queue { - if t == tx { + if t.evmNonce == tx.evmNonce { pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...) if len(pq.evmQueue[tx.evmAddress]) == 0 { delete(pq.evmQueue, tx.evmAddress) @@ -146,11 +166,8 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) { } heap.Push(pq, tx) } - queue = append(queue, tx) - sort.Slice(queue, func(i, j int) bool { - return queue[i].evmNonce < queue[j].evmNonce - }) - pq.evmQueue[tx.evmAddress] = queue + + pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx) } // PushTx adds a valid transaction to the priority queue. It is thread safe. From 5cb45fdc52622c2b6622b3b018ded12bd3561d10 Mon Sep 17 00:00:00 2001 From: Steven Landers Date: Thu, 18 Jan 2024 09:09:37 -0500 Subject: [PATCH 5/5] impl binary search --- internal/mempool/priority_queue.go | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/internal/mempool/priority_queue.go b/internal/mempool/priority_queue.go index 55215e60d..3886da0cf 100644 --- a/internal/mempool/priority_queue.go +++ b/internal/mempool/priority_queue.go @@ -2,7 +2,6 @@ package mempool import ( "container/heap" - "slices" "sort" "sync" @@ -20,15 +19,7 @@ type TxPriorityQueue struct { func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { // Using BinarySearch to find the appropriate index to insert tx - i, _ := slices.BinarySearchFunc(queue, tx, func(a, b *WrappedTx) int { - if a.evmNonce < b.evmNonce { - return -1 - } - if a.evmNonce > b.evmNonce { - return 1 - } - return 0 - }) + i := binarySearch(queue, tx) // Make room for new value and add it queue = append(queue, nil) @@ -37,6 +28,20 @@ func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx { return queue } +// binarySearch finds the index at which tx should be inserted in queue +func binarySearch(queue []*WrappedTx, tx *WrappedTx) int { + low, high := 0, len(queue) + for low < high { + mid := low + (high-low)/2 + if queue[mid].evmNonce < tx.evmNonce { + low = mid + 1 + } else { + high = mid + } + } + return low +} + func NewTxPriorityQueue() *TxPriorityQueue { pq := &TxPriorityQueue{ txs: make([]*WrappedTx, 0),