Skip to content

Commit

Permalink
refactor: move exec metrics into executor
Browse files Browse the repository at this point in the history
  • Loading branch information
onbjerg committed Aug 22, 2024
1 parent aa8b3de commit 1f24b9c
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 64 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl AppendableChain {
block_validation_kind,
)?;

Ok(Self { chain: Chain::new(vec![block], bundle_state, trie_updates) })
Ok(Self::new(Chain::new(vec![block], bundle_state, trie_updates)))
}

/// Create a new chain that forks off of an existing sidechain.
Expand Down Expand Up @@ -155,7 +155,7 @@ impl AppendableChain {
execution_outcome.set_first_block(block.number);

// If all is okay, return new chain back. Present chain is not modified.
Ok(Self { chain: Chain::from_block(block, execution_outcome, None) })
Ok(Self::new(Chain::from_block(block, execution_outcome, None)))
}

/// Validate and execute the given block that _extends the canonical chain_, validating its
Expand Down
12 changes: 11 additions & 1 deletion crates/engine/tree/src/tree/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,22 @@
use reth_evm::metrics::ExecutorMetrics;
use reth_metrics::{
metrics::{Counter, Gauge, Histogram},
Metrics,
};

/// Metrics for the `EngineApi`.
#[derive(Debug, Default)]
pub(crate) struct EngineApiMetrics {
/// Engine API-specific metrics.
pub(crate) engine: EngineMetrics,
/// Block executor metrics.
pub(crate) executor: ExecutorMetrics,
}

