diff --git a/src/batching.rs b/src/batching.rs
index dab017c..9f8e119 100644
--- a/src/batching.rs
+++ b/src/batching.rs
@@ -4,6 +4,8 @@ use std::{
     time::Duration,
 };
 
+#[cfg(test)]
+use bytesize::ByteSize;
 use futures::{Stream, StreamExt};
 
 use crate::types;
@@ -13,7 +15,7 @@ use crate::types;
 pub struct AppendRecordsBatchingOpts {
     max_batch_records: usize,
     #[cfg(test)]
-    max_batch_size: bytesize::ByteSize,
+    max_batch_size: ByteSize,
     match_seq_num: Option<u64>,
     fencing_token: Option<Vec<u8>>,
     linger_duration: Duration,
@@ -24,7 +26,7 @@ impl Default for AppendRecordsBatchingOpts {
         Self {
             max_batch_records: 1000,
             #[cfg(test)]
-            max_batch_size: bytesize::ByteSize::mib(1),
+            max_batch_size: ByteSize::mib(1),
             match_seq_num: None,
             fencing_token: None,
             linger_duration: Duration::from_millis(5),
@@ -39,36 +41,30 @@ impl AppendRecordsBatchingOpts {
     }
 
     /// Maximum number of records in a batch.
-    pub fn with_max_batch_records(self, max_batch_records: usize) -> Result<Self, String> {
-        if max_batch_records == 0
-            || max_batch_records > types::AppendRecordBatch::MAX_BATCH_CAPACITY
-        {
-            Err("Batch capacity must be between 1 and 1000".to_string())
-        } else {
-            Ok(Self {
-                max_batch_records,
-                ..self
-            })
+    pub fn with_max_batch_records(self, max_batch_records: usize) -> Self {
+        assert!(
+            max_batch_records > 0
+                && max_batch_records <= types::AppendRecordBatch::MAX_BATCH_CAPACITY,
+            "Batch capacity must be between 1 and 1000"
+        );
+        Self {
+            max_batch_records,
+            ..self
         }
     }
 
     /// Maximum size of a batch in bytes.
     #[cfg(test)]
-    pub fn with_max_batch_size(
-        self,
-        max_batch_size: impl Into<bytesize::ByteSize>,
-    ) -> Result<Self, String> {
+    pub fn with_max_batch_size(self, max_batch_size: impl Into<ByteSize>) -> Self {
         let max_batch_size = max_batch_size.into();
-
-        if max_batch_size == bytesize::ByteSize(0)
-            || max_batch_size > types::AppendRecordBatch::MAX_METERED_SIZE
-        {
-            Err("Batch size must be between 1 byte and 1000 MiB".to_string())
-        } else {
-            Ok(Self {
-                max_batch_size,
-                ..self
-            })
+        assert!(
+            max_batch_size > ByteSize::b(0)
+                && max_batch_size <= types::AppendRecordBatch::MAX_METERED_SIZE,
+            "Batch size must be between 1 byte and 1000 MiB"
+        );
+        Self {
+            max_batch_size,
+            ..self
         }
     }
 
@@ -192,7 +188,6 @@ impl<'a> BatchBuilder<'a> {
     #[cfg(not(test))]
     fn new_batch(opts: &AppendRecordsBatchingOpts) -> types::AppendRecordBatch {
         types::AppendRecordBatch::with_max_capacity(opts.max_batch_records)
-            .expect("previously validated max capacity")
     }
 
     #[cfg(test)]
@@ -201,7 +196,6 @@ impl<'a> BatchBuilder<'a> {
             opts.max_batch_records,
             opts.max_batch_size,
         )
-        .expect("previously validated max capacity and size parameters")
     }
 
     pub fn push(&mut self, record: impl Into<types::AppendRecord>) {
@@ -282,10 +276,10 @@ mod tests {
         let mut opts = AppendRecordsBatchingOpts::new().with_linger(Duration::ZERO);
 
         if let Some(max_batch_records) = max_batch_records {
-            opts = opts.with_max_batch_records(max_batch_records).unwrap();
+            opts = opts.with_max_batch_records(max_batch_records);
         }
         if let Some(max_batch_size) = max_batch_size {
-            opts = opts.with_max_batch_size(max_batch_size).unwrap();
+            opts = opts.with_max_batch_size(max_batch_size);
         }
 
         let batch_stream = AppendRecordsBatchingStream::new(stream, opts);
@@ -316,9 +310,7 @@ mod tests {
             AppendRecordsBatchingOpts::new()
                 .with_linger(Duration::from_secs(2))
                 .with_max_batch_records(3)
-                .unwrap()
-                .with_max_batch_size(ByteSize::b(40))
-                .unwrap(),
+                .with_max_batch_size(ByteSize::b(40)),
         );
 
         batch_stream
@@ -402,9 +394,7 @@ mod tests {
 
         let mut batch_stream = AppendRecordsBatchingStream::new(
             stream,
-            AppendRecordsBatchingOpts::new()
-                .with_max_batch_size(ByteSize::b(1))
-                .unwrap(),
+            AppendRecordsBatchingOpts::new().with_max_batch_size(ByteSize::b(1)),
         );
 
         let _ = batch_stream.next().await;
diff --git a/src/types.rs b/src/types.rs
index e5eb3c0..a3eb2ff 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -875,39 +875,34 @@ impl AppendRecordBatch {
 
     pub fn new() -> Self {
         Self::with_max_capacity_and_size_inner(Self::MAX_BATCH_CAPACITY, Self::MAX_METERED_SIZE)
-            .expect("valid default max capacity and size")
     }
 
-    pub fn with_max_capacity(max_capacity: usize) -> Result<Self, String> {
+    pub fn with_max_capacity(max_capacity: usize) -> Self {
         Self::with_max_capacity_and_size_inner(max_capacity, Self::MAX_METERED_SIZE)
     }
 
     #[cfg(test)]
-    pub fn with_max_capacity_and_size(
-        max_capacity: usize,
-        max_size: ByteSize,
-    ) -> Result<Self, String> {
+    pub fn with_max_capacity_and_size(max_capacity: usize, max_size: ByteSize) -> Self {
         Self::with_max_capacity_and_size_inner(max_capacity, max_size)
     }
 
-    fn with_max_capacity_and_size_inner(
-        max_capacity: usize,
-        max_size: ByteSize,
-    ) -> Result<Self, String> {
-        if max_capacity == 0 || max_capacity > Self::MAX_BATCH_CAPACITY {
-            return Err("Batch capacity must be between 1 and 1000".into());
-        }
+    fn with_max_capacity_and_size_inner(max_capacity: usize, max_size: ByteSize) -> Self {
+        assert!(
+            max_capacity > 0 && max_capacity <= Self::MAX_BATCH_CAPACITY,
+            "Batch capacity must be between 1 and 1000"
+        );
 
-        if max_size == ByteSize(0) || max_size > Self::MAX_METERED_SIZE {
-            return Err("Batch size must be between 1 byte and 1000 MiB".into());
-        }
+        assert!(
+            max_size > ByteSize(0) && max_size <= Self::MAX_METERED_SIZE,
+            "Batch size must be between 1 byte and 1000 MiB"
+        );
 
-        Ok(Self {
+        Self {
             records: Vec::with_capacity(max_capacity),
             metered_size: ByteSize(0),
             max_capacity,
             max_size,
-        })
+        }
     }
 
     pub fn try_from_iter<R, T>(iter: T) -> Result<Self, (Self, Vec<R>)>