Skip to content

Commit

Permalink
feat: add invalid block hook field to tree
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Aug 23, 2024
1 parent b75e3df commit 2715369
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions book/cli/reth/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,13 @@ Debug:
--debug.engine-api-store <PATH>
The path to store engine API messages at. If specified, all of the intercepted engine API messages will be written to specified location

--debug.bad-block-hook <BAD_BLOCK_HOOK>
Determines which type of bad block hook to install

Example: `witness,prestate`

[possible values: witness, pre-state, opcode]

Database:
--db.log-level <LOG_LEVEL>
Database logging level. Levels higher than "notice" require a debug build
Expand Down
5 changes: 4 additions & 1 deletion crates/engine/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_engine_tree::{
download::BasicBlockDownloader,
engine::{EngineApiRequest, EngineApiRequestHandler, EngineHandler},
persistence::PersistenceHandle,
tree::{EngineApiTreeHandler, TreeConfig},
tree::{EngineApiTreeHandler, InvalidBlockHook, TreeConfig},
};
pub use reth_engine_tree::{
chain::{ChainEvent, ChainOrchestrator},
Expand Down Expand Up @@ -80,6 +80,7 @@ where
pruner: Pruner<DB, ProviderFactory<DB>>,
payload_builder: PayloadBuilderHandle<T>,
tree_config: TreeConfig,
invalid_block_hook: InvalidBlockHook,
) -> Self {
let downloader = BasicBlockDownloader::new(client, consensus.clone());

Expand All @@ -97,6 +98,7 @@ where
payload_builder,
canonical_in_memory_state,
tree_config,
invalid_block_hook,
);

let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
Expand Down Expand Up @@ -196,6 +198,7 @@ mod tests {
pruner,
PayloadBuilderHandle::new(tx),
TreeConfig::default(),
Box::new(|_, _, _, _| {}),
);
}
}
71 changes: 62 additions & 9 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ use reth_payload_builder::PayloadBuilderHandle;
use reth_payload_primitives::{PayloadAttributes, PayloadBuilderAttributes};
use reth_payload_validator::ExecutionPayloadValidator;
use reth_primitives::{
Block, BlockNumHash, BlockNumber, GotExpected, Header, Receipts, Requests, SealedBlock,
SealedBlockWithSenders, SealedHeader, B256, U256,
Block, BlockNumHash, BlockNumber, GotExpected, Header, Receipt, Receipts, Requests,
SealedBlock, SealedBlockWithSenders, SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
StateRootProvider,
BlockExecutionOutput, BlockReader, ExecutionOutcome, ProviderError, StateProviderBox,
StateProviderFactory, StateRootProvider,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
Expand All @@ -40,9 +40,10 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::HashedPostState;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
ops::Bound,
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError, Sender},
Expand All @@ -62,6 +63,16 @@ mod metrics;
use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics};
pub use config::TreeConfig;

/// This exists only to share the hook definition across many function definitions easily
pub type InvalidBlockHook = Box<
dyn Fn(
SealedBlockWithSenders,
SealedHeader,
BlockExecutionOutput<Receipt>,
Option<(TrieUpdates, B256)>,
) + Send,
>;

/// Keeps track of the state of the tree.
///
/// ## Invariants
Expand Down Expand Up @@ -377,7 +388,7 @@ pub enum TreeAction {
///
/// This type is responsible for processing engine API requests, maintaining the canonical state and
/// emitting events.
#[derive(Debug)]
// #[derive(Debug)]
pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
provider: P,
executor_provider: E,
Expand Down Expand Up @@ -414,6 +425,8 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
config: TreeConfig,
/// Metrics for the engine api.
metrics: EngineApiMetrics,
/// An invalid block hook
invalid_block_hook: InvalidBlockHook,
}

impl<P, E, T> EngineApiTreeHandler<P, E, T>
Expand Down Expand Up @@ -454,9 +467,15 @@ where
config,
metrics: Default::default(),
incoming_tx,
invalid_block_hook: Box::new(|_, _, _, _| {}),
}
}

/// Sets the invalid block hook to be the given function
fn set_invalid_block_hook(&mut self, invalid_block_hook: InvalidBlockHook) {
self.invalid_block_hook = invalid_block_hook;
}

