Skip to content

Commit

Permalink
feat(exex): add parent hash to WAL block cache, index by hashes (#11263)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Sep 27, 2024
1 parent da6b1e7 commit 37b0c56
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 27 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tokio-util.workspace = true
tokio.workspace = true

## misc
dashmap.workspace = true
eyre.workspace = true
metrics.workspace = true
serde_json.workspace = true
Expand Down
66 changes: 39 additions & 27 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,70 @@
use std::collections::{BTreeMap, VecDeque};

use dashmap::DashMap;
use reth_exex_types::ExExNotification;
use reth_primitives::BlockNumHash;
use reth_primitives::{BlockNumHash, B256};

/// The block cache of the WAL. Acts as a mapping of `File ID -> List of Blocks`.
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks in the
/// cache.
/// The block cache of the WAL.
///
/// This cache is needed to avoid walking the WAL directory every time we want to find a
/// notification corresponding to a block.
/// notification corresponding to a block or a block corresponding to a hash.
#[derive(Debug)]
pub struct BlockCache(BTreeMap<u64, VecDeque<CachedBlock>>);
pub struct BlockCache {
/// A mapping of `File ID -> List of Blocks`.
///
/// For each notification written to the WAL, there will be an entry per block written to
/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks
/// in the cache.
files: BTreeMap<u64, VecDeque<CachedBlock>>,
/// A mapping of `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
blocks: DashMap<B256, CachedBlock>,
}

impl BlockCache {
/// Creates a new instance of [`BlockCache`].
pub(super) const fn new() -> Self {
Self(BTreeMap::new())
pub(super) fn new() -> Self {
Self { files: BTreeMap::new(), blocks: DashMap::new() }
}

/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.0.is_empty()
self.files.is_empty()
}

/// Returns a front-to-back iterator.
pub(super) fn iter(&self) -> impl Iterator<Item = (u64, CachedBlock)> + '_ {
self.0.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
self.files.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b)))
}

/// Provides a reference to the first block from the cache, or `None` if the cache is
/// empty.
pub(super) fn front(&self) -> Option<(u64, CachedBlock)> {
self.0.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
self.files.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b)))
}

/// Provides a reference to the last block from the cache, or `None` if the cache is
/// empty.
pub(super) fn back(&self) -> Option<(u64, CachedBlock)> {
self.0.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
self.files.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b)))
}

/// Removes the notification with the given file ID.
pub(super) fn remove_notification(&mut self, key: u64) -> Option<VecDeque<CachedBlock>> {
self.0.remove(&key)
self.files.remove(&key)
}

/// Pops the first block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry.
pub(super) fn pop_front(&mut self) -> Option<(u64, CachedBlock)> {
let first_entry = self.0.first_entry()?;
let first_entry = self.files.first_entry()?;
let key = *first_entry.key();
let blocks = first_entry.into_mut();
let first_block = blocks.pop_front().unwrap();
if blocks.is_empty() {
self.0.remove(&key);
self.files.remove(&key);
}

Some((key, first_block))
Expand All @@ -64,20 +73,20 @@ impl BlockCache {
/// Pops the last block from the cache. If it resulted in the whole file entry being empty,
/// it will also remove the file entry.
pub(super) fn pop_back(&mut self) -> Option<(u64, CachedBlock)> {
let last_entry = self.0.last_entry()?;
let last_entry = self.files.last_entry()?;
let key = *last_entry.key();
let blocks = last_entry.into_mut();
let last_block = blocks.pop_back().unwrap();
if blocks.is_empty() {
self.0.remove(&key);
self.files.remove(&key);
}

Some((key, last_block))
}

/// Appends a block to the back of the specified file entry.
pub(super) fn insert(&mut self, file_id: u64, block: CachedBlock) {
self.0.entry(file_id).or_default().push_back(block);
self.files.entry(file_id).or_default().push_back(block);
}

/// Inserts the blocks from the notification into the cache with the given file ID.
Expand All @@ -98,20 +107,21 @@ impl BlockCache {
CachedBlock {
action: CachedBlockAction::Revert,
block: (block.number, block.hash()).into(),
parent_hash: block.parent_hash,
},
);
}
}

if let Some(committed_chain) = committed_chain {
for block in committed_chain.blocks().values() {
self.insert(
file_id,
CachedBlock {
action: CachedBlockAction::Commit,
block: (block.number, block.hash()).into(),
},
);
let cached_block = CachedBlock {
action: CachedBlockAction::Commit,
block: (block.number, block.hash()).into(),
parent_hash: block.parent_hash,
};
self.insert(file_id, cached_block);
self.blocks.insert(block.hash(), cached_block);
}
}
}
Expand All @@ -122,6 +132,8 @@ pub(super) struct CachedBlock {
pub(super) action: CachedBlockAction,
/// The block number and hash of the block.
pub(super) block: BlockNumHash,
/// The hash of the parent block.
pub(super) parent_hash: B256,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down
8 changes: 8 additions & 0 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,15 @@ mod tests {
CachedBlock {
action: CachedBlockAction::Commit,
block: (blocks[0].number, blocks[0].hash()).into(),
parent_hash: blocks[0].parent_hash,
},
),
(
file_id,
CachedBlock {
action: CachedBlockAction::Commit,
block: (blocks[1].number, blocks[1].hash()).into(),
parent_hash: blocks[1].parent_hash,
},
),
];
Expand All @@ -284,6 +286,7 @@ mod tests {
CachedBlock {
action: CachedBlockAction::Revert,
block: (blocks[1].number, blocks[1].hash()).into(),
parent_hash: blocks[1].parent_hash,
},
)];
assert_eq!(
Expand All @@ -304,13 +307,15 @@ mod tests {
CachedBlock {
action: CachedBlockAction::Commit,
block: (block_1_reorged.number, block_1_reorged.hash()).into(),
parent_hash: block_1_reorged.parent_hash,
},
),
(
file_id,
CachedBlock {
action: CachedBlockAction::Commit,
block: (blocks[2].number, blocks[2].hash()).into(),
parent_hash: blocks[2].parent_hash,
},
),
];
Expand Down Expand Up @@ -341,20 +346,23 @@ mod tests {
CachedBlock {
action: CachedBlockAction::Revert,
block: (blocks[2].number, blocks[2].hash()).into(),
parent_hash: blocks[2].parent_hash,
},
),
(
file_id,
CachedBlock {
action: CachedBlockAction::Commit,
block: (block_2_reorged.number, block_2_reorged.hash()).into(),
parent_hash: block_2_reorged.parent_hash,
},
),
(
file_id,
CachedBlock {
action: CachedBlockAction::Commit,
block: (blocks[3].number, blocks[3].hash()).into(),
parent_hash: blocks[3].parent_hash,
},
),
];
Expand Down

0 comments on commit 37b0c56

Please sign in to comment.