diff --git a/Cargo.lock b/Cargo.lock index 124b086497b7..9954a4c655f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7326,8 +7326,10 @@ dependencies = [ "dashmap 6.1.0", "eyre", "futures", + "itertools 0.13.0", "metrics", "parking_lot 0.12.3", + "rand 0.8.5", "reth-blockchain-tree", "reth-chain-state", "reth-chainspec", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index f10775e24587..0e76ef3d40df 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -44,6 +44,7 @@ tokio.workspace = true ## misc dashmap.workspace = true eyre.workspace = true +itertools.workspace = true metrics.workspace = true parking_lot.workspace = true serde_json.workspace = true @@ -62,6 +63,7 @@ reth-testing-utils.workspace = true alloy-genesis.workspace = true alloy-consensus.workspace = true +rand.workspace = true secp256k1.workspace = true tempfile.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index ada9e7a4b315..8c52bc6590ab 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -2,16 +2,19 @@ use crate::{ wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle, }; use futures::StreamExt; +use itertools::Itertools; use metrics::Gauge; use reth_chain_state::ForkChoiceStream; use reth_chainspec::Head; use reth_metrics::{metrics::Counter, Metrics}; use reth_primitives::{BlockNumHash, SealedHeader}; +use reth_provider::HeaderProvider; use reth_tracing::tracing::debug; use std::{ collections::VecDeque, fmt::Debug, future::{poll_fn, Future}, + ops::Not, pin::Pin, sync::{ atomic::{AtomicUsize, Ordering}, @@ -183,7 +186,10 @@ pub struct ExExManagerMetrics { /// - Error handling /// - Monitoring #[derive(Debug)] -pub struct ExExManager { +pub struct ExExManager

{ + /// Provider for querying headers. + provider: P, + /// Handles to communicate with the `ExEx`'s. exex_handles: Vec, @@ -223,7 +229,7 @@ pub struct ExExManager { metrics: ExExManagerMetrics, } -impl ExExManager { +impl

ExExManager

{ /// Create a new [`ExExManager`]. /// /// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the @@ -232,6 +238,7 @@ impl ExExManager { /// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send /// notifications over [`ExExManagerHandle`]s until there is capacity again. pub fn new( + provider: P, handles: Vec, max_capacity: usize, wal: Wal, @@ -254,6 +261,8 @@ impl ExExManager { metrics.num_exexs.set(num_exexs as f64); Self { + provider, + exex_handles: handles, handle_rx, @@ -309,83 +318,152 @@ impl ExExManager { } } -impl Future for ExExManager { +impl

ExExManager

+where + P: HeaderProvider, +{ + /// Finalizes the WAL according to the passed finalized header. + /// + /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if + /// necessary. + fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> { + debug!(header = ?finalized_header.num_hash(), "Received finalized header"); + + // Check if all ExExes are on the canonical chain + let exex_finished_heights = self + .exex_handles + .iter() + // Get ExEx ID and hash of the finished height for each ExEx + .map(|exex_handle| { + (&exex_handle.id, exex_handle.finished_height.map(|block| block.hash)) + }) + // Deduplicate all hashes + .unique_by(|(_, hash)| *hash) + // Check if hashes are canonical + .map(|(exex_id, hash)| { + hash.map_or(Ok((exex_id, hash, false)), |hash| { + self.provider + .is_known(&hash) + // Save the ExEx ID, hash of the finished height, and whether the hash + // is canonical + .map(|is_canonical| (exex_id, Some(hash), is_canonical)) + }) + }) + // We collect here to be able to log the unfinalized ExExes below + .collect::, _>>()?; + if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) { + // If there is a finalized header and all ExExs are on the canonical chain, finalize + // the WAL with the new finalized header + self.wal.finalize(finalized_header.num_hash())?; + } else { + let unfinalized_exexes = exex_finished_heights + .into_iter() + .filter_map(|(exex_id, hash, is_canonical)| { + is_canonical.not().then_some((exex_id, hash)) + }) + .format_with(", ", |(exex_id, hash), f| f(&format_args!("{exex_id:?} = {hash:?}"))); + debug!( + %unfinalized_exexes, + "Not all ExExes are on the canonical chain, can't finalize the WAL" + ); + } + + Ok(()) + } +} + +impl

Future for ExExManager

+where + P: HeaderProvider + Unpin + 'static, +{ type Output = eyre::Result<()>; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Drain the finalized header stream and grab the last finalized header + /// Main loop of the [`ExExManager`]. The order of operations is as follows: + /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on + /// the latest state of [`ExExEvent::FinishedHeight`] events. + /// 2. Finalize the WAL with the finalized header, if necessary. + /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update + /// the internal buffer capacity. + /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new + /// notifications. + /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and + /// update the internal buffer capacity. + /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes. + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + // Handle incoming ExEx events + for exex in &mut this.exex_handles { + while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) { + debug!(exex_id = %exex.id, ?event, "Received event from ExEx"); + exex.metrics.events_sent_total.increment(1); + match event { + ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height), + } + } + } + + // Drain the finalized header stream and finalize the WAL with the last header let mut last_finalized_header = None; - while let Poll::Ready(finalized_header) = self.finalized_header_stream.poll_next_unpin(cx) { + while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) { last_finalized_header = finalized_header; } - // If there is a finalized header, finalize the WAL with it if let Some(header) = last_finalized_header { - self.wal.finalize((header.number, header.hash()).into())?; + this.finalize_wal(header)?; } - // drain handle notifications - while self.buffer.len() < self.max_capacity { - if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) { + // Drain handle notifications + while this.buffer.len() < this.max_capacity { + if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) { debug!( committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number), reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number), "Received new notification" ); - self.push_notification(notification); + this.push_notification(notification); continue } break } - // update capacity - self.update_capacity(); + // Update capacity + this.update_capacity(); - // advance all poll senders + // Advance all poll senders let mut min_id = usize::MAX; - for idx in (0..self.exex_handles.len()).rev() { - let mut exex = self.exex_handles.swap_remove(idx); + for idx in (0..this.exex_handles.len()).rev() { + let mut exex = this.exex_handles.swap_remove(idx); - // it is a logic error for this to ever underflow since the manager manages the + // It is a logic error for this to ever underflow since the manager manages the // notification IDs let notification_index = exex .next_notification_id - .checked_sub(self.min_id) + .checked_sub(this.min_id) .expect("exex expected notification ID outside the manager's range"); - if let Some(notification) = self.buffer.get(notification_index) { + if let Some(notification) = this.buffer.get(notification_index) { if let Poll::Ready(Err(err)) = exex.send(cx, notification) { - // the channel was closed, which is irrecoverable for the manager + // The channel was closed, which is irrecoverable for the manager return Poll::Ready(Err(err.into())) } } min_id = min_id.min(exex.next_notification_id); - self.exex_handles.push(exex); + this.exex_handles.push(exex); } - // remove processed buffered notifications + // Remove processed buffered notifications debug!(%min_id, "Updating lowest notification id in buffer"); - self.buffer.retain(|&(id, _)| id >= min_id); - self.min_id = min_id; + this.buffer.retain(|&(id, _)| id >= min_id); + this.min_id = min_id; - // update capacity - self.update_capacity(); - - // handle incoming exex events - for exex in &mut self.exex_handles { - while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) { - debug!(exex_id = %exex.id, ?event, "Received event from exex"); - exex.metrics.events_sent_total.increment(1); - match event { - ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height), - } - } - } + // Update capacity + this.update_capacity(); - // update watch channel block number - let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| { + // Update watch channel block number + let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| { exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr))) }); if let Ok(finished_height) = finished_height { - let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height)); + let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height)); } Poll::Pending @@ -517,8 +595,9 @@ mod tests { use alloy_primitives::B256; use eyre::OptionExt; use futures::StreamExt; + use rand::Rng; use reth_primitives::SealedBlockWithSenders; - use reth_provider::Chain; + use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain}; use reth_testing_utils::generators::{self, random_block}; fn empty_finalized_header_stream() -> ForkChoiceStream { @@ -551,11 +630,11 @@ mod tests { let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); - assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream()) + assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream()) .handle .has_exexs()); - assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream()) + assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream()) .handle .has_exexs()); } @@ -568,13 +647,19 @@ mod tests { let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); - assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream()) + assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream()) .handle .has_capacity()); - assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream()) - .handle - .has_capacity()); + assert!(ExExManager::new( + (), + vec![exex_handle_1], + 10, + wal, + empty_finalized_header_stream() + ) + .handle + .has_capacity()); } #[test] @@ -587,7 +672,7 @@ mod tests { // Create a mock ExExManager and add the exex_handle to it let mut exex_manager = - ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream()); + ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream()); // Define the notification for testing let mut block1 = SealedBlockWithSenders::default(); @@ -637,8 +722,13 @@ mod tests { // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; - let mut exex_manager = - ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream()); + let mut exex_manager = ExExManager::new( + (), + vec![exex_handle], + max_capacity, + wal, + empty_finalized_header_stream(), + ); // Push some notifications to fill part of the buffer let mut block1 = SealedBlockWithSenders::default(); @@ -671,6 +761,8 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); + let (exex_handle, event_tx, mut _notification_rx) = ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); @@ -683,6 +775,7 @@ mod tests { // Create a mock ExExManager and add the exex_handle to it let exex_manager = ExExManager::new( + provider_factory, vec![exex_handle], 10, Wal::new(temp_dir.path()).unwrap(), @@ -717,6 +810,8 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); + // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); @@ -731,6 +826,7 @@ mod tests { event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap(); let exex_manager = ExExManager::new( + provider_factory, vec![exex_handle1, exex_handle2], 10, Wal::new(temp_dir.path()).unwrap(), @@ -761,6 +857,8 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); + // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); @@ -778,6 +876,7 @@ mod tests { event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap(); let exex_manager = ExExManager::new( + provider_factory, vec![exex_handle1, exex_handle2], 10, Wal::new(temp_dir.path()).unwrap(), @@ -812,12 +911,15 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); // Create an ExExManager with a small max capacity let max_capacity = 2; let mut exex_manager = ExExManager::new( + provider_factory, vec![exex_handle_1], max_capacity, Wal::new(temp_dir.path()).unwrap(), @@ -1014,37 +1116,75 @@ mod tests { async fn test_exex_wal_finalize() -> eyre::Result<()> { reth_tracing::init_test_tracing(); + let mut rng = generators::rng(); + let temp_dir = tempfile::tempdir().unwrap(); let mut wal = Wal::new(temp_dir.path()).unwrap(); - let block = random_block(&mut generators::rng(), 0, Default::default()) + let provider_factory = create_test_provider_factory(); + + let block = random_block(&mut rng, 0, Default::default()) .seal_with_senders() .ok_or_eyre("failed to recover senders")?; + let provider_rw = provider_factory.provider_rw()?; + provider_rw.insert_block(block.clone())?; + provider_rw.commit()?; + let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), }; wal.commit(¬ification)?; - let (tx, rx) = watch::channel(None); + let (finalized_headers_tx, rx) = watch::channel(None); let finalized_header_stream = ForkChoiceStream::new(rx); - let (exex_handle, _, _) = + let (exex_handle, events_tx, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); - let mut exex_manager = - std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream)); + let mut exex_manager = std::pin::pin!(ExExManager::new( + provider_factory, + vec![exex_handle], + 1, + wal, + finalized_header_stream + )); let mut cx = Context::from_waker(futures::task::noop_waker_ref()); assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + assert_eq!( + exex_manager.wal.iter_notifications()?.collect::>>()?, + [notification.clone()] + ); + + finalized_headers_tx.send(Some(block.header.clone()))?; + assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event + assert_eq!( + exex_manager.wal.iter_notifications()?.collect::>>()?, + [notification.clone()] + ); + + // Send a `FinishedHeight` event with a non-canonical block + events_tx + .send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into())) + .unwrap(); + + finalized_headers_tx.send(Some(block.header.clone()))?; + assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a + // non-canonical block assert_eq!( exex_manager.wal.iter_notifications()?.collect::>>()?, [notification] ); - tx.send(Some(block.header.clone()))?; + // Send a `FinishedHeight` event with a canonical block + events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap(); + finalized_headers_tx.send(Some(block.header.clone()))?; assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + // WAL is finalized assert!(exex_manager.wal.iter_notifications()?.next().is_none()); Ok(()) diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 369a0586c0c5..9e9ee78e6cd7 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -179,7 +179,7 @@ where /// If the head block is not found in the database, it means we're not on the canonical chain /// and we need to revert the notification with the ExEx head block. fn check_canonical(&mut self) -> eyre::Result> { - if self.provider.header(&self.exex_head.block.hash)?.is_some() { + if self.provider.is_known(&self.exex_head.block.hash)? { debug!(target: "exex::notifications", "ExEx head is on the canonical chain"); return Ok(None) } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index d7aea3aafdfa..72e60fe1a3db 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -117,12 +117,6 @@ impl WalInner { Ok(()) } - /// Finalizes the WAL to the given block, inclusive. - /// - /// 1. Finds a notification with first unfinalized block (first notification containing a - /// committed block higher than `to_block`). - /// 2. Removes the notifications from the beginning of WAL until the found notification. If this - /// notification includes both finalized and non-finalized blocks, it will not be removed. #[instrument(target = "exex::wal", skip(self))] fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { // First, walk cache to find the file ID of the notification with the finalized block and diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index 816335d3dbdf..233605ef867d 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -109,6 +109,7 @@ impl ExExLauncher { debug!(target: "reth::cli", "spawning exex manager"); // todo(onbjerg): rm magic number let exex_manager = ExExManager::new( + components.provider().clone(), exex_handles, DEFAULT_EXEX_MANAGER_CAPACITY, exex_wal,