From e96baf2658d9f563e53323d630572c41183936fd Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 2 Sep 2024 23:07:33 +0200 Subject: [PATCH] perf(engine): parallel storage roots --- Cargo.lock | 1 + crates/engine/tree/Cargo.toml | 1 + crates/engine/tree/src/tree/mod.rs | 61 ++++++++++++++++--- .../provider/src/providers/consistent_view.rs | 12 ++++ 4 files changed, 67 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c7e48f85c6f1..e0d19e11490f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6894,6 +6894,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "reth-trie", + "reth-trie-parallel", "thiserror", "tokio", "tracing", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index c978acc107de..6bb6d26dc26b 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -34,6 +34,7 @@ reth-rpc-types.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true reth-trie.workspace = true +reth-trie-parallel.workspace = true # common futures.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index fd6a8051bddb..63f0bfc76032 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -16,6 +16,7 @@ use reth_chain_state::{ CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain, }; use reth_consensus::{Consensus, PostExecutionInput}; +use reth_db::Database; use reth_engine_primitives::EngineTypes; use reth_errors::{ConsensusError, ProviderResult}; use reth_evm::execute::{BlockExecutorProvider, Executor}; @@ -27,8 +28,8 @@ use reth_primitives::{ SealedHeader, B256, U256, }; use reth_provider::{ - BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory, - StateRootProvider, + providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome, + ProviderError, StateProviderBox, StateProviderFactory, StateRootProvider, }; use reth_revm::database::StateProviderDatabase; use reth_rpc_types::{ @@ -39,9 +40,11 @@ use reth_rpc_types::{ ExecutionPayload, }; use reth_stages_api::ControlFlow; -use reth_trie::HashedPostState; +use reth_trie::{updates::TrieUpdates, HashedPostState}; +use reth_trie_parallel::parallel_root::ParallelStateRoot; use std::{ collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque}, + marker::PhantomData, ops::Bound, sync::{ mpsc::{Receiver, RecvError, RecvTimeoutError, Sender}, @@ -472,7 +475,8 @@ pub enum TreeAction { /// This type is responsible for processing engine API requests, maintaining the canonical state and /// emitting events. #[derive(Debug)] -pub struct EngineApiTreeHandler { +pub struct EngineApiTreeHandler { + __database: PhantomData, provider: P, executor_provider: E, consensus: Arc, @@ -510,9 +514,10 @@ pub struct EngineApiTreeHandler { metrics: EngineApiMetrics, } -impl EngineApiTreeHandler +impl EngineApiTreeHandler where - P: BlockReader + StateProviderFactory + Clone + 'static, + DB: Database + 'static, + P: DatabaseProviderFactory + BlockReader + StateProviderFactory + Clone + 'static, E: BlockExecutorProvider, T: EngineTypes, { @@ -533,6 +538,7 @@ where ) -> Self { let (incoming_tx, incoming) = std::sync::mpsc::channel(); Self { + __database: PhantomData, provider, executor_provider, consensus, @@ -1869,8 +1875,47 @@ where let hashed_state = HashedPostState::from_bundle_state(&output.state.state); let root_time = Instant::now(); - let (state_root, trie_output) = - state_provider.state_root_with_updates(hashed_state.clone())?; + let mut state_root_result = None; + let persistence_in_progress = self.persistence_state.in_progress(); + if !persistence_in_progress { + // NOTE: there is a possibility of a race condition here if some data was persisted. ??? + let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + let mut state = HashedPostState::default(); + let mut trie_nodes = TrieUpdates::default(); + if let Some((historical, blocks)) = + self.state.tree_state.blocks_by_hash(block.parent_hash) + { + state.extend(consistent_view.revert_state(historical)?); + for block in blocks.iter().rev() { + state.extend_ref(block.hashed_state.as_ref()); + trie_nodes.extend_ref(block.trie.as_ref()); + } + } + + state_root_result = match ParallelStateRoot::new(consistent_view, hashed_state.clone()) + .incremental_root_with_updates() + .map_err(ProviderError::from) + { + Ok((state_root, trie_output)) => Some((state_root, trie_output)), + Err(ProviderError::ConsistentView(error)) => { + debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back"); + None + } + Err(error) => return Err(error.into()), + }; + } + + if state_root_result.is_none() { + state_root_result = Some(state_provider.state_root_with_updates(hashed_state.clone())?); + } + let (state_root, trie_output) = match state_root_result { + Some(result) => result, + None => { + debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel"); + state_provider.state_root_with_updates(hashed_state.clone())? + } + }; + if state_root != block.state_root { return Err(ConsensusError::BodyStateRootDiff( GotExpected { got: state_root, expected: block.state_root }.into(), diff --git a/crates/storage/provider/src/providers/consistent_view.rs b/crates/storage/provider/src/providers/consistent_view.rs index fe9b65941107..8031992de94e 100644 --- a/crates/storage/provider/src/providers/consistent_view.rs +++ b/crates/storage/provider/src/providers/consistent_view.rs @@ -1,7 +1,10 @@ use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, HeaderProvider}; use reth_db_api::database::Database; +use reth_errors::ProviderError; use reth_primitives::{GotExpected, B256}; use reth_storage_errors::provider::ProviderResult; +use reth_trie::HashedPostState; +use reth_trie_db::DatabaseHashedPostState; use std::marker::PhantomData; pub use reth_storage_errors::provider::ConsistentViewError; @@ -46,6 +49,15 @@ where Ok(Self::new(provider, tip)) } + /// Retrieve revert hashed state down to the given block hash. + pub fn revert_state(&self, block_hash: B256) -> ProviderResult { + let provider = self.provider_ro()?; + let block_number = provider + .block_number(block_hash)? + .ok_or(ProviderError::BlockHashNotFound(block_hash))?; + Ok(HashedPostState::from_reverts(provider.tx_ref(), block_number)?) + } + /// Creates new read-only provider and performs consistency checks on the current tip. pub fn provider_ro(&self) -> ProviderResult> { // Create a new provider.