From d339b3051fcf90b6b6093ae0905d822bf1ed01be Mon Sep 17 00:00:00 2001
From: Dan Cline <6798349+Rjected@users.noreply.github.com>
Date: Fri, 30 Aug 2024 14:45:42 -0400
Subject: [PATCH] fix(tree): many fixes, pass engine-api hive

---
 crates/chain-state/src/in_memory.rs       |  15 +-
 crates/engine/tree/src/persistence.rs     |  51 ++--
 crates/engine/tree/src/tree/mod.rs        | 286 +++++++++++++++-------
 crates/storage/provider/src/writer/mod.rs |  11 +-
 4 files changed, 246 insertions(+), 117 deletions(-)

diff --git a/crates/chain-state/src/in_memory.rs b/crates/chain-state/src/in_memory.rs
index 4f20cea769e4..8ec12018c272 100644
--- a/crates/chain-state/src/in_memory.rs
+++ b/crates/chain-state/src/in_memory.rs
@@ -288,11 +288,24 @@ impl CanonicalInMemoryState {
     ///
     /// This will update the links between blocks and remove all blocks that are [..
     /// `persisted_height`].
-    pub fn remove_persisted_blocks(&self, persisted_height: u64) {
+    pub fn remove_persisted_blocks(&self, persisted_num_hash: BlockNumHash) {
+        // if the persisted hash is not in the canonical in memory state, do nothing, because it
+        // means canonical blocks were not actually persisted.
+        //
+        // This can happen if the persistence task takes a long time, while a reorg is happening.
+        {
+            if self.inner.in_memory_state.blocks.read().get(&persisted_num_hash.hash).is_none() {
+                // do nothing
+                return
+            }
+        }
+
         {
             let mut blocks = self.inner.in_memory_state.blocks.write();
             let mut numbers = self.inner.in_memory_state.numbers.write();
 
+            let BlockNumHash { number: persisted_height, hash: _ } = persisted_num_hash;
+
             // clear all numbers
             numbers.clear();
 
diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index de1703cd9fca..517d8f3e6d0b 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -1,10 +1,8 @@
-#![allow(dead_code)]
-
 use crate::metrics::PersistenceMetrics;
 use reth_chain_state::ExecutedBlock;
 use reth_errors::ProviderError;
 use reth_node_types::NodeTypesWithDB;
-use reth_primitives::B256;
+use reth_primitives::BlockNumHash;
 use reth_provider::{
     providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
     StaticFileProviderFactory,
@@ -87,7 +85,10 @@ impl PersistenceService {
         Ok(())
     }
 
-    fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<Option<B256>, PersistenceError> {
+    fn on_remove_blocks_above(
+        &self,
+        new_tip_num: u64,
+    ) -> Result<Option<BlockNumHash>, PersistenceError> {
         debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks");
         let start_time = Instant::now();
         let provider_rw = self.provider.provider_rw()?;
@@ -97,16 +98,22 @@ impl PersistenceService {
         UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?;
         UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?;
 
+        debug!(target: "tree::persistence", ?new_tip_num, ?new_tip_hash, "Removed blocks from disk");
         self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed());
-        Ok(new_tip_hash)
+        Ok(new_tip_hash.map(|hash| BlockNumHash { hash, number: new_tip_num }))
     }
 
-    fn on_save_blocks(&self, blocks: Vec<ExecutedBlock>) -> Result<Option<B256>, PersistenceError> {
-        debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.number), last=?blocks.last().map(|b| b.block.number), "Saving range of blocks");
+    fn on_save_blocks(
+        &self,
+        blocks: Vec<ExecutedBlock>,
+    ) -> Result<Option<BlockNumHash>, PersistenceError> {
+        debug!(target: "tree::persistence", first=?blocks.first().map(|b| b.block.num_hash()), last=?blocks.last().map(|b| b.block.num_hash()), "Saving range of blocks");
         let start_time = Instant::now();
-        let last_block_hash = blocks.last().map(|block| block.block().hash());
+        let last_block_hash_num = blocks
+            .last()
+            .map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });
 
-        if last_block_hash.is_some() {
+        if last_block_hash_num.is_some() {
            let provider_rw = self.provider.provider_rw()?;
            let static_file_provider = self.provider.static_file_provider();
 
@@ -114,7 +121,7 @@ impl PersistenceService {
             UnifiedStorageWriter::commit(provider_rw, static_file_provider)?;
         }
         self.metrics.save_blocks_duration_seconds.record(start_time.elapsed());
-        Ok(last_block_hash)
+        Ok(last_block_hash_num)
     }
 }
 
@@ -138,13 +145,13 @@ pub enum PersistenceAction {
     ///
     /// First, header, transaction, and receipt-related data should be written to static files.
     /// Then the execution history-related data will be written to the database.
-    SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<B256>>),
+    SaveBlocks(Vec<ExecutedBlock>, oneshot::Sender<Option<BlockNumHash>>),
 
     /// Removes block data above the given block number from the database.
     ///
     /// This will first update checkpoints from the database, then remove actual block data from
     /// static files.
-    RemoveBlocksAbove(u64, oneshot::Sender<Option<B256>>),
+    RemoveBlocksAbove(u64, oneshot::Sender<Option<BlockNumHash>>),
 
     /// Prune associated block data before the given block number, according to already-configured
     /// prune modes.
@@ -209,7 +216,7 @@ impl PersistenceHandle {
     pub fn save_blocks(
         &self,
         blocks: Vec<ExecutedBlock>,
-        tx: oneshot::Sender<Option<B256>>,
+        tx: oneshot::Sender<Option<BlockNumHash>>,
     ) -> Result<(), SendError<PersistenceAction>> {
         self.send_action(PersistenceAction::SaveBlocks(blocks, tx))
     }
@@ -222,7 +229,7 @@ impl PersistenceHandle {
     pub fn remove_blocks_above(
         &self,
         block_num: u64,
-        tx: oneshot::Sender<Option<B256>>,
+        tx: oneshot::Sender<Option<BlockNumHash>>,
     ) -> Result<(), SendError<PersistenceAction>> {
         self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx))
     }
@@ -296,11 +303,12 @@ mod tests {
 
         persistence_handle.save_blocks(blocks, tx).unwrap();
 
-        let actual_hash = tokio::time::timeout(std::time::Duration::from_secs(10), rx)
-            .await
-            .expect("test timed out")
-            .expect("channel closed unexpectedly")
-            .expect("no hash returned");
+        let BlockNumHash { hash: actual_hash, number: _ } =
+            tokio::time::timeout(std::time::Duration::from_secs(10), rx)
+                .await
+                .expect("test timed out")
+                .expect("channel closed unexpectedly")
+                .expect("no hash returned");
 
         assert_eq!(block_hash, actual_hash);
     }
@@ -316,8 +324,7 @@ mod tests {
 
         let (tx, rx) = oneshot::channel();
         persistence_handle.save_blocks(blocks, tx).unwrap();
-
-        let actual_hash = rx.await.unwrap().unwrap();
+        let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
 
         assert_eq!(last_hash, actual_hash);
     }
@@ -335,7 +342,7 @@ mod tests {
 
         persistence_handle.save_blocks(blocks, tx).unwrap();
 
-        let actual_hash = rx.await.unwrap().unwrap();
+        let BlockNumHash { hash: actual_hash, number: _ } = rx.await.unwrap().unwrap();
         assert_eq!(last_hash, actual_hash);
     }
 }
diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs
index 0e846b3ca5ba..b19ba19a7474 100644
--- a/crates/engine/tree/src/tree/mod.rs
+++ b/crates/engine/tree/src/tree/mod.rs
@@ -201,6 +201,108 @@ impl TreeState {
         Some((executed, children))
     }
 
+    /// Returns whether or not the hash is part of the canonical chain.
+    pub(crate) fn is_canonical(&self, hash: B256) -> bool {
+        let mut current_block = self.current_canonical_head.hash;
+        if current_block == hash {
+            return true
+        }
+
+        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
+            current_block = executed.block.parent_hash;
+            if current_block == hash {
+                return true
+            }
+        }
+
+        false
+    }
+
+    /// Removes canonical blocks below the upper bound, only if the last persisted hash is
+    /// part of the canonical chain.
+    pub(crate) fn remove_canonical_until(
+        &mut self,
+        upper_bound: BlockNumber,
+        last_persisted_hash: B256,
+    ) {
+        debug!(target: "engine", ?upper_bound, ?last_persisted_hash, "Removing canonical blocks from the tree");
+
+        // If the last persisted hash is not canonical, then we don't want to remove any canonical
+        // blocks yet.
+        if !self.is_canonical(last_persisted_hash) {
+            return
+        }
+
+        // First, let's walk back the canonical chain and remove canonical blocks lower than the
+        // upper bound
+        let mut current_block = self.current_canonical_head.hash;
+        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
+            current_block = executed.block.parent_hash;
+            if executed.block.number <= upper_bound {
+                debug!(target: "engine", num_hash=?executed.block.num_hash(), "Attempting to remove block walking back from the head");
+                if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
+                    debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed block walking back from the head");
+                    // finally, move the trie updates
+                    self.persisted_trie_updates
+                        .insert(removed.block.hash(), (removed.block.number, removed.trie));
+                }
+            }
+        }
+    }
+
+    /// Removes all blocks that are below the finalized block, as well as removing non-canonical
+    /// sidechains that fork from below the finalized block.
+    pub(crate) fn prune_finalized_sidechains(&mut self, finalized_num_hash: BlockNumHash) {
+        let BlockNumHash { number: finalized_num, hash: finalized_hash } = finalized_num_hash;
+
+        // We remove disconnected sidechains in three steps:
+        // * first, remove everything with a block number __below__ the finalized block.
+        // * next, we populate a vec with parents __at__ the finalized block.
+        // * finally, we iterate through the vec, removing children until the vec is empty
+        //   (BFS).
+
+        // We _exclude_ the finalized block because we will be dealing with the blocks __at__
+        // the finalized block later.
+        let blocks_to_remove = self
+            .blocks_by_number
+            .range((Bound::Unbounded, Bound::Excluded(finalized_num)))
+            .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
+            .collect::<Vec<_>>();
+        for hash in blocks_to_remove {
+            if let Some((removed, _)) = self.remove_by_hash(hash) {
+                debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain block");
+            }
+        }
+
+        // remove trie updates that are below the finalized block
+        self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num < finalized_num);
+
+        // The only block that should remain at the `finalized` number now, is the finalized
+        // block, if it exists.
+        //
+        // For all other blocks, we first put their children into this vec.
+        // Then, we will iterate over them, removing them, adding their children, etc etc,
+        // until the vec is empty.
+        let mut blocks_to_remove = self.blocks_by_number.remove(&finalized_num).unwrap_or_default();
+
+        // re-insert the finalized hash if we removed it
+        if let Some(position) =
+            blocks_to_remove.iter().position(|b| b.block.hash() == finalized_hash)
+        {
+            let finalized_block = blocks_to_remove.swap_remove(position);
+            self.blocks_by_number.insert(finalized_num, vec![finalized_block]);
+        }
+
+        let mut blocks_to_remove =
+            blocks_to_remove.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>();
+        while let Some(block) = blocks_to_remove.pop_front() {
+            if let Some((removed, children)) = self.remove_by_hash(block) {
+                debug!(target: "engine", num_hash=?removed.block.num_hash(), "Removed finalized sidechain child block");
+                blocks_to_remove.extend(children);
+            }
+        }
+    }
+
     /// Remove all blocks up to __and including__ the given block number.
     ///
     /// If a finalized hash is provided, the only non-canonical blocks which will be removed are
