Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): WAL handle #11266

Merged
merged 8 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ tokio-util.workspace = true
tokio.workspace = true

## misc
dashmap.workspace = true
eyre.workspace = true
metrics.workspace = true
serde_json.workspace = true
Expand Down
135 changes: 73 additions & 62 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::{wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight};
use crate::{
wal::Wal, ExExEvent, ExExNotification, ExExNotifications, FinishedExExHeight, WalHandle,
};
use alloy_primitives::BlockNumber;
use futures::StreamExt;
use metrics::Gauge;
Expand Down Expand Up @@ -67,10 +69,12 @@ impl ExExHandle {
node_head: Head,
provider: P,
executor: E,
wal_handle: WalHandle,
) -> (Self, UnboundedSender<ExExEvent>, ExExNotifications<P, E>) {
let (notification_tx, notification_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let notifications = ExExNotifications::new(node_head, provider, executor, notification_rx);
let notifications =
ExExNotifications::new(node_head, provider, executor, notification_rx, wal_handle);

(
Self {
Expand Down Expand Up @@ -521,8 +525,11 @@ mod tests {

#[tokio::test]
async fn test_delivers_events() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
Expand All @@ -533,65 +540,48 @@ mod tests {
#[tokio::test]
async fn test_has_exexs() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());

assert!(!ExExManager::new(
vec![],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_exexs());

assert!(ExExManager::new(
vec![exex_handle_1],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_exexs());
assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
.handle
.has_exexs());
}

#[tokio::test]
async fn test_has_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());

assert!(!ExExManager::new(
vec![],
0,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_capacity());

assert!(ExExManager::new(
vec![exex_handle_1],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream()
)
.handle
.has_capacity());
assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream())
.handle
.has_capacity());
}

#[test]
fn test_push_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut exex_manager =
ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream());

// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
Expand Down Expand Up @@ -634,16 +624,15 @@ mod tests {
#[test]
fn test_update_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
let mut exex_manager = ExExManager::new(
vec![exex_handle],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
empty_finalized_header_stream(),
);
let mut exex_manager =
ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream());

// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
Expand Down Expand Up @@ -674,8 +663,10 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Check initial block height
assert!(exex_handle.finished_height.is_none());
Expand Down Expand Up @@ -717,11 +708,13 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_lower() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());

// Send events to update the block heights of the two handles, with the second being lower
event_tx1.send(ExExEvent::FinishedHeight(42)).unwrap();
Expand Down Expand Up @@ -756,11 +749,13 @@ mod tests {
#[tokio::test]
async fn test_updates_block_height_greater() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
let (exex_handle2, event_tx2, _) =
ExExHandle::new("test_exex2".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex2".to_string(), Head::default(), (), (), wal.handle());

// Assert that the initial block height is `None` for the first `ExExHandle`.
assert!(exex_handle1.finished_height.is_none());
Expand Down Expand Up @@ -802,8 +797,10 @@ mod tests {
#[tokio::test]
async fn test_exex_manager_capacity() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());

// Create an ExExManager with a small max capacity
let max_capacity = 2;
Expand Down Expand Up @@ -846,8 +843,11 @@ mod tests {

#[tokio::test]
async fn exex_handle_new() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Check initial state
assert_eq!(exex_handle.id, "test_exex");
Expand Down Expand Up @@ -889,8 +889,11 @@ mod tests {

#[tokio::test]
async fn test_notification_if_finished_height_gt_chain_tip() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15);
Expand Down Expand Up @@ -931,8 +934,11 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reorged_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
Expand Down Expand Up @@ -962,8 +968,11 @@ mod tests {

#[tokio::test]
async fn test_sends_chain_reverted_notification() {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();

let (mut exex_handle, _, mut notifications) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };

Expand Down Expand Up @@ -994,6 +1003,7 @@ mod tests {

let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();

let block = random_block(&mut generators::rng(), 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
Expand All @@ -1005,7 +1015,8 @@ mod tests {
let (tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);

let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string(), Head::default(), (), ());
let (exex_handle, _, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());

let mut exex_manager =
std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream));
Expand Down
Loading
Loading