diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs index 77c6eec11..a4878225e 100644 --- a/mountpoint-s3/src/prefetch/backpressure_controller.rs +++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs @@ -8,7 +8,7 @@ use super::PrefetchReadError; #[derive(Debug)] pub enum BackpressureFeedbackEvent { /// An event where data with a certain length has been read out of the stream - DataRead(usize), + DataRead(u64, usize), /// An event indicating part queue stall PartQueueStall, } @@ -33,9 +33,6 @@ pub struct BackpressureController { /// Upper bound of the current read window. The request can return data up to this /// offset *exclusively*. This value must be advanced to continue fetching new data. read_window_end_offset: u64, - /// Next offset of the data to be read. It is used for tracking how many bytes of - /// data has been read out of the stream. - next_read_offset: u64, /// End offset for the request we want to apply backpressure. The request can return /// data up to this offset *exclusively*. request_end_offset: u64, @@ -73,7 +70,6 @@ pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureC max_read_window_size: config.max_read_window_size, read_window_size_multiplier: config.read_window_size_multiplier, read_window_end_offset, - next_read_offset: config.request_range.start, request_end_offset: config.request_range.end, }; let limiter = BackpressureLimiter { @@ -93,21 +89,22 @@ impl BackpressureController { /// will ensure that the read window size is enough to read this offset and that it is always close to `preferred_read_window_size`. pub async fn send_feedback(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError> { match event { - BackpressureFeedbackEvent::DataRead(length) => { - self.next_read_offset += length as u64; + // Note, that this may come from a backwards seek, so offsets observed by this method are not necessarily ascending + BackpressureFeedbackEvent::DataRead(offset, length) => { + let next_read_offset = offset + length as u64; + let remaining_window = self.read_window_end_offset.saturating_sub(next_read_offset); // Increment the read window only if the remaining window reaches some threshold i.e. half of it left. - if self.remaining_window() < (self.preferred_read_window_size / 2) + if remaining_window < (self.preferred_read_window_size / 2) as u64 && self.read_window_end_offset < self.request_end_offset { - let new_read_window_end_offset = self - .next_read_offset + let new_read_window_end_offset = next_read_offset .saturating_add(self.preferred_read_window_size as u64) .min(self.request_end_offset); debug_assert!(self.read_window_end_offset < new_read_window_end_offset); let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize; trace!( preferred_read_window_size = self.preferred_read_window_size, - next_read_offset = self.next_read_offset, + next_read_offset, read_window_end_offset = self.read_window_end_offset, to_increase, "incrementing read window" @@ -131,10 +128,6 @@ impl BackpressureController { .inspect_err(|_| trace!("read window incrementing queue is already closed")); } - fn remaining_window(&self) -> usize { - self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize - } - // Try scaling up preferred read window size with a multiplier configured at initialization. fn try_scaling_up(&mut self) { if self.preferred_read_window_size < self.max_read_window_size { diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs index 1fb7edc63..45201008b 100644 --- a/mountpoint-s3/src/prefetch/task.rs +++ b/mountpoint-s3/src/prefetch/task.rs @@ -54,7 +54,9 @@ impl RequestTask { self.remaining -= part.len(); // We read some data out of the part queue so the read window should be moved - self.backpressure_controller.send_feedback(DataRead(part.len())).await?; + self.backpressure_controller + .send_feedback(DataRead(part.offset(), part.len())) + .await?; let next_offset = part.offset() + part.len() as u64; let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;