Skip to content

Commit

Permalink
fix(derive): channel reader tests
Browse files Browse the repository at this point in the history
  • Loading branch information
refcell committed Apr 4, 2024
1 parent c0efec0 commit 176a60c
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 8 deletions.
62 changes: 54 additions & 8 deletions crates/derive/src/stages/channel_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,8 @@ use crate::{
types::{Batch, BlockInfo, StageError, StageResult},
};

use alloc::{boxed::Box, vec::Vec};
use alloc::{boxed::Box, sync::Arc, vec::Vec};
use alloy_primitives::Bytes;
use anyhow::anyhow;
use async_trait::async_trait;
use core::fmt::Debug;
use miniz_oxide::inflate::decompress_to_vec_zlib;
Expand All @@ -33,7 +32,7 @@ where
/// The previous stage of the derivation pipeline.
prev: P,
/// Telemetry
telemetry: T,
telemetry: Arc<T>,
/// The batch reader.
next_batch: Option<BatchReader>,
}
Expand All @@ -44,14 +43,14 @@ where
T: TelemetryProvider + Debug,
{
/// Create a new [ChannelReader] stage.
pub fn new(prev: P, telemetry: T) -> Self {
pub fn new(prev: P, telemetry: Arc<T>) -> Self {
Self { prev, telemetry, next_batch: None }
}

/// Creates the batch reader from available channel data.
async fn set_batch_reader(&mut self) -> StageResult<()> {
if self.next_batch.is_none() {
let channel = self.prev.next_data().await?.ok_or(anyhow!("no channel"))?;
let channel = self.prev.next_data().await?.ok_or(StageError::NoChannel)?;
self.next_batch = Some(BatchReader::from(&channel[..]));
}
Ok(())
Expand All @@ -68,7 +67,7 @@ where
impl<P, T> BatchQueueProvider for ChannelReader<P, T>
where
P: ChannelReaderProvider + OriginProvider + Send + Debug,
T: TelemetryProvider + Send + Debug,
T: TelemetryProvider + Send + Sync + Debug,
{
async fn next_batch(&mut self) -> StageResult<Batch> {
if let Err(e) = self.set_batch_reader().await {
Expand Down Expand Up @@ -151,11 +150,58 @@ impl From<Vec<u8>> for BatchReader {

#[cfg(test)]
mod test {
use crate::{stages::channel_reader::BatchReader, types::BatchType};
use super::*;
use crate::{
stages::test_utils::MockChannelReaderProvider, traits::test_utils::TestTelemetry,
types::BatchType,
};
use alloc::vec;
use miniz_oxide::deflate::compress_to_vec_zlib;

// TODO(clabby): More tests here for multiple batches, integration w/ channel bank, etc.
fn new_compressed_batch_data() -> Bytes {
let raw_data = include_bytes!("../../testdata/raw_batch.hex");
let mut typed_data = vec![BatchType::Span as u8];
typed_data.extend_from_slice(raw_data.as_slice());
compress_to_vec_zlib(typed_data.as_slice(), 5).into()
}

#[tokio::test]
async fn test_next_batch_batch_reader_set_fails() {
let mock = MockChannelReaderProvider::new(vec![Err(StageError::Eof)]);
let telemetry = Arc::new(TestTelemetry::new());
let mut reader = ChannelReader::new(mock, telemetry);
assert_eq!(reader.next_batch().await, Err(StageError::Eof));
assert!(reader.next_batch.is_none());
}

#[tokio::test]
async fn test_next_batch_batch_reader_no_data() {
let mock = MockChannelReaderProvider::new(vec![Ok(None)]);
let telemetry = Arc::new(TestTelemetry::new());
let mut reader = ChannelReader::new(mock, telemetry);
assert_eq!(reader.next_batch().await, Err(StageError::NoChannel));
assert!(reader.next_batch.is_none());
}

#[tokio::test]
async fn test_next_batch_not_enough_data() {
let mock = MockChannelReaderProvider::new(vec![Ok(Some(Bytes::default()))]);
let telemetry = Arc::new(TestTelemetry::new());
let mut reader = ChannelReader::new(mock, telemetry);
assert_eq!(reader.next_batch().await, Err(StageError::NotEnoughData));
assert!(reader.next_batch.is_none());
}

#[tokio::test]
async fn test_next_batch_succeeds() {
let raw = new_compressed_batch_data();
let mock = MockChannelReaderProvider::new(vec![Ok(Some(raw))]);
let telemetry = Arc::new(TestTelemetry::new());
let mut reader = ChannelReader::new(mock, telemetry);
let res = reader.next_batch().await.unwrap();
matches!(res, Batch::Span(_));
assert!(reader.next_batch.is_some());
}

#[test]
fn test_batch_reader() {
Expand Down
39 changes: 39 additions & 0 deletions crates/derive/src/stages/test_utils/channel_reader.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
//! Test utilities for the [ChannelReader] stage.
use crate::{
stages::ChannelReaderProvider,
traits::OriginProvider,
types::{BlockInfo, StageError, StageResult},
};
use alloc::{boxed::Box, vec::Vec};
use alloy_primitives::Bytes;
use async_trait::async_trait;

/// A mock [ChannelReaderProvider] for testing the [ChannelReader] stage.
#[derive(Debug)]
pub struct MockChannelReaderProvider {
/// The data to return.
pub data: Vec<StageResult<Option<Bytes>>>,
/// The origin block info
pub block_info: Option<BlockInfo>,
}

impl MockChannelReaderProvider {
/// Creates a new [MockChannelReaderProvider] with the given data.
pub fn new(data: Vec<StageResult<Option<Bytes>>>) -> Self {
Self { data, block_info: Some(BlockInfo::default()) }
}
}

impl OriginProvider for MockChannelReaderProvider {
fn origin(&self) -> Option<&BlockInfo> {
self.block_info.as_ref()
}
}

#[async_trait]
impl ChannelReaderProvider for MockChannelReaderProvider {
async fn next_data(&mut self) -> StageResult<Option<Bytes>> {
self.data.pop().unwrap_or(Err(StageError::Eof))
}
}
3 changes: 3 additions & 0 deletions crates/derive/src/stages/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ pub use frame_queue::MockFrameQueueProvider;

mod channel_bank;
pub use channel_bank::MockChannelBankProvider;

mod channel_reader;
pub use channel_reader::MockChannelReaderProvider;
4 changes: 4 additions & 0 deletions crates/derive/src/types/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub enum StageError {
Empty,
/// No channels are available in the channel bank.
NoChannelsAvailable,
/// No channel returned by the [crate::stages::ChannelReader] stage.
NoChannel,
/// Failed to find channel.
ChannelNotFound,
/// Missing L1 origin.
Expand Down Expand Up @@ -59,6 +61,7 @@ impl PartialEq<StageError> for StageError {
(StageError::Eof, StageError::Eof) |
(StageError::NotEnoughData, StageError::NotEnoughData) |
(StageError::NoChannelsAvailable, StageError::NoChannelsAvailable) |
(StageError::NoChannel, StageError::NoChannel) |
(StageError::ChannelNotFound, StageError::ChannelNotFound) |
(StageError::MissingOrigin, StageError::MissingOrigin) |
(StageError::AttributesBuild(_), StageError::AttributesBuild(_)) |
Expand Down Expand Up @@ -94,6 +97,7 @@ impl Display for StageError {
}
StageError::Empty => write!(f, "Empty"),
StageError::NoChannelsAvailable => write!(f, "No channels available"),
StageError::NoChannel => write!(f, "No channel"),
StageError::ChannelNotFound => write!(f, "Channel not found"),
StageError::MissingOrigin => write!(f, "Missing L1 origin"),
StageError::AttributesBuild(e) => write!(f, "Attributes build error: {}", e),
Expand Down

0 comments on commit 176a60c

Please sign in to comment.