Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(engine): parallel storage roots #10666

Merged
merged 9 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,16 @@ impl AppendableChain {
provider.block_execution_data_provider.execution_outcome().clone();
execution_outcome.extend(initial_execution_outcome.clone());
let hashed_state = execution_outcome.hash_state_slow();
ParallelStateRoot::new(consistent_view, hashed_state)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
ParallelStateRoot::new(
consistent_view,
Default::default(),
hashed_state,
prefix_sets,
)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
let hashed_state =
HashedPostState::from_bundle_state(&initial_execution_outcome.state().state);
Expand Down
3 changes: 2 additions & 1 deletion crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true

# common
futures.workspace = true
Expand Down
80 changes: 74 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use reth_primitives::{
SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
StateReader, StateRootProvider, TransactionVariant,
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
ProviderError, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
Expand All @@ -39,7 +40,8 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -518,7 +520,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr

impl<P, E, T> EngineApiTreeHandler<P, E, T>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
E: BlockExecutorProvider,
T: EngineTypes,
{
Expand Down Expand Up @@ -2167,8 +2170,34 @@ where
let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

let root_time = Instant::now();
let (state_root, trie_output) =
state_provider.state_root_with_updates(hashed_state.clone())?;
let mut state_root_result = None;

// We attempt to compute state root in parallel if we are currently not persisting anything
// to database. This is safe, because the database state cannot change until we
// finish parallel computation. It is important that nothing is being persisted as
// we are computing in parallel, because we initialize a different database transaction
// per thread and it might end up with a different view of the database.
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
Comment on lines +2180 to +2181
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's inline this and get rid of the tmp var

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The temporary variable is kept because it is used later for logging.

rkrasiuk marked this conversation as resolved.
Show resolved Hide resolved
state_root_result = match self
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(error.into()),
};
}

let (state_root, trie_output) = if let Some(result) = state_root_result {
result
} else {
debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel");
state_provider.state_root_with_updates(hashed_state.clone())?
};

if state_root != block.state_root {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
Expand Down Expand Up @@ -2220,6 +2249,45 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
fn compute_state_root_in_parallel(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
// Pins a consistent database view at the latest tip; fails with
// `ProviderError::ConsistentView(_)` if the tip cannot be established, which the
// caller treats as a non-fatal signal to fall back to sequential computation.
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
// Accumulators for the inputs to `ParallelStateRoot`: cached trie nodes from
// in-memory ancestor blocks, the merged hashed post state, and the prefix sets
// marking which trie paths changed.
let mut trie_nodes = TrieUpdates::default();
let mut state = HashedPostState::default();
let mut prefix_sets = TriePrefixSetsMut::default();

if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
// Retrieve revert state for historical block.
let revert_state = consistent_view.revert_state(historical)?;
prefix_sets.extend(revert_state.construct_prefix_sets());
state.extend(revert_state);

// Extend with contents of parent in-memory blocks.
// Order matters: `rev()` iterates oldest-first so that newer blocks' entries
// overwrite older ones when extending.
// NOTE(review): prefix sets are not extended with the in-memory blocks' state —
// presumably their changed paths are covered via the cached `trie_nodes`; confirm.
for block in blocks.iter().rev() {
trie_nodes.extend_ref(block.trie.as_ref());
state.extend_ref(block.hashed_state.as_ref());
}
}

// Extend with block we are validating root for.
prefix_sets.extend(hashed_state.construct_prefix_sets());
state.extend_ref(hashed_state);

Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze())
.incremental_root_with_updates()?)
}

/// Handles an error that occurred while inserting a block.
///
/// If this is a validation error this will mark the block as invalid.
Expand Down
20 changes: 19 additions & 1 deletion crates/storage/provider/src/providers/consistent_view.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
use reth_errors::ProviderError;
use reth_primitives::{GotExpected, B256};
use reth_storage_api::BlockReader;
use reth_storage_api::{BlockReader, DBProvider};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
use reth_trie_db::DatabaseHashedPostState;

pub use reth_storage_errors::provider::ConsistentViewError;

Expand Down Expand Up @@ -43,6 +46,21 @@ where
Ok(Self::new(provider, tip))
}

/// Retrieve revert hashed state down to the given block hash.
///
/// Returns an empty [`HashedPostState`] when the requested block is already the
/// database tip (nothing to revert); otherwise collects the hashed state reverts
/// from the block after it up to the tip.
pub fn revert_state(&self, block_hash: B256) -> ProviderResult<HashedPostState> {
    let provider = self.provider_ro()?;
    let block_number = provider
        .block_number(block_hash)?
        .ok_or(ProviderError::BlockHashNotFound(block_hash))?;

    // The block is the current tip when it is both the best and the last block;
    // in that case the on-disk state already matches it and no reverts are needed.
    let is_tip = block_number == provider.best_block_number()? &&
        block_number == provider.last_block_number()?;
    if is_tip {
        return Ok(HashedPostState::default())
    }

    Ok(HashedPostState::from_reverts(provider.tx_ref(), block_number + 1)?)
}

