From 5e1f8851576eb0a064035a1c25ac4b370636fb67 Mon Sep 17 00:00:00 2001
From: Felipe Cardozo
Date: Tue, 15 Oct 2024 20:19:07 -0300
Subject: [PATCH] feat: fix record size larger than batch size (#4195)

* feat: fix record size bigger than batch size

* feat: add max_request_size

* chore: clean up

* chore: clean up part 2

* chore: remove clones

* chore: add some tests and fix storage max_request_size
---
 Cargo.lock                                    |   4 +-
 crates/fluvio-cli/src/client/produce/mod.rs   |  13 +-
 crates/fluvio-storage/src/config.rs           |  13 +-
 crates/fluvio-storage/src/replica.rs          |  10 +-
 crates/fluvio-types/Cargo.toml                |   2 +-
 crates/fluvio-types/src/defaults.rs           |   3 +-
 crates/fluvio/Cargo.toml                      |   2 +-
 crates/fluvio/src/producer/accumulator.rs     | 213 ++++++++++++++----
 crates/fluvio/src/producer/config.rs          |  13 ++
 crates/fluvio/src/producer/error.rs           |   2 +-
 crates/fluvio/src/producer/memory_batch.rs    | 107 ++++++---
 crates/fluvio/src/producer/mod.rs             |   1 +
 .../consume-batch-size.bats                   |  19 +-
 .../produce-batch-size.bats                   |  94 ++++++++
 .../cli/fluvio_smoke_tests/produce-error.bats |   2 +-
 15 files changed, 407 insertions(+), 91 deletions(-)
 create mode 100644 tests/cli/fluvio_smoke_tests/produce-batch-size.bats

diff --git a/Cargo.lock b/Cargo.lock
index 4e3f995826..da42652795 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2388,7 +2388,7 @@ dependencies = [
 
 [[package]]
 name = "fluvio"
-version = "0.23.3"
+version = "0.23.4"
 dependencies = [
  "anyhow",
  "async-channel 1.9.0",
@@ -3333,7 +3333,7 @@ dependencies = [
 
 [[package]]
 name = "fluvio-types"
-version = "0.5.1"
+version = "0.5.2"
 dependencies = [
  "event-listener 5.3.1",
  "fluvio-future",
diff --git a/crates/fluvio-cli/src/client/produce/mod.rs b/crates/fluvio-cli/src/client/produce/mod.rs
index f8c1ffb633..3c6846330c 100644
--- a/crates/fluvio-cli/src/client/produce/mod.rs
+++ b/crates/fluvio-cli/src/client/produce/mod.rs
@@ -100,10 +100,14 @@ mod cmd {
         #[arg(long, value_parser=parse_duration)]
         pub linger: Option<Duration>,
 
-        /// Max amount of bytes accumulated before sending
+        /// Max amount of bytes accumulated in a batch before sending
         #[arg(long)]
         pub batch_size: Option<usize>,
 
+        /// Max amount of bytes that can be sent in a single request
+        #[arg(long)]
+        pub max_request_size: Option<usize>,
+
         /// Isolation level that producer must respect.
         /// Supported values: read_committed (ReadCommitted) - wait for records to be committed before response,
         /// read_uncommitted (ReadUncommitted) - just wait for leader to accept records.
@@ -212,6 +216,13 @@ mod cmd {
             config_builder
         };
 
+        // Max request size
+        let config_builder = if let Some(max_request_size) = self.max_request_size {
+            config_builder.max_request_size(max_request_size)
+        } else {
+            config_builder
+        };
+
         // Isolation
         let config_builder = if let Some(isolation) = self.isolation {
             config_builder.isolation(isolation)
diff --git a/crates/fluvio-storage/src/config.rs b/crates/fluvio-storage/src/config.rs
index dc19937b31..d26b8f95f9 100644
--- a/crates/fluvio-storage/src/config.rs
+++ b/crates/fluvio-storage/src/config.rs
@@ -11,7 +11,8 @@ use serde::Deserialize;
 use fluvio_controlplane_metadata::topic::CleanupPolicy;
 use fluvio_types::defaults::{
     SPU_LOG_INDEX_MAX_BYTES, SPU_LOG_BASE_DIR, STORAGE_FLUSH_WRITE_COUNT, STORAGE_FLUSH_IDLE_MSEC,
-    STORAGE_MAX_BATCH_SIZE, STORAGE_RETENTION_SECONDS, SPU_PARTITION_MAX_BYTES,
+    STORAGE_MAX_BATCH_SIZE, STORAGE_MAX_REQUEST_SIZE, STORAGE_RETENTION_SECONDS,
+    SPU_PARTITION_MAX_BYTES,
 };
 use fluvio_types::defaults::SPU_LOG_INDEX_MAX_INTERVAL_BYTES;
 use fluvio_types::defaults::SPU_LOG_SEGMENT_MAX_BYTES;
@@ -44,6 +45,9 @@ pub struct ReplicaConfig {
     #[builder(default = "default_max_batch_size()")]
     #[serde(default = "default_max_batch_size")]
     pub max_batch_size: Size,
+    #[builder(default = "default_max_request_size()")]
+    #[serde(default = "default_max_request_size")]
+    pub max_request_size: Size,
     #[builder(default = "default_update_hw()")]
     #[serde(default = "default_update_hw")]
     pub update_hw: bool, // if true, enable hw update
@@ -120,6 +124,10 @@ const fn default_max_batch_size() -> Size {
     STORAGE_MAX_BATCH_SIZE
 }
 
+const fn default_max_request_size() -> Size {
+    STORAGE_MAX_REQUEST_SIZE
+}
+
 const fn default_retention_seconds() -> Size {
     STORAGE_RETENTION_SECONDS
 }
@@ -149,6 +157,7 @@ impl Default for ReplicaConfig {
             flush_write_count: default_flush_write_count(),
             flush_idle_msec: default_flush_idle_msec(),
             max_batch_size: default_max_batch_size(),
+            max_request_size: default_max_request_size(),
             retention_seconds: default_retention_seconds(),
             max_partition_size: default_max_partition_size(),
             update_hw: true,
@@ -225,6 +234,7 @@ pub struct SharedReplicaConfig {
     pub flush_write_count: SharedConfigU32Value,
     pub flush_idle_msec: SharedConfigU32Value,
     pub max_batch_size: SharedConfigU32Value,
+    pub max_request_size: SharedConfigU32Value,
     pub update_hw: bool, // if true, enable hw update
     pub retention_seconds: SharedConfigU32Value,
     pub max_partition_size: SharedConfigU64Value,
@@ -240,6 +250,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
             flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
             flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
             max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
+            max_request_size: SharedConfigU32Value::new(config.max_request_size),
             update_hw: config.update_hw,
             retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
             max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
diff --git a/crates/fluvio-storage/src/replica.rs b/crates/fluvio-storage/src/replica.rs
index 5155408d0e..d2feafc0b8 100644
--- a/crates/fluvio-storage/src/replica.rs
+++ b/crates/fluvio-storage/src/replica.rs
@@ -132,7 +132,7 @@ impl ReplicaStorage for FileReplica {
         records: &mut RecordSet,
         update_highwatermark: bool,
     ) -> Result {
-        let max_batch_size = self.option.max_batch_size.get() as usize;
+        let max_request_size = self.option.max_request_size.get() as usize;
         let max_segment_size = self.option.segment_max_bytes.get() as usize;
         let mut total_size = 0;
         // check if any of the record batches exceeds the max length
@@ -146,8 +146,8 @@ impl ReplicaStorage for FileReplica {
                     .into());
             }
             total_size += batch_size;
-            if batch_size > max_batch_size {
-                return Err(StorageError::BatchTooBig(max_batch_size).into());
+            if batch_size > max_request_size {
+                return Err(StorageError::BatchTooBig(max_request_size).into());
             }
         }
 
@@ -862,9 +862,9 @@ mod tests {
     }
 
     #[fluvio_future::test]
-    async fn test_replica_limit_batch() {
+    async fn test_replica_limit_request_size() {
         let mut option = base_option("test_batch_limit");
-        option.max_batch_size = 100;
+        option.max_request_size = 100;
         option.update_hw = false;
 
         let mut replica = create_replica("test", START_OFFSET, option).await;
diff --git a/crates/fluvio-types/Cargo.toml b/crates/fluvio-types/Cargo.toml
index c964d73831..0b65804f18 100644
--- a/crates/fluvio-types/Cargo.toml
+++ b/crates/fluvio-types/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fluvio-types"
-version = "0.5.1"
+version = "0.5.2"
 authors = ["Fluvio Contributors <team@fluvio.io>"]
 edition = "2021"
 description = "Fluvio common types and objects"
diff --git a/crates/fluvio-types/src/defaults.rs b/crates/fluvio-types/src/defaults.rs
index b369d9aa73..6797dc7a0e 100644
--- a/crates/fluvio-types/src/defaults.rs
+++ b/crates/fluvio-types/src/defaults.rs
@@ -49,7 +49,8 @@ pub const STORAGE_RETENTION_SECONDS: u32 = 7 * 24 * 3600;
 pub const STORAGE_RETENTION_SECONDS_MIN: u32 = 10; // crd
 pub const STORAGE_FLUSH_WRITE_COUNT: u32 = 1;
 pub const STORAGE_FLUSH_IDLE_MSEC: u32 = 0;
-pub const STORAGE_MAX_BATCH_SIZE: u32 = 33_554_432;
+pub const STORAGE_MAX_BATCH_SIZE: u32 = 2_097_152;
+pub const STORAGE_MAX_REQUEST_SIZE: u32 = 33_554_432;
 
 pub const SPU_SMARTENGINE_STORE_MAX_BYTES: usize = 1_073_741_824; //1Gb
diff --git a/crates/fluvio/Cargo.toml b/crates/fluvio/Cargo.toml
index 60bf56dbc8..1af42b9ec1 100644
--- a/crates/fluvio/Cargo.toml
+++ b/crates/fluvio/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "fluvio"
-version = "0.23.3"
+version = "0.23.4"
 edition = "2021"
 license = "Apache-2.0"
 authors = ["Fluvio Contributors <team@fluvio.io>"]
diff --git a/crates/fluvio/src/producer/accumulator.rs b/crates/fluvio/src/producer/accumulator.rs
index 21ecbdb368..537910ad3a 100644
--- a/crates/fluvio/src/producer/accumulator.rs
+++ b/crates/fluvio/src/producer/accumulator.rs
@@ -11,7 +11,7 @@ use tracing::trace;
 use futures_util::future::{BoxFuture, Either, Shared};
 use futures_util::{FutureExt, ready};
 
-use fluvio_future::sync::Mutex;
+use fluvio_future::sync::{Mutex, MutexGuard};
 use fluvio_future::sync::Condvar;
 use fluvio_protocol::record::Batch;
 use fluvio_compression::Compression;
@@ -27,7 +27,7 @@ use crate::producer::ProducerError;
 use crate::error::Result;
 
 use super::event::EventHandler;
-use super::memory_batch::MemoryBatch;
+use super::memory_batch::{MemoryBatch, MemoryBatchStatus};
 
 const RECORD_ENQUEUE_TIMEOUT: Duration = Duration::from_secs(30);
 
@@ -55,6 +55,7 @@ impl BatchesDeque {
 /// The batches are separated by PartitionId
 pub(crate) struct RecordAccumulator {
     batch_size: usize,
+    max_request_size: usize,
     queue_size: usize,
     batches: Arc<RwLock<HashMap<PartitionId, BatchHandler>>>,
     compression: Compression,
 }
 impl RecordAccumulator {
     pub(crate) fn new(
         batch_size: usize,
+        max_request_size: usize,
         queue_size: usize,
         partition_n: PartitionCount,
         compression: Compression,
     ) -> Self {
-        let mut batches = HashMap::new();
-        for p in 0..partition_n {
-            batches.insert(p, (BatchEvents::shared(), BatchesDeque::shared()));
-        }
+        let batches = (0..partition_n)
+            .map(|p| (p, (BatchEvents::shared(), BatchesDeque::shared())))
+            .collect::<HashMap<_, _>>();
         Self {
             batches: Arc::new(RwLock::new(batches)),
+            max_request_size,
             batch_size,
             compression,
             queue_size,
@@ -103,6 +105,56 @@ impl RecordAccumulator {
             .get(&partition_id)
             .ok_or(ProducerError::PartitionNotFound(partition_id))?;
 
+        // Wait for space in the batch queue
+        let mut batches = self.wait_for_space(batches_lock).await?;
+
+        // If the last batch is not full, push the record to it
+        if let Some(batch) = batches.back_mut() {
+            match batch.push_record(record) {
+                Ok(ProduceBatchStatus::Added(push_record)) => {
+                    if batch.is_full() {
+                        batch_events.notify_batch_full().await;
+                    }
+                    return Ok(PushRecord::new(
+                        push_record.into_future_record_metadata(partition_id),
+                    ));
+                }
+                Ok(ProduceBatchStatus::NotAdded(record)) => {
+                    if batch.is_full() {
+                        batch_events.notify_batch_full().await;
+                    }
+
+                    // Create and push a new batch if needed
+                    let push_record = self
+                        .create_and_new_batch(batch_events, &mut batches, record, 1)
+                        .await?;
+
+                    return Ok(PushRecord::new(
+                        push_record.into_future_record_metadata(partition_id),
+                    ));
+                }
+                Err(err) => {
+                    return Err(err);
+                }
+            }
+        }
+
+        trace!(partition_id, "Creating a new batch");
+
+        // Create and push a new batch if needed
+        let push_record = self
+            .create_and_new_batch(batch_events, &mut batches, record, 1)
+            .await?;
+
+        Ok(PushRecord::new(
+            push_record.into_future_record_metadata(partition_id),
+        ))
+    }
+
+    async fn wait_for_space<'a>(
+        &self,
+        batches_lock: &'a Arc<BatchesDeque>,
+    ) -> Result<MutexGuard<'a, VecDeque<ProducerBatch>>, ProducerError> {
         let mut batches = batches_lock.batches.lock().await;
         if batches.len() >= self.queue_size {
             let (guard, wait_result) = batches_lock
@@ -116,41 +168,48 @@ impl RecordAccumulator {
             }
             batches = guard;
         }
-        if let Some(batch) = batches.back_mut() {
-            if let Some(push_record) = batch.push_record(record.clone()) {
-                if batch.is_full() {
-                    batch_events.notify_batch_full().await;
-                }
-                return Ok(PushRecord::new(
-                    push_record.into_future_record_metadata(partition_id),
-                ));
-            } else {
-                batch_events.notify_batch_full().await;
-            }
-        }
+        Ok(batches)
+    }
 
-        trace!(
-            partition_id,
-            "Batch is full. Creating a new batch for partition"
-        );
+    async fn create_and_new_batch(
+        &self,
+        batch_events: &BatchEvents,
+        batches: &mut VecDeque<ProducerBatch>,
+        record: Record,
+        attempts: usize,
+    ) -> Result<PartialFutureRecordMetadata, ProducerError> {
+        if attempts > 2 {
+            // This should never happen, but if it does, we should stop the recursion
+            return Err(ProducerError::Internal(
+                "Attempts exceeded while creating a new batch".to_string(),
+            ));
+        }
 
-        let mut batch = ProducerBatch::new(self.batch_size, self.compression);
+        let mut batch =
+            ProducerBatch::new(self.max_request_size, self.batch_size, self.compression);
         match batch.push_record(record) {
-            Some(push_record) => {
+            Ok(ProduceBatchStatus::Added(push_record)) => {
                 batch_events.notify_new_batch().await;
-
                 if batch.is_full() {
                     batch_events.notify_batch_full().await;
                 }
 
                 batches.push_back(batch);
+                Ok(push_record)
+            }
+            Ok(ProduceBatchStatus::NotAdded(record)) => {
+                batch_events.notify_new_batch().await;
+                if batch.is_full() {
+                    batch_events.notify_batch_full().await;
+                }
 
-                Ok(PushRecord::new(
-                    push_record.into_future_record_metadata(partition_id),
-                ))
+                batches.push_back(batch);
+                // Box the future to avoid infinite size due to recursion
+                Box::pin(self.create_and_new_batch(batch_events, batches, record, attempts + 1))
+                    .await
             }
-            None => Err(ProducerError::RecordTooLarge(self.batch_size)),
+            Err(err) => Err(err),
         }
     }
 
@@ -170,16 +229,21 @@ where
 {
 }
 
+enum ProduceBatchStatus {
+    Added(PartialFutureRecordMetadata),
+    NotAdded(Record),
+}
+
 pub(crate) struct ProducerBatch {
     pub(crate) notify: Sender<ProducePartitionResponseFuture>,
     batch_metadata: Arc<BatchMetadata>,
     batch: MemoryBatch,
 }
 impl ProducerBatch {
-    fn new(write_limit: usize, compression: Compression) -> Self {
+    fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
         let (sender, receiver) = async_channel::bounded(1);
         let batch_metadata = Arc::new(BatchMetadata::new(receiver));
-        let batch = MemoryBatch::new(write_limit, compression);
+        let batch = MemoryBatch::new(write_limit, batch_limit, compression);
 
         Self {
             notify: sender,
@@ -191,13 +255,13 @@ impl ProducerBatch {
     /// Add a record to the batch.
     /// Return ProduceBatchStatus::NotAdded if the record does not fit in the batch, so
     /// the RecordAccumulator can create more batches if needed.
-    fn push_record(&mut self, record: Record) -> Option<PartialFutureRecordMetadata> {
+    fn push_record(&mut self, record: Record) -> Result<ProduceBatchStatus, ProducerError> {
         match self.batch.push_record(record) {
-            None => None,
-            Some(relative_offset) => Some(PartialFutureRecordMetadata::new(
-                relative_offset,
-                self.batch_metadata.clone(),
+            Ok(MemoryBatchStatus::Added(offset)) => Ok(ProduceBatchStatus::Added(
+                PartialFutureRecordMetadata::new(offset, self.batch_metadata.clone()),
             )),
+            Ok(MemoryBatchStatus::NotAdded(record)) => Ok(ProduceBatchStatus::NotAdded(record)),
+            Err(err) => Err(err),
         }
     }
 
@@ -327,6 +391,7 @@ mod test {
 
         // Producer batch that can store three instances of Record::from(("key", "value"))
         let mut pb = ProducerBatch::new(
+            1_048_576,
             size * 3 + 1
                 + Batch::<RawRecords>::default().write_size(0)
                 + Vec::<RawRecords>::default().write_size(0),
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(!pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
     }
 
     #[test]
@@ -350,19 +427,66 @@ mod test {
 
         // Producer batch that can store three instances of Record::from(("key", "value"))
         let mut pb = ProducerBatch::new(
+            1_048_576,
             size * 3
                 + Batch::<RawRecords>::default().write_size(0)
                 + Vec::<RawRecords>::default().write_size(0),
             Compression::None,
         );
 
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
-        assert!(pb.push_record(record.clone()).is_some());
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
 
         assert!(pb.is_full());
 
-        assert!(pb.push_record(record).is_none());
+        assert!(matches!(
+            pb.push_record(record),
+            Ok(ProduceBatchStatus::NotAdded(_))
+        ));
+    }
+
+    #[test]
+    fn test_producer_write_limit() {
+        let record = Record::from(("key", "value"));
+        let size = record.write_size(0);
+
+        // Producer batch that can store three instances of Record::from(("key", "value"))
+        let mut pb = ProducerBatch::new(
+            size * 3
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
+            size * 3
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
+            Compression::None,
+        );
+
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+        assert!(matches!(
+            pb.push_record(record.clone()),
+            Ok(ProduceBatchStatus::Added(_))
+        ));
+
+        assert!(pb.is_full());
+
+        assert!(pb.push_record(record).is_err());
     }
 
     #[fluvio_future::test]
@@ -374,6 +498,7 @@ mod test {
             size * 3
                 + Batch::<RawRecords>::default().write_size(0)
                 + Vec::<RawRecords>::default().write_size(0),
+            1_048_576,
             10,
             1,
             Compression::None,
diff --git a/crates/fluvio/src/producer/config.rs b/crates/fluvio/src/producer/config.rs
index d2cd23d552..d8add9816d 100644
--- a/crates/fluvio/src/producer/config.rs
+++ b/crates/fluvio/src/producer/config.rs
@@ -17,6 +17,7 @@ const DEFAULT_LINGER_MS: u64 = 100;
 const DEFAULT_TIMEOUT_MS: u64 = 1500;
 const DEFAULT_BATCH_SIZE_BYTES: usize = 16_384;
 const DEFAULT_BATCH_QUEUE_SIZE: usize = 100;
+const DEFAULT_MAX_REQUEST_SIZE: usize = 1_048_576;
 const DEFAULT_RETRIES_TIMEOUT: Duration = Duration::from_secs(300);
 const DEFAULT_INITIAL_DELAY: Duration = Duration::from_millis(20);
 
@@ -27,6 +28,10 @@ fn default_batch_size() -> usize {
     DEFAULT_BATCH_SIZE_BYTES
 }
 
+fn default_max_request_size() -> usize {
+    DEFAULT_MAX_REQUEST_SIZE
+}
+
 fn default_batch_queue_size() -> usize {
     DEFAULT_BATCH_QUEUE_SIZE
 }
@@ -68,6 +73,9 @@ pub struct TopicProducerConfig {
     /// Maximum amount of bytes accumulated by the records before sending the batch.
     #[builder(default = "default_batch_size()")]
     pub(crate) batch_size: usize,
+    /// Maximum amount of bytes that the server is allowed to process in a single request.
+    #[builder(default = "default_max_request_size()")]
+    pub(crate) max_request_size: usize,
     /// Maximum amount of batches waiting in the queue before sending to the SPU.
     #[builder(default = "default_batch_queue_size()")]
     pub(crate) batch_queue_size: usize,
@@ -118,6 +126,10 @@ impl TopicProducerConfig {
         self.batch_size
     }
 
+    pub fn max_request_size(&self) -> usize {
+        self.max_request_size
+    }
+
     pub fn batch_queue_size(&self) -> usize {
         self.batch_queue_size
     }
@@ -148,6 +160,7 @@ impl Default for TopicProducerConfig {
         Self {
             linger: default_linger_duration(),
             batch_size: default_batch_size(),
+            max_request_size: default_max_request_size(),
             batch_queue_size: default_batch_queue_size(),
             partitioner: default_partitioner(),
             compression: None,
diff --git a/crates/fluvio/src/producer/error.rs b/crates/fluvio/src/producer/error.rs
index 4630ecff71..4bedac61e2 100644
--- a/crates/fluvio/src/producer/error.rs
+++ b/crates/fluvio/src/producer/error.rs
@@ -9,7 +9,7 @@ use crate::producer::PartitionId;
 #[derive(thiserror::Error, Debug, Clone)]
 #[non_exhaustive]
 pub enum ProducerError {
-    #[error("the given record is larger than the buffer max_size ({0} bytes). Try increasing the producer batch size or reducing the record size enabling a compression algorithm")]
+    #[error("the given record is larger than the max_request_size ({0} bytes)")]
     RecordTooLarge(usize),
     #[error("failed to send record metadata: {0}")]
     SendRecordMetadata(#[from] async_channel::SendError),
diff --git a/crates/fluvio/src/producer/memory_batch.rs b/crates/fluvio/src/producer/memory_batch.rs
index c4178b444a..286459a3f7 100644
--- a/crates/fluvio/src/producer/memory_batch.rs
+++ b/crates/fluvio/src/producer/memory_batch.rs
@@ -8,8 +8,14 @@ use fluvio_types::Timestamp;
 
 use super::*;
 
+pub enum MemoryBatchStatus {
+    Added(Offset),
+    NotAdded(Record),
+}
+
 pub struct MemoryBatch {
     compression: Compression,
+    batch_limit: usize,
     write_limit: usize,
     current_size_uncompressed: usize,
     is_full: bool,
@@ -17,11 +23,12 @@ pub struct MemoryBatch {
     create_time: Timestamp,
     records: Vec<Record>,
 }
 impl MemoryBatch {
-    pub fn new(write_limit: usize, compression: Compression) -> Self {
+    pub fn new(write_limit: usize, batch_limit: usize, compression: Compression) -> Self {
         let now = Utc::now().timestamp_millis();
         Self {
             compression,
             is_full: false,
+            batch_limit,
             write_limit,
             create_time: now,
             current_size_uncompressed: Vec::<RawRecords>::default().write_size(0),
@@ -35,7 +42,9 @@ impl MemoryBatch {
 
     /// Add a record to the batch.
     /// The value of `Offset` is relative to the `MemoryBatch` instance.
-    pub fn push_record(&mut self, mut record: Record) -> Option<Offset> {
+    pub fn push_record(&mut self, mut record: Record) -> Result<MemoryBatchStatus, ProducerError> {
+        let is_the_first_record = self.records_len() == 0;
+
         let current_offset = self.offset() as i64;
         record
             .get_mut_header()
             .set_offset_delta(current_offset);
 
         let timestamp_delta = self.elapsed();
         record.get_mut_header().set_timestamp_delta(timestamp_delta);
 
         let record_size = record.write_size(0);
+        let actual_batch_size = self.raw_size() + record_size;
 
-        if self.estimated_size() + record_size > self.write_limit {
+        // Error if the record is too large
+        if actual_batch_size > self.write_limit {
             self.is_full = true;
-            return None;
+            return Err(ProducerError::RecordTooLarge(actual_batch_size));
         }
 
-        if self.estimated_size() + record_size == self.write_limit {
+        // Over the batch limit but the first record: add it anyway; the batch is marked full and sent on its own.
+        // Over the batch limit and not the first record: finish this batch and let the record go to the next batch.
+        // Under the batch limit: add the record to this batch.
+        if is_the_first_record {
+            if actual_batch_size > self.batch_limit {
+                self.is_full = true;
+            }
+        } else if actual_batch_size > self.batch_limit {
+            self.is_full = true;
+            return Ok(MemoryBatchStatus::NotAdded(record));
+        } else if actual_batch_size == self.batch_limit {
             self.is_full = true;
         }
 
         self.current_size_uncompressed += record_size;
-
         self.records.push(record);
 
-        Some(current_offset)
+        Ok(MemoryBatchStatus::Added(current_offset))
     }
 
     pub fn is_full(&self) -> bool {
-        self.is_full || self.write_limit <= self.estimated_size()
+        self.is_full || self.raw_size() >= self.batch_limit
     }
 
     pub fn elapsed(&self) -> Timestamp {
@@ -72,24 +92,8 @@ impl MemoryBatch {
         std::cmp::max(0, now - self.create_time)
     }
 
-    fn estimated_size(&self) -> usize {
-        (self.current_size_uncompressed as f32 * self.compression_coefficient()) as usize
-            + Batch::<RawRecords>::default().write_size(0)
-    }
-
-    fn compression_coefficient(&self) -> f32 {
-        cfg_if::cfg_if! {
-            if #[cfg(feature = "compress")] {
-                match self.compression {
-                    Compression::None => 1.0,
-                    Compression::Gzip | Compression::Snappy | Compression::Lz4 | Compression::Zstd => {
-                        0.5
-                    }
-                }
-            } else {
-                1.0
-            }
-        }
+    fn raw_size(&self) -> usize {
+        self.current_size_uncompressed + Batch::<RawRecords>::default().write_size(0)
     }
 
     pub fn records_len(&self) -> usize {
@@ -154,16 +158,26 @@ mod test {
             size * 4
                 + Batch::<RawRecords>::default().write_size(0)
                 + Vec::<RawRecords>::default().write_size(0),
+            1_048_576,
             Compression::None,
         );
 
-        assert!(mb.push_record(record).is_some());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
         std::thread::sleep(std::time::Duration::from_millis(100));
         let record = Record::from(("key", "value"));
-        assert!(mb.push_record(record).is_some());
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
 
         let batch: Batch<MemoryRecords> = mb.into();
         assert!(
@@ -196,6 +210,31 @@ mod test {
         );
     }
 
+    #[test]
+    fn test_is_the_first_record_from_batch_and_actual_batch_size_larger_than_batch_limit() {
+        let record = Record::from(("key", "value"));
+        let size = record.write_size(0);
+
+        let mut mb = MemoryBatch::new(
+            1_048_576,
+            size / 2
+                + Batch::<RawRecords>::default().write_size(0)
+                + Vec::<RawRecords>::default().write_size(0),
+            Compression::None,
+        );
+
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::Added(_))
+        ));
+        std::thread::sleep(std::time::Duration::from_millis(100));
+        let record = Record::from(("key", "value"));
+        assert!(matches!(
+            mb.push_record(record.clone()),
+            Ok(MemoryBatchStatus::NotAdded(_))
+        ));
+    }
+
     #[test]
     fn test_convert_memory_batch_to_batch() {
         let num_records = 10;
@@ -204,17 +243,23 @@ mod test {
         let memory_batch_compression = Compression::Gzip;
 
-        // This MemoryBatch write limit is minimal value to pass test
-        let mut memory_batch = MemoryBatch::new(180, memory_batch_compression);
+        // This MemoryBatch write limit is the minimal value needed to pass the test
+        let mut memory_batch = MemoryBatch::new(360, 1_048_576, memory_batch_compression);
 
         let mut offset = 0;
 
         for _ in 0..num_records {
-            offset = memory_batch
+            let status = memory_batch
                 .push_record(Record {
                     value: RecordData::from(record_data.clone()),
                     ..Default::default()
                 })
                 .expect("Offset should exist");
+
+            if let MemoryBatchStatus::Added(o) = status {
+                offset = o;
+            } else {
+                panic!("this should not happen");
+            }
         }
 
         let memory_batch_records_len = memory_batch.records_len();
diff --git a/crates/fluvio/src/producer/mod.rs b/crates/fluvio/src/producer/mod.rs
index 966401de1d..90f22679e6 100644
--- a/crates/fluvio/src/producer/mod.rs
+++ b/crates/fluvio/src/producer/mod.rs
@@ -413,6 +413,7 @@ where
         let record_accumulator = RecordAccumulator::new(
             config.batch_size,
+            config.max_request_size,
             config.batch_queue_size,
             partition_count,
             compression,
diff --git a/tests/cli/fluvio_smoke_tests/consume-batch-size.bats b/tests/cli/fluvio_smoke_tests/consume-batch-size.bats
index 6580bb17c4..3f4273b066 100644
--- a/tests/cli/fluvio_smoke_tests/consume-batch-size.bats
+++ b/tests/cli/fluvio_smoke_tests/consume-batch-size.bats
@@ -25,14 +25,29 @@ teardown_file() {
     run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
 }
 
-@test "Produce message with batch large" {
+@test "Produce message with max request size" {
+    if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on fluvio cli stable version"
+    fi
+    if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on cluster stable version"
+    fi
+
     run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt"
-    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 15000000 --file $TOPIC_NAME.txt --linger 30s'
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --max-request-size 15000000 --file $TOPIC_NAME.txt --linger 30s'
+
     assert_success
 }
 
 # Consume message and compare message
 @test "Consume message" {
+    if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on fluvio cli stable version"
+    fi
+    if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on cluster stable version"
+    fi
+
     run timeout 15s "$FLUVIO_BIN" consume "$TOPIC_NAME" -B -d --maxbytes 16000000
     assert_output --partial "abcdefghijklmnopqrstuvwxyz"
     assert_success
diff --git a/tests/cli/fluvio_smoke_tests/produce-batch-size.bats b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
new file mode 100644
index 0000000000..28a280aef0
--- /dev/null
+++ b/tests/cli/fluvio_smoke_tests/produce-batch-size.bats
@@ -0,0 +1,94 @@
+#!/usr/bin/env bats
+
+TEST_HELPER_DIR="$BATS_TEST_DIRNAME/../test_helper"
+export TEST_HELPER_DIR
+
+load "$TEST_HELPER_DIR"/tools_check.bash
+load "$TEST_HELPER_DIR"/fluvio_dev.bash
+load "$TEST_HELPER_DIR"/bats-support/load.bash
+load "$TEST_HELPER_DIR"/bats-assert/load.bash
+
+setup_file() {
+    TOPIC_NAME=$(random_string)
+    export TOPIC_NAME
+    debug_msg "Topic name: $TOPIC_NAME"
+}
+
+teardown_file() {
+    run timeout 15s "$FLUVIO_BIN" topic delete "$TOPIC_NAME"
+    run rm $TOPIC_NAME-small.txt
+    run rm $TOPIC_NAME-med.txt
+    run rm $TOPIC_NAME-big.txt
+}
+
+# Create topic
+@test "Create topics for test" {
+    debug_msg "topic: $TOPIC_NAME"
+    run timeout 15s "$FLUVIO_BIN" topic create "$TOPIC_NAME"
+    assert_success
+}
+
+# regression test issue https://github.com/infinyon/fluvio/issues/4161
+@test "Produce message larger than batch size" {
+    if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on fluvio cli stable version"
+    fi
+    if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on cluster stable version"
+    fi
+    run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 2500000 > $TOPIC_NAME-small.txt"
+    run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 5000000 > $TOPIC_NAME-med.txt"
+    run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME-big.txt"
+
+    debug_msg small 25
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-small.txt --raw --compression gzip'
+    assert_success
+
+    debug_msg med 50+1
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-med.txt --raw --compression gzip'
+    assert_success
+
+    # should create a single batch for this record
+    debug_msg big 150
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-big.txt --raw --compression gzip'
+    assert_success
+}
+
+# This is to cover cases when the metadata + record size is larger than the batch size
+@test "Produce message larger than batch size with loop" {
+    if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on fluvio cli stable version"
+    fi
+    if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on cluster stable version"
+    fi
+
+    for x in $(seq 0 99); do
+        debug_msg "Running test with $x characters"
+
+        # Create the text file with 'x' number of characters
+        run bash -c "yes abcdefghijklmnopqrstuvwxyz | head -c 49999$x > $TOPIC_NAME-loop.txt"
+
+        debug_msg small 25
+        run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 100000000 --file $TOPIC_NAME-loop.txt --raw --compression gzip'
+        assert_success
+    done
+}
+
+@test "Produce message larger than max request size" {
+    if [ "$FLUVIO_CLI_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on fluvio cli stable version"
+    fi
+    if [ "$FLUVIO_CLUSTER_RELEASE_CHANNEL" == "stable" ]; then
+        skip "don't run on cluster stable version"
+    fi
+    run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 14999000 > $TOPIC_NAME.txt"
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip'
+    assert_success
+
+    run bash -c "yes abcdefghijklmnopqrstuvwxyz |head -c 15000000 > $TOPIC_NAME.txt"
+    run bash -c 'timeout 65s "$FLUVIO_BIN" produce "$TOPIC_NAME" --batch-size 5000000 --max-request-size 15000000 --file $TOPIC_NAME.txt --raw --compression gzip'
+    assert_failure
+}
diff --git a/tests/cli/fluvio_smoke_tests/produce-error.bats b/tests/cli/fluvio_smoke_tests/produce-error.bats
index 3c5cdf02ec..ad48b5130a 100644
--- a/tests/cli/fluvio_smoke_tests/produce-error.bats
+++ b/tests/cli/fluvio_smoke_tests/produce-error.bats
@@ -49,7 +49,7 @@ teardown_file() {
         skip "don't check output on stable version"
     fi
 
-    assert_output --partial "Try increasing the producer batch size"
+    assert_output --partial "the given record is larger than the max_request_size"
 }
 
 # This should fail due to wrong compression algorithm
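
Usage note (illustrative, not part of the patch): after this change the producer enforces two separate byte limits. `batch_size` caps how many bytes a single batch accumulates, while `max_request_size` caps the whole produce request, so a record larger than the batch size is shipped in a dedicated batch instead of being rejected with `RecordTooLarge`. Below is a minimal sketch against the public Rust producer API; the topic name and sizes are made up, `fluvio_future::task::run_block_on` is just one way to drive the async calls, and the builder methods mirror the `TopicProducerConfigBuilder` wiring added in this patch:

use fluvio::{Fluvio, RecordKey, TopicProducerConfigBuilder};

fn main() -> anyhow::Result<()> {
    fluvio_future::task::run_block_on(async {
        // Connect using the current Fluvio profile.
        let fluvio = Fluvio::connect().await?;

        // Batches accumulate up to 16 KiB, but a single request (and therefore
        // a single oversized record) may carry up to 1 MiB.
        let config = TopicProducerConfigBuilder::default()
            .batch_size(16_384)
            .max_request_size(1_048_576)
            .build()?;

        let producer = fluvio.topic_producer_with_config("my-topic", config).await?;

        // A 100 kB record: larger than batch_size but under max_request_size,
        // so it is accepted and sent in its own batch.
        producer.send(RecordKey::NULL, vec![b'a'; 100_000]).await?;
        producer.flush().await?;
        Ok(())
    })
}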