Skip to content

Commit

Permalink
undo multi-tx-changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenlanders committed Jan 25, 2024
1 parent 217570a commit 41b1b8f
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 389 deletions.
5 changes: 0 additions & 5 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,4 @@ type ResponseCheckTxV2 struct {
IsPendingTransaction bool
Checker PendingTxChecker // must not be nil if IsPendingTransaction is true
ExpireTxHandler ExpireTxHandler

// helper properties for prioritization in mempool
EVMNonce uint64
EVMSenderAddress string
IsEVM bool
}
19 changes: 8 additions & 11 deletions internal/mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,14 +292,11 @@ func (txmp *TxMempool) CheckTx(
}

wtx := &WrappedTx{
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
evmNonce: res.EVMNonce,
evmAddress: res.EVMSenderAddress,
isEVM: res.IsEVM,
removeHandler: func(removeFromCache bool) {
tx: tx,
hash: txHash,
timestamp: time.Now().UTC(),
height: txmp.height,
expiredCallback: func(removeFromCache bool) {
if removeFromCache {
txmp.cache.Remove(tx)
}
Expand Down Expand Up @@ -855,7 +852,7 @@ func (txmp *TxMempool) removeTx(wtx *WrappedTx, removeFromCache bool) {

atomic.AddInt64(&txmp.sizeBytes, int64(-wtx.Size()))

wtx.removeHandler(removeFromCache)
wtx.expiredCallback(removeFromCache)
}

// purgeExpiredTxs removes all transactions that have exceeded their respective
Expand Down Expand Up @@ -910,8 +907,8 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {

// remove pending txs that have expired
txmp.pendingTxs.PurgeExpired(txmp.config.TTLNumBlocks, blockHeight, txmp.config.TTLDuration, now, func(wtx *WrappedTx) {
txmp.metrics.ExpiredTxs.Add(1)
wtx.removeHandler(!txmp.config.KeepInvalidTxsInCache)
txmp.metrics.ExpiredTxs.Add(1)
wtx.expiredCallback(!txmp.config.KeepInvalidTxsInCache)
})
}

Expand Down
94 changes: 1 addition & 93 deletions internal/mempool/mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,42 +43,6 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
sender string
)

if strings.HasPrefix(string(req.Tx), "evm") {
// format is evm-sender-0=account=priority=nonce
// split into respective vars
parts := bytes.Split(req.Tx, []byte("="))
sender = string(parts[0])
account := string(parts[1])
v, err := strconv.ParseInt(string(parts[2]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 100,
GasWanted: 1,
}}, nil
}
nonce, err := strconv.ParseInt(string(parts[3]), 10, 64)
if err != nil {
// could not parse
return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Code: 101,
GasWanted: 1,
}}, nil
}
return &abci.ResponseCheckTxV2{
ResponseCheckTx: &abci.ResponseCheckTx{
Priority: v,
Code: code.CodeTypeOK,
GasWanted: 1,
},
EVMNonce: uint64(nonce),
EVMSenderAddress: account,
IsEVM: true,
}, nil
}

// infer the priority from the raw transaction value (sender=key=value)
parts := bytes.Split(req.Tx, []byte("="))
if len(parts) == 3 {
Expand All @@ -100,6 +64,7 @@ func (app *application) CheckTx(_ context.Context, req *abci.RequestCheckTx) (*a
GasWanted: 1,
}}, nil
}

