From 98b214f63dcbe11a357ec85532306c3756552eba Mon Sep 17 00:00:00 2001 From: Dan Cline <6798349+Rjected@users.noreply.github.com> Date: Wed, 4 Sep 2024 18:36:14 -0400 Subject: [PATCH] feat(tree): schedule block removal on disk reorgs (#10603) --- crates/engine/tree/src/persistence.rs | 20 +++++--- crates/engine/tree/src/tree/mod.rs | 72 ++++++++++++++++++++++++--- 2 files changed, 78 insertions(+), 14 deletions(-) diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs index 8ccfde04b169..82c7883897a4 100644 --- a/crates/engine/tree/src/persistence.rs +++ b/crates/engine/tree/src/persistence.rs @@ -5,7 +5,9 @@ use reth_chain_state::ExecutedBlock; use reth_db::Database; use reth_errors::ProviderError; use reth_primitives::B256; -use reth_provider::{writer::UnifiedStorageWriter, ProviderFactory, StaticFileProviderFactory}; +use reth_provider::{ + writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory, StaticFileProviderFactory, +}; use reth_prune::{Pruner, PrunerError, PrunerOutput}; use std::{ sync::mpsc::{Receiver, SendError, Sender}, @@ -67,9 +69,9 @@ where while let Ok(action) = self.incoming.recv() { match action { PersistenceAction::RemoveBlocksAbove(new_tip_num, sender) => { - self.on_remove_blocks_above(new_tip_num)?; + let result = self.on_remove_blocks_above(new_tip_num)?; // we ignore the error because the caller may or may not care about the result - let _ = sender.send(()); + let _ = sender.send(result); } PersistenceAction::SaveBlocks(blocks, sender) => { let result = self.on_save_blocks(blocks)?; @@ -87,17 +89,18 @@ where Ok(()) } - fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result<(), PersistenceError> { + fn on_remove_blocks_above(&self, new_tip_num: u64) -> Result, PersistenceError> { debug!(target: "tree::persistence", ?new_tip_num, "Removing blocks"); let start_time = Instant::now(); let provider_rw = self.provider.provider_rw()?; let sf_provider = self.provider.static_file_provider(); + let new_tip_hash = provider_rw.block_hash(new_tip_num)?; UnifiedStorageWriter::from(&provider_rw, &sf_provider).remove_blocks_above(new_tip_num)?; UnifiedStorageWriter::commit_unwind(provider_rw, sf_provider)?; self.metrics.remove_blocks_above_duration_seconds.record(start_time.elapsed()); - Ok(()) + Ok(new_tip_hash) } fn on_save_blocks(&self, blocks: Vec) -> Result, PersistenceError> { @@ -143,7 +146,7 @@ pub enum PersistenceAction { /// /// This will first update checkpoints from the database, then remove actual block data from /// static files. - RemoveBlocksAbove(u64, oneshot::Sender<()>), + RemoveBlocksAbove(u64, oneshot::Sender>), /// Prune associated block data before the given block number, according to already-configured /// prune modes. @@ -216,11 +219,12 @@ impl PersistenceHandle { /// Tells the persistence service to remove blocks above a certain block number. The removed /// blocks are returned by the service. /// - /// When the operation completes, `()` is returned in the receiver end of the sender argument. + /// When the operation completes, the new tip hash is returned in the receiver end of the sender + /// argument. pub fn remove_blocks_above( &self, block_num: u64, - tx: oneshot::Sender<()>, + tx: oneshot::Sender>, ) -> Result<(), SendError> { self.send_action(PersistenceAction::RemoveBlocksAbove(block_num, tx)) } diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index ed13fb45403f..e9caa328e75c 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -41,6 +41,7 @@ use reth_rpc_types::{ use reth_stages_api::ControlFlow; use reth_trie::{updates::TrieUpdates, HashedPostState}; use std::{ + cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque}, fmt::Debug, ops::Bound, @@ -530,6 +531,7 @@ where last_persisted_block_hash: header.hash(), last_persisted_block_number: best_block_number, rx: None, + remove_above_state: None, }; let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel(); @@ -1050,14 +1052,21 @@ where /// If we're currently awaiting a response this will try to receive the response (non-blocking) /// or send a new persistence action if necessary. fn advance_persistence(&mut self) -> Result<(), TryRecvError> { - if self.should_persist() && !self.persistence_state.in_progress() { - let blocks_to_persist = self.get_canonical_blocks_to_persist(); - if blocks_to_persist.is_empty() { - debug!(target: "engine", "Returned empty set of blocks to persist"); - } else { + if !self.persistence_state.in_progress() { + if let Some(new_tip_num) = self.persistence_state.remove_above_state.take() { + debug!(target: "engine", ?new_tip_num, "Removing blocks using persistence task"); let (tx, rx) = oneshot::channel(); - let _ = self.persistence.save_blocks(blocks_to_persist, tx); + let _ = self.persistence.remove_blocks_above(new_tip_num, tx); self.persistence_state.start(rx); + } else if self.should_persist() { + let blocks_to_persist = self.get_canonical_blocks_to_persist(); + if blocks_to_persist.is_empty() { + debug!(target: "engine", "Returned empty set of blocks to persist"); + } else { + let (tx, rx) = oneshot::channel(); + let _ = self.persistence.save_blocks(blocks_to_persist, tx); + self.persistence_state.start(rx); + } } } @@ -1794,6 +1803,41 @@ where None } + /// This determines whether or not we should remove blocks from the chain, based on a canonical + /// chain update. + /// + /// If the chain update is a reorg: + /// * is the new chain behind the last persisted block, or + /// * if the root of the new chain is at the same height as the last persisted block, is it a + /// different block + /// + /// If either of these are true, then this returns the height of the first block. Otherwise, + /// this returns [`None`]. This should be used to check whether or not we should be sending a + /// remove command to the persistence task. + fn find_disk_reorg(&self, chain_update: &NewCanonicalChain) -> Option { + let NewCanonicalChain::Reorg { new, old: _ } = chain_update else { return None }; + + let BlockNumHash { number: new_num, hash: new_hash } = + new.first().map(|block| block.block.num_hash())?; + + match new_num.cmp(&self.persistence_state.last_persisted_block_number) { + Ordering::Greater => { + // new number is above the last persisted block so the reorg can be performed + // entirely in memory + None + } + Ordering::Equal => { + // new number is the same, if the hash is the same then we should not need to remove + // any blocks + (self.persistence_state.last_persisted_block_hash != new_hash).then_some(new_num) + } + Ordering::Less => { + // this means we are below the last persisted block and must remove on disk blocks + Some(new_num) + } + } + } + /// Invoked when we the canonical chain has been updated. /// /// This is invoked on a valid forkchoice update, or if we can make the target block canonical. @@ -1801,6 +1845,13 @@ where trace!(target: "engine", new_blocks = %chain_update.new_block_count(), reorged_blocks = %chain_update.reorged_block_count(), "applying new chain update"); let start = Instant::now(); + // schedule a remove_above call if we have an on-disk reorg + if let Some(height) = self.find_disk_reorg(&chain_update) { + // calculate the new tip by subtracting one from the lowest part of the chain + let new_tip_num = height.saturating_sub(1); + self.persistence_state.schedule_removal(new_tip_num); + } + // update the tracked canonical head self.state.tree_state.set_canonical_head(chain_update.tip().num_hash()); @@ -2308,6 +2359,9 @@ pub struct PersistenceState { /// /// This tracks the chain height that is persisted on disk last_persisted_block_number: u64, + /// The block above which blocks should be removed from disk, because there has been an on disk + /// reorg. + remove_above_state: Option, } impl PersistenceState { @@ -2322,6 +2376,12 @@ impl PersistenceState { self.rx = Some((rx, Instant::now())); } + /// Sets the `remove_above_state`, to the new tip number specified. + fn schedule_removal(&mut self, new_tip_num: u64) { + // TODO: what about multiple on-disk reorgs in a row? + self.remove_above_state = Some(new_tip_num); + } + /// Sets state for a finished persistence task. fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) { trace!(target: "engine", block= %last_persisted_block_number, hash=%last_persisted_block_hash, "updating persistence state");