/// Creates new read-only provider and performs consistency checks on the current tip.
pub fn provider_ro(&self) -> ProviderResult<Factory::Provider> {
// Create a new provider.
Expand Down
26 changes: 19 additions & 7 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use crate::{
traits::{BlockSource, ReceiptProvider},
AccountReader, BlockExecutionReader, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider, StateProviderBox,
StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, TransactionsProvider,
WithdrawalsProvider,
BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProvider,
EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider,
StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
TransactionsProvider, WithdrawalsProvider,
};
use parking_lot::Mutex;
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db::mock::{DatabaseMock, TxMock};
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber,
BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, Header, Receipt, SealedBlock,
BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, GotExpected, Header, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256,
U256,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{StageCheckpointReader, StateProofProvider, StorageRootProvider};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_storage_api::{
DatabaseProviderFactory, StageCheckpointReader, StateProofProvider, StorageRootProvider,
};
use reth_storage_errors::provider::{ConsistentViewError, ProviderError, ProviderResult};
use reth_trie::{
prefix_set::TriePrefixSetsMut, updates::TrieUpdates, AccountProof, HashedPostState,
HashedStorage,
Expand Down Expand Up @@ -141,6 +144,15 @@ impl MockEthProvider {
}
}

// Mock implementation: `MockEthProvider` is not backed by a real database, so
// constructing a read-only database provider always fails.
impl DatabaseProviderFactory for MockEthProvider {
type DB = DatabaseMock;
type Provider = DatabaseProvider<TxMock>;

// Always returns a `ConsistentViewError::Syncing` (with a dummy 0/0 block
// expectation) — presumably so callers such as `ConsistentDbView` consumers take
// their fallback/non-database paths in tests; confirm against call sites.
fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
Err(ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) }.into())
}
}

impl HeaderProvider for MockEthProvider {
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
let lock = self.headers.lock();
Expand Down
9 changes: 8 additions & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ pub fn calculate_state_root(c: &mut Criterion) {
// parallel root
group.bench_function(BenchmarkId::new("parallel root", size), |b| {
b.to_async(&runtime).iter_with_setup(
|| ParallelStateRoot::new(view.clone(), updated_state.clone()),
|| {
ParallelStateRoot::new(
view.clone(),
Default::default(),
updated_state.clone(),
updated_state.construct_prefix_sets().freeze(),
)
},
|calculator| async { calculator.incremental_root() },
);
});
Expand Down
57 changes: 43 additions & 14 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use reth_provider::{
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::TrieCursorFactory,
prefix_set::TriePrefixSets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
walker::TrieWalker,
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
Expand All @@ -37,19 +38,30 @@ use tracing::*;
pub struct ParallelStateRoot<Factory> {
/// Consistent view of the database.
view: ConsistentDbView<Factory>,
/// Cached trie nodes.
trie_nodes: TrieUpdates,
/// Changed hashed state.
hashed_state: HashedPostState,
/// A set of prefix sets that have changed.
prefix_sets: TriePrefixSets,
/// Parallel state root metrics.
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics,
}

impl<Factory> ParallelStateRoot<Factory> {
/// Create new parallel state root calculator.
pub fn new(view: ConsistentDbView<Factory>, hashed_state: HashedPostState) -> Self {
pub fn new(
view: ConsistentDbView<Factory>,
trie_nodes: TrieUpdates,
hashed_state: HashedPostState,
prefix_sets: TriePrefixSets,
) -> Self {
Self {
view,
trie_nodes,
hashed_state,
prefix_sets,
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics::default(),
}
Expand Down Expand Up @@ -77,12 +89,15 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
let trie_nodes_sorted = self.trie_nodes.into_sorted();
let hashed_state_sorted = self.hashed_state.into_sorted();
let storage_root_targets = StorageRootTargets::new(
self.hashed_state.accounts.keys().copied(),
prefix_sets.storage_prefix_sets,
self.prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
self.prefix_sets.storage_prefix_sets,
);
let hashed_state_sorted = self.hashed_state.into_sorted();

// Pre-calculate storage roots in parallel for accounts which were changed.
tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
Expand All @@ -91,7 +106,10 @@ where
.into_par_iter()
.map(|(hashed_address, prefix_set)| {
let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
Expand All @@ -113,15 +131,18 @@ where
let mut trie_updates = TrieUpdates::default();

let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());

let walker = TrieWalker::new(
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
prefix_sets.account_prefix_set,
self.prefix_sets.account_prefix_set,
)
.with_deletions_retained(retain_updates);
let mut account_node_iter = TrieNodeIter::new(
Expand Down Expand Up @@ -171,7 +192,7 @@ where
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
self.prefix_sets.destroyed_accounts,
);

let stats = tracker.finish();
Expand Down Expand Up @@ -270,9 +291,14 @@ mod tests {
}

assert_eq!(
ParallelStateRoot::new(consistent_view.clone(), HashedPostState::default())
.incremental_root()
.unwrap(),
ParallelStateRoot::new(
consistent_view.clone(),
Default::default(),
HashedPostState::default(),
Default::default()
)
.incremental_root()
.unwrap(),
test_utils::state_root(state.clone())
);

Expand Down Expand Up @@ -301,8 +327,11 @@ mod tests {
}
}

let prefix_sets = hashed_state.construct_prefix_sets().freeze();
assert_eq!(
ParallelStateRoot::new(consistent_view, hashed_state).incremental_root().unwrap(),
ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets)
.incremental_root()
.unwrap(),
test_utils::state_root(state)
);
}
Expand Down
Loading
Loading