Skip to content

Commit

Permalink
feat: move body writing to BlockWriter trait (#12538)
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Nov 14, 2024
1 parent 5c655e4 commit ff6b78a
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 51 deletions.
8 changes: 8 additions & 0 deletions crates/net/p2p/src/bodies/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ impl<B> BlockResponse<B> {
Self::Empty(header) => header.difficulty,
}
}

/// Consumes the response and returns the block body, if any.
///
/// Returns `Some(body)` for a [`Self::Full`] response; a [`Self::Empty`]
/// (header-only) response has no body, so `None` is returned.
pub fn into_body(self) -> Option<B> {
    match self {
        Self::Full(block) => Some(block.body),
        Self::Empty(_) => None,
    }
}
}

impl<B: InMemorySize> InMemorySize for BlockResponse<B> {
Expand Down
72 changes: 24 additions & 48 deletions crates/stages/stages/src/stages/bodies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ use futures_util::TryStreamExt;
use tracing::*;

use alloy_primitives::TxNumber;
use reth_db::tables;
use reth_db::{tables, transaction::DbTx};
use reth_db_api::{
cursor::{DbCursorRO, DbCursorRW},
models::{StoredBlockBodyIndices, StoredBlockOmmers, StoredBlockWithdrawals},
transaction::DbTxMut,
};
use reth_network_p2p::bodies::{downloader::BodyDownloader, response::BlockResponse};
use reth_primitives::StaticFileSegment;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
BlockReader, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader,
BlockReader, BlockWriter, DBProvider, ProviderError, StaticFileProviderFactory, StatsReader,
};
use reth_stages_api::{
EntitiesCheckpoint, ExecInput, ExecOutput, Stage, StageCheckpoint, StageError, StageId,
Expand Down Expand Up @@ -72,7 +71,11 @@ impl<D: BodyDownloader> BodyStage<D> {

impl<Provider, D> Stage<Provider> for BodyStage<D>
where
Provider: DBProvider<Tx: DbTxMut> + StaticFileProviderFactory + StatsReader + BlockReader,
Provider: DBProvider<Tx: DbTxMut>
+ StaticFileProviderFactory
+ StatsReader
+ BlockReader
+ BlockWriter,
D: BodyDownloader<Body = reth_primitives::BlockBody>,
{
/// Return the id of the stage
Expand Down Expand Up @@ -116,15 +119,13 @@ where
}
let (from_block, to_block) = input.next_block_range().into_inner();

// Cursors used to write bodies, ommers and transactions
let tx = provider.tx_ref();
let mut block_indices_cursor = tx.cursor_write::<tables::BlockBodyIndices>()?;
let mut tx_block_cursor = tx.cursor_write::<tables::TransactionBlocks>()?;
let mut ommers_cursor = tx.cursor_write::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = tx.cursor_write::<tables::BlockWithdrawals>()?;

// Get id for the next tx_num, or zero if there are no transactions.
let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();
let mut next_tx_num = provider
.tx_ref()
.cursor_read::<tables::TransactionBlocks>()?
.last()?
.map(|(id, _)| id + 1)
.unwrap_or_default();

let static_file_provider = provider.static_file_provider();
let mut static_file_producer =
Expand Down Expand Up @@ -166,17 +167,10 @@ where
let buffer = self.buffer.take().ok_or(StageError::MissingDownloadBuffer)?;
trace!(target: "sync::stages::bodies", bodies_len = buffer.len(), "Writing blocks");
let mut highest_block = from_block;
for response in buffer {
// Write block
let block_number = response.block_number();

let block_indices = StoredBlockBodyIndices {
first_tx_num: next_tx_num,
tx_count: match &response {
BlockResponse::Full(block) => block.body.transactions.len() as u64,
BlockResponse::Empty(_) => 0,
},
};
// Firstly, write transactions to static files
for response in &buffer {
let block_number = response.block_number();

// Increment block on static file header.
if block_number > 0 {
Expand All @@ -195,15 +189,10 @@ where

match response {
BlockResponse::Full(block) => {
// write transaction block index
if !block.body.transactions.is_empty() {
tx_block_cursor.append(block_indices.last_tx_num(), block.number)?;
}

// Write transactions
for transaction in block.body.transactions {
for transaction in &block.body.transactions {
let appended_tx_number = static_file_producer
.append_transaction(next_tx_num, &transaction.into())?;
.append_transaction(next_tx_num, &transaction.clone().into())?;

if appended_tx_number != next_tx_num {
// This scenario indicates a critical error in the logic of adding new
Expand All @@ -218,32 +207,19 @@ where
// Increment transaction id for each transaction.
next_tx_num += 1;
}

// Write ommers if any
if !block.body.ommers.is_empty() {
ommers_cursor.append(
block_number,
StoredBlockOmmers { ommers: block.body.ommers },
)?;
}

// Write withdrawals if any
if let Some(withdrawals) = block.body.withdrawals {
if !withdrawals.is_empty() {
withdrawals_cursor
.append(block_number, StoredBlockWithdrawals { withdrawals })?;
}
}
}
BlockResponse::Empty(_) => {}
};

// insert block meta
block_indices_cursor.append(block_number, block_indices)?;

highest_block = block_number;
}

// Write bodies to database. This will NOT write transactions to database as we've already
// written them directly to static files.
provider.append_block_bodies(
buffer.into_iter().map(|response| (response.block_number(), response.into_body())),
)?;

// The stage is "done" if:
// - We got fewer blocks than our target
// - We reached our target and the target was not limited by the batch size of the stage
Expand Down
2 changes: 1 addition & 1 deletion crates/storage/db-models/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub type NumTransactions = u64;
///
/// It has the pointer to the transaction Number of the first
/// transaction in the block and the total number of transactions.
#[derive(Debug, Default, Eq, PartialEq, Clone, Serialize, Deserialize, Compact)]
#[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize, Compact)]
#[cfg_attr(any(test, feature = "arbitrary"), derive(arbitrary::Arbitrary))]
#[add_arbitrary_tests(compact)]
pub struct StoredBlockBodyIndices {
Expand Down
46 changes: 45 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3250,7 +3250,7 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks> +
}

let block_indices = StoredBlockBodyIndices { first_tx_num, tx_count };
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices.clone())?;
self.tx.put::<tables::BlockBodyIndices>(block_number, block_indices)?;
durations_recorder.record_relative(metrics::Action::InsertBlockBodyIndices);

if !block_indices.is_empty() {
Expand All @@ -3268,6 +3268,50 @@ impl<TX: DbTxMut + DbTx + 'static, N: NodeTypes<ChainSpec: EthereumHardforks> +
Ok(block_indices)
}

// Writes a batch of block bodies to the body-related tables
// (`BlockBodyIndices`, `TransactionBlocks`, `BlockOmmers`,
// `BlockWithdrawals`). Transaction contents themselves are not written here.
fn append_block_bodies(
    &self,
    bodies: impl Iterator<Item = (BlockNumber, Option<BlockBody>)>,
) -> ProviderResult<()> {
    // Open a write cursor per table. NOTE(review): cursor `append` expects
    // strictly ascending keys, so `bodies` is presumably sorted by block
    // number — confirm against callers.
    let mut block_indices_cursor = self.tx.cursor_write::<tables::BlockBodyIndices>()?;
    let mut tx_block_cursor = self.tx.cursor_write::<tables::TransactionBlocks>()?;
    let mut ommers_cursor = self.tx.cursor_write::<tables::BlockOmmers>()?;
    let mut withdrawals_cursor = self.tx.cursor_write::<tables::BlockWithdrawals>()?;

    // Get id for the next tx_num, or zero if there are no transactions.
    let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default();

    for (block_number, body) in bodies {
        // A `None` body denotes an empty block: it still gets body indices
        // (with `tx_count == 0`) but no rows in the other tables.
        let tx_count = body.as_ref().map(|b| b.transactions.len() as u64).unwrap_or_default();
        let block_indices = StoredBlockBodyIndices { first_tx_num: next_tx_num, tx_count };

        // insert block meta
        block_indices_cursor.append(block_number, block_indices)?;

        // Advance the counter before the early-continue so empty blocks
        // don't disturb the numbering of subsequent blocks.
        next_tx_num += tx_count;
        let Some(body) = body else { continue };

        // Write transaction block index, keyed by the block's last tx number.
        if !body.transactions.is_empty() {
            tx_block_cursor.append(block_indices.last_tx_num(), block_number)?;
        }

        // Write ommers if any
        if !body.ommers.is_empty() {
            ommers_cursor.append(block_number, StoredBlockOmmers { ommers: body.ommers })?;
        }

        // Write withdrawals if any
        if let Some(withdrawals) = body.withdrawals {
            if !withdrawals.is_empty() {
                withdrawals_cursor
                    .append(block_number, StoredBlockWithdrawals { withdrawals })?;
            }
        }
    }

    Ok(())
}

/// TODO(joshie): this fn should be moved to `UnifiedStorageWriter` eventually
fn append_blocks_with_state(
&self,
Expand Down
12 changes: 11 additions & 1 deletion crates/storage/provider/src/traits/block.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use alloy_primitives::BlockNumber;
use reth_db_api::models::StoredBlockBodyIndices;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::SealedBlockWithSenders;
use reth_primitives::{BlockBody, SealedBlockWithSenders};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, HashedPostStateSorted};
use std::ops::RangeInclusive;
Expand Down Expand Up @@ -40,6 +40,16 @@ pub trait BlockWriter: Send + Sync {
fn insert_block(&self, block: SealedBlockWithSenders)
-> ProviderResult<StoredBlockBodyIndices>;

/// Appends a batch of block bodies extending the canonical chain. This is invoked during
/// the `Bodies` stage and does not write to the `TransactionHashNumbers` and
/// `TransactionSenders` tables, which are populated in later stages.
///
/// Bodies are passed as [`Option`]s: if a body is `None`, the corresponding block is empty.
fn append_block_bodies(
    &self,
    bodies: impl Iterator<Item = (BlockNumber, Option<BlockBody>)>,
) -> ProviderResult<()>;

/// Appends a batch of sealed blocks to the blockchain, including sender information, and
/// updates the post-state.
///
Expand Down

0 comments on commit ff6b78a

Please sign in to comment.