Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add trie prefetch when executing blocks #56

Merged
merged 5 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ members = [
"crates/transaction-pool/",
"crates/trie/common",
"crates/trie/parallel/",
"crates/trie/prefetch/",
"crates/trie/trie",
"crates/bsc/node/",
"crates/bsc/engine/",
Expand Down Expand Up @@ -384,6 +385,7 @@ reth-trie = { path = "crates/trie/trie" }
reth-trie-common = { path = "crates/trie/common" }
reth-trie-db = { path = "crates/trie/db" }
reth-trie-parallel = { path = "crates/trie/parallel" }
reth-trie-prefetch = { path = "crates/trie/prefetch" }

# revm
revm = { version = "12.1.0", features = [
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,10 @@ build-native-%:
cargo build --bin reth --target $* --features "$(FEATURES)" --profile "$(PROFILE)"

op-build-native-%:
cargo build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)"
cargo build --bin op-reth --target $* --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)"

bsc-build-native-%:
cargo build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)"
cargo build --bin bsc-reth --target $* --features "bsc $(FEATURES)" --profile "$(PROFILE)"

# The following commands use `cross` to build a cross-compile.
#
Expand Down Expand Up @@ -123,11 +123,11 @@ build-%:

op-build-%:
RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \
cross build --bin op-reth --target $* --features "optimism,opbnb,$(FEATURES)" --profile "$(PROFILE)"
cross build --bin op-reth --target $* --features "optimism opbnb $(FEATURES)" --profile "$(PROFILE)"

bsc-build-%:
RUSTFLAGS="-C link-arg=-lgcc -Clink-arg=-static-libgcc" \
cross build --bin bsc-reth --target $* --features "bsc,$(FEATURES)" --profile "$(PROFILE)"
cross build --bin bsc-reth --target $* --features "bsc $(FEATURES)" --profile "$(PROFILE)"

# Unfortunately we can't easily use cross to build for Darwin because of licensing issues.
# If we wanted to, we would need to build a custom Docker image with the SDK available.
Expand Down
4 changes: 2 additions & 2 deletions bin/reth/src/commands/debug_cmd/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ impl Command {
#[cfg(feature = "bsc")]
let executor =
block_executor!(provider_factory.chain_spec(), provider_factory.clone())
.executor(db);
.executor(db, None);
#[cfg(not(feature = "bsc"))]
let executor = block_executor!(provider_factory.chain_spec()).executor(db);
let executor = block_executor!(provider_factory.chain_spec()).executor(db, None);

let BlockExecutionOutput { state, receipts, requests, .. } = executor
.execute((&block_with_senders.clone().unseal(), U256::MAX, None).into())?;
Expand Down
6 changes: 3 additions & 3 deletions bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ impl Command {
));

#[cfg(feature = "bsc")]
let executor =
block_executor!(provider_factory.chain_spec(), provider_factory.clone()).executor(db);
let executor = block_executor!(provider_factory.chain_spec(), provider_factory.clone())
.executor(db, None);
#[cfg(not(feature = "bsc"))]
let executor = block_executor!(provider_factory.chain_spec()).executor(db);
let executor = block_executor!(provider_factory.chain_spec()).executor(db, None);

let merkle_block_td =
provider.header_td_by_number(merkle_block_number)?.unwrap_or_default();
Expand Down
1 change: 1 addition & 0 deletions crates/blockchain-tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ reth-trie = { workspace = true, features = ["metrics"] }
reth-trie-parallel = { workspace = true, features = ["parallel"] }
reth-network.workspace = true
reth-consensus.workspace = true
reth-trie-prefetch.workspace = true

# common
parking_lot.workspace = true
Expand Down
14 changes: 13 additions & 1 deletion crates/blockchain-tree/src/blockchain_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ pub struct BlockchainTree<DB, E> {
sync_metrics_tx: Option<MetricEventsSender>,
/// Metrics for the blockchain tree.
metrics: TreeMetrics,
/// Whether to enable prefetch when execute block
enable_prefetch: bool,
}

impl<DB, E> BlockchainTree<DB, E> {
Expand All @@ -90,7 +92,7 @@ impl<DB, E> BlockchainTree<DB, E> {

impl<DB, E> BlockchainTree<DB, E>
where
DB: Database + Clone,
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
/// Builds the blockchain tree for the node.
Expand Down Expand Up @@ -143,6 +145,7 @@ where
canon_state_notification_sender,
sync_metrics_tx: None,
metrics: Default::default(),
enable_prefetch: false,
})
}

Expand All @@ -167,6 +170,12 @@ where
self
}

/// Enable prefetch.
pub const fn enable_prefetch(mut self) -> Self {
self.enable_prefetch = true;
self
}

/// Check if the block is known to blockchain tree or database and return its status.
///
/// Function will check:
Expand Down Expand Up @@ -434,6 +443,7 @@ where
&self.externals,
block_attachment,
block_validation_kind,
self.enable_prefetch,
)?;

self.insert_chain(chain);
Expand Down Expand Up @@ -491,6 +501,7 @@ where
canonical_fork,
block_attachment,
block_validation_kind,
self.enable_prefetch,
)?;

