From 26cde8536ec458e43f6f4bf71cc19ce4ea90e634 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 21 Nov 2024 20:37:04 -0500 Subject: [PATCH 1/5] feat(protocol): compressor impls --- crates/protocol/Cargo.toml | 1 - crates/protocol/src/channel_out.rs | 123 +++++--- .../{brotli.rs => brotli/compress.rs} | 263 ++++++++---------- .../src/compression/brotli/decompress.rs | 103 +++++++ crates/protocol/src/compression/brotli/mod.rs | 28 ++ crates/protocol/src/compression/config.rs | 22 ++ crates/protocol/src/compression/mod.rs | 20 +- crates/protocol/src/compression/ratio.rs | 77 ++++- crates/protocol/src/compression/shadow.rs | 114 +++++++- crates/protocol/src/compression/traits.rs | 39 ++- crates/protocol/src/compression/types.rs | 14 + crates/protocol/src/compression/variant.rs | 78 ++++-- crates/protocol/src/compression/zlib.rs | 66 ++++- crates/protocol/src/lib.rs | 8 +- 14 files changed, 718 insertions(+), 238 deletions(-) rename crates/protocol/src/compression/{brotli.rs => brotli/compress.rs} (58%) create mode 100644 crates/protocol/src/compression/brotli/decompress.rs create mode 100644 crates/protocol/src/compression/brotli/mod.rs create mode 100644 crates/protocol/src/compression/config.rs diff --git a/crates/protocol/Cargo.toml b/crates/protocol/Cargo.toml index 2ff33603..e388ab5a 100644 --- a/crates/protocol/Cargo.toml +++ b/crates/protocol/Cargo.toml @@ -26,7 +26,6 @@ alloy-eips.workspace = true alloy-consensus.workspace = true # Misc -cfg-if.workspace = true tracing.workspace = true thiserror.workspace = true async-trait.workspace = true diff --git a/crates/protocol/src/channel_out.rs b/crates/protocol/src/channel_out.rs index 637dd337..e155a679 100644 --- a/crates/protocol/src/channel_out.rs +++ b/crates/protocol/src/channel_out.rs @@ -1,8 +1,7 @@ //! Contains the `ChannelOut` primitive for Optimism. -use crate::{Batch, ChannelId, Compressor, Frame}; +use crate::{Batch, ChannelCompressor, ChannelId, CompressorError, Frame}; use alloc::vec; -use alloy_primitives::Bytes; use op_alloy_genesis::RollupConfig; /// The frame overhead. @@ -20,9 +19,9 @@ pub enum ChannelOutError { /// Missing compressed batch data. #[error("Missing compressed batch data")] MissingData, - /// An error from brotli compression. - #[error("Error from Brotli compression")] - BrotliCompression, + /// An error from compression. + #[error("Error from compression")] + Compression(#[from] CompressorError), /// An error encoding the `Batch`. #[error("Error encoding the batch")] BatchEncoding, @@ -32,10 +31,10 @@ pub enum ChannelOutError { } /// [ChannelOut] constructs a channel from compressed, encoded batch data. -#[derive(Debug, Clone)] +#[allow(missing_debug_implementations)] pub struct ChannelOut<'a, C> where - C: Compressor + Clone + core::fmt::Debug, + C: ChannelCompressor, { /// The unique identifier for the channel. pub id: ChannelId, @@ -49,27 +48,26 @@ where pub closed: bool, /// The frame number. pub frame_number: u16, - /// Compressed batch data. - pub compressed: Option, /// The compressor. pub compressor: C, } impl<'a, C> ChannelOut<'a, C> where - C: Compressor + Clone + core::fmt::Debug, + C: ChannelCompressor, { /// Creates a new [ChannelOut] with the given [ChannelId]. pub const fn new(id: ChannelId, config: &'a RollupConfig, compressor: C) -> Self { - Self { - id, - config, - rlp_length: 0, - frame_number: 0, - closed: false, - compressed: None, - compressor, - } + Self { id, config, rlp_length: 0, frame_number: 0, closed: false, compressor } + } + + /// Resets the [ChannelOut] to its initial state. + pub fn reset(&mut self) { + self.rlp_length = 0; + self.frame_number = 0; + self.closed = false; + self.compressor.reset(); + // TODO: read random bytes into the channel id. } /// Accepts the given [crate::Batch] data into the [ChannelOut], compressing it @@ -89,14 +87,25 @@ where return Err(ChannelOutError::ExceedsMaxRlpBytesPerChannel); } - self.compressed = Some(self.compressor.compress(&buf).into()); + self.compressor.write(&buf)?; Ok(()) } + /// Returns the total amount of rlp-encoded input bytes. + pub const fn input_bytes(&self) -> u64 { + self.rlp_length + } + /// Returns the number of bytes ready to be output to a frame. pub fn ready_bytes(&self) -> usize { - self.compressed.as_ref().map_or(0, |c| c.len()) + self.compressor.len() + } + + /// Flush the internal compressor. + pub fn flush(&mut self) -> Result<(), ChannelOutError> { + self.compressor.flush()?; + Ok(()) } /// Closes the channel if not already closed. @@ -120,15 +129,11 @@ where } // Read `max_size` bytes from the compressed data. - let data = if let Some(data) = &self.compressed { - &data[..max_size] - } else { - return Err(ChannelOutError::MissingData); - }; - frame.data.extend_from_slice(data); + let mut data = Vec::with_capacity(max_size); + self.compressor.read(&mut data).map_err(ChannelOutError::Compression)?; + frame.data.extend_from_slice(data.as_slice()); // Update the compressed data. - self.compressed = self.compressed.as_mut().map(|b| b.split_off(max_size)); self.frame_number += 1; Ok(frame) } @@ -137,36 +142,70 @@ where #[cfg(test)] mod tests { use super::*; - use crate::{SingleBatch, SpanBatch}; + use crate::{CompressorResult, CompressorWriter, SingleBatch, SpanBatch}; + use alloy_primitives::Bytes; + + #[derive(Debug, Clone, Default)] + struct MockCompressor { + pub compressed: Option, + } + + impl CompressorWriter for MockCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { + let data = data.to_vec(); + let written = data.len(); + self.compressed = Some(Bytes::from(data)); + Ok(written) + } + + fn flush(&mut self) -> CompressorResult<()> { + Ok(()) + } - #[derive(Debug, Clone)] - struct MockCompressor; + fn close(&mut self) -> CompressorResult<()> { + Ok(()) + } + + fn reset(&mut self) { + self.compressed = None; + } + + fn len(&self) -> usize { + self.compressed.as_ref().map(|b| b.len()).unwrap_or(0) + } + + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + let len = self.compressed.as_ref().map(|b| b.len()).unwrap_or(0); + buf[..len].copy_from_slice(self.compressed.as_ref().unwrap()); + Ok(len) + } + } - impl Compressor for MockCompressor { - fn compress(&self, data: &[u8]) -> Vec { - data.to_vec() + impl ChannelCompressor for MockCompressor { + fn get_compressed(&self) -> Vec { + self.compressed.as_ref().unwrap().to_vec() } } #[test] fn test_channel_out_ready_bytes_empty() { let config = RollupConfig::default(); - let channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); assert_eq!(channel.ready_bytes(), 0); } #[test] fn test_channel_out_ready_bytes_some() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); - channel.compressed = Some(Bytes::from(vec![1, 2, 3])); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); + channel.compressor.write(&[1, 2, 3]).unwrap(); assert_eq!(channel.ready_bytes(), 3); } #[test] fn test_channel_out_close() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); assert!(!channel.closed); channel.close(); @@ -176,7 +215,7 @@ mod tests { #[test] fn test_channel_out_add_batch_closed() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); channel.close(); let batch = Batch::Single(SingleBatch::default()); @@ -186,7 +225,7 @@ mod tests { #[test] fn test_channel_out_empty_span_batch_decode_error() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); let batch = Batch::Span(SpanBatch::default()); assert_eq!(channel.add_batch(batch), Err(ChannelOutError::BatchEncoding)); @@ -195,7 +234,7 @@ mod tests { #[test] fn test_channel_out_max_rlp_bytes_per_channel() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); let batch = Batch::Single(SingleBatch::default()); channel.rlp_length = config.max_rlp_bytes_per_channel(batch.timestamp()); @@ -206,7 +245,7 @@ mod tests { #[test] fn test_channel_out_add_batch() { let config = RollupConfig::default(); - let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor); + let mut channel = ChannelOut::new(ChannelId::default(), &config, MockCompressor::default()); let batch = Batch::Single(SingleBatch::default()); assert_eq!(channel.add_batch(batch), Ok(())); diff --git a/crates/protocol/src/compression/brotli.rs b/crates/protocol/src/compression/brotli/compress.rs similarity index 58% rename from crates/protocol/src/compression/brotli.rs rename to crates/protocol/src/compression/brotli/compress.rs index 9fe3cc39..1c78490f 100644 --- a/crates/protocol/src/compression/brotli.rs +++ b/crates/protocol/src/compression/brotli/compress.rs @@ -1,15 +1,61 @@ -//! Brotli Compression and Decompression +//! Brotli Compression -use alloc::{vec, vec::Vec}; -use alloc_no_stdlib::*; -use brotli::*; -use core::ops; +use crate::{BrotliLevel, ChannelCompressor, CompressorError, CompressorResult, CompressorWriter}; +use std::{cell::RefCell, io::Write, rc::Rc, vec::Vec}; -use crate::MAX_SPAN_BATCH_ELEMENTS; +const DEFAULT_BROTLI_LGWIN: u32 = 22; -/// The brotli compressor. +/// A Brotli Compression Error. +#[derive(thiserror::Error, Debug)] +pub enum BrotliCompressionError { + /// Unimplemented in no_std environments. + #[error("brotli compression is not supported in no_std environments")] + NoStd, + /// An error returned by the `std` brotli compression method. + #[error("Error from Brotli compression: {0}")] + CompressionError(#[from] std::io::Error), +} + +/// A buffer wrapped in an Rc> #[derive(Debug, Clone)] +struct BrotliBuffer(Rc>>); + +impl BrotliBuffer { + /// Create a new BrotliBuffer. + pub(crate) fn new() -> Self { + Self(Rc::new(RefCell::new(Vec::new()))) + } + + /// Get the buffer. + pub(crate) fn get(&self) -> Rc>> { + self.0.clone() + } + + /// Returns the length of the buffer. + pub(crate) fn len(&self) -> usize { + self.0.borrow().len() + } +} + +impl Write for BrotliBuffer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.0.borrow_mut().write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +/// The brotli compressor. +#[allow(missing_debug_implementations)] pub struct BrotliCompressor { + /// The writer. + writer: brotli::CompressorWriter, + /// The buffer to write to. + buffer: BrotliBuffer, + /// Marks that the compressor is closed. + closed: bool, /// The compression level. pub level: BrotliLevel, } @@ -17,7 +63,19 @@ pub struct BrotliCompressor { impl BrotliCompressor { /// Creates a new brotli compressor with the given compression level. pub fn new(level: impl Into) -> Self { - Self { level: level.into() } + let level = level.into(); + let buffer = BrotliBuffer::new(); + Self { + closed: false, + writer: brotli::CompressorWriter::new( + buffer.clone(), + 0, + level.into(), + DEFAULT_BROTLI_LGWIN, + ), + buffer, + level, + } } } @@ -27,186 +85,95 @@ impl From for BrotliCompressor { } } -impl crate::Compressor for BrotliCompressor { - fn compress(&self, data: &[u8]) -> Vec { - compress_brotli(data, self.level).unwrap() +impl CompressorWriter for BrotliCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { + if self.closed { + return Err(CompressorError::Brotli); + } + let written = self.writer.write(data).map_err(|_| CompressorError::Brotli)?; + Ok(written) } -} - -/// The brotli encoding level used in Optimism. -/// -/// See: -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum BrotliLevel { - /// The fastest compression level. - Brotli9 = 9, - /// The default compression level. - Brotli10 = 10, - /// The highest compression level. - Brotli11 = 11, -} -/// A frame decompression error. -#[derive(thiserror::Error, Debug, PartialEq, Eq)] -pub enum BatchDecompressionError { - /// The buffer exceeds the [MAX_SPAN_BATCH_ELEMENTS] protocol parameter. - #[error("The batch exceeds the maximum number of elements: {max_size}", max_size = MAX_SPAN_BATCH_ELEMENTS)] - BatchTooLarge, -} + fn flush(&mut self) -> CompressorResult<()> { + self.writer.flush().map_err(|_| CompressorError::Brotli)?; + Ok(()) + } -/// A Brotli Compression Error. -#[derive(thiserror::Error, Debug)] -pub enum BrotliCompressionError { - /// Unimplemented in no_std environments. - #[error("brotli compression is not supported in no_std environments")] - NoStd, - /// An error returned by the `std` brotli compression method. - #[cfg(feature = "std")] - #[error("Error from Brotli compression: {0}")] - CompressionError(#[from] std::io::Error), -} + fn close(&mut self) -> CompressorResult<()> { + self.flush()?; + self.closed = true; + Ok(()) + } -/// Compresses the given bytes data using the Brotli compressor implemented -/// in the [`brotli`](https://crates.io/crates/brotli) crate. -/// -/// Note: The level must be between 0 and 11. In Optimism, the levels 9, 10, and 11 are used. -/// By default, [BrotliLevel::Brotli10] is used. -#[allow(unused_variables)] -#[allow(unused_mut)] -pub fn compress_brotli( - mut input: &[u8], - level: BrotliLevel, -) -> Result, BrotliCompressionError> { - cfg_if::cfg_if! { - if #[cfg(feature = "std")] { - use brotli::enc::{BrotliCompress, BrotliEncoderParams}; - let mut output = alloc::vec![]; - BrotliCompress( - &mut input, - &mut output, - &BrotliEncoderParams { quality: level as i32, ..Default::default() }, - )?; - Ok(output) - } else { - unimplemented!("brotli compression is not supported in no_std environments") - } + fn reset(&mut self) { + self.closed = false; + self.writer = brotli::CompressorWriter::new( + BrotliBuffer::new(), + 0, + self.level.into(), + DEFAULT_BROTLI_LGWIN, + ); } -} -/// Decompresses the given bytes data using the Brotli decompressor implemented -/// in the [`brotli`](https://crates.io/crates/brotli) crate. -pub fn decompress_brotli( - data: &[u8], - max_rlp_bytes_per_channel: usize, -) -> Result, BatchDecompressionError> { - declare_stack_allocator_struct!(MemPool, 4096, stack); - - let mut u8_buffer = vec![0; 32 * 1024 * 1024].into_boxed_slice(); - let mut u32_buffer = vec![0; 1024 * 1024].into_boxed_slice(); - let mut hc_buffer = vec![HuffmanCode::default(); 4 * 1024 * 1024].into_boxed_slice(); - let u8_allocator = MemPool::::new_allocator(&mut u8_buffer, bzero); - let u32_allocator = MemPool::::new_allocator(&mut u32_buffer, bzero); - let hc_allocator = MemPool::::new_allocator(&mut hc_buffer, bzero); - let mut brotli_state = BrotliState::new(u8_allocator, u32_allocator, hc_allocator); - - // Setup the decompressor inputs and outputs - let mut output = vec![0; data.len()]; - let mut available_in = data.len(); - let mut input_offset = 0; - let mut available_out = output.len(); - let mut output_offset = 0; - let mut written = 0; - - // Decompress the data stream until success or failure - loop { - match brotli::BrotliDecompressStream( - &mut available_in, - &mut input_offset, - data, - &mut available_out, - &mut output_offset, - &mut output, - &mut written, - &mut brotli_state, - ) { - brotli::BrotliResult::ResultSuccess => break, - brotli::BrotliResult::NeedsMoreOutput => { - // Resize the output buffer to double the size, following standard - // practice for buffer resizing in streams. - let old_len = output.len(); - let new_len = old_len * 2; - - if new_len > max_rlp_bytes_per_channel { - return Err(BatchDecompressionError::BatchTooLarge); - } - - output.resize(new_len, 0); - available_out += old_len; - } - _ => break, - } + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + let len = self.buffer.get().borrow().len().min(buf.len()); + buf[..len].copy_from_slice(&self.buffer.get().borrow()[..len]); + Ok(len) } - // Truncate the output buffer to the written bytes - output.truncate(written); + fn len(&self) -> usize { + self.writer.get_ref().len() + } +} - Ok(output) +impl ChannelCompressor for BrotliCompressor { + fn get_compressed(&self) -> Vec { + self.buffer.get().borrow().clone() + } } #[cfg(test)] mod test { use super::*; + use crate::decompress_brotli; use alloy_primitives::hex; use op_alloy_genesis::MAX_RLP_BYTES_PER_CHANNEL_FJORD; #[test] - #[cfg(feature = "std")] fn test_compress_brotli() { let expected = hex!("8b048075ed184249e9bc19675e03"); let decompressed = hex!("75ed184249e9bc19675e"); - let compressed = compress_brotli(&decompressed, BrotliLevel::Brotli11).unwrap(); + let mut compressor = BrotliCompressor::new(BrotliLevel::Brotli11); + compressor.write(&decompressed).unwrap(); + compressor.close().unwrap(); + let compressed = compressor.get_compressed(); assert_eq!(compressed, expected); } #[test] - #[cfg(feature = "std")] fn test_compress_batch_brotli() { let raw_batch_decompressed = hex!(""); let raw_batch = hex!(""); - let compressed = compress_brotli(&raw_batch_decompressed, BrotliLevel::Brotli10).unwrap(); + let mut compressor = BrotliCompressor::new(BrotliLevel::Brotli10); + compressor.write(&raw_batch_decompressed).unwrap(); + compressor.close().unwrap(); + let compressed = compressor.get_compressed(); assert_eq!(compressed, raw_batch); } #[test] - #[cfg(feature = "std")] fn test_brotli_roundtrip() { let raw_batch_decompressed = hex!(""); - let compressed = compress_brotli(&raw_batch_decompressed, BrotliLevel::Brotli11).unwrap(); - let decompressed = - decompress_brotli(&compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); - assert_eq!(decompressed, raw_batch_decompressed); - } - - #[test] - fn test_decompress_brotli() { - let expected = hex!("75ed184249e9bc19675e"); - let compressed = hex!("8b048075ed184249e9bc19675e03"); + let mut compressor = BrotliCompressor::new(BrotliLevel::Brotli11); + compressor.write(&raw_batch_decompressed).unwrap(); + compressor.close().unwrap(); + let compressed = compressor.get_compressed(); let decompressed = decompress_brotli(&compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); - assert_eq!(decompressed, expected); - } - - #[test] - fn test_decompress_batch_brotli() { - let raw_batch_decompressed = hex!(""); - let raw_batch = hex!(""); - - let decompressed = - decompress_brotli(&raw_batch, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); assert_eq!(decompressed, raw_batch_decompressed); } } diff --git a/crates/protocol/src/compression/brotli/decompress.rs b/crates/protocol/src/compression/brotli/decompress.rs new file mode 100644 index 00000000..5fb750e2 --- /dev/null +++ b/crates/protocol/src/compression/brotli/decompress.rs @@ -0,0 +1,103 @@ +//! Contains brotli decompression utilities. + +use alloc::{vec, vec::Vec}; +use alloc_no_stdlib::*; +use brotli::*; +use core::ops; + +use crate::MAX_SPAN_BATCH_ELEMENTS; + +/// A frame decompression error. +#[derive(thiserror::Error, Debug, PartialEq, Eq)] +pub enum BrotliDecompressionError { + /// The buffer exceeds the [MAX_SPAN_BATCH_ELEMENTS] protocol parameter. + #[error("The batch exceeds the maximum number of elements: {max_size}", max_size = MAX_SPAN_BATCH_ELEMENTS)] + BatchTooLarge, +} + +/// Decompresses the given bytes data using the Brotli decompressor implemented +/// in the [`brotli`](https://crates.io/crates/brotli) crate. +pub fn decompress_brotli( + data: &[u8], + max_rlp_bytes_per_channel: usize, +) -> Result, BrotliDecompressionError> { + declare_stack_allocator_struct!(MemPool, 4096, stack); + + let mut u8_buffer = vec![0; 32 * 1024 * 1024].into_boxed_slice(); + let mut u32_buffer = vec![0; 1024 * 1024].into_boxed_slice(); + let mut hc_buffer = vec![HuffmanCode::default(); 4 * 1024 * 1024].into_boxed_slice(); + let u8_allocator = MemPool::::new_allocator(&mut u8_buffer, bzero); + let u32_allocator = MemPool::::new_allocator(&mut u32_buffer, bzero); + let hc_allocator = MemPool::::new_allocator(&mut hc_buffer, bzero); + let mut brotli_state = BrotliState::new(u8_allocator, u32_allocator, hc_allocator); + + // Setup the decompressor inputs and outputs + let mut output = vec![0; data.len()]; + let mut available_in = data.len(); + let mut input_offset = 0; + let mut available_out = output.len(); + let mut output_offset = 0; + let mut written = 0; + + // Decompress the data stream until success or failure + loop { + match brotli::BrotliDecompressStream( + &mut available_in, + &mut input_offset, + data, + &mut available_out, + &mut output_offset, + &mut output, + &mut written, + &mut brotli_state, + ) { + brotli::BrotliResult::ResultSuccess => break, + brotli::BrotliResult::NeedsMoreOutput => { + // Resize the output buffer to double the size, following standard + // practice for buffer resizing in streams. + let old_len = output.len(); + let new_len = old_len * 2; + + if new_len > max_rlp_bytes_per_channel { + return Err(BrotliDecompressionError::BatchTooLarge); + } + + output.resize(new_len, 0); + available_out += old_len; + } + _ => break, + } + } + + // Truncate the output buffer to the written bytes + output.truncate(written); + + Ok(output) +} + +#[cfg(test)] +mod test { + use super::*; + use alloy_primitives::hex; + use op_alloy_genesis::MAX_RLP_BYTES_PER_CHANNEL_FJORD; + + #[test] + fn test_decompress_brotli() { + let expected = hex!("75ed184249e9bc19675e"); + let compressed = hex!("8b048075ed184249e9bc19675e03"); + + let decompressed = + decompress_brotli(&compressed, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); + assert_eq!(decompressed, expected); + } + + #[test] + fn test_decompress_batch_brotli() { + let raw_batch_decompressed = hex!(""); + let raw_batch = hex!(""); + + let decompressed = + decompress_brotli(&raw_batch, MAX_RLP_BYTES_PER_CHANNEL_FJORD as usize).unwrap(); + assert_eq!(decompressed, raw_batch_decompressed); + } +} diff --git a/crates/protocol/src/compression/brotli/mod.rs b/crates/protocol/src/compression/brotli/mod.rs new file mode 100644 index 00000000..140c6448 --- /dev/null +++ b/crates/protocol/src/compression/brotli/mod.rs @@ -0,0 +1,28 @@ +//! Contains brotli compression and decompression utilities. + +#[cfg(feature = "std")] +mod compress; +#[cfg(feature = "std")] +pub use compress::{BrotliCompressionError, BrotliCompressor}; + +mod decompress; +pub use decompress::{decompress_brotli, BrotliDecompressionError}; + +/// The brotli encoding level used in Optimism. +/// +/// See: +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BrotliLevel { + /// The fastest compression level. + Brotli9 = 9, + /// The default compression level. + Brotli10 = 10, + /// The highest compression level. + Brotli11 = 11, +} + +impl From for u32 { + fn from(level: BrotliLevel) -> Self { + level as Self + } +} diff --git a/crates/protocol/src/compression/config.rs b/crates/protocol/src/compression/config.rs new file mode 100644 index 00000000..72f485e4 --- /dev/null +++ b/crates/protocol/src/compression/config.rs @@ -0,0 +1,22 @@ +//! Compression configuration. + +use crate::{CompressionAlgo, CompressorType}; + +/// Configuration for the compressor itself. +#[derive(Debug, Clone)] +pub struct Config { + /// TargetOutputSize is the target size that the compressed data should reach. + /// The shadow compressor guarantees that the compressed data stays below + /// this bound. The ratio compressor might go over. + pub target_output_size: u64, + /// ApproxComprRatio to assume (only ratio compressor). Should be slightly smaller + /// than average from experiments to avoid the chances of creating a small + /// additional leftover frame. + pub approx_compr_ratio: f64, + /// Kind of compressor to use. Must be one of KindKeys. If unset, NewCompressor + /// will default to RatioKind. + pub kind: CompressorType, + + /// Type of compression algorithm to use. Must be one of [zlib, brotli-(9|10|11)] + pub compression_algo: CompressionAlgo, +} diff --git a/crates/protocol/src/compression/mod.rs b/crates/protocol/src/compression/mod.rs index 9727da01..5f0f35cb 100644 --- a/crates/protocol/src/compression/mod.rs +++ b/crates/protocol/src/compression/mod.rs @@ -1,25 +1,33 @@ //! Contains compression and decompression primitives for Optimism. +#[cfg(feature = "std")] mod variant; +#[cfg(feature = "std")] pub use variant::VariantCompressor; +mod config; +pub use config::Config; + mod types; -pub use types::{CompressionAlgo, CompressorType}; +pub use types::{CompressionAlgo, CompressorError, CompressorResult, CompressorType}; mod zlib; pub use zlib::{compress_zlib, decompress_zlib, ZlibCompressor}; mod brotli; -pub use brotli::{ - compress_brotli, decompress_brotli, BatchDecompressionError, BrotliCompressionError, - BrotliCompressor, BrotliLevel, -}; +pub use brotli::{decompress_brotli, BrotliDecompressionError, BrotliLevel}; +#[cfg(feature = "std")] +pub use brotli::{BrotliCompressionError, BrotliCompressor}; mod traits; -pub use traits::Compressor; +pub use traits::{ChannelCompressor, CompressorWriter}; +#[cfg(feature = "std")] mod shadow; +#[cfg(feature = "std")] pub use shadow::ShadowCompressor; +#[cfg(feature = "std")] mod ratio; +#[cfg(feature = "std")] pub use ratio::RatioCompressor; diff --git a/crates/protocol/src/compression/ratio.rs b/crates/protocol/src/compression/ratio.rs index d2bbb396..bc80c0ae 100644 --- a/crates/protocol/src/compression/ratio.rs +++ b/crates/protocol/src/compression/ratio.rs @@ -4,6 +4,77 @@ //! //! [rc]: https://github.com/ethereum-optimism/optimism/blob/develop/op-batcher/compressor/ratio_compressor.go#L7 -/// The ratio compressor. -#[derive(Debug, Clone)] -pub struct RatioCompressor; +use crate::{CompressorResult, CompressorWriter, Config, VariantCompressor}; + +/// Ratio Compressor +/// +/// The ratio compressor uses the target size and a compression ration parameter +/// to determine how much data can be written to the compressor before it's +/// considered full. The full calculation is as follows: +/// +/// full = uncompressedLength * approxCompRatio >= targetFrameSize * targetNumFrames +/// +/// The ratio compressor wraps a [VariantCompressor] which dispatches to the +/// appropriate compression algorithm (ZLIB or Brotli). +#[allow(missing_debug_implementations)] +pub struct RatioCompressor { + /// The compressor configuration. + config: Config, + /// The amount of data currently in the compressor. + lake: u64, + /// The inner [VariantCompressor] that will be used to compress the data. + compressor: VariantCompressor, +} + +impl RatioCompressor { + /// Create a new [RatioCompressor] with the given [VariantCompressor]. + pub const fn new(config: Config, compressor: VariantCompressor) -> Self { + Self { config, lake: 0, compressor } + } + + /// Calculates the input threshold in bytes. + pub fn input_threshold(&self) -> usize { + let target_frame_size = self.config.target_output_size; + let approx_comp_ratio = self.config.approx_compr_ratio; + + (target_frame_size as f64 / approx_comp_ratio) as usize + } + + /// Returns if the compressor is full (exceeds the input threshold). + pub fn is_full(&self) -> bool { + self.lake >= self.input_threshold() as u64 + } +} + +impl From for RatioCompressor { + fn from(config: Config) -> Self { + let compressor = VariantCompressor::from(config.compression_algo); + Self::new(config, compressor) + } +} + +impl CompressorWriter for RatioCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { + self.compressor.write(data) + } + + fn flush(&mut self) -> CompressorResult<()> { + self.compressor.flush() + } + + fn close(&mut self) -> CompressorResult<()> { + self.compressor.close() + } + + fn reset(&mut self) { + self.compressor.reset(); + } + + fn len(&self) -> usize { + self.compressor.len() + } + + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + self.compressor.read(buf) + } +} diff --git a/crates/protocol/src/compression/shadow.rs b/crates/protocol/src/compression/shadow.rs index 438e3a39..df2352aa 100644 --- a/crates/protocol/src/compression/shadow.rs +++ b/crates/protocol/src/compression/shadow.rs @@ -4,6 +4,114 @@ //! //! [sc]: https://github.com/ethereum-optimism/optimism/blob/develop/op-batcher/compressor/shadow_compressor.go#L18 -/// The shadow compressor. -#[derive(Debug, Clone)] -pub struct ShadowCompressor; +use crate::{CompressorError, CompressorResult, CompressorWriter, Config, VariantCompressor}; + +/// The largest potential blow-up in bytes we expect to see when compressing +/// arbitrary (e.g. random) data. Here we account for a 2 byte header, 4 byte +/// digest, 5 byte eof indicator, and then 5 byte flate block header for each 16k of potential +/// data. Assuming frames are max 128k size (the current max blob size) this is 2+4+5+(5*8) = 51 +/// bytes. If we start using larger frames (e.g. should max blob size increase) a larger blowup +/// might be possible, but it would be highly unlikely, and the system still works if our +/// estimate is wrong -- we just end up writing one more tx for the overflow. +const SAFE_COMPRESSION_OVERHEAD: u64 = 51; + +// The number of final bytes a `zlib.Writer` call writes to the output buffer. +const CLOSE_OVERHEAD_ZLIB: u64 = 9; + +/// Shadow Compressor +/// +/// The shadow compressor contains two compression buffers, one for size estimation, and +/// one for the final compressed data. The first compression buffer is flushed on every +/// write, and the second isn't, which means the final compressed data is always at least +/// smaller than the size estimation. +/// +/// One exception to the rule is when the first write to the buffer is not checked against +/// the target. This allows individual blocks larger than the target to be included. +/// Notice, this will be split across multiple channel frames. +#[allow(missing_debug_implementations)] +pub struct ShadowCompressor { + /// The compressor configuration. + config: Config, + /// The inner [VariantCompressor] that will be used to compress the data. + compressor: VariantCompressor, + /// The shadow compressor. + shadow: VariantCompressor, + + /// Flags that the buffer is full. + is_full: bool, + /// An upper bound on the size of the compressed data. + bound: u64, +} + +impl ShadowCompressor { + /// Creates a new [ShadowCompressor] with the given [VariantCompressor]. + pub const fn new( + config: Config, + compressor: VariantCompressor, + shadow: VariantCompressor, + ) -> Self { + Self { config, is_full: false, compressor, shadow, bound: SAFE_COMPRESSION_OVERHEAD } + } +} + +impl From for ShadowCompressor { + fn from(config: Config) -> Self { + let compressor = VariantCompressor::from(config.compression_algo); + let shadow = VariantCompressor::from(config.compression_algo); + Self::new(config, compressor, shadow) + } +} + +impl CompressorWriter for ShadowCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { + // If the buffer is full, error so the user can flush. + if self.is_full { + return Err(CompressorError::Full); + } + + // Write to the shadow compressor. + self.shadow.write(data)?; + + // The new bound increases by the length of the compressed data. + let mut newbound = data.len() as u64; + if newbound > self.config.target_output_size { + // Don't flush the buffer if there's a chance we're over the size limit. + self.shadow.flush()?; + newbound = self.shadow.len() as u64 + CLOSE_OVERHEAD_ZLIB; + if newbound > self.config.target_output_size { + self.is_full = true; + // Only error if the buffer has been written to. + if self.compressor.len() > 0 { + return Err(CompressorError::Full); + } + } + } + + // Update the bound and compress. + self.bound = newbound; + self.compressor.write(data) + } + + fn len(&self) -> usize { + self.compressor.len() + } + + fn flush(&mut self) -> CompressorResult<()> { + self.shadow.flush() + } + + fn close(&mut self) -> CompressorResult<()> { + self.shadow.close() + } + + fn reset(&mut self) { + self.compressor.reset(); + self.shadow.reset(); + self.is_full = false; + self.bound = SAFE_COMPRESSION_OVERHEAD; + } + + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + self.compressor.read(buf) + } +} diff --git a/crates/protocol/src/compression/traits.rs b/crates/protocol/src/compression/traits.rs index 8d369bf2..0ba24d32 100644 --- a/crates/protocol/src/compression/traits.rs +++ b/crates/protocol/src/compression/traits.rs @@ -1,9 +1,38 @@ //! Contains the core `Compressor` trait. -use alloc::vec::Vec; +use crate::CompressorResult; -/// The Compressor trait abstracts compression. -pub trait Compressor { - /// Compresses the given data. - fn compress(&self, data: &[u8]) -> Vec; +/// Compressor Writer +/// +/// A trait that expands the standard library `Write` trait to include +/// compression-specific methods and return [CompressorResult] instead of +/// standard library `Result`. +#[allow(clippy::len_without_is_empty)] +pub trait CompressorWriter { + /// Writes the given data to the compressor. + fn write(&mut self, data: &[u8]) -> CompressorResult; + + /// Flushes the buffer. + fn flush(&mut self) -> CompressorResult<()>; + + /// Closes the compressor. + fn close(&mut self) -> CompressorResult<()>; + + /// Resets the compressor. + fn reset(&mut self); + + /// Returns the length of the compressed data. + fn len(&self) -> usize; + + /// Reads the compressed data into the given buffer. + /// Returns the number of bytes read. + fn read(&mut self, buf: &mut [u8]) -> CompressorResult; +} + +/// Channel Compressor +/// +/// A compressor for channels. +pub trait ChannelCompressor: CompressorWriter { + /// Returns the compressed data buffer. + fn get_compressed(&self) -> Vec; } diff --git a/crates/protocol/src/compression/types.rs b/crates/protocol/src/compression/types.rs index fa5eb24c..18e20d07 100644 --- a/crates/protocol/src/compression/types.rs +++ b/crates/protocol/src/compression/types.rs @@ -3,6 +3,20 @@ use crate::BrotliLevel; use alloc::borrow::Borrow; +/// The result from compressing data. +pub type CompressorResult = Result; + +/// An error returned by the compressor. +#[derive(Debug, thiserror::Error, Clone, Copy, PartialEq, Eq)] +pub enum CompressorError { + /// Thrown when the compressor is full. + #[error("compressor is full")] + Full, + /// Brotli compression failed. + #[error("brotli compression failed")] + Brotli, +} + /// The type of compressor to use. /// /// See: diff --git a/crates/protocol/src/compression/variant.rs b/crates/protocol/src/compression/variant.rs index bfcccc35..a3e29807 100644 --- a/crates/protocol/src/compression/variant.rs +++ b/crates/protocol/src/compression/variant.rs @@ -1,15 +1,17 @@ -//! A variant over the different implementations of [Compressor]. +//! A variant over the different implementations of [ChannelCompressor]. -use crate::{BrotliCompressor, CompressionAlgo, Compressor, ZlibCompressor}; -use alloc::vec::Vec; +use crate::{ + BrotliCompressor, ChannelCompressor, CompressionAlgo, CompressorResult, CompressorWriter, + ZlibCompressor, +}; use op_alloy_genesis::RollupConfig; /// The channel compressor wraps the brotli and zlib compressor types, -/// implementing the [Compressor] trait itself. -#[derive(Debug, Clone)] +/// implementing the [ChannelCompressor] trait itself. +#[allow(missing_debug_implementations)] pub enum VariantCompressor { /// The brotli compressor. - Brotli(BrotliCompressor), + Brotli(Box), /// The zlib compressor. Zlib(ZlibCompressor), } @@ -18,18 +20,62 @@ impl VariantCompressor { /// Constructs a [VariantCompressor] using the given [RollupConfig] and timestamp. pub fn from_timestamp(config: &RollupConfig, timestamp: u64) -> Self { if config.is_fjord_active(timestamp) { - Self::Brotli(BrotliCompressor::new(CompressionAlgo::Brotli10)) + Self::Brotli(Box::new(BrotliCompressor::new(CompressionAlgo::Brotli10))) } else { - Self::Zlib(ZlibCompressor) + Self::Zlib(ZlibCompressor::new()) } } } -impl Compressor for VariantCompressor { - fn compress(&self, data: &[u8]) -> Vec { +impl CompressorWriter for VariantCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { match self { - Self::Brotli(compressor) => compressor.compress(data), - Self::Zlib(compressor) => compressor.compress(data), + Self::Brotli(compressor) => compressor.write(data), + Self::Zlib(compressor) => compressor.write(data), + } + } + + fn flush(&mut self) -> CompressorResult<()> { + match self { + Self::Brotli(compressor) => compressor.flush(), + Self::Zlib(compressor) => compressor.flush(), + } + } + + fn close(&mut self) -> CompressorResult<()> { + match self { + Self::Brotli(compressor) => compressor.close(), + Self::Zlib(compressor) => compressor.close(), + } + } + + fn reset(&mut self) { + match self { + Self::Brotli(compressor) => compressor.reset(), + Self::Zlib(compressor) => compressor.reset(), + } + } + + fn len(&self) -> usize { + match self { + Self::Brotli(compressor) => compressor.len(), + Self::Zlib(compressor) => compressor.len(), + } + } + + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + match self { + Self::Brotli(compressor) => compressor.read(buf), + Self::Zlib(compressor) => compressor.read(buf), + } + } +} + +impl ChannelCompressor for VariantCompressor { + fn get_compressed(&self) -> Vec { + match self { + Self::Brotli(compressor) => compressor.get_compressed(), + Self::Zlib(compressor) => compressor.get_compressed(), } } } @@ -37,10 +83,10 @@ impl Compressor for VariantCompressor { impl From for VariantCompressor { fn from(algo: CompressionAlgo) -> Self { match algo { - lvl @ CompressionAlgo::Brotli9 => Self::Brotli(BrotliCompressor::new(lvl)), - lvl @ CompressionAlgo::Brotli10 => Self::Brotli(BrotliCompressor::new(lvl)), - lvl @ CompressionAlgo::Brotli11 => Self::Brotli(BrotliCompressor::new(lvl)), - CompressionAlgo::Zlib => Self::Zlib(ZlibCompressor), + lvl @ CompressionAlgo::Brotli9 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), + lvl @ CompressionAlgo::Brotli10 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), + lvl @ CompressionAlgo::Brotli11 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), + CompressionAlgo::Zlib => Self::Zlib(ZlibCompressor::new()), } } } diff --git a/crates/protocol/src/compression/zlib.rs b/crates/protocol/src/compression/zlib.rs index 6231577c..cf252107 100644 --- a/crates/protocol/src/compression/zlib.rs +++ b/crates/protocol/src/compression/zlib.rs @@ -1,29 +1,73 @@ //! Contains ZLIB compression and decompression primitives for Optimism. -use crate::Compressor; +use crate::{ChannelCompressor, CompressorResult, CompressorWriter}; use alloc::vec::Vec; use miniz_oxide::inflate::DecompressError; /// The best compression. const BEST_ZLIB_COMPRESSION: u8 = 9; +/// Method to compress data using ZLIB. +pub fn compress_zlib(data: &[u8]) -> Vec { + miniz_oxide::deflate::compress_to_vec(data, BEST_ZLIB_COMPRESSION) +} + +/// Method to decompress data using ZLIB. +pub fn decompress_zlib(data: &[u8]) -> Result, DecompressError> { + miniz_oxide::inflate::decompress_to_vec(data) +} + /// The ZLIB compressor. #[derive(Debug, Clone, Default)] #[non_exhaustive] -pub struct ZlibCompressor; +pub struct ZlibCompressor { + /// Holds a non-compressed buffer. + buffer: Vec, + /// The compressed buffer. + compressed: Vec, +} -impl Compressor for ZlibCompressor { - fn compress(&self, data: &[u8]) -> Vec { - compress_zlib(data) +impl ZlibCompressor { + /// Create a new ZLIB compressor. + pub const fn new() -> Self { + Self { buffer: Vec::new(), compressed: Vec::new() } } } -/// Method to compress data using ZLIB. -pub fn compress_zlib(data: &[u8]) -> Vec { - miniz_oxide::deflate::compress_to_vec(data, BEST_ZLIB_COMPRESSION) +impl CompressorWriter for ZlibCompressor { + fn write(&mut self, data: &[u8]) -> CompressorResult { + self.buffer.extend_from_slice(data); + self.compressed.clear(); + self.compressed.extend_from_slice(&compress_zlib(&self.buffer)); + Ok(data.len()) + } + + fn flush(&mut self) -> CompressorResult<()> { + Ok(()) + } + + fn close(&mut self) -> CompressorResult<()> { + Ok(()) + } + + fn reset(&mut self) { + self.buffer.clear(); + self.compressed.clear(); + } + + fn len(&self) -> usize { + self.compressed.len() + } + + fn read(&mut self, buf: &mut [u8]) -> CompressorResult { + let len = self.compressed.len().min(buf.len()); + buf[..len].copy_from_slice(&self.compressed[..len]); + Ok(len) + } } -/// Method to decompress data using ZLIB. -pub fn decompress_zlib(data: &[u8]) -> Result, DecompressError> { - miniz_oxide::inflate::decompress_to_vec(data) +impl ChannelCompressor for ZlibCompressor { + fn get_compressed(&self) -> Vec { + self.compressed.clone() + } } diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index 5693cbbb..feb6153e 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -32,10 +32,12 @@ pub use frame::{ mod compression; pub use compression::{ - compress_brotli, compress_zlib, decompress_brotli, decompress_zlib, BatchDecompressionError, - BrotliCompressionError, BrotliCompressor, BrotliLevel, CompressionAlgo, Compressor, - CompressorType, RatioCompressor, ShadowCompressor, VariantCompressor, ZlibCompressor, + compress_zlib, decompress_brotli, decompress_zlib, BrotliDecompressionError, BrotliLevel, + ChannelCompressor, CompressionAlgo, CompressorError, CompressorResult, CompressorType, + CompressorWriter, Config, RatioCompressor, ShadowCompressor, ZlibCompressor, }; +#[cfg(feature = "std")] +pub use compression::{BrotliCompressionError, BrotliCompressor, VariantCompressor}; mod iter; pub use iter::FrameIter; From 631e27d09d063ca950286a91e44a57cf65e2ef22 Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 5 Dec 2024 09:16:44 -0500 Subject: [PATCH 2/5] fix: no_std --- crates/protocol/src/channel_out.rs | 2 +- crates/protocol/src/compression/traits.rs | 1 + crates/protocol/src/lib.rs | 6 ++++-- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/crates/protocol/src/channel_out.rs b/crates/protocol/src/channel_out.rs index e155a679..f86740bf 100644 --- a/crates/protocol/src/channel_out.rs +++ b/crates/protocol/src/channel_out.rs @@ -1,7 +1,7 @@ //! Contains the `ChannelOut` primitive for Optimism. use crate::{Batch, ChannelCompressor, ChannelId, CompressorError, Frame}; -use alloc::vec; +use alloc::{vec, vec::Vec}; use op_alloy_genesis::RollupConfig; /// The frame overhead. diff --git a/crates/protocol/src/compression/traits.rs b/crates/protocol/src/compression/traits.rs index 0ba24d32..50ef1a32 100644 --- a/crates/protocol/src/compression/traits.rs +++ b/crates/protocol/src/compression/traits.rs @@ -1,6 +1,7 @@ //! Contains the core `Compressor` trait. use crate::CompressorResult; +use alloc::vec::Vec; /// Compressor Writer /// diff --git a/crates/protocol/src/lib.rs b/crates/protocol/src/lib.rs index feb6153e..a386a3cb 100644 --- a/crates/protocol/src/lib.rs +++ b/crates/protocol/src/lib.rs @@ -34,10 +34,12 @@ mod compression; pub use compression::{ compress_zlib, decompress_brotli, decompress_zlib, BrotliDecompressionError, BrotliLevel, ChannelCompressor, CompressionAlgo, CompressorError, CompressorResult, CompressorType, - CompressorWriter, Config, RatioCompressor, ShadowCompressor, ZlibCompressor, + CompressorWriter, Config, ZlibCompressor, }; #[cfg(feature = "std")] -pub use compression::{BrotliCompressionError, BrotliCompressor, VariantCompressor}; +pub use compression::{ + BrotliCompressionError, BrotliCompressor, RatioCompressor, ShadowCompressor, VariantCompressor, +}; mod iter; pub use iter::FrameIter; From 1eb0d6bb62225a06348a4bb915f942d96c9143db Mon Sep 17 00:00:00 2001 From: refcell Date: Thu, 5 Dec 2024 09:43:20 -0500 Subject: [PATCH 3/5] fix: mock streaming compression for now --- .../src/compression/brotli/compress.rs | 107 +++++++----------- crates/protocol/src/compression/ratio.rs | 2 +- crates/protocol/src/compression/shadow.rs | 2 +- crates/protocol/src/compression/variant.rs | 2 +- 4 files changed, 46 insertions(+), 67 deletions(-) diff --git a/crates/protocol/src/compression/brotli/compress.rs b/crates/protocol/src/compression/brotli/compress.rs index 1c78490f..207356a4 100644 --- a/crates/protocol/src/compression/brotli/compress.rs +++ b/crates/protocol/src/compression/brotli/compress.rs @@ -1,9 +1,7 @@ //! Brotli Compression use crate::{BrotliLevel, ChannelCompressor, CompressorError, CompressorResult, CompressorWriter}; -use std::{cell::RefCell, io::Write, rc::Rc, vec::Vec}; - -const DEFAULT_BROTLI_LGWIN: u32 = 22; +use std::vec::Vec; /// A Brotli Compression Error. #[derive(thiserror::Error, Debug)] @@ -16,44 +14,13 @@ pub enum BrotliCompressionError { CompressionError(#[from] std::io::Error), } -/// A buffer wrapped in an Rc> -#[derive(Debug, Clone)] -struct BrotliBuffer(Rc>>); - -impl BrotliBuffer { - /// Create a new BrotliBuffer. - pub(crate) fn new() -> Self { - Self(Rc::new(RefCell::new(Vec::new()))) - } - - /// Get the buffer. - pub(crate) fn get(&self) -> Rc>> { - self.0.clone() - } - - /// Returns the length of the buffer. - pub(crate) fn len(&self) -> usize { - self.0.borrow().len() - } -} - -impl Write for BrotliBuffer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.0.borrow_mut().write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - /// The brotli compressor. -#[allow(missing_debug_implementations)] +#[derive(Debug, Clone)] pub struct BrotliCompressor { - /// The writer. - writer: brotli::CompressorWriter, - /// The buffer to write to. - buffer: BrotliBuffer, + /// The compressed bytes. + compressed: Vec, + /// The raw bytes (need to store on reset). + raw: Vec, /// Marks that the compressor is closed. closed: bool, /// The compression level. @@ -64,18 +31,7 @@ impl BrotliCompressor { /// Creates a new brotli compressor with the given compression level. pub fn new(level: impl Into) -> Self { let level = level.into(); - let buffer = BrotliBuffer::new(); - Self { - closed: false, - writer: brotli::CompressorWriter::new( - buffer.clone(), - 0, - level.into(), - DEFAULT_BROTLI_LGWIN, - ), - buffer, - level, - } + Self { compressed: Vec::new(), raw: Vec::new(), closed: false, level } } } @@ -85,17 +41,44 @@ impl From for BrotliCompressor { } } +/// Compresses the given bytes data using the Brotli compressor implemented +/// in the [`brotli`](https://crates.io/crates/brotli) crate. +/// +/// Note: The level must be between 0 and 11. In Optimism, the levels 9, 10, and 11 are used. +/// By default, [BrotliLevel::Brotli10] is used. +#[allow(unused_variables)] +#[allow(unused_mut)] +fn compress_brotli( + mut input: &[u8], + level: BrotliLevel, +) -> Result, BrotliCompressionError> { + use brotli::enc::{BrotliCompress, BrotliEncoderParams}; + let mut output = alloc::vec![]; + BrotliCompress( + &mut input, + &mut output, + &BrotliEncoderParams { quality: level as i32, ..Default::default() }, + )?; + Ok(output) +} + impl CompressorWriter for BrotliCompressor { fn write(&mut self, data: &[u8]) -> CompressorResult { if self.closed { return Err(CompressorError::Brotli); } - let written = self.writer.write(data).map_err(|_| CompressorError::Brotli)?; - Ok(written) + + // First append the new data to the raw buffer. + self.raw.extend_from_slice(data); + + // Compress the raw buffer. + self.compressed = + compress_brotli(&self.raw, self.level).map_err(|_| CompressorError::Brotli)?; + + Ok(data.len()) } fn flush(&mut self) -> CompressorResult<()> { - self.writer.flush().map_err(|_| CompressorError::Brotli)?; Ok(()) } @@ -107,28 +90,24 @@ impl CompressorWriter for BrotliCompressor { fn reset(&mut self) { self.closed = false; - self.writer = brotli::CompressorWriter::new( - BrotliBuffer::new(), - 0, - self.level.into(), - DEFAULT_BROTLI_LGWIN, - ); + self.raw.clear(); + self.compressed.clear(); } fn read(&mut self, buf: &mut [u8]) -> CompressorResult { - let len = self.buffer.get().borrow().len().min(buf.len()); - buf[..len].copy_from_slice(&self.buffer.get().borrow()[..len]); + let len = self.compressed.len().min(buf.len()); + buf[..len].copy_from_slice(&self.compressed[..len]); Ok(len) } fn len(&self) -> usize { - self.writer.get_ref().len() + self.compressed.len() } } impl ChannelCompressor for BrotliCompressor { fn get_compressed(&self) -> Vec { - self.buffer.get().borrow().clone() + self.compressed.clone() } } diff --git a/crates/protocol/src/compression/ratio.rs b/crates/protocol/src/compression/ratio.rs index bc80c0ae..c420589b 100644 --- a/crates/protocol/src/compression/ratio.rs +++ b/crates/protocol/src/compression/ratio.rs @@ -16,7 +16,7 @@ use crate::{CompressorResult, CompressorWriter, Config, VariantCompressor}; /// /// The ratio compressor wraps a [VariantCompressor] which dispatches to the /// appropriate compression algorithm (ZLIB or Brotli). -#[allow(missing_debug_implementations)] +#[derive(Debug, Clone)] pub struct RatioCompressor { /// The compressor configuration. config: Config, diff --git a/crates/protocol/src/compression/shadow.rs b/crates/protocol/src/compression/shadow.rs index df2352aa..735521f2 100644 --- a/crates/protocol/src/compression/shadow.rs +++ b/crates/protocol/src/compression/shadow.rs @@ -28,7 +28,7 @@ const CLOSE_OVERHEAD_ZLIB: u64 = 9; /// One exception to the rule is when the first write to the buffer is not checked against /// the target. This allows individual blocks larger than the target to be included. /// Notice, this will be split across multiple channel frames. -#[allow(missing_debug_implementations)] +#[derive(Debug, Clone)] pub struct ShadowCompressor { /// The compressor configuration. config: Config, diff --git a/crates/protocol/src/compression/variant.rs b/crates/protocol/src/compression/variant.rs index a3e29807..261259ed 100644 --- a/crates/protocol/src/compression/variant.rs +++ b/crates/protocol/src/compression/variant.rs @@ -8,7 +8,7 @@ use op_alloy_genesis::RollupConfig; /// The channel compressor wraps the brotli and zlib compressor types, /// implementing the [ChannelCompressor] trait itself. -#[allow(missing_debug_implementations)] +#[derive(Debug, Clone)] pub enum VariantCompressor { /// The brotli compressor. Brotli(Box), From 70ad526503f57be923190ae1a147737302c5e8cb Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 6 Dec 2024 10:09:17 -0500 Subject: [PATCH 4/5] fix: rm box --- crates/protocol/src/compression/variant.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/protocol/src/compression/variant.rs b/crates/protocol/src/compression/variant.rs index 261259ed..5ea2aa0f 100644 --- a/crates/protocol/src/compression/variant.rs +++ b/crates/protocol/src/compression/variant.rs @@ -11,7 +11,7 @@ use op_alloy_genesis::RollupConfig; #[derive(Debug, Clone)] pub enum VariantCompressor { /// The brotli compressor. - Brotli(Box), + Brotli(BrotliCompressor), /// The zlib compressor. Zlib(ZlibCompressor), } @@ -20,7 +20,7 @@ impl VariantCompressor { /// Constructs a [VariantCompressor] using the given [RollupConfig] and timestamp. pub fn from_timestamp(config: &RollupConfig, timestamp: u64) -> Self { if config.is_fjord_active(timestamp) { - Self::Brotli(Box::new(BrotliCompressor::new(CompressionAlgo::Brotli10))) + Self::Brotli(BrotliCompressor::new(CompressionAlgo::Brotli10)) } else { Self::Zlib(ZlibCompressor::new()) } @@ -83,9 +83,9 @@ impl ChannelCompressor for VariantCompressor { impl From for VariantCompressor { fn from(algo: CompressionAlgo) -> Self { match algo { - lvl @ CompressionAlgo::Brotli9 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), - lvl @ CompressionAlgo::Brotli10 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), - lvl @ CompressionAlgo::Brotli11 => Self::Brotli(Box::new(BrotliCompressor::new(lvl))), + lvl @ CompressionAlgo::Brotli9 => Self::Brotli(BrotliCompressor::new(lvl)), + lvl @ CompressionAlgo::Brotli10 => Self::Brotli(BrotliCompressor::new(lvl)), + lvl @ CompressionAlgo::Brotli11 => Self::Brotli(BrotliCompressor::new(lvl)), CompressionAlgo::Zlib => Self::Zlib(ZlibCompressor::new()), } } From c9b705534e7df154f0654dc10ff9c43c303be51c Mon Sep 17 00:00:00 2001 From: refcell Date: Fri, 3 Jan 2025 17:03:25 -0800 Subject: [PATCH 5/5] fixes --- crates/protocol/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/protocol/Cargo.toml b/crates/protocol/Cargo.toml index ce45701f..dfc5d470 100644 --- a/crates/protocol/Cargo.toml +++ b/crates/protocol/Cargo.toml @@ -27,7 +27,6 @@ alloy-eips.workspace = true alloy-consensus.workspace = true # Misc -cfg-if.workspace = true derive_more.workspace = true tracing.workspace = true thiserror.workspace = true