Skip to content

Commit

Permalink
chore: use NoopBodiesDownloader & NoopHeaderDownloader on `stage …
Browse files Browse the repository at this point in the history
…unwind` command (#8165)
  • Loading branch information
joshieDo authored May 9, 2024
1 parent aa07257 commit 1184e8c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 41 deletions.
46 changes: 5 additions & 41 deletions bin/reth/src/commands/stage/unwind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,9 @@ use reth_beacon_consensus::EthBeaconConsensus;
use reth_config::{Config, PruneConfig};
use reth_consensus::Consensus;
use reth_db::{database::Database, open_db};
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_downloaders::{bodies::noop::NoopBodiesDownloader, headers::noop::NoopHeaderDownloader};
use reth_exex::ExExManagerHandle;
use reth_node_core::{
args::{get_secret_key, NetworkArgs},
dirs::ChainPath,
};
use reth_node_core::args::NetworkArgs;
use reth_primitives::{BlockHashOrNumber, ChainSpec, PruneModes, B256};
use reth_provider::{
BlockExecutionWriter, BlockNumReader, ChainSpecProvider, HeaderSyncMode, ProviderFactory,
Expand Down Expand Up @@ -110,8 +104,7 @@ impl Command {
.filter(|highest_static_file_block| highest_static_file_block >= range.start())
{
info!(target: "reth::cli", ?range, ?highest_static_block, "Executing a pipeline unwind.");
let mut pipeline =
self.build_pipeline(data_dir, config, provider_factory.clone()).await?;
let mut pipeline = self.build_pipeline(config, provider_factory.clone()).await?;

// Move all applicable data from database to static files.
pipeline.produce_static_files()?;
Expand Down Expand Up @@ -142,40 +135,11 @@ impl Command {

async fn build_pipeline<DB: Database + 'static>(
self,
data_dir: ChainPath<DataDirPath>,
config: Config,
provider_factory: ProviderFactory<Arc<DB>>,
) -> Result<Pipeline<Arc<DB>>, eyre::Error> {
// Even though we are not planning to download anything, we need to initialize Body and
// Header stage with a network client
let network_secret_path =
self.network.p2p_secret_key.clone().unwrap_or_else(|| data_dir.p2p_secret());
let p2p_secret_key = get_secret_key(&network_secret_path)?;
let default_peers_path = data_dir.known_peers();
let network = self
.network
.network_config(
&config,
provider_factory.chain_spec(),
p2p_secret_key,
default_peers_path,
)
.build(provider_factory.clone())
.start_network()
.await?;

let consensus: Arc<dyn Consensus> =
Arc::new(EthBeaconConsensus::new(provider_factory.chain_spec()));

// building network downloaders using the fetch client
let fetch_client = network.fetch_client().await?;
let header_downloader = ReverseHeadersDownloaderBuilder::new(config.stages.headers)
.build(fetch_client.clone(), Arc::clone(&consensus));
let body_downloader = BodiesDownloaderBuilder::new(config.stages.bodies).build(
fetch_client,
Arc::clone(&consensus),
provider_factory.clone(),
);
let stage_conf = &config.stages;

let (tip_tx, tip_rx) = watch::channel(B256::ZERO);
Expand All @@ -189,8 +153,8 @@ impl Command {
provider_factory.clone(),
header_mode,
Arc::clone(&consensus),
header_downloader,
body_downloader,
NoopHeaderDownloader::default(),
NoopBodiesDownloader::default(),
executor.clone(),
stage_conf.etl.clone(),
)
Expand Down
3 changes: 3 additions & 0 deletions crates/net/downloaders/src/bodies/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
#[allow(clippy::module_inception)]
pub mod bodies;

/// A body downloader that does nothing. Useful to build unwind-only pipelines.
pub mod noop;

/// A downloader implementation that spawns a downloader to a task
pub mod task;

Expand Down
29 changes: 29 additions & 0 deletions crates/net/downloaders/src/bodies/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use futures::Stream;
use reth_interfaces::p2p::{
bodies::{downloader::BodyDownloader, response::BlockResponse},
error::{DownloadError, DownloadResult},
};
use reth_primitives::BlockNumber;
use std::ops::RangeInclusive;

/// A [BodyDownloader] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopBodiesDownloader;

impl BodyDownloader for NoopBodiesDownloader {
fn set_download_range(&mut self, _: RangeInclusive<BlockNumber>) -> DownloadResult<()> {
Ok(())
}
}

impl Stream for NoopBodiesDownloader {
type Item = Result<Vec<BlockResponse>, DownloadError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
panic!("NoopBodiesDownloader shouldn't be polled.")
}
}
3 changes: 3 additions & 0 deletions crates/net/downloaders/src/headers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
/// A Linear downloader implementation.
pub mod reverse_headers;

/// A header downloader that does nothing. Useful to build unwind-only pipelines.
pub mod noop;

/// A downloader implementation that spawns a downloader to a task
pub mod task;

Expand Down
30 changes: 30 additions & 0 deletions crates/net/downloaders/src/headers/noop.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
use futures::Stream;
use reth_interfaces::p2p::headers::{
downloader::{HeaderDownloader, SyncTarget},
error::HeadersDownloaderError,
};
use reth_primitives::SealedHeader;

/// A [HeaderDownloader] implementation that does nothing.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct NoopHeaderDownloader;

impl HeaderDownloader for NoopHeaderDownloader {
fn update_local_head(&mut self, _: SealedHeader) {}

fn update_sync_target(&mut self, _: SyncTarget) {}

fn set_batch_size(&mut self, _: usize) {}
}

impl Stream for NoopHeaderDownloader {
type Item = Result<Vec<SealedHeader>, HeadersDownloaderError>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
panic!("NoopHeaderDownloader shouldn't be polled.")
}
}

0 comments on commit 1184e8c

Please sign in to comment.