Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fatxpool: size limits implemented #6262

Merged
merged 14 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions prdoc/pr_6262.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
title: "Size limits implemented for fork aware transaction pool"

doc:
- audience: Node Dev
description: |
Size limits are now obeyed in fork aware transaction pool

crates:
- name: sc-transaction-pool
bump: minor
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,7 @@ where
AddView(BlockHash<C>, ViewStream<C>),
/// Removes an existing view's stream associated with a specific block hash.
RemoveView(BlockHash<C>),
/// Adds initial views for given extrinsics hashes.
///
/// This message should be sent when the external submission of a transaction occures. It
/// provides the list of initial views for given extrinsics hashes.
/// The dropped notification is not sent if it comes from the initial views. It allows to keep
/// transaction in the mempool, even if all the views are full at the time of submitting
/// transaction to the pool.
AddInitialViews(Vec<ExtrinsicHash<C>>, BlockHash<C>),
/// Removes all initial views for given extrinsic hashes.
/// Removes internal states for given extrinsic hashes.
///
/// Intended to ba called on finalization.
RemoveFinalizedTxs(Vec<ExtrinsicHash<C>>),
Expand All @@ -90,7 +82,6 @@ where
match self {
Command::AddView(..) => write!(f, "AddView"),
Command::RemoveView(..) => write!(f, "RemoveView"),
Command::AddInitialViews(..) => write!(f, "AddInitialViews"),
Command::RemoveFinalizedTxs(..) => write!(f, "RemoveFinalizedTxs"),
}
}
Expand Down Expand Up @@ -118,13 +109,6 @@ where
///
/// Once transaction is dropped, dropping view is removed from the set.
transaction_states: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,

/// The list of initial view for every extrinsic.
///
/// Dropped notifications from initial views will be silenced. This allows to accept the
/// transaction into the mempool, even if all the views are full at the time of submitting new
/// transaction.
initial_views: HashMap<ExtrinsicHash<C>, HashSet<BlockHash<C>>>,
}

impl<C> MultiViewDropWatcherContext<C>
Expand Down Expand Up @@ -164,15 +148,7 @@ where
.iter()
.all(|h| !self.stream_map.contains_key(h))
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
{
return self
.initial_views
.get(&tx_hash)
.map(|list| !list.contains(&block_hash))
.unwrap_or(true)
.then(|| {
debug!("[{:?}] dropped_watcher: removing tx", tx_hash);
tx_hash
})
return Some(tx_hash)
}
} else {
debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash);
Expand Down Expand Up @@ -201,7 +177,6 @@ where
stream_map: StreamMap::new(),
command_receiver,
transaction_states: Default::default(),
initial_views: Default::default(),
};

let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
Expand All @@ -217,17 +192,13 @@ where
Command::RemoveView(key) => {
trace!(target: LOG_TARGET,"dropped_watcher: Command::RemoveView {key:?} views:{:?}",ctx.stream_map.keys().collect::<Vec<_>>());
ctx.stream_map.remove(&key);
},
Command::AddInitialViews(xts,block_hash) => {
log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: xt initial view added {block_hash:?}");
xts.into_iter().for_each(|xt| {
ctx.initial_views.entry(xt).or_default().insert(block_hash);
ctx.transaction_states.iter_mut().for_each(|(_,state)| {
state.remove(&key);
});
},
Command::RemoveFinalizedTxs(xts) => {
log_xt_trace!(target: LOG_TARGET, xts.clone(), "[{:?}] dropped_watcher: finalized xt removed");
xts.iter().for_each(|xt| {
ctx.initial_views.remove(xt);
ctx.transaction_states.remove(xt);
});

Expand Down Expand Up @@ -291,34 +262,13 @@ where
});
}

/// Adds the initial view for the given transactions hashes.
///
/// This message should be called when the external submission of a transaction occures. It
/// provides the list of initial views for given extrinsics hashes.
///
/// The dropped notification is not sent if it comes from the initial views. It allows to keep
/// transaction in the mempool, even if all the views are full at the time of submitting
/// transaction to the pool.
pub fn add_initial_views(
&self,
xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone,
block_hash: BlockHash<C>,
) {
let _ = self
.controller
.unbounded_send(Command::AddInitialViews(xts.into_iter().collect(), block_hash))
.map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: add_initial_views_ send message failed: {e}");
});
}

/// Removes all initial views for finalized transactions.
/// Removes status info for finalized transactions.
pub fn remove_finalized_txs(&self, xts: impl IntoIterator<Item = ExtrinsicHash<C>> + Clone) {
let _ = self
.controller
.unbounded_send(Command::RemoveFinalizedTxs(xts.into_iter().collect()))
.map_err(|e| {
trace!(target: LOG_TARGET, "dropped_watcher: remove_initial_views send message failed: {e}");
trace!(target: LOG_TARGET, "dropped_watcher: remove_finalized_txs send message failed: {e}");
});
}
}
Expand Down Expand Up @@ -471,63 +421,4 @@ mod dropped_watcher_tests {
let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![tx_hash]);
}

