Skip to content

Commit

Permalink
feat(mempool): abstract mempool state (#2027)
Browse files Browse the repository at this point in the history
  • Loading branch information
elintul authored Nov 18, 2024
1 parent a6f4987 commit 9912c0d
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 43 deletions.
149 changes: 111 additions & 38 deletions crates/starknet_mempool/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,89 @@ impl Default for MempoolConfig {

type AddressToNonce = HashMap<ContractAddress, Nonce>;

/// Represents the state tracked by the mempool.
/// It is partitioned into categories, each serving a distinct role in the lifecycle of transaction
/// management.
#[derive(Debug, Default)]
pub struct MempoolState {
/// Finalized nonces committed in blocks.
committed: AddressToNonce,
/// Provisionally incremented nonces during block creation.
staged: AddressToNonce,
/// Temporary information on accounts that haven't appeared in recent blocks,
/// nor proposed for sequencing.
tentative: AddressToNonce,
}

impl MempoolState {
fn get(&self, address: ContractAddress) -> Option<Nonce> {
self.staged
.get(&address)
.or_else(|| self.committed.get(&address))
.or_else(|| self.tentative.get(&address))
.copied()
}

fn get_or_insert(&mut self, address: ContractAddress, nonce: Nonce) -> Nonce {
self.get(address).unwrap_or_else(|| {
self.tentative.insert(address, nonce);
nonce
})
}

fn stage(&mut self, tx_reference: &TransactionReference) -> MempoolResult<()> {
let next_nonce = try_increment_nonce(tx_reference.nonce)?;
if let Some(existing_nonce) = self.staged.insert(tx_reference.address, next_nonce) {
assert_eq!(
try_increment_nonce(existing_nonce)?,
next_nonce,
"Staged nonce should be an increment of an existing nonce."
);
}

Ok(())
}

fn commit(&mut self, address_to_nonce: AddressToNonce) -> Vec<ContractAddress> {
let addresses_to_rewind: Vec<_> = self
.staged
.keys()
.filter(|&key| !address_to_nonce.contains_key(key))
.copied()
.collect();

self.tentative.retain(|address, _| !address_to_nonce.contains_key(address));
self.committed.extend(address_to_nonce);
self.staged.clear();

addresses_to_rewind
}

fn validate_incoming_tx(&self, tx_reference: TransactionReference) -> MempoolResult<()> {
let TransactionReference { address, nonce: tx_nonce, .. } = tx_reference;
if self.get(address).is_some_and(|existing_nonce| tx_nonce < existing_nonce) {
return Err(MempoolError::NonceTooOld { address, nonce: tx_nonce });
}

Ok(())
}

fn validate_commitment(&self, address: ContractAddress, next_nonce: Nonce) {
// FIXME: Remove after first POC.
// If commit_block wants to decrease the stored account nonce this can mean one of two
// things:
// 1. this is a reorg, which should be handled by a dedicated TBD mechanism and not inside
// commit_block
// 2. the stored nonce originated from add_tx, so should be treated as tentative due to
// possible races with the gateway; these types of nonces should be tagged somehow so
// that commit_block can override them. Regardless, in the first POC this cannot happen
// because the GW nonces are always 1.
if let Some(&committed_nonce) = self.committed.get(&address) {
assert!(committed_nonce <= next_nonce, "NOT SUPPORTED YET {address:?} {next_nonce:?}.")
}
}
}

#[derive(Debug, Default)]
pub struct Mempool {
config: MempoolConfig,
Expand All @@ -45,10 +128,7 @@ pub struct Mempool {
tx_pool: TransactionPool,
// Transactions eligible for sequencing.
tx_queue: TransactionQueue,
// Represents the state of the mempool during block creation.
mempool_state: AddressToNonce,
// The most recent account nonces received, for all account in the pool.
account_nonces: AddressToNonce,
state: MempoolState,
}

impl Mempool {
Expand All @@ -75,9 +155,8 @@ impl Mempool {
}

// Update the mempool state with the given transactions' nonces.
for tx_ref in &eligible_tx_references {
let next_nonce = try_increment_nonce(tx_ref.nonce)?;
self.mempool_state.insert(tx_ref.address, next_nonce);
for tx_reference in &eligible_tx_references {
self.state.stage(tx_reference)?;
}

tracing::debug!(
Expand All @@ -87,9 +166,9 @@ impl Mempool {

Ok(eligible_tx_references
.iter()
.map(|tx_ref| {
.map(|tx_reference| {
self.tx_pool
.get_by_tx_hash(tx_ref.tx_hash)
.get_by_tx_hash(tx_reference.tx_hash)
.expect("Transaction hash from queue must appear in pool.")
})
.cloned() // Soft-delete: return without deleting from mempool.
Expand All @@ -110,20 +189,17 @@ impl Mempool {
)]
pub fn add_tx(&mut self, args: AddTransactionArgs) -> MempoolResult<()> {
let AddTransactionArgs { tx, account_state } = args;
let tx_ref = TransactionReference::new(&tx);
self.validate_incoming_tx_nonce(tx_ref.address, tx_ref.nonce)?;
let tx_reference = TransactionReference::new(&tx);
self.validate_incoming_tx(tx_reference)?;

self.handle_fee_escalation(&tx)?;
self.tx_pool.insert(tx)?;

// Align to account nonce, only if it is at least the one stored.
let AccountState { address, nonce: incoming_account_nonce } = account_state;
// TODO(Elin): abstract mempool nonces.
let mempool_account_nonce = self.mempool_state.get(&address).unwrap_or_else(|| {
self.account_nonces.entry(address).or_insert(incoming_account_nonce)
});
if tx_ref.nonce == *mempool_account_nonce {
self.tx_queue.insert(tx_ref);
let mempool_account_nonce = self.state.get_or_insert(address, incoming_account_nonce);
if tx_reference.nonce == mempool_account_nonce {
self.tx_queue.insert(tx_reference);
}

Ok(())
Expand All @@ -138,7 +214,7 @@ impl Mempool {

// Align mempool data to committed nonces.
for (&address, &next_nonce) in &address_to_nonce {
self.validate_committed_nonce(address, next_nonce);
self.validate_commitment(address, next_nonce);

// Maybe remove out-of-date transactions.
if self
Expand All @@ -151,9 +227,6 @@ impl Mempool {

// Remove from pool.
self.tx_pool.remove_up_to_nonce(address, next_nonce);
// TODO(clean_account_nonces): remove address from nonce table after a block cycle /
// TTL.
self.account_nonces.insert(address, next_nonce);

// Maybe close nonce gap.
if self.tx_queue.get_nonce(address).is_none() {
Expand All @@ -164,12 +237,10 @@ impl Mempool {
}
}
}
tracing::debug!("Aligned mempool to committed nonces.");

// Rewind nonces of addresses that were not included in block.
let known_addresses_not_included_in_block =
self.mempool_state.keys().filter(|&key| !address_to_nonce.contains_key(key)).copied();
for address in known_addresses_not_included_in_block {
// Commit block and rewind nonces of addresses that were not included in block.
let addresses_to_rewind = self.state.commit(address_to_nonce);
for address in addresses_to_rewind {
// Account nonce is the minimal nonce of this address: it was proposed but not included.
let tx_reference = self
.tx_pool
Expand All @@ -180,40 +251,42 @@ impl Mempool {
self.tx_queue.insert(*tx_reference);
}

tracing::debug!("Aligned mempool to committed nonces.");

// Hard-delete: finally, remove committed transactions from the mempool.
for tx_hash in tx_hashes {
let Ok(_tx) = self.tx_pool.remove(tx_hash) else {
continue; // Transaction hash unknown to mempool, from a different node.
};

// TODO(clean_account_nonces): remove address from nonce table after a block cycle /
// TODO(clean_accounts): remove address with no transactions left after a block cycle /
// TTL.
}
tracing::debug!("Removed committed transactions known to mempool.");

// Commit: clear block creation staged state.
self.mempool_state.clear();
Ok(())
}

tracing::debug!("Successfully committed block to mempool.");
fn validate_incoming_tx(&self, tx_reference: TransactionReference) -> MempoolResult<()> {
self.state.validate_incoming_tx(tx_reference)
}

Ok(())
fn validate_commitment(&self, address: ContractAddress, next_nonce: Nonce) {
self.state.validate_commitment(address, next_nonce);
}

// TODO(Mohammad): Rename this method once consensus API is added.
pub fn update_gas_price_threshold(&mut self, threshold: GasPrice) {
self.tx_queue.update_gas_price_threshold(threshold);
}

fn validate_incoming_tx_nonce(
fn _validate_incoming_tx_nonce(
&self,
address: ContractAddress,
tx_nonce: Nonce,
) -> MempoolResult<()> {
// Check nonce against mempool state.
if self
.mempool_state
.get(&address)
.is_some_and(|&mempool_state_nonce| tx_nonce < mempool_state_nonce)
if self.state.get(address).is_some_and(|mempool_state_nonce| tx_nonce < mempool_state_nonce)
{
return Err(MempoolError::NonceTooOld { address, nonce: tx_nonce });
}
Expand All @@ -227,7 +300,7 @@ impl Mempool {
Ok(())
}

fn validate_committed_nonce(&self, address: ContractAddress, next_nonce: Nonce) {
fn _validate_committed_nonce(&self, address: ContractAddress, next_nonce: Nonce) {
// FIXME: Remove after first POC.
// If commit_block wants to decrease the stored account nonce this can mean one of two
// things:
Expand All @@ -237,7 +310,7 @@ impl Mempool {
// to possible races with the gateway; these types of nonces should be tagged somehow
// so that commit_block can override them. Regardless, in the first POC this cannot
// happen because the GW nonces are always 1.
if let Some(&stored_nonce) = self.account_nonces.get(&address) {
if let Some(stored_nonce) = self.state.get(address) {
assert!(stored_nonce <= next_nonce, "NOT SUPPORTED YET {address:?} {next_nonce:?}.")
}
}
Expand Down
7 changes: 3 additions & 4 deletions crates/starknet_mempool/src/mempool_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ impl From<MempoolContent> for Mempool {
.map(|content| content.complete_to_tx_queue())
.unwrap_or_default(),
// TODO: Add implementation when needed.
mempool_state: Default::default(),
account_nonces: Default::default(),
state: Default::default(),
}
}
}
Expand Down Expand Up @@ -168,7 +167,7 @@ fn add_txs_and_verify_no_replacement(

#[fixture]
fn mempool() -> Mempool {
Mempool::default()
MempoolContentBuilder::new().build_into_mempool()
}

// Tests.
Expand Down Expand Up @@ -376,7 +375,7 @@ fn test_add_tx_lower_than_queued_nonce(mut mempool: Mempool) {
add_tx_expect_error(
&mut mempool,
&invalid_input,
MempoolError::DuplicateNonce { address: contract_address!("0x0"), nonce: nonce!(0) },
MempoolError::NonceTooOld { address: contract_address!("0x0"), nonce: nonce!(0) },
);

let invalid_input = add_tx_input!(tx_hash: 2, address: "0x0", tx_nonce: 1, account_nonce: 0);
Expand Down
2 changes: 1 addition & 1 deletion crates/starknet_mempool/tests/flow_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ fn test_commit_block_fills_nonce_gap(mut mempool: Mempool) {
add_tx_expect_error(
&mut mempool,
&tx_nonce_4_account_nonce_4,
MempoolError::DuplicateNonce { address: contract_address!("0x0"), nonce: nonce!(4) },
MempoolError::NonceTooOld { address: contract_address!("0x0"), nonce: nonce!(4) },
);

get_txs_and_assert_expected(&mut mempool, 2, &[tx_nonce_5_account_nonce_3.tx]);
Expand Down

0 comments on commit 9912c0d

Please sign in to comment.