Skip to content

Commit

Permalink
perf(engine): parallel storage roots
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk committed Sep 2, 2024
1 parent 333b1a7 commit e96baf2
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true

# common
futures.workspace = true
Expand Down
61 changes: 53 additions & 8 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use reth_chain_state::{
CanonicalInMemoryState, ExecutedBlock, MemoryOverlayStateProvider, NewCanonicalChain,
};
use reth_consensus::{Consensus, PostExecutionInput};
use reth_db::Database;
use reth_engine_primitives::EngineTypes;
use reth_errors::{ConsensusError, ProviderResult};
use reth_evm::execute::{BlockExecutorProvider, Executor};
Expand All @@ -27,8 +28,8 @@ use reth_primitives::{
SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
StateRootProvider,
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
ProviderError, StateProviderBox, StateProviderFactory, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
Expand All @@ -39,9 +40,11 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::HashedPostState;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
marker::PhantomData,
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Expand Down Expand Up @@ -472,7 +475,8 @@ pub enum TreeAction {
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
#[derive(Debug)]
pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
pub struct EngineApiTreeHandler<DB, P, E, T: EngineTypes> {
__database: PhantomData<DB>,
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
Expand Down Expand Up @@ -510,9 +514,10 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
metrics: EngineApiMetrics,
}

impl<P, E, T> EngineApiTreeHandler<P, E, T>
impl<DB, P, E, T> EngineApiTreeHandler<DB, P, E, T>
where
P: BlockReader + StateProviderFactory + Clone + 'static,
DB: Database + 'static,
P: DatabaseProviderFactory<DB> + BlockReader + StateProviderFactory + Clone + 'static,
E: BlockExecutorProvider,
T: EngineTypes,
{
Expand All @@ -533,6 +538,7 @@ where
) -> Self {
let (incoming_tx, incoming) = std::sync::mpsc::channel();
Self {
__database: PhantomData,
provider,
executor_provider,
consensus,
Expand Down Expand Up @@ -1869,8 +1875,47 @@ where
let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

let root_time = Instant::now();
let (state_root, trie_output) =
state_provider.state_root_with_updates(hashed_state.clone())?;
let mut state_root_result = None;
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
// NOTE: there is a possibility of a race condition here if some data was persisted. ???
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut state = HashedPostState::default();
let mut trie_nodes = TrieUpdates::default();
if let Some((historical, blocks)) =
self.state.tree_state.blocks_by_hash(block.parent_hash)
{
state.extend(consistent_view.revert_state(historical)?);
for block in blocks.iter().rev() {
state.extend_ref(block.hashed_state.as_ref());
trie_nodes.extend_ref(block.trie.as_ref());
}
}

state_root_result = match ParallelStateRoot::new(consistent_view, hashed_state.clone())
.incremental_root_with_updates()
.map_err(ProviderError::from)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(error.into()),
};
}

if state_root_result.is_none() {
state_root_result = Some(state_provider.state_root_with_updates(hashed_state.clone())?);
}
let (state_root, trie_output) = match state_root_result {
Some(result) => result,
None => {
debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel");
state_provider.state_root_with_updates(hashed_state.clone())?
}
};

if state_root != block.state_root {
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
Expand Down
12 changes: 12 additions & 0 deletions crates/storage/provider/src/providers/consistent_view.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{BlockNumReader, DatabaseProviderFactory, DatabaseProviderRO, HeaderProvider};
use reth_db_api::database::Database;
use reth_errors::ProviderError;
use reth_primitives::{GotExpected, B256};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
use reth_trie_db::DatabaseHashedPostState;
use std::marker::PhantomData;

pub use reth_storage_errors::provider::ConsistentViewError;
Expand Down Expand Up @@ -46,6 +49,15 @@ where
Ok(Self::new(provider, tip))
}

/// Retrieve revert hashed state down to the given block hash.
pub fn revert_state(&self, block_hash: B256) -> ProviderResult<HashedPostState> {
let provider = self.provider_ro()?;
let block_number = provider
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;
Ok(HashedPostState::from_reverts(provider.tx_ref(), block_number)?)
}

/// Creates new read-only provider and performs consistency checks on the current tip.
pub fn provider_ro(&self) -> ProviderResult<DatabaseProviderRO<DB>> {
// Create a new provider.
Expand Down

0 comments on commit e96baf2

Please sign in to comment.