Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(tree): remove_blocks fixes, return hash and number in persistence task #10678

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,24 @@ impl CanonicalInMemoryState {
///
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
pub fn remove_persisted_blocks(&self, persisted_height: u64) {
pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
// if the persisted hash is not in the canonical in memory state, do nothing, because it
// means canonical blocks were not actually persisted.
//
// This can happen if the persistence task takes a long time, while a reorg is happening.
{
if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
// do nothing
return
}
}

{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();

let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;

// clear all numbers
numbers.clear();

Expand Down
51 changes: 29 additions & 22 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![allow(dead_code)]

use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_node_types::NodeTypesWithDB;
use reth_primitives::B256;
use reth_primitives::BlockNumHash;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
StaticFileProviderFactory,
Expand Down Expand Up @@ -87,7 +85,10 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
Ok(())
}

fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
fn on_remove_blocks_above(
&self,
new_tip_num: u64,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
Expand All @@ -97,24 +98,30 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;

debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
Ok(new_tip_hash)
Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
}

fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
fn on_save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
let last_block_hash = blocks.last().map(|block| block.block().hash());
let last_block_hash_num = blocks
.last()
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });

if last_block_hash.is_some() {
if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
Ok(last_block_hash)
Ok(last_block_hash_num)
}
}

Expand All @@ -138,13 +145,13 @@ pub enum PersistenceAction {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>),

/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),

/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
Expand Down Expand Up @@ -209,7 +216,7 @@ impl PersistenceHandle {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
Expand All @@ -222,7 +229,7 @@ impl PersistenceHandle {
pub fn remove_blocks_above(
&self,
block_num: u64,
tx: oneshot::Sender<Option<B256>>,
tx: oneshot::Sender<Option<BlockNumHash>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
Expand Down Expand Up @@ -296,11 +303,12 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
.expect("no hash returned");
let BlockNumHash { hash: actual_hash, number: _ } =
tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
.expect("no hash returned");

assert_eq!(block_hash, actual_hash);
}
Expand All @@ -316,8 +324,7 @@ mod tests {
let (tx, rx) = oneshot::channel();

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}

Expand All @@ -335,7 +342,7 @@ mod tests {

persistence_handle.save_blocks(blocks, tx).unwrap();

let actual_hash = rx.await.unwrap().unwrap();
let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}
Expand Down
Loading
Loading