Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

take pruning logic out of create_transaction logic #11845

Merged
merged 17 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions common/txmgr/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ func NewSendEveryStrategy() txmgrtypes.TxStrategy {
type SendEveryStrategy struct{}

func (SendEveryStrategy) Subject() uuid.NullUUID { return uuid.NullUUID{} }
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (int64, error) {
return 0, nil
func (SendEveryStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) ([]int64, error) {
return nil, nil
}

var _ txmgrtypes.TxStrategy = DropOldestStrategy{}
Expand All @@ -56,14 +56,15 @@ func (s DropOldestStrategy) Subject() uuid.NullUUID {
return uuid.NullUUID{UUID: s.subject, Valid: true}
}

func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (n int64, err error) {
func (s DropOldestStrategy) PruneQueue(ctx context.Context, pruneService txmgrtypes.UnstartedTxQueuePruner) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.queryTimeout)
defer cancel()

n, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize, s.subject)
// NOTE: We prune one less than the queue size because we will be adding a new transaction to the queue right after this PruneQueue call
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comments helps! But I'm not yet sure why we can't just use the queueSize. The prune is occurring before the create call right? This means that the new transaction isn't in the queue yet, so it should be safe to use s.queueSize as opposed to queueSize - 1 right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The concern that prashant brought up was that if we didnt do a -1 we could end up with a queue size +1 above the max queue size. Here is an example

  1. Queue is full at 500 txns
  2. PruneQueue is run... it sees 500txns which matches the max so it moves on without pruning
  3. CreateTransaction puts a new txn in the queue making the new queue size 501 which is technically above the max limit of 500

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be only off by 1 if we chose to stick with queueSize, but let me know if i am not fully understanding your idea

ids, err = pruneService.PruneUnstartedTxQueue(ctx, s.queueSize-1, s.subject)
if err != nil {
return 0, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
return ids, fmt.Errorf("DropOldestStrategy#PruneQueue failed: %w", err)
}
return
}
37 changes: 35 additions & 2 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) CreateTran
return tx, fmt.Errorf("Txm#CreateTransaction: %w", err)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, b.chainID)
tx, err = b.createTxnAndPruneQueue(ctx, txRequest, b.chainID)
if err != nil {
return tx, err
}
Expand Down Expand Up @@ -562,7 +562,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SendNative
FeeLimit: gasLimit,
Strategy: NewSendEveryStrategy(),
}
etx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
etx, err = b.createTxnAndPruneQueue(ctx, txRequest, chainID)
if err != nil {
return etx, fmt.Errorf("SendNativeToken failed to insert tx: %w", err)
}
Expand Down Expand Up @@ -682,3 +682,36 @@ func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Fin
func (n *NullTxManager[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) CountTransactionsByState(ctx context.Context, state txmgrtypes.TxState) (count uint32, err error) {
return count, errors.New(n.ErrMsg)
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) createTxnAndPruneQueue(
ctx context.Context,
txRequest txmgrtypes.TxRequest[ADDR, TX_HASH],
chainID CHAIN_ID,
) (
tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
err error,
) {
pruned, err := txRequest.Strategy.PruneQueue(ctx, b.txStore)
if err != nil {
return tx, err
}
if len(pruned) > 0 {
b.logger.Warnw(fmt.Sprintf("Pruned %d old unstarted transactions", len(pruned)),
"subject", txRequest.Strategy.Subject(),
"pruned-tx-ids", pruned,
)
}

tx, err = b.txStore.CreateTransaction(ctx, txRequest, chainID)
if err != nil {
return tx, err
}
b.logger.Debugw("Created transaction",
"fromAddress", txRequest.FromAddress,
"toAddress", txRequest.ToAddress,
"meta", txRequest.Meta,
"transactionID", tx.ID,
)

return tx, nil
}
12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 7 additions & 5 deletions common/txmgr/types/mocks/tx_strategy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion common/txmgr/types/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type TxStrategy interface {
// PruneQueue is called after tx insertion
// It accepts the service responsible for deleting
// unstarted txs and deletion options
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (n int64, err error)
PruneQueue(ctx context.Context, pruneService UnstartedTxQueuePruner) (ids []int64, err error)
}

type TxAttemptState int8
Expand Down
2 changes: 1 addition & 1 deletion common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ type TxHistoryReaper[CHAIN_ID types.ID] interface {
}

type UnstartedTxQueuePruner interface {
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error)
PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error)
}

// R is the raw unparsed transaction receipt
Expand Down
20 changes: 7 additions & 13 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1841,28 +1841,20 @@ RETURNING "txes".*
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to insert evm tx")
}
var pruned int64
pruned, err = txRequest.Strategy.PruneQueue(ctx, o)
if err != nil {
return pkgerrors.Wrap(err, "CreateEthTransaction failed to prune evm.txes")
}
if pruned > 0 {
o.logger.Warnw(fmt.Sprintf("Dropped %d old transactions from transaction queue", pruned), "fromAddress", txRequest.FromAddress, "toAddress", txRequest.ToAddress, "meta", txRequest.Meta, "subject", txRequest.Strategy.Subject(), "replacementID", dbEtx.ID)
}
return nil
})
var etx Tx
dbEtx.ToTx(&etx)
return etx, err
}

