diff --git a/Cargo.lock b/Cargo.lock index 46d878936129..173eb2946b40 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7400,6 +7400,7 @@ dependencies = [ "eyre", "futures", "metrics", + "parking_lot 0.12.3", "reth-blockchain-tree", "reth-chain-state", "reth-chainspec", diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 5baf5b97f3ea..2b5b89fbd16d 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -44,6 +44,7 @@ tokio.workspace = true dashmap.workspace = true eyre.workspace = true metrics.workspace = true +parking_lot.workspace = true serde_json.workspace = true tracing.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 3230e003b28d..9b07aef0aad6 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -1,4 +1,6 @@ -use crate::{wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight}; +use crate::{ + wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle, +}; use alloy_primitives::BlockNumber; use futures::StreamExt; use metrics::Gauge; @@ -67,10 +69,12 @@ impl ExExHandle { node_head: Head, provider: P, executor: E, + wal_handle: WalHandle, ) -> (Self, UnboundedSender, ExExNotifications) { let (notification_tx, notification_rx) = mpsc::channel(1); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let notifications = ExExNotifications::new(node_head, provider, executor, notification_rx); + let notifications = + ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle); ( Self { @@ -521,8 +525,11 @@ mod tests { #[tokio::test] async fn test_delivers_events() { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (mut exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Send an event and check that it's delivered correctly event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -533,65 +540,48 @@ mod tests { #[tokio::test] async fn test_has_exexs() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); - assert!(!ExExManager::new( - vec![], - 0, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream() - ) - .handle - .has_exexs()); + assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream()) + .handle + .has_exexs()); - assert!(ExExManager::new( - vec![exex_handle_1], - 0, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream() - ) - .handle - .has_exexs()); + assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream()) + .handle + .has_exexs()); } #[tokio::test] async fn test_has_capacity() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); - assert!(!ExExManager::new( - vec![], - 0, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream() - ) - .handle - .has_capacity()); + assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream()) + .handle + .has_capacity()); - assert!(ExExManager::new( - vec![exex_handle_1], - 10, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream() - ) - .handle - .has_capacity()); + assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream()) + .handle + .has_capacity()); } #[test] fn test_push_notification() { let temp_dir = tempfile::tempdir().unwrap(); - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + let wal = Wal::new(temp_dir.path()).unwrap(); + + let (exex_handle, _, _) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Create a mock ExExManager and add the exex_handle to it - let mut exex_manager = ExExManager::new( - vec![exex_handle], - 10, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream(), - ); + let mut exex_manager = + ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream()); // Define the notification for testing let mut block1 = SealedBlockWithSenders::default(); @@ -634,16 +624,15 @@ mod tests { #[test] fn test_update_capacity() { let temp_dir = tempfile::tempdir().unwrap(); - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + let wal = Wal::new(temp_dir.path()).unwrap(); + + let (exex_handle, _, _) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Create a mock ExExManager and add the exex_handle to it let max_capacity = 5; - let mut exex_manager = ExExManager::new( - vec![exex_handle], - max_capacity, - Wal::new(temp_dir.path()).unwrap(), - empty_finalized_header_stream(), - ); + let mut exex_manager = + ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream()); // Push some notifications to fill part of the buffer let mut block1 = SealedBlockWithSenders::default(); @@ -674,8 +663,10 @@ mod tests { #[tokio::test] async fn test_updates_block_height() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (exex_handle, event_tx, mut _notification_rx) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Check initial block height assert!(exex_handle.finished_height.is_none()); @@ -717,11 +708,13 @@ mod tests { #[tokio::test] async fn test_updates_block_height_lower() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = - ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); let (exex_handle2, event_tx2, _) = - ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle()); // Send events to update the block heights of the two handles, with the second being lower event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap(); @@ -756,11 +749,13 @@ mod tests { #[tokio::test] async fn test_updates_block_height_greater() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + // Create two `ExExHandle` instances let (exex_handle1, event_tx1, _) = - ExExHandle::new("test_exex1".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle()); let (exex_handle2, event_tx2, _) = - ExExHandle::new("test_exex2".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle()); // Assert that the initial block height is `None` for the first `ExExHandle`. assert!(exex_handle1.finished_height.is_none()); @@ -802,8 +797,10 @@ mod tests { #[tokio::test] async fn test_exex_manager_capacity() { let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (exex_handle_1, _, _) = - ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle()); // Create an ExExManager with a small max capacity let max_capacity = 2; @@ -846,8 +843,11 @@ mod tests { #[tokio::test] async fn exex_handle_new() { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Check initial state assert_eq!(exex_handle.id, "test_exex"); @@ -889,8 +889,11 @@ mod tests { #[tokio::test] async fn test_notification_if_finished_height_gt_chain_tip() { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); // Set finished_height to a value higher than the block tip exex_handle.finished_height = Some(15); @@ -931,8 +934,11 @@ mod tests { #[tokio::test] async fn test_sends_chain_reorged_notification() { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); let notification = ExExNotification::ChainReorged { old: Arc::new(Chain::default()), @@ -962,8 +968,11 @@ mod tests { #[tokio::test] async fn test_sends_chain_reverted_notification() { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let (mut exex_handle, _, mut notifications) = - ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; @@ -994,6 +1003,7 @@ mod tests { let temp_dir = tempfile::tempdir().unwrap(); let mut wal = Wal::new(temp_dir.path()).unwrap(); + let block = random_block(&mut generators::rng(), 0, Default::default()) .seal_with_senders() .ok_or_eyre("failed to recover senders")?; @@ -1005,7 +1015,8 @@ mod tests { let (tx, rx) = watch::channel(None); let finalized_header_stream = ForkChoiceStream::new(rx); - let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ()); + let (exex_handle, _, _) = + ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); let mut exex_manager = std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream)); diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index 54d7959dc5e8..e182f385fa79 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -1,4 +1,4 @@ -use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob}; +use crate::{BackfillJobFactory, ExExNotification, StreamBackfillJob, WalHandle}; use alloy_primitives::U256; use eyre::OptionExt; use futures::{Stream, StreamExt}; @@ -21,6 +21,7 @@ pub struct ExExNotifications { provider: P, executor: E, notifications: Receiver, + wal_handle: WalHandle, } impl Debug for ExExNotifications { @@ -40,8 +41,9 @@ impl ExExNotifications { provider: P, executor: E, notifications: Receiver, + wal_handle: WalHandle, ) -> Self { - Self { node_head, provider, executor, notifications } + Self { node_head, provider, executor, notifications, wal_handle } } /// Receives the next value for this receiver. @@ -113,6 +115,7 @@ where self.provider, self.executor, self.notifications, + self.wal_handle, head, ) } @@ -134,6 +137,8 @@ pub struct ExExNotificationsWithHead { provider: P, executor: E, notifications: Receiver, + #[allow(dead_code)] + wal_handle: WalHandle, exex_head: ExExHead, pending_sync: bool, /// The backfill job to run before consuming any notifications. @@ -154,6 +159,7 @@ where provider: P, executor: E, notifications: Receiver, + wal_handle: WalHandle, exex_head: ExExHead, ) -> Self { Self { @@ -161,6 +167,7 @@ where provider, executor, notifications, + wal_handle, exex_head, pending_sync: true, backfill_job: None, @@ -344,6 +351,8 @@ where mod tests { use std::future::poll_fn; + use crate::Wal; + use super::*; use alloy_consensus::Header; use eyre::OptionExt; @@ -362,6 +371,9 @@ mod tests { async fn exex_notifications_behind_head_canonical() -> eyre::Result<()> { let mut rng = generators::rng(); + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory @@ -412,6 +424,7 @@ mod tests { provider, EthExecutorProvider::mainnet(), notifications_rx, + wal.handle(), ) .with_head(exex_head); @@ -445,6 +458,9 @@ mod tests { #[tokio::test] async fn exex_notifications_same_head_canonical() -> eyre::Result<()> { + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory @@ -485,6 +501,7 @@ mod tests { provider, EthExecutorProvider::mainnet(), notifications_rx, + wal.handle(), ) .with_head(exex_head); @@ -504,6 +521,9 @@ mod tests { async fn test_notifications_ahead_of_head() -> eyre::Result<()> { let mut rng = generators::rng(); + let temp_dir = tempfile::tempdir().unwrap(); + let wal = Wal::new(temp_dir.path()).unwrap(); + let provider_factory = create_test_provider_factory(); let genesis_hash = init_genesis(&provider_factory)?; let genesis_block = provider_factory @@ -544,6 +564,7 @@ mod tests { provider, EthExecutorProvider::mainnet(), notifications_rx, + wal.handle(), ) .with_head(exex_head); diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 2c79826e0e17..8a432bbebef1 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -1,6 +1,7 @@ use std::collections::{BTreeMap, VecDeque}; use dashmap::DashMap; +use parking_lot::RwLock; use reth_exex_types::ExExNotification; use reth_primitives::{BlockNumHash, B256}; @@ -15,7 +16,7 @@ pub struct BlockCache { /// For each notification written to the WAL, there will be an entry per block written to /// the cache with the same file ID. I.e. for each notification, there may be multiple blocks /// in the cache. - files: BTreeMap>, + files: RwLock>>, /// A mapping of `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per @@ -26,45 +27,52 @@ pub struct BlockCache { impl BlockCache { /// Creates a new instance of [`BlockCache`]. pub(super) fn new() -> Self { - Self { files: BTreeMap::new(), blocks: DashMap::new() } + Self { files: RwLock::new(BTreeMap::new()), blocks: DashMap::new() } } /// Returns `true` if the cache is empty. pub(super) fn is_empty(&self) -> bool { - self.files.is_empty() + self.files.read().is_empty() } /// Returns a front-to-back iterator. pub(super) fn iter(&self) -> impl Iterator + '_ { - self.files.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b))) + self.files + .read() + .iter() + .flat_map(|(k, v)| v.iter().map(move |b| (*k, *b))) + .collect::>() + .into_iter() } /// Provides a reference to the first block from the cache, or `None` if the cache is /// empty. pub(super) fn front(&self) -> Option<(u64, CachedBlock)> { - self.files.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b))) + self.files.read().first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b))) } /// Provides a reference to the last block from the cache, or `None` if the cache is /// empty. pub(super) fn back(&self) -> Option<(u64, CachedBlock)> { - self.files.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b))) + self.files.read().last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b))) } /// Removes the notification with the given file ID. - pub(super) fn remove_notification(&mut self, key: u64) -> Option> { - self.files.remove(&key) + pub(super) fn remove_notification(&self, key: u64) -> Option> { + self.files.write().remove(&key) } /// Pops the first block from the cache. If it resulted in the whole file entry being empty, /// it will also remove the file entry. - pub(super) fn pop_front(&mut self) -> Option<(u64, CachedBlock)> { - let first_entry = self.files.first_entry()?; + pub(super) fn pop_front(&self) -> Option<(u64, CachedBlock)> { + let mut files = self.files.write(); + + let first_entry = files.first_entry()?; let key = *first_entry.key(); let blocks = first_entry.into_mut(); let first_block = blocks.pop_front().unwrap(); if blocks.is_empty() { - self.files.remove(&key); + files.remove(&key); } Some((key, first_block)) @@ -72,44 +80,40 @@ impl BlockCache { /// Pops the last block from the cache. If it resulted in the whole file entry being empty, /// it will also remove the file entry. - pub(super) fn pop_back(&mut self) -> Option<(u64, CachedBlock)> { - let last_entry = self.files.last_entry()?; + pub(super) fn pop_back(&self) -> Option<(u64, CachedBlock)> { + let mut files = self.files.write(); + + let last_entry = files.last_entry()?; let key = *last_entry.key(); let blocks = last_entry.into_mut(); let last_block = blocks.pop_back().unwrap(); if blocks.is_empty() { - self.files.remove(&key); + files.remove(&key); } Some((key, last_block)) } - /// Appends a block to the back of the specified file entry. - pub(super) fn insert(&mut self, file_id: u64, block: CachedBlock) { - self.files.entry(file_id).or_default().push_back(block); - } - /// Inserts the blocks from the notification into the cache with the given file ID. /// /// First, inserts the reverted blocks (if any), then the committed blocks (if any). pub(super) fn insert_notification_blocks_with_file_id( - &mut self, + &self, file_id: u64, notification: &ExExNotification, ) { + let mut files = self.files.write(); + let reverted_chain = notification.reverted_chain(); let committed_chain = notification.committed_chain(); if let Some(reverted_chain) = reverted_chain { for block in reverted_chain.blocks().values() { - self.insert( - file_id, - CachedBlock { - action: CachedBlockAction::Revert, - block: (block.number, block.hash()).into(), - parent_hash: block.parent_hash, - }, - ); + files.entry(file_id).or_default().push_back(CachedBlock { + action: CachedBlockAction::Revert, + block: (block.number, block.hash()).into(), + parent_hash: block.parent_hash, + }); } } @@ -120,7 +124,7 @@ impl BlockCache { block: (block.number, block.hash()).into(), parent_hash: block.parent_hash, }; - self.insert(file_id, cached_block); + files.entry(file_id).or_default().push_back(cached_block); self.blocks.insert(block.hash(), cached_block); } } diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 1efda8d84f8e..91a447f23682 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -5,7 +5,7 @@ pub use cache::BlockCache; mod storage; pub use storage::Storage; -use std::path::Path; +use std::{path::Path, sync::Arc}; use reth_exex_types::ExExNotification; use reth_primitives::BlockNumHash; @@ -15,23 +15,62 @@ use reth_tracing::tracing::{debug, instrument}; /// /// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache /// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory -/// and decoding notifications every time we want to rollback/finalize the WAL. +/// and decoding notifications every time we want to iterate or finalize the WAL. /// /// The expected mode of operation is as follows: /// 1. On every new canonical chain notification, call [`Wal::commit`]. /// 2. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the /// WAL. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Wal { + inner: Arc, +} + +impl Wal { + /// Creates a new instance of [`Wal`]. + pub fn new(directory: impl AsRef) -> eyre::Result { + Ok(Self { inner: Arc::new(WalInner::new(directory)?) }) + } + + /// Returns a read-only handle to the WAL. + pub fn handle(&self) -> WalHandle { + WalHandle { wal: self.inner.clone() } + } + + /// Commits the notification to WAL. + pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { + self.inner.commit(notification) + } + + /// Finalizes the WAL to the given block, inclusive. + /// + /// 1. Finds a notification with first unfinalized block (first notification containing a + /// committed block higher than `to_block`). + /// 2. Removes the notifications from the beginning of WAL until the found notification. If this + /// notification includes both finalized and non-finalized blocks, it will not be removed. + pub fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { + self.inner.finalize(to_block) + } + + /// Returns an iterator over all notifications in the WAL. + pub fn iter_notifications( + &self, + ) -> eyre::Result> + '_>> { + self.inner.iter_notifications() + } +} + +/// Inner type for the WAL. +#[derive(Debug)] +struct WalInner { /// The underlying WAL storage backed by a file. storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. block_cache: BlockCache, } -impl Wal { - /// Creates a new instance of [`Wal`]. - pub fn new(directory: impl AsRef) -> eyre::Result { +impl WalInner { + fn new(directory: impl AsRef) -> eyre::Result { let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() }; wal.fill_block_cache()?; Ok(wal) @@ -62,12 +101,11 @@ impl Wal { Ok(()) } - /// Commits the notification to WAL. #[instrument(target = "exex::wal", skip_all, fields( reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] - pub fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { + fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1); self.storage.write_notification(file_id, notification)?; @@ -84,7 +122,7 @@ impl Wal { /// 2. Removes the notifications from the beginning of WAL until the found notification. If this /// notification includes both finalized and non-finalized blocks, it will not be removed. #[instrument(target = "exex::wal", skip(self))] - pub fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> { + fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { // First, walk cache to find the file ID of the notification with the finalized block and // save the file ID with the first unfinalized block. Do not remove any notifications // yet. @@ -152,7 +190,7 @@ impl Wal { } /// Returns an iterator over all notifications in the WAL. - pub(crate) fn iter_notifications( + fn iter_notifications( &self, ) -> eyre::Result> + '_>> { let Some(range) = self.storage.files_range()? else { @@ -163,6 +201,12 @@ impl Wal { } } +/// A read-only handle to the WAL that can be shared. +#[derive(Debug)] +pub struct WalHandle { + wal: Arc, +} + #[cfg(test)] mod tests { use std::sync::Arc; @@ -180,9 +224,10 @@ mod tests { }; fn read_notifications(wal: &Wal) -> eyre::Result> { - let Some(files_range) = wal.storage.files_range()? else { return Ok(Vec::new()) }; + let Some(files_range) = wal.inner.storage.files_range()? else { return Ok(Vec::new()) }; - wal.storage + wal.inner + .storage .iter_notifications(files_range) .map(|entry| Ok(entry?.1)) .collect::>() @@ -197,7 +242,7 @@ mod tests { // Create an instance of the WAL in a temporary directory let temp_dir = tempfile::tempdir()?; let mut wal = Wal::new(&temp_dir)?; - assert!(wal.block_cache.is_empty()); + assert!(wal.inner.block_cache.is_empty()); // Create 4 canonical blocks and one reorged block with number 2 let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default()) @@ -275,7 +320,10 @@ mod tests { ), ]; wal.commit(&committed_notification_1)?; - assert_eq!(wal.block_cache.iter().collect::>(), committed_notification_1_cache); + assert_eq!( + wal.inner.block_cache.iter().collect::>(), + committed_notification_1_cache + ); assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); // Second notification (revert block 1) @@ -290,7 +338,7 @@ mod tests { }, )]; assert_eq!( - wal.block_cache.iter().collect::>(), + wal.inner.block_cache.iter().collect::>(), [committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat() ); assert_eq!( @@ -320,7 +368,7 @@ mod tests { ), ]; assert_eq!( - wal.block_cache.iter().collect::>(), + wal.inner.block_cache.iter().collect::>(), [ committed_notification_1_cache.clone(), reverted_notification_cache.clone(), @@ -367,7 +415,7 @@ mod tests { ), ]; assert_eq!( - wal.block_cache.iter().collect::>(), + wal.inner.block_cache.iter().collect::>(), [ committed_notification_1_cache, reverted_notification_cache, @@ -392,7 +440,7 @@ mod tests { // the notifications before it. wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?; assert_eq!( - wal.block_cache.iter().collect::>(), + wal.inner.block_cache.iter().collect::>(), [committed_notification_2_cache, reorged_notification_cache].concat() ); assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]); diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index ad2307361d0c..8953a6a4edfb 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -13,7 +13,7 @@ use tracing::instrument; /// /// Each notification is represented by a single file that contains a MessagePack-encoded /// notification. -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Storage { /// The path to the WAL file. path: PathBuf, diff --git a/crates/exex/test-utils/src/lib.rs b/crates/exex/test-utils/src/lib.rs index 4117c0c73c9a..906437ca7821 100644 --- a/crates/exex/test-utils/src/lib.rs +++ b/crates/exex/test-utils/src/lib.rs @@ -20,7 +20,7 @@ use reth_db_common::init::init_genesis; use reth_ethereum_engine_primitives::EthereumEngineValidator; use reth_evm::test_utils::MockExecutorProvider; use reth_execution_types::Chain; -use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications}; +use reth_exex::{ExExContext, ExExEvent, ExExNotification, ExExNotifications, Wal}; use reth_network::{config::SecretKey, NetworkConfigBuilder, NetworkManager}; use reth_node_api::{ FullNodeTypes, FullNodeTypesAdapter, NodeTypes, NodeTypesWithDBAdapter, NodeTypesWithEngine, @@ -49,6 +49,7 @@ use reth_provider::{ use reth_tasks::TaskManager; use reth_transaction_pool::test_utils::{testing_pool, TestPool}; use std::{ + env::temp_dir, fmt::Debug, future::{poll_fn, Future}, sync::Arc, @@ -310,6 +311,8 @@ pub async fn test_exex_context_with_chain_spec( components.provider.clone(), components.components.executor.clone(), notifications_rx, + // TODO(alexey): do we want to expose WAL to the user? + Wal::new(temp_dir())?.handle(), ); let ctx = ExExContext { diff --git a/crates/node/builder/src/launch/exex.rs b/crates/node/builder/src/launch/exex.rs index d037200869c4..6cd705338384 100644 --- a/crates/node/builder/src/launch/exex.rs +++ b/crates/node/builder/src/launch/exex.rs @@ -45,6 +45,15 @@ impl ExExLauncher { return Ok(None) } + let exex_wal = Wal::new( + config_container + .config + .datadir + .clone() + .resolve_datadir(config_container.config.chain.chain()) + .exex_wal(), + )?; + let mut exex_handles = Vec::with_capacity(extensions.len()); let mut exexes = Vec::with_capacity(extensions.len()); @@ -55,6 +64,7 @@ impl ExExLauncher { head, components.provider().clone(), components.block_executor().clone(), + exex_wal.handle(), ); exex_handles.push(handle); @@ -96,14 +106,6 @@ impl ExExLauncher { // spawn exex manager debug!(target: "reth::cli", "spawning exex manager"); // todo(onbjerg): rm magic number - let exex_wal = Wal::new( - config_container - .config - .datadir - .clone() - .resolve_datadir(config_container.config.chain.chain()) - .exex_wal(), - )?; let exex_manager = ExExManager::new( exex_handles, 1024,