Skip to content

Commit

Permalink
Use part's offset to calculate remaining window
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Volodkin <[email protected]>
  • Loading branch information
vladem committed Sep 5, 2024
1 parent 3c371f3 commit 652a559
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 16 deletions.
23 changes: 8 additions & 15 deletions mountpoint-s3/src/prefetch/backpressure_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use super::PrefetchReadError;
#[derive(Debug)]
pub enum BackpressureFeedbackEvent {
/// An event where data with a certain length has been read out of the stream
DataRead(usize),
DataRead(u64, usize),
/// An event indicating part queue stall
PartQueueStall,
}
Expand All @@ -33,9 +33,6 @@ pub struct BackpressureController {
/// Upper bound of the current read window. The request can return data up to this
/// offset *exclusively*. This value must be advanced to continue fetching new data.
read_window_end_offset: u64,
/// Next offset of the data to be read. It is used for tracking how many bytes of
/// data has been read out of the stream.
next_read_offset: u64,
/// End offset for the request we want to apply backpressure. The request can return
/// data up to this offset *exclusively*.
request_end_offset: u64,
Expand Down Expand Up @@ -73,7 +70,6 @@ pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureC
max_read_window_size: config.max_read_window_size,
read_window_size_multiplier: config.read_window_size_multiplier,
read_window_end_offset,
next_read_offset: config.request_range.start,
request_end_offset: config.request_range.end,
};
let limiter = BackpressureLimiter {
Expand All @@ -93,21 +89,22 @@ impl BackpressureController {
/// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`.
pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> {
match event {
BackpressureFeedbackEvent::DataRead(length) => {
self.next_read_offset += length as u64;
// Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending
BackpressureFeedbackEvent::DataRead(offset, length) => {
let next_read_offset = offset + length as u64;
let remaining_window = self.read_window_end_offset.saturating_sub(next_read_offset);
// Increment the read window only if the remaining window reaches some threshold i.e. half of it left.
if self.remaining_window() < (self.preferred_read_window_size / 2)
if remaining_window < (self.preferred_read_window_size / 2) as u64
&& self.read_window_end_offset < self.request_end_offset
{
let new_read_window_end_offset = self
.next_read_offset
let new_read_window_end_offset = next_read_offset
.saturating_add(self.preferred_read_window_size as u64)
.min(self.request_end_offset);
debug_assert!(self.read_window_end_offset < new_read_window_end_offset);
let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
trace!(
preferred_read_window_size = self.preferred_read_window_size,
next_read_offset = self.next_read_offset,
next_read_offset,
read_window_end_offset = self.read_window_end_offset,
to_increase,
"incrementing read window"
Expand All @@ -131,10 +128,6 @@ impl BackpressureController {
.inspect_err(|_| trace!("read window incrementing queue is already closed"));
}

fn remaining_window(&self) -> usize {
self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize
}

// Try scaling up preferred read window size with a multiplier configured at initialization.
fn try_scaling_up(&mut self) {
if self.preferred_read_window_size < self.max_read_window_size {
Expand Down
4 changes: 3 additions & 1 deletion mountpoint-s3/src/prefetch/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ impl<E: std::error::Error + Send + Sync> RequestTask<E> {
self.remaining -= part.len();

// We read some data out of the part queue so the read window should be moved
self.backpressure_controller.send_feedback(DataRead(part.len())).await?;
self.backpressure_controller
.send_feedback(DataRead(part.offset(), part.len()))
.await?;

let next_offset = part.offset() + part.len() as u64;
let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;
Expand Down

0 comments on commit 652a559

Please sign in to comment.