Skip to content

Commit

Permalink
add log and interrupt check
Browse files Browse the repository at this point in the history
  • Loading branch information
Keefe Liu committed Aug 9, 2024
1 parent 73da6d1 commit 84418ba
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 9 deletions.
12 changes: 11 additions & 1 deletion crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,9 @@ impl AppendableChain {
#[cfg(feature = "prefetch")]
{
let mut trie_prefetch = TriePrefetch::new();
let consistent_view = Arc::new(ConsistentDbView::new_with_latest_tip(externals.provider_factory.clone())?);
let consistent_view = Arc::new(ConsistentDbView::new_with_latest_tip(
externals.provider_factory.clone(),
)?);

tokio::spawn({
async move {
Expand All @@ -232,7 +234,15 @@ impl AppendableChain {
});
}

let start = Instant::now();
let state = executor.execute((&block, U256::MAX).into())?;
tracing::info!(
target: "blockchain_tree::chain",
number = block.number,
hash = %block_hash,
elapsed = ?start.elapsed(),
"Executed block"
);
let BlockExecutionOutput { state, receipts, requests, .. } = state;
externals
.consensus
Expand Down
35 changes: 27 additions & 8 deletions crates/trie/prefetch/src/prefetch.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use rayon::prelude::*;
use reth_db::database::Database;
use reth_execution_errors::StorageRootError;
use reth_primitives::B256;
use reth_provider::{ProviderError, ProviderFactory};
use reth_provider::{providers::ConsistentDbView, ProviderError, ProviderFactory};
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
metrics::TrieRootMetrics,
Expand All @@ -11,14 +12,11 @@ use reth_trie::{
walker::TrieWalker,
HashedPostState, HashedStorage, StorageRoot,
};
use reth_trie_parallel::StorageRootTargets;
use reth_trie_parallel::{parallel_root::ParallelStateRootError, StorageRootTargets};
use std::{collections::HashMap, sync::Arc};
use rayon::prelude::*;
use thiserror::Error;
use tokio::sync::{mpsc::UnboundedReceiver, oneshot::Receiver};
use tokio::sync::{mpsc::UnboundedReceiver, oneshot::Receiver, watch};
use tracing::{debug, trace};
use reth_provider::providers::ConsistentDbView;
use reth_trie_parallel::parallel_root::ParallelStateRootError;

/// Prefetch trie storage when executing transactions.
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -58,6 +56,7 @@ impl TriePrefetch {
) where
DB: Database + 'static,
{
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let mut count = 0u64;
loop {
tokio::select! {
Expand All @@ -69,15 +68,17 @@ impl TriePrefetch {
let hashed_state = self.deduplicate_and_update_cached(&hashed_state);

let self_clone = Arc::new(self.clone());
let mut shutdown_rx = shutdown_rx.clone();
tokio::spawn(async move {
if let Err(e) = self_clone.prefetch_once::<DB>(consistent_view, hashed_state) {
if let Err(e) = self_clone.prefetch_once::<DB>(consistent_view, hashed_state, count, &mut shutdown_rx).await {
debug!(target: "trie::trie_prefetch", ?e, "Error while prefetching trie storage");
};
});
}
}
_ = &mut interrupt_rx => {
debug!(target: "trie::trie_prefetch", "Interrupted trie prefetch task. Processed {:?}, left {:?}", count, prefetch_rx.len());
let _ = shutdown_tx.send(true);
return
}
}
Expand Down Expand Up @@ -134,14 +135,18 @@ impl TriePrefetch {
}

/// Prefetch trie storage for the given hashed state.
pub fn prefetch_once<DB>(
pub async fn prefetch_once<DB>(
self: Arc<Self>,
consistent_view: Arc<ConsistentDbView<DB, ProviderFactory<DB>>>,
hashed_state: HashedPostState,
count: u64,
shutdown_rx: &mut watch::Receiver<bool>,
) -> Result<(), TriePrefetchError>
where
DB: Database,
{
debug!("prefetch once started {:?}", count);

let mut tracker = TrieTracker::default();

let prefix_sets = hashed_state.construct_prefix_sets().freeze();
Expand All @@ -155,6 +160,10 @@ impl TriePrefetch {
let mut storage_roots = storage_root_targets
.into_par_iter()
.map(|(hashed_address, prefix_set)| {
if *shutdown_rx.borrow() {
return Ok((B256::ZERO, 0)); // return early if shutdown
}

let provider_ro = consistent_view.provider_ro()?;

let storage_root_result = StorageRoot::new_hashed(
Expand All @@ -171,6 +180,10 @@ impl TriePrefetch {
})
.collect::<Result<HashMap<_, _>, ParallelStateRootError>>()?;

if *shutdown_rx.borrow() {
return Ok(()); // return early if shutdown
}

trace!(target: "trie::trie_prefetch", "prefetching account tries");
let provider_ro = consistent_view.provider_ro()?;
let hashed_cursor_factory =
Expand All @@ -188,6 +201,10 @@ impl TriePrefetch {
);

while let Some(node) = account_node_iter.try_next().map_err(ProviderError::Database)? {
if *shutdown_rx.borrow() {
return Ok(()); // return early if shutdown
}

match node {
TrieElement::Branch(_) => {
tracker.inc_branch();
Expand Down Expand Up @@ -226,6 +243,8 @@ impl TriePrefetch {
"prefetched account trie"
);

debug!("prefetch once finished {:?}", count);

Ok(())
}
}
Expand Down

0 comments on commit 84418ba

Please sign in to comment.