Skip to content

Commit

Permalink
feat(trie): async root intermediate nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Sep 16, 2024
1 parent 1c20e0e commit ed13bfc
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 19 deletions.
10 changes: 9 additions & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,15 @@ pub fn calculate_state_root(c: &mut Criterion) {
// async root
group.bench_function(BenchmarkId::new("async root", size), |b| {
b.to_async(&runtime).iter_with_setup(
|| AsyncStateRoot::new(view.clone(), blocking_pool.clone(), updated_state.clone()),
|| {
AsyncStateRoot::new(
view.clone(),
blocking_pool.clone(),
Default::default(),
updated_state.clone(),
updated_state.construct_prefix_sets(),
)
},
|calculator| calculator.incremental_root(),
);
});
Expand Down
63 changes: 45 additions & 18 deletions crates/trie/parallel/src/async_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use reth_tasks::pool::BlockingTaskPool;
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::TrieCursorFactory,
prefix_set::TriePrefixSets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
walker::TrieWalker,
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
Expand Down Expand Up @@ -41,8 +42,12 @@ pub struct AsyncStateRoot<Factory> {
view: ConsistentDbView<Factory>,
/// Blocking task pool.
blocking_pool: BlockingTaskPool,
/// Cached trie nodes.
trie_nodes: TrieUpdates,
/// Changed hashed state.
hashed_state: HashedPostState,
/// A set of prefix sets that have changed.
prefix_sets: TriePrefixSets,
/// Parallel state root metrics.
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics,
Expand All @@ -53,12 +58,16 @@ impl<Factory> AsyncStateRoot<Factory> {
pub fn new(
view: ConsistentDbView<Factory>,
blocking_pool: BlockingTaskPool,
trie_nodes: TrieUpdates,
hashed_state: HashedPostState,
prefix_sets: TriePrefixSets,
) -> Self {
Self {
view,
blocking_pool,
trie_nodes,
hashed_state,
prefix_sets,
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics::default(),
}
Expand Down Expand Up @@ -86,12 +95,15 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), AsyncStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
let trie_nodes_sorted = Arc::new(self.trie_nodes.into_sorted());
let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());
let storage_root_targets = StorageRootTargets::new(
self.hashed_state.accounts.keys().copied(),
prefix_sets.storage_prefix_sets,
self.prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
self.prefix_sets.storage_prefix_sets,
);
let hashed_state_sorted = Arc::new(self.hashed_state.into_sorted());

// Pre-calculate storage roots async for accounts which were changed.
tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
Expand All @@ -102,14 +114,18 @@ where
{
let view = self.view.clone();
let hashed_state_sorted = hashed_state_sorted.clone();
let trie_nodes_sorted = trie_nodes_sorted.clone();
#[cfg(feature = "metrics")]
let metrics = self.metrics.storage_trie.clone();
let handle =
self.blocking_pool.spawn_fifo(move || -> Result<_, AsyncStateRootError> {
let provider = view.provider_ro()?;
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider.tx_ref());
let provider_ro = view.provider_ro()?;
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_state = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider.tx_ref()),
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);
Ok(StorageRoot::new_hashed(
Expand All @@ -129,16 +145,18 @@ where
let mut trie_updates = TrieUpdates::default();

let provider_ro = self.view.provider_ro()?;
let tx = provider_ro.tx_ref();
let trie_cursor_factory = DatabaseTrieCursorFactory::new(tx);
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(tx),
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);

let walker = TrieWalker::new(
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
prefix_sets.account_prefix_set,
self.prefix_sets.account_prefix_set,
)
.with_deletions_retained(retain_updates);
let mut account_node_iter = TrieNodeIter::new(
Expand Down Expand Up @@ -190,7 +208,7 @@ where
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
self.prefix_sets.destroyed_accounts,
);

let stats = tracker.finish();
Expand Down Expand Up @@ -290,7 +308,9 @@ mod tests {
AsyncStateRoot::new(
consistent_view.clone(),
blocking_pool.clone(),
HashedPostState::default()
Default::default(),
HashedPostState::default(),
Default::default(),
)
.incremental_root()
.await
Expand Down Expand Up @@ -323,11 +343,18 @@ mod tests {
}
}

let prefix_sets = hashed_state.construct_prefix_sets();
assert_eq!(
AsyncStateRoot::new(consistent_view.clone(), blocking_pool.clone(), hashed_state)
.incremental_root()
.await
.unwrap(),
AsyncStateRoot::new(
consistent_view.clone(),
blocking_pool.clone(),
Default::default(),
hashed_state,
prefix_sets
)
.incremental_root()
.await
.unwrap(),
test_utils::state_root(state)
);
}
Expand Down

0 comments on commit ed13bfc

Please sign in to comment.