Skip to content

Commit

Permalink
fix(tree): many fixes, pass engine-api hive
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Sep 4, 2024
1 parent e7bb51d commit ef55f7c
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 112 deletions.
18 changes: 17 additions & 1 deletion crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::{
time::Instant,
};
use tokio::sync::{broadcast, watch};
use tracing::debug;

/// Size of the broadcast channel used to notify canonical state events.
const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256;
Expand Down Expand Up @@ -245,6 +246,7 @@ impl CanonicalInMemoryState {

// we first remove the blocks from the reorged chain
for block in reorged {
debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "removing block from in_memory_state");
let hash = block.block().hash();
let number = block.block().number;
blocks.remove(&hash);
Expand All @@ -253,6 +255,7 @@ impl CanonicalInMemoryState {

// insert the new blocks
for block in new_blocks {
debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "reinserting block into in_memory_state");
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state =
BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
Expand Down Expand Up @@ -288,11 +291,24 @@ impl CanonicalInMemoryState {
///
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
// This can happen if the persistence task takes a long time, while a reorg is happening.
{
if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
// do nothing
return
}
}

{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();

let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;

// clear all numbers
numbers.clear();

Expand Down
54 changes: 37 additions & 17 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
use reth_primitives::B256;
use reth_primitives::{BlockNumber, B256};
use reth_provider::{
writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory,
};
Expand Down Expand Up @@ -89,34 +89,54 @@ where
Ok(())
}

fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
fn on_remove_blocks_above(
&self,
new_tip_num: u64,
) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();

let provider_rw = self.provider.provider_rw()?;
let sf_provider = self.provider.static_file_provider();

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Got new tip hash");
if let Err(err) =
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)
{
error!(target: "tree::persistence", ?err, ?new_tip_num, ?new_tip_hash, "Error while removing blocks from disk");
return Err(err.into())
}

if let Err(err) = UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider) {
error!(target: "tree::persistence", ?err, ?new_tip_num, ?new_tip_hash, "Error while committing block removal");
return Err(err.into())
}

debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");

self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash)
Ok(new_tip_hash.map(|hash| (hash, new_tip_num)))
}

fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash = blocks.last().map(|block| block.block().hash());
let last_block_hash_num =
blocks.last().map(|block| (block.block().hash(), block.block().number));

if last_block_hash.is_some() {
if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash)
Ok(last_block_hash_num)
}
}

Expand All @@ -140,13 +160,13 @@ pub enum PersistenceAction {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<(B256, u64)>>),

/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
RemoveBlocksAbove(u64, oneshot::Sender<Option<(B256, u64)>>),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
Expand Down Expand Up @@ -211,7 +231,7 @@ impl PersistenceHandle {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
Expand All @@ -224,7 +244,7 @@ impl PersistenceHandle {
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
Expand Down Expand Up @@ -298,7 +318,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
let (actual_hash, _) = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
Expand All @@ -319,7 +339,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}

Expand All @@ -337,7 +357,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}
Expand Down
Loading

0 comments on commit ef55f7c

Please sign in to comment.