Skip to content

Commit

Permalink
remove chunk splitting logic in apply_rate_limit
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 18, 2024
1 parent eeeebb4 commit 489ce16
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 19 deletions.
35 changes: 18 additions & 17 deletions src/stream/src/executor/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,24 +148,25 @@ pub async fn apply_rate_limit(stream: BoxChunkSourceStream, rate_limit_rps: Opti
}

let limiter = limiter.as_ref().unwrap();
let limit = rate_limit_rps.unwrap() as usize;

let required_permits = compute_rate_limit_chunk_permits(&chunk, limit);
if required_permits <= limit {
let n = NonZeroU32::new(required_permits as u32).unwrap();
// `InsufficientCapacity` should never happen because we have check the cardinality
limiter.until_n_ready(n).await.unwrap();
yield chunk;
} else {
// Cut the chunk into smaller chunks
for chunk in chunk.split(limit) {
let n = NonZeroU32::new(compute_rate_limit_chunk_permits(&chunk, limit) as u32)
.unwrap();
// chunks split should have effective chunk size <= limit
limiter.until_n_ready(n).await.unwrap();
yield chunk;
}
let burst = rate_limit_rps.unwrap() as usize;

let mut required_permits = compute_rate_limit_chunk_permits(&chunk, burst);
if required_permits > burst {
// This should not happen after https://github.com/risingwavelabs/risingwave/pull/19698.
// But if it does happen, let's don't panic and just log an error.
tracing::error!(
chunk_size,
required_permits,
burst,
"unexpected large chunk size"
);
required_permits = burst;
}

let n = NonZeroU32::new(required_permits as u32).unwrap();
// `InsufficientCapacity` should never happen because we have check the cardinality
limiter.until_n_ready(n).await.unwrap();
yield chunk;
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/stream/src/executor/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl Execute for DummyExecutor {
}
}

pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> usize {
pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, burst: usize) -> usize {
let chunk_size = chunk.capacity();
let ends_with_update = if chunk_size >= 2 {
// Note we have to check if the 2nd last is `U-` to be consistenct with `StreamChunkBuilder`.
Expand All @@ -32,7 +32,7 @@ pub fn compute_rate_limit_chunk_permits(chunk: &StreamChunk, limit: usize) -> us
} else {
false
};
if chunk_size == limit + 1 && ends_with_update {
if chunk_size == burst + 1 && ends_with_update {
// If the chunk size exceed limit because of the last `Update` operation,
// we should minus 1 to make sure the permits consumed is within the limit (max burst).
chunk_size - 1
Expand Down

0 comments on commit 489ce16

Please sign in to comment.