diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 23be68f4fd92..9c8377596ba6 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2529,6 +2529,61 @@ mod tests { }; use tokio::sync::mpsc::unbounded_channel; + /// This is a test channel that allows you to `release` any value that is in the channel. + /// + /// If nothing has been sent, then the next value will be immediately sent. + #[allow(dead_code)] + struct TestChannel { + /// If an item is sent to this channel, an item will be released in the wrapped channel + release: Receiver<()>, + /// The sender channel + tx: Sender, + /// The receiver channel + rx: Receiver, + } + + impl TestChannel { + /// Creates a new test channel + #[allow(dead_code)] + fn spawn_channel() -> (Sender, Receiver, TestChannelHandle) { + let (original_tx, original_rx) = channel(); + let (wrapped_tx, wrapped_rx) = channel(); + let (release_tx, release_rx) = channel(); + let handle = TestChannelHandle::new(release_tx); + let test_channel = Self { release: release_rx, tx: wrapped_tx, rx: original_rx }; + // spawn the task that listens and releases stuff + std::thread::spawn(move || test_channel.intercept_loop()); + (original_tx, wrapped_rx, handle) + } + + /// Runs the intercept loop, waiting for the handle to release a value + fn intercept_loop(&self) { + while self.release.recv() == Ok(()) { + let Ok(value) = self.rx.recv() else { return }; + + let _ = self.tx.send(value); + } + } + } + + struct TestChannelHandle { + /// The sender to use for releasing values + release: Sender<()>, + } + + impl TestChannelHandle { + /// Returns a [`TestChannelHandle`] + const fn new(release: Sender<()>) -> Self { + Self { release } + } + + /// Signals to the channel task that a value should be released + #[allow(dead_code)] + fn release(&self) { + let _ = self.release.send(()); + } + } + struct TestHarness { tree: EngineApiTreeHandler, to_tree_tx: Sender>>, @@ -2543,6 +2598,20 @@ mod tests { impl TestHarness { fn new(chain_spec: Arc) -> Self { let (action_tx, action_rx) = channel(); + Self::with_persistence_channel(chain_spec, action_tx, action_rx) + } + + #[allow(dead_code)] + fn with_test_channel(chain_spec: Arc) -> (Self, TestChannelHandle) { + let (action_tx, action_rx, handle) = TestChannel::spawn_channel(); + (Self::with_persistence_channel(chain_spec, action_tx, action_rx), handle) + } + + fn with_persistence_channel( + chain_spec: Arc, + action_tx: Sender, + action_rx: Receiver, + ) -> Self { let persistence_handle = PersistenceHandle::new(action_tx); let consensus = Arc::new(EthBeaconConsensus::new(chain_spec.clone()));