@@ -214,17 +316,18 @@ impl TreeState {
     pub(crate) fn remove_until(
         &mut self,
         upper_bound: BlockNumber,
-        finalized_num: Option<BlockNumber>,
+        last_persisted_hash: B256,
+        finalized_num_hash: Option<BlockNumHash>,
     ) {
-        debug!(target: "engine", ?upper_bound, ?finalized_num, "Removing blocks from the tree");
+        debug!(target: "engine", ?upper_bound, ?finalized_num_hash, "Removing blocks from the tree");
 
         // If the finalized num is ahead of the upper bound, and exists, we need to instead ensure
         // that the only blocks removed, are canonical blocks less than the upper bound
         // finalized_num.take_if(|finalized| *finalized > upper_bound);
-        let finalized_num = finalized_num.map(|finalized| {
-            let new_finalized_num = finalized.min(upper_bound);
-            debug!(target: "engine", ?new_finalized_num, "Adjusted upper bound");
-            new_finalized_num
+        let finalized_num_hash = finalized_num_hash.map(|mut finalized| {
+            finalized.number = finalized.number.min(upper_bound);
+            debug!(target: "engine", ?finalized, "Adjusted upper bound");
+            finalized
         });
 
         // We want to do two things:
@@ -234,60 +337,12 @@ impl TreeState {
         // * remove all canonical blocks below the upper bound
         // * fetch the number of the finalized hash, removing any sidechains that are __below__ the
         //   finalized block
-
-        // First, let's walk back the canonical chain and remove canonical blocks lower than the
-        // upper bound
-        let mut current_block = self.current_canonical_head.hash;
-        while let Some(executed) = self.blocks_by_hash.get(&current_block) {
-            current_block = executed.block.parent_hash;
-            if executed.block.number <= upper_bound {
-                if let Some((removed, _)) = self.remove_by_hash(executed.block.hash()) {
-                    // finally, move the trie updates
-                    self.persisted_trie_updates
-                        .insert(removed.block.hash(), (removed.block.number, removed.trie));
-                }
-            }
-        }
+        self.remove_canonical_until(upper_bound, last_persisted_hash);
 
         // Now, we have removed canonical blocks (assuming the upper bound is above the finalized
         // block) and only have sidechains below the finalized block.
-        if let Some(finalized) = finalized_num {
-            // We remove disconnected sidechains in three steps:
-            // * first, remove everything with a block number __below__ the finalized block.
-            // * next, we populate a vec with parents __at__ the finalized block.
-            // * finally, we iterate through the vec, removing children until the vec is empty
-            //   (BFS).
-
-            // We _exclude_ the finalized block because we will be dealing with the blocks __at__
-            // the finalized block later.
-            let blocks_to_remove = self
-                .blocks_by_number
-                .range((Bound::Unbounded, Bound::Excluded(finalized)))
-                .flat_map(|(_, blocks)| blocks.iter().map(|b| b.block.hash()))
-                .collect::<Vec<_>>();
-            for hash in blocks_to_remove {
-                self.remove_by_hash(hash);
-            }
-
-            // remove trie updates that are below the finalized block
-            self.persisted_trie_updates.retain(|_, (block_num, _)| *block_num <= finalized);
-
-            // The only blocks that exist at `finalized_num` now, are blocks in sidechains that
-            // should be removed.
-            //
-            // We first put their children into this vec.
-            // Then, we will iterate over them, removing them, adding their children, etc etc,
-            // until the vec is empty.
-            let mut blocks_to_remove = self
-                .blocks_by_number
-                .remove(&finalized)
-                .map(|blocks| blocks.into_iter().map(|e| e.block.hash()).collect::<VecDeque<_>>())
-                .unwrap_or_default();
-            while let Some(block) = blocks_to_remove.pop_front() {
-                if let Some((_, children)) = self.remove_by_hash(block) {
-                    blocks_to_remove.extend(children);
-                }
-            }
+        if let Some(finalized_num_hash) = finalized_num_hash {
+            self.prune_finalized_sidechains(finalized_num_hash);
         }
     }
 
@@ -531,7 +586,7 @@ where
             last_persisted_block_hash: header.hash(),
             last_persisted_block_number: best_block_number,
             rx: None,
-            remove_above_state: None,
+            remove_above_state: VecDeque::new(),
         };
 
         let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
@@ -1053,11 +1108,14 @@ where
     /// or send a new persistence action if necessary.
     fn advance_persistence(&mut self) -> Result<(), TryRecvError> {
         if !self.persistence_state.in_progress() {
-            if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() {
-                debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task");
-                let (tx, rx) = oneshot::channel();
-                let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
-                self.persistence_state.start(rx);
+            if let Some(new_tip_num) = self.persistence_state.remove_above_state.pop_front() {
+                debug!(target: "engine", ?new_tip_num, remove_state=?self.persistence_state.remove_above_state, last_persisted_block_number=?self.persistence_state.last_persisted_block_number, "Removing blocks using persistence task");
+                if new_tip_num < self.persistence_state.last_persisted_block_number {
+                    debug!(target: "engine", ?new_tip_num, "Starting remove blocks job");
+                    let (tx, rx) = oneshot::channel();
+                    let _ = self.persistence.remove_blocks_above(new_tip_num, tx);
+                    self.persistence_state.start(rx);
+                }
             } else if self.should_persist() {
                 let blocks_to_persist = self.get_canonical_blocks_to_persist();
                 if blocks_to_persist.is_empty() {
@@ -1078,22 +1136,23 @@ where
                 .expect("if a persistence task is in progress Receiver must be Some");
             // Check if persistence has complete
             match rx.try_recv() {
-                Ok(last_persisted_block_hash) => {
+                Ok(last_persisted_hash_num) => {
                     self.metrics.engine.persistence_duration.record(start_time.elapsed());
-                    let Some(last_persisted_block_hash) = last_persisted_block_hash else {
+                    let Some(BlockNumHash {
+                        hash: last_persisted_block_hash,
+                        number: last_persisted_block_number,
+                    }) = last_persisted_hash_num
+                    else {
                         // if this happened, then we persisted no blocks because we sent an
                         // empty vec of blocks
                         warn!(target: "engine", "Persistence task completed but did not persist any blocks");
                         return Ok(())
                     };
-                    if let Some(block) =
-                        self.state.tree_state.block_by_hash(last_persisted_block_hash)
-                    {
-                        self.persistence_state.finish(last_persisted_block_hash, block.number);
-                        self.on_new_persisted_block();
-                    } else {
-                        error!("could not find persisted block with hash {last_persisted_block_hash} in memory");
-                    }
+
+                    trace!(target: "engine", ?last_persisted_block_hash, ?last_persisted_block_number, "Finished persisting, calling finish");
+                    self.persistence_state
+                        .finish(last_persisted_block_hash, last_persisted_block_number);
+                    self.on_new_persisted_block();
                 }
                 Err(TryRecvError::Closed) => return Err(TryRecvError::Closed),
                 Err(TryRecvError::Empty) => self.persistence_state.rx = Some((rx, start_time)),
@@ -1211,7 +1270,16 @@ where
         //
         // We set the `finalized_num` to `Some(backfill_height)` to ensure we remove all state
         // before that
-        self.state.tree_state.remove_until(backfill_height, Some(backfill_height));
+        let backfill_num_hash = self
+            .provider
+            .block_hash(backfill_height)?
+            .map(|hash| BlockNumHash { hash, number: backfill_height });
+
+        self.state.tree_state.remove_until(
+            backfill_height,
+            self.persistence_state.last_persisted_block_hash,
+            backfill_num_hash,
+        );
         self.metrics.engine.executed_blocks.set(self.state.tree_state.block_count() as f64);
 
         // remove all buffered blocks below the backfill height
@@ -1389,10 +1457,13 @@ where
     /// Assumes that `finish` has been called on the `persistence_state` at least once
     fn on_new_persisted_block(&mut self) {
         let finalized = self.state.forkchoice_state_tracker.last_valid_finalized();
+        debug!(target: "engine", last_persisted_hash=?self.persistence_state.last_persisted_block_hash, last_persisted_number=?self.persistence_state.last_persisted_block_number, ?finalized, "New persisted block, clearing in memory blocks");
         self.remove_before(self.persistence_state.last_persisted_block_number, finalized)
             .expect("todo: error handling");
-        self.canonical_in_memory_state
-            .remove_persisted_blocks(self.persistence_state.last_persisted_block_number);
+        self.canonical_in_memory_state.remove_persisted_blocks(BlockNumHash {
+            number: self.persistence_state.last_persisted_block_number,
+            hash: self.persistence_state.last_persisted_block_hash,
+        });
     }
 
     /// Return an [`ExecutedBlock`] from database or in-memory state by hash.
@@ -1402,8 +1473,8 @@ where
     /// has in memory.
     ///
     /// For finalized blocks, this will return `None`.
-    #[allow(unused)]
     fn executed_block_by_hash(&self, hash: B256) -> ProviderResult<Option<ExecutedBlock>> {
+        trace!(target: "engine", ?hash, "Fetching executed block by hash");
         // check memory first
         let block = self.state.tree_state.executed_block_by_hash(hash).cloned();
 
@@ -1858,6 +1929,16 @@ where
         let tip = chain_update.tip().header.clone();
         let notification = chain_update.to_chain_notification();
 
+        // reinsert any missing reorged blocks
+        if let NewCanonicalChain::Reorg { new, old } = &chain_update {
+            let new_first = new.first().map(|first| first.block.num_hash());
+            let old_first = old.first().map(|first| first.block.num_hash());
+            trace!(target: "engine", ?new_first, ?old_first, "Reorg detected, new and old first blocks");
+
+            self.reinsert_reorged_blocks(new.clone());
+            self.reinsert_reorged_blocks(old.clone());
+        }
+
         // update the tracked in-memory state with the new chain
         self.canonical_in_memory_state.update_chain(chain_update);
         self.canonical_in_memory_state.set_canonical_head(tip.clone());
@@ -1872,6 +1953,16 @@ where
         ));
     }
 
+    /// This reinserts any blocks in the new chain that do not already exist in the tree
+    fn reinsert_reorged_blocks(&mut self, new_chain: Vec<ExecutedBlock>) {
+        for block in new_chain {
+            if self.state.tree_state.executed_block_by_hash(block.block.hash()).is_none() {
+                trace!(target: "engine", num=?block.block.number, hash=?block.block.hash(), "Reinserting block into tree state");
+                self.state.tree_state.insert_executed(block);
+            }
+        }
+    }
+
     /// This handles downloaded blocks that are shown to be disconnected from the canonical chain.
     ///
     /// This mainly compares the missing parent of the downloaded block with the current canonical
@@ -2338,10 +2429,17 @@ where
     ) -> ProviderResult<()> {
         // first fetch the finalized block number and then call the remove_before method on
         // tree_state
-        let num =
-            if let Some(hash) = finalized_hash { self.provider.block_number(hash)? } else { None };
+        let num = if let Some(hash) = finalized_hash {
+            self.provider.block_number(hash)?.map(|number| BlockNumHash { number, hash })
+        } else {
+            None
+        };
 
-        self.state.tree_state.remove_until(upper_bound, num);
+        self.state.tree_state.remove_until(
+            upper_bound,
+            self.persistence_state.last_persisted_block_hash,
+            num,
+        );
         Ok(())
     }
 }
@@ -2355,14 +2453,14 @@ pub struct PersistenceState {
     last_persisted_block_hash: B256,
     /// Receiver end of channel where the result of the persistence task will be
    /// sent when done. A None value means there's no persistence task in progress.
-    rx: Option<(oneshot::Receiver<Option<B256>>, Instant)>,
+    rx: Option<(oneshot::Receiver<Option<BlockNumHash>>, Instant)>,
     /// The last persisted block number.
     ///
     /// This tracks the chain height that is persisted on disk
     last_persisted_block_number: u64,
     /// The block above which blocks should be removed from disk, because there has been an on disk
     /// reorg.
-    remove_above_state: Option<u64>,
+    remove_above_state: VecDeque<u64>,
 }
 
 impl PersistenceState {
@@ -2373,14 +2471,15 @@ impl PersistenceState {
     }
 
     /// Sets state for a started persistence task.
-    fn start(&mut self, rx: oneshot::Receiver<Option<B256>>) {
+    fn start(&mut self, rx: oneshot::Receiver<Option<BlockNumHash>>) {
         self.rx = Some((rx, Instant::now()));
     }
 
-    /// Sets the `remove_above_state`, to the new tip number specified.
+    /// Sets the `remove_above_state`, to the new tip number specified, only if it is less than the
+    /// current `last_persisted_block_number`.
     fn schedule_removal(&mut self, new_tip_num: u64) {
-        // TODO: what about multiple on-disk reorgs in a row?
-        self.remove_above_state = Some(new_tip_num);
+        debug!(target: "engine", ?new_tip_num, prev_remove_state=?self.remove_above_state, last_persisted_block_number=?self.last_persisted_block_number, "Scheduling removal");
+        self.remove_above_state.push_back(new_tip_num);
     }
 
     /// Sets state for a finished persistence task.
@@ -2989,7 +3088,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_tree_state_remove_before() {
-        let mut tree_state = TreeState::new(BlockNumHash::default());
+        let start_num_hash = BlockNumHash::default();
+        let mut tree_state = TreeState::new(start_num_hash);
         let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
 
         for block in &blocks {
@@ -3002,7 +3102,7 @@ mod tests {
         tree_state.set_canonical_head(last.block.num_hash());
 
         // inclusive bound, so we should remove anything up to and including 2
-        tree_state.remove_until(2, Some(2));
+        tree_state.remove_until(2, start_num_hash.hash, Some(blocks[1].block.num_hash()));
 
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
@@ -3034,7 +3134,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_tree_state_remove_before_finalized() {
-        let mut tree_state = TreeState::new(BlockNumHash::default());
+        let start_num_hash = BlockNumHash::default();
+        let mut tree_state = TreeState::new(start_num_hash);
         let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
 
         for block in &blocks {
@@ -3047,7 +3148,7 @@ mod tests {
         tree_state.set_canonical_head(last.block.num_hash());
 
         // we should still remove everything up to and including 2
-        tree_state.remove_until(2, None);
+        tree_state.remove_until(2, start_num_hash.hash, None);
 
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
@@ -3079,7 +3180,8 @@ mod tests {
 
     #[tokio::test]
     async fn test_tree_state_remove_before_lower_finalized() {
-        let mut tree_state = TreeState::new(BlockNumHash::default());
+        let start_num_hash = BlockNumHash::default();
+        let mut tree_state = TreeState::new(start_num_hash);
         let blocks: Vec<_> = TestBlockBuilder::default().get_executed_blocks(1..6).collect();
 
         for block in &blocks {
@@ -3092,7 +3194,7 @@ mod tests {
         tree_state.set_canonical_head(last.block.num_hash());
 
         // we have no forks so we should still remove anything up to and including 2
-        tree_state.remove_until(2, Some(1));
+        tree_state.remove_until(2, start_num_hash.hash, Some(blocks[0].block.num_hash()));
 
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[0].block.hash()));
         assert!(!tree_state.blocks_by_hash.contains_key(&blocks[1].block.hash()));
diff --git a/crates/storage/provider/src/writer/mod.rs b/crates/storage/provider/src/writer/mod.rs
index db24674197b4..7e02b1f7ba48 100644
--- a/crates/storage/provider/src/writer/mod.rs
+++ b/crates/storage/provider/src/writer/mod.rs
@@ -260,16 +260,23 @@ where
 
         // Get the total txs for the block range, so we have the correct number of columns for
         // receipts and transactions
+        // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
         let tx_range = self
             .database()
-            .transaction_range_by_block_range(block_number..=highest_static_file_block)?;
+            .transaction_range_by_block_range(block_number + 1..=highest_static_file_block)?;
         let total_txs = tx_range.end().saturating_sub(*tx_range.start());
 
+        // IMPORTANT: we use `block_number+1` to make sure we remove only what is ABOVE the block
         debug!(target: "provider::storage_writer", ?block_number, "Removing blocks from database above block_number");
         self.database().remove_block_and_execution_range(
-            block_number..=self.database().last_block_number()?,
+            block_number + 1..=self.database().last_block_number()?,
        )?;
 
+        // IMPORTANT: we use `highest_static_file_block.saturating_sub(block_number)` to make sure
+        // we remove only what is ABOVE the block.
+        //
+        // i.e., if the highest static file block is 8, we want to remove above block 5 only, we
+        // will have three blocks to remove, which will be block 8, 7, and 6.
        debug!(target: "provider::storage_writer", ?block_number, "Removing static file blocks above block_number");
        self.static_file()
            .get_writer(block_number, StaticFileSegment::Headers)?