func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (n int64, err error) {
func (o *evmTxStore) PruneUnstartedTxQueue(ctx context.Context, queueSize uint32, subject uuid.UUID) (ids []int64, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
err = qq.Transaction(func(tx pg.Queryer) error {
res, err := qq.Exec(`
err := qq.Select(&ids, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND subject = $1 AND
id < (
Expand All @@ -1873,11 +1865,13 @@ id < (
ORDER BY id DESC
LIMIT $3
) numbers
)`, subject, subject, queueSize)
) RETURNING id`, subject, subject, queueSize)
if err != nil {
return pkgerrors.Wrap(err, "DeleteUnstartedEthTx failed")
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return fmt.Errorf("PruneUnstartedTxQueue failed: %w", err)
}
n, err = res.RowsAffected()
return err
})
return
Expand Down
11 changes: 4 additions & 7 deletions core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"gopkg.in/guregu/null.v4"
)
Expand Down Expand Up @@ -1723,7 +1722,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1780,7 +1778,6 @@ func TestORM_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
etx, err := txStore.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
Expand Down Expand Up @@ -1816,22 +1813,22 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

t.Run("does not prune if queue has not exceeded capacity", func(t *testing.T) {
t.Run("does not prune if queue has not exceeded capacity-1", func(t *testing.T) {
subject1 := uuid.New()
strategy1 := txmgrcommon.NewDropOldestStrategy(subject1, uint32(5), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy1))
}
AssertCountPerSubject(t, txStore, int64(5), subject1)
AssertCountPerSubject(t, txStore, int64(4), subject1)
})

t.Run("prunes if queue has exceeded capacity", func(t *testing.T) {
t.Run("prunes if queue has exceeded capacity-1", func(t *testing.T) {
subject2 := uuid.New()
strategy2 := txmgrcommon.NewDropOldestStrategy(subject2, uint32(3), cfg.Database().DefaultQueryTimeout())
for i := 0; i < 5; i++ {
mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, &cltest.FixtureChainID, txRequestWithStrategy(strategy2))
}
AssertCountPerSubject(t, txStore, int64(3), subject2)
AssertCountPerSubject(t, txStore, int64(2), subject2)
})
}

Expand Down
12 changes: 7 additions & 5 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions core/chains/evm/txmgr/strategies_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func Test_SendEveryStrategy(t *testing.T) {

assert.Equal(t, uuid.NullUUID{}, s.Subject())

n, err := s.PruneQueue(testutils.Context(t), nil)
ids, err := s.PruneQueue(testutils.Context(t), nil)
assert.NoError(t, err)
assert.Equal(t, int64(0), n)
assert.Len(t, ids, 0)
}

func Test_DropOldestStrategy_Subject(t *testing.T) {
Expand All @@ -47,9 +47,9 @@ func Test_DropOldestStrategy_PruneQueue(t *testing.T) {

t.Run("calls PrineUnstartedTxQueue for the given subject and queueSize, ignoring fromAddress", func(t *testing.T) {
strategy1 := txmgrcommon.NewDropOldestStrategy(subject, queueSize, queryTimeout)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize, subject, mock.Anything, mock.Anything).Once().Return(int64(2), nil)
n, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
mockTxStore.On("PruneUnstartedTxQueue", mock.Anything, queueSize-1, subject, mock.Anything, mock.Anything).Once().Return([]int64{1, 2}, nil)
ids, err := strategy1.PruneQueue(testutils.Context(t), mockTxStore)
require.NoError(t, err)
assert.Equal(t, int64(2), n)
assert.Equal(t, []int64{1, 2}, ids)
})
}
12 changes: 8 additions & 4 deletions core/chains/evm/txmgr/txmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func TestTxm_CreateTransaction(t *testing.T) {
subject := uuid.New()
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)
evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: fromAddress,
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, otherKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -430,7 +430,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
mustInsertUnconfirmedEthTxWithInsufficientEthAttempt(t, txStore, 0, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: evmFromAddress,
Expand All @@ -451,7 +451,7 @@ func TestTxm_CreateTransaction_OutOfEth(t *testing.T) {
cltest.MustInsertConfirmedEthTxWithLegacyAttempt(t, txStore, 0, 42, thisKey.Address)
strategy := newMockTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{})
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.Anything).Return(nil, nil)

evmConfig.MaxQueued = uint64(1)
etx, err := txm.CreateTransaction(testutils.Context(t), txmgr.TxRequest{
Expand Down Expand Up @@ -799,6 +799,10 @@ func mustCreateUnstartedTx(t testing.TB, txStore txmgr.EvmTxStore, fromAddress c
func mustCreateUnstartedTxFromEvmTxRequest(t testing.TB, txStore txmgr.EvmTxStore, txRequest txmgr.TxRequest, chainID *big.Int) (tx txmgr.Tx) {
tx, err := txStore.CreateTransaction(testutils.Context(t), txRequest, chainID)
require.NoError(t, err)

_, err = txRequest.Strategy.PruneQueue(testutils.Context(t), txStore)
require.NoError(t, err)

return tx
}

Expand Down
2 changes: 1 addition & 1 deletion core/web/eth_keys_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func TestETHKeysController_ChainSuccess_ResetWithAbandon(t *testing.T) {
subject := uuid.New()
strategy := commontxmmocks.NewTxStrategy(t)
strategy.On("Subject").Return(uuid.NullUUID{UUID: subject, Valid: true})
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(int64(0), nil)
strategy.On("PruneQueue", mock.Anything, mock.AnythingOfType("*txmgr.evmTxStore")).Return(nil, nil)
_, err := chain.TxManager().CreateTransaction(testutils.Context(t), txmgr.TxRequest{
FromAddress: addr,
ToAddress: testutils.NewAddress(),
Expand Down
Loading