diff --git a/src/stream/src/executor/source/mod.rs b/src/stream/src/executor/source/mod.rs index 7c22db7f52233..bff54a7611e04 100644 --- a/src/stream/src/executor/source/mod.rs +++ b/src/stream/src/executor/source/mod.rs @@ -148,24 +148,25 @@ pub async fn apply_rate_limit(stream: BoxSourceChunkStream, rate_limit_rps: Opti } let limiter = limiter.as_ref().unwrap(); - let limit = rate_limit_rps.unwrap() as usize; - - let required_permits = compute_rate_limit_chunk_permits(&chunk, limit); - if required_permits <= limit { - let n = NonZeroU32::new(required_permits as u32).unwrap(); - // `InsufficientCapacity` should never happen because we have check the cardinality - limiter.until_n_ready(n).await.unwrap(); - yield chunk; - } else { - // Cut the chunk into smaller chunks - for chunk in chunk.split(limit) { - let n = NonZeroU32::new(compute_rate_limit_chunk_permits(&chunk, limit) as u32) - .unwrap(); - // chunks split should have effective chunk size <= limit - limiter.until_n_ready(n).await.unwrap(); - yield chunk; - } + let burst = rate_limit_rps.unwrap() as usize; + + let mut required_permits = compute_rate_limit_chunk_permits(&chunk, burst); + if required_permits > burst { + // This should not happen after https://github.com/risingwavelabs/risingwave/pull/19698. + // But if it does happen, let's don't panic and just log an error. + tracing::error!( + chunk_size, + required_permits, + burst, + "unexpected large chunk size" + ); + required_permits = burst; } + + let n = NonZeroU32::new(required_permits as u32).unwrap(); + // `InsufficientCapacity` should never happen because we have check the cardinality + limiter.until_n_ready(n).await.unwrap(); + yield chunk; } } diff --git a/src/stream/src/executor/utils.rs b/src/stream/src/executor/utils.rs index ad2e1b8a4268c..bd6db0b7c61c5 100644 --- a/src/stream/src/executor/utils.rs +++ b/src/stream/src/executor/utils.rs @@ -23,7 +23,7 @@ impl Execute for DummyExecutor { } } -pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize { +pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, burst: usize) -> usize { let chunk_size = chunk.capacity(); let ends_with_update = if chunk_size >= 2 { // Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`. @@ -32,7 +32,7 @@ pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> us } else { false }; - if chunk_size == limit + 1 && ends_with_update { + if chunk_size == burst + 1 && ends_with_update { // If the chunk size exceed limit because of the last `Update` operation, // we should minus 1 to make sure the permits consumed is within the limit (max burst). chunk_size - 1