#[tokio::test]
async fn test06() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
assert!(output_stream.next().now_or_never().is_none());

let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);

let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
assert!(output_stream.next().now_or_never().is_none());

let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();

watcher.add_view(block_hash1, view_stream1);
watcher.add_initial_views(vec![tx_hash], block_hash1);
assert!(output_stream.next().now_or_never().is_none());
}

#[tokio::test]
async fn test07() {
sp_tracing::try_init_simple();
let (watcher, mut output_stream) = MultiViewDroppedWatcher::new();
assert!(output_stream.next().now_or_never().is_none());

let block_hash0 = H256::repeat_byte(0x01);
let block_hash1 = H256::repeat_byte(0x02);
let tx_hash = H256::repeat_byte(0x0b);

let view_stream0 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Future),
(tx_hash, TransactionStatus::InBlock((block_hash1, 0))),
])
.boxed();
watcher.add_view(block_hash0, view_stream0);
watcher.add_initial_views(vec![tx_hash], block_hash0);
assert!(output_stream.next().now_or_never().is_none());

let view_stream1 = futures::stream::iter(vec![
(tx_hash, TransactionStatus::Ready),
(tx_hash, TransactionStatus::Dropped),
])
.boxed();
watcher.add_view(block_hash1, view_stream1);

let handle = tokio::spawn(async move { output_stream.take(1).collect::<Vec<_>>().await });
assert_eq!(handle.await.unwrap(), vec![tx_hash]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use futures::{
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
error::{Error, IntoPoolError},
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolFuture, PoolStatus,
TransactionFor, TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
};
Expand Down Expand Up @@ -193,6 +192,7 @@ where
listener.clone(),
Default::default(),
mempool_max_transactions_count,
ready_limits.total_bytes + future_limits.total_bytes,
));

let (dropped_stream_controller, dropped_stream) =
Expand Down Expand Up @@ -283,6 +283,7 @@ where
listener.clone(),
metrics.clone(),
TXMEMPOOL_TRANSACTION_LIMIT_MULTIPLIER * (options.ready.count + options.future.count),
options.ready.total_bytes + options.future.total_bytes,
));

let (dropped_stream_controller, dropped_stream) =
Expand Down Expand Up @@ -599,48 +600,37 @@ where
log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count());
log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at");
let xts = xts.into_iter().map(Arc::from).collect::<Vec<_>>();
let mempool_result = self.mempool.extend_unwatched(source, &xts);
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return future::ready(Ok(mempool_result)).boxed()
return future::ready(Ok(mempool_results)).boxed()
}

let (hashes, to_be_submitted): (Vec<TxHash<Self>>, Vec<ExtrinsicFor<ChainApi>>) =
mempool_result
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|xt_hash| (xt_hash, xt)))
.unzip();
let to_be_submitted = mempool_results
.iter()
.zip(xts)
.filter_map(|(result, xt)| result.as_ref().ok().map(|_| xt))
.collect::<Vec<_>>();

self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

let mempool = self.mempool.clone();
async move {
let results_map = view_store.submit(source, to_be_submitted.into_iter(), hashes).await;
let results_map = view_store.submit(source, to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();

Ok(mempool_result
Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|xt_hash| {
let result = submission_results
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.");
result.or_else(|error| {
let error = error.into_pool_error();
match error {
Ok(
// The transaction is still in mempool it may get included into the view for the next block.
Error::ImmediatelyDropped
) => Ok(xt_hash),
Ok(e) => {
mempool.remove(xt_hash);
Err(e.into())
},
Err(e) => Err(e),
}
})
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.map_err(|err| {
mempool.remove(xt_hash);
err
})
})
})
.collect::<Vec<_>>())
Expand Down Expand Up @@ -692,26 +682,10 @@ where
let view_store = self.view_store.clone();
let mempool = self.mempool.clone();
async move {
let result = view_store.submit_and_watch(at, source, xt).await;
let result = result.or_else(|(e, maybe_watcher)| {
let error = e.into_pool_error();
match (error, maybe_watcher) {
(
Ok(
// The transaction is still in mempool it may get included into the
// view for the next block.
Error::ImmediatelyDropped,
),
Some(watcher),
) => Ok(watcher),
(Ok(e), _) => {
mempool.remove(xt_hash);
Err(e.into())
},
(Err(e), _) => Err(e),
}
});
result
view_store.submit_and_watch(at, source, xt).await.map_err(|err| {
mempool.remove(xt_hash);
err
})
}
.boxed()
}
Expand Down Expand Up @@ -1056,7 +1030,7 @@ where
future::join_all(results).await
}

/// Updates the given view with the transaction from the internal mempol.
/// Updates the given view with the transactions from the internal mempol.
///
/// All transactions from the mempool (excluding those which are either already imported or
/// already included in blocks since recently finalized block) are submitted to the
Expand Down Expand Up @@ -1139,12 +1113,9 @@ where
// out the invalid event, and remove transaction.
if self.view_store.is_empty() {
for result in watched_results {
match result {
Err(tx_hash) => {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
},
Ok(_) => {},
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
}
}
}
Expand Down
Loading
Loading