Skip to content

Commit

Permalink
feat: make StaticFileProvider generic over NodePrimitives (#12565)
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Nov 15, 2024
1 parent 44964ac commit 9168295
Show file tree
Hide file tree
Showing 38 changed files with 324 additions and 222 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/cli/commands/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Environmen
&self,
config: &Config,
db: Arc<DatabaseEnv>,
static_file_provider: StaticFileProvider,
static_file_provider: StaticFileProvider<N::Primitives>,
) -> eyre::Result<ProviderFactory<NodeTypesWithDBAdapter<N, Arc<DatabaseEnv>>>> {
let has_receipt_pruning = config.prune.as_ref().map_or(false, |a| a.has_receipts_pruning());
let prune_modes =
Expand Down
11 changes: 7 additions & 4 deletions crates/cli/commands/src/db/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use reth_db::{mdbx, static_file::iter_static_files, DatabaseEnv, TableViewer, Ta
use reth_db_api::database::Database;
use reth_db_common::DbTool;
use reth_fs_util as fs;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine};
use reth_node_builder::{
NodePrimitives, NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine,
};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::providers::{ProviderNodeTypes, StaticFileProvider};
use reth_static_file_types::SegmentRangeInclusive;
Expand Down Expand Up @@ -49,7 +51,7 @@ impl Command {
println!("\n");
}

let static_files_stats_table = self.static_files_stats_table(data_dir)?;
let static_files_stats_table = self.static_files_stats_table::<N::Primitives>(data_dir)?;
println!("{static_files_stats_table}");

println!("\n");
Expand Down Expand Up @@ -143,7 +145,7 @@ impl Command {
Ok(table)
}

fn static_files_stats_table(
fn static_files_stats_table<N: NodePrimitives>(
&self,
data_dir: ChainPath<DataDirPath>,
) -> eyre::Result<ComfyTable> {
Expand Down Expand Up @@ -173,7 +175,8 @@ impl Command {
}

let static_files = iter_static_files(data_dir.static_files())?;
let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?;
let static_file_provider =
StaticFileProvider::<N>::read_only(data_dir.static_files(), false)?;

let mut total_data_size = 0;
let mut total_index_size = 0;
Expand Down
1 change: 0 additions & 1 deletion crates/cli/commands/src/init_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> InitStateC
if last_block_number == 0 {
without_evm::setup_without_evm(
&provider_rw,
&static_file_provider,
// &header,
// header_hash,
SealedHeader::new(header, header_hash),
Expand Down
28 changes: 17 additions & 11 deletions crates/cli/commands/src/init_state/without_evm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ use alloy_primitives::{BlockNumber, B256, U256};
use alloy_rlp::Decodable;

use alloy_consensus::Header;
use reth_node_builder::NodePrimitives;
use reth_primitives::{
BlockBody, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment,
};
use reth_provider::{
providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileWriter,
providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileProviderFactory,
StaticFileWriter,
};
use reth_stages::{StageCheckpoint, StageId};

Expand All @@ -27,21 +29,21 @@ pub(crate) fn read_header_from_file(path: PathBuf) -> Result<Header, eyre::Error
/// first valid block.
pub fn setup_without_evm<Provider>(
provider_rw: &Provider,
static_file_provider: &StaticFileProvider,
header: SealedHeader,
total_difficulty: U256,
) -> Result<(), eyre::Error>
where
Provider: StageCheckpointWriter + BlockWriter,
Provider: StaticFileProviderFactory + StageCheckpointWriter + BlockWriter,
{
info!(target: "reth::cli", "Setting up dummy EVM chain before importing state.");

let static_file_provider = provider_rw.static_file_provider();
// Write EVM dummy data up to `header - 1` block
append_dummy_chain(static_file_provider, header.number - 1)?;
append_dummy_chain(&static_file_provider, header.number - 1)?;

info!(target: "reth::cli", "Appending first valid block.");

append_first_block(provider_rw, static_file_provider, &header, total_difficulty)?;
append_first_block(provider_rw, &header, total_difficulty)?;

for stage in StageId::ALL {
provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(header.number))?;
Expand All @@ -56,17 +58,21 @@ where
///
/// By appending it, static file writer also verifies that all segments are at the same
/// height.
fn append_first_block(
provider_rw: impl BlockWriter,
sf_provider: &StaticFileProvider,
fn append_first_block<Provider>(
provider_rw: &Provider,
header: &SealedHeader,
total_difficulty: U256,
) -> Result<(), eyre::Error> {
) -> Result<(), eyre::Error>
where
Provider: BlockWriter + StaticFileProviderFactory,
{
provider_rw.insert_block(
SealedBlockWithSenders::new(SealedBlock::new(header.clone(), BlockBody::default()), vec![])
.expect("no senders or txes"),
)?;

let sf_provider = provider_rw.static_file_provider();

sf_provider.latest_writer(StaticFileSegment::Headers)?.append_header(
header,
total_difficulty,
Expand All @@ -85,8 +91,8 @@ fn append_first_block(
/// * Headers: It will push an empty block.
/// * Transactions: It will not push any tx, only increments the end block range.
/// * Receipts: It will not push any receipt, only increments the end block range.
fn append_dummy_chain(
sf_provider: &StaticFileProvider,
fn append_dummy_chain<N: NodePrimitives>(
sf_provider: &StaticFileProvider<N>,
target_height: BlockNumber,
) -> Result<(), eyre::Error> {
let (tx, rx) = std::sync::mpsc::channel();
Expand Down
20 changes: 10 additions & 10 deletions crates/cli/commands/src/stage/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use reth_db_common::{
};
use reth_node_builder::NodeTypesWithEngine;
use reth_node_core::args::StageEnum;
use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory};
use reth_provider::{
writer::UnifiedStorageWriter, DatabaseProviderFactory, StaticFileProviderFactory,
};
use reth_prune::PruneSegment;
use reth_stages::StageId;
use reth_static_file_types::StaticFileSegment;
Expand All @@ -33,8 +35,6 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
) -> eyre::Result<()> {
let Environment { provider_factory, .. } = self.env.init::<N>(AccessRights::RW)?;

let static_file_provider = provider_factory.static_file_provider();

let tool = DbTool::new(provider_factory)?;

let static_file_segment = match self.stage {
Expand All @@ -60,7 +60,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
}
}

let provider_rw = tool.provider_factory.provider_rw()?;
let provider_rw = tool.provider_factory.database_provider_rw()?;
let tx = provider_rw.tx_ref();

match self.stage {
Expand All @@ -71,7 +71,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
tx.clear::<tables::HeaderNumbers>()?;
reset_stage_checkpoint(tx, StageId::Headers)?;

insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
StageEnum::Bodies => {
tx.clear::<tables::BlockBodyIndices>()?;
Expand All @@ -83,7 +83,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
tx.clear::<tables::BlockWithdrawals>()?;
reset_stage_checkpoint(tx, StageId::Bodies)?;

insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
StageEnum::Senders => {
tx.clear::<tables::TransactionSenders>()?;
Expand All @@ -104,7 +104,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
reset_stage_checkpoint(tx, StageId::Execution)?;

let alloc = &self.env.chain.genesis().alloc;
insert_genesis_state(&provider_rw.0, alloc.iter())?;
insert_genesis_state(&provider_rw, alloc.iter())?;
}
StageEnum::AccountHashing => {
tx.clear::<tables::HashedAccounts>()?;
Expand Down Expand Up @@ -142,20 +142,20 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?;
reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?;

insert_genesis_history(&provider_rw.0, self.env.chain.genesis().alloc.iter())?;
insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?;
}
StageEnum::TxLookup => {
tx.clear::<tables::TransactionHashNumbers>()?;
reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?;

reset_stage_checkpoint(tx, StageId::TransactionLookup)?;
insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?;
insert_genesis_header(&provider_rw, &self.env.chain)?;
}
}

tx.put::<tables::StageCheckpoints>(StageId::Finish.to_string(), Default::default())?;

UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;

Ok(())
}
Expand Down
7 changes: 2 additions & 5 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,10 +329,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
}

if self.commit {
UnifiedStorageWriter::commit_unwind(
provider_rw,
provider_factory.static_file_provider(),
)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;
provider_rw = provider_factory.database_provider_rw()?;
}
}
Expand All @@ -355,7 +352,7 @@ impl<C: ChainSpecParser<ChainSpec: EthChainSpec + EthereumHardforks>> Command<C>
provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?;
}
if self.commit {
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
UnifiedStorageWriter::commit(provider_rw)?;
provider_rw = provider_factory.database_provider_rw()?;
}

Expand Down
4 changes: 2 additions & 2 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
UnifiedStorageWriter::commit_unwind(provider_rw)?;

debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Expand All @@ -142,7 +142,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
UnifiedStorageWriter::commit(provider_rw)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash_num)
Expand Down
4 changes: 2 additions & 2 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ where
}

/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
pub fn static_file_provider(&self) -> StaticFileProvider<T::Primitives> {
self.right().static_file_provider()
}

Expand Down Expand Up @@ -766,7 +766,7 @@ where
}

/// Returns the static file provider to interact with the static files.
pub fn static_file_provider(&self) -> StaticFileProvider {
pub fn static_file_provider(&self) -> StaticFileProvider<<T::Types as NodeTypes>::Primitives> {
self.provider_factory().static_file_provider()
}

Expand Down
1 change: 1 addition & 0 deletions crates/node/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ repository.workspace = true

[dependencies]
reth-db-api.workspace = true
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-metrics.workspace = true
reth-tasks.workspace = true
Expand Down
16 changes: 11 additions & 5 deletions crates/node/metrics/src/hooks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use metrics_process::Collector;
use reth_db_api::database_metrics::DatabaseMetrics;
use reth_primitives_traits::NodePrimitives;
use reth_provider::providers::StaticFileProvider;
use std::{fmt, sync::Arc};
use std::{
fmt::{self},
sync::Arc,
};

pub(crate) trait Hook: Fn() + Send + Sync {}
impl<T: Fn() + Send + Sync> Hook for T {}

Expand All @@ -22,10 +27,11 @@ pub struct Hooks {

impl Hooks {
/// Create a new set of hooks
pub fn new<Metrics: DatabaseMetrics + 'static + Send + Sync>(
db: Metrics,
static_file_provider: StaticFileProvider,
) -> Self {
pub fn new<Metrics, N>(db: Metrics, static_file_provider: StaticFileProvider<N>) -> Self
where
Metrics: DatabaseMetrics + 'static + Send + Sync,
N: NodePrimitives,
{
let hooks: Vec<Box<dyn Hook<Output = ()>>> = vec![
Box::new(move || db.report_metrics()),
Box::new(move || {
Expand Down
4 changes: 2 additions & 2 deletions crates/optimism/cli/src/commands/import_receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ where
}
}

let provider = provider_factory.provider_rw()?;
let provider = provider_factory.database_provider_rw()?;
let mut total_decoded_receipts = 0;
let mut total_receipts = 0;
let mut total_filtered_out_dup_txns = 0;
Expand Down Expand Up @@ -247,7 +247,7 @@ where
provider
.save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?;

UnifiedStorageWriter::commit(provider, static_file_provider)?;
UnifiedStorageWriter::commit(provider)?;

Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns })
}
Expand Down
1 change: 0 additions & 1 deletion crates/optimism/cli/src/commands/init_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ impl<C: ChainSpecParser<ChainSpec = OpChainSpec>> InitStateCommandOp<C> {
if last_block_number == 0 {
reth_cli_commands::init_state::without_evm::setup_without_evm(
&provider_rw,
&static_file_provider,
SealedHeader::new(BEDROCK_HEADER, BEDROCK_HEADER_HASH),
BEDROCK_HEADER_TTD,
)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/primitives-traits/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use core::fmt;
use crate::{BlockBody, FullBlock, FullReceipt, FullSignedTx, FullTxType};

/// Configures all the primitive types of the node.
pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug {
pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static {
/// Block primitive.
type Block: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static;
/// Signed version of the transaction type.
Expand All @@ -22,7 +22,7 @@ impl NodePrimitives for () {
}

/// Helper trait that sets trait bounds on [`NodePrimitives`].
pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug {
pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static {
/// Block primitive.
type Block: FullBlock<Body: BlockBody<Transaction = Self::SignedTx>>;
/// Signed version of the transaction type.
Expand Down
19 changes: 14 additions & 5 deletions crates/prune/prune/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,11 @@ impl PrunerBuilder {
/// Builds a [Pruner] from the current configuration with the given provider factory.
pub fn build_with_provider_factory<PF>(self, provider_factory: PF) -> Pruner<PF::ProviderRW, PF>
where
PF: DatabaseProviderFactory<ProviderRW: PruneCheckpointWriter + BlockReader>
+ StaticFileProviderFactory,
PF: DatabaseProviderFactory<
ProviderRW: PruneCheckpointWriter + BlockReader + StaticFileProviderFactory,
> + StaticFileProviderFactory<
Primitives = <PF::ProviderRW as StaticFileProviderFactory>::Primitives,
>,
{
let segments =
SegmentSet::from_components(provider_factory.static_file_provider(), self.segments);
Expand All @@ -93,10 +96,16 @@ impl PrunerBuilder {
}

/// Builds a [Pruner] from the current configuration with the given static file provider.
pub fn build<Provider>(self, static_file_provider: StaticFileProvider) -> Pruner<Provider, ()>
pub fn build<Provider>(
self,
static_file_provider: StaticFileProvider<Provider::Primitives>,
) -> Pruner<Provider, ()>
where
Provider:
DBProvider<Tx: DbTxMut> + BlockReader + PruneCheckpointWriter + TransactionsProvider,
Provider: StaticFileProviderFactory
+ DBProvider<Tx: DbTxMut>
+ BlockReader
+ PruneCheckpointWriter
+ TransactionsProvider,
{
let segments = SegmentSet::<Provider>::from_components(static_file_provider, self.segments);

Expand Down
Loading

0 comments on commit 9168295

Please sign in to comment.