Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenlanders committed Jan 26, 2024
1 parent 71abf88 commit 304fb81
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 138 deletions.
2 changes: 0 additions & 2 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,6 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, res *abci.ResponseCheck
"num_txs", txmp.Size(),
)
txmp.notifyTxsAvailable()
} else {
fmt.Println("DEBUG: ******************************** NOT INSERTING, ALREADY EXISTS")
}

return nil
Expand Down
230 changes: 94 additions & 136 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package mempool

import (
"container/heap"
"fmt"
"github.com/tendermint/tendermint/types"
"sort"
"sync"

Expand All @@ -15,9 +13,8 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)
// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx // priority heap
evmQueue map[string][]*WrappedTx // sorted by nonce
keys map[types.TxKey]struct{} // unique keys in the queue
txs []*WrappedTx // priority heap
evmQueue map[string][]*WrappedTx // sorted by nonce
}

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx, i int) []*WrappedTx {
Expand All @@ -33,7 +30,7 @@ func binarySearch(queue []*WrappedTx, tx *WrappedTx) int {
low, high := 0, len(queue)
for low < high {
mid := low + (high-low)/2
if queue[mid].evmNonce < tx.evmNonce || (queue[mid].evmNonce == tx.evmNonce && queue[mid].timestamp.Before(tx.timestamp)) {
if queue[mid].IsBefore(tx) {
low = mid + 1
} else {
high = mid
Expand All @@ -46,22 +43,13 @@ func NewTxPriorityQueue() *TxPriorityQueue {
pq := &TxPriorityQueue{
txs: make([]*WrappedTx, 0),
evmQueue: make(map[string][]*WrappedTx),
keys: make(map[types.TxKey]struct{}),
}

heap.Init(pq)

return pq
}

func (pq *TxPriorityQueue) lock() {
pq.mtx.Lock()
}

func (pq *TxPriorityQueue) unlock() {
pq.mtx.Unlock()
}

// GetEvictableTxs attempts to find and return a list of *WrappedTx than can be
// evicted to make room for another *WrappedTx with higher priority. If no such
// list of *WrappedTx exists, nil will be returned. The returned list of *WrappedTx
Expand Down Expand Up @@ -149,12 +137,7 @@ func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
pq.mtx.Lock()
pq.checkInvariants("RemoveTx start")
defer func() {
delete(pq.keys, tx.tx.Key())
pq.checkInvariants("RemoveTx end")
pq.mtx.Unlock()
}()
defer pq.mtx.Unlock()

if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
Expand All @@ -170,11 +153,6 @@ func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
if _, ok := pq.keys[tx.tx.Key()]; ok {
return
}
//pq.keys[tx.tx.Key()] = struct{}{}

if !tx.isEVM {
heap.Push(pq, tx)
return
Expand All @@ -188,124 +166,111 @@ func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
}

first := queue[0]
if tx.evmNonce < first.evmNonce || (tx.evmNonce == first.evmNonce && tx.timestamp.Before(first.timestamp)) {
if tx.IsBefore(first) {
if idx, ok := pq.findTxIndexUnsafe(first); ok {
heap.Remove(pq, idx)
}
heap.Push(pq, tx)
}

pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx, binarySearch(queue, tx))

}

// These are available if we need to test the invariant checks
// these can be used to troubleshoot invariant violations
func (pq *TxPriorityQueue) checkInvariants(msg string) {
//uniqHashes := make(map[string]bool)
//for idx, tx := range pq.txs {
// if tx == nil {
// pq.print()
// panic(fmt.Sprintf("DEBUG PRINT: found nil item on heap: idx=%d\n", idx))
// }
// if tx.tx == nil {
// pq.print()
// panic(fmt.Sprintf("DEBUG PRINT: found nil tx.tx on heap: idx=%d\n", idx))
// }
// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key()))
// }
// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true
//
// //if _, ok := pq.keys[tx.tx.Key()]; !ok {
// // pq.print()
// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key()))
// //}
//
// if tx.isEVM {
// if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
// if queue[0].tx.Key() != tx.tx.Key() {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key()))
// }
// } else {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key()))
// }
// }
//}
//
//// each item in all queues should be unique nonce
//for _, queue := range pq.evmQueue {
// hashes := make(map[string]bool)
// for idx, tx := range queue {
// if idx == 0 {
// _, ok := pq.findTxIndexUnsafe(tx)
// if !ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce))
// }
// }
// //if _, ok := pq.keys[tx.tx.Key()]; !ok {
// // pq.print()
// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key()))
// //}
// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce))
// }
// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// }
//}
}
//func (pq *TxPriorityQueue) checkInvariants(msg string) {
// uniqHashes := make(map[string]bool)
// for idx, tx := range pq.txs {
// if tx == nil {
// pq.print()
// panic(fmt.Sprintf("DEBUG PRINT: found nil item on heap: idx=%d\n", idx))
// }
// if tx.tx == nil {
// pq.print()
// panic(fmt.Sprintf("DEBUG PRINT: found nil tx.tx on heap: idx=%d\n", idx))
// }
// if _, ok := uniqHashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in heap", msg, tx.tx.Key()))
// }
// uniqHashes[fmt.Sprintf("%x", tx.tx.Key())] = true
//
// //if _, ok := pq.keys[tx.tx.Key()]; !ok {
// // pq.print()
// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key()))
// //}
//
// if tx.isEVM {
// if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
// if queue[0].tx.Key() != tx.tx.Key() {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not at front of evmQueue hash=%x", msg, tx.tx.Key()))
// }
// } else {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in evmQueue hash=%x", msg, tx.tx.Key()))
// }
// }
// }
//
// // each item in all queues should be unique nonce
// for _, queue := range pq.evmQueue {
// hashes := make(map[string]bool)
// for idx, tx := range queue {
// if idx == 0 {
// _, ok := pq.findTxIndexUnsafe(tx)
// if !ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): did not find tx[0] hash=%x nonce=%d in heap", msg, tx.tx.Key(), tx.evmNonce))
// }
// }
// //if _, ok := pq.keys[tx.tx.Key()]; !ok {
// // pq.print()
// // panic(fmt.Sprintf("INVARIANT (%s): tx in heap but not in keys hash=%x", msg, tx.tx.Key()))
// //}
// if _, ok := hashes[fmt.Sprintf("%x", tx.tx.Key())]; ok {
// pq.print()
// panic(fmt.Sprintf("INVARIANT (%s): duplicate hash=%x in queue nonce=%d", msg, tx.tx.Key(), tx.evmNonce))
// }
// hashes[fmt.Sprintf("%x", tx.tx.Key())] = true
// }
// }
//}

// for debugging situations where invariant violations occur
func (pq *TxPriorityQueue) print() {
fmt.Println("PRINT PRIORITY QUEUE ****************** ")
for _, tx := range pq.txs {
if tx == nil {
fmt.Printf("DEBUG PRINT: heap (nil): nonce=?, hash=?\n")
continue
}
if tx.tx == nil {
fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, tx.tx is nil \n", tx.evmAddress, tx.evmNonce)
continue
}
fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, hash=%x, time=%d\n", tx.evmAddress, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano())
}

for addr, queue := range pq.evmQueue {
for idx, tx := range queue {
if tx == nil {
fmt.Printf("DEBUG PRINT: found nil item on evmQueue(%s): idx=%d\n", addr, idx)
continue
}
if tx.tx == nil {
fmt.Printf("DEBUG PRINT: found nil tx.tx on evmQueue(%s): idx=%d\n", addr, idx)
continue
}

fmt.Printf("DEBUG PRINT: evmQueue(%s)[%d]: nonce=%d, hash=%x, time=%d\n", tx.evmAddress, idx, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano())
}
}
}
//func (pq *TxPriorityQueue) print() {
// fmt.Println("PRINT PRIORITY QUEUE ****************** ")
// for _, tx := range pq.txs {
// if tx == nil {
// fmt.Printf("DEBUG PRINT: heap (nil): nonce=?, hash=?\n")
// continue
// }
// if tx.tx == nil {
// fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, tx.tx is nil \n", tx.evmAddress, tx.evmNonce)
// continue
// }
// fmt.Printf("DEBUG PRINT: heap (%s): nonce=%d, hash=%x, time=%d\n", tx.evmAddress, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano())
// }
//
// for addr, queue := range pq.evmQueue {
// for idx, tx := range queue {
// if tx == nil {
// fmt.Printf("DEBUG PRINT: found nil item on evmQueue(%s): idx=%d\n", addr, idx)
// continue
// }
// if tx.tx == nil {
// fmt.Printf("DEBUG PRINT: found nil tx.tx on evmQueue(%s): idx=%d\n", addr, idx)
// continue
// }
//
// fmt.Printf("DEBUG PRINT: evmQueue(%s)[%d]: nonce=%d, hash=%x, time=%d\n", tx.evmAddress, idx, tx.evmNonce, tx.tx.Key(), tx.timestamp.UnixNano())
// }
// }
//}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
pq.checkInvariants("PushTx start")
defer func() {
pq.checkInvariants("PushTx end")
pq.mtx.Unlock()
}()

if tx == nil {
panic("PushTx: nil tx")
}
if tx.tx == nil {
panic("PushTx: nil tx.tx")
}
defer pq.mtx.Unlock()

pq.pushTxUnsafe(tx)
}
Expand All @@ -321,8 +286,6 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {

tx := x.(*WrappedTx)

defer delete(pq.keys, tx.tx.Key())

if !tx.isEVM {
return tx
}
Expand All @@ -338,12 +301,7 @@ func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
// PopTx removes the top priority transaction from the queue. It is thread safe.
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
pq.mtx.Lock()

pq.checkInvariants("PopTx start")
defer func() {
pq.checkInvariants("PopTx end")
pq.mtx.Unlock()
}()
defer pq.mtx.Unlock()

return pq.popTxUnsafe()
}
Expand Down

0 comments on commit 304fb81

Please sign in to comment.