diff --git a/internal/mempool/mempool.go b/internal/mempool/mempool.go index d2b84910c..d38fe9e5c 100644 --- a/internal/mempool/mempool.go +++ b/internal/mempool/mempool.go @@ -177,7 +177,7 @@ func (txmp *TxMempool) Unlock() { // Size returns the number of valid transactions in the mempool. It is // thread-safe. func (txmp *TxMempool) Size() int { - return txmp.txStore.Size() + return txmp.txStore.Size() + txmp.pendingTxs.Size() } // SizeBytes return the total sum in bytes of all the valid transactions in the @@ -437,6 +437,13 @@ func (txmp *TxMempool) ReapMaxTxs(max int) types.Txs { for _, wtx := range wTxs { txs = append(txs, wtx.tx) } + if len(txs) < max { + // retrieve more from pending txs + pending := txmp.pendingTxs.Peek(max - len(txs)) + for _, ptx := range pending { + txs = append(txs, ptx.tx.tx) + } + } return txs } diff --git a/internal/mempool/tx.go b/internal/mempool/tx.go index 9570935eb..e5fd7b16f 100644 --- a/internal/mempool/tx.go +++ b/internal/mempool/tx.go @@ -294,7 +294,7 @@ func (wtl *WrappedTxList) Remove(wtx *WrappedTx) { } type PendingTxs struct { - mtx *sync.Mutex + mtx *sync.RWMutex txs []PendingTxInfo } @@ -306,7 +306,7 @@ type PendingTxInfo struct { func NewPendingTxs() *PendingTxs { return &PendingTxs{ - mtx: &sync.Mutex{}, + mtx: &sync.RWMutex{}, txs: []PendingTxInfo{}, } } @@ -356,3 +356,19 @@ func (p *PendingTxs) Insert(tx *WrappedTx, resCheckTx *abci.ResponseCheckTxV2, t txInfo: txInfo, }) } + +func (p *PendingTxs) Peek(max int) []PendingTxInfo { + p.mtx.RLock() + defer p.mtx.RUnlock() + // priority is fifo + if max > len(p.txs) { + return p.txs + } + return p.txs[:max] +} + +func (p *PendingTxs) Size() int { + p.mtx.RLock() + defer p.mtx.RUnlock() + return len(p.txs) +} diff --git a/internal/rpc/core/mempool.go b/internal/rpc/core/mempool.go index 917e00b47..12fa2d339 100644 --- a/internal/rpc/core/mempool.go +++ b/internal/rpc/core/mempool.go @@ -149,6 +149,7 @@ func (env *Environment) BroadcastTxCommit(ctx context.Context, req *coretypes.Re // More: https://docs.tendermint.com/master/rpc/#/Info/unconfirmed_txs func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.RequestUnconfirmedTxs) (*coretypes.ResultUnconfirmedTxs, error) { totalCount := env.Mempool.Size() + fmt.Printf("EVMTEST mempool size: %d\n", totalCount) perPage := env.validatePerPage(req.PerPage.IntPtr()) page, err := validatePage(req.Page.IntPtr(), perPage, totalCount) if err != nil { @@ -160,6 +161,7 @@ func (env *Environment) UnconfirmedTxs(ctx context.Context, req *coretypes.Reque txs := env.Mempool.ReapMaxTxs(skipCount + tmmath.MinInt(perPage, totalCount-skipCount)) result := txs[skipCount:] + fmt.Printf("EVMTEST unconfirmed tx result count: %d, skipping %d\n", len(result), skipCount) return &coretypes.ResultUnconfirmedTxs{ Count: len(result), Total: totalCount,