Skip to content

Commit

Permalink
feat: add trie prefetch
Browse files Browse the repository at this point in the history
  • Loading branch information
Keefe Liu committed Jul 17, 2024
1 parent f603e75 commit 2f57d19
Show file tree
Hide file tree
Showing 24 changed files with 673 additions and 62 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ members = [
"crates/transaction-pool/",
"crates/trie/common",
"crates/trie/parallel/",
"crates/trie/prefetch/",
"crates/trie/trie",
"crates/bsc/node/",
"crates/bsc/engine/",
Expand Down Expand Up @@ -384,6 +385,7 @@ reth-transaction-pool = { path = "crates/transaction-pool" }
reth-trie = { path = "crates/trie/trie" }
reth-trie-common = { path = "crates/trie/common" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-prefetch = { path = "crates/trie/prefetch" }

# revm
revm = { version = "11.0.0", features = [
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,9 +276,9 @@ impl Command {
#[cfg(feature = "bsc")]
let executor =
block_executor!(provider_factory.chain_spec(), provider_factory.clone())
.executor(db);
.executor(db, None);
#[cfg(not(feature = "bsc"))]
let executor = block_executor!(provider_factory.chain_spec()).executor(db);
let executor = block_executor!(provider_factory.chain_spec()).executor(db, None);

let BlockExecutionOutput { state, receipts, requests, .. } =
executor.execute((&block_with_senders.clone().unseal(), U256::MAX).into())?;
Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ impl Command {
));

#[cfg(feature = "bsc")]
let executor =
block_executor!(provider_factory.chain_spec(), provider_factory.clone()).executor(db);
let executor = block_executor!(provider_factory.chain_spec(), provider_factory.clone())
.executor(db, None);
#[cfg(not(feature = "bsc"))]
let executor = block_executor!(provider_factory.chain_spec()).executor(db);
let executor = block_executor!(provider_factory.chain_spec()).executor(db, None);

let merkle_block_td =
provider.header_td_by_number(merkle_block_number)?.unwrap_or_default();
Expand Down
1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-parallel = { workspace = true, features = ["parallel"] }
reth-network.workspace = true
reth-consensus.workspace = true
reth-trie-prefetch.workspace = true

# common
parking_lot.workspace = true
Expand Down
24 changes: 20 additions & 4 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ use reth_provider::{
use reth_revm::database::StateProviderDatabase;
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
collections::BTreeMap,
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -167,17 +169,17 @@ impl AppendableChain {
/// - [`BlockAttachment`] represents if the block extends the canonical chain, and thus we can
/// cache the trie state updates.
/// - [`BlockValidationKind`] determines if the state root __should__ be validated.
fn validate_and_execute<EDP, DB, E>(
fn validate_and_execute<'a, EDP, DB, E>(
block: SealedBlockWithSenders,
parent_block: &SealedHeader,
parent_block: &'a SealedHeader,
bundle_state_data_provider: EDP,
externals: &TreeExternals<DB, E>,
block_attachment: BlockAttachment,
block_validation_kind: BlockValidationKind,
) -> Result<(ExecutionOutcome, Option<TrieUpdates>), BlockExecutionError>
where
EDP: FullExecutionDataProvider,
DB: Database + Clone,
DB: Database + Clone + 'a,
E: BlockExecutorProvider,
{
// some checks are done before blocks comes here.
Expand All @@ -204,11 +206,22 @@ impl AppendableChain {

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

// todo: make it a config
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db);
let executor = externals.executor_factory.executor(db, Some(prefetch_tx));
let block_hash = block.hash();
let block = block.unseal();

let provider_ro = Arc::new(consistent_view.provider_ro().unwrap());
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
tokio::spawn(async move {
trie_prefetch.run::<DB>(provider_ro, prefetch_rx, interrupt_rx).await;
});

let state = executor.execute((&block, U256::MAX).into())?;
let BlockExecutionOutput { state, receipts, requests, .. } = state;
externals
Expand All @@ -218,6 +231,9 @@ impl AppendableChain {
let initial_execution_outcome =
ExecutionOutcome::new(state, receipts.into(), block.number, vec![requests.into()]);

// stop the prefetch task.
let _ = interrupt_tx.send(());

// check state root if the block extends the canonical chain __and__ if state root
// validation was requested.
if block_validation_kind.is_exhaustive() {
Expand Down
4 changes: 4 additions & 0 deletions crates/bsc/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ reth-prune-types.workspace = true
reth-revm.workspace = true
reth-provider.workspace = true
reth-bsc-consensus.workspace = true
reth-trie.workspace = true

# Revm
revm-primitives.workspace = true
Expand All @@ -34,6 +35,9 @@ bitset = "0.1.2"
lru = "0.12.3"
blst = "0.3.12"

# async
tokio = { workspace = true, features = ["sync", "time"] }

[dev-dependencies]
reth-revm = { workspace = true, features = ["test-utils"] }
reth-provider = { workspace = true, features = ["test-utils"] }
Expand Down
84 changes: 72 additions & 12 deletions crates/bsc/evm/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ use reth_revm::{
db::states::bundle_state::BundleRetention,
Evm, State,
};
use reth_trie::HashedPostState;
use revm_primitives::{
db::{Database, DatabaseCommit},
BlockEnv, CfgEnvWithHandlerCfg, EVMError, EnvWithHandlerCfg, ResultAndState, TransactTo,
};
use std::{collections::HashMap, num::NonZeroUsize, sync::Arc, time::Instant};
use tokio::sync::mpsc::UnboundedSender;
use tracing::{debug, warn};

const SNAP_CACHE_NUM: usize = 2048;
Expand Down Expand Up @@ -81,17 +83,40 @@ where
P: Clone,
EvmConfig: ConfigureEvm,
{
fn bsc_executor<DB>(&self, db: DB) -> BscBlockExecutor<EvmConfig, DB, P>
fn bsc_executor<DB>(
&self,
db: DB,
prefetch_tx: Option<UnboundedSender<HashedPostState>>,
) -> BscBlockExecutor<EvmConfig, DB, P>
where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
{
BscBlockExecutor::new(
self.chain_spec.clone(),
self.evm_config.clone(),
self.parlia_config.clone(),
State::builder().with_database(db).with_bundle_update().without_state_clear().build(),
self.provider.clone(),
)
if let Some(tx) = prefetch_tx {
BscBlockExecutor::new_with_prefetch_tx(
self.chain_spec.clone(),
self.evm_config.clone(),
self.parlia_config.clone(),
State::builder()
.with_database(db)
.with_bundle_update()
.without_state_clear()
.build(),
self.provider.clone(),
tx,
)
} else {
BscBlockExecutor::new(
self.chain_spec.clone(),
self.evm_config.clone(),
self.parlia_config.clone(),
State::builder()
.with_database(db)
.with_bundle_update()
.without_state_clear()
.build(),
self.provider.clone(),
)
}
}
}

Expand All @@ -106,18 +131,22 @@ where
type BatchExecutor<DB: Database<Error: Into<ProviderError> + std::fmt::Display>> =
BscBatchExecutor<EvmConfig, DB, P>;

fn executor<DB>(&self, db: DB) -> Self::Executor<DB>
fn executor<DB>(
&self,
db: DB,
prefetch_tx: Option<UnboundedSender<HashedPostState>>,
) -> Self::Executor<DB>
where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
{
self.bsc_executor(db)
self.bsc_executor(db, prefetch_tx)
}

fn batch_executor<DB>(&self, db: DB) -> Self::BatchExecutor<DB>
where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
{
let executor = self.bsc_executor(db);
let executor = self.bsc_executor(db, None);
BscBatchExecutor {
executor,
batch_record: BlockBatchRecord::default(),
Expand Down Expand Up @@ -159,6 +188,7 @@ where
&self,
block: &BlockWithSenders,
mut evm: Evm<'_, Ext, &mut State<DB>>,
tx: Option<UnboundedSender<HashedPostState>>,
) -> Result<(Vec<TransactionSigned>, Vec<Receipt>, u64), BlockExecutionError>
where
DB: Database<Error: Into<ProviderError> + std::fmt::Display>,
Expand Down Expand Up @@ -211,6 +241,13 @@ where
}
})?;

if let Some(tx) = tx.as_ref() {
let post_state = HashedPostState::from_state(state.clone());
tx.send(post_state).unwrap_or_else(|err| {
debug!(target: "evm_executor", ?err, "Failed to send post state to prefetch channel")
});
}

evm.db_mut().commit(state);

self.patch_mainnet_after_tx(transaction, evm.db_mut());
Expand Down Expand Up @@ -251,6 +288,8 @@ pub struct BscBlockExecutor<EvmConfig, DB, P> {
pub(crate) provider: Arc<P>,
/// Parlia consensus instance
pub(crate) parlia: Arc<Parlia>,
/// Prefetch channel
prefetch_tx: Option<UnboundedSender<HashedPostState>>,
}

impl<EvmConfig, DB, P> BscBlockExecutor<EvmConfig, DB, P> {
Expand All @@ -269,6 +308,27 @@ impl<EvmConfig, DB, P> BscBlockExecutor<EvmConfig, DB, P> {
state,
provider: shared_provider,
parlia,
prefetch_tx: None,
}
}

/// Creates a new BSC block executor with a prefetch channel.
pub fn new_with_prefetch_tx(
chain_spec: Arc<ChainSpec>,
evm_config: EvmConfig,
parlia_config: ParliaConfig,
state: State<DB>,
provider: P,
tx: UnboundedSender<HashedPostState>,
) -> Self {
let parlia = Arc::new(Parlia::new(Arc::clone(&chain_spec), parlia_config));
let shared_provider = Arc::new(provider);
Self {
executor: BscEvmExecutor { chain_spec, evm_config },
state,
provider: shared_provider,
parlia,
prefetch_tx: Some(tx),
}
}

Expand Down Expand Up @@ -347,7 +407,7 @@ where

let (mut system_txs, mut receipts, mut gas_used) = {
let evm = self.executor.evm_config.evm_with_env(&mut self.state, env.clone());
self.executor.execute_pre_and_transactions(block, evm)
self.executor.execute_pre_and_transactions(block, evm, self.prefetch_tx.clone())
}?;

// 5. apply post execution changes
Expand Down
2 changes: 1 addition & 1 deletion crates/consensus/auto-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ impl StorageInner {
requests: block_execution_requests,
gas_used,
..
} = executor.executor(&mut db).execute((&block, U256::ZERO).into())?;
} = executor.executor(&mut db, None).execute((&block, U256::ZERO).into())?;
let execution_outcome = ExecutionOutcome::new(
state,
receipts.into(),
Expand Down
Loading

0 comments on commit 2f57d19

Please sign in to comment.