Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

eval: split TestTransactionGroup from BlockEvaluator using TransactionGroupTester #5818

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
30 changes: 24 additions & 6 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
"github.com/algorand/go-algorand/data/transactions"
"github.com/algorand/go-algorand/data/transactions/logic"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/ledger/eval"
"github.com/algorand/go-algorand/ledger/ledgercore"
"github.com/algorand/go-algorand/logging"
"github.com/algorand/go-algorand/logging/telemetryspec"
Expand Down Expand Up @@ -97,6 +98,9 @@
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

txnGroupTester *eval.TransactionGroupTester
txnGroupTesterMu deadlock.RWMutex

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
Expand All @@ -105,7 +109,6 @@

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
type BlockEvaluator interface {
TestTransactionGroup(txgroup []transactions.SignedTxn) error
Round() basics.Round
PaySetSize() int
TransactionGroup(txads []transactions.SignedTxnWithAD) error
Expand Down Expand Up @@ -390,19 +393,20 @@

// Test performs basic duplicate detection and well-formedness checks
// on a transaction group without storing the group.
// It may be called concurrently.
func (pool *TransactionPool) Test(txgroup []transactions.SignedTxn) error {
if err := pool.checkPendingQueueSize(txgroup); err != nil {
return err
}

pool.mu.Lock()
defer pool.mu.Unlock()
pool.txnGroupTesterMu.RLock()
defer pool.txnGroupTesterMu.RUnlock()

Check warning on line 403 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L402-L403

Added lines #L402 - L403 were not covered by tests

if pool.pendingBlockEvaluator == nil {
return fmt.Errorf("Test: pendingBlockEvaluator is nil")
if pool.txnGroupTester == nil {
return fmt.Errorf("Test: txnGroupTester is nil")

Check warning on line 406 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L405-L406

Added lines #L405 - L406 were not covered by tests
}

return pool.pendingBlockEvaluator.TestTransactionGroup(txgroup)
return pool.txnGroupTester.TestTransactionGroup(txgroup)

Check warning on line 409 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L409

Added line #L409 was not covered by tests
}

type poolIngestParams struct {
Expand Down Expand Up @@ -749,6 +753,20 @@
return
}

nextProto := config.Consensus[next.CurrentProtocol]
pool.txnGroupTesterMu.Lock()
pool.txnGroupTester = eval.NewTransactionGroupTester(
nextProto,
transactions.SpecialAddresses{
FeeSink: next.FeeSink,
RewardsPool: next.RewardsPool,
},
next,
func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error {
return pool.ledger.CheckDup(nextProto, next.BlockHeader.Round, firstValid, lastValid, txid, txl)
})

Check warning on line 767 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L766-L767

Added lines #L766 - L767 were not covered by tests
pool.txnGroupTesterMu.Unlock()

var asmStats telemetryspec.AssembleBlockMetrics
asmStats.StartCount = len(txgroups)
asmStats.StopReason = telemetryspec.AssembleBlockEmpty
Expand Down
3 changes: 2 additions & 1 deletion data/txHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,8 @@ func (handler *TxHandler) Start() {
},
})

handler.backlogWg.Add(2)
handler.backlogWg.Add(3)
go handler.backlogWorker()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why 2 backlog workers vs say 4?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no particular thought on the best number, and often wonder what the best way to manage worker numbers across many different places in the code base that might have some static choice.

I do wonder if all the static choice ought to be written as a fraction of the total cores available, rather than an actual numeric constant,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left this here as a stand-in but maybe I should have it be configurable, default 1 or 2

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any useful thing to do here in the short term except pick a random small number like 2 or 3, so I vote for resolving.

go handler.backlogWorker()
go handler.backlogGaugeThread()
handler.streamVerifier.Start(handler.ctx)
Expand Down
75 changes: 53 additions & 22 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2204,21 +2204,46 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
handler.postProcessCheckedTxn(&wi)
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxnDead))

txn1 := transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
makeTxn := func() transactions.Transaction {
return transactions.Transaction{
Type: protocol.PaymentTx,
Header: transactions.Header{
Sender: addresses[0],
Fee: basics.MicroAlgos{Raw: proto.MinTxnFee * 2},
FirstValid: 0,
LastValid: basics.Round(proto.MaxTxnLife),
GenesisHash: genesisHash,
},
PaymentTxnFields: transactions.PaymentTxnFields{
Receiver: poolAddr,
Amount: basics.MicroAlgos{Raw: mockBalancesMinBalance + (rand.Uint64() % 10000)},
},
}
}
txn1 := makeTxn()

// add a round 1 block with oldTxn1 and oldTxn2 in it (for txID and lease checking later)
oldTxn1 := makeTxn()
oldTxn2 := oldTxn1
crypto.RandBytes(oldTxn2.Lease[:])
prev, err := ledger.BlockHdr(ledger.Latest())
require.NoError(t, err)
next := bookkeeping.MakeBlock(prev)
blockEval, err := ledger.StartEvaluator(next.BlockHeader, 0, 0, nil)
require.NoError(t, err)
err = blockEval.Transaction(oldTxn1.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)
err = blockEval.Transaction(oldTxn2.Sign(secrets[0]), transactions.ApplyData{})
require.NoError(t, err)

