Skip to content

Commit

Permalink
feat: fix record size larger than batch size (#4195)
Browse files Browse the repository at this point in the history
* feat: fix record size bigger than batch size

* feat: add max_request_size

* chore: clean up

* chore: clean up part 2

* chore: remove clones

* chore: add some tests and fix storage max_request_size
  • Loading branch information
fraidev authored Oct 15, 2024
1 parent bc5b4ea commit 5e1f885
Show file tree
Hide file tree
Showing 15 changed files with 407 additions and 91 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion crates/fluvio-cli/src/client/produce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,14 @@ mod cmd {
#[arg(long, value_parser=parse_duration)]
pub linger: Option<Duration>,

/// Max amount of bytes accumulated before sending
/// Max number of records to batch before sending
#[arg(long)]
pub batch_size: Option<usize>,

/// Max amount of bytes accumulated before sending
#[arg(long)]
pub max_request_size: Option<usize>,

/// Isolation level that producer must respect.
/// Supported values: read_committed (ReadCommitted) - wait for records to be committed before response,
/// read_uncommitted (ReadUncommitted) - just wait for leader to accept records.
Expand Down Expand Up @@ -212,6 +216,13 @@ mod cmd {
config_builder
};

// Max request size
let config_builder = if let Some(max_request_size) = self.max_request_size {
config_builder.max_request_size(max_request_size)
} else {
config_builder
};

// Isolation
let config_builder = if let Some(isolation) = self.isolation {
config_builder.isolation(isolation)
Expand Down
13 changes: 12 additions & 1 deletion crates/fluvio-storage/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use serde::Deserialize;
use fluvio_controlplane_metadata::topic::CleanupPolicy;
use fluvio_types::defaults::{
SPU_LOG_INDEX_MAX_BYTES, SPU_LOG_BASE_DIR, STORAGE_FLUSH_WRITE_COUNT, STORAGE_FLUSH_IDLE_MSEC,
STORAGE_MAX_BATCH_SIZE, STORAGE_RETENTION_SECONDS, SPU_PARTITION_MAX_BYTES,
STORAGE_MAX_BATCH_SIZE, STORAGE_MAX_REQUEST_SIZE, STORAGE_RETENTION_SECONDS,
SPU_PARTITION_MAX_BYTES,
};
use fluvio_types::defaults::SPU_LOG_INDEX_MAX_INTERVAL_BYTES;
use fluvio_types::defaults::SPU_LOG_SEGMENT_MAX_BYTES;
Expand Down Expand Up @@ -44,6 +45,9 @@ pub struct ReplicaConfig {
#[builder(default = "default_max_batch_size()")]
#[serde(default = "default_max_batch_size")]
pub max_batch_size: Size,
#[builder(default = "default_max_request_size()")]
#[serde(default = "default_max_request_size")]
pub max_request_size: Size,
#[builder(default = "default_update_hw()")]
#[serde(default = "default_update_hw")]
pub update_hw: bool, // if true, enable hw update
Expand Down Expand Up @@ -120,6 +124,10 @@ const fn default_max_batch_size() -> Size {
STORAGE_MAX_BATCH_SIZE
}

/// Default value for `ReplicaConfig::max_request_size`.
///
/// Returns `STORAGE_MAX_REQUEST_SIZE` from `fluvio_types::defaults`. Referenced by the
/// `#[builder(default = ...)]` and `#[serde(default = ...)]` attributes on the field and by
/// the `Default` impl, so all three construction paths agree on the same value.
const fn default_max_request_size() -> Size {
STORAGE_MAX_REQUEST_SIZE
}

/// Default value for `ReplicaConfig::retention_seconds`.
///
/// Returns `STORAGE_RETENTION_SECONDS` from `fluvio_types::defaults`; used by the
/// `Default` impl of `ReplicaConfig`.
const fn default_retention_seconds() -> Size {
STORAGE_RETENTION_SECONDS
}
Expand Down Expand Up @@ -149,6 +157,7 @@ impl Default for ReplicaConfig {
flush_write_count: default_flush_write_count(),
flush_idle_msec: default_flush_idle_msec(),
max_batch_size: default_max_batch_size(),
max_request_size: default_max_request_size(),
retention_seconds: default_retention_seconds(),
max_partition_size: default_max_partition_size(),
update_hw: true,
Expand Down Expand Up @@ -225,6 +234,7 @@ pub struct SharedReplicaConfig {
pub flush_write_count: SharedConfigU32Value,
pub flush_idle_msec: SharedConfigU32Value,
pub max_batch_size: SharedConfigU32Value,
pub max_request_size: SharedConfigU32Value,
pub update_hw: bool, // if true, enable hw update
pub retention_seconds: SharedConfigU32Value,
pub max_partition_size: SharedConfigU64Value,
Expand All @@ -240,6 +250,7 @@ impl From<ReplicaConfig> for SharedReplicaConfig {
flush_write_count: SharedConfigU32Value::new(config.flush_write_count),
flush_idle_msec: SharedConfigU32Value::new(config.flush_idle_msec),
max_batch_size: SharedConfigU32Value::new(config.max_batch_size),
max_request_size: SharedConfigU32Value::new(config.max_request_size),
update_hw: config.update_hw,
retention_seconds: SharedConfigU32Value::new(config.retention_seconds),
max_partition_size: SharedConfigU64Value::new(config.max_partition_size),
Expand Down
10 changes: 5 additions & 5 deletions crates/fluvio-storage/src/replica.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ReplicaStorage for FileReplica {
records: &mut RecordSet<R>,
update_highwatermark: bool,
) -> Result<usize> {
let max_batch_size = self.option.max_batch_size.get() as usize;
let max_request_size = self.option.max_request_size.get() as usize;
let max_segment_size = self.option.segment_max_bytes.get() as usize;
let mut total_size = 0;
// check if any batch in the record set exceeds the max length
Expand All @@ -146,8 +146,8 @@ impl ReplicaStorage for FileReplica {
.into());
}
total_size += batch_size;
if batch_size > max_batch_size {
return Err(StorageError::BatchTooBig(max_batch_size).into());
if batch_size > max_request_size {
return Err(StorageError::BatchTooBig(max_request_size).into());
}
}

Expand Down Expand Up @@ -862,9 +862,9 @@ mod tests {
}

#[fluvio_future::test]
async fn test_replica_limit_batch() {
async fn test_replica_limit_request_size() {
let mut option = base_option("test_batch_limit");
option.max_batch_size = 100;
option.max_request_size = 100;
option.update_hw = false;

let mut replica = create_replica("test", START_OFFSET, option).await;
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-types/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-types"
version = "0.5.1"
version = "0.5.2"
authors = ["Fluvio Contributors <[email protected]>"]
edition = "2021"
description = "Fluvio common types and objects"
Expand Down
3 changes: 2 additions & 1 deletion crates/fluvio-types/src/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ pub const STORAGE_RETENTION_SECONDS: u32 = 7 * 24 * 3600;
pub const STORAGE_RETENTION_SECONDS_MIN: u32 = 10; // crd
pub const STORAGE_FLUSH_WRITE_COUNT: u32 = 1;
pub const STORAGE_FLUSH_IDLE_MSEC: u32 = 0;
pub const STORAGE_MAX_BATCH_SIZE: u32 = 33_554_432;
pub const STORAGE_MAX_BATCH_SIZE: u32 = 2_097_152;
pub const STORAGE_MAX_REQUEST_SIZE: u32 = 33_554_432;

pub const SPU_SMARTENGINE_STORE_MAX_BYTES: usize = 1_073_741_824; //1Gb

Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio"
version = "0.23.3"
version = "0.23.4"
edition = "2021"
license = "Apache-2.0"
authors = ["Fluvio Contributors <[email protected]>"]
Expand Down
Loading

0 comments on commit 5e1f885

Please sign in to comment.