Review note (text ahead of the first "diff --git" line is ignored by `git apply`):
the reviewed copy of this patch had its newlines collapsed and its generic parameter
lists stripped. Indentation, blank context lines and the `<...>` lists below were
reconstructed from the hunk line counts and rustfmt conventions - confirm them against
the working tree. Blob ids on the "index" lines of files whose added content changed
(channel_reader.rs, batch.rs, batch_type.rs, errors.rs) are stale; regenerate them.

diff --git a/Cargo.lock b/Cargo.lock
index 56ee426f4..8cca54c46 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -327,6 +327,7 @@ dependencies = [
  "anyhow",
  "async-trait",
  "hashbrown",
+ "miniz_oxide",
  "serde",
  "tokio",
  "unsigned-varint",
diff --git a/crates/derive/Cargo.toml b/crates/derive/Cargo.toml
index 38e640b31..fbdb6947a 100644
--- a/crates/derive/Cargo.toml
+++ b/crates/derive/Cargo.toml
@@ -19,6 +19,7 @@ alloy-sol-types = { version = "0.6.3", default-features = false }
 async-trait = "0.1.77"
 hashbrown = "0.14.3"
 unsigned-varint = "0.8.0"
+miniz_oxide = { version = "0.7.2" }
 
 # Optional
 serde = { version = "1.0.197", default-features = false, features = ["derive"], optional = true }
diff --git a/crates/derive/src/stages/channel_bank.rs b/crates/derive/src/stages/channel_bank.rs
index 39d8520a8..4da1ee609 100644
--- a/crates/derive/src/stages/channel_bank.rs
+++ b/crates/derive/src/stages/channel_bank.rs
@@ -88,8 +88,8 @@ where
     pub fn ingest_frame(&mut self, frame: Frame) -> StageResult<()> {
         let origin = *self.origin().ok_or(anyhow!("No origin"))?;
 
+        // Get the channel for the frame, or create a new one if it doesn't exist.
         let current_channel = self.channels.entry(frame.id).or_insert_with(|| {
-            // Create a new channel
             let channel = Channel::new(frame.id, origin);
             self.channel_queue.push_back(frame.id);
             channel
diff --git a/crates/derive/src/stages/channel_reader.rs b/crates/derive/src/stages/channel_reader.rs
index 8b1378917..05c2edc78 100644
--- a/crates/derive/src/stages/channel_reader.rs
+++ b/crates/derive/src/stages/channel_reader.rs
@@ -1 +1,126 @@
+//! This module contains the `ChannelReader` struct.
 
+use super::channel_bank::ChannelBank;
+use crate::{
+    traits::{ChainProvider, DataAvailabilityProvider},
+    types::{Batch, BlockInfo, StageError, StageResult},
+};
+use alloc::vec::Vec;
+use anyhow::anyhow;
+use core::fmt::Debug;
+use miniz_oxide::inflate::decompress_to_vec_zlib;
+
+/// [ChannelReader] is a stateful stage that pulls channel data from the
+/// [ChannelBank], decompresses it, and decodes it into [Batch]es.
+#[derive(Debug)]
+pub struct ChannelReader<DAP, CP>
+where
+    DAP: DataAvailabilityProvider + Debug,
+    CP: ChainProvider + Debug,
+{
+    /// The previous stage of the derivation pipeline.
+    prev: ChannelBank<DAP, CP>,
+    /// The batch reader for the channel currently being read, if any.
+    next_batch: Option<BatchReader>,
+}
+
+impl<DAP, CP> ChannelReader<DAP, CP>
+where
+    DAP: DataAvailabilityProvider + Debug,
+    CP: ChainProvider + Debug,
+{
+    /// Create a new [ChannelReader] stage.
+    pub fn new(prev: ChannelBank<DAP, CP>) -> Self {
+        Self {
+            prev,
+            next_batch: None,
+        }
+    }
+
+    /// Pulls out the next [Batch] from the available channel.
+    ///
+    /// # Errors
+    ///
+    /// Propagates the error from loading channel data, or returns
+    /// [StageError::NotEnoughData] when the current channel yields no batch.
+    /// Either way the reader is reset, so the next call starts on a fresh channel.
+    pub async fn next_batch(&mut self) -> StageResult<Batch> {
+        if let Err(e) = self.set_batch_reader().await {
+            self.next_channel();
+            return Err(e);
+        }
+        // A reader is present once `set_batch_reader` succeeds; a missing one is
+        // folded into the "no batch" path instead of being unwrapped.
+        let batch = self.next_batch.as_mut().and_then(|reader| reader.next_batch());
+        match batch {
+            Some(batch) => Ok(batch),
+            None => {
+                self.next_channel();
+                Err(StageError::NotEnoughData)
+            }
+        }
+    }
+
+    /// Creates the batch reader from available channel data.
+    async fn set_batch_reader(&mut self) -> StageResult<()> {
+        if self.next_batch.is_none() {
+            let channel = self.prev.next_data().await?.ok_or_else(|| anyhow!("no channel"))?;
+            self.next_batch = Some(BatchReader::from(&channel[..]));
+        }
+        Ok(())
+    }
+
+    /// Returns the L1 origin [BlockInfo].
+    pub fn origin(&self) -> Option<&BlockInfo> {
+        self.prev.origin()
+    }
+
+    /// Forces the read to continue with the next channel, resetting any
+    /// decoding / decompression state to a fresh start.
+    pub fn next_channel(&mut self) {
+        self.next_batch = None;
+    }
+}
+
+/// Batch Reader provides a function that iteratively consumes batches from the reader.
+/// Warning: the batch reader can read every batch-type.
+/// The caller of the batch-reader should filter the results.
+#[derive(Debug)]
+pub(crate) struct BatchReader {
+    /// The raw, still-compressed channel data; taken on the first read.
+    data: Option<Vec<u8>>,
+    /// Decompressed data.
+    decompressed: Vec<u8>,
+    /// Offset into `decompressed` of the first byte that has not been decoded yet.
+    cursor: usize,
+}
+
+impl BatchReader {
+    /// Pulls out the next batch from the reader.
+    ///
+    /// Returns `None` if decompression fails or no further batch can be decoded.
+    pub(crate) fn next_batch(&mut self) -> Option<Batch> {
+        // Decompress lazily and only once (`take` leaves `None` behind). Channels
+        // are zlib (RFC 1950) streams per the OP Stack spec, hence the `_zlib` call.
+        if let Some(data) = self.data.take() {
+            self.decompressed = decompress_to_vec_zlib(&data).ok()?;
+        }
+        // Decode from the unread tail and remember how far the decoder advanced;
+        // decoding from a temporary slice of the whole buffer would hand back the
+        // first batch on every call.
+        let mut remaining = self.decompressed.get(self.cursor..)?;
+        let batch = Batch::decode(&mut remaining).ok()?;
+        self.cursor = self.decompressed.len() - remaining.len();
+        Some(batch)
+    }
+}
+
+impl From<&[u8]> for BatchReader {
+    fn from(data: &[u8]) -> Self {
+        Self {
+            data: Some(data.to_vec()),
+            decompressed: Vec::new(),
+            cursor: 0,
+        }
+    }
+}
diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs
index b1e079bbf..761995d9b 100644
--- a/crates/derive/src/stages/mod.rs
+++ b/crates/derive/src/stages/mod.rs
@@ -24,7 +24,9 @@ pub use frame_queue::FrameQueue;
 mod channel_bank;
 pub use channel_bank::ChannelBank;
 
-mod batch_queue;
 mod channel_reader;
+pub use channel_reader::ChannelReader;
+
+mod batch_queue;
 mod engine_queue;
 mod payload_derivation;
diff --git a/crates/derive/src/types/batch.rs b/crates/derive/src/types/batch.rs
new file mode 100644
index 000000000..5b52bd8e3
--- /dev/null
+++ b/crates/derive/src/types/batch.rs
@@ -0,0 +1,50 @@
+//! This module contains the enumerable [Batch].
+
+use super::batch_type::{SINGLE_BATCH_TYPE, SPAN_BATCH_TYPE};
+use super::single_batch::SingleBatch;
+use crate::types::errors::DecodeError;
+
+use alloy_rlp::Decodable;
+
+// TODO: replace this with a span batch
+/// Span Batch.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SpanBatch {}
+
+/// A Batch.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Batch {
+    /// A single batch
+    Single(SingleBatch),
+    /// Span Batches
+    Span(SpanBatch),
+}
+
+impl Batch {
+    /// Attempts to decode a batch from a byte slice.
+    ///
+    /// # Errors
+    ///
+    /// Returns [DecodeError::EmptyBuffer] if `r` is empty, and
+    /// [DecodeError::AlloyRlpError] if the leading type byte is unknown, selects the
+    /// not-yet-implemented span batch, or the single batch payload fails to decode.
+    pub fn decode(r: &mut &[u8]) -> Result<Self, DecodeError> {
+        // NOTE(review): the type byte is only peeked, never consumed - confirm that
+        // `SingleBatch::decode` expects it at the front of `r`.
+        match r.first().copied() {
+            None => Err(DecodeError::EmptyBuffer),
+            Some(SINGLE_BATCH_TYPE) => {
+                let single_batch = SingleBatch::decode(r)?;
+                Ok(Batch::Single(single_batch))
+            }
+            // TODO: implement span batch decoding
+            Some(SPAN_BATCH_TYPE) => Err(DecodeError::AlloyRlpError(
+                alloy_rlp::Error::Custom("span batches are not supported yet"),
+            )),
+            // Batch data is external input: reject unknown types instead of panicking.
+            Some(_) => Err(DecodeError::AlloyRlpError(alloy_rlp::Error::Custom(
+                "invalid batch type",
+            ))),
+        }
+    }
+}
diff --git a/crates/derive/src/types/batch_type.rs b/crates/derive/src/types/batch_type.rs
new file mode 100644
index 000000000..068452816
--- /dev/null
+++ b/crates/derive/src/types/batch_type.rs
@@ -0,0 +1,85 @@
+//! Contains the [BatchType] and its encodings.
+
+use alloy_rlp::{Decodable, Encodable};
+
+/// The single batch type identifier.
+pub(crate) const SINGLE_BATCH_TYPE: u8 = 0x01;
+
+/// The span batch type identifier.
+pub(crate) const SPAN_BATCH_TYPE: u8 = 0x02;
+
+/// The Batch Type.
+#[derive(Debug, Clone, PartialEq, Eq)]
+#[repr(u8)]
+pub enum BatchType {
+    /// Single Batch.
+    Single = SINGLE_BATCH_TYPE,
+    /// Span Batch.
+    Span = SPAN_BATCH_TYPE,
+}
+
+impl From<u8> for BatchType {
+    /// # Panics
+    ///
+    /// Panics if `val` is not a known batch type identifier.
+    fn from(val: u8) -> Self {
+        match val {
+            SINGLE_BATCH_TYPE => BatchType::Single,
+            SPAN_BATCH_TYPE => BatchType::Span,
+            _ => panic!("Invalid batch type"),
+        }
+    }
+}
+
+impl From<&[u8]> for BatchType {
+    /// # Panics
+    ///
+    /// Panics if `buf` is empty or its first byte is not a known batch type identifier.
+    fn from(buf: &[u8]) -> Self {
+        BatchType::from(buf[0])
+    }
+}
+
+impl Encodable for BatchType {
+    fn encode(&self, out: &mut dyn alloy_rlp::BufMut) {
+        let val = match self {
+            BatchType::Single => SINGLE_BATCH_TYPE,
+            BatchType::Span => SPAN_BATCH_TYPE,
+        };
+        val.encode(out);
+    }
+}
+
+impl Decodable for BatchType {
+    fn decode(buf: &mut &[u8]) -> alloy_rlp::Result<Self> {
+        // Decoded bytes are untrusted: surface an error rather than going through the
+        // panicking `From<u8>` conversion.
+        match u8::decode(buf)? {
+            SINGLE_BATCH_TYPE => Ok(BatchType::Single),
+            SPAN_BATCH_TYPE => Ok(BatchType::Span),
+            _ => Err(alloy_rlp::Error::Custom("invalid batch type")),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use alloc::vec::Vec;
+
+    #[test]
+    fn test_batch_type() {
+        let batch_type = BatchType::Single;
+        let mut buf = Vec::new();
+        batch_type.encode(&mut buf);
+        let decoded = BatchType::decode(&mut buf.as_slice()).unwrap();
+        assert_eq!(batch_type, decoded);
+    }
+
+    #[test]
+    fn test_batch_type_decode_invalid() {
+        let mut buf = Vec::new();
+        0x03u8.encode(&mut buf);
+        assert!(BatchType::decode(&mut buf.as_slice()).is_err());
+    }
+}
diff --git a/crates/derive/src/types/errors.rs b/crates/derive/src/types/errors.rs
index 98d3220b8..b5659922c 100644
--- a/crates/derive/src/types/errors.rs
+++ b/crates/derive/src/types/errors.rs
@@ -43,3 +43,37 @@ impl Display for StageError {
         }
     }
 }
+
+/// A decoding error.
+#[derive(Debug)]
+pub enum DecodeError {
+    /// The buffer is empty.
+    EmptyBuffer,
+    /// Alloy RLP Decoding Error.
+    AlloyRlpError(alloy_rlp::Error),
+}
+
+impl From<alloy_rlp::Error> for DecodeError {
+    fn from(e: alloy_rlp::Error) -> Self {
+        DecodeError::AlloyRlpError(e)
+    }
+}
+
+impl PartialEq for DecodeError {
+    fn eq(&self, other: &DecodeError) -> bool {
+        matches!(
+            (self, other),
+            (DecodeError::EmptyBuffer, DecodeError::EmptyBuffer)
+                | (DecodeError::AlloyRlpError(_), DecodeError::AlloyRlpError(_))
+        )
+    }
+}
+
+impl Display for DecodeError {
+    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
+        match self {
+            DecodeError::EmptyBuffer => write!(f, "Empty buffer"),
+            DecodeError::AlloyRlpError(e) => write!(f, "Alloy RLP Decoding Error: {}", e),
+        }
+    }
+}
diff --git a/crates/derive/src/types/mod.rs b/crates/derive/src/types/mod.rs
index 91b7725cb..2025ab547 100644
--- a/crates/derive/src/types/mod.rs
+++ b/crates/derive/src/types/mod.rs
@@ -3,6 +3,12 @@
 use alloc::vec::Vec;
 use alloy_rlp::{Decodable, Encodable};
 
+mod batch;
+pub use batch::Batch;
+
+mod batch_type;
+pub use batch_type::BatchType;
+
 mod system_config;
 pub use system_config::{
     SystemAccounts, SystemConfig, SystemConfigUpdateType, CONFIG_UPDATE_EVENT_VERSION_0,
@@ -43,7 +49,7 @@ mod channel;
 pub use channel::Channel;
 
 mod errors;
-pub use errors::{StageError, StageResult};
+pub use errors::{DecodeError, StageError, StageResult};
 
 mod single_batch;
 pub use single_batch::SingleBatch;