/// Creates a new [`EngineApiTreeHandler`] instance and spawns it in its
/// own thread.
///
Expand All @@ -472,6 +491,7 @@ where
payload_builder: PayloadBuilderHandle<T>,
canonical_in_memory_state: CanonicalInMemoryState,
config: TreeConfig,
invalid_block_hook: InvalidBlockHook,
) -> (Sender<FromEngine<EngineApiRequest<T>>>, UnboundedReceiver<EngineApiEvent>) {
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();
Expand All @@ -489,7 +509,7 @@ where
header.num_hash(),
);

let task = Self::new(
let mut task = Self::new(
provider,
executor_provider,
consensus,
Expand All @@ -502,6 +522,7 @@ where
payload_builder,
config,
);
task.set_invalid_block_hook(invalid_block_hook);
let incoming = task.incoming_tx.clone();
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
(incoming, outgoing)
Expand Down Expand Up @@ -1742,17 +1763,28 @@ where
let output = executor.execute((&block, U256::MAX).into())?;
debug!(target: "engine", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");

self.consensus.validate_block_post_execution(
if let Err(err) = self.consensus.validate_block_post_execution(
&block,
PostExecutionInput::new(&output.receipts, &output.requests),
)?;
) {
// call post-block hook
(self.invalid_block_hook)(block.seal_slow(), parent_block, output, None);
return Err(err.into())
}

let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

let root_time = Instant::now();
let (state_root, trie_output) =
state_provider.hashed_state_root_with_updates(hashed_state.clone())?;
if state_root != block.state_root {
// call post-block hook
(self.invalid_block_hook)(
block.clone().seal_slow(),
parent_block,
output,
Some((trie_output, state_root)),
);
return Err(ConsensusError::BodyStateRootDiff(
GotExpected { got: state_root, expected: block.state_root }.into(),
)
Expand Down Expand Up @@ -1999,6 +2031,27 @@ where
}
}

impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTreeHandler<P, E, T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EngineApiTreeHandler")
.field("provider", &self.provider)
.field("executor_provider", &self.executor_provider)
.field("consensus", &self.consensus)
.field("payload_validator", &self.payload_validator)
.field("state", &self.state)
.field("incoming_tx", &self.incoming_tx)
.field("persistence", &self.persistence)
.field("persistence_state", &self.persistence_state)
.field("backfill_sync_state", &self.backfill_sync_state)
.field("canonical_in_memory_state", &self.canonical_in_memory_state)
.field("payload_builder", &self.payload_builder)
.field("config", &self.config)
.field("metrics", &self.metrics)
.field("invalid_block_hook", &format!("{:p}", self.invalid_block_hook))
.finish()
}
}

/// The state of the persistence task.
#[derive(Default, Debug)]
pub struct PersistenceState {
Expand Down
10 changes: 9 additions & 1 deletion crates/node/builder/src/launch/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use reth_rpc_engine_api::{capabilities::EngineCapabilities, EngineApi};
use reth_rpc_types::engine::ClientVersionV1;
use reth_tasks::TaskExecutor;
use reth_tokio_util::EventSender;
use reth_tracing::tracing::{debug, error, info};
use reth_tracing::tracing::{debug, error, info, warn};
use tokio::sync::{mpsc::unbounded_channel, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;

Expand Down Expand Up @@ -196,6 +196,13 @@ where
let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");

// TODO: implement methods which convert this value into an actual function
if let Some(ref hook_type) = ctx.node_config().debug.bad_block_hook {
warn!(target: "reth::cli", ?hook_type, "Bad block hooks are not implemented yet! The `debug.bad-block-hook` flag will do nothing for now.");
}

let bad_block_hook = Box::new(|_, _, _, _| {});

// Configure the consensus engine
let mut eth_service = EngineService::new(
ctx.consensus(),
Expand All @@ -210,6 +217,7 @@ where
pruner,
ctx.components().payload_builder().clone(),
TreeConfig::default(),
bad_block_hook,
);

let event_sender = EventSender::default();
Expand Down
1 change: 1 addition & 0 deletions crates/node/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ rand.workspace = true
derive_more.workspace = true
toml.workspace = true
serde.workspace = true
strum = { workspace = true, features = ["derive"] }

# io
dirs-next = "2.0.0"
Expand Down
Loading

0 comments on commit 2715369

Please sign in to comment.