diff --git a/Cargo.lock b/Cargo.lock
index fceb2c06ac7b..f8fe6fca3c66 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6978,6 +6978,7 @@ dependencies = [
  "reth-tasks",
  "reth-tracing",
  "reth-trie",
+ "reth-trie-parallel",
  "thiserror",
  "tokio",
  "tracing",
diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs
index b54e8730433c..e720b35df648 100644
--- a/crates/blockchain-tree/src/chain.rs
+++ b/crates/blockchain-tree/src/chain.rs
@@ -225,10 +225,16 @@ impl AppendableChain {
                     provider.block_execution_data_provider.execution_outcome().clone();
                 execution_outcome.extend(initial_execution_outcome.clone());
                 let hashed_state = execution_outcome.hash_state_slow();
-                ParallelStateRoot::new(consistent_view, hashed_state)
-                    .incremental_root_with_updates()
-                    .map(|(root, updates)| (root, Some(updates)))
-                    .map_err(ProviderError::from)?
+                let prefix_sets = hashed_state.construct_prefix_sets().freeze();
+                ParallelStateRoot::new(
+                    consistent_view,
+                    Default::default(),
+                    hashed_state,
+                    prefix_sets,
+                )
+                .incremental_root_with_updates()
+                .map(|(root, updates)| (root, Some(updates)))
+                .map_err(ProviderError::from)?
             } else {
                 let hashed_state =
                     HashedPostState::from_bundle_state(&initial_execution_outcome.state().state);
diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml
index 2d8aef52c23d..2c8d1922a039 100644
--- a/crates/engine/tree/Cargo.toml
+++ b/crates/engine/tree/Cargo.toml
@@ -31,8 +31,9 @@ reth-revm.workspace = true
 reth-rpc-types.workspace = true
 reth-stages-api.workspace = true
 reth-tasks.workspace = true
-reth-trie.workspace = true
 reth-node-types.workspace = true
+reth-trie.workspace = true
+reth-trie-parallel.workspace = true
 
 # common
 futures.workspace = true
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index 9c8377596ba6..11b2e517003a 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -27,8 +27,9 @@ use reth_primitives::{
     SealedHeader, B256, U256,
 };
 use reth_provider::{
-    BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
-    StateReader, StateRootProvider, TransactionVariant,
+    providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
+    ProviderError, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
+    TransactionVariant,
 };
 use reth_revm::database::StateProviderDatabase;
 use reth_rpc_types::{
@@ -39,7 +40,8 @@ use reth_rpc_types::{
     ExecutionPayload,
 };
 use reth_stages_api::ControlFlow;
-use reth_trie::{updates::TrieUpdates, HashedPostState};
+use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
+use reth_trie_parallel::parallel_root::ParallelStateRoot;
 use std::{
     cmp::Ordering,
     collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
@@ -518,7 +520,8 @@ impl std::fmt::Debug for EngineApiTr
 
 impl<P, E, T> EngineApiTreeHandler<P, E, T>
 where
-    P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
+    P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
+    <P as DatabaseProviderFactory>::Provider: BlockReader,
     E: BlockExecutorProvider,
     T: EngineTypes,
 {
@@ -2167,8 +2170,34 @@ where
         let hashed_state = HashedPostState::from_bundle_state(&output.state.state);
 
         let root_time = Instant::now();
-        let (state_root, trie_output) =
-            state_provider.state_root_with_updates(hashed_state.clone())?;
+        let mut state_root_result = None;
+
+        // We attempt to compute the state root in parallel if we are currently not persisting
+        // anything to the database. This is safe because the database state cannot change while
+        // the parallel computation is running: each worker thread opens its own database
+        // transaction, so anything persisted concurrently could leave those transactions with
+        // different views of the database.
+        let persistence_in_progress = self.persistence_state.in_progress();
+        if !persistence_in_progress {
+            state_root_result = match self
+                .compute_state_root_in_parallel(block.parent_hash, &hashed_state)
+            {
+                Ok((state_root, trie_output)) => Some((state_root, trie_output)),
+                Err(ProviderError::ConsistentView(error)) => {
+                    debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
+                    None
+                }
+                Err(error) => return Err(error.into()),
+            };
+        }
+
+        let (state_root, trie_output) = if let Some(result) = state_root_result {
+            result
+        } else {
+            debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel");
+            state_provider.state_root_with_updates(hashed_state.clone())?
+        };
+
         if state_root != block.state_root {
             // call post-block hook
             self.invalid_block_hook.on_invalid_block(
@@ -2220,6 +2249,45 @@ where
         Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
     }
 
+    /// Compute the state root for the given hashed post state in parallel.
+    ///
+    /// # Returns
+    ///
+    /// Returns `Ok(_)` if computed successfully.
+    /// Returns `Err(_)` if an error was encountered during computation.
+    /// `Err(ProviderError::ConsistentView(_))` can be safely ignored and the fallback computation
+    /// should be used instead.
+    fn compute_state_root_in_parallel(
+        &self,
+        parent_hash: B256,
+        hashed_state: &HashedPostState,
+    ) -> ProviderResult<(B256, TrieUpdates)> {
+        let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
+        let mut trie_nodes = TrieUpdates::default();
+        let mut state = HashedPostState::default();
+        let mut prefix_sets = TriePrefixSetsMut::default();
+
+        if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
+            // Retrieve revert state for the historical block.
+            let revert_state = consistent_view.revert_state(historical)?;
+            prefix_sets.extend(revert_state.construct_prefix_sets());
+            state.extend(revert_state);
+
+            // Extend with the contents of the parent in-memory blocks.
+            for block in blocks.iter().rev() {
+                trie_nodes.extend_ref(block.trie.as_ref());
+                state.extend_ref(block.hashed_state.as_ref());
+            }
+        }
+
+        // Extend with the block we are validating the root for.
+        prefix_sets.extend(hashed_state.construct_prefix_sets());
+        state.extend_ref(hashed_state);
+
+        Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze())
+            .incremental_root_with_updates()?)
+    }
+
     /// Handles an error that occurred while inserting a block.
     ///
     /// If this is a validation error this will mark the block as invalid.
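
For call sites outside this patch, the constructor change is a mechanical migration. The snippet below is a minimal sketch (not part of the patch) of how the old two-argument `ParallelStateRoot::new(view, hashed_state)` maps onto the new four-argument form, assuming a `consistent_view` and a `hashed_state` are already in scope; passing empty `TrieUpdates` and prefix sets derived from the hashed state reproduces the previous behavior, exactly as `blockchain-tree/src/chain.rs` does above.

```rust
// Sketch only: `consistent_view: ConsistentDbView<_>` and `hashed_state: HashedPostState`
// are assumed to be in scope, as in the call sites touched by this patch.
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;

let prefix_sets = hashed_state.construct_prefix_sets().freeze();
let (state_root, trie_updates) = ParallelStateRoot::new(
    consistent_view,
    TrieUpdates::default(), // no in-memory trie node overlay
    hashed_state,
    prefix_sets,
)
.incremental_root_with_updates()?;
```
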
diff --git a/crates/storage/provider/src/providers/consistent_view.rs b/crates/storage/provider/src/providers/consistent_view.rs
index c5d98a238448..b86eef226850 100644
--- a/crates/storage/provider/src/providers/consistent_view.rs
+++ b/crates/storage/provider/src/providers/consistent_view.rs
@@ -1,7 +1,10 @@
 use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
+use reth_errors::ProviderError;
 use reth_primitives::{GotExpected, B256};
-use reth_storage_api::BlockReader;
+use reth_storage_api::{BlockReader, DBProvider};
 use reth_storage_errors::provider::ProviderResult;
+use reth_trie::HashedPostState;
+use reth_trie_db::DatabaseHashedPostState;
 
 pub use reth_storage_errors::provider::ConsistentViewError;
 
@@ -43,6 +46,21 @@ where
         Ok(Self::new(provider, tip))
     }
 
+    /// Retrieve revert hashed state down to the given block hash.
+    pub fn revert_state(&self, block_hash: B256) -> ProviderResult<HashedPostState> {
+        let provider = self.provider_ro()?;
+        let block_number = provider
+            .block_number(block_hash)?
+            .ok_or(ProviderError::BlockHashNotFound(block_hash))?;
+        if block_number == provider.best_block_number()? &&
+            block_number == provider.last_block_number()?
+        {
+            Ok(HashedPostState::default())
+        } else {
+            Ok(HashedPostState::from_reverts(provider.tx_ref(), block_number + 1)?)
+        }
+    }
+
     /// Creates new read-only provider and performs consistency checks on the current tip.
     pub fn provider_ro(&self) -> ProviderResult<Factory::Provider> {
         // Create a new provider.
diff --git a/crates/storage/provider/src/test_utils/mock.rs b/crates/storage/provider/src/test_utils/mock.rs
index 9dd43e8fa3a7..c539c0d2c72e 100644
--- a/crates/storage/provider/src/test_utils/mock.rs
+++ b/crates/storage/provider/src/test_utils/mock.rs
@@ -1,26 +1,29 @@
 use crate::{
     traits::{BlockSource, ReceiptProvider},
     AccountReader, BlockExecutionReader, BlockHashReader, BlockIdReader, BlockNumReader,
-    BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
-    HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider, StateProviderBox,
-    StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, TransactionsProvider,
-    WithdrawalsProvider,
+    BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProvider,
+    EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider,
+    StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
+    TransactionsProvider, WithdrawalsProvider,
 };
 use parking_lot::Mutex;
 use reth_chainspec::{ChainInfo, ChainSpec};
+use reth_db::mock::{DatabaseMock, TxMock};
 use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
 use reth_evm::ConfigureEvmEnv;
 use reth_execution_types::{Chain, ExecutionOutcome};
 use reth_primitives::{
     keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber,
-    BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, Header, Receipt, SealedBlock,
+    BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, GotExpected, Header, Receipt, SealedBlock,
     SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
     TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256,
     U256,
 };
 use reth_stages_types::{StageCheckpoint, StageId};
-use reth_storage_api::{StageCheckpointReader, StateProofProvider, StorageRootProvider};
-use reth_storage_errors::provider::{ProviderError, ProviderResult};
+use reth_storage_api::{
+    DatabaseProviderFactory, StageCheckpointReader, StateProofProvider, StorageRootProvider,
+};
+use reth_storage_errors::provider::{ConsistentViewError, ProviderError, ProviderResult};
 use reth_trie::{
     prefix_set::TriePrefixSetsMut, updates::TrieUpdates, AccountProof, HashedPostState,
     HashedStorage,
@@ -141,6 +144,15 @@ impl MockEthProvider {
     }
 }
 
+impl DatabaseProviderFactory for MockEthProvider {
+    type DB = DatabaseMock;
+    type Provider = DatabaseProvider<TxMock>;
+
+    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
+        Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into())
+    }
+}
+
 impl HeaderProvider for MockEthProvider {
     fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
         let lock = self.headers.lock();
diff --git a/crates/trie/parallel/benches/root.rs b/crates/trie/parallel/benches/root.rs
index 2408874011bd..f5dc2566fdc8 100644
--- a/crates/trie/parallel/benches/root.rs
+++ b/crates/trie/parallel/benches/root.rs
@@ -62,7 +62,14 @@ pub fn calculate_state_root(c: &mut Criterion) {
     // parallel root
     group.bench_function(BenchmarkId::new("parallel root", size), |b| {
         b.to_async(&runtime).iter_with_setup(
-            || ParallelStateRoot::new(view.clone(), updated_state.clone()),
+            || {
+                ParallelStateRoot::new(
+                    view.clone(),
+                    Default::default(),
+                    updated_state.clone(),
+                    updated_state.construct_prefix_sets().freeze(),
+                )
+            },
             |calculator| async { calculator.incremental_root() },
         );
     });
diff --git a/crates/trie/parallel/src/parallel_root.rs b/crates/trie/parallel/src/parallel_root.rs
index b41d9319cbdc..b38ea2f7f6a9 100644
--- a/crates/trie/parallel/src/parallel_root.rs
+++ b/crates/trie/parallel/src/parallel_root.rs
@@ -11,7 +11,8 @@ use reth_provider::{
 use reth_trie::{
     hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
     node_iter::{TrieElement, TrieNodeIter},
-    trie_cursor::TrieCursorFactory,
+    prefix_set::TriePrefixSets,
+    trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
     updates::TrieUpdates,
     walker::TrieWalker,
     HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
@@ -37,8 +38,12 @@ use tracing::*;
 pub struct ParallelStateRoot<Factory> {
     /// Consistent view of the database.
     view: ConsistentDbView<Factory>,
+    /// Cached trie nodes.
+    trie_nodes: TrieUpdates,
     /// Changed hashed state.
     hashed_state: HashedPostState,
+    /// A set of prefix sets that have changed.
+    prefix_sets: TriePrefixSets,
     /// Parallel state root metrics.
     #[cfg(feature = "metrics")]
     metrics: ParallelStateRootMetrics,
@@ -46,10 +51,17 @@ impl ParallelStateRoot {
     /// Create new parallel state root calculator.
-    pub fn new(view: ConsistentDbView<Factory>, hashed_state: HashedPostState) -> Self {
+    pub fn new(
+        view: ConsistentDbView<Factory>,
+        trie_nodes: TrieUpdates,
+        hashed_state: HashedPostState,
+        prefix_sets: TriePrefixSets,
+    ) -> Self {
         Self {
             view,
+            trie_nodes,
             hashed_state,
+            prefix_sets,
             #[cfg(feature = "metrics")]
             metrics: ParallelStateRootMetrics::default(),
         }
     }
 
@@ -77,12 +89,15 @@ where
         retain_updates: bool,
     ) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
         let mut tracker = ParallelTrieTracker::default();
-        let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
+        let trie_nodes_sorted = self.trie_nodes.into_sorted();
+        let hashed_state_sorted = self.hashed_state.into_sorted();
         let storage_root_targets = StorageRootTargets::new(
-            self.hashed_state.accounts.keys().copied(),
-            prefix_sets.storage_prefix_sets,
+            self.prefix_sets
+                .account_prefix_set
+                .iter()
+                .map(|nibbles| B256::from_slice(&nibbles.pack())),
+            self.prefix_sets.storage_prefix_sets,
         );
-        let hashed_state_sorted = self.hashed_state.into_sorted();
 
         // Pre-calculate storage roots in parallel for accounts which were changed.
         tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
@@ -91,7 +106,10 @@
             .into_par_iter()
             .map(|(hashed_address, prefix_set)| {
                 let provider_ro = self.view.provider_ro()?;
-                let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());
+                let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+                    DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+                    &trie_nodes_sorted,
+                );
                 let hashed_cursor_factory = HashedPostStateCursorFactory::new(
                     DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
                     &hashed_state_sorted,
@@ -113,15 +131,18 @@
         let mut trie_updates = TrieUpdates::default();
 
         let provider_ro = self.view.provider_ro()?;
+        let trie_cursor_factory = InMemoryTrieCursorFactory::new(
+            DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
+            &trie_nodes_sorted,
+        );
         let hashed_cursor_factory = HashedPostStateCursorFactory::new(
             DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
             &hashed_state_sorted,
         );
-        let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());
 
         let walker = TrieWalker::new(
             trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
-            prefix_sets.account_prefix_set,
+            self.prefix_sets.account_prefix_set,
         )
         .with_deletions_retained(retain_updates);
         let mut account_node_iter = TrieNodeIter::new(
@@ -171,7 +192,7 @@
         trie_updates.finalize(
             account_node_iter.walker,
             hash_builder,
-            prefix_sets.destroyed_accounts,
+            self.prefix_sets.destroyed_accounts,
         );
 
         let stats = tracker.finish();
@@ -270,9 +291,14 @@
         }
 
         assert_eq!(
-            ParallelStateRoot::new(consistent_view.clone(), HashedPostState::default())
-                .incremental_root()
-                .unwrap(),
+            ParallelStateRoot::new(
+                consistent_view.clone(),
+                Default::default(),
+                HashedPostState::default(),
+                Default::default()
+            )
+            .incremental_root()
+            .unwrap(),
             test_utils::state_root(state.clone())
         );
 
@@ -301,8 +327,11 @@
             }
         }
+        let prefix_sets = hashed_state.construct_prefix_sets().freeze();
         assert_eq!(
-            ParallelStateRoot::new(consistent_view, hashed_state).incremental_root().unwrap(),
+            ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets)
+                .incremental_root()
+                .unwrap(),
             test_utils::state_root(state)
         );
     }
 }
diff --git a/crates/trie/trie/src/trie.rs b/crates/trie/trie/src/trie.rs
index 4047028658ea..b8aa133d6fa3 100644
--- a/crates/trie/trie/src/trie.rs
+++ b/crates/trie/trie/src/trie.rs
@@ -25,7 +25,7 @@ pub struct StateRoot {
     pub trie_cursor_factory: T,
     /// The factory for hashed cursors.
     pub hashed_cursor_factory: H,
-    /// A set of prefix sets that have changes.
+    /// A set of prefix sets that have changed.
     pub prefix_sets: TriePrefixSets,
     /// Previous intermediate state.
     previous_state: Option<IntermediateStateRootState>,
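
Because `ParallelStateRoot` no longer derives prefix sets from its own hashed state, callers must now supply prefix sets that cover every changed account and storage slot across all overlays they pass in. Below is a minimal sketch of combining the prefix sets of several overlays, mirroring what `compute_state_root_in_parallel` does above with the revert state, the in-memory parent blocks, and the block being validated; the helper name and the two-overlay signature are illustrative only, not part of the patch.

```rust
// Sketch only: the function name and its two-overlay shape are assumptions for
// illustration; the individual calls all appear in the patch above.
use reth_trie::{
    prefix_set::{TriePrefixSets, TriePrefixSetsMut},
    HashedPostState,
};

fn combined_prefix_sets(
    revert_state: &HashedPostState,
    block_state: &HashedPostState,
) -> TriePrefixSets {
    let mut prefix_sets = TriePrefixSetsMut::default();
    // Every overlay that touches accounts or storage must contribute its prefix sets,
    // because storage root precomputation is now driven by `account_prefix_set`.
    prefix_sets.extend(revert_state.construct_prefix_sets());
    prefix_sets.extend(block_state.construct_prefix_sets());
    prefix_sets.freeze()
}
```
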