Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TXM In-memory: address_state methods: step 2-02 #12162

Closed
wants to merge 6 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 61 additions & 6 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package txmgr

import (
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -166,24 +167,60 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FetchT

// PruneUnstartedTxQueue removes the transactions with the given IDs from the unstarted transaction queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PruneUnstartedTxQueue(ids []int64) {
as.Lock()
defer as.Unlock()

txs := as.unstartedTxs.PruneByTxIDs(ids)
as.deleteTxs(txs...)
}

// DeleteTxs removes the transactions with the given IDs from the address state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) DeleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
as.Lock()
defer as.Unlock()

as.deleteTxs(txs...)
}

// PeekNextUnstartedTx returns the next unstarted transaction in the queue without removing it from the unstarted queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
// If there are no unstarted transactions, nil is returned.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekNextUnstartedTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.RLock()
defer as.RUnlock()

return as.unstartedTxs.PeekNextTx()
}

// PeekInProgressTx returns the in-progress transaction without removing it from the in-progress state.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() (*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) {
return nil, nil
// If there is no in-progress transaction, nil is returned.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) PeekInProgressTx() *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
as.RLock()
defer as.RUnlock()

return as.inprogressTx
}

// AddTxToUnstarted adds the given transaction to the unstarted queue.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstarted(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
// AddTxToUnstartedQueue adds the given transaction to the unstarted queue.
// If the queue is full, an error is returned.
// If the transaction was successfully added, nil is returned.
// If the transaction's idempotency key already exists, an error is returned.
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AddTxToUnstartedQueue(tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
as.Lock()
defer as.Unlock()

if tx.IdempotencyKey != nil && as.idempotencyKeyToTx[*tx.IdempotencyKey] != nil {
return fmt.Errorf("add_tx_to_unstarted_queue: address %s idempotency key %s already exists", as.fromAddress, *tx.IdempotencyKey)
}
if as.unstartedTxs.Len() >= as.unstartedTxs.Cap() {
return fmt.Errorf("add_tx_to_unstarted_queue: address %s unstarted queue capacity has been reached", as.fromAddress)
}

as.unstartedTxs.AddTx(tx)
as.allTxs[tx.ID] = tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}

return nil
}

Expand Down Expand Up @@ -252,3 +289,21 @@ func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveIn
func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MoveConfirmedToUnconfirmed(attempt txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
return nil
}

func (as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) deleteTxs(txs ...txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) {
for _, tx := range txs {
if tx.IdempotencyKey != nil {
delete(as.idempotencyKeyToTx, *tx.IdempotencyKey)
}
txID := tx.ID
if as.inprogressTx != nil && as.inprogressTx.ID == txID {
as.inprogressTx = nil
}
delete(as.allTxs, txID)
delete(as.unconfirmedTxs, txID)
delete(as.confirmedMissingReceiptTxs, txID)
delete(as.confirmedTxs, txID)
delete(as.fatalErroredTxs, txID)
as.unstartedTxs.RemoveTxByID(txID)
}
}
Loading