// simulate this transaction was applied
ufblk, err := blockEval.GenerateBlock(nil)
require.NoError(t, err)
block := ledgercore.MakeValidatedBlock(ufblk.UnfinishedBlock(), ufblk.UnfinishedDeltas())
err = ledger.AddValidatedBlock(block, agreement.Certificate{})
require.NoError(t, err)

// trigger hitting pending queue max
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
for i := 0; i <= cfg.TxPoolSize; i++ {
txn := txn1
Expand Down Expand Up @@ -2288,24 +2313,30 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
// trigger TransactionInLedgerError (txid) error
wi.unverifiedTxGroup = []transactions.SignedTxn{txn1.Sign(secrets[0])}
wi.rawmsg = &network.IncomingMessage{}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagTxIDEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxIDEval))
// check transaction committed in round 1 (calls Ledger.CheckDup)
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn1.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagTxID))

// trigger LeaseInLedgerError (lease) error
txn2 = txn1
crypto.RandBytes(txn2.Lease[:])
txn3 := txn2
txn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{txn2.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember
wi.unverifiedTxGroup = []transactions.SignedTxn{txn3.Sign(secrets[0])}
handler.postProcessCheckedTxn(&wi)
handler.postProcessCheckedTxn(&wi) // calls Remember again
require.Equal(t, 1, getMetricCounter(txPoolRememberTagLeaseEval))
handler.checkAlreadyCommitted(&wi)
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLeaseEval))
// check transaction lease conflict with round 1 txn (calls Ledger.CheckDup)
oldTxn3 := oldTxn2
oldTxn3.Receiver = addr
wi.unverifiedTxGroup = []transactions.SignedTxn{oldTxn3.Sign(secrets[0])}
handler.checkAlreadyCommitted(&wi) // calls Test
require.Equal(t, 1, getCheckMetricCounter(txPoolRememberTagLease))

// TODO: not sure how to trigger fee error - need to return ErrNoSpace from ledger
// trigger pool fee error
Expand All @@ -2323,7 +2354,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
ledger.RegisterBlockListeners(blockListeners)

// add few blocks: on ci sometimes blockTicker is not fired in time in case of a single block
for i := basics.Round(1); i <= 3; i++ {
for i := basics.Round(2); i <= 4; i++ {
hdr := bookkeeping.BlockHeader{
Round: i,
UpgradeState: bookkeeping.UpgradeState{
Expand Down
34 changes: 30 additions & 4 deletions ledger/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -904,10 +904,34 @@ func (eval *BlockEvaluator) ResetTxnBytes() {
eval.blockTxBytes = 0
}

// TestTransactionGroup is only called by tests.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
return NewTransactionGroupTester(eval.proto, eval.specials, eval.block, eval.state.checkDup).TestTransactionGroup(txgroup)
}

// TransactionGroupTester performs basic transaction checks for well-formedness and duplicate detection.
type TransactionGroupTester struct {
proto config.ConsensusParams
specials transactions.SpecialAddresses
txnContext transactions.TxnContext
checkDup func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error
}

// NewTransactionGroupTester creates a new TransactionGroupTester for use in calling TestTransactionGroup.
func NewTransactionGroupTester(proto config.ConsensusParams, specials transactions.SpecialAddresses, txnContext transactions.TxnContext, checkDup func(firstValid, lastValid basics.Round, txid transactions.Txid, txl ledgercore.Txlease) error) *TransactionGroupTester {
return &TransactionGroupTester{
proto: proto,
specials: specials,
txnContext: txnContext,
checkDup: checkDup,
}
}

// TestTransactionGroup performs basic duplicate detection and well-formedness checks
// on a transaction group, but does not actually add the transactions to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// It uses a TestEvalContext to access needed recent ledger state.
func (eval TransactionGroupTester) TestTransactionGroup(txgroup []transactions.SignedTxn) error {
// Nothing to do if there are no transactions.
if len(txgroup) == 0 {
return nil
Expand Down Expand Up @@ -966,9 +990,9 @@ func (eval *BlockEvaluator) TestTransactionGroup(txgroup []transactions.SignedTx
// TestTransaction performs basic duplicate detection and well-formedness checks
// on a single transaction, but does not actually add the transaction to the block
// evaluator, or modify the block evaluator state in any other visible way.
func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {
func (eval TransactionGroupTester) TestTransaction(txn transactions.SignedTxn) error {
// Transaction valid (not expired)?
err := txn.Txn.Alive(eval.block)
err := txn.Txn.Alive(eval.txnContext)
if err != nil {
return err
}
Expand All @@ -981,7 +1005,8 @@ func (eval *BlockEvaluator) TestTransaction(txn transactions.SignedTxn) error {

// Transaction already in the ledger?
txid := txn.ID()
err = eval.state.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
// BlockEvaluator.transaction will check again using cow.checkDup later, if the pool tries to add this transaction to the block.
err = eval.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
}
Expand Down Expand Up @@ -1163,6 +1188,7 @@ func (eval *BlockEvaluator) transaction(txn transactions.SignedTxn, evalParams *
}

// Transaction already in the ledger?
// this checks against the txns added to this evaluator; testTransaction currently only checks against committed txns.
err = cow.checkDup(txn.Txn.First(), txn.Txn.Last(), txid, ledgercore.Txlease{Sender: txn.Txn.Sender, Lease: txn.Txn.Lease})
if err != nil {
return err
Expand Down
Loading