diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs
index 4f20cea769e49..64968509a40ca 100644
--- a/crates/chain-state/src/in_memory.rs
+++ b/crates/chain-state/src/in_memory.rs
@@ -20,6 +20,7 @@ use std::{
time::Instant,
};
use tokio::sync::{broadcast, watch};
+use tracing::debug;
/// Size of the broadcast channel used to notify canonical state events.
const CANON_STATE_NOTIFICATION_CHANNEL_SIZE: usize = 256;
@@ -245,6 +246,7 @@ impl CanonicalInMemoryState {
// we first remove the blocks from the reorged chain
for block in reorged {
+ debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "removing block from in_memory_state");
let hash = block.block().hash();
let number = block.block().number;
blocks.remove(&hash);
@@ -253,6 +255,7 @@ impl CanonicalInMemoryState {
// insert the new blocks
for block in new_blocks {
+ debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "reinserting block into in_memory_state");
let parent = blocks.get(&block.block().parent_hash).cloned();
let block_state =
BlockState::with_parent(block.clone(), parent.map(|p| (*p).clone()));
@@ -288,11 +291,24 @@ impl CanonicalInMemoryState {
///
/// This will update the links between blocks and remove all blocks that are [..
/// `persisted_height`].
- pub fn remove_persisted_blocks(&self, persisted_height: u64) {
+ pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
+        // If the persisted hash is not in the canonical in-memory state, do nothing, because it
+        // means the canonical blocks were not actually persisted.
+        //
+        // This can happen if the persistence task takes a long time while a reorg is happening.
+ {
+ if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
+ // do nothing
+ return
+ }
+ }
+
{
let mut blocks = self.inner.in_memory_state.blocks.write();
let mut numbers = self.inner.in_memory_state.numbers.write();
+ let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;
+
// clear all numbers
numbers.clear();
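// ---------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of the diff). A minimal
// model of the guard added above: pruning is skipped when the persisted hash
// is no longer tracked in memory, which happens when a reorg replaced the
// canonical chain while the persistence task was still running. `Hash` and
// the map below are hypothetical stand-ins for the real types.
use std::collections::HashMap;

type Hash = [u8; 32];

struct InMemoryState {
    blocks: HashMap<Hash, u64>, // hash -> block number
}

impl InMemoryState {
    fn remove_persisted_blocks(&mut self, persisted_number: u64, persisted_hash: Hash) {
        // The persisted hash is unknown: the blocks that were persisted are no
        // longer canonical in memory, so removing by height would drop blocks
        // from the *new* canonical chain. Do nothing.
        if !self.blocks.contains_key(&persisted_hash) {
            return;
        }
        // Otherwise drop everything at or below the persisted height.
        self.blocks.retain(|_, number| *number > persisted_number);
    }
}
// ---------------------------------------------------------------------------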
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index 82c7883897a49..2ac0c493d1651 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -4,7 +4,7 @@ use crate::metrics::PersistenceMetrics;
use reth_chain_state::ExecutedBlock;
use reth_db::Database;
use reth_errors::ProviderError;
-use reth_primitives::B256;
+use reth_primitives::{BlockNumber, B256};
use reth_provider::{
writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory,
};
@@ -89,26 +89,46 @@ where
Ok(())
}
- fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
+ fn on_remove_blocks_above(
+ &self,
+ new_tip_num: u64,
+ ) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
+
let provider_rw = self.provider.provider_rw()?;
let sf_provider = self.provider.static_file_provider();
let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
- UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
- UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
+ debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Got new tip hash");
+ if let Err(err) =
+ UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)
+ {
+ error!(target: "tree::persistence", ?err, ?new_tip_num, ?new_tip_hash, "Error while removing blocks from disk");
+ return Err(err.into())
+ }
+
+ if let Err(err) = UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider) {
+ error!(target: "tree::persistence", ?err, ?new_tip_num, ?new_tip_hash, "Error while committing block removal");
+ return Err(err.into())
+ }
+
+ debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
- Ok(new_tip_hash)
+ Ok(new_tip_hash.map(|hash| (hash, new_tip_num)))
}
- fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
- debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
+ fn on_save_blocks(
+ &self,
+ blocks: Vec<ExecutedBlock>,
+ ) -> Result<Option<(B256, BlockNumber)>, PersistenceError> {
+ debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
let start_time = Instant::now();
- let last_block_hash = blocks.last().map(|block| block.block().hash());
+ let last_block_hash_num =
+ blocks.last().map(|block| (block.block().hash(), block.block().number));
- if last_block_hash.is_some() {
+ if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let static_file_provider = self.provider.static_file_provider();
@@ -116,7 +136,7 @@ where
UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
}
self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
- Ok(last_block_hash)
+ Ok(last_block_hash_num)
}
}
@@ -140,13 +160,13 @@ pub enum PersistenceAction {
///
/// First, header, transaction, and receipt-related data should be written to static files.
/// Then the execution history-related data will be written to the database.
- SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
+ SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<(B256, BlockNumber)>>),
/// Removes block data above the given block number from the database.
///
/// This will first update checkpoints from the database, then remove actual block data from
/// static files.
- RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
+ RemoveBlocksAbove(u64, oneshot::Sender<Option<(B256, BlockNumber)>>),
/// Prune associated block data before the given block number, according to already-configured
/// prune modes.
@@ -211,7 +231,7 @@ impl PersistenceHandle {
pub fn save_blocks(
&self,
blocks: Vec<ExecutedBlock>,
- tx: oneshot::Sender<Option<B256>>,
+ tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
}
@@ -224,7 +244,7 @@ impl PersistenceHandle {
pub fn remove_blocks_above(
&self,
block_num: u64,
- tx: oneshot::Sender<Option<B256>>,
+ tx: oneshot::Sender<Option<(B256, BlockNumber)>>,
) -> Result<(), SendError<PersistenceAction>> {
self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
}
@@ -298,7 +318,7 @@ mod tests {
persistence_handle.save_blocks(blocks, tx).unwrap();
- let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
+ let (actual_hash, _) = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
.await
.expect("test timed out")
.expect("channel closed unexpectedly")
@@ -319,7 +339,7 @@ mod tests {
persistence_handle.save_blocks(blocks, tx).unwrap();
- let actual_hash = rx.await.unwrap().unwrap();
+ let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
@@ -337,7 +357,7 @@ mod tests {
persistence_handle.save_blocks(blocks, tx).unwrap();
- let actual_hash = rx.await.unwrap().unwrap();
+ let (actual_hash, _) = rx.await.unwrap().unwrap();
assert_eq!(last_hash, actual_hash);
}
}
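// ---------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of the diff). The
// persistence result now carries `(hash, number)` instead of only the hash, so
// the receiver can record the persisted height directly instead of looking the
// block up again. A minimal oneshot round-trip with stand-in types:
#[tokio::main]
async fn main() {
    type B256 = [u8; 32]; // stand-in for reth_primitives::B256
    type BlockNumber = u64;

    let (tx, rx) = tokio::sync::oneshot::channel::<Option<(B256, BlockNumber)>>();

    // The persistence task reports the last block it wrote.
    tx.send(Some(([0u8; 32], 42))).unwrap();

    // The engine side learns the number without a block lookup.
    if let Some((last_hash, last_number)) = rx.await.unwrap() {
        println!("persisted up to block {last_number} ({last_hash:?})");
    }
}
// ---------------------------------------------------------------------------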
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index e9caa328e75cb..598962203523f 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -201,6 +201,108 @@ impl TreeState {
Some((executed, children))
}
+ /// Returns whether or not the hash is part of the canonical chain.
+ pub(crate) fn is_canonical(&self, hash: B256) -> bool {
+ let mut current_block = self.current_canonical_head.hash;
+ if current_block == hash {
+ return true
+ }
+
+ while let Some(executed) = self.blocks_by_hash.get(&current_block) {
+ current_block = executed.block.parent_hash;
+ if current_block == hash {
+ return true
+ }
+ }
+
+ false
+ }
+
+ /// Removes canonical blocks at or below the upper bound, but only if the last persisted hash
+ /// is part of the canonical chain.
+ pub(crate) fn remove_canonical_until(
+ &mut self,
+ upper_bound: BlockNumber,
+ last_persisted_hash: B256,
+ ) {
+ debug!(target: "engine", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
+
+ // If the last persisted hash is not canonical, then we don't want to remove any canonical
+ // blocks yet.
+ if !self.is_canonical(last_persisted_hash) {
+ return
+ }
+
+ // First, let's walk back the canonical chain and remove canonical blocks lower than the
+ // upper bound
+ let mut current_block = self.current_canonical_head.hash;
+ while let Some(executed) = self.blocks_by_hash.get(&current_block) {
+ current_block = executed.block.parent_hash;
+ if executed.block.number <= upper_bound {
+ debug!(target: "engine", num_hash=?executed.block.num_hash(), "Attempting to remove block walking back from the head");
+ if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
+ debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed block walking back from the head");
+ // finally, move the trie updates
+ self.persisted_trie_updates
+ .insert(removed.block.hash(), (removed.block.number, removed.trie));
+ }
+ }
+ }
+ }
+
+ /// Removes all blocks that are below the finalized block, as well as removing non-canonical
+ /// sidechains that fork from below the finalized block.
+ pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
+ let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
+
+ // We remove disconnected sidechains in three steps:
+ // * first, remove everything with a block number __below__ the finalized block.
+ // * next, we populate a vec with parents __at__ the finalized block.
+ // * finally, we iterate through the vec, removing children until the vec is empty
+ // (BFS).
+
+ // We _exclude_ the finalized block because we will be dealing with the blocks __at__
+ // the finalized block later.
+ let blocks_to_remove = self
+ .blocks_by_number
+ .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
+ .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
+ .collect::<Vec<_>>();
+ for hash in blocks_to_remove {
+ if let Some((removed, _)) = self.remove_by_hash(hash) {
+ debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain block");
+ }
+ }
+
+ // remove trie updates that are below the finalized block
+ self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num < finalized_num);
+
+ // The only block that should remain at the `finalized` number now, is the finalized
+ // block, if it exists.
+ //
+ // For all other blocks, we first put their children into this vec.
+ // Then, we will iterate over them, removing them, adding their children, etc etc,
+ // until the vec is empty.
+ let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
+
+ // re-insert the finalized hash if we removed it
+ if let Some(position) =
+ blocks_to_remove.iter().position(|b| b.block.hash() == finalized_hash)
+ {
+ let finalized_block = blocks_to_remove.swap_remove(position);
+ self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
+ }
+
+ let mut blocks_to_remove =
+ blocks_to_remove.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>();
+ while let Some(block) = blocks_to_remove.pop_front() {
+ if let Some((removed, children)) = self.remove_by_hash(block) {
+ debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain child block");
+ blocks_to_remove.extend(children);
+ }
+ }
+ }
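// ---------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of the diff). A toy version
// of the three-step pruning in `prune_finalized_sidechains`, using string
// "hashes" and plain maps instead of the real tree types: drop everything
// below the finalized number, keep only the finalized block at that number,
// then remove the queued sidechain blocks and their descendants breadth-first.
use std::collections::{BTreeMap, HashMap, VecDeque};

fn prune_finalized_sidechains(
    blocks_by_number: &mut BTreeMap<u64, Vec<&'static str>>,
    children: &HashMap<&'static str, Vec<&'static str>>,
    finalized_num: u64,
    finalized_hash: &str,
) {
    // Step 1: remove everything strictly below the finalized block.
    *blocks_by_number = blocks_by_number.split_off(&finalized_num);

    // Step 2: at the finalized height, keep only the finalized block itself and
    // queue every sibling (a sidechain block) for removal.
    let at_finalized = blocks_by_number.remove(&finalized_num).unwrap_or_default();
    let mut to_remove: VecDeque<&str> = VecDeque::new();
    for hash in at_finalized {
        if hash == finalized_hash {
            blocks_by_number.entry(finalized_num).or_default().push(hash);
        } else {
            to_remove.push_back(hash);
        }
    }

    // Step 3: walk breadth-first; removing a sidechain block queues its children too.
    while let Some(hash) = to_remove.pop_front() {
        blocks_by_number.values_mut().for_each(|v| v.retain(|h| *h != hash));
        if let Some(kids) = children.get(hash) {
            to_remove.extend(kids.iter().copied());
        }
    }
}
// ---------------------------------------------------------------------------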
+
/// Remove all blocks up to __and including__ the given block number.
///
/// If a finalized hash is provided, the only non-canonical blocks which will be removed are
@@ -214,17 +316,18 @@ impl TreeState {
pub(crate) fn remove_until(
&mut self,
upper_bound: BlockNumber,
- finalized_num: Option<BlockNumber>,
+ last_persisted_hash: B256,
+ finalized_num_hash: Option<BlockNumHash>,
) {
- debug!(target: "engine", ?upper_bound, ?finalized_num, "Removing blocks from the tree");
+ debug!(target: "engine", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
// If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
// that the only blocks removed, are canonical blocks less than the upper bound
// finalized_num.take_if(|finalized| *finalized > upper_bound);
- let finalized_num = finalized_num.map(|finalized| {
- let new_finalized_num = finalized.min(upper_bound);
- debug!(target: "engine", ?new_finalized_num, "Adjusted upper bound");
- new_finalized_num
+ let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
+ finalized.number = finalized.number.min(upper_bound);
+ debug!(target: "engine", ?finalized, "Adjusted upper bound");
+ finalized
});
// We want to do two things:
@@ -234,60 +337,12 @@ impl TreeState {
// * remove all canonical blocks below the upper bound
// * fetch the number of the finalized hash, removing any sidechains that are __below__ the
// finalized block
-
- // First, let's walk back the canonical chain and remove canonical blocks lower than the
- // upper bound
- let mut current_block = self.current_canonical_head.hash;
- while let Some(executed) = self.blocks_by_hash.get(&current_block) {
- current_block = executed.block.parent_hash;
- if executed.block.number <= upper_bound {
- if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
- // finally, move the trie updates
- self.persisted_trie_updates
- .insert(removed.block.hash(), (removed.block.number, removed.trie));
- }
- }
- }
+ self.remove_canonical_until(upper_bound, last_persisted_hash);
// Now, we have removed canonical blocks (assuming the upper bound is above the finalized
// block) and only have sidechains below the finalized block.
- if let Some(finalized) = finalized_num {
- // We remove disconnected sidechains in three steps:
- // * first, remove everything with a block number __below__ the finalized block.
- // * next, we populate a vec with parents __at__ the finalized block.
- // * finally, we iterate through the vec, removing children until the vec is empty
- // (BFS).
-
- // We _exclude_ the finalized block because we will be dealing with the blocks __at__
- // the finalized block later.
- let blocks_to_remove = self
- .blocks_by_number
- .range((Bound::Unbounded, Bound::Excluded(finalized)))
- .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
- .collect::<Vec<_>>();
- for hash in blocks_to_remove {
- self.remove_by_hash(hash);
- }
-
- // remove trie updates that are below the finalized block
- self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num <= finalized);
-
- // The only blocks that exist at `finalized_num` now, are blocks in sidechains that
- // should be removed.
- //
- // We first put their children into this vec.
- // Then, we will iterate over them, removing them, adding their children, etc etc,
- // until the vec is empty.
- let mut blocks_to_remove = self
- .blocks_by_number
- .remove(&finalized)
- .map(|blocks| blocks.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>())
- .unwrap_or_default();
- while let Some(block) = blocks_to_remove.pop_front() {
- if let Some((_, children)) = self.remove_by_hash(block) {
- blocks_to_remove.extend(children);
- }
- }
+ if let Some(finalized_num_hash) = finalized_num_hash {
+ self.prune_finalized_sidechains(finalized_num_hash);
}
}
@@ -531,7 +586,7 @@ where
last_persisted_block_hash: header.hash(),
last_persisted_block_number: best_block_number,
rx: None,
- remove_above_state: None,
+ remove_above_state: VecDeque::new(),
};
let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
@@ -1053,11 +1108,14 @@ where
/// or send a new persistence action if necessary.
fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
if !self.persistence_state.in_progress() {
- if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() {
- debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task");
- let (tx, rx) = oneshot::channel();
- let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
- self.persistence_state.start(rx);
+ if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
+ debug!(target: "engine", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block_number, "Removing blocks using persistence task");
+ if new_tip_num < self.persistence_state.last_persisted_block_number {
+ debug!(target: "engine", ?new_tip_num, "Starting remove blocks job");
+ let (tx, rx) = oneshot::channel();
+ let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
+ self.persistence_state.start(rx);
+ }
} else if self.should_persist() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
if blocks_to_persist.is_empty() {
@@ -1078,22 +1136,21 @@ where
.expect("if a persistence task is in progress Receiver must be Some");
// Check if persistence has complete
match rx.try_recv() {
- Ok(last_persisted_block_hash) => {
+ Ok(last_persisted_hash_num) => {
self.metrics.engine.persistence_duration.record(start_time.elapsed());
- let Some(last_persisted_block_hash) = last_persisted_block_hash else {
+ let Some((last_persisted_block_hash, last_persisted_block_number)) =
+ last_persisted_hash_num
+ else {
// if this happened, then we persisted no blocks because we sent an
// empty vec of blocks
warn!(target: "engine", "Persistence task completed but did not persist any blocks");
return Ok(())
};
- if let Some(block) =
- self.state.tree_state.block_by_hash(last_persisted_block_hash)
- {
- self.persistence_state.finish(last_persisted_block_hash, block.number);
- self.on_new_persisted_block();
- } else {
- error!("could not find persisted block with hash {last_persisted_block_hash} in memory");
- }
+
+ debug!(target: "engine", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
+ self.persistence_state
+ .finish(last_persisted_block_hash, last_persisted_block_number);
+ self.on_new_persisted_block();
}
Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
@@ -1211,7 +1268,16 @@ where
//
// We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
// before that
- self.state.tree_state.remove_until(backfill_height, Some(backfill_height));
+ let backfill_num_hash = self
+ .provider
+ .block_hash(backfill_height)?
+ .map(|hash| BlockNumHash { hash, number: backfill_height });
+
+ self.state.tree_state.remove_until(
+ backfill_height,
+ self.persistence_state.last_persisted_block_hash,
+ backfill_num_hash,
+ );
self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
// remove all buffered blocks below the backfill height
@@ -1389,10 +1455,15 @@ where
/// Assumes that `finish` has been called on the `persistence_state` at least once
fn on_new_persisted_block(&mut self) {
let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
+ debug!(target: "engine", last_persisted_hash=?self.persistence_state.last_persisted_block_hash, last_persisted_number=?self.persistence_state.last_persisted_block_number, ?finalized, "New persisted block, clearing in memory blocks");
self.remove_before(self.persistence_state.last_persisted_block_number, finalized)
.expect("todo: error handling");
- self.canonical_in_memory_state
- .remove_persisted_blocks(self.persistence_state.last_persisted_block_number);
+ self.canonical_in_memory_state.remove_persisted_blocks(
+ BlockNumHash {
+ number: self.persistence_state.last_persisted_block_number,
+ hash: self.persistence_state.last_persisted_block_hash,
+ },
+ );
}
/// Return an [`ExecutedBlock`] from database or in-memory state by hash.
@@ -1404,6 +1475,7 @@ where
/// For finalized blocks, this will return `None`.
#[allow(unused)]
fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock>> {
+ debug!(target: "engine", ?hash, "Fetching executed block by hash");
// check memory first
let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
@@ -1858,6 +1930,16 @@ where
let tip = chain_update.tip().header.clone();
let notification = chain_update.to_chain_notification();
+ // reinsert any missing reorged blocks
+ if let NewCanonicalChain::Reorg { new, old } = &chain_update {
+ let new_first = new.first().map(|first| first.block.num_hash());
+ let old_first = old.first().map(|first| first.block.num_hash());
+ debug!(target: "engine", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
+
+ self.reinsert_reorged_blocks(new.clone());
+ self.reinsert_reorged_blocks(old.clone());
+ }
+
// update the tracked in-memory state with the new chain
self.canonical_in_memory_state.update_chain(chain_update);
self.canonical_in_memory_state.set_canonical_head(tip.clone());
@@ -1872,6 +1954,18 @@ where
));
}
+ /// This reinserts any blocks in the new chain that do not already exist in the tree
+ fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock>) {
+ debug!(target: "engine", first=?new_chain.first().map(|b| b.block.num_hash()), last=?new_chain.last().map(|b| b.block.num_hash()), "Trying to reinsert reorged blocks into tree state");
+ for block in new_chain {
+ debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "Trying to reinsert block into tree state");
+ if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
+ debug!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state");
+ self.state.tree_state.insert_executed(block);
+ }
+ }
+ }
+
/// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
///
/// This mainly compares the missing parent of the downloaded block with the current canonical
@@ -2337,10 +2431,17 @@ where
) -> ProviderResult<()> {
// first fetch the finalized block number and then call the remove_before method on
// tree_state
- let num =
- if let Some(hash) = finalized_hash { self.provider.block_number(hash)? } else { None };
+ let num = if let Some(hash) = finalized_hash {
+ self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
+ } else {
+ None
+ };
- self.state.tree_state.remove_until(upper_bound, num);
+ self.state.tree_state.remove_until(
+ upper_bound,
+ self.persistence_state.last_persisted_block_hash,
+ num,
+ );
Ok(())
}
}
@@ -2354,14 +2455,15 @@ pub struct PersistenceState {
last_persisted_block_hash: B256,
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
- rx: Option<(oneshot::Receiver<Option<B256>>, Instant)>,
+ #[allow(clippy::type_complexity)]
+ rx: Option<(oneshot::Receiver<Option<(B256, BlockNumber)>>, Instant)>,
/// The last persisted block number.
///
/// This tracks the chain height that is persisted on disk
last_persisted_block_number: u64,
/// The block above which blocks should be removed from disk, because there has been an on disk
/// reorg.
- remove_above_state: Option<u64>,
+ remove_above_state: VecDeque<u64>,
}
impl PersistenceState {
@@ -2372,14 +2474,15 @@ impl PersistenceState {
}
/// Sets state for a started persistence task.
- fn start(&mut self, rx: oneshot::Receiver<Option<B256>>) {
+ fn start(&mut self, rx: oneshot::Receiver<Option<(B256, BlockNumber)>>) {
self.rx = Some((rx, Instant::now()));
}
- /// Sets the `remove_above_state`, to the new tip number specified.
+ /// Queues the given new tip number in `remove_above_state`. The removal is only started later,
+ /// and only if the number is less than the current `last_persisted_block_number`.
fn schedule_removal(&mut self, new_tip_num: u64) {
- // TODO: what about multiple on-disk reorgs in a row?
- self.remove_above_state = Some(new_tip_num);
+ debug!(target: "engine", ?new_tip_num, prev_remove_state=?self.remove_above_state, last_persisted_block_number=?self.last_persisted_block_number, "Scheduling removal");
+ self.remove_above_state.push_back(new_tip_num);
}
/// Sets state for a finished persistence task.
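// ---------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of the diff). Why
// `remove_above_state` became a queue: with the old `Option<u64>`, a second
// on-disk reorg scheduled before the first removal ran would overwrite the
// first request. A `VecDeque<u64>` keeps every request, and a removal is only
// started when the target is actually below what has been persisted.
// Simplified, hypothetical types:
use std::collections::VecDeque;

struct PersistenceState {
    last_persisted_block_number: u64,
    remove_above_state: VecDeque<u64>,
}

impl PersistenceState {
    fn schedule_removal(&mut self, new_tip_num: u64) {
        self.remove_above_state.push_back(new_tip_num);
    }

    // Mirrors the check in `advance_persistence`: pop the oldest request and
    // only act on it if it would actually unwind persisted blocks.
    fn next_removal(&mut self) -> Option<u64> {
        let new_tip_num = self.remove_above_state.pop_front()?;
        (new_tip_num < self.last_persisted_block_number).then_some(new_tip_num)
    }
}

fn main() {
    let mut state = PersistenceState {
        last_persisted_block_number: 10,
        remove_above_state: VecDeque::new(),
    };
    state.schedule_removal(8); // two on-disk reorgs in a row
    state.schedule_removal(12); // at or above the persisted height: nothing to do
    assert_eq!(state.next_removal(), Some(8));
    assert_eq!(state.next_removal(), None);
}
// ---------------------------------------------------------------------------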
@@ -2987,7 +3090,8 @@ mod tests {
#[tokio::test]
async fn test_tree_state_remove_before() {
- let mut tree_state = TreeState::new(BlockNumHash::default());
+ let start_num_hash = BlockNumHash::default();
+ let mut tree_state = TreeState::new(start_num_hash);
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
for block in &blocks {
@@ -3000,7 +3104,7 @@ mod tests {
tree_state.set_canonical_head(last.block.num_hash());
// inclusive bound, so we should remove anything up to and including 2
- tree_state.remove_until(2, Some(2));
+ tree_state.remove_until(2, start_num_hash.hash, Some(blocks[1].block.num_hash()));
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
@@ -3032,7 +3136,8 @@ mod tests {
#[tokio::test]
async fn test_tree_state_remove_before_finalized() {
- let mut tree_state = TreeState::new(BlockNumHash::default());
+ let start_num_hash = BlockNumHash::default();
+ let mut tree_state = TreeState::new(start_num_hash);
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
for block in &blocks {
@@ -3045,7 +3150,7 @@ mod tests {
tree_state.set_canonical_head(last.block.num_hash());
// we should still remove everything up to and including 2
- tree_state.remove_until(2, None);
+ tree_state.remove_until(2, start_num_hash.hash, None);
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
@@ -3077,7 +3182,8 @@ mod tests {
#[tokio::test]
async fn test_tree_state_remove_before_lower_finalized() {
- let mut tree_state = TreeState::new(BlockNumHash::default());
+ let start_num_hash = BlockNumHash::default();
+ let mut tree_state = TreeState::new(start_num_hash);
let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
for block in &blocks {
@@ -3090,7 +3196,7 @@ mod tests {
tree_state.set_canonical_head(last.block.num_hash());
// we have no forks so we should still remove anything up to and including 2
- tree_state.remove_until(2, Some(1));
+ tree_state.remove_until(2, start_num_hash.hash, Some(blocks[0].block.num_hash()));
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs
index 40287c962ea90..995384d53359a 100644
--- a/crates/storage/provider/src/providers/database/mod.rs
+++ b/crates/storage/provider/src/providers/database/mod.rs
@@ -28,7 +28,7 @@ use std::{
sync::Arc,
};
use tokio::sync::watch;
-use tracing::trace;
+use tracing::{debug, trace};
mod provider;
pub use provider::{DatabaseProvider, DatabaseProviderRO, DatabaseProviderRW};
@@ -206,6 +206,7 @@ impl HeaderProvider for ProviderFactory {
}
fn header_td_by_number(&self, number: BlockNumber) -> ProviderResult> {
+ debug!(target: "engine", ?number, "Getting total difficulty for block");
if let Some(td) = self.chain_spec.final_paris_total_difficulty(number) {
// if this block is higher than the final paris(merge) block, return the final paris
// difficulty
diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs
index db24674197b4c..7e02b1f7ba487 100644
--- a/crates/storage/provider/src/writer/mod.rs
+++ b/crates/storage/provider/src/writer/mod.rs
@@ -260,16 +260,23 @@ where
// Get the total txs for the block range, so we have the correct number of columns for
// receipts and transactions
+ // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
let tx_range = self
.database()
- .transaction_range_by_block_range(block_number..=highest_static_file_block)?;
+ .transaction_range_by_block_range(block_number + 1..=highest_static_file_block)?;
let total_txs = tx_range.end().saturating_sub(*tx_range.start());
+ // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
self.database().remove_block_and_execution_range(
- block_number..=self.database().last_block_number()?,
+ block_number + 1..=self.database().last_block_number()?,
)?;
+ // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
+ // we remove only what is ABOVE the block.
+ //
+ // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
+ // will have three blocks to remove, which will be block 8, 7, and 6.
debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number");
self.static_file()
.get_writer(block_number, StaticFileSegment::Headers)?
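// ---------------------------------------------------------------------------
// Illustrative sketch (editor's addition, not part of the diff). The
// arithmetic behind the "+1" comments above: the new tip itself must be kept,
// and only blocks ABOVE it are removed, on both the database and the static
// file side.
fn main() {
    let highest_static_file_block = 8u64;
    let new_tip = 5u64; // `block_number` in the writer above
    let last_db_block = 8u64;

    // Database side: `block_number + 1..=last_block_number` keeps block 5 and
    // removes blocks 6, 7 and 8.
    let db_removal_range = new_tip + 1..=last_db_block;
    assert_eq!(db_removal_range.count(), 3);

    // Static file side: `highest_static_file_block.saturating_sub(block_number)`
    // is the number of rows to prune above the tip, again blocks 8, 7 and 6.
    let static_blocks_to_delete = highest_static_file_block.saturating_sub(new_tip);
    assert_eq!(static_blocks_to_delete, 3);
}
// ---------------------------------------------------------------------------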