/// Metrics for the `EngineApi`.
#[derive(Metrics)]
#[metrics(scope = "consensus.engine.beacon")]
pub(crate) struct EngineApiMetrics {
pub(crate) struct EngineMetrics {
/// How many executed blocks are currently stored.
pub(crate) executed_blocks: Gauge,
/// The number of times the pipeline was run.
Expand Down
17 changes: 10 additions & 7 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ where
cancun_fields: Option<CancunPayloadFields>,
) -> Result<TreeOutcome<PayloadStatus>, InsertBlockFatalError> {
trace!(target: "engine", "invoked new payload");
self.metrics.new_payload_messages.increment(1);
self.metrics.engine.new_payload_messages.increment(1);

// Ensures that the given payload does not violate any consensus rules that concern the
// block's layout, like:
Expand Down Expand Up @@ -712,7 +712,7 @@ where
attrs: Option<T::PayloadAttributes>,
) -> ProviderResult<TreeOutcome<OnForkChoiceUpdated>> {
trace!(target: "engine", ?attrs, "invoked forkchoice update");
self.metrics.forkchoice_updated_messages.increment(1);
self.metrics.engine.forkchoice_updated_messages.increment(1);
self.canonical_in_memory_state.on_forkchoice_update_received();

if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? {
Expand Down Expand Up @@ -881,7 +881,7 @@ where
// Check if persistence has complete
match rx.try_recv() {
Ok(last_persisted_block_hash) => {
self.metrics.persistence_duration.record(start_time.elapsed());
self.metrics.engine.persistence_duration.record(start_time.elapsed());
let Some(last_persisted_block_hash) = last_persisted_block_hash else {
// if this happened, then we persisted no blocks because we sent an
// empty vec of blocks
Expand Down Expand Up @@ -1004,7 +1004,7 @@ where
// state house keeping after backfill sync
// remove all executed blocks below the backfill height
self.state.tree_state.remove_before(Bound::Included(backfill_height));
self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64);
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

// remove all buffered blocks below the backfill height
self.state.buffer.remove_old_blocks(backfill_height);
Expand Down Expand Up @@ -1114,7 +1114,7 @@ where
}

self.backfill_sync_state = BackfillSyncState::Pending;
self.metrics.pipeline_runs.increment(1);
self.metrics.engine.pipeline_runs.increment(1);
debug!(target: "engine", "emitting backfill action event");
}

Expand Down Expand Up @@ -1739,7 +1739,10 @@ where
let block = block.unseal();

let exec_time = Instant::now();
let output = executor.execute((&block, U256::MAX).into())?;
let output = self
.metrics
.executor
.metered((&block, U256::MAX).into(), |input| executor.execute(input))?;
debug!(target: "engine", elapsed=?exec_time.elapsed(), ?block_number, "Executed block");

self.consensus.validate_block_post_execution(
Expand Down Expand Up @@ -1781,7 +1784,7 @@ where
}

self.state.tree_state.insert_executed(executed);
self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64);
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);

// emit insert event
let engine_event = if self.state.tree_state.is_fork(block_hash) {
Expand Down
2 changes: 2 additions & 0 deletions crates/evm/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ reth-execution-errors.workspace = true
reth-primitives.workspace = true
revm-primitives.workspace = true
reth-prune-types.workspace = true
reth-metrics.workspace = true
reth-storage-errors.workspace = true
reth-execution-types.workspace = true

revm.workspace = true
alloy-eips.workspace = true
auto_impl.workspace = true
futures-util.workspace = true
metrics.workspace = true
parking_lot = { workspace = true, optional = true }

[dev-dependencies]
Expand Down
1 change: 1 addition & 0 deletions crates/evm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use revm_primitives::{
pub mod builder;
pub mod either;
pub mod execute;
pub mod metrics;
pub mod noop;
pub mod provider;
pub mod system_calls;
Expand Down
42 changes: 42 additions & 0 deletions crates/evm/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
//! Executor metrics.
//!
//! Block processing related to syncing should take care to update the metrics by using e.g.
//! [`ExecutorMetrics::metered`].
use std::time::Instant;

use metrics::{Counter, Gauge};
use reth_execution_types::BlockExecutionInput;
use reth_metrics::Metrics;
use reth_primitives::BlockWithSenders;

/// Executor metrics.
// TODO(onbjerg): add sload/sstore, acc load/acc change, bytecode metrics
#[derive(Metrics, Clone)]
#[metrics(scope = "sync.execution")]
pub struct ExecutorMetrics {
/// The total amount of gas processed.
pub gas_processed_total: Counter,
/// The instantaneous amount of gas processed per second.
pub gas_per_second: Gauge,
}

impl ExecutorMetrics {
/// Execute the given block and update metrics for the execution.
pub fn metered<F, R>(&self, input: BlockExecutionInput<'_, BlockWithSenders>, f: F) -> R
where
F: FnOnce(BlockExecutionInput<'_, BlockWithSenders>) -> R,
{
let gas_used = input.block.gas_used;

// Execute the block and record the elapsed time.
let execute_start = Instant::now();
let output = f(input);
let execution_duration = execute_start.elapsed().as_secs_f64();

// Update gas metrics.
self.gas_processed_total.increment(gas_used);
self.gas_per_second.set(gas_used as f64 / execution_duration);

output
}
}
19 changes: 8 additions & 11 deletions crates/node/builder/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where

let pipeline = builder
.with_tip_sender(tip_tx)
.with_metrics_tx(metrics_tx.clone())
.with_metrics_tx(metrics_tx)
.add_stages(
DefaultStages::new(
provider_factory.clone(),
Expand All @@ -114,16 +114,13 @@ where
stage_config.clone(),
prune_modes.clone(),
)
.set(
ExecutionStage::new(
executor,
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
prune_modes,
exex_manager_handle,
)
.with_metrics_tx(metrics_tx),
),
.set(ExecutionStage::new(
executor,
stage_config.execution.into(),
stage_config.execution_external_clean_threshold(),
prune_modes,
exex_manager_handle,
)),
)
.build(provider_factory, static_file_producer);

Expand Down
9 changes: 0 additions & 9 deletions crates/stages/api/src/metrics/listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crate::{metrics::SyncMetrics, StageCheckpoint, StageId};
use alloy_primitives::BlockNumber;
use reth_primitives_traits::constants::MEGAGAS;
use std::{
future::Future,
pin::Pin,
Expand Down Expand Up @@ -30,11 +29,6 @@ pub enum MetricEvent {
/// If specified, `entities_total` metric is updated.
max_block_number: Option<BlockNumber>,
},
/// Execution stage processed some amount of gas.
ExecutionStageGas {
/// Gas processed.
gas: u64,
},
}

/// Metrics routine that listens to new metric events on the `events_rx` receiver.
Expand Down Expand Up @@ -82,9 +76,6 @@ impl MetricsListener {
stage_metrics.entities_total.set(total as f64);
}
}
MetricEvent::ExecutionStageGas { gas } => {
self.sync_metrics.execution_stage.mgas_processed_total.increment(gas / MEGAGAS)
}
}
}
}
Expand Down
14 changes: 1 addition & 13 deletions crates/stages/api/src/metrics/sync_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
use crate::StageId;
use reth_metrics::{
metrics::{Counter, Gauge},
Metrics,
};
use reth_metrics::{metrics::Gauge, Metrics};
use std::collections::HashMap;

#[derive(Debug, Default)]
pub(crate) struct SyncMetrics {
pub(crate) stages: HashMap<StageId, StageMetrics>,
pub(crate) execution_stage: ExecutionStageMetrics,
}

impl SyncMetrics {
Expand All @@ -31,11 +27,3 @@ pub(crate) struct StageMetrics {
/// The number of total entities of the last commit for a stage, if applicable.
pub(crate) entities_total: Gauge,
}

/// Execution stage metrics.
#[derive(Metrics)]
#[metrics(scope = "sync.execution")]
pub(crate) struct ExecutionStageMetrics {
/// The total amount of gas processed (in millions)
pub(crate) mgas_processed_total: Counter,
}
35 changes: 14 additions & 21 deletions crates/stages/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use num_traits::Zero;
use reth_config::config::ExecutionConfig;
use reth_db::{static_file::HeaderMask, tables};
use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx};
use reth_evm::execute::{BatchExecutor, BlockExecutorProvider};
use reth_evm::{
execute::{BatchExecutor, BlockExecutorProvider},
metrics::ExecutorMetrics,
};
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_exex::{ExExManagerHandle, ExExNotification};
use reth_primitives::{BlockNumber, Header, StaticFileSegment};
Expand All @@ -18,8 +21,8 @@ use reth_prune_types::PruneModes;
use reth_revm::database::StateProviderDatabase;
use reth_stages_api::{
BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput,
ExecutionCheckpoint, ExecutionStageThresholds, MetricEvent, MetricEventsSender, Stage,
StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput,
ExecutionCheckpoint, ExecutionStageThresholds, Stage, StageCheckpoint, StageError, StageId,
UnwindInput, UnwindOutput,
};
use std::{
cmp::Ordering,
Expand Down Expand Up @@ -61,7 +64,6 @@ use tracing::*;
// false positive, we cannot derive it if !DB: Debug.
#[allow(missing_debug_implementations)]
pub struct ExecutionStage<E> {
metrics_tx: Option<MetricEventsSender>,
/// The stage's internal block executor
executor_provider: E,
/// The commit thresholds of the execution stage.
Expand All @@ -83,26 +85,28 @@ pub struct ExecutionStage<E> {
post_unwind_commit_input: Option<Chain>,
/// Handle to communicate with `ExEx` manager.
exex_manager_handle: ExExManagerHandle,
/// Executor metrics.
metrics: ExecutorMetrics,
}

impl<E> ExecutionStage<E> {
/// Create new execution stage with specified config.
pub const fn new(
pub fn new(
executor_provider: E,
thresholds: ExecutionStageThresholds,
external_clean_threshold: u64,
prune_modes: PruneModes,
exex_manager_handle: ExExManagerHandle,
) -> Self {
Self {
metrics_tx: None,
external_clean_threshold,
executor_provider,
thresholds,
prune_modes,
post_execute_commit_input: None,
post_unwind_commit_input: None,
exex_manager_handle,
metrics: ExecutorMetrics::default(),
}
}

Expand Down Expand Up @@ -135,12 +139,6 @@ impl<E> ExecutionStage<E> {
)
}

/// Set the metric events sender.
pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self {
self.metrics_tx = Some(metrics_tx);
self
}

/// Adjusts the prune modes related to changesets.
///
/// This function verifies whether the [`super::MerkleStage`] or Hashing stages will run from
Expand Down Expand Up @@ -272,12 +270,13 @@ where
// Execute the block
let execute_start = Instant::now();

executor.execute_and_verify_one((&block, td).into()).map_err(|error| {
StageError::Block {
self.metrics.metered((&block, td).into(), |input| {
executor.execute_and_verify_one(input).map_err(|error| StageError::Block {
block: Box::new(block.header.clone().seal_slow()),
error: BlockErrorKind::Execution(error),
}
})
})?;

execution_duration += execute_start.elapsed();

// Log execution throughput
Expand All @@ -296,12 +295,6 @@ where
last_log_instant = Instant::now();
}

// Gas metrics
if let Some(metrics_tx) = &mut self.metrics_tx {
let _ =
metrics_tx.send(MetricEvent::ExecutionStageGas { gas: block.header.gas_used });
}

stage_progress = block_number;
stage_checkpoint.progress.processed += block.gas_used;

Expand Down

0 comments on commit 1f24b9c

Please sign in to comment.