From 91682959d66593fd7b021495a33d524ee09f826d Mon Sep 17 00:00:00 2001 From: Arsenii Kulikov Date: Fri, 15 Nov 2024 14:28:53 +0400 Subject: [PATCH] feat: make `StaticFileProvider` generic over `NodePrimitives` (#12565) --- Cargo.lock | 1 + crates/cli/commands/src/common.rs | 2 +- crates/cli/commands/src/db/stats.rs | 11 ++- crates/cli/commands/src/init_state/mod.rs | 1 - .../commands/src/init_state/without_evm.rs | 28 ++++--- crates/cli/commands/src/stage/drop.rs | 20 ++--- crates/cli/commands/src/stage/run.rs | 7 +- crates/engine/tree/src/persistence.rs | 4 +- crates/node/builder/src/launch/common.rs | 4 +- crates/node/metrics/Cargo.toml | 1 + crates/node/metrics/src/hooks.rs | 16 ++-- .../cli/src/commands/import_receipts.rs | 4 +- .../optimism/cli/src/commands/init_state.rs | 1 - crates/primitives-traits/src/node.rs | 4 +- crates/prune/prune/src/builder.rs | 19 +++-- crates/prune/prune/src/segments/set.rs | 10 ++- .../prune/src/segments/static_file/headers.rs | 14 ++-- .../src/segments/static_file/receipts.rs | 18 +++-- .../src/segments/static_file/transactions.rs | 18 +++-- crates/stages/api/src/pipeline/mod.rs | 14 +--- crates/stages/stages/src/stages/bodies.rs | 4 +- crates/stages/stages/src/stages/execution.rs | 19 ++--- crates/stages/stages/src/stages/mod.rs | 9 ++- .../stages/stages/src/test_utils/test_db.rs | 4 +- crates/storage/db-common/src/init.rs | 35 +++++---- .../src/providers/blockchain_provider.rs | 9 ++- .../provider/src/providers/consistent.rs | 4 +- .../provider/src/providers/database/mod.rs | 10 ++- .../src/providers/database/provider.rs | 10 ++- crates/storage/provider/src/providers/mod.rs | 4 +- .../provider/src/providers/static_file/jar.rs | 30 +++++--- .../src/providers/static_file/manager.rs | 76 ++++++++++++------- .../provider/src/providers/static_file/mod.rs | 22 +++--- .../src/providers/static_file/writer.rs | 54 ++++++++----- .../storage/provider/src/test_utils/noop.rs | 4 +- .../src/traits/static_file_provider.rs | 7 +- crates/storage/provider/src/writer/mod.rs | 45 ++++++----- .../provider/src/writer/static_file.rs | 3 +- 38 files changed, 324 insertions(+), 222 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d92f66e8a585..927e653a609f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8140,6 +8140,7 @@ dependencies = [ "reqwest", "reth-db-api", "reth-metrics", + "reth-primitives-traits", "reth-provider", "reth-tasks", "socket2", diff --git a/crates/cli/commands/src/common.rs b/crates/cli/commands/src/common.rs index 49fee347ed45..3a9cbaa7fbb9 100644 --- a/crates/cli/commands/src/common.rs +++ b/crates/cli/commands/src/common.rs @@ -109,7 +109,7 @@ impl> Environmen &self, config: &Config, db: Arc, - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, ) -> eyre::Result>>> { let has_receipt_pruning = config.prune.as_ref().map_or(false, |a| a.has_receipts_pruning()); let prune_modes = diff --git a/crates/cli/commands/src/db/stats.rs b/crates/cli/commands/src/db/stats.rs index ac36b866b07a..6865f01345ea 100644 --- a/crates/cli/commands/src/db/stats.rs +++ b/crates/cli/commands/src/db/stats.rs @@ -9,7 +9,9 @@ use reth_db::{mdbx, static_file::iter_static_files, DatabaseEnv, TableViewer, Ta use reth_db_api::database::Database; use reth_db_common::DbTool; use reth_fs_util as fs; -use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine}; +use reth_node_builder::{ + NodePrimitives, NodeTypesWithDB, NodeTypesWithDBAdapter, NodeTypesWithEngine, +}; use reth_node_core::dirs::{ChainPath, DataDirPath}; use reth_provider::providers::{ProviderNodeTypes, StaticFileProvider}; use reth_static_file_types::SegmentRangeInclusive; @@ -49,7 +51,7 @@ impl Command { println!("\n"); } - let static_files_stats_table = self.static_files_stats_table(data_dir)?; + let static_files_stats_table = self.static_files_stats_table::(data_dir)?; println!("{static_files_stats_table}"); println!("\n"); @@ -143,7 +145,7 @@ impl Command { Ok(table) } - fn static_files_stats_table( + fn static_files_stats_table( &self, data_dir: ChainPath, ) -> eyre::Result { @@ -173,7 +175,8 @@ impl Command { } let static_files = iter_static_files(data_dir.static_files())?; - let static_file_provider = StaticFileProvider::read_only(data_dir.static_files(), false)?; + let static_file_provider = + StaticFileProvider::::read_only(data_dir.static_files(), false)?; let mut total_data_size = 0; let mut total_index_size = 0; diff --git a/crates/cli/commands/src/init_state/mod.rs b/crates/cli/commands/src/init_state/mod.rs index adaec3e8be37..adde88870fe6 100644 --- a/crates/cli/commands/src/init_state/mod.rs +++ b/crates/cli/commands/src/init_state/mod.rs @@ -97,7 +97,6 @@ impl> InitStateC if last_block_number == 0 { without_evm::setup_without_evm( &provider_rw, - &static_file_provider, // &header, // header_hash, SealedHeader::new(header, header_hash), diff --git a/crates/cli/commands/src/init_state/without_evm.rs b/crates/cli/commands/src/init_state/without_evm.rs index 29fc2aec60e0..c6e1f9a51dd1 100644 --- a/crates/cli/commands/src/init_state/without_evm.rs +++ b/crates/cli/commands/src/init_state/without_evm.rs @@ -2,11 +2,13 @@ use alloy_primitives::{BlockNumber, B256, U256}; use alloy_rlp::Decodable; use alloy_consensus::Header; +use reth_node_builder::NodePrimitives; use reth_primitives::{ BlockBody, SealedBlock, SealedBlockWithSenders, SealedHeader, StaticFileSegment, }; use reth_provider::{ - providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileWriter, + providers::StaticFileProvider, BlockWriter, StageCheckpointWriter, StaticFileProviderFactory, + StaticFileWriter, }; use reth_stages::{StageCheckpoint, StageId}; @@ -27,21 +29,21 @@ pub(crate) fn read_header_from_file(path: PathBuf) -> Result( provider_rw: &Provider, - static_file_provider: &StaticFileProvider, header: SealedHeader, total_difficulty: U256, ) -> Result<(), eyre::Error> where - Provider: StageCheckpointWriter + BlockWriter, + Provider: StaticFileProviderFactory + StageCheckpointWriter + BlockWriter, { info!(target: "reth::cli", "Setting up dummy EVM chain before importing state."); + let static_file_provider = provider_rw.static_file_provider(); // Write EVM dummy data up to `header - 1` block - append_dummy_chain(static_file_provider, header.number - 1)?; + append_dummy_chain(&static_file_provider, header.number - 1)?; info!(target: "reth::cli", "Appending first valid block."); - append_first_block(provider_rw, static_file_provider, &header, total_difficulty)?; + append_first_block(provider_rw, &header, total_difficulty)?; for stage in StageId::ALL { provider_rw.save_stage_checkpoint(stage, StageCheckpoint::new(header.number))?; @@ -56,17 +58,21 @@ where /// /// By appending it, static file writer also verifies that all segments are at the same /// height. -fn append_first_block( - provider_rw: impl BlockWriter, - sf_provider: &StaticFileProvider, +fn append_first_block( + provider_rw: &Provider, header: &SealedHeader, total_difficulty: U256, -) -> Result<(), eyre::Error> { +) -> Result<(), eyre::Error> +where + Provider: BlockWriter + StaticFileProviderFactory, +{ provider_rw.insert_block( SealedBlockWithSenders::new(SealedBlock::new(header.clone(), BlockBody::default()), vec![]) .expect("no senders or txes"), )?; + let sf_provider = provider_rw.static_file_provider(); + sf_provider.latest_writer(StaticFileSegment::Headers)?.append_header( header, total_difficulty, @@ -85,8 +91,8 @@ fn append_first_block( /// * Headers: It will push an empty block. /// * Transactions: It will not push any tx, only increments the end block range. /// * Receipts: It will not push any receipt, only increments the end block range. -fn append_dummy_chain( - sf_provider: &StaticFileProvider, +fn append_dummy_chain( + sf_provider: &StaticFileProvider, target_height: BlockNumber, ) -> Result<(), eyre::Error> { let (tx, rx) = std::sync::mpsc::channel(); diff --git a/crates/cli/commands/src/stage/drop.rs b/crates/cli/commands/src/stage/drop.rs index 3a277cabd185..70b2caa8d16a 100644 --- a/crates/cli/commands/src/stage/drop.rs +++ b/crates/cli/commands/src/stage/drop.rs @@ -12,7 +12,9 @@ use reth_db_common::{ }; use reth_node_builder::NodeTypesWithEngine; use reth_node_core::args::StageEnum; -use reth_provider::{writer::UnifiedStorageWriter, StaticFileProviderFactory}; +use reth_provider::{ + writer::UnifiedStorageWriter, DatabaseProviderFactory, StaticFileProviderFactory, +}; use reth_prune::PruneSegment; use reth_stages::StageId; use reth_static_file_types::StaticFileSegment; @@ -33,8 +35,6 @@ impl> Command ) -> eyre::Result<()> { let Environment { provider_factory, .. } = self.env.init::(AccessRights::RW)?; - let static_file_provider = provider_factory.static_file_provider(); - let tool = DbTool::new(provider_factory)?; let static_file_segment = match self.stage { @@ -60,7 +60,7 @@ impl> Command } } - let provider_rw = tool.provider_factory.provider_rw()?; + let provider_rw = tool.provider_factory.database_provider_rw()?; let tx = provider_rw.tx_ref(); match self.stage { @@ -71,7 +71,7 @@ impl> Command tx.clear::()?; reset_stage_checkpoint(tx, StageId::Headers)?; - insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?; + insert_genesis_header(&provider_rw, &self.env.chain)?; } StageEnum::Bodies => { tx.clear::()?; @@ -83,7 +83,7 @@ impl> Command tx.clear::()?; reset_stage_checkpoint(tx, StageId::Bodies)?; - insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?; + insert_genesis_header(&provider_rw, &self.env.chain)?; } StageEnum::Senders => { tx.clear::()?; @@ -104,7 +104,7 @@ impl> Command reset_stage_checkpoint(tx, StageId::Execution)?; let alloc = &self.env.chain.genesis().alloc; - insert_genesis_state(&provider_rw.0, alloc.iter())?; + insert_genesis_state(&provider_rw, alloc.iter())?; } StageEnum::AccountHashing => { tx.clear::()?; @@ -142,20 +142,20 @@ impl> Command reset_stage_checkpoint(tx, StageId::IndexAccountHistory)?; reset_stage_checkpoint(tx, StageId::IndexStorageHistory)?; - insert_genesis_history(&provider_rw.0, self.env.chain.genesis().alloc.iter())?; + insert_genesis_history(&provider_rw, self.env.chain.genesis().alloc.iter())?; } StageEnum::TxLookup => { tx.clear::()?; reset_prune_checkpoint(tx, PruneSegment::TransactionLookup)?; reset_stage_checkpoint(tx, StageId::TransactionLookup)?; - insert_genesis_header(&provider_rw.0, &static_file_provider, &self.env.chain)?; + insert_genesis_header(&provider_rw, &self.env.chain)?; } } tx.put::(StageId::Finish.to_string(), Default::default())?; - UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?; + UnifiedStorageWriter::commit_unwind(provider_rw)?; Ok(()) } diff --git a/crates/cli/commands/src/stage/run.rs b/crates/cli/commands/src/stage/run.rs index 23d6f6f28ac6..1ac2a12d6fa7 100644 --- a/crates/cli/commands/src/stage/run.rs +++ b/crates/cli/commands/src/stage/run.rs @@ -329,10 +329,7 @@ impl> Command } if self.commit { - UnifiedStorageWriter::commit_unwind( - provider_rw, - provider_factory.static_file_provider(), - )?; + UnifiedStorageWriter::commit_unwind(provider_rw)?; provider_rw = provider_factory.database_provider_rw()?; } } @@ -355,7 +352,7 @@ impl> Command provider_rw.save_stage_checkpoint(exec_stage.id(), checkpoint)?; } if self.commit { - UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?; + UnifiedStorageWriter::commit(provider_rw)?; provider_rw = provider_factory.database_provider_rw()?; } diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index f4650a047b4f..e0c9e0362d04 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -120,7 +120,7 @@ impl PersistenceService { let new_tip_hash = provider_rw.block_hash(new_tip_num)?; UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?; - UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?; + UnifiedStorageWriter::commit_unwind(provider_rw)?; debug!(target: "engine::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk"); self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed()); @@ -142,7 +142,7 @@ impl PersistenceService { let static_file_provider = self.provider.static_file_provider(); UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?; - UnifiedStorageWriter::commit(provider_rw, static_file_provider)?; + UnifiedStorageWriter::commit(provider_rw)?; } self.metrics.save_blocks_duration_seconds.record(start_time.elapsed()); Ok(last_block_hash_num) diff --git a/crates/node/builder/src/launch/common.rs b/crates/node/builder/src/launch/common.rs index 7fafa9e5eaca..7e6571135f0c 100644 --- a/crates/node/builder/src/launch/common.rs +++ b/crates/node/builder/src/launch/common.rs @@ -495,7 +495,7 @@ where } /// Returns the static file provider to interact with the static files. - pub fn static_file_provider(&self) -> StaticFileProvider { + pub fn static_file_provider(&self) -> StaticFileProvider { self.right().static_file_provider() } @@ -766,7 +766,7 @@ where } /// Returns the static file provider to interact with the static files. - pub fn static_file_provider(&self) -> StaticFileProvider { + pub fn static_file_provider(&self) -> StaticFileProvider<::Primitives> { self.provider_factory().static_file_provider() } diff --git a/crates/node/metrics/Cargo.toml b/crates/node/metrics/Cargo.toml index 9efdbd4959db..a823db9b467d 100644 --- a/crates/node/metrics/Cargo.toml +++ b/crates/node/metrics/Cargo.toml @@ -9,6 +9,7 @@ repository.workspace = true [dependencies] reth-db-api.workspace = true +reth-primitives-traits.workspace = true reth-provider.workspace = true reth-metrics.workspace = true reth-tasks.workspace = true diff --git a/crates/node/metrics/src/hooks.rs b/crates/node/metrics/src/hooks.rs index 18755717667c..21d12614f625 100644 --- a/crates/node/metrics/src/hooks.rs +++ b/crates/node/metrics/src/hooks.rs @@ -1,7 +1,12 @@ use metrics_process::Collector; use reth_db_api::database_metrics::DatabaseMetrics; +use reth_primitives_traits::NodePrimitives; use reth_provider::providers::StaticFileProvider; -use std::{fmt, sync::Arc}; +use std::{ + fmt::{self}, + sync::Arc, +}; + pub(crate) trait Hook: Fn() + Send + Sync {} impl Hook for T {} @@ -22,10 +27,11 @@ pub struct Hooks { impl Hooks { /// Create a new set of hooks - pub fn new( - db: Metrics, - static_file_provider: StaticFileProvider, - ) -> Self { + pub fn new(db: Metrics, static_file_provider: StaticFileProvider) -> Self + where + Metrics: DatabaseMetrics + 'static + Send + Sync, + N: NodePrimitives, + { let hooks: Vec>> = vec![ Box::new(move || db.report_metrics()), Box::new(move || { diff --git a/crates/optimism/cli/src/commands/import_receipts.rs b/crates/optimism/cli/src/commands/import_receipts.rs index 838a99818e96..ca82cf73ea40 100644 --- a/crates/optimism/cli/src/commands/import_receipts.rs +++ b/crates/optimism/cli/src/commands/import_receipts.rs @@ -150,7 +150,7 @@ where } } - let provider = provider_factory.provider_rw()?; + let provider = provider_factory.database_provider_rw()?; let mut total_decoded_receipts = 0; let mut total_receipts = 0; let mut total_filtered_out_dup_txns = 0; @@ -247,7 +247,7 @@ where provider .save_stage_checkpoint(StageId::Execution, StageCheckpoint::new(highest_block_receipts))?; - UnifiedStorageWriter::commit(provider, static_file_provider)?; + UnifiedStorageWriter::commit(provider)?; Ok(ImportReceiptsResult { total_decoded_receipts, total_filtered_out_dup_txns }) } diff --git a/crates/optimism/cli/src/commands/init_state.rs b/crates/optimism/cli/src/commands/init_state.rs index 68f5d9a585f6..6be9b73c765e 100644 --- a/crates/optimism/cli/src/commands/init_state.rs +++ b/crates/optimism/cli/src/commands/init_state.rs @@ -54,7 +54,6 @@ impl> InitStateCommandOp { if last_block_number == 0 { reth_cli_commands::init_state::without_evm::setup_without_evm( &provider_rw, - &static_file_provider, SealedHeader::new(BEDROCK_HEADER, BEDROCK_HEADER_HASH), BEDROCK_HEADER_TTD, )?; diff --git a/crates/primitives-traits/src/node.rs b/crates/primitives-traits/src/node.rs index 9ca692748314..ca490ac15aa8 100644 --- a/crates/primitives-traits/src/node.rs +++ b/crates/primitives-traits/src/node.rs @@ -3,7 +3,7 @@ use core::fmt; use crate::{BlockBody, FullBlock, FullReceipt, FullSignedTx, FullTxType}; /// Configures all the primitive types of the node. -pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug { +pub trait NodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static { /// Block primitive. type Block: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static; /// Signed version of the transaction type. @@ -22,7 +22,7 @@ impl NodePrimitives for () { } /// Helper trait that sets trait bounds on [`NodePrimitives`]. -pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug { +pub trait FullNodePrimitives: Send + Sync + Unpin + Clone + Default + fmt::Debug + 'static { /// Block primitive. type Block: FullBlock>; /// Signed version of the transaction type. diff --git a/crates/prune/prune/src/builder.rs b/crates/prune/prune/src/builder.rs index 71d73c416106..85697160115b 100644 --- a/crates/prune/prune/src/builder.rs +++ b/crates/prune/prune/src/builder.rs @@ -76,8 +76,11 @@ impl PrunerBuilder { /// Builds a [Pruner] from the current configuration with the given provider factory. pub fn build_with_provider_factory(self, provider_factory: PF) -> Pruner where - PF: DatabaseProviderFactory - + StaticFileProviderFactory, + PF: DatabaseProviderFactory< + ProviderRW: PruneCheckpointWriter + BlockReader + StaticFileProviderFactory, + > + StaticFileProviderFactory< + Primitives = ::Primitives, + >, { let segments = SegmentSet::from_components(provider_factory.static_file_provider(), self.segments); @@ -93,10 +96,16 @@ impl PrunerBuilder { } /// Builds a [Pruner] from the current configuration with the given static file provider. - pub fn build(self, static_file_provider: StaticFileProvider) -> Pruner + pub fn build( + self, + static_file_provider: StaticFileProvider, + ) -> Pruner where - Provider: - DBProvider + BlockReader + PruneCheckpointWriter + TransactionsProvider, + Provider: StaticFileProviderFactory + + DBProvider + + BlockReader + + PruneCheckpointWriter + + TransactionsProvider, { let segments = SegmentSet::::from_components(static_file_provider, self.segments); diff --git a/crates/prune/prune/src/segments/set.rs b/crates/prune/prune/src/segments/set.rs index 23d03345b096..62c252fc54b6 100644 --- a/crates/prune/prune/src/segments/set.rs +++ b/crates/prune/prune/src/segments/set.rs @@ -5,7 +5,7 @@ use crate::segments::{ use reth_db::transaction::DbTxMut; use reth_provider::{ providers::StaticFileProvider, BlockReader, DBProvider, PruneCheckpointWriter, - TransactionsProvider, + StaticFileProviderFactory, TransactionsProvider, }; use reth_prune_types::PruneModes; @@ -45,12 +45,16 @@ impl SegmentSet { impl SegmentSet where - Provider: DBProvider + TransactionsProvider + PruneCheckpointWriter + BlockReader, + Provider: StaticFileProviderFactory + + DBProvider + + TransactionsProvider + + PruneCheckpointWriter + + BlockReader, { /// Creates a [`SegmentSet`] from an existing components, such as [`StaticFileProvider`] and /// [`PruneModes`]. pub fn from_components( - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, prune_modes: PruneModes, ) -> Self { let PruneModes { diff --git a/crates/prune/prune/src/segments/static_file/headers.rs b/crates/prune/prune/src/segments/static_file/headers.rs index 8700a653b111..ea0264261afd 100644 --- a/crates/prune/prune/src/segments/static_file/headers.rs +++ b/crates/prune/prune/src/segments/static_file/headers.rs @@ -12,7 +12,7 @@ use reth_db::{ tables, transaction::DbTxMut, }; -use reth_provider::{providers::StaticFileProvider, DBProvider}; +use reth_provider::{providers::StaticFileProvider, DBProvider, StaticFileProviderFactory}; use reth_prune_types::{ PruneLimiter, PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, @@ -24,17 +24,19 @@ use tracing::trace; const HEADER_TABLES_TO_PRUNE: usize = 3; #[derive(Debug)] -pub struct Headers { - static_file_provider: StaticFileProvider, +pub struct Headers { + static_file_provider: StaticFileProvider, } -impl Headers { - pub const fn new(static_file_provider: StaticFileProvider) -> Self { +impl Headers { + pub const fn new(static_file_provider: StaticFileProvider) -> Self { Self { static_file_provider } } } -impl> Segment for Headers { +impl> Segment + for Headers +{ fn segment(&self) -> PruneSegment { PruneSegment::Headers } diff --git a/crates/prune/prune/src/segments/static_file/receipts.rs b/crates/prune/prune/src/segments/static_file/receipts.rs index f766f7ea1d35..5221418674aa 100644 --- a/crates/prune/prune/src/segments/static_file/receipts.rs +++ b/crates/prune/prune/src/segments/static_file/receipts.rs @@ -5,25 +5,29 @@ use crate::{ use reth_db::transaction::DbTxMut; use reth_provider::{ errors::provider::ProviderResult, providers::StaticFileProvider, BlockReader, DBProvider, - PruneCheckpointWriter, TransactionsProvider, + PruneCheckpointWriter, StaticFileProviderFactory, TransactionsProvider, }; use reth_prune_types::{PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, SegmentOutput}; use reth_static_file_types::StaticFileSegment; #[derive(Debug)] -pub struct Receipts { - static_file_provider: StaticFileProvider, +pub struct Receipts { + static_file_provider: StaticFileProvider, } -impl Receipts { - pub const fn new(static_file_provider: StaticFileProvider) -> Self { +impl Receipts { + pub const fn new(static_file_provider: StaticFileProvider) -> Self { Self { static_file_provider } } } -impl Segment for Receipts +impl Segment for Receipts where - Provider: DBProvider + PruneCheckpointWriter + TransactionsProvider + BlockReader, + Provider: StaticFileProviderFactory + + DBProvider + + PruneCheckpointWriter + + TransactionsProvider + + BlockReader, { fn segment(&self) -> PruneSegment { PruneSegment::Receipts diff --git a/crates/prune/prune/src/segments/static_file/transactions.rs b/crates/prune/prune/src/segments/static_file/transactions.rs index 12772af5f880..7dc7a23191a0 100644 --- a/crates/prune/prune/src/segments/static_file/transactions.rs +++ b/crates/prune/prune/src/segments/static_file/transactions.rs @@ -4,7 +4,10 @@ use crate::{ PrunerError, }; use reth_db::{tables, transaction::DbTxMut}; -use reth_provider::{providers::StaticFileProvider, BlockReader, DBProvider, TransactionsProvider}; +use reth_provider::{ + providers::StaticFileProvider, BlockReader, DBProvider, StaticFileProviderFactory, + TransactionsProvider, +}; use reth_prune_types::{ PruneMode, PruneProgress, PrunePurpose, PruneSegment, SegmentOutput, SegmentOutputCheckpoint, }; @@ -12,19 +15,20 @@ use reth_static_file_types::StaticFileSegment; use tracing::trace; #[derive(Debug)] -pub struct Transactions { - static_file_provider: StaticFileProvider, +pub struct Transactions { + static_file_provider: StaticFileProvider, } -impl Transactions { - pub const fn new(static_file_provider: StaticFileProvider) -> Self { +impl Transactions { + pub const fn new(static_file_provider: StaticFileProvider) -> Self { Self { static_file_provider } } } -impl Segment for Transactions +impl Segment for Transactions where - Provider: DBProvider + TransactionsProvider + BlockReader, + Provider: + DBProvider + TransactionsProvider + BlockReader + StaticFileProviderFactory, { fn segment(&self) -> PruneSegment { PruneSegment::Transactions diff --git a/crates/stages/api/src/pipeline/mod.rs b/crates/stages/api/src/pipeline/mod.rs index 399a3ffb4b79..bcf857fbac8d 100644 --- a/crates/stages/api/src/pipeline/mod.rs +++ b/crates/stages/api/src/pipeline/mod.rs @@ -9,7 +9,7 @@ use reth_primitives_traits::constants::BEACON_CONSENSUS_REORG_UNWIND_DEPTH; use reth_provider::{ providers::ProviderNodeTypes, writer::UnifiedStorageWriter, ChainStateBlockReader, ChainStateBlockWriter, DatabaseProviderFactory, ProviderFactory, StageCheckpointReader, - StageCheckpointWriter, StaticFileProviderFactory, + StageCheckpointWriter, }; use reth_prune::PrunerBuilder; use reth_static_file::StaticFileProducer; @@ -358,10 +358,7 @@ impl Pipeline { ))?; } - UnifiedStorageWriter::commit_unwind( - provider_rw, - self.provider_factory.static_file_provider(), - )?; + UnifiedStorageWriter::commit_unwind(provider_rw)?; stage.post_unwind_commit()?; @@ -469,10 +466,7 @@ impl Pipeline { result: out.clone(), }); - UnifiedStorageWriter::commit( - provider_rw, - self.provider_factory.static_file_provider(), - )?; + UnifiedStorageWriter::commit(provider_rw)?; stage.post_execute_commit()?; @@ -533,7 +527,7 @@ fn on_stage_error( prev_checkpoint.unwrap_or_default(), )?; - UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?; + UnifiedStorageWriter::commit(provider_rw)?; // We unwind because of a validation error. If the unwind itself // fails, we bail entirely, diff --git a/crates/stages/stages/src/stages/bodies.rs b/crates/stages/stages/src/stages/bodies.rs index 640bae866594..78aeda6feffd 100644 --- a/crates/stages/stages/src/stages/bodies.rs +++ b/crates/stages/stages/src/stages/bodies.rs @@ -311,11 +311,11 @@ where fn missing_static_data_error( last_tx_num: TxNumber, - static_file_provider: &StaticFileProvider, + static_file_provider: &StaticFileProvider, provider: &Provider, ) -> Result where - Provider: BlockReader, + Provider: BlockReader + StaticFileProviderFactory, { let mut last_block = static_file_provider .get_highest_static_file_block(StaticFileSegment::Transactions) diff --git a/crates/stages/stages/src/stages/execution.rs b/crates/stages/stages/src/stages/execution.rs index 1750758a26ad..16234ad483f7 100644 --- a/crates/stages/stages/src/stages/execution.rs +++ b/crates/stages/stages/src/stages/execution.rs @@ -12,7 +12,7 @@ use reth_evm::{ use reth_execution_types::Chain; use reth_exex::{ExExManagerHandle, ExExNotification, ExExNotificationSource}; use reth_primitives::{SealedHeader, StaticFileSegment}; -use reth_primitives_traits::format_gas_throughput; +use reth_primitives_traits::{format_gas_throughput, NodePrimitives}; use reth_provider::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter}, writer::UnifiedStorageWriter, @@ -181,7 +181,8 @@ where + StatsReader + StateChangeWriter + BlockHashReader, - for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a>>: StateWriter, + for<'a> UnifiedStorageWriter<'a, Provider, StaticFileProviderRWRefMut<'a, Provider::Primitives>>: + StateWriter, { /// Return the id of the stage fn id(&self) -> StageId { @@ -485,8 +486,8 @@ where } } -fn execution_checkpoint( - provider: &StaticFileProvider, +fn execution_checkpoint( + provider: &StaticFileProvider, start_block: BlockNumber, max_block: BlockNumber, checkpoint: StageCheckpoint, @@ -552,8 +553,8 @@ fn execution_checkpoint( }) } -fn calculate_gas_used_from_headers( - provider: &StaticFileProvider, +fn calculate_gas_used_from_headers( + provider: &StaticFileProvider, range: RangeInclusive, ) -> Result { debug!(target: "sync::stages::execution", ?range, "Calculating gas used from headers"); @@ -587,11 +588,11 @@ fn calculate_gas_used_from_headers( /// (by returning [`StageError`]) until the heights in both the database and static file match. fn prepare_static_file_producer<'a, 'b, Provider>( provider: &'b Provider, - static_file_provider: &'a StaticFileProvider, + static_file_provider: &'a StaticFileProvider, start_block: u64, -) -> Result, StageError> +) -> Result, StageError> where - Provider: DBProvider + BlockReader + HeaderProvider, + Provider: StaticFileProviderFactory + DBProvider + BlockReader + HeaderProvider, 'b: 'a, { // Get next expected receipt number diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 4b9f9295103e..9d7cc685a7ef 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -296,8 +296,8 @@ mod tests { ) { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. - let static_file_provider = - StaticFileProvider::read_write(db.factory.static_file_provider().path()).unwrap(); + let mut static_file_provider = db.factory.static_file_provider(); + static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap(); // Simulate corruption by removing `prune_count` rows from the data file without updating // its offset list and configuration. @@ -314,9 +314,10 @@ mod tests { // We recreate the static file provider, since consistency heals are done on fetching the // writer for the first time. + let mut static_file_provider = db.factory.static_file_provider(); + static_file_provider = StaticFileProvider::read_write(static_file_provider.path()).unwrap(); assert_eq!( - StaticFileProvider::read_write(db.factory.static_file_provider().path()) - .unwrap() + static_file_provider .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,), Ok(expected) ); diff --git a/crates/stages/stages/src/test_utils/test_db.rs b/crates/stages/stages/src/test_utils/test_db.rs index 52983cb6f69a..772e9cb78d07 100644 --- a/crates/stages/stages/src/test_utils/test_db.rs +++ b/crates/stages/stages/src/test_utils/test_db.rs @@ -24,7 +24,7 @@ use reth_provider::{ }; use reth_storage_errors::provider::ProviderResult; use reth_testing_utils::generators::ChangeSet; -use std::{collections::BTreeMap, path::Path}; +use std::{collections::BTreeMap, fmt::Debug, path::Path}; use tempfile::TempDir; /// Test database that is used for testing stage implementations. @@ -142,7 +142,7 @@ impl TestStageDB { /// Insert header to static file if `writer` exists, otherwise to DB. pub fn insert_header( - writer: Option<&mut StaticFileProviderRWRefMut<'_>>, + writer: Option<&mut StaticFileProviderRWRefMut<'_, ()>>, tx: &TX, header: &SealedHeader, td: U256, diff --git a/crates/storage/db-common/src/init.rs b/crates/storage/db-common/src/init.rs index 45fb4b76b312..e14796d26861 100644 --- a/crates/storage/db-common/src/init.rs +++ b/crates/storage/db-common/src/init.rs @@ -10,9 +10,7 @@ use reth_db_api::{transaction::DbTxMut, DatabaseError}; use reth_etl::Collector; use reth_primitives::{Account, Bytecode, GotExpected, Receipts, StaticFileSegment, StorageEntry}; use reth_provider::{ - errors::provider::ProviderResult, - providers::{StaticFileProvider, StaticFileWriter}, - writer::UnifiedStorageWriter, + errors::provider::ProviderResult, providers::StaticFileWriter, writer::UnifiedStorageWriter, BlockHashReader, BlockNumReader, BundleStateInit, ChainSpecProvider, DBProvider, DatabaseProviderFactory, ExecutionOutcome, HashingWriter, HeaderProvider, HistoryWriter, OriginalValuesKnown, ProviderError, RevertsInit, StageCheckpointWriter, StateChangeWriter, @@ -72,7 +70,8 @@ impl From for InitDatabaseError { pub fn init_genesis(factory: &PF) -> Result where PF: DatabaseProviderFactory + StaticFileProviderFactory + ChainSpecProvider + BlockHashReader, - PF::ProviderRW: StageCheckpointWriter + PF::ProviderRW: StaticFileProviderFactory + + StageCheckpointWriter + HistoryWriter + HeaderProvider + HashingWriter @@ -114,8 +113,7 @@ where insert_genesis_history(&provider_rw, alloc.iter())?; // Insert header - let static_file_provider = factory.static_file_provider(); - insert_genesis_header(&provider_rw, &static_file_provider, &chain)?; + insert_genesis_header(&provider_rw, &chain)?; insert_genesis_state(&provider_rw, alloc.iter())?; @@ -124,6 +122,7 @@ where provider_rw.save_stage_checkpoint(stage, Default::default())?; } + let static_file_provider = provider_rw.static_file_provider(); // Static file segments start empty, so we need to initialize the genesis block. let segment = StaticFileSegment::Receipts; static_file_provider.latest_writer(segment)?.increment_block(0)?; @@ -133,7 +132,7 @@ where // `commit_unwind`` will first commit the DB and then the static file provider, which is // necessary on `init_genesis`. - UnifiedStorageWriter::commit_unwind(provider_rw, static_file_provider)?; + UnifiedStorageWriter::commit_unwind(provider_rw)?; Ok(hash) } @@ -144,7 +143,11 @@ pub fn insert_genesis_state<'a, 'b, Provider>( alloc: impl Iterator, ) -> ProviderResult<()> where - Provider: DBProvider + StateChangeWriter + HeaderProvider + AsRef, + Provider: StaticFileProviderFactory + + DBProvider + + StateChangeWriter + + HeaderProvider + + AsRef, { insert_state(provider, alloc, 0) } @@ -156,7 +159,11 @@ pub fn insert_state<'a, 'b, Provider>( block: u64, ) -> ProviderResult<()> where - Provider: DBProvider + StateChangeWriter + HeaderProvider + AsRef, + Provider: StaticFileProviderFactory + + DBProvider + + StateChangeWriter + + HeaderProvider + + AsRef, { let capacity = alloc.size_hint().1.unwrap_or(0); let mut state_init: BundleStateInit = HashMap::with_capacity(capacity); @@ -296,14 +303,14 @@ where /// Inserts header for the genesis state. pub fn insert_genesis_header( provider: &Provider, - static_file_provider: &StaticFileProvider, chain: &Spec, ) -> ProviderResult<()> where - Provider: DBProvider, + Provider: StaticFileProviderFactory + DBProvider, Spec: EthChainSpec, { let (header, block_hash) = (chain.genesis_header(), chain.genesis_hash()); + let static_file_provider = provider.static_file_provider(); match static_file_provider.block_hash(0) { Ok(None) | Err(ProviderError::MissingStaticFileBlock(StaticFileSegment::Headers, 0)) => { @@ -333,7 +340,8 @@ pub fn init_from_state_dump( etl_config: EtlConfig, ) -> eyre::Result where - Provider: DBProvider + Provider: StaticFileProviderFactory + + DBProvider + BlockNumReader + BlockHashReader + ChainSpecProvider @@ -457,7 +465,8 @@ fn dump_state( block: u64, ) -> Result<(), eyre::Error> where - Provider: DBProvider + Provider: StaticFileProviderFactory + + DBProvider + HeaderProvider + HashingWriter + HistoryWriter diff --git a/crates/storage/provider/src/providers/blockchain_provider.rs b/crates/storage/provider/src/providers/blockchain_provider.rs index 0f0693471b0f..083e7fb596b6 100644 --- a/crates/storage/provider/src/providers/blockchain_provider.rs +++ b/crates/storage/provider/src/providers/blockchain_provider.rs @@ -163,7 +163,9 @@ impl DatabaseProviderFactory for BlockchainProvider2 { } impl StaticFileProviderFactory for BlockchainProvider2 { - fn static_file_provider(&self) -> StaticFileProvider { + type Primitives = N::Primitives; + + fn static_file_provider(&self) -> StaticFileProvider { self.database.static_file_provider() } } @@ -911,7 +913,7 @@ mod tests { )?; // Commit to both storages: database and static files - UnifiedStorageWriter::commit(provider_rw, factory.static_file_provider())?; + UnifiedStorageWriter::commit(provider_rw)?; let provider = BlockchainProvider2::new(factory)?; @@ -999,8 +1001,7 @@ mod tests { UnifiedStorageWriter::from(&provider_rw, &hook_provider.static_file_provider()) .save_blocks(&[lowest_memory_block]) .unwrap(); - UnifiedStorageWriter::commit(provider_rw, hook_provider.static_file_provider()) - .unwrap(); + UnifiedStorageWriter::commit(provider_rw).unwrap(); // Remove from memory hook_provider.canonical_in_memory_state.remove_persisted_blocks(num_hash); diff --git a/crates/storage/provider/src/providers/consistent.rs b/crates/storage/provider/src/providers/consistent.rs index 37c00be23bee..3b2599f49991 100644 --- a/crates/storage/provider/src/providers/consistent.rs +++ b/crates/storage/provider/src/providers/consistent.rs @@ -612,7 +612,9 @@ impl ConsistentProvider { } impl StaticFileProviderFactory for ConsistentProvider { - fn static_file_provider(&self) -> StaticFileProvider { + type Primitives = N::Primitives; + + fn static_file_provider(&self) -> StaticFileProvider { self.storage_provider.static_file_provider() } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 0e193f8cdefb..94c83bbb4422 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -53,7 +53,7 @@ pub struct ProviderFactory { /// Chain spec chain_spec: Arc, /// Static File Provider - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, /// Optional pruning configuration prune_modes: PruneModes, } @@ -78,7 +78,7 @@ impl ProviderFactory { pub fn new( db: N::DB, chain_spec: Arc, - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, ) -> Self { Self { db, chain_spec, static_file_provider, prune_modes: PruneModes::none() } } @@ -114,7 +114,7 @@ impl>> ProviderFactory { path: P, chain_spec: Arc, args: DatabaseArguments, - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, ) -> RethResult { Ok(Self { db: Arc::new(init_db(path, args).map_err(RethError::msg)?), @@ -202,8 +202,10 @@ impl DatabaseProviderFactory for ProviderFactory { } impl StaticFileProviderFactory for ProviderFactory { + type Primitives = N::Primitives; + /// Returns static file provider - fn static_file_provider(&self) -> StaticFileProvider { + fn static_file_provider(&self) -> StaticFileProvider { self.static_file_provider.clone() } } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 30a69fbfc775..b93112e70843 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -135,7 +135,7 @@ pub struct DatabaseProvider { /// Chain spec chain_spec: Arc, /// Static File provider - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, /// Pruning configuration prune_modes: PruneModes, } @@ -199,8 +199,10 @@ impl DatabaseProvider { } impl StaticFileProviderFactory for DatabaseProvider { + type Primitives = N::Primitives; + /// Returns a static file provider - fn static_file_provider(&self) -> StaticFileProvider { + fn static_file_provider(&self) -> StaticFileProvider { self.static_file_provider.clone() } } @@ -220,7 +222,7 @@ impl DatabaseProvider { pub const fn new_rw( tx: TX, chain_spec: Arc, - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, prune_modes: PruneModes, ) -> Self { Self { tx, chain_spec, static_file_provider, prune_modes } @@ -363,7 +365,7 @@ impl DatabaseProvider { pub const fn new( tx: TX, chain_spec: Arc, - static_file_provider: StaticFileProvider, + static_file_provider: StaticFileProvider, prune_modes: PruneModes, ) -> Self { Self { tx, chain_spec, static_file_provider, prune_modes } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index d1e1822d8c9d..3bf3e7b247f6 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -204,7 +204,9 @@ impl DatabaseProviderFactory for BlockchainProvider { } impl StaticFileProviderFactory for BlockchainProvider { - fn static_file_provider(&self) -> StaticFileProvider { + type Primitives = N::Primitives; + + fn static_file_provider(&self) -> StaticFileProvider { self.database.static_file_provider() } } diff --git a/crates/storage/provider/src/providers/static_file/jar.rs b/crates/storage/provider/src/providers/static_file/jar.rs index 9c303394ed22..e87829b11338 100644 --- a/crates/storage/provider/src/providers/static_file/jar.rs +++ b/crates/storage/provider/src/providers/static_file/jar.rs @@ -12,39 +12,49 @@ use alloy_primitives::{Address, BlockHash, BlockNumber, TxHash, TxNumber, B256, use reth_chainspec::ChainInfo; use reth_db::static_file::{HeaderMask, ReceiptMask, StaticFileCursor, TransactionMask}; use reth_db_api::models::CompactU256; +use reth_node_types::NodePrimitives; use reth_primitives::{ Receipt, SealedHeader, TransactionMeta, TransactionSigned, TransactionSignedNoHash, }; use reth_storage_errors::provider::{ProviderError, ProviderResult}; use std::{ + fmt::Debug, ops::{Deref, RangeBounds}, sync::Arc, }; /// Provider over a specific `NippyJar` and range. #[derive(Debug)] -pub struct StaticFileJarProvider<'a> { +pub struct StaticFileJarProvider<'a, N> { /// Main static file segment jar: LoadedJarRef<'a>, /// Another kind of static file segment to help query data from the main one. auxiliary_jar: Option>, + /// Metrics for the static files. metrics: Option>, + /// Node primitives + _pd: std::marker::PhantomData, } -impl<'a> Deref for StaticFileJarProvider<'a> { +impl<'a, N: NodePrimitives> Deref for StaticFileJarProvider<'a, N> { type Target = LoadedJarRef<'a>; fn deref(&self) -> &Self::Target { &self.jar } } -impl<'a> From> for StaticFileJarProvider<'a> { +impl<'a, N: NodePrimitives> From> for StaticFileJarProvider<'a, N> { fn from(value: LoadedJarRef<'a>) -> Self { - StaticFileJarProvider { jar: value, auxiliary_jar: None, metrics: None } + StaticFileJarProvider { + jar: value, + auxiliary_jar: None, + metrics: None, + _pd: Default::default(), + } } } -impl<'a> StaticFileJarProvider<'a> { +impl<'a, N: NodePrimitives> StaticFileJarProvider<'a, N> { /// Provides a cursor for more granular data access. pub fn cursor<'b>(&'b self) -> ProviderResult> where @@ -76,7 +86,7 @@ impl<'a> StaticFileJarProvider<'a> { } } -impl HeaderProvider for StaticFileJarProvider<'_> { +impl HeaderProvider for StaticFileJarProvider<'_, N> { fn header(&self, block_hash: &BlockHash) -> ProviderResult> { Ok(self .cursor()? @@ -148,7 +158,7 @@ impl HeaderProvider for StaticFileJarProvider<'_> { } } -impl BlockHashReader for StaticFileJarProvider<'_> { +impl BlockHashReader for StaticFileJarProvider<'_, N> { fn block_hash(&self, number: u64) -> ProviderResult> { self.cursor()?.get_one::>(number.into()) } @@ -170,7 +180,7 @@ impl BlockHashReader for StaticFileJarProvider<'_> { } } -impl BlockNumReader for StaticFileJarProvider<'_> { +impl BlockNumReader for StaticFileJarProvider<'_, N> { fn chain_info(&self) -> ProviderResult { // Information on live database Err(ProviderError::UnsupportedProvider) @@ -195,7 +205,7 @@ impl BlockNumReader for StaticFileJarProvider<'_> { } } -impl TransactionsProvider for StaticFileJarProvider<'_> { +impl TransactionsProvider for StaticFileJarProvider<'_, N> { fn transaction_id(&self, hash: TxHash) -> ProviderResult> { let mut cursor = self.cursor()?; @@ -291,7 +301,7 @@ impl TransactionsProvider for StaticFileJarProvider<'_> { } } -impl ReceiptProvider for StaticFileJarProvider<'_> { +impl ReceiptProvider for StaticFileJarProvider<'_, N> { fn receipt(&self, num: TxNumber) -> ProviderResult> { self.cursor()?.get_one::>(num.into()) } diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index a5d4537245dd..8f6a6957502a 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -29,6 +29,7 @@ use reth_db_api::{ transaction::DbTx, }; use reth_nippy_jar::{NippyJar, NippyJarChecker, CONFIG_FILE_EXTENSION}; +use reth_node_types::NodePrimitives; use reth_primitives::{ static_file::{ find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive, @@ -42,6 +43,8 @@ use reth_storage_api::DBProvider; use reth_storage_errors::provider::{ProviderError, ProviderResult}; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, + fmt::Debug, + marker::PhantomData, ops::{Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, sync::{mpsc, Arc}, @@ -77,10 +80,16 @@ impl StaticFileAccess { } /// [`StaticFileProvider`] manages all existing [`StaticFileJarProvider`]. -#[derive(Debug, Clone)] -pub struct StaticFileProvider(pub(crate) Arc); +#[derive(Debug)] +pub struct StaticFileProvider(pub(crate) Arc>); + +impl Clone for StaticFileProvider { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} -impl StaticFileProvider { +impl StaticFileProvider { /// Creates a new [`StaticFileProvider`]. fn new(path: impl AsRef, access: StaticFileAccess) -> ProviderResult { let provider = Self(Arc::new(StaticFileProviderInner::new(path, access)?)); @@ -191,8 +200,8 @@ impl StaticFileProvider { } } -impl Deref for StaticFileProvider { - type Target = StaticFileProviderInner; +impl Deref for StaticFileProvider { + type Target = StaticFileProviderInner; fn deref(&self) -> &Self::Target { &self.0 @@ -201,7 +210,7 @@ impl Deref for StaticFileProvider { /// [`StaticFileProviderInner`] manages all existing [`StaticFileJarProvider`]. #[derive(Debug)] -pub struct StaticFileProviderInner { +pub struct StaticFileProviderInner { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, StaticFileSegment), LoadedJar>, @@ -212,7 +221,8 @@ pub struct StaticFileProviderInner { /// Directory where `static_files` are located path: PathBuf, /// Maintains a writer set of [`StaticFileSegment`]. - writers: StaticFileWriters, + writers: StaticFileWriters, + /// Metrics for the static files. metrics: Option>, /// Access rights of the provider. access: StaticFileAccess, @@ -220,9 +230,11 @@ pub struct StaticFileProviderInner { blocks_per_file: u64, /// Write lock for when access is [`StaticFileAccess::RW`]. _lock_file: Option, + /// Node primitives + _pd: PhantomData, } -impl StaticFileProviderInner { +impl StaticFileProviderInner { /// Creates a new [`StaticFileProviderInner`]. fn new(path: impl AsRef, access: StaticFileAccess) -> ProviderResult { let _lock_file = if access.is_read_write() { @@ -241,6 +253,7 @@ impl StaticFileProviderInner { access, blocks_per_file: DEFAULT_BLOCKS_PER_STATIC_FILE, _lock_file, + _pd: Default::default(), }; Ok(provider) @@ -257,7 +270,7 @@ impl StaticFileProviderInner { } } -impl StaticFileProvider { +impl StaticFileProvider { /// Set a custom number of blocks per file. #[cfg(any(test, feature = "test-utils"))] pub fn with_custom_blocks_per_file(self, blocks_per_file: u64) -> Self { @@ -323,7 +336,7 @@ impl StaticFileProvider { segment: StaticFileSegment, block: BlockNumber, path: Option<&Path>, - ) -> ProviderResult> { + ) -> ProviderResult> { self.get_segment_provider( segment, || self.get_segment_ranges_from_block(segment, block), @@ -338,7 +351,7 @@ impl StaticFileProvider { segment: StaticFileSegment, tx: TxNumber, path: Option<&Path>, - ) -> ProviderResult> { + ) -> ProviderResult> { self.get_segment_provider( segment, || self.get_segment_ranges_from_transaction(segment, tx), @@ -355,7 +368,7 @@ impl StaticFileProvider { segment: StaticFileSegment, fn_range: impl Fn() -> Option, path: Option<&Path>, - ) -> ProviderResult>> { + ) -> ProviderResult>> { // If we have a path, then get the block range from its name. // Otherwise, check `self.available_static_files` let block_range = match path { @@ -426,12 +439,12 @@ impl StaticFileProvider { &self, segment: StaticFileSegment, fixed_block_range: &SegmentRangeInclusive, - ) -> ProviderResult> { + ) -> ProviderResult> { let key = (fixed_block_range.end(), segment); // Avoid using `entry` directly to avoid a write lock in the common case. trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Getting provider"); - let mut provider: StaticFileJarProvider<'_> = if let Some(jar) = self.map.get(&key) { + let mut provider: StaticFileJarProvider<'_, N> = if let Some(jar) = self.map.get(&key) { trace!(target: "provider::static_file", ?segment, ?fixed_block_range, "Jar found in cache"); jar.into() } else { @@ -924,7 +937,7 @@ impl StaticFileProvider { pub fn find_static_file( &self, segment: StaticFileSegment, - func: impl Fn(StaticFileJarProvider<'_>) -> ProviderResult>, + func: impl Fn(StaticFileJarProvider<'_, N>) -> ProviderResult>, ) -> ProviderResult> { if let Some(highest_block) = self.get_highest_static_file_block(segment) { let mut range = self.find_fixed_range(highest_block); @@ -1167,30 +1180,35 @@ impl StaticFileProvider { /// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc ProviderResult>; + ) -> ProviderResult>; /// Returns a mutable reference to a [`StaticFileProviderRW`] of the latest /// [`StaticFileSegment`]. fn latest_writer( &self, segment: StaticFileSegment, - ) -> ProviderResult>; + ) -> ProviderResult>; /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`]. fn commit(&self) -> ProviderResult<()>; } -impl StaticFileWriter for StaticFileProvider { +impl StaticFileWriter for StaticFileProvider { + type Primitives = N; + fn get_writer( &self, block: BlockNumber, segment: StaticFileSegment, - ) -> ProviderResult> { + ) -> ProviderResult> { if self.access.is_read_only() { return Err(ProviderError::ReadOnlyStaticFileAccess) } @@ -1204,7 +1222,7 @@ impl StaticFileWriter for StaticFileProvider { fn latest_writer( &self, segment: StaticFileSegment, - ) -> ProviderResult> { + ) -> ProviderResult> { self.get_writer(self.get_highest_static_file_block(segment).unwrap_or_default(), segment) } @@ -1213,7 +1231,7 @@ impl StaticFileWriter for StaticFileProvider { } } -impl HeaderProvider for StaticFileProvider { +impl HeaderProvider for StaticFileProvider { fn header(&self, block_hash: &BlockHash) -> ProviderResult> { self.find_static_file(StaticFileSegment::Headers, |jar_provider| { Ok(jar_provider @@ -1300,7 +1318,7 @@ impl HeaderProvider for StaticFileProvider { } } -impl BlockHashReader for StaticFileProvider { +impl BlockHashReader for StaticFileProvider { fn block_hash(&self, num: u64) -> ProviderResult> { self.get_segment_provider_from_block(StaticFileSegment::Headers, num, None)?.block_hash(num) } @@ -1319,7 +1337,7 @@ impl BlockHashReader for StaticFileProvider { } } -impl ReceiptProvider for StaticFileProvider { +impl ReceiptProvider for StaticFileProvider { fn receipt(&self, num: TxNumber) -> ProviderResult> { self.get_segment_provider_from_transaction(StaticFileSegment::Receipts, num, None) .and_then(|provider| provider.receipt(num)) @@ -1356,7 +1374,7 @@ impl ReceiptProvider for StaticFileProvider { } } -impl TransactionsProviderExt for StaticFileProvider { +impl TransactionsProviderExt for StaticFileProvider { fn transaction_hashes_by_range( &self, tx_range: Range, @@ -1417,7 +1435,7 @@ impl TransactionsProviderExt for StaticFileProvider { } } -impl TransactionsProvider for StaticFileProvider { +impl TransactionsProvider for StaticFileProvider { fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult> { self.find_static_file(StaticFileSegment::Transactions, |jar_provider| { let mut cursor = jar_provider.cursor()?; @@ -1529,7 +1547,7 @@ impl TransactionsProvider for StaticFileProvider { /* Cannot be successfully implemented but must exist for trait requirements */ -impl BlockNumReader for StaticFileProvider { +impl BlockNumReader for StaticFileProvider { fn chain_info(&self) -> ProviderResult { // Required data not present in static_files Err(ProviderError::UnsupportedProvider) @@ -1551,7 +1569,7 @@ impl BlockNumReader for StaticFileProvider { } } -impl BlockReader for StaticFileProvider { +impl BlockReader for StaticFileProvider { fn find_block_by_hash( &self, _hash: B256, @@ -1629,7 +1647,7 @@ impl BlockReader for StaticFileProvider { } } -impl WithdrawalsProvider for StaticFileProvider { +impl WithdrawalsProvider for StaticFileProvider { fn withdrawals_by_block( &self, _id: BlockHashOrNumber, @@ -1645,7 +1663,7 @@ impl WithdrawalsProvider for StaticFileProvider { } } -impl StatsReader for StaticFileProvider { +impl StatsReader for StaticFileProvider { fn count_entries(&self) -> ProviderResult { match T::NAME { tables::CanonicalHeaders::NAME | diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index dd52adf52f87..30b8d0344da1 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -55,7 +55,9 @@ impl Deref for LoadedJar { #[cfg(test)] mod tests { use super::*; - use crate::{test_utils::create_test_provider_factory, HeaderProvider}; + use crate::{ + test_utils::create_test_provider_factory, HeaderProvider, StaticFileProviderFactory, + }; use alloy_consensus::{Header, Transaction}; use alloy_primitives::{BlockHash, TxNumber, B256, U256}; use rand::seq::SliceRandom; @@ -116,7 +118,7 @@ mod tests { // Create StaticFile { - let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap(); + let manager = factory.static_file_provider(); let mut writer = manager.latest_writer(StaticFileSegment::Headers).unwrap(); let mut td = U256::ZERO; @@ -131,7 +133,7 @@ mod tests { // Use providers to query Header data and compare if it matches { let db_provider = factory.provider().unwrap(); - let manager = StaticFileProvider::read_write(static_files_path.path()).unwrap(); + let manager = db_provider.static_file_provider(); let jar_provider = manager .get_segment_provider_from_block(StaticFileSegment::Headers, 0, Some(&static_file)) .unwrap(); @@ -170,7 +172,7 @@ mod tests { // [ Headers Creation and Commit ] { - let sf_rw = StaticFileProvider::read_write(&static_dir) + let sf_rw = StaticFileProvider::<()>::read_write(&static_dir) .expect("Failed to create static file provider") .with_custom_blocks_per_file(blocks_per_file); @@ -189,8 +191,8 @@ mod tests { // Helper function to prune headers and validate truncation results fn prune_and_validate( - writer: &mut StaticFileProviderRWRefMut<'_>, - sf_rw: &StaticFileProvider, + writer: &mut StaticFileProviderRWRefMut<'_, ()>, + sf_rw: &StaticFileProvider<()>, static_dir: impl AsRef, prune_count: u64, expected_tip: Option, @@ -302,13 +304,13 @@ mod tests { /// * `10..=19`: no txs/receipts /// * `20..=29`: only one tx/receipt fn setup_tx_based_scenario( - sf_rw: &StaticFileProvider, + sf_rw: &StaticFileProvider<()>, segment: StaticFileSegment, blocks_per_file: u64, ) { fn setup_block_ranges( - writer: &mut StaticFileProviderRWRefMut<'_>, - sf_rw: &StaticFileProvider, + writer: &mut StaticFileProviderRWRefMut<'_, ()>, + sf_rw: &StaticFileProvider<()>, segment: StaticFileSegment, block_range: &Range, mut tx_count: u64, @@ -413,7 +415,7 @@ mod tests { #[allow(clippy::too_many_arguments)] fn prune_and_validate( - sf_rw: &StaticFileProvider, + sf_rw: &StaticFileProvider<()>, static_dir: impl AsRef, segment: StaticFileSegment, prune_count: u64, diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 2e54fb943a7a..796e16c9a130 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -8,6 +8,7 @@ use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; use reth_codecs::Compact; use reth_db_api::models::CompactU256; use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter}; +use reth_node_types::NodePrimitives; use reth_primitives::{ static_file::{SegmentHeader, SegmentRangeInclusive}, Receipt, StaticFileSegment, @@ -15,6 +16,7 @@ use reth_primitives::{ use reth_storage_errors::provider::{ProviderError, ProviderResult}; use std::{ borrow::Borrow, + fmt::Debug, path::{Path, PathBuf}, sync::{Arc, Weak}, time::Instant, @@ -25,19 +27,29 @@ use tracing::debug; /// /// WARNING: Trying to use more than one writer for the same segment type **will result in a /// deadlock**. -#[derive(Debug, Default)] -pub(crate) struct StaticFileWriters { - headers: RwLock>, - transactions: RwLock>, - receipts: RwLock>, +#[derive(Debug)] +pub(crate) struct StaticFileWriters { + headers: RwLock>>, + transactions: RwLock>>, + receipts: RwLock>>, +} + +impl Default for StaticFileWriters { + fn default() -> Self { + Self { + headers: Default::default(), + transactions: Default::default(), + receipts: Default::default(), + } + } } -impl StaticFileWriters { +impl StaticFileWriters { pub(crate) fn get_or_create( &self, segment: StaticFileSegment, - create_fn: impl FnOnce() -> ProviderResult, - ) -> ProviderResult> { + create_fn: impl FnOnce() -> ProviderResult>, + ) -> ProviderResult> { let mut write_guard = match segment { StaticFileSegment::Headers => self.headers.write(), StaticFileSegment::Transactions => self.transactions.write(), @@ -64,19 +76,19 @@ impl StaticFileWriters { /// Mutable reference to a [`StaticFileProviderRW`] behind a [`RwLockWriteGuard`]. #[derive(Debug)] -pub struct StaticFileProviderRWRefMut<'a>( - pub(crate) RwLockWriteGuard<'a, RawRwLock, Option>, +pub struct StaticFileProviderRWRefMut<'a, N>( + pub(crate) RwLockWriteGuard<'a, RawRwLock, Option>>, ); -impl std::ops::DerefMut for StaticFileProviderRWRefMut<'_> { +impl std::ops::DerefMut for StaticFileProviderRWRefMut<'_, N> { fn deref_mut(&mut self) -> &mut Self::Target { // This is always created by [`StaticFileWriters::get_or_create`] self.0.as_mut().expect("static file writer provider should be init") } } -impl std::ops::Deref for StaticFileProviderRWRefMut<'_> { - type Target = StaticFileProviderRW; +impl std::ops::Deref for StaticFileProviderRWRefMut<'_, N> { + type Target = StaticFileProviderRW; fn deref(&self) -> &Self::Target { // This is always created by [`StaticFileWriters::get_or_create`] @@ -86,11 +98,11 @@ impl std::ops::Deref for StaticFileProviderRWRefMut<'_> { #[derive(Debug)] /// Extends `StaticFileProvider` with writing capabilities -pub struct StaticFileProviderRW { +pub struct StaticFileProviderRW { /// Reference back to the provider. We need [Weak] here because [`StaticFileProviderRW`] is /// stored in a [`dashmap::DashMap`] inside the parent [`StaticFileProvider`].which is an /// [Arc]. If we were to use an [Arc] here, we would create a reference cycle. - reader: Weak, + reader: Weak>, /// A [`NippyJarWriter`] instance. writer: NippyJarWriter, /// Path to opened file. @@ -104,7 +116,7 @@ pub struct StaticFileProviderRW { prune_on_commit: Option<(u64, Option)>, } -impl StaticFileProviderRW { +impl StaticFileProviderRW { /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`]. /// /// Before use, transaction based segments should ensure the block end range is the expected @@ -112,7 +124,7 @@ impl StaticFileProviderRW { pub fn new( segment: StaticFileSegment, block: BlockNumber, - reader: Weak, + reader: Weak>, metrics: Option>, ) -> ProviderResult { let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?; @@ -133,7 +145,7 @@ impl StaticFileProviderRW { fn open( segment: StaticFileSegment, block: u64, - reader: Weak, + reader: Weak>, metrics: Option>, ) -> ProviderResult<(NippyJarWriter, PathBuf)> { let start = Instant::now(); @@ -751,7 +763,7 @@ impl StaticFileProviderRW { Ok(()) } - fn reader(&self) -> StaticFileProvider { + fn reader(&self) -> StaticFileProvider { Self::upgrade_provider_to_strong_reference(&self.reader) } @@ -764,8 +776,8 @@ impl StaticFileProviderRW { /// active. In reality, it's impossible to detach the [`StaticFileProviderRW`] from the /// [`StaticFileProvider`]. fn upgrade_provider_to_strong_reference( - provider: &Weak, - ) -> StaticFileProvider { + provider: &Weak>, + ) -> StaticFileProvider { provider.upgrade().map(StaticFileProvider).expect("StaticFileProvider is dropped") } diff --git a/crates/storage/provider/src/test_utils/noop.rs b/crates/storage/provider/src/test_utils/noop.rs index 38fab0dc3112..d12539a2c27f 100644 --- a/crates/storage/provider/src/test_utils/noop.rs +++ b/crates/storage/provider/src/test_utils/noop.rs @@ -556,7 +556,9 @@ impl PruneCheckpointReader for NoopProvider { } impl StaticFileProviderFactory for NoopProvider { - fn static_file_provider(&self) -> StaticFileProvider { + type Primitives = (); + + fn static_file_provider(&self) -> StaticFileProvider { StaticFileProvider::read_only(PathBuf::default(), false).unwrap() } } diff --git a/crates/storage/provider/src/traits/static_file_provider.rs b/crates/storage/provider/src/traits/static_file_provider.rs index 24d69569205c..d465121fb46c 100644 --- a/crates/storage/provider/src/traits/static_file_provider.rs +++ b/crates/storage/provider/src/traits/static_file_provider.rs @@ -1,7 +1,12 @@ +use reth_node_types::NodePrimitives; + use crate::providers::StaticFileProvider; /// Static file provider factory. pub trait StaticFileProviderFactory { + /// The network primitives type [`StaticFileProvider`] is using. + type Primitives: NodePrimitives; + /// Create new instance of static file provider. - fn static_file_provider(&self) -> StaticFileProvider; + fn static_file_provider(&self) -> StaticFileProvider; } diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs index 0fbec6c1b881..1c3894e9cfdb 100644 --- a/crates/storage/provider/src/writer/mod.rs +++ b/crates/storage/provider/src/writer/mod.rs @@ -1,7 +1,8 @@ use crate::{ providers::{StaticFileProvider, StaticFileProviderRWRefMut, StaticFileWriter as SfWriter}, writer::static_file::StaticFileWriter, - BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, TrieWriter, + BlockExecutionWriter, BlockWriter, HistoryWriter, StateChangeWriter, StateWriter, + StaticFileProviderFactory, TrieWriter, }; use alloy_consensus::Header; use alloy_primitives::{BlockNumber, B256, U256}; @@ -115,15 +116,13 @@ impl UnifiedStorageWriter<'_, (), ()> { /// start-up. /// /// NOTE: If unwinding data from storage, use `commit_unwind` instead! - pub fn commit

( - database: impl Into

+ AsRef

, - static_file: StaticFileProvider, - ) -> ProviderResult<()> + pub fn commit

(provider: P) -> ProviderResult<()> where - P: DBProvider, + P: DBProvider + StaticFileProviderFactory, { + let static_file = provider.static_file_provider(); static_file.commit()?; - database.into().into_tx().commit()?; + provider.commit()?; Ok(()) } @@ -135,20 +134,18 @@ impl UnifiedStorageWriter<'_, (), ()> { /// checkpoints on the next start-up. /// /// NOTE: Should only be used after unwinding data from storage! - pub fn commit_unwind

( - database: impl Into

+ AsRef

, - static_file: StaticFileProvider, - ) -> ProviderResult<()> + pub fn commit_unwind

(provider: P) -> ProviderResult<()> where - P: DBProvider, + P: DBProvider + StaticFileProviderFactory, { - database.into().into_tx().commit()?; + let static_file = provider.static_file_provider(); + provider.commit()?; static_file.commit()?; Ok(()) } } -impl UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider> +impl UnifiedStorageWriter<'_, ProviderDB, &StaticFileProvider> where ProviderDB: DBProvider + BlockWriter @@ -158,7 +155,8 @@ where + HistoryWriter + StageCheckpointWriter + BlockExecutionWriter - + AsRef, + + AsRef + + StaticFileProviderFactory, { /// Writes executed blocks and receipts to storage. pub fn save_blocks(&self, blocks: &[ExecutedBlock]) -> ProviderResult<()> { @@ -319,9 +317,10 @@ where } } -impl UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>> +impl + UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>> where - ProviderDB: DBProvider + HeaderProvider, + ProviderDB: DBProvider + HeaderProvider + StaticFileProviderFactory, { /// Ensures that the static file writer is set and of the right [`StaticFileSegment`] variant. /// @@ -430,9 +429,10 @@ where } } -impl UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>> +impl + UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>> where - ProviderDB: DBProvider + HeaderProvider, + ProviderDB: DBProvider + HeaderProvider + StaticFileProviderFactory, { /// Appends receipts block by block. /// @@ -512,9 +512,12 @@ where } impl StateWriter - for UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_>> + for UnifiedStorageWriter<'_, ProviderDB, StaticFileProviderRWRefMut<'_, ProviderDB::Primitives>> where - ProviderDB: DBProvider + StateChangeWriter + HeaderProvider, + ProviderDB: DBProvider + + StateChangeWriter + + HeaderProvider + + StaticFileProviderFactory, { /// Write the data and receipts to the database or static files if `static_file_producer` is /// `Some`. It should be `None` if there is any kind of pruning/filtering over the receipts. diff --git a/crates/storage/provider/src/writer/static_file.rs b/crates/storage/provider/src/writer/static_file.rs index 5514e211e58f..f7227d21ef34 100644 --- a/crates/storage/provider/src/writer/static_file.rs +++ b/crates/storage/provider/src/writer/static_file.rs @@ -1,12 +1,13 @@ use crate::providers::StaticFileProviderRWRefMut; use alloy_primitives::{BlockNumber, TxNumber}; use reth_errors::ProviderResult; +use reth_node_types::NodePrimitives; use reth_primitives::Receipt; use reth_storage_api::ReceiptWriter; pub(crate) struct StaticFileWriter<'a, W>(pub(crate) &'a mut W); -impl ReceiptWriter for StaticFileWriter<'_, StaticFileProviderRWRefMut<'_>> { +impl ReceiptWriter for StaticFileWriter<'_, StaticFileProviderRWRefMut<'_, N>> { fn append_block_receipts( &mut self, first_tx_index: TxNumber,