From 1f24b9cd3d99968512ab02fb930596fb69bb06a0 Mon Sep 17 00:00:00 2001 From: Oliver Nordbjerg Date: Wed, 21 Aug 2024 18:44:18 +0200 Subject: [PATCH] refactor: move exec metrics into executor --- Cargo.lock | 2 + crates/blockchain-tree/src/chain.rs | 4 +- crates/engine/tree/src/tree/metrics.rs | 12 +++++- crates/engine/tree/src/tree/mod.rs | 17 ++++---- crates/evm/Cargo.toml | 2 + crates/evm/src/lib.rs | 1 + crates/evm/src/metrics.rs | 42 +++++++++++++++++++ crates/node/builder/src/setup.rs | 19 ++++----- crates/stages/api/src/metrics/listener.rs | 9 ---- crates/stages/api/src/metrics/sync_metrics.rs | 14 +------ crates/stages/stages/src/stages/execution.rs | 35 +++++++--------- 11 files changed, 93 insertions(+), 64 deletions(-) create mode 100644 crates/evm/src/metrics.rs diff --git a/Cargo.lock b/Cargo.lock index f3c40d8afe9a6..837dd4c857e16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7286,10 +7286,12 @@ dependencies = [ "alloy-eips", "auto_impl", "futures-util", + "metrics", "parking_lot 0.12.3", "reth-chainspec", "reth-execution-errors", "reth-execution-types", + "reth-metrics", "reth-primitives", "reth-prune-types", "reth-storage-errors", diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index dbc0c1d04b5f7..d843c152f65ba 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -98,7 +98,7 @@ impl AppendableChain { block_validation_kind, )?; - Ok(Self { chain: Chain::new(vec![block], bundle_state, trie_updates) }) + Ok(Self::new(Chain::new(vec![block], bundle_state, trie_updates))) } /// Create a new chain that forks off of an existing sidechain. @@ -155,7 +155,7 @@ impl AppendableChain { execution_outcome.set_first_block(block.number); // If all is okay, return new chain back. Present chain is not modified. - Ok(Self { chain: Chain::from_block(block, execution_outcome, None) }) + Ok(Self::new(Chain::from_block(block, execution_outcome, None))) } /// Validate and execute the given block that _extends the canonical chain_, validating its diff --git a/crates/engine/tree/src/tree/metrics.rs b/crates/engine/tree/src/tree/metrics.rs index 1a1c2edf29900..7df149072da56 100644 --- a/crates/engine/tree/src/tree/metrics.rs +++ b/crates/engine/tree/src/tree/metrics.rs @@ -1,12 +1,22 @@ +use reth_evm::metrics::ExecutorMetrics; use reth_metrics::{ metrics::{Counter, Gauge, Histogram}, Metrics, }; +/// Metrics for the `EngineApi`. +#[derive(Debug, Default)] +pub(crate) struct EngineApiMetrics { + /// Engine API-specific metrics. + pub(crate) engine: EngineMetrics, + /// Block executor metrics. + pub(crate) executor: ExecutorMetrics, +} + /// Metrics for the `EngineApi`. #[derive(Metrics)] #[metrics(scope = "consensus.engine.beacon")] -pub(crate) struct EngineApiMetrics { +pub(crate) struct EngineMetrics { /// How many executed blocks are currently stored. pub(crate) executed_blocks: Gauge, /// The number of times the pipeline was run. diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index d790f421c588f..f79145adebef3 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -588,7 +588,7 @@ where cancun_fields: Option, ) -> Result, InsertBlockFatalError> { trace!(target: "engine", "invoked new payload"); - self.metrics.new_payload_messages.increment(1); + self.metrics.engine.new_payload_messages.increment(1); // Ensures that the given payload does not violate any consensus rules that concern the // block's layout, like: @@ -712,7 +712,7 @@ where attrs: Option, ) -> ProviderResult> { trace!(target: "engine", ?attrs, "invoked forkchoice update"); - self.metrics.forkchoice_updated_messages.increment(1); + self.metrics.engine.forkchoice_updated_messages.increment(1); self.canonical_in_memory_state.on_forkchoice_update_received(); if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? { @@ -881,7 +881,7 @@ where // Check if persistence has complete match rx.try_recv() { Ok(last_persisted_block_hash) => { - self.metrics.persistence_duration.record(start_time.elapsed()); + self.metrics.engine.persistence_duration.record(start_time.elapsed()); let Some(last_persisted_block_hash) = last_persisted_block_hash else { // if this happened, then we persisted no blocks because we sent an // empty vec of blocks @@ -1004,7 +1004,7 @@ where // state house keeping after backfill sync // remove all executed blocks below the backfill height self.state.tree_state.remove_before(Bound::Included(backfill_height)); - self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64); + self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64); // remove all buffered blocks below the backfill height self.state.buffer.remove_old_blocks(backfill_height); @@ -1114,7 +1114,7 @@ where } self.backfill_sync_state = BackfillSyncState::Pending; - self.metrics.pipeline_runs.increment(1); + self.metrics.engine.pipeline_runs.increment(1); debug!(target: "engine", "emitting backfill action event"); } @@ -1739,7 +1739,10 @@ where let block = block.unseal(); let exec_time = Instant::now(); - let output = executor.execute((&block, U256::MAX).into())?; + let output = self + .metrics + .executor + .metered((&block, U256::MAX).into(), |input| executor.execute(input))?; debug!(target: "engine", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); self.consensus.validate_block_post_execution( @@ -1781,7 +1784,7 @@ where } self.state.tree_state.insert_executed(executed); - self.metrics.executed_blocks.set(self.state.tree_state.block_count() as f64); + self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64); // emit insert event let engine_event = if self.state.tree_state.is_fork(block_hash) { diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index ab338371984bd..191d11848ccf9 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -17,6 +17,7 @@ reth-execution-errors.workspace = true reth-primitives.workspace = true revm-primitives.workspace = true reth-prune-types.workspace = true +reth-metrics.workspace = true reth-storage-errors.workspace = true reth-execution-types.workspace = true @@ -24,6 +25,7 @@ revm.workspace = true alloy-eips.workspace = true auto_impl.workspace = true futures-util.workspace = true +metrics.workspace = true parking_lot = { workspace = true, optional = true } [dev-dependencies] diff --git a/crates/evm/src/lib.rs b/crates/evm/src/lib.rs index 7b52a8accfc72..82cb2a7125648 100644 --- a/crates/evm/src/lib.rs +++ b/crates/evm/src/lib.rs @@ -25,6 +25,7 @@ use revm_primitives::{ pub mod builder; pub mod either; pub mod execute; +pub mod metrics; pub mod noop; pub mod provider; pub mod system_calls; diff --git a/crates/evm/src/metrics.rs b/crates/evm/src/metrics.rs new file mode 100644 index 0000000000000..7498ca51278dd --- /dev/null +++ b/crates/evm/src/metrics.rs @@ -0,0 +1,42 @@ +//! Executor metrics. +//! +//! Block processing related to syncing should take care to update the metrics by using e.g. +//! [`ExecutorMetrics::metered`]. +use std::time::Instant; + +use metrics::{Counter, Gauge}; +use reth_execution_types::BlockExecutionInput; +use reth_metrics::Metrics; +use reth_primitives::BlockWithSenders; + +/// Executor metrics. +// TODO(onbjerg): add sload/sstore, acc load/acc change, bytecode metrics +#[derive(Metrics, Clone)] +#[metrics(scope = "sync.execution")] +pub struct ExecutorMetrics { + /// The total amount of gas processed. + pub gas_processed_total: Counter, + /// The instantaneous amount of gas processed per second. + pub gas_per_second: Gauge, +} + +impl ExecutorMetrics { + /// Execute the given block and update metrics for the execution. + pub fn metered(&self, input: BlockExecutionInput<'_, BlockWithSenders>, f: F) -> R + where + F: FnOnce(BlockExecutionInput<'_, BlockWithSenders>) -> R, + { + let gas_used = input.block.gas_used; + + // Execute the block and record the elapsed time. + let execute_start = Instant::now(); + let output = f(input); + let execution_duration = execute_start.elapsed().as_secs_f64(); + + // Update gas metrics. + self.gas_processed_total.increment(gas_used); + self.gas_per_second.set(gas_used as f64 / execution_duration); + + output + } +} diff --git a/crates/node/builder/src/setup.rs b/crates/node/builder/src/setup.rs index 802ec51fa3710..e4b26649e8788 100644 --- a/crates/node/builder/src/setup.rs +++ b/crates/node/builder/src/setup.rs @@ -102,7 +102,7 @@ where let pipeline = builder .with_tip_sender(tip_tx) - .with_metrics_tx(metrics_tx.clone()) + .with_metrics_tx(metrics_tx) .add_stages( DefaultStages::new( provider_factory.clone(), @@ -114,16 +114,13 @@ where stage_config.clone(), prune_modes.clone(), ) - .set( - ExecutionStage::new( - executor, - stage_config.execution.into(), - stage_config.execution_external_clean_threshold(), - prune_modes, - exex_manager_handle, - ) - .with_metrics_tx(metrics_tx), - ), + .set(ExecutionStage::new( + executor, + stage_config.execution.into(), + stage_config.execution_external_clean_threshold(), + prune_modes, + exex_manager_handle, + )), ) .build(provider_factory, static_file_producer); diff --git a/crates/stages/api/src/metrics/listener.rs b/crates/stages/api/src/metrics/listener.rs index e37eaa3d7267b..aba001a92f17b 100644 --- a/crates/stages/api/src/metrics/listener.rs +++ b/crates/stages/api/src/metrics/listener.rs @@ -1,6 +1,5 @@ use crate::{metrics::SyncMetrics, StageCheckpoint, StageId}; use alloy_primitives::BlockNumber; -use reth_primitives_traits::constants::MEGAGAS; use std::{ future::Future, pin::Pin, @@ -30,11 +29,6 @@ pub enum MetricEvent { /// If specified, `entities_total` metric is updated. max_block_number: Option, }, - /// Execution stage processed some amount of gas. - ExecutionStageGas { - /// Gas processed. - gas: u64, - }, } /// Metrics routine that listens to new metric events on the `events_rx` receiver. @@ -82,9 +76,6 @@ impl MetricsListener { stage_metrics.entities_total.set(total as f64); } } - MetricEvent::ExecutionStageGas { gas } => { - self.sync_metrics.execution_stage.mgas_processed_total.increment(gas / MEGAGAS) - } } } } diff --git a/crates/stages/api/src/metrics/sync_metrics.rs b/crates/stages/api/src/metrics/sync_metrics.rs index 3ee2964ea715c..b89d7b8822e72 100644 --- a/crates/stages/api/src/metrics/sync_metrics.rs +++ b/crates/stages/api/src/metrics/sync_metrics.rs @@ -1,14 +1,10 @@ use crate::StageId; -use reth_metrics::{ - metrics::{Counter, Gauge}, - Metrics, -}; +use reth_metrics::{metrics::Gauge, Metrics}; use std::collections::HashMap; #[derive(Debug, Default)] pub(crate) struct SyncMetrics { pub(crate) stages: HashMap, - pub(crate) execution_stage: ExecutionStageMetrics, } impl SyncMetrics { @@ -31,11 +27,3 @@ pub(crate) struct StageMetrics { /// The number of total entities of the last commit for a stage, if applicable. pub(crate) entities_total: Gauge, } - -/// Execution stage metrics. -#[derive(Metrics)] -#[metrics(scope = "sync.execution")] -pub(crate) struct ExecutionStageMetrics { - /// The total amount of gas processed (in millions) - pub(crate) mgas_processed_total: Counter, -} diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index b3d2122661d23..d09343d00c72b 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -3,7 +3,10 @@ use num_traits::Zero; use reth_config::config::ExecutionConfig; use reth_db::{static_file::HeaderMask, tables}; use reth_db_api::{cursor::DbCursorRO, database::Database, transaction::DbTx}; -use reth_evm::execute::{BatchExecutor, BlockExecutorProvider}; +use reth_evm::{ + execute::{BatchExecutor, BlockExecutorProvider}, + metrics::ExecutorMetrics, +}; use reth_execution_types::{Chain, ExecutionOutcome}; use reth_exex::{ExExManagerHandle, ExExNotification}; use reth_primitives::{BlockNumber, Header, StaticFileSegment}; @@ -18,8 +21,8 @@ use reth_prune_types::PruneModes; use reth_revm::database::StateProviderDatabase; use reth_stages_api::{ BlockErrorKind, CheckpointBlockRange, EntitiesCheckpoint, ExecInput, ExecOutput, - ExecutionCheckpoint, ExecutionStageThresholds, MetricEvent, MetricEventsSender, Stage, - StageCheckpoint, StageError, StageId, UnwindInput, UnwindOutput, + ExecutionCheckpoint, ExecutionStageThresholds, Stage, StageCheckpoint, StageError, StageId, + UnwindInput, UnwindOutput, }; use std::{ cmp::Ordering, @@ -61,7 +64,6 @@ use tracing::*; // false positive, we cannot derive it if !DB: Debug. #[allow(missing_debug_implementations)] pub struct ExecutionStage { - metrics_tx: Option, /// The stage's internal block executor executor_provider: E, /// The commit thresholds of the execution stage. @@ -83,11 +85,13 @@ pub struct ExecutionStage { post_unwind_commit_input: Option, /// Handle to communicate with `ExEx` manager. exex_manager_handle: ExExManagerHandle, + /// Executor metrics. + metrics: ExecutorMetrics, } impl ExecutionStage { /// Create new execution stage with specified config. - pub const fn new( + pub fn new( executor_provider: E, thresholds: ExecutionStageThresholds, external_clean_threshold: u64, @@ -95,7 +99,6 @@ impl ExecutionStage { exex_manager_handle: ExExManagerHandle, ) -> Self { Self { - metrics_tx: None, external_clean_threshold, executor_provider, thresholds, @@ -103,6 +106,7 @@ impl ExecutionStage { post_execute_commit_input: None, post_unwind_commit_input: None, exex_manager_handle, + metrics: ExecutorMetrics::default(), } } @@ -135,12 +139,6 @@ impl ExecutionStage { ) } - /// Set the metric events sender. - pub fn with_metrics_tx(mut self, metrics_tx: MetricEventsSender) -> Self { - self.metrics_tx = Some(metrics_tx); - self - } - /// Adjusts the prune modes related to changesets. /// /// This function verifies whether the [`super::MerkleStage`] or Hashing stages will run from @@ -272,12 +270,13 @@ where // Execute the block let execute_start = Instant::now(); - executor.execute_and_verify_one((&block, td).into()).map_err(|error| { - StageError::Block { + self.metrics.metered((&block, td).into(), |input| { + executor.execute_and_verify_one(input).map_err(|error| StageError::Block { block: Box::new(block.header.clone().seal_slow()), error: BlockErrorKind::Execution(error), - } + }) })?; + execution_duration += execute_start.elapsed(); // Log execution throughput @@ -296,12 +295,6 @@ where last_log_instant = Instant::now(); } - // Gas metrics - if let Some(metrics_tx) = &mut self.metrics_tx { - let _ = - metrics_tx.send(MetricEvent::ExecutionStageGas { gas: block.header.gas_used }); - } - stage_progress = block_number; stage_checkpoint.progress.processed += block.gas_used;