Skip to content

Commit

Permalink
feat(exex): finalize WAL below the given block
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 29, 2024
1 parent b8aeeca commit 258315a
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 234 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ tokio-util.workspace = true
tokio.workspace = true

## misc
dashmap.workspace = true
eyre.workspace = true
itertools.workspace = true
metrics.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1119,7 +1119,7 @@ mod tests {
let mut rng = generators::rng();

let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let provider_factory = create_test_provider_factory();

Expand Down
4 changes: 2 additions & 2 deletions crates/exex/exex/src/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ mod tests {
let mut rng = generators::rng();

let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
Expand Down Expand Up @@ -557,7 +557,7 @@ mod tests {
let mut rng = generators::rng();

let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let provider_factory = create_test_provider_factory();
let genesis_hash = init_genesis(&provider_factory)?;
Expand Down
151 changes: 50 additions & 101 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
@@ -1,98 +1,54 @@
use std::collections::{BTreeMap, VecDeque};
use std::{
cmp::Reverse,
collections::{BinaryHeap, HashSet},
};

use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use dashmap::DashMap;
use parking_lot::RwLock;
use alloy_primitives::{map::FbHashMap, BlockNumber, B256};
use reth_exex_types::ExExNotification;

/// The block cache of the WAL.
///
/// This cache is needed to avoid walking the WAL directory every time we want to find a
/// notification corresponding to a block or a block corresponding to a hash.
#[derive(Debug)]
#[derive(Debug, Default)]
pub struct BlockCache {
/// A mapping of `File ID -> List of Blocks`.
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
/// in the cache.
files: RwLock<BTreeMap<u64, VecDeque<CachedBlock>>>,
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u64)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
committed_blocks: DashMap<B256, (u64, CachedBlock)>,
pub(super) committed_blocks: FbHashMap<32, (u64, CachedBlock)>,
}

impl BlockCache {
/// Creates a new instance of [`BlockCache`].
pub(super) fn new() -> Self {
Self { files: RwLock::new(BTreeMap::new()), committed_blocks: DashMap::new() }
}

/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.files.read().is_empty()
}

/// Returns a front-to-back iterator.
pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ {
self.files
.read()
.iter()
.flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
.collect::<Vec<_>>()
.into_iter()
}

/// Provides a reference to the first block from the cache, or `None` if the cache is
/// empty.
pub(super) fn front(&self) -> Option<(u64, CachedBlock)> {
self.files.read().first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
}

/// Provides a reference to the last block from the cache, or `None` if the cache is
/// empty.
pub(super) fn back(&self) -> Option<(u64, CachedBlock)> {
self.files.read().last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
self.blocks.is_empty()
}

/// Removes the notification with the given file ID.
pub(super) fn remove_notification(&self, key: u64) -> Option<VecDeque<CachedBlock>> {
self.files.write().remove(&key)
}

/// Pops the first block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry.
pub(super) fn pop_front(&self) -> Option<(u64, CachedBlock)> {
let mut files = self.files.write();

let first_entry = files.first_entry()?;
let key = *first_entry.key();
let blocks = first_entry.into_mut();
let first_block = blocks.pop_front().unwrap();
if blocks.is_empty() {
files.remove(&key);
/// Removes all files from the cache that has notifications with a tip block less than or equal
/// to the given block number.
///
/// # Returns
///
/// A set of file IDs that were removed.
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u64> {
let mut file_ids = HashSet::default();

while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
if max_block <= block_number {
debug_assert_eq!(self.blocks.pop().unwrap(), block);
file_ids.insert(file_id);
} else {
break
}
}

Some((key, first_block))
}

/// Pops the last block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry.
pub(super) fn pop_back(&self) -> Option<(u64, CachedBlock)> {
let mut files = self.files.write();
self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));

let last_entry = files.last_entry()?;
let key = *last_entry.key();
let blocks = last_entry.into_mut();
let last_block = blocks.pop_back().unwrap();
if blocks.is_empty() {
files.remove(&key);
}

Some((key, last_block))
file_ids
}

/// Returns the file ID for the notification containing the given committed block hash, if it
Expand All @@ -102,59 +58,52 @@ impl BlockCache {
}

/// Inserts the blocks from the notification into the cache with the given file ID.
///
/// First, inserts the reverted blocks (if any), then the committed blocks (if any).
pub(super) fn insert_notification_blocks_with_file_id(
&self,
&mut self,
file_id: u64,
notification: &ExExNotification,
) {
let mut files = self.files.write();

let reverted_chain = notification.reverted_chain();
let committed_chain = notification.committed_chain();

if let Some(reverted_chain) = reverted_chain {
for block in reverted_chain.blocks().values() {
files.entry(file_id).or_default().push_back(CachedBlock {
action: CachedBlockAction::Revert,
block: (block.number, block.hash()).into(),
parent_hash: block.parent_hash,
});
}
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
}

if let Some(committed_chain) = committed_chain {
if let Some(committed_chain) = &committed_chain {
for block in committed_chain.blocks().values() {
let cached_block = CachedBlock {
action: CachedBlockAction::Commit,
block: (block.number, block.hash()).into(),
parent_hash: block.parent_hash,
};
files.entry(file_id).or_default().push_back(cached_block);
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}
}
}

#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u64)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
}

#[cfg(test)]
pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u64, CachedBlock)> {
use itertools::Itertools;

self.committed_blocks
.iter()
.map(|(hash, (file_id, block))| (*hash, *file_id, *block))
.sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash))
.collect()
}
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) struct CachedBlock {
pub(super) action: CachedBlockAction,
/// The block number and hash of the block.
pub(super) block: BlockNumHash,
/// The hash of the parent block.
pub(super) parent_hash: B256,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum CachedBlockAction {
Commit,
Revert,
}

impl CachedBlockAction {
pub(super) const fn is_commit(&self) -> bool {
matches!(self, Self::Commit)
}
}
Loading

0 comments on commit 258315a

Please sign in to comment.