From 176a60c95bb59a9617dc0e529903b9d57947bc5b Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 4 Apr 2024 15:24:01 -0400 Subject: [PATCH] fix(derive): channel reader tests --- crates/derive/src/stages/channel_reader.rs | 62 ++++++++++++++++--- .../src/stages/test_utils/channel_reader.rs | 39 ++++++++++++ crates/derive/src/stages/test_utils/mod.rs | 3 + crates/derive/src/types/errors.rs | 4 ++ 4 files changed, 100 insertions(+), 8 deletions(-) create mode 100644 crates/derive/src/stages/test_utils/channel_reader.rs diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs index 2b579313a..724a23c3c 100644 --- a/crates/derive/src/stages/channel_reader.rs +++ b/crates/derive/src/stages/channel_reader.rs @@ -6,9 +6,8 @@ use crate::{ types::{Batch, BlockInfo, StageError, StageResult}, }; -use alloc::{boxed::Box, vec::Vec}; +use alloc::{boxed::Box, sync::Arc, vec::Vec}; use alloy_primitives::Bytes; -use anyhow::anyhow; use async_trait::async_trait; use core::fmt::Debug; use miniz_oxide::inflate::decompress_to_vec_zlib; @@ -33,7 +32,7 @@ where /// The previous stage of the derivation pipeline. prev: P, /// Telemetry - telemetry: T, + telemetry: Arc, /// The batch reader. next_batch: Option, } @@ -44,14 +43,14 @@ where T: TelemetryProvider + Debug, { /// Create a new [ChannelReader] stage. - pub fn new(prev: P, telemetry: T) -> Self { + pub fn new(prev: P, telemetry: Arc) -> Self { Self { prev, telemetry, next_batch: None } } /// Creates the batch reader from available channel data. async fn set_batch_reader(&mut self) -> StageResult<()> { if self.next_batch.is_none() { - let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?; + let channel = self.prev.next_data().await?.ok_or(StageError::NoChannel)?; self.next_batch = Some(BatchReader::from(&channel[..])); } Ok(()) @@ -68,7 +67,7 @@ where impl BatchQueueProvider for ChannelReader where P: ChannelReaderProvider + OriginProvider + Send + Debug, - T: TelemetryProvider + Send + Debug, + T: TelemetryProvider + Send + Sync + Debug, { async fn next_batch(&mut self) -> StageResult { if let Err(e) = self.set_batch_reader().await { @@ -151,11 +150,58 @@ impl From> for BatchReader { #[cfg(test)] mod test { - use crate::{stages::channel_reader::BatchReader, types::BatchType}; + use super::*; + use crate::{ + stages::test_utils::MockChannelReaderProvider, traits::test_utils::TestTelemetry, + types::BatchType, + }; use alloc::vec; use miniz_oxide::deflate::compress_to_vec_zlib; - // TODO(clabby): More tests here for multiple batches, integration w/ channel bank, etc. + fn new_compressed_batch_data() -> Bytes { + let raw_data = include_bytes!("../../testdata/raw_batch.hex"); + let mut typed_data = vec![BatchType::Span as u8]; + typed_data.extend_from_slice(raw_data.as_slice()); + compress_to_vec_zlib(typed_data.as_slice(), 5).into() + } + + #[tokio::test] + async fn test_next_batch_batch_reader_set_fails() { + let mock = MockChannelReaderProvider::new(vec![Err(StageError::Eof)]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::Eof)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_batch_reader_no_data() { + let mock = MockChannelReaderProvider::new(vec![Ok(None)]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::NoChannel)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_not_enough_data() { + let mock = MockChannelReaderProvider::new(vec![Ok(Some(Bytes::default()))]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + assert_eq!(reader.next_batch().await, Err(StageError::NotEnoughData)); + assert!(reader.next_batch.is_none()); + } + + #[tokio::test] + async fn test_next_batch_succeeds() { + let raw = new_compressed_batch_data(); + let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]); + let telemetry = Arc::new(TestTelemetry::new()); + let mut reader = ChannelReader::new(mock, telemetry); + let res = reader.next_batch().await.unwrap(); + matches!(res, Batch::Span(_)); + assert!(reader.next_batch.is_some()); + } #[test] fn test_batch_reader() { diff --git a/crates/derive/src/stages/test_utils/channel_reader.rs b/crates/derive/src/stages/test_utils/channel_reader.rs new file mode 100644 index 000000000..23cea6416 --- /dev/null +++ b/crates/derive/src/stages/test_utils/channel_reader.rs @@ -0,0 +1,39 @@ +//! Test utilities for the [ChannelReader] stage. + +use crate::{ + stages::ChannelReaderProvider, + traits::OriginProvider, + types::{BlockInfo, StageError, StageResult}, +}; +use alloc::{boxed::Box, vec::Vec}; +use alloy_primitives::Bytes; +use async_trait::async_trait; + +/// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage. +#[derive(Debug)] +pub struct MockChannelReaderProvider { + /// The data to return. + pub data: Vec>>, + /// The origin block info + pub block_info: Option, +} + +impl MockChannelReaderProvider { + /// Creates a new [MockChannelReaderProvider] with the given data. + pub fn new(data: Vec>>) -> Self { + Self { data, block_info: Some(BlockInfo::default()) } + } +} + +impl OriginProvider for MockChannelReaderProvider { + fn origin(&self) -> Option<&BlockInfo> { + self.block_info.as_ref() + } +} + +#[async_trait] +impl ChannelReaderProvider for MockChannelReaderProvider { + async fn next_data(&mut self) -> StageResult> { + self.data.pop().unwrap_or(Err(StageError::Eof)) + } +} diff --git a/crates/derive/src/stages/test_utils/mod.rs b/crates/derive/src/stages/test_utils/mod.rs index 524df71f0..753935e9a 100644 --- a/crates/derive/src/stages/test_utils/mod.rs +++ b/crates/derive/src/stages/test_utils/mod.rs @@ -12,3 +12,6 @@ pub use frame_queue::MockFrameQueueProvider; mod channel_bank; pub use channel_bank::MockChannelBankProvider; + +mod channel_reader; +pub use channel_reader::MockChannelReaderProvider; diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs index 18a3fb4bf..d226c559b 100644 --- a/crates/derive/src/types/errors.rs +++ b/crates/derive/src/types/errors.rs @@ -23,6 +23,8 @@ pub enum StageError { Empty, /// No channels are available in the channel bank. NoChannelsAvailable, + /// No channel returned by the [crate::stages::ChannelReader] stage. + NoChannel, /// Failed to find channel. ChannelNotFound, /// Missing L1 origin. @@ -59,6 +61,7 @@ impl PartialEq for StageError { (StageError::Eof, StageError::Eof) | (StageError::NotEnoughData, StageError::NotEnoughData) | (StageError::NoChannelsAvailable, StageError::NoChannelsAvailable) | + (StageError::NoChannel, StageError::NoChannel) | (StageError::ChannelNotFound, StageError::ChannelNotFound) | (StageError::MissingOrigin, StageError::MissingOrigin) | (StageError::AttributesBuild(_), StageError::AttributesBuild(_)) | @@ -94,6 +97,7 @@ impl Display for StageError { } StageError::Empty => write!(f, "Empty"), StageError::NoChannelsAvailable => write!(f, "No channels available"), + StageError::NoChannel => write!(f, "No channel"), StageError::ChannelNotFound => write!(f, "Channel not found"), StageError::MissingOrigin => write!(f, "Missing L1 origin"), StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e),