return &abci.ResponseCheckTxV2{ResponseCheckTx: &abci.ResponseCheckTx{
Priority: priority,
Sender: sender,
Expand Down Expand Up @@ -447,63 +412,6 @@ func TestTxMempool_CheckTxExceedsMaxSize(t *testing.T) {
require.NoError(t, txmp.CheckTx(ctx, tx, nil, TxInfo{SenderID: 0}))
}

func TestTxMempool_Prioritization(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

client := abciclient.NewLocalClient(log.NewNopLogger(), &application{Application: kvstore.NewApplication()})
if err := client.Start(ctx); err != nil {
t.Fatal(err)
}
t.Cleanup(client.Wait)

txmp := setup(t, client, 100)
peerID := uint16(1)

address1 := "0xeD23B3A9DE15e92B9ef9540E587B3661E15A12fA"
address2 := "0xfD23B3A9DE15e92B9ef9540E587B3661E15A12fA"

// Generate transactions with different priorities
// there are two formats to comply with the above mocked CheckTX
// EVM: evm-sender=account=priority=nonce
// Non-EVM: sender=peer=priority
txs := [][]byte{
[]byte(fmt.Sprintf("sender-0-1=peer=%d", 9)),
[]byte(fmt.Sprintf("sender-1-1=peer=%d", 8)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 7, 0)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address1, 9, 1)),
[]byte(fmt.Sprintf("evm-sender=%s=%d=%d", address2, 6, 0)),
[]byte(fmt.Sprintf("sender-2-1=peer=%d", 5)),
[]byte(fmt.Sprintf("sender-3-1=peer=%d", 4)),
}

// copy the slice of txs and shuffle the order randomly
txsCopy := make([][]byte, len(txs))
copy(txsCopy, txs)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
rng.Shuffle(len(txsCopy), func(i, j int) {
txsCopy[i], txsCopy[j] = txsCopy[j], txsCopy[i]
})

for i := range txsCopy {
require.NoError(t, txmp.CheckTx(ctx, txsCopy[i], nil, TxInfo{SenderID: peerID}))
}

// Reap the transactions
reapedTxs := txmp.ReapMaxTxs(len(txs))
// Check if the reaped transactions are in the correct order of their priorities
for _, tx := range txs {
fmt.Printf("expected: %s\n", string(tx))
}
fmt.Println("**************")
for _, reapedTx := range reapedTxs {
fmt.Printf("received: %s\n", string(reapedTx))
}
for i, reapedTx := range reapedTxs {
require.Equal(t, txs[i], []byte(reapedTx))
}
}

func TestTxMempool_CheckTxSamePeer(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down
137 changes: 18 additions & 119 deletions internal/mempool/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,13 @@ var _ heap.Interface = (*TxPriorityQueue)(nil)

// TxPriorityQueue defines a thread-safe priority queue for valid transactions.
type TxPriorityQueue struct {
mtx sync.RWMutex
txs []*WrappedTx
evmQueue map[string][]*WrappedTx
}

func insertToEVMQueue(queue []*WrappedTx, tx *WrappedTx) []*WrappedTx {
// Using BinarySearch to find the appropriate index to insert tx
i := binarySearch(queue, tx)

// Make room for new value and add it
queue = append(queue, nil)
copy(queue[i+1:], queue[i:])
queue[i] = tx
return queue
}

// binarySearch finds the index at which tx should be inserted in queue
func binarySearch(queue []*WrappedTx, tx *WrappedTx) int {
low, high := 0, len(queue)
for low < high {
mid := low + (high-low)/2
if queue[mid].evmNonce < tx.evmNonce {
low = mid + 1
} else {
high = mid
}
}
return low
mtx sync.RWMutex
txs []*WrappedTx
}

func NewTxPriorityQueue() *TxPriorityQueue {
pq := &TxPriorityQueue{
txs: make([]*WrappedTx, 0),
evmQueue: make(map[string][]*WrappedTx),
txs: make([]*WrappedTx, 0),
}

heap.Init(pq)
Expand Down Expand Up @@ -95,142 +68,68 @@ func (pq *TxPriorityQueue) GetEvictableTxs(priority, txSize, totalSize, cap int6
return nil
}

// requires read lock
func (pq *TxPriorityQueue) numQueuedUnsafe() int {
var result int
for _, queue := range pq.evmQueue {
result += len(queue)
}
// first items in queue are also in heap, subtract one
return result - len(pq.evmQueue)
}

// NumTxs returns the number of transactions in the priority queue. It is
// thread safe.
func (pq *TxPriorityQueue) NumTxs() int {
pq.mtx.RLock()
defer pq.mtx.RUnlock()

return len(pq.txs) + pq.numQueuedUnsafe()
}

func (pq *TxPriorityQueue) removeQueuedEvmTxUnsafe(tx *WrappedTx) {
if queue, ok := pq.evmQueue[tx.evmAddress]; ok {
for i, t := range queue {
if t.evmNonce == tx.evmNonce {
pq.evmQueue[tx.evmAddress] = append(queue[:i], queue[i+1:]...)
if len(pq.evmQueue[tx.evmAddress]) == 0 {
delete(pq.evmQueue, tx.evmAddress)
} else {
heap.Push(pq, pq.evmQueue[tx.evmAddress][0])
}
break
}
}
}
}

func (pq *TxPriorityQueue) findTxIndexUnsafe(tx *WrappedTx) (int, bool) {
for i, t := range pq.txs {
if t == tx {
return i, true
}
}
return 0, false
return len(pq.txs)
}

// RemoveTx removes a specific transaction from the priority queue.
func (pq *TxPriorityQueue) RemoveTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()

if idx, ok := pq.findTxIndexUnsafe(tx); ok {
heap.Remove(pq, idx)
}

if tx.isEVM {
pq.removeQueuedEvmTxUnsafe(tx)
}
}

func (pq *TxPriorityQueue) pushTxUnsafe(tx *WrappedTx) {
if !tx.isEVM {
heap.Push(pq, tx)
return
}

queue, exists := pq.evmQueue[tx.evmAddress]
if !exists {
pq.evmQueue[tx.evmAddress] = []*WrappedTx{tx}
heap.Push(pq, tx)
return
}

first := queue[0]
if tx.evmNonce < first.evmNonce {
if idx, ok := pq.findTxIndexUnsafe(first); ok {
heap.Remove(pq, idx)
}
heap.Push(pq, tx)
if tx.heapIndex < len(pq.txs) {
heap.Remove(pq, tx.heapIndex)
}

pq.evmQueue[tx.evmAddress] = insertToEVMQueue(queue, tx)
}

// PushTx adds a valid transaction to the priority queue. It is thread safe.
func (pq *TxPriorityQueue) PushTx(tx *WrappedTx) {
pq.mtx.Lock()
defer pq.mtx.Unlock()
pq.pushTxUnsafe(tx)
}

func (pq *TxPriorityQueue) popTxUnsafe() *WrappedTx {
x := heap.Pop(pq)
if x == nil {
return nil
}

tx := x.(*WrappedTx)

if !tx.isEVM {
return tx
}

pq.removeQueuedEvmTxUnsafe(tx)

return tx
heap.Push(pq, tx)
}

// PopTx removes the top priority transaction from the queue. It is thread safe.
func (pq *TxPriorityQueue) PopTx() *WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()
return pq.popTxUnsafe()

x := heap.Pop(pq)
if x != nil {
return x.(*WrappedTx)
}

return nil
}

// dequeue up to `max` transactions and reenqueue while locked
func (pq *TxPriorityQueue) PeekTxs(max int) []*WrappedTx {
pq.mtx.Lock()
defer pq.mtx.Unlock()

numTxs := len(pq.txs) + pq.numQueuedUnsafe()
numTxs := len(pq.txs)
if max < 0 {
max = numTxs
}

cap := tmmath.MinInt(numTxs, max)
res := make([]*WrappedTx, 0, cap)
for i := 0; i < cap; i++ {
popped := pq.popTxUnsafe()
popped := heap.Pop(pq)
if popped == nil {
break
}

res = append(res, popped)
res = append(res, popped.(*WrappedTx))
}

for _, tx := range res {
pq.pushTxUnsafe(tx)
heap.Push(pq, tx)
}
return res
}
Expand Down
Loading

0 comments on commit 41b1b8f

Please sign in to comment.