Backwards seek window does not affect read window
Signed-off-by: Vladislav Volodkin <[email protected]>
vladem committed Sep 5, 2024
1 parent 3c371f3 commit ea0a89b
Showing 5 changed files with 52 additions and 34 deletions.

mountpoint-s3/src/prefetch/caching_stream.rs (2 additions, 2 deletions)
@@ -58,7 +58,7 @@ where
             request_range: range.into(),
         };
         let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config);
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        let (part_queue, part_queue_producer) = unbounded_part_queue(backpressure_controller);
         trace!(?range, "spawning request");
 
         let request_task = {
@@ -69,7 +69,7 @@ where
 
         let task_handle = self.runtime.spawn_with_handle(request_task).unwrap();
 
-        RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller)
+        RequestTask::from_handle(task_handle, range, part_queue)
     }
 }


mountpoint-s3/src/prefetch/part.rs (0 additions, 4 deletions)
@@ -44,10 +44,6 @@ impl Part {
         }
     }
 
-    pub(super) fn offset(&self) -> u64 {
-        self.offset
-    }
-
     pub(super) fn len(&self) -> usize {
         self.checksummed_bytes.len()
     }

mountpoint-s3/src/prefetch/part_queue.rs (46 additions, 3 deletions)
@@ -2,6 +2,10 @@ use std::time::Instant;
 
 use tracing::trace;
 
+use crate::prefetch::backpressure_controller::{
+    BackpressureController,
+    BackpressureFeedbackEvent::{DataRead, PartQueueStall},
+};
 use crate::prefetch::part::Part;
 use crate::prefetch::PrefetchReadError;
 use crate::sync::async_channel::{unbounded, Receiver, RecvError, Sender};
@@ -17,6 +21,8 @@ pub struct PartQueue<E: std::error::Error> {
     failed: AtomicBool,
     /// The total number of bytes sent to the underlying queue of `self.receiver`
     bytes_received: Arc<AtomicUsize>,
+    bytes_pushed_front: u64,
+    backpressure_controller: BackpressureController,
 }
 
 /// Producer side of the queue of [Part]s.
@@ -28,14 +34,18 @@ pub struct PartQueueProducer<E: std::error::Error> {
 }
 
 /// Creates an unbounded [PartQueue] and its related [PartQueueProducer].
-pub fn unbounded_part_queue<E: std::error::Error>() -> (PartQueue<E>, PartQueueProducer<E>) {
+pub fn unbounded_part_queue<E: std::error::Error>(
+    backpressure_controller: BackpressureController,
+) -> (PartQueue<E>, PartQueueProducer<E>) {
     let (sender, receiver) = unbounded();
     let bytes_counter = Arc::new(AtomicUsize::new(0));
     let part_queue = PartQueue {
         current_part: AsyncMutex::new(None),
         receiver,
         failed: AtomicBool::new(false),
         bytes_received: Arc::clone(&bytes_counter),
+        bytes_pushed_front: 0,
+        backpressure_controller,
     };
     let part_queue_producer = PartQueueProducer {
         sender,
@@ -59,13 +69,21 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
             "cannot use a PartQueue after failure"
         );
 
+        let mut from_prefetcher = 0;
         let part = if let Some(current_part) = current_part.take() {
+            debug_assert!(current_part.len() as u64 >= self.bytes_pushed_front);
+            from_prefetcher = self.bytes_pushed_front.min(length as u64);
+            self.bytes_pushed_front = self.bytes_pushed_front.saturating_sub(length as u64);
             Ok(current_part)
         } else {
             // Do `try_recv` first so we can track whether the read is starved or not
             if let Ok(part) = self.receiver.try_recv() {
                 part
             } else {
+                // If the part queue is empty it means we are reading faster than the task could prefetch,
+                // so we should use larger window for the task.
+                self.backpressure_controller.send_feedback(PartQueueStall).await?;
+
                 let start = Instant::now();
                 let part = self.receiver.recv().await;
                 metrics::histogram!("prefetch.part_queue_starved_us").record(start.elapsed().as_micros() as f64);
@@ -89,13 +107,23 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
             let tail = part.split_off(length);
             *current_part = Some(tail);
         }
+
+        // We read some data out of the part queue so the read window should be moved.
+        // We don't account data added from the backwards seek window (prefetcher) in the read window computations.
+        self.backpressure_controller
+            .send_feedback(DataRead(part.len() - from_prefetcher as usize))
+            .await?;
+
         metrics::gauge!("prefetch.bytes_in_queue").decrement(part.len() as f64);
+        metrics::gauge!("prefetch.bytes_in_queue_from_client").decrement((part.len() as u64 - from_prefetcher) as f64);
         Ok(part)
     }
 
     /// Push a new [Part] onto the front of the queue
     /// which actually just concatenate it with the current part
-    pub async fn push_front(&self, mut part: Part) -> Result<(), PrefetchReadError<E>> {
+    pub async fn push_front(&mut self, mut part: Part) -> Result<(), PrefetchReadError<E>> {
+        self.bytes_pushed_front += part.len() as u64;
+
         let part_len = part.len();
         let mut current_part = self.current_part.lock().await;

@@ -117,6 +145,10 @@ impl<E: std::error::Error + Send + Sync> PartQueue<E> {
     pub fn bytes_received(&self) -> usize {
         self.bytes_received.load(Ordering::SeqCst)
     }
+
+    pub fn read_window_end_offset(&self) -> u64 {
+        self.backpressure_controller.read_window_end_offset()
+    }
 }
 
 impl<E: std::error::Error + Send + Sync> PartQueueProducer<E> {
@@ -131,6 +163,7 @@ impl<E: std::error::Error + Send + Sync> PartQueueProducer<E> {
         } else {
             self.bytes_sent.fetch_add(part_len, Ordering::SeqCst);
             metrics::gauge!("prefetch.bytes_in_queue").increment(part_len as f64);
+            metrics::gauge!("prefetch.bytes_in_queue_from_client").increment(part_len as f64);
         }
     }
 }
@@ -152,13 +185,16 @@ impl<E: std::error::Error> Drop for PartQueue<E> {
         }
         let remaining = current_size + queue_size;
         metrics::gauge!("prefetch.bytes_in_queue").decrement(remaining as f64);
+        metrics::gauge!("prefetch.bytes_in_queue_from_client")
+            .decrement((remaining as u64 - self.bytes_pushed_front) as f64);
     }
 }
 
 #[cfg(test)]
 mod tests {
     use crate::checksums::ChecksummedBytes;
     use crate::object::ObjectId;
+    use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig};
 
     use super::*;

@@ -181,7 +217,14 @@ mod tests {
 
     async fn run_test(ops: Vec<Op>) {
         let part_id = ObjectId::new("key".to_owned(), ETag::for_tests());
-        let (mut part_queue, part_queue_producer) = unbounded_part_queue::<DummyError>();
+        let dummy_backpressure_config = BackpressureConfig {
+            initial_read_window_size: 0,
+            max_read_window_size: 0,
+            read_window_size_multiplier: 0,
+            request_range: 0..1,
+        };
+        let (backpressure_controller, _) = new_backpressure_controller(dummy_backpressure_config);
+        let (mut part_queue, part_queue_producer) = unbounded_part_queue::<DummyError>(backpressure_controller);
         let mut current_offset = 0;
         let mut current_length = 0;
         for op in ops {
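
The part_queue.rs hunks above concentrate the backpressure bookkeeping in one place: bytes that re-enter the queue through push_front (the backwards seek window) are remembered in bytes_pushed_front, and a later read reports only the remaining bytes as DataRead, so replayed data never advances the read window. The following standalone sketch models just that arithmetic; the names (SeekAwareQueue, push_front_bytes, read_bytes) are illustrative and not part of the mountpoint-s3 API, and the real code sends the result to a BackpressureController instead of keeping a local counter.

// Simplified model of the bytes_pushed_front accounting added in part_queue.rs.
// Assumed names: SeekAwareQueue, push_front_bytes, read_bytes (not real APIs).
struct SeekAwareQueue {
    /// Bytes re-inserted by the prefetcher's backwards seek window.
    bytes_pushed_front: u64,
    /// Stand-in for the read window position reported via DataRead feedback.
    read_window_end_offset: u64,
}

impl SeekAwareQueue {
    fn new() -> Self {
        Self { bytes_pushed_front: 0, read_window_end_offset: 0 }
    }

    /// Models PartQueue::push_front: remember how much data came from a backwards seek.
    fn push_front_bytes(&mut self, len: u64) {
        self.bytes_pushed_front += len;
    }

    /// Models the feedback computation in PartQueue::read: only bytes that did not
    /// come from push_front advance the read window.
    fn read_bytes(&mut self, len: u64) -> u64 {
        let from_prefetcher = self.bytes_pushed_front.min(len);
        self.bytes_pushed_front = self.bytes_pushed_front.saturating_sub(len);
        let data_read = len - from_prefetcher;
        self.read_window_end_offset += data_read;
        data_read
    }
}

fn main() {
    let mut queue = SeekAwareQueue::new();
    // 4 KiB re-inserted by a backwards seek, then an 8 KiB read:
    queue.push_front_bytes(4096);
    let advanced = queue.read_bytes(8192);
    // Only the 4 KiB that actually came from the network move the read window.
    assert_eq!(advanced, 4096);
    println!("read window advanced by {advanced} bytes");
}

In the diff, the same subtraction also feeds the new prefetch.bytes_in_queue_from_client gauge, which tracks only the bytes sitting in the queue that came from the S3 client rather than from push_front.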

mountpoint-s3/src/prefetch/part_stream.rs (2 additions, 2 deletions)
@@ -196,7 +196,7 @@ where
             request_range: range.into(),
         };
         let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller(backpressure_config);
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
+        let (part_queue, part_queue_producer) = unbounded_part_queue(backpressure_controller);
         trace!(?range, "spawning request");
 
         let span = debug_span!("prefetch", ?range);
@@ -226,7 +226,7 @@ where
             )
             .unwrap();
 
-        RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller)
+        RequestTask::from_handle(task_handle, range, part_queue)
     }
 }


mountpoint-s3/src/prefetch/task.rs (2 additions, 23 deletions)
@@ -1,11 +1,9 @@
 use futures::future::RemoteHandle;
 
-use crate::prefetch::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall};
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::PartQueue;
 use crate::prefetch::PrefetchReadError;
 
-use super::backpressure_controller::BackpressureController;
 use super::part_stream::RequestRange;
 
 /// A single GetObject request submitted to the S3 client
@@ -17,22 +15,15 @@ pub struct RequestTask<E: std::error::Error> {
     remaining: usize,
     range: RequestRange,
     part_queue: PartQueue<E>,
-    backpressure_controller: BackpressureController,
 }
 
 impl<E: std::error::Error + Send + Sync> RequestTask<E> {
-    pub fn from_handle(
-        task_handle: RemoteHandle<()>,
-        range: RequestRange,
-        part_queue: PartQueue<E>,
-        backpressure_controller: BackpressureController,
-    ) -> Self {
+    pub fn from_handle(task_handle: RemoteHandle<()>, range: RequestRange, part_queue: PartQueue<E>) -> Self {
         Self {
             _task_handle: task_handle,
             remaining: range.len(),
             range,
             part_queue,
-            backpressure_controller,
         }
     }

@@ -52,18 +43,6 @@ impl<E: std::error::Error + Send + Sync> RequestTask<E> {
         let part = self.part_queue.read(length).await?;
         debug_assert!(part.len() <= self.remaining);
         self.remaining -= part.len();
-
-        // We read some data out of the part queue so the read window should be moved
-        self.backpressure_controller.send_feedback(DataRead(part.len())).await?;
-
-        let next_offset = part.offset() + part.len() as u64;
-        let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;
-        // If the part queue is empty it means we are reading faster than the task could prefetch,
-        // so we should use larger window for the task.
-        if remaining_in_queue == 0 {
-            self.backpressure_controller.send_feedback(PartQueueStall).await?;
-        }
-
         Ok(part)
     }

@@ -89,6 +68,6 @@ }
     }
 
     pub fn read_window_end_offset(&self) -> u64 {
-        self.backpressure_controller.read_window_end_offset()
+        self.part_queue.read_window_end_offset()
     }
 }
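
With the feedback moved into the part queue, task.rs no longer infers starvation from offsets: the removed block computed remaining_in_queue from available_offset and the next part offset, whereas the new code in part_queue.rs reports PartQueueStall as soon as try_recv finds the channel empty. Below is a rough synchronous model of that control flow, assuming a plain std::sync::mpsc channel and made-up names (Feedback, fetch_part); the real implementation uses an async channel and a BackpressureController.

// Toy model of the stall detection now done inside PartQueue::read.
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

enum Feedback {
    /// The reader drained the queue faster than the prefetcher filled it,
    /// so the backpressure controller should scale the read window up.
    PartQueueStall,
}

/// Try a non-blocking receive first; only report a stall (and then block)
/// when the queue is actually empty.
fn fetch_part(receiver: &Receiver<Vec<u8>>, feedback: &mut Vec<Feedback>) -> Option<Vec<u8>> {
    match receiver.try_recv() {
        Ok(part) => Some(part),
        Err(TryRecvError::Empty) => {
            feedback.push(Feedback::PartQueueStall);
            receiver.recv().ok()
        }
        Err(TryRecvError::Disconnected) => None,
    }
}

fn main() {
    let (sender, receiver) = channel();
    let mut feedback = Vec::new();

    // Producer is ahead of the reader: no stall feedback.
    sender.send(vec![0u8; 1024]).unwrap();
    assert!(fetch_part(&receiver, &mut feedback).is_some());
    assert!(feedback.is_empty());

    // Queue is empty when the reader arrives: a stall is recorded before blocking.
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        sender.send(vec![1u8; 1024]).unwrap();
    });
    assert!(fetch_part(&receiver, &mut feedback).is_some());
    assert_eq!(feedback.len(), 1);
}

Keeping the stall check next to try_recv is also what let the commit drop Part::offset() and the offset arithmetic that task.rs previously needed.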
