Skip to content

Commit

Permalink
fix(tree): many fixes, pass engine-api hive
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Sep 4, 2024
1 parent 98b214f commit 9895b2d
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 110 deletions.
15 changes: 14 additions & 1 deletion crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,24 @@ impl CanonicalInMemoryState {
///
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
// This can happen if the persistence task takes a long time, while a reorg is happening.
{
if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
// do nothing
return
}
}

{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();

let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;

// clear all numbers
numbers.clear();

Expand Down
38 changes: 23 additions & 15 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
use reth_primitives::B256;
use reth_primitives::{BlockNumber, B256};
use reth_provider::{
writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory,
};
Expand Down Expand Up @@ -89,7 +89,10 @@ where
Ok(())
}

fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
fn on_remove_blocks_above(
&self,
new_tip_num: u64,
) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
Expand All @@ -99,24 +102,29 @@ where
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;

debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash)
Ok(new_tip_hash.map(|hash| (hash, new_tip_num)))
}

fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash = blocks.last().map(|block| block.block().hash());
let last_block_hash_num =
blocks.last().map(|block| (block.block().hash(), block.block().number));

if last_block_hash.is_some() {
if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash)
Ok(last_block_hash_num)
}
}

Expand All @@ -140,13 +148,13 @@ pub enum PersistenceAction {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<(B256, u64)>>),

/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
RemoveBlocksAbove(u64, oneshot::Sender<Option<(B256, u64)>>),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
Expand Down Expand Up @@ -211,7 +219,7 @@ impl PersistenceHandle {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
Expand All @@ -224,7 +232,7 @@ impl PersistenceHandle {
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
Expand Down Expand Up @@ -298,7 +306,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
let (actual_hash, _) = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
Expand All @@ -319,7 +327,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}

Expand All @@ -337,7 +345,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}
Expand Down
Loading

0 comments on commit 9895b2d

Please sign in to comment.