Skip to content

Commit

Permalink
fix(tree): many fixes, pass engine-api hive
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Sep 5, 2024
1 parent c267c1c commit d339b30
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 117 deletions.
15 changes: 14 additions & 1 deletion crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,24 @@ impl CanonicalInMemoryState {
///
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
// This can happen if the persistence task takes a long time, while a reorg is happening.
{
if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
// do nothing
return
}
}

{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();

let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;

// clear all numbers
numbers.clear();

Expand Down
51 changes: 29 additions & 22 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![allow(dead_code)]

use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_node_types::NodeTypesWithDB;
use reth_primitives::B256;
use reth_primitives::BlockNumHash;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
StaticFileProviderFactory,
Expand Down Expand Up @@ -87,7 +85,10 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
Ok(())
}

fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
fn on_remove_blocks_above(
&self,
new_tip_num: u64,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
Expand All @@ -97,24 +98,30 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;

debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash)
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
}

fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash = blocks.last().map(|block| block.block().hash());
let last_block_hash_num = blocks
.last()
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });

if last_block_hash.is_some() {
if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash)
Ok(last_block_hash_num)
}
}

Expand All @@ -138,13 +145,13 @@ pub enum PersistenceAction {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>),

/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
Expand Down Expand Up @@ -209,7 +216,7 @@ impl PersistenceHandle {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
Expand All @@ -222,7 +229,7 @@ impl PersistenceHandle {
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
Expand Down Expand Up @@ -296,11 +303,12 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
.expect("no hash returned");
let BlockNumHash { hash: actual_hash, number: _ } =
tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
.expect("no hash returned");

assert_eq!(block_hash, actual_hash);
}
Expand All @@ -316,8 +324,7 @@ mod tests {
let (tx, rx) = oneshot::channel();

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}

Expand All @@ -335,7 +342,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}
Expand Down
Loading

0 comments on commit d339b30

Please sign in to comment.