self.state.block_indices.insert_non_fork_block(block_number, block_hash, chain_id);
Expand All @@ -505,6 +516,7 @@ where
canonical_fork,
&self.externals,
block_validation_kind,
self.enable_prefetch,
)?;
self.insert_chain(chain);
BlockAttachment::HistoricalFork
Expand Down
65 changes: 59 additions & 6 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use reth_provider::{
providers::{BundleStateProvider, ConsistentDbView},
FullExecutionDataProvider, ProviderError, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_revm::{database::StateProviderDatabase, primitives::EvmState};
use reth_trie::updates::TrieUpdates;
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use reth_trie_prefetch::TriePrefetch;
use std::{
collections::{BTreeMap, HashMap},
ops::{Deref, DerefMut},
sync::Arc,
time::Instant,
};

Expand Down Expand Up @@ -67,6 +69,7 @@ impl AppendableChain {
///
/// if [`BlockValidationKind::Exhaustive`] is specified, the method will verify the state root
/// of the block.
#[allow(clippy::too_many_arguments)]
pub fn new_canonical_fork<DB, E>(
block: SealedBlockWithSenders,
parent_header: &SealedHeader,
Expand All @@ -75,9 +78,10 @@ impl AppendableChain {
externals: &TreeExternals<DB, E>,
block_attachment: BlockAttachment,
block_validation_kind: BlockValidationKind,
enable_prefetch: bool,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database + Clone,
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
let execution_outcome = ExecutionOutcome::default();
Expand All @@ -98,6 +102,7 @@ impl AppendableChain {
externals,
block_attachment,
block_validation_kind,
enable_prefetch,
)?;

Ok(Self { chain: Chain::new(vec![block], bundle_state, trie_updates) })
Expand All @@ -106,6 +111,7 @@ impl AppendableChain {
/// Create a new chain that forks off of an existing sidechain.
///
/// This differs from [`AppendableChain::new_canonical_fork`] in that this starts a new fork.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new_chain_fork<DB, E>(
&self,
block: SealedBlockWithSenders,
Expand All @@ -114,9 +120,10 @@ impl AppendableChain {
canonical_fork: ForkBlock,
externals: &TreeExternals<DB, E>,
block_validation_kind: BlockValidationKind,
enable_prefetch: bool,
) -> Result<Self, InsertBlockErrorKind>
where
DB: Database + Clone,
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
let parent_number =
Expand Down Expand Up @@ -145,6 +152,7 @@ impl AppendableChain {
externals,
BlockAttachment::HistoricalFork,
block_validation_kind,
enable_prefetch,
)?;
// extending will also optimize few things, mostly related to selfdestruct and wiping of
// storage.
Expand All @@ -170,6 +178,7 @@ impl AppendableChain {
/// - [`BlockAttachment`] represents if the block extends the canonical chain, and thus we can
/// cache the trie state updates.
/// - [`BlockValidationKind`] determines if the state root __should__ be validated.
#[allow(clippy::too_many_arguments)]
fn validate_and_execute<EDP, DB, E>(
block: SealedBlockWithSenders,
parent_block: &SealedHeader,
Expand All @@ -178,10 +187,11 @@ impl AppendableChain {
externals: &TreeExternals<DB, E>,
block_attachment: BlockAttachment,
block_validation_kind: BlockValidationKind,
enable_prefetch: bool,
) -> Result<(ExecutionOutcome, Option<TrieUpdates>), BlockExecutionError>
where
EDP: FullExecutionDataProvider,
DB: Database + Clone,
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
// some checks are done before blocks comes here.
Expand All @@ -208,8 +218,12 @@ impl AppendableChain {

let provider = BundleStateProvider::new(state_provider, bundle_state_data_provider);

let (prefetch_tx, interrupt_tx) =
if enable_prefetch { Self::setup_prefetch(externals) } else { (None, None) };

let db = StateProviderDatabase::new(&provider);
let executor = externals.executor_factory.executor(db);
let executor = externals.executor_factory.executor(db, prefetch_tx);

let block_hash = block.hash();
let block = block.unseal();

Expand All @@ -222,6 +236,11 @@ impl AppendableChain {
let initial_execution_outcome =
ExecutionOutcome::new(state, receipts.into(), block.number, vec![requests.into()]);

// stop the prefetch task.
if let Some(interrupt_tx) = interrupt_tx {
let _ = interrupt_tx.send(());
}

// check state root if the block extends the canonical chain __and__ if state root
// validation was requested.
if block_validation_kind.is_exhaustive() {
Expand Down Expand Up @@ -282,9 +301,10 @@ impl AppendableChain {
canonical_fork: ForkBlock,
block_attachment: BlockAttachment,
block_validation_kind: BlockValidationKind,
enable_prefetch: bool,
) -> Result<(), InsertBlockErrorKind>
where
DB: Database + Clone,
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
let parent_block = self.chain.tip();
Expand All @@ -307,10 +327,43 @@ impl AppendableChain {
externals,
block_attachment,
block_validation_kind,
enable_prefetch,
)?;
// extend the state.
self.chain.append_block(block, block_state);

Ok(())
}

fn setup_prefetch<DB, E>(
externals: &TreeExternals<DB, E>,
) -> (
Option<tokio::sync::mpsc::UnboundedSender<EvmState>>,
Option<tokio::sync::oneshot::Sender<()>>,
)
where
DB: Database + Clone + 'static,
E: BlockExecutorProvider,
{
let (prefetch_tx, prefetch_rx) = tokio::sync::mpsc::unbounded_channel();
let (interrupt_tx, interrupt_rx) = tokio::sync::oneshot::channel();

let mut trie_prefetch = TriePrefetch::new();
let consistent_view = if let Ok(view) =
ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())
{
view
} else {
tracing::debug!("Failed to create consistent view for trie prefetch");
return (None, None)
};

tokio::spawn({
async move {
trie_prefetch.run::<DB>(Arc::new(consistent_view), prefetch_rx, interrupt_rx).await;
}
});

(Some(prefetch_tx), Some(interrupt_tx))
}
}
Loading
Loading