From c6ccb3679335419d518ad0b4482913fd7e671b36 Mon Sep 17 00:00:00 2001
From: Monthon Klongklaew
Date: Fri, 9 Aug 2024 12:28:10 +0000
Subject: [PATCH] Re-implement the prefetcher using a backpressure mechanism

The prefetcher now uses only one GetObject request to fetch data in
advance. This request covers the entire range of the object (starting
from the first read offset), but it uses a backpressure mechanism to
control how much data is fetched into the part queue, instead of
spawning up to two requests in parallel. This should make throughput
more stable, because previously the two request tasks could compete
with each other when fetching data from S3. It also makes it easier to
control how much data we store in the part queue.

Signed-off-by: Monthon Klongklaew
---
 mountpoint-s3/src/checksums.rs                |  43 +-
 mountpoint-s3/src/cli.rs                      |  14 +
 mountpoint-s3/src/fs.rs                       |   2 +
 mountpoint-s3/src/fs/error.rs                 |   5 +-
 mountpoint-s3/src/prefetch.rs                 | 456 ++++++++++--------
 .../src/prefetch/backpressure_controller.rs   | 188 ++++++++
 mountpoint-s3/src/prefetch/caching_stream.rs  | 200 ++++++--
 mountpoint-s3/src/prefetch/part.rs            | 133 ++++-
 mountpoint-s3/src/prefetch/part_queue.rs      |  25 +-
 mountpoint-s3/src/prefetch/part_stream.rs     | 124 ++++-
 mountpoint-s3/src/prefetch/task.rs            |  75 +--
 mountpoint-s3/tests/common/fuse.rs            |  17 +-
 mountpoint-s3/tests/common/mod.rs             |   2 +
 mountpoint-s3/tests/fs.rs                     |  13 +-
 .../tests/fuse_tests/prefetch_test.rs         |  70 +--
 mountpoint-s3/tests/mock_s3_tests.rs          |   1 +
 16 files changed, 1020 insertions(+), 348 deletions(-)
 create mode 100644 mountpoint-s3/src/prefetch/backpressure_controller.rs

diff --git a/mountpoint-s3/src/checksums.rs b/mountpoint-s3/src/checksums.rs
index 4761506ef..b96f1b18a 100644
--- a/mountpoint-s3/src/checksums.rs
+++ b/mountpoint-s3/src/checksums.rs
@@ -63,7 +63,7 @@ impl ChecksummedBytes {
     /// This operation just increases the reference count and sets a few indices,
     /// so there will be no validation and the checksum will not be recomputed.
     pub fn split_off(&mut self, at: usize) -> ChecksummedBytes {
-        assert!(at < self.len());
+        assert!(at <= self.len());
 
         let start = self.range.start;
         let prefix_range = start..(start + at);
@@ -77,6 +77,27 @@ impl ChecksummedBytes {
         }
     }
 
+    /// Splits the checksummed bytes into two at the given index.
+    ///
+    /// Afterwards self contains elements [at, len), and the returned bytes contain elements [0, at).
+    ///
+    /// This operation just increases the reference count and sets a few indices,
+    /// so there will be no validation and the checksum will not be recomputed.
+    pub fn split_to(&mut self, at: usize) -> ChecksummedBytes {
+        assert!(at <= self.len());
+
+        let start = self.range.start;
+        let prefix_range = start..(start + at);
+        let suffix_range = (start + at)..self.range.end;
+
+        self.range = suffix_range;
+        Self {
+            buffer: self.buffer.clone(),
+            range: prefix_range,
+            checksum: self.checksum,
+        }
+    }
+
     /// Returns a slice of self for the provided range.
     ///
     /// This operation just increases the reference count and sets a few indices,
@@ -321,6 +342,26 @@ mod tests {
         assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
     }
 
+    #[test]
+    fn test_split_to() {
+        let split_to_at = 4;
+        let bytes = Bytes::from_static(b"some bytes");
+        let expected = bytes.clone();
+        let expected_checksum = crc32c::checksum(&expected);
+        let mut checksummed_bytes = ChecksummedBytes::new(bytes);
+
+        let mut expected_part2 = expected.clone();
+        let expected_part1 = expected_part2.split_to(split_to_at);
+        let new_checksummed_bytes = checksummed_bytes.split_to(split_to_at);
+
+        assert_eq!(expected, checksummed_bytes.buffer);
+        assert_eq!(expected, new_checksummed_bytes.buffer);
+        assert_eq!(expected_part1, new_checksummed_bytes.buffer_slice());
+        assert_eq!(expected_part2, checksummed_bytes.buffer_slice());
+        assert_eq!(expected_checksum, checksummed_bytes.checksum);
+        assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
+    }
+
     #[test]
     fn test_slice() {
         let range = 3..7;
diff --git a/mountpoint-s3/src/cli.rs b/mountpoint-s3/src/cli.rs
index 208124588..4642260cd 100644
--- a/mountpoint-s3/src/cli.rs
+++ b/mountpoint-s3/src/cli.rs
@@ -649,11 +649,25 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
         user_agent.key_value("mp-nw-interfaces", &interfaces.len().to_string());
     }
 
+    // This is a weird looking number! We really want our first request size to be 1MiB,
+    // which is a common IO size. But Linux's readahead will try to read an extra 128k on
+    // top of a 1MiB read, which we'd have to wait for a second request to service. Because
+    // FUSE doesn't know the difference between regular reads and readahead reads, it will
+    // send us a READ request for that 128k, so we'll have to block waiting for it even if
+    // the application doesn't want it. This is all in the noise for sequential IO, but
+    // waiting for the readahead hurts random IO. So we add 128k to the first request size
+    // to avoid the latency hit of the second request.
+    //
+    // Note that the CRT does not currently respect this value (it always returns chunks of
+    // part size), but this is the first window size we prefer.
+    let initial_read_window_size = 1024 * 1024 + 128 * 1024;
     let mut client_config = S3ClientConfig::new()
         .auth_config(auth_config)
         .throughput_target_gbps(throughput_target_gbps)
         .read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
         .write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
+        .read_backpressure(true)
+        .initial_read_window(initial_read_window_size)
         .user_agent(user_agent);
 
     #[cfg(feature = "multiple-nw-iface")]
     if let Some(interfaces) = &args.bind {
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index 46c3f0915..b03563cd2 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -1349,6 +1349,8 @@ mod tests {
         let bucket = "bucket";
         let client = Arc::new(MockClient::new(MockClientConfig {
             bucket: bucket.to_owned(),
+            enable_backpressure: true,
+            initial_read_window_size: 1024 * 1024,
             ..Default::default()
         }));
         // Create "dir1" in the client to avoid creating it locally
diff --git a/mountpoint-s3/src/fs/error.rs b/mountpoint-s3/src/fs/error.rs
index d01c53512..477d09974 100644
--- a/mountpoint-s3/src/fs/error.rs
+++ b/mountpoint-s3/src/fs/error.rs
@@ -129,9 +129,12 @@ impl<E> From<PrefetchReadError<E>> for Error {
             GetObjectError::PreconditionFailed,
         )) => err!(libc::ESTALE, "object was mutated remotely"),
         PrefetchReadError::Integrity(e) => err!(libc::EIO, source:e, "integrity error"),
+        PrefetchReadError::PartReadFailed(e) => err!(libc::EIO, source:e, "part read failed"),
         PrefetchReadError::GetRequestFailed(_)
         | PrefetchReadError::GetRequestTerminatedUnexpectedly
-        | PrefetchReadError::GetRequestReturnedWrongOffset { .. } => {
+        | PrefetchReadError::GetRequestReturnedWrongOffset { .. }
+        | PrefetchReadError::BackpressurePreconditionFailed
+        | PrefetchReadError::ReadWindowIncrement => {
             err!(libc::EIO, source:err, "get request failed")
         }
     }
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 3bc5eb75d..daa10fab5 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -1,12 +1,37 @@
 //! This module implements a prefetcher for GetObject requests.
 //!
-//! It works by making increasingly larger GetObject requests to the CRT. We want the chunks to be
-//! large enough that they can make effective use of the CRT's fan-out parallelism across the S3
-//! frontend, but small enough that we don't accumulate a lot of unread object data in memory or
-//! wastefully download data we'll never read. As the reader continues to make sequential reads,
-//! we increase the size of the GetObject requests up to some maximum. If the reader ever makes a
-//! non-sequential read, we abandon the prefetching and start again with the minimum request size.
-
+//! It works by relying on the CRT's flow-control window feature. The prefetcher creates a single
+//! GetObject request for the entire length of the object (starting from the first read offset) and
+//! makes the read window increasingly larger. We want the chunks to be large enough that they can
+//! make effective use of the CRT's fan-out parallelism across the S3 frontend, but small enough that
+//! we don't accumulate a lot of unread object data in memory or wastefully download data we'll never
+//! read. As the reader continues to make sequential reads, we increase the size of the read window
+//! up to some maximum. If the reader ever makes a non-sequential read, we abandon the prefetching
+//! and start again with a new GetObject request with the minimum read window size.
+//!
+//! 
In more technical detail, the prefetcher creates a RequestTask when it receives the first read
+//! request from the file system or after it has just been reset. The RequestTask consists of two main
+//! components:
+//! 1. An ObjectPartStream, whose role is to continuously fetch data from the source, which can be
+//!    either S3 or the on-disk cache. The ObjectPartStream is spawned and run in a separate thread
+//!    from the prefetcher.
+//! 2. A PartQueue, where we store data received from the ObjectPartStream, waiting to be read by
+//!    the prefetcher via a RequestTask function.
+//!
+//! A backpressure mechanism is needed to control how much data we store in the part queue at a
+//! time, as we don't want to download the entire object into memory. For the client part stream, we
+//! may be able to rely on the CRT flow-control window to block when we don't increase the read
+//! window size, but for the caching part stream we don't have the machinery to do that yet. That's
+//! why we introduce the BackpressureController and BackpressureLimiter to help solve this.
+//!
+//! Essentially, the BackpressureController and BackpressureLimiter are the sender and receiver of a
+//! channel, created at RequestTask initialization. The sender is handed to the RequestTask; its role
+//! is to tell its receiver "when" it is ready to receive more data. The receiver is handed to the
+//! ObjectPartStream, and the stream should call a provided function "before" fetching more data from
+//! the source and putting it into the part queue. The BackpressureLimiter is thus the means by which
+//! the ObjectPartStream thread is blocked from fetching more data.
+
+mod backpressure_controller;
 mod caching_stream;
 mod part;
 mod part_queue;
@@ -14,7 +39,6 @@ mod part_stream;
 mod seek_window;
 mod task;
 
-use std::collections::VecDeque;
 use std::fmt::Debug;
 use std::time::Duration;
 
@@ -24,6 +48,7 @@ use metrics::{counter, histogram};
 use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
 use mountpoint_s3_client::types::ETag;
 use mountpoint_s3_client::ObjectClient;
+use part::PartOperationError;
 use part_stream::RequestTaskConfig;
 use thiserror::Error;
 use tracing::trace;
@@ -80,6 +105,15 @@ pub enum PrefetchReadError<E> {
 
     #[error("integrity check failed")]
     Integrity(#[from] IntegrityError),
+
+    #[error("part read failed")]
+    PartReadFailed(#[from] PartOperationError),
+
+    #[error("backpressure must be enabled with non-zero initial read window")]
+    BackpressurePreconditionFailed,
+
+    #[error("read window increment failed")]
+    ReadWindowIncrement,
 }
 
 pub type DefaultPrefetcher<Runtime> = Prefetcher<ClientPartStream<Runtime>>;

 #[derive(Debug, Clone, Copy)]
 pub struct PrefetcherConfig {
-    /// Size of the first request in a prefetch run
-    pub first_request_size: usize,
-    /// Maximum size of a single prefetch request
-    pub max_request_size: usize,
+    /// Maximum size of the read window
+    pub max_read_window_size: usize,
     /// Factor to increase the request size by whenever the reader continues making sequential reads
     pub sequential_prefetch_multiplier: usize,
     /// Timeout to wait for a part to become available
@@ -128,19 +160,10 @@ pub struct PrefetcherConfig {
 impl Default for PrefetcherConfig {
+    #[allow(clippy::identity_op)]
     fn default() -> Self {
-        #[allow(clippy::identity_op)]
         Self {
-            // This is a weird looking number! We really want our first request size to be 1MiB,
-            // which is a common IO size.
But Linux's readahead will try to read an extra 128k on on - // top of a 1MiB read, which we'd have to wait for a second request to service. Because - // FUSE doesn't know the difference between regular reads and readahead reads, it will - // send us a READ request for that 128k, so we'll have to block waiting for it even if - // the application doesn't want it. This is all in the noise for sequential IO, but - // waiting for the readahead hurts random IO. So we add 128k to the first request size - // to avoid the latency hit of the second request. - first_request_size: 1 * 1024 * 1024 + 128 * 1024, - max_request_size: 2 * 1024 * 1024 * 1024, + max_read_window_size: 2 * 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, read_timeout: Duration::from_secs(60), // We want these large enough to tolerate a single out-of-order Linux readahead, which @@ -148,7 +171,7 @@ impl Default for PrefetcherConfig { // making a guess about where the optimal cut-off point is before it would be faster to // just start a new request instead. max_forward_seek_wait_distance: 16 * 1024 * 1024, - max_backward_seek_distance: 1 * 1024 * 1024, + max_backward_seek_distance: 1024 * 1024, } } } @@ -207,11 +230,7 @@ pub struct PrefetchGetObject { client: Arc, part_stream: Arc, config: PrefetcherConfig, - // Invariant: the offset of the first byte in this task's part queue is always - // self.next_sequential_read_offset. - current_task: Option>, - // Currently we only every spawn at most one future task (see [spawn_next_request]) - future_tasks: VecDeque>, + backpressure_task: Option>, // Invariant: the offset of the last byte in this window is always // self.next_sequential_read_offset - 1. backward_seek_window: SeekWindow, @@ -222,7 +241,6 @@ pub struct PrefetchGetObject { /// Start offset for sequential read, used for calculating contiguous read metric sequential_read_start_offset: u64, next_sequential_read_offset: u64, - next_request_size: usize, next_request_offset: u64, size: u64, } @@ -274,13 +292,11 @@ where client, part_stream, config, - current_task: None, - future_tasks: Default::default(), + backpressure_task: None, backward_seek_window: SeekWindow::new(config.max_backward_seek_distance as usize), preferred_part_size: 128 * 1024, sequential_read_start_offset: 0, next_sequential_read_offset: 0, - next_request_size: config.first_request_size, next_request_offset: 0, bucket: bucket.to_owned(), object_id: ObjectId::new(key.to_owned(), etag), @@ -330,12 +346,13 @@ where } assert_eq!(self.next_sequential_read_offset, offset); - self.prepare_requests(); + if self.backpressure_task.is_none() { + self.backpressure_task = Some(self.spawn_read_backpressure_request()?); + } let mut response = ChecksummedBytes::default(); while to_read > 0 { - let Some(current_task) = self.current_task.as_mut() else { - // If [prepare_requests] didn't spawn a request, we've reached the end of the object. 
+ let Some(current_task) = self.backpressure_task.as_mut() else { trace!(offset, length, "read beyond object size"); break; }; @@ -343,13 +360,9 @@ where let part = current_task.read(to_read as usize).await?; self.backward_seek_window.push(part.clone()); - let part_bytes = part - .into_bytes(&self.object_id, self.next_sequential_read_offset) - .unwrap(); + let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?; self.next_sequential_read_offset += part_bytes.len() as u64; - self.prepare_requests(); - // If we can complete the read with just a single buffer, early return to avoid copying // into a new buffer. This should be the common case as long as part size is larger than // read size, which it almost always is for real S3 clients and FUSE. @@ -365,74 +378,45 @@ where Ok(response) } - /// Runs on every read to prepare and spawn any requests our prefetching logic requires - fn prepare_requests(&mut self) { - let current_task = self.current_task.as_ref(); - if current_task.map(|task| task.remaining() == 0).unwrap_or(true) { - // There's no current task, or the current task is finished. Prepare the next request. - if let Some(next_task) = self.future_tasks.pop_front() { - self.current_task = Some(next_task); - return; - } - self.current_task = self.spawn_next_request(); - } else if current_task - .map(|task| { - // Don't trigger prefetch if we're in a fake task created by backward streaming - task.is_streaming() && task.remaining() <= task.total_size() / 2 - }) - .unwrap_or(false) - && self.future_tasks.is_empty() - { - // The current task is nearing completion, so pre-spawn the next request in anticipation - // of it completing. - if let Some(task) = self.spawn_next_request() { - self.future_tasks.push_back(task); + /// Spawn a backpressure GetObject request which has a range from current offset to the end of the file. + /// We will be using flow-control window to control how much data we want to download into the prefetcher. 
+ fn spawn_read_backpressure_request( + &mut self, + ) -> Result, PrefetchReadError> { + let start = self.next_sequential_read_offset; + let object_size = self.size as usize; + let range = RequestRange::new(object_size, start, object_size); + + // The prefetcher now relies on backpressure mechanism so it must be enabled + let initial_read_window_size = match self.client.initial_read_window_size() { + Some(value) => { + // Also, make sure that we don't get blocked from the beginning + if value == 0 { + return Err(PrefetchReadError::BackpressurePreconditionFailed); + } + value } - } - } - - /// Spawn the next required request - fn spawn_next_request(&mut self) -> Option> { - let start = self.next_request_offset; - if start >= self.size { - return None; - } + None => return Err(PrefetchReadError::BackpressurePreconditionFailed), + }; - let range = RequestRange::new(self.size as usize, start, self.next_request_size); let config = RequestTaskConfig { bucket: self.bucket.clone(), object_id: self.object_id.clone(), range, preferred_part_size: self.preferred_part_size, + initial_read_window_size, + max_read_window_size: self.config.max_read_window_size, + read_window_size_multiplier: self.config.sequential_prefetch_multiplier, }; - let task = self.part_stream.spawn_get_object_request(&self.client, config); - - // [read] will reset these if the reader stops making sequential requests - self.next_request_offset += task.total_size() as u64; - self.next_request_size = self.get_next_request_size(task.total_size()); - - Some(task) - } - - /// Suggest next request size. - /// The next request size is the current request size multiplied by sequential prefetch multiplier. - fn get_next_request_size(&self, request_size: usize) -> usize { - // TODO: this logic doesn't work well right now in the case where part_size < - // first_request_size and sequential_prefetch_multiplier = 1. It ends up just repeatedly - // shrinking the request size until it reaches 1. But this isn't a configuration we - // currently expect to ever run in (part_size will always be >= 5MB for MPU reasons, and a - // prefetcher with multiplier 1 is not very good). - (request_size * self.config.sequential_prefetch_multiplier).min(self.config.max_request_size) + Ok(self.part_stream.spawn_get_object_request(&self.client, config)) } /// Reset this prefetch request to a new offset, clearing any existing tasks queued. 
fn reset_prefetch_to_offset(&mut self, offset: u64) { - self.current_task = None; - self.future_tasks.drain(..); + self.backpressure_task = None; self.backward_seek_window.clear(); self.sequential_read_start_offset = offset; self.next_sequential_read_offset = offset; - self.next_request_size = self.config.first_request_size; self.next_request_offset = offset; } @@ -445,7 +429,7 @@ where if offset > self.next_sequential_read_offset { self.try_seek_forward(offset).await } else { - self.try_seek_backward(offset) + self.try_seek_backward(offset).await } } @@ -454,43 +438,22 @@ where let total_seek_distance = offset - self.next_sequential_read_offset; histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64); - let Some(current_task) = self.current_task.as_mut() else { + let Some(task) = self.backpressure_task.as_mut() else { // Can't seek if there's no requests in flight at all return Ok(false); }; - // Jump ahead to the right request - if offset >= current_task.end_offset() { - self.next_sequential_read_offset = current_task.end_offset(); - self.current_task = None; - while let Some(next_request) = self.future_tasks.pop_front() { - if next_request.end_offset() > offset { - self.current_task = Some(next_request); - break; - } else { - self.next_sequential_read_offset = next_request.end_offset(); - } - } - if self.current_task.is_none() { - // No inflight task containing the target offset. - trace!(current_offset=?self.next_sequential_read_offset, requested_offset=?offset, "seek failed: not enough inflight data"); - return Ok(false); - } - // We could try harder to preserve the backwards seek buffer if we're near the - // request boundary, but it's probably not worth the trouble. - self.backward_seek_window.clear(); + // Not enough data in the read window to serve the forward seek + if offset >= task.read_window_end_offset() { + return Ok(false); } - // At this point it's guaranteed by the previous if-block that `offset` is in the range of `self.current_task` - let current_task = self - .current_task - .as_mut() - .expect("a request existed that covered this seek offset"); // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset. 
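In other words, the wait-or-reset decision above is a single comparison against the already-downloaded offset. A minimal standalone sketch with hypothetical names (illustrative only, not part of this patch):

    /// Hypothetical helper mirroring the rule above: a forward seek is served in place
    /// only if the target offset is within `max_wait` bytes of what has been downloaded.
    fn worth_waiting_for(offset: u64, available_offset: u64, max_wait: u64) -> bool {
        // saturating_add mirrors the real code and avoids overflow near u64::MAX
        offset < available_offset.saturating_add(max_wait)
    }

    fn main() {
        // 4 MiB already available, willing to wait for up to 16 MiB more:
        let (available, max_wait) = (4 * 1024 * 1024, 16 * 1024 * 1024);
        assert!(worth_waiting_for(6 * 1024 * 1024, available, max_wait)); // wait for in-flight data
        assert!(!worth_waiting_for(30 * 1024 * 1024, available, max_wait)); // too far: reset instead
    }
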
- let available_offset = current_task.available_offset(); - if offset >= available_offset.saturating_add(self.config.max_forward_seek_wait_distance) { + let available_offset = task.available_offset(); + let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance); + if offset >= available_soon_offset { trace!( requested_offset = offset, available_offset = available_offset, @@ -500,7 +463,7 @@ where } let mut seek_distance = offset - self.next_sequential_read_offset; while seek_distance > 0 { - let part = current_task.read(seek_distance as usize).await?; + let part = task.read(seek_distance as usize).await?; seek_distance -= part.len() as u64; self.next_sequential_read_offset += part.len() as u64; self.backward_seek_window.push(part); @@ -508,8 +471,14 @@ where Ok(true) } - fn try_seek_backward(&mut self, offset: u64) -> Result> { + async fn try_seek_backward(&mut self, offset: u64) -> Result> { assert!(offset < self.next_sequential_read_offset); + + // When the task is None it means either we have just started prefetching or recently reset it, + // in both cases the backward seek window would be empty so we can bail out early. + let Some(task) = self.backpressure_task.as_mut() else { + return Ok(false); + }; let backwards_length_needed = self.next_sequential_read_offset - offset; histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64); @@ -517,14 +486,7 @@ where trace!("seek failed: not enough data in backwards seek window"); return Ok(false); }; - // We're going to create a new fake "request" that contains the parts we read out of the - // window. That sounds a bit hacky, but it keeps all the read logic simple rather than - // needing separate paths for backwards seeks vs others. 
- let request = RequestTask::from_parts(parts, offset); - if let Some(current_task) = self.current_task.take() { - self.future_tasks.push_front(current_task); - } - self.current_task = Some(request); + task.push_front(parts).await?; self.next_sequential_read_offset = offset; Ok(true) } @@ -552,12 +514,11 @@ mod tests { #![allow(clippy::identity_op)] use crate::data_cache::InMemoryDataCache; - use crate::prefetch::part_stream::ClientPartStream; use super::caching_stream::CachingPartStream; use super::*; use futures::executor::{block_on, ThreadPool}; - use mountpoint_s3_client::error::{GetObjectError, ObjectClientError}; + use mountpoint_s3_client::error::GetObjectError; use mountpoint_s3_client::failure_client::{countdown_failure_client, RequestFailureMap}; use mountpoint_s3_client::mock_client::{ramp_bytes, MockClient, MockClientConfig, MockClientError, MockObject}; use proptest::proptest; @@ -571,9 +532,9 @@ mod tests { #[derive(Debug, Arbitrary)] struct TestConfig { #[proptest(strategy = "16usize..1*1024*1024")] - first_request_size: usize, + initial_read_window_size: usize, #[proptest(strategy = "16usize..1*1024*1024")] - max_request_size: usize, + max_read_window_size: usize, #[proptest(strategy = "1usize..8usize")] sequential_prefetch_multiplier: usize, #[proptest(strategy = "16usize..2*1024*1024")] @@ -582,6 +543,8 @@ mod tests { max_forward_seek_wait_distance: u64, #[proptest(strategy = "1u64..4*1024*1024")] max_backward_seek_distance: u64, + #[proptest(strategy = "16usize..1*1024*1024")] + cache_block_size: usize, } fn default_stream() -> ClientPartStream { @@ -604,6 +567,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -613,8 +578,7 @@ mod tests { client.add_object("hello", object); let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, + max_read_window_size: test_config.max_read_window_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, read_timeout: Duration::from_secs(5), max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance, @@ -645,12 +609,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config); } @@ -662,12 +627,13 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 16 * 1024 * 1024 + 111, 1024 * 1024, config); } @@ -679,17 +645,98 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config 
= TestConfig { - first_request_size: 256 * 1024, - max_request_size: 64 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 64 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_sequential_read_test(part_stream, 256 * 1024 * 1024 + 111, 1024 * 1024, config); } + fn fail_with_backpressure_precondition_test( + part_stream: Stream, + test_config: TestConfig, + client_config: MockClientConfig, + ) where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let client = MockClient::new(client_config); + let read_size = 1 * MB; + let object_size = 8 * MB; + let object = MockObject::ramp(0xaa, object_size, ETag::for_tests()); + let etag = object.etag(); + + let prefetcher_config = PrefetcherConfig { + max_read_window_size: test_config.max_read_window_size, + sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, + ..Default::default() + }; + + let prefetcher = Prefetcher::new(part_stream, prefetcher_config); + let mut request = prefetcher.prefetch(Arc::new(client), "test-bucket", "hello", object_size as u64, etag); + let result = block_on(request.read(0, read_size)); + assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed))); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_not_enabled(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is not enabled for the client + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: false, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + + #[test_case(default_stream())] + #[test_case(caching_stream(1 * MB))] + fn fail_with_backpressure_zero_read_window(part_stream: Stream) + where + Stream: ObjectPartStream + Send + Sync + 'static, + { + let test_config = TestConfig { + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, + sequential_prefetch_multiplier: 8, + client_part_size: 8 * 1024 * 1024, + max_forward_seek_wait_distance: 16 * 1024 * 1024, + max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, + }; + + // backpressure is enabled but initial read window size is zero + let config = MockClientConfig { + bucket: "test-bucket".to_string(), + part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: 0, + ..Default::default() + }; + + fail_with_backpressure_precondition_test(part_stream, test_config, config); + } + fn fail_sequential_read_test( part_stream: Stream, size: u64, @@ -700,6 +747,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = MockClient::new(config); @@ -711,8 +760,7 @@ mod tests { let client = countdown_failure_client(client, get_failures, 
HashMap::new(), HashMap::new(), HashMap::new()); let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, + max_read_window_size: test_config.max_read_window_size, sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier, ..Default::default() }; @@ -748,23 +796,25 @@ mod tests { Stream: ObjectPartStream + Send + Sync + 'static, { let config = TestConfig { - first_request_size: 256 * 1024, - max_request_size: 1024 * 1024 * 1024, + initial_read_window_size: 256 * 1024, + max_read_window_size: 1024 * 1024 * 1024, sequential_prefetch_multiplier: 8, client_part_size: 8 * 1024 * 1024, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; let mut get_failures = HashMap::new(); - get_failures.insert( - 2, - Err(ObjectClientError::ClientError(MockClientError( - err_value.to_owned().into(), - ))), - ); + // We only have one request with backpressure, so we are going to inject the failure at + // 2nd read from that request stream. + get_failures.insert(1, Ok((2, MockClientError(err_value.to_owned().into())))); + + // Object needs to be bigger than a part size in order to trigger the failure + // because the CRT data returns in chunks of part size. + let object_size = config.client_part_size + 111; - fail_sequential_read_test(part_stream, 1024 * 1024 + 111, 1024 * 1024, config, get_failures); + fail_sequential_read_test(part_stream, object_size as u64, 1024 * 1024, config, get_failures); } proptest! { @@ -788,18 +838,17 @@ mod tests { fn proptest_sequential_read_with_cache( size in 1u64..1 * 1024 * 1024, read_size in 1usize..1 * 1024 * 1024, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } #[test] fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, - block_size in 16usize..1 * 1024 * 1024, config: TestConfig) { + config: TestConfig) { // Pick read size smaller than the object size let read_size = (size as usize / read_factor).max(1); - run_sequential_read_test(caching_stream(block_size), size, read_size, config); + run_sequential_read_test(caching_stream(config.cache_block_size), size, read_size, config); } } @@ -808,12 +857,13 @@ mod tests { let object_size = 854966; let read_size = 161647; let config = TestConfig { - first_request_size: 484941, - max_request_size: 81509, + initial_read_window_size: 484941, + max_read_window_size: 81509, sequential_prefetch_multiplier: 1, client_part_size: 181682, max_forward_seek_wait_distance: 1, max_backward_seek_distance: 18668, + cache_block_size: 1 * MB, }; run_sequential_read_test(default_stream(), object_size, read_size, config); } @@ -827,6 +877,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: test_config.client_part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -836,8 +888,7 @@ mod tests { client.add_object("hello", object); let prefetcher_config = PrefetcherConfig { - first_request_size: test_config.first_request_size, - max_request_size: test_config.max_request_size, + max_read_window_size: test_config.max_read_window_size, sequential_prefetch_multiplier: 
test_config.sequential_prefetch_multiplier, max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance, max_backward_seek_distance: test_config.max_backward_seek_distance, @@ -895,11 +946,10 @@ mod tests { #[test] fn proptest_random_read_with_cache( reads in random_read_strategy(1 * 1024 * 1024), - block_size in 16usize..1 * 1024 * 1024, config: TestConfig, ) { let (object_size, reads) = reads; - run_random_read_test(caching_stream(block_size), object_size, reads, config); + run_random_read_test(caching_stream(config.cache_block_size), object_size, reads, config); } } @@ -908,12 +958,13 @@ mod tests { let object_size = 724314; let reads = vec![(0, 516883)]; let config = TestConfig { - first_request_size: 3684779, - max_request_size: 2147621, + initial_read_window_size: 3684779, + max_read_window_size: 2147621, sequential_prefetch_multiplier: 4, client_part_size: 516882, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -923,12 +974,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 278499), (311250, 1)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 16 * 1024 * 1024, max_backward_seek_distance: 2 * 1024 * 1024, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -938,12 +990,13 @@ mod tests { let object_size = 755678; let reads = vec![(0, 236766), (291204, 1), (280930, 36002)]; let config = TestConfig { - first_request_size: 556997, - max_request_size: 105938, + initial_read_window_size: 556997, + max_read_window_size: 105938, sequential_prefetch_multiplier: 7, client_part_size: 1219731, max_forward_seek_wait_distance: 2260662, max_backward_seek_distance: 2369799, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -953,12 +1006,13 @@ mod tests { let object_size = 14201; let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)]; let config = TestConfig { - first_request_size: 457999, - max_request_size: 863511, + initial_read_window_size: 457999, + max_read_window_size: 863511, sequential_prefetch_multiplier: 5, client_part_size: 1972409, max_forward_seek_wait_distance: 2810651, max_backward_seek_distance: 3531090, + cache_block_size: 1 * MB, }; run_random_read_test(default_stream(), object_size, reads, config); } @@ -971,6 +1025,9 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size: PART_SIZE, + enable_backpressure: true, + // For simplicity, prefetch the whole object in one request. + initial_read_window_size: OBJECT_SIZE, ..Default::default() }; @@ -1002,12 +1059,7 @@ mod tests { HashMap::new(), )); - // For simplicity, prefetch the whole object in one request. 
- let prefetcher_config = PrefetcherConfig { - first_request_size: OBJECT_SIZE, - ..Default::default() - }; - let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); + let prefetcher = Prefetcher::new(default_stream(), Default::default()); block_on(async { let mut request = prefetcher.prefetch(client, "test-bucket", "hello", OBJECT_SIZE as u64, etag.clone()); @@ -1045,6 +1097,8 @@ mod tests { let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: FIRST_REQUEST_SIZE, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1053,12 +1107,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; - - let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); + let prefetcher = Prefetcher::new(default_stream(), Default::default()); // Try every possible seek from first_read_size for offset in first_read_size + 1..OBJECT_SIZE { @@ -1079,11 +1128,12 @@ mod tests { #[test_case(125, 110; "read in second request")] fn test_backward_seek(first_read_size: usize, part_size: usize) { const OBJECT_SIZE: usize = 200; - const FIRST_REQUEST_SIZE: usize = 100; let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size: part_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1092,11 +1142,7 @@ mod tests { client.add_object("hello", object); - let prefetcher_config = PrefetcherConfig { - first_request_size: FIRST_REQUEST_SIZE, - ..Default::default() - }; - let prefetcher = Prefetcher::new(default_stream(), prefetcher_config); + let prefetcher = Prefetcher::new(default_stream(), Default::default()); // Try every possible seek from first_read_size for offset in 0..first_read_size { @@ -1131,16 +1177,18 @@ mod tests { fn sequential_read_stress_helper() { let mut rng = shuttle::rand::thread_rng(); let object_size = rng.gen_range(1u64..1 * 1024 * 1024); - let first_request_size = rng.gen_range(16usize..1 * 1024 * 1024); - let max_request_size = rng.gen_range(16usize..1 * 1024 * 1024); + let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024); let sequential_prefetch_multiplier = rng.gen_range(2usize..16); let part_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); + let initial_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024 + 128 * 1024); let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024); let config = MockClientConfig { bucket: "test-bucket".to_string(), part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(config)); @@ -1150,8 +1198,7 @@ mod tests { client.add_object("hello", object); let prefetcher_config = PrefetcherConfig { - first_request_size, - max_request_size, + max_read_window_size, sequential_prefetch_multiplier, max_forward_seek_wait_distance, max_backward_seek_distance, @@ -1184,20 +1231,22 @@ mod tests { fn random_read_stress_helper() { let mut rng = shuttle::rand::thread_rng(); - let first_request_size = rng.gen_range(16usize..32 * 1024); - let max_request_size = rng.gen_range(16usize..32 * 1024); - // Try to prevent testing very small reads of very large objects, which are easy to OOM - // under Shuttle (lots of concurrent tasks) - let 
max_object_size = first_request_size.min(max_request_size) * 20;
-        let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
+        let max_read_window_size = rng.gen_range(16usize..32 * 1024);
         let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
         let part_size = rng.gen_range(16usize..128 * 1024);
+        let initial_read_window_size = rng.gen_range(16usize..128 * 1024);
         let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
         let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
+        // Try to prevent testing very small reads of very large objects, which are easy to OOM
+        // under Shuttle (lots of concurrent tasks)
+        let max_object_size = initial_read_window_size.min(max_read_window_size) * 20;
+        let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
 
         let config = MockClientConfig {
             bucket: "test-bucket".to_string(),
             part_size,
+            enable_backpressure: true,
+            initial_read_window_size,
             ..Default::default()
         };
         let client = Arc::new(MockClient::new(config));
 
         client.add_object("hello", object);
 
         let prefetcher_config = PrefetcherConfig {
-            first_request_size,
-            max_request_size,
+            max_read_window_size,
             sequential_prefetch_multiplier,
             max_forward_seek_wait_distance,
             max_backward_seek_distance,
diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs
new file mode 100644
index 000000000..542976ab5
--- /dev/null
+++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -0,0 +1,188 @@
+use std::ops::Range;
+
+use async_channel::{unbounded, Receiver, Sender};
+use tracing::trace;
+
+use super::PrefetchReadError;
+
+#[derive(Debug)]
+pub enum BackpressureFeedbackEvent {
+    /// An event where data with a certain length has been read out of the stream
+    DataRead(usize),
+    /// An event indicating a part queue stall
+    PartQueueStall,
+}
+
+pub struct BackpressureConfig {
+    /// Backpressure's initial read window size
+    pub initial_read_window_size: usize,
+    /// Maximum read window size that the backpressure controller is allowed to scale up to
+    pub max_read_window_size: usize,
+    /// Factor to increase the read window size by when the part queue is stalled
+    pub read_window_size_multiplier: usize,
+    /// Request range to apply backpressure to
+    pub request_range: Range<u64>,
+}
+
+#[derive(Debug)]
+pub struct BackpressureController {
+    read_window_updater: Sender<usize>,
+    preferred_read_window_size: usize,
+    max_read_window_size: usize,
+    read_window_size_multiplier: usize,
+    /// Upper bound of the current read window. The request can return data up to this
+    /// offset *exclusively*. This value must be advanced to continue fetching new data.
+    read_window_end_offset: u64,
+    /// Next offset of the data to be read. It is used for tracking how many bytes of
+    /// data have been read out of the stream.
+    next_read_offset: u64,
+    /// End offset for the request we want to apply backpressure to. The request can return
+    /// data up to this offset *exclusively*.
+    request_end_offset: u64,
+}
+
+#[derive(Debug)]
+pub struct BackpressureLimiter {
+    read_window_incrementing_queue: Receiver<usize>,
+    /// Upper bound of the current read window.
+    /// Calling [BackpressureLimiter::wait_for_read_window_increment()] will block the current
+    /// thread until this value is advanced.
+    read_window_end_offset: u64,
+}
+
+/// Creates a [BackpressureController] and its related [BackpressureLimiter].
+/// We use a pair of these for providing feedback to the backpressure stream.
+///
+/// [BackpressureLimiter] is used on the producer side of the object stream, that is, any
+/// [super::part_stream::ObjectPartStream] that supports backpressure. The producer calls
+/// `wait_for_read_window_increment` to wait for feedback from the consumer; this method
+/// blocks when the producer's read window must be incremented before it can fetch more data.
+///
+/// [BackpressureController] will be given to the consumer side of the object stream.
+/// It can be used anywhere to set the preferred read window size for the stream and to tell
+/// the producer when its read window should be increased.
+pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureController, BackpressureLimiter) {
+    let read_window_end_offset = config.request_range.start + config.initial_read_window_size as u64;
+    let (read_window_updater, read_window_incrementing_queue) = unbounded();
+    let controller = BackpressureController {
+        read_window_updater,
+        preferred_read_window_size: config.initial_read_window_size,
+        max_read_window_size: config.max_read_window_size,
+        read_window_size_multiplier: config.read_window_size_multiplier,
+        read_window_end_offset,
+        next_read_offset: config.request_range.start,
+        request_end_offset: config.request_range.end,
+    };
+    let limiter = BackpressureLimiter {
+        read_window_incrementing_queue,
+        read_window_end_offset,
+    };
+    (controller, limiter)
+}
+
+impl BackpressureController {
+    pub fn read_window_end_offset(&self) -> u64 {
+        self.read_window_end_offset
+    }
+
+    /// Send feedback to the backpressure controller when reading data out of the stream. The backpressure
+    /// controller will ensure that the read window size is enough to read this offset and that it is always
+    /// close to `preferred_read_window_size`.
+    pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> {
+        match event {
+            BackpressureFeedbackEvent::DataRead(length) => {
+                self.next_read_offset += length as u64;
+                // Increment the read window only if the remaining window reaches some threshold, i.e. half of it left.
+                if self.remaining_window() < (self.preferred_read_window_size / 2)
+                    && self.read_window_end_offset < self.request_end_offset
+                {
+                    let new_read_window_end_offset = self
+                        .next_read_offset
+                        .saturating_add(self.preferred_read_window_size as u64)
+                        .min(self.request_end_offset);
+                    debug_assert!(self.read_window_end_offset < new_read_window_end_offset);
+                    let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
+                    trace!(
+                        preferred_read_window_size = self.preferred_read_window_size,
+                        next_read_offset = self.next_read_offset,
+                        read_window_end_offset = self.read_window_end_offset,
+                        to_increase,
+                        "incrementing read window"
+                    );
+                    self.increment_read_window(to_increase).await?;
+                    self.read_window_end_offset = new_read_window_end_offset;
+                }
+            }
+            BackpressureFeedbackEvent::PartQueueStall => self.try_scaling_up(),
+        }
+        Ok(())
+    }
+
+    // Send an increment read window request to the stream producer
+    async fn increment_read_window<E>(&self, len: usize) -> Result<(), PrefetchReadError<E>> {
+        // This should not block since the channel is unbounded
+        self.read_window_updater
+            .send(len)
+            .await
+            .map_err(|_| PrefetchReadError::ReadWindowIncrement)
+    }
+
+    fn remaining_window(&self) -> usize {
+        self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize
+    }
+
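Taken together, `send_feedback` and the limiter's `wait_for_read_window_increment` form a small credit protocol over the unbounded channel: the consumer reports how much it has read and grants more window; the producer blocks until the window covers the offset it wants to fetch. A condensed, standalone sketch of that interplay (simplified names and types; it omits the request-end clamping and the window scaling of the real implementation):

    use async_channel::{unbounded, Receiver, Sender};

    /// Consumer side: tracks reads and grants more window over the channel.
    struct Controller {
        grants: Sender<usize>,
        window_end: u64, // exclusive end of the granted read window
        next_read: u64,  // next offset the consumer will read
        preferred: u64,  // preferred read window size
    }

    impl Controller {
        async fn data_read(&mut self, len: u64) {
            self.next_read += len;
            // Re-extend the window once less than half of it remains.
            if self.window_end.saturating_sub(self.next_read) < self.preferred / 2 {
                let new_end = self.next_read + self.preferred;
                let _ = self.grants.send((new_end - self.window_end) as usize).await;
                self.window_end = new_end;
            }
        }
    }

    /// Producer side: blocks until the window covers the next offset to fetch.
    struct Limiter {
        grants: Receiver<usize>,
        window_end: u64,
    }

    impl Limiter {
        async fn wait_for(&mut self, offset: u64) -> Result<(), &'static str> {
            while self.window_end <= offset {
                let len = self.grants.recv().await.map_err(|_| "controller dropped")?;
                self.window_end += len as u64;
            }
            Ok(())
        }
    }

+    // Try scaling up preferred read window size with a multiplier configured at initialization.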
+ fn try_scaling_up(&mut self) { + if self.preferred_read_window_size < self.max_read_window_size { + let new_read_window_size = + (self.preferred_read_window_size * self.read_window_size_multiplier).min(self.max_read_window_size); + trace!( + current_size = self.preferred_read_window_size, + new_size = new_read_window_size, + "scaling up preferred read window" + ); + self.preferred_read_window_size = new_read_window_size; + } + } +} + +impl BackpressureLimiter { + pub fn read_window_end_offset(&self) -> u64 { + self.read_window_end_offset + } + + /// Checks if there is enough read window to put the next item with a given offset to the stream. + /// It blocks until receiving enough incrementing read window requests to serve the next part. + /// + /// Returns the new read window offset. + pub async fn wait_for_read_window_increment( + &mut self, + offset: u64, + ) -> Result, PrefetchReadError> { + // There is already enough read window so no need to block + if self.read_window_end_offset > offset { + // Check the read window incrementing queue to see there is an early request to increase read window + let new_read_window_offset = if let Ok(len) = self.read_window_incrementing_queue.try_recv() { + self.read_window_end_offset += len as u64; + Some(self.read_window_end_offset) + } else { + None + }; + return Ok(new_read_window_offset); + } + + // Reaching here means there is not enough read window, so we block until it is large enough + while self.read_window_end_offset <= offset { + trace!( + offset, + read_window_offset = self.read_window_end_offset, + "blocking for read window increment" + ); + let recv = self.read_window_incrementing_queue.recv().await; + match recv { + Ok(len) => self.read_window_end_offset += len as u64, + Err(_) => return Err(PrefetchReadError::ReadWindowIncrement), + } + } + Ok(Some(self.read_window_end_offset)) + } +} diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs index 75a1dff51..630444338 100644 --- a/mountpoint-s3/src/prefetch/caching_stream.rs +++ b/mountpoint-s3/src/prefetch/caching_stream.rs @@ -10,6 +10,7 @@ use tracing::{debug_span, trace, warn, Instrument}; use crate::checksums::ChecksummedBytes; use crate::data_cache::{BlockIndex, DataCache}; use crate::object::ObjectId; +use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig, BackpressureLimiter}; use crate::prefetch::part::Part; use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer}; use crate::prefetch::part_stream::{ @@ -48,30 +49,35 @@ where where Client: ObjectClient + Clone + Send + Sync + 'static, { - let range = config.range.align(self.cache.block_size(), false); - - let start = range.start(); - let size = range.len(); + let range = config.range; + let backpressure_config = BackpressureConfig { + initial_read_window_size: config.initial_read_window_size, + max_read_window_size: config.max_read_window_size, + read_window_size_multiplier: config.read_window_size_multiplier, + request_range: range.into(), + }; + let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config); let (part_queue, part_queue_producer) = unbounded_part_queue(); trace!(?range, "spawning request"); let request_task = { - let request = CachingRequest::new(client.clone(), self.cache.clone(), config); + let request = CachingRequest::new(client.clone(), self.cache.clone(), backpressure_limiter, config); let span = debug_span!("prefetch", ?range); request.get_from_cache(range, 
part_queue_producer).instrument(span) }; let task_handle = self.runtime.spawn_with_handle(request_task).unwrap(); - RequestTask::from_handle(task_handle, size, start, part_queue) + RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller) } } #[derive(Debug)] -struct CachingRequest { +struct CachingRequest { client: Client, cache: Arc, + backpressure_limiter: BackpressureLimiter, config: RequestTaskConfig, } @@ -80,11 +86,25 @@ where Client: ObjectClient + Clone + Send + Sync + 'static, Cache: DataCache + Send + Sync, { - fn new(client: Client, cache: Arc, config: RequestTaskConfig) -> Self { - Self { client, cache, config } + fn new( + client: Client, + cache: Arc, + backpressure_limiter: BackpressureLimiter, + config: RequestTaskConfig, + ) -> Self { + Self { + client, + cache, + backpressure_limiter, + config, + } } - async fn get_from_cache(self, range: RequestRange, part_queue_producer: PartQueueProducer) { + async fn get_from_cache( + mut self, + range: RequestRange, + part_queue_producer: PartQueueProducer, + ) { let cache_key = &self.config.object_id; let block_size = self.cache.block_size(); let block_range = self.block_indices_for_byte_range(&range); @@ -102,6 +122,15 @@ where let part = make_part(block, block_index, block_offset, block_size, cache_key, &range); part_queue_producer.push(Ok(part)); block_offset += block_size; + + if let Err(e) = self + .backpressure_limiter + .wait_for_read_window_increment(block_offset) + .await + { + part_queue_producer.push(Err(e)); + break; + } continue; } Ok(None) => trace!(?cache_key, block_index, ?range, "cache miss - no data for block"), @@ -116,32 +145,36 @@ where // If a block is uncached or reading it fails, fallback to S3 for the rest of the stream. metrics::counter!("prefetch.blocks_served_from_cache").increment(block_index - block_range.start); metrics::counter!("prefetch.blocks_requested_to_client").increment(block_range.end - block_index); - return self - .get_from_client( - range.trim_start(block_offset), - block_index..block_range.end, - part_queue_producer, - ) - .await; + self.get_from_client( + range.trim_start(block_offset), + block_index..block_range.end, + part_queue_producer, + ) + .await; + return; } // We served the whole range from cache. metrics::counter!("prefetch.blocks_served_from_cache").increment(block_range.end - block_range.start); } async fn get_from_client( - &self, + &mut self, range: RequestRange, block_range: Range, part_queue_producer: PartQueueProducer, ) { let bucket = &self.config.bucket; let cache_key = &self.config.object_id; + let first_read_window_end_offset = self.config.range.start() + self.config.initial_read_window_size as u64; let block_size = self.cache.block_size(); assert!(block_size > 0); // Always request a range aligned with block boundaries (or to the end of the object). 
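A quick worked example of this alignment, with hypothetical numbers (not part of the patch):

    fn main() {
        // With block_size = 1 MiB, blocks 2..5, and a 4.5 MiB object, the aligned byte
        // range is 2 MiB..min(5 MiB, 4.5 MiB): the end is clamped to the object size.
        let block_size: u64 = 1024 * 1024;
        let object_size: u64 = 4 * 1024 * 1024 + 512 * 1024;
        let (start_block, end_block) = (2u64, 5u64);
        let aligned = (start_block * block_size)..(end_block * block_size).min(object_size);
        assert_eq!(aligned.end - aligned.start, 2 * 1024 * 1024 + 512 * 1024); // 2.5 MiB
    }
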
         let block_aligned_byte_range = (block_range.start * block_size)..(block_range.end * block_size).min(range.object_size() as u64);
+        let request_len = (block_aligned_byte_range.end - block_aligned_byte_range.start) as usize;
+        let block_aligned_byte_range =
+            RequestRange::new(range.object_size(), block_aligned_byte_range.start, request_len);
 
         trace!(
             key = cache_key.key(),
@@ -160,14 +193,36 @@ where
             cache: self.cache.clone(),
         };
 
-        let request_stream = read_from_request(
-            self.client.clone(),
-            bucket.clone(),
-            cache_key.clone(),
-            block_aligned_byte_range,
-        );
-        let part_composer_future = part_composer.try_compose_parts(request_stream);
-        part_composer_future.await;
+        // Start by issuing the first request, whose range goes up to the initial read window offset.
+        // This is an optimization to lower time to first byte; see [ClientPartStream] for more details
+        // on why this is needed.
+        let first_req_range = block_aligned_byte_range.trim_end(first_read_window_end_offset);
+        if !first_req_range.is_empty() {
+            let request_stream = read_from_request(
+                &mut self.backpressure_limiter,
+                self.client.clone(),
+                bucket.clone(),
+                cache_key.clone(),
+                first_req_range.into(),
+            );
+            let part_composer_future = part_composer.try_compose_parts(request_stream);
+            part_composer_future.await;
+        }
+
+        // After the first request is completed, we create a second request for the rest of the stream,
+        // but only if there is something left to be fetched.
+        let range = block_aligned_byte_range.trim_start(first_read_window_end_offset);
+        if !range.is_empty() {
+            let request_stream = read_from_request(
+                &mut self.backpressure_limiter,
+                self.client.clone(),
+                bucket.clone(),
+                cache_key.clone(),
+                range.into(),
+            );
+            let part_composer_future = part_composer.try_compose_parts(request_stream);
+            part_composer_future.await;
+        }
+        part_composer.flush();
     }
 
     fn block_indices_for_byte_range(&self, range: &RequestRange) -> Range<BlockIndex> {
@@ -230,11 +285,39 @@ where
         // Split the body into blocks.
         let mut body: Bytes = body.into();
+        let mut offset = offset;
         while !body.is_empty() {
             let remaining = (block_size as usize).saturating_sub(self.buffer.len()).min(body.len());
-            let chunk = body.split_to(remaining);
+            let chunk: ChecksummedBytes = body.split_to(remaining).into();
+
+            // We need to return some bytes to the part queue even before we can fill an entire caching
+            // block, because we want to start the feedback loop for the flow-control window.
+            //
+            // This is because the read window might be enough to fetch "some data" from S3 but not the
+            // entire block. For example, consider that we get a file system read request with range 2MB
+            // to 4MB and we have to start reading from block_offset=0 with block_size=5MB. The first read
+            // window might have a range up to 4MB, which is enough to serve the read request, but if the
+            // prefetcher is not able to read anything it cannot tell the stream to move its read window.
+            //
+            // A side effect of this is a delay in updating the cache, which is actually good for
+            // performance, but it makes testing a bit more complicated because the cache might not be
+            // updated immediately.
+ let part_range = self + .original_range + .trim_start(offset) + .trim_end(offset + chunk.len() as u64); + if !part_range.is_empty() { + let trim_start = (part_range.start().saturating_sub(offset)) as usize; + let trim_end = (part_range.end().saturating_sub(offset)) as usize; + let part = Part::new( + self.cache_key.clone(), + part_range.start(), + chunk.slice(trim_start..trim_end), + ); + self.part_queue_producer.push(Ok(part)); + } + offset += chunk.len() as u64; self.buffer - .extend(chunk.into()) + .extend(chunk) .inspect_err(|e| warn!(key, error=?e, "integrity check for body part failed"))?; if self.buffer.len() < block_size as usize { break; @@ -248,23 +331,24 @@ where self.block_offset, &self.cache_key, ); - self.part_queue_producer.push(Ok(make_part( - self.buffer.clone(), - self.block_index, - self.block_offset, - block_size, - &self.cache_key, - &self.original_range, - ))); self.block_index += 1; self.block_offset += block_size; self.buffer = ChecksummedBytes::default(); } } + Ok(()) + } + /// Flush remaining data in the buffer to the cache. This can be called to write the last + /// block for the object. + fn flush(self) { + let block_size = self.cache.block_size(); if !self.buffer.is_empty() { - // If we still have data in the buffer, this must be the last block for this object, - // which can be smaller than block_size (and ends at the end of the object). + assert!( + self.buffer.len() < block_size as usize, + "buffer should be flushed when we get a full block" + ); + // The last block for the object can be smaller than block_size (and ends at the end of the object). assert_eq!( self.block_offset as usize + self.buffer.len(), self.original_range.object_size(), @@ -279,16 +363,14 @@ where &self.cache_key, ); self.part_queue_producer.push(Ok(make_part( - self.buffer.clone(), + self.buffer, self.block_index, self.block_offset, - block_size, + self.cache.block_size(), &self.cache_key, &self.original_range, ))); } - - Ok(()) } } @@ -353,7 +435,7 @@ mod tests { }; use test_case::test_case; - use crate::data_cache::InMemoryDataCache; + use crate::{data_cache::InMemoryDataCache, object::ObjectId}; use super::*; @@ -379,11 +461,18 @@ mod tests { let object = MockObject::ramp(seed, object_size, ETag::for_tests()); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -400,7 +489,10 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, - preferred_part_size: 0, + preferred_part_size: 256 * KB, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, }; let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); @@ -415,13 +507,17 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range, - preferred_part_size: 0, + preferred_part_size: 256 * KB, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, }; let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); get_object_counter.count() }; - assert_eq!(second_read_count, 0); + // Just check that 
some blocks are served from cache + assert!(second_read_count < first_read_count); } #[test_case(1 * MB, 8 * MB)] @@ -435,11 +531,18 @@ mod tests { let object = MockObject::ramp(seed, object_size, ETag::for_tests()); let id = ObjectId::new(key.to_owned(), object.etag()); + // backpressure config + let initial_read_window_size = 1 * MB; + let max_read_window_size = 64 * MB; + let read_window_size_multiplier = 2; + let cache = InMemoryDataCache::new(block_size as u64); let bucket = "test-bucket"; let config = MockClientConfig { bucket: bucket.to_string(), part_size: client_part_size, + enable_backpressure: true, + initial_read_window_size, ..Default::default() }; let mock_client = Arc::new(MockClient::new(config)); @@ -454,7 +557,10 @@ mod tests { bucket: bucket.to_owned(), object_id: id.clone(), range: RequestRange::new(object_size, offset as u64, preferred_size), - preferred_part_size: 0, + preferred_part_size: 256 * KB, + initial_read_window_size, + max_read_window_size, + read_window_size_multiplier, }; let request_task = stream.spawn_get_object_request(&mock_client, config); compare_read(&id, &object, request_task); diff --git a/mountpoint-s3/src/prefetch/part.rs b/mountpoint-s3/src/prefetch/part.rs index 8542ed173..2e9106dce 100644 --- a/mountpoint-s3/src/prefetch/part.rs +++ b/mountpoint-s3/src/prefetch/part.rs @@ -1,6 +1,6 @@ use thiserror::Error; -use crate::checksums::ChecksummedBytes; +use crate::checksums::{ChecksummedBytes, IntegrityError}; use crate::object::ObjectId; /// A self-identifying part of an S3 object. Users can only retrieve the bytes from this part if @@ -21,7 +21,13 @@ impl Part { } } - pub fn into_bytes(self, id: &ObjectId, offset: u64) -> Result { + pub fn extend(&mut self, other: &Part) -> Result<(), PartOperationError> { + let expected_offset = self.offset + self.checksummed_bytes.len() as u64; + other.check(&self.id, expected_offset)?; + Ok(self.checksummed_bytes.extend(other.clone().checksummed_bytes)?) 
+ } + + pub fn into_bytes(self, id: &ObjectId, offset: u64) -> Result { self.check(id, offset).map(|_| self.checksummed_bytes) } @@ -38,6 +44,10 @@ impl Part { } } + pub(super) fn offset(&self) -> u64 { + self.offset + } + pub(super) fn len(&self) -> usize { self.checksummed_bytes.len() } @@ -46,15 +56,15 @@ impl Part { self.checksummed_bytes.is_empty() } - fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartMismatchError> { + fn check(&self, id: &ObjectId, offset: u64) -> Result<(), PartOperationError> { if self.id != *id { - return Err(PartMismatchError::Id { + return Err(PartOperationError::IdMismatch { actual: self.id.clone(), requested: id.to_owned(), }); } if self.offset != offset { - return Err(PartMismatchError::Offset { + return Err(PartOperationError::OffsetMismatch { actual: self.offset, requested: offset, }); @@ -64,10 +74,117 @@ impl Part { } #[derive(Debug, Error)] -pub enum PartMismatchError { +pub enum PartOperationError { + #[error("part integrity check failed")] + Integrity(#[from] IntegrityError), + #[error("wrong part id: actual={actual:?}, requested={requested:?}")] - Id { actual: ObjectId, requested: ObjectId }, + IdMismatch { actual: ObjectId, requested: ObjectId }, #[error("wrong part offset: actual={actual}, requested={requested}")] - Offset { actual: u64, requested: u64 }, + OffsetMismatch { actual: u64, requested: u64 }, +} + +#[cfg(test)] +mod tests { + use mountpoint_s3_client::types::ETag; + + use crate::{checksums::ChecksummedBytes, object::ObjectId, prefetch::part::PartOperationError}; + + use super::Part; + + #[test] + fn test_append() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_part_len = 512; + let second_offset = first_offset + first_part_len as u64; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(object_id.clone(), second_offset, checksummed_bytes); + + first.extend(&second).expect("should be able to extend"); + assert_eq!(first_part_len + second_part_len, first.len()); + first.check(&object_id, first_offset).expect("the part should be valid"); + } + + #[test] + fn test_append_with_mismatch_object_id() { + let object_id = ObjectId::new("key".to_owned(), ETag::for_tests()); + let first_offset = 0; + let first_part_len = 1024; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(first_offset as u8 as usize) + .take(first_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes); + + let second_object_id = ObjectId::new("other".to_owned(), ETag::for_tests()); + let second_part_len = 512; + let second_offset = first_offset; + let body: Box<[u8]> = (0u8..=255) + .cycle() + .skip(second_offset as u8 as usize) + .take(second_part_len) + .collect(); + let checksummed_bytes = ChecksummedBytes::new(body.into()); + let second = Part::new(second_object_id.clone(), second_offset, checksummed_bytes); + + let result = first.extend(&second); + assert!(matches!( + result, + Err(PartOperationError::IdMismatch { + actual: _, + requested: _ 
+            })
+        ));
+    }
+
+    #[test]
+    fn test_append_with_mismatch_offset() {
+        let object_id = ObjectId::new("key".to_owned(), ETag::for_tests());
+        let first_offset = 0;
+        let first_part_len = 1024;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(first_offset as u8 as usize)
+            .take(first_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let mut first = Part::new(object_id.clone(), first_offset, checksummed_bytes);
+
+        let second_part_len = 512;
+        let second_offset = first_offset;
+        let body: Box<[u8]> = (0u8..=255)
+            .cycle()
+            .skip(second_offset as u8 as usize)
+            .take(second_part_len)
+            .collect();
+        let checksummed_bytes = ChecksummedBytes::new(body.into());
+        let second = Part::new(object_id.clone(), second_offset, checksummed_bytes);
+
+        let result = first.extend(&second);
+        assert!(matches!(
+            result,
+            Err(PartOperationError::OffsetMismatch {
+                actual: _,
+                requested: _
+            })
+        ));
+    }
 }
diff --git a/mountpoint-s3/src/prefetch/part_queue.rs b/mountpoint-s3/src/prefetch/part_queue.rs
index e680babb1..30a2c264c 100644
--- a/mountpoint-s3/src/prefetch/part_queue.rs
+++ b/mountpoint-s3/src/prefetch/part_queue.rs
@@ -51,7 +51,7 @@ impl PartQueue {
     /// empty.
     ///
     /// If this method returns an Err, the PartQueue must never be accessed again.
-    pub async fn read(&self, length: usize) -> Result> {
+    pub async fn read(&mut self, length: usize) -> Result> {
         let mut current_part = self.current_part.lock().await;

         assert!(
@@ -93,6 +93,27 @@ impl PartQueue {
         Ok(part)
     }

+    /// Push a new [Part] onto the front of the queue. In practice this just
+    /// concatenates it with the current part.
+    pub async fn push_front(&self, mut part: Part) -> Result<(), PrefetchReadError> {
+        let part_len = part.len();
+        let mut current_part = self.current_part.lock().await;
+
+        assert!(
+            !self.failed.load(Ordering::SeqCst),
+            "cannot use a PartQueue after failure"
+        );
+
+        if let Some(current_part) = current_part.as_mut() {
+            part.extend(current_part)?;
+            *current_part = part;
+        } else {
+            *current_part = Some(part);
+        }
+        metrics::gauge!("prefetch.bytes_in_queue").increment(part_len as f64);
+        Ok(())
+    }
+
     pub fn bytes_received(&self) -> usize {
         self.bytes_received.load(Ordering::SeqCst)
     }
@@ -160,7 +181,7 @@ mod tests {
     async fn run_test(ops: Vec) {
         let part_id = ObjectId::new("key".to_owned(), ETag::for_tests());
-        let (part_queue, part_queue_producer) = unbounded_part_queue::();
+        let (mut part_queue, part_queue_producer) = unbounded_part_queue::();
         let mut current_offset = 0;
         let mut current_length = 0;
         for op in ops {
diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs
index 11f897f3a..3111879f3 100644
--- a/mountpoint-s3/src/prefetch/part_stream.rs
+++ b/mountpoint-s3/src/prefetch/part_stream.rs
@@ -2,18 +2,21 @@ use async_stream::try_stream;
 use bytes::Bytes;
 use futures::task::{Spawn, SpawnExt};
 use futures::{pin_mut, Stream, StreamExt};
-use mountpoint_s3_client::ObjectClient;
+use mountpoint_s3_client::{types::GetObjectRequest, ObjectClient};
 use std::marker::{Send, Sync};
 use std::{fmt::Debug, ops::Range};
 use tracing::{debug_span, error, trace, Instrument};

 use crate::checksums::ChecksummedBytes;
 use crate::object::ObjectId;
+use crate::prefetch::backpressure_controller::{new_backpressure_controller, BackpressureConfig};
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
 use crate::prefetch::task::RequestTask;
 use crate::prefetch::PrefetchReadError;

+use
super::backpressure_controller::BackpressureLimiter;
+
 /// A generic interface to retrieve data from objects in a S3-like store.
 pub trait ObjectPartStream {
     /// Spawns a request to get the content of an object. The object data will be retrieved in fixed size
@@ -35,6 +38,9 @@ pub struct RequestTaskConfig {
     pub object_id: ObjectId,
     pub range: RequestRange,
     pub preferred_part_size: usize,
+    pub initial_read_window_size: usize,
+    pub max_read_window_size: usize,
+    pub read_window_size_multiplier: usize,
 }

 /// The range of a [ObjectPartStream::spawn_get_object_request] request.
@@ -180,37 +186,95 @@ where
     Client: ObjectClient + Clone + Send + Sync + 'static,
 {
     assert!(config.preferred_part_size > 0);
-        let request_range = config
-            .range
-            .align(client.read_part_size().unwrap_or(8 * 1024 * 1024) as u64, true);
-        let start = request_range.start();
-        let size = request_range.len();
+        let range = config.range;
+
+        let backpressure_config = BackpressureConfig {
+            initial_read_window_size: config.initial_read_window_size,
+            max_read_window_size: config.max_read_window_size,
+            read_window_size_multiplier: config.read_window_size_multiplier,
+            request_range: range.into(),
+        };
+        let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config);
         let (part_queue, part_queue_producer) = unbounded_part_queue();
-        trace!(range=?request_range, "spawning request");
+        trace!(?range, "spawning request");

-        let span = debug_span!("prefetch", range=?request_range);
+        let span = debug_span!("prefetch", ?range);
         let client = client.clone();
-        let bucket = config.bucket.clone();
         let task_handle = self
             .runtime
             .spawn_with_handle(
                 async move {
-                    let part_composer = ClientPartComposer {
-                        part_queue_producer,
-                        object_id: config.object_id.clone(),
-                        preferred_part_size: config.preferred_part_size,
+                    let mut client_request = ClientRequest {
+                        client: client.clone(),
+                        backpressure_limiter,
+                        config,
                     };
-
-                    let request_stream = read_from_request(client, bucket, config.object_id, request_range.into());
-                    let part_composer_future = part_composer.try_compose_parts(request_stream);
-                    part_composer_future.await;
+                    client_request.get_from_client(part_queue_producer).await;
                 }
                 .instrument(span),
             )
             .unwrap();

-        RequestTask::from_handle(task_handle, size, start, part_queue)
+        RequestTask::from_handle(task_handle, range, part_queue, backpressure_controller)
+    }
+}
+
+struct ClientRequest {
+    client: Client,
+    backpressure_limiter: BackpressureLimiter,
+    config: RequestTaskConfig,
+}
+
+impl ClientRequest
+where
+    Client: ObjectClient + Clone + Send + Sync + 'static,
+{
+    async fn get_from_client(&mut self, part_queue_producer: PartQueueProducer) {
+        let bucket = &self.config.bucket;
+        let object_id = &self.config.object_id;
+        let first_read_window_end_offset = self.config.range.start() + self.config.initial_read_window_size as u64;
+
+        let part_composer = ClientPartComposer {
+            part_queue_producer,
+            object_id: object_id.clone(),
+            preferred_part_size: self.config.preferred_part_size,
+        };
+
+        // Normally, the initial read window size should be very small (~1MB) so that we can serve the first read
+        // request as soon as possible, but right now the CRT only returns data in chunks of part size (which
+        // defaults to 8MB) even if the initial read window is smaller than that. This makes time to first byte
+        // much higher than expected.
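+        // For example, with an initial read window of 1MB and a part size of 8MB, the CRT would not deliver any
+        // bytes until the entire first 8MB part is ready, even though 1MB would have been enough to serve the
+        // first read.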
+        //
+        // To work around this issue, we instead create two requests for the part stream, where the first request
+        // has a range exactly equal to the initial read window size to force the CRT to return data immediately,
+        // and the second request covers the rest of the stream.
+        //
+        // Let's start by issuing the first request with a range trimmed to the initial read window offset.
+        let first_req_range = self.config.range.trim_end(first_read_window_end_offset);
+        let request_stream = read_from_request(
+            &mut self.backpressure_limiter,
+            self.client.clone(),
+            bucket.clone(),
+            object_id.clone(),
+            first_req_range.into(),
+        );
+        let part_composer_future = part_composer.try_compose_parts(request_stream);
+        part_composer_future.await;
+
+        // After the first request completes, we create the second request for the rest of the stream,
+        // but only if there is something left to be fetched.
+        let range = self.config.range.trim_start(first_read_window_end_offset);
+        if range.is_empty() {
+            return;
+        }
+        let request_stream = read_from_request(
+            &mut self.backpressure_limiter,
+            self.client.clone(),
+            bucket.clone(),
+            object_id.clone(),
+            range.into(),
+        );
+        let part_composer_future = part_composer.try_compose_parts(request_stream);
+        part_composer_future.await;
     }
 }

@@ -240,8 +304,10 @@ impl ClientPartComposer {
         // in order to avoid validating checksum on large parts at read.
         let mut body: Bytes = body.into();
         let mut curr_offset = offset;
+        let alignment = self.preferred_part_size;
         while !body.is_empty() {
-            let chunk_size = self.preferred_part_size.min(body.len());
+            let distance_to_align = alignment - (curr_offset % alignment as u64) as usize;
+            let chunk_size = distance_to_align.min(body.len());
             let chunk = body.split_to(chunk_size);
             // S3 doesn't provide checksum for us if the request range is not aligned to
             // object part boundaries, so we're computing our own checksum here.
@@ -257,12 +323,13 @@ impl ClientPartComposer {

 /// Creates a `GetObject` request with the specified range and sends received body parts to the stream.
 /// A [PrefetchReadError] is returned when the request cannot be completed.
-pub fn read_from_request(
+pub fn read_from_request<'a, Client: ObjectClient + 'a>(
+    backpressure_limiter: &'a mut BackpressureLimiter,
     client: Client,
     bucket: String,
     id: ObjectId,
     request_range: Range,
-) -> impl Stream> {
+) -> impl Stream> + 'a {
     try_stream! {
         let request = client
             .get_object(&bucket, id.key(), Some(request_range), Some(id.etag().clone()))
             .await
             .map_err(PrefetchReadError::GetRequestFailed)?;

         pin_mut!(request);
+        let read_window_size_diff = backpressure_limiter
+            .read_window_end_offset()
+            .saturating_sub(request.as_ref().read_window_end_offset()) as usize;
+        request.as_mut().increment_read_window(read_window_size_diff);
+
         while let Some(next) = request.next().await {
             let (offset, body) = next
                 .inspect_err(|e| error!(key=id.key(), error=?e, "GetObject body part failed"))
                 .map_err(PrefetchReadError::GetRequestFailed)?;
-            trace!(offset, length = body.len(), "received GetObject part");
+            let length = body.len() as u64;
+            trace!(offset, length, "received GetObject part");
             metrics::counter!("s3.client.total_bytes", "type" => "read").increment(body.len() as u64);
             yield(offset, body);
+
+            // Block waiting for a read window increment if the current window is not enough to read the next offset.
+            let next_offset = offset + length;
+            if let Some(next_read_window_offset) = backpressure_limiter.wait_for_read_window_increment(next_offset).await?
{
+                let diff = next_read_window_offset.saturating_sub(request.as_ref().read_window_end_offset()) as usize;
+                request.as_mut().increment_read_window(diff);
+            }
         }
         trace!("request finished");
     }
diff --git a/mountpoint-s3/src/prefetch/task.rs b/mountpoint-s3/src/prefetch/task.rs
index c55ebf532..1fb7edc63 100644
--- a/mountpoint-s3/src/prefetch/task.rs
+++ b/mountpoint-s3/src/prefetch/task.rs
@@ -1,65 +1,82 @@
 use futures::future::RemoteHandle;

+use crate::prefetch::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall};
 use crate::prefetch::part::Part;
-use crate::prefetch::part_queue::{unbounded_part_queue, PartQueue};
+use crate::prefetch::part_queue::PartQueue;
 use crate::prefetch::PrefetchReadError;

+use super::backpressure_controller::BackpressureController;
+use super::part_stream::RequestRange;
+
 /// A single GetObject request submitted to the S3 client
 #[derive(Debug)]
 pub struct RequestTask {
-    /// Handle on the task/future. The future is cancelled when handle is dropped. This is None if
-    /// the request is fake (created by seeking backwards in the stream)
-    task_handle: Option>,
+    /// Handle on the task/future. The future is cancelled when the handle is dropped.
+    _task_handle: RemoteHandle<()>,
     remaining: usize,
-    start_offset: u64,
-    total_size: usize,
+    range: RequestRange,
     part_queue: PartQueue,
+    backpressure_controller: BackpressureController,
 }

 impl RequestTask {
-    pub fn from_handle(task_handle: RemoteHandle<()>, size: usize, offset: u64, part_queue: PartQueue) -> Self {
+    pub fn from_handle(
+        task_handle: RemoteHandle<()>,
+        range: RequestRange,
+        part_queue: PartQueue,
+        backpressure_controller: BackpressureController,
+    ) -> Self {
         Self {
-            task_handle: Some(task_handle),
-            remaining: size,
-            start_offset: offset,
-            total_size: size,
+            _task_handle: task_handle,
+            remaining: range.len(),
+            range,
             part_queue,
+            backpressure_controller,
         }
     }

-    pub fn from_parts(parts: impl IntoIterator, offset: u64) -> Self {
-        let mut size = 0;
-        let (part_queue, part_queue_producer) = unbounded_part_queue();
-        for part in parts {
-            size += part.len();
-            part_queue_producer.push(Ok(part));
-        }
-        Self {
-            task_handle: None,
-            remaining: size,
-            start_offset: offset,
-            total_size: size,
-            part_queue,
+    // Push a given list of parts onto the front of the part queue.
+    pub async fn push_front(&mut self, parts: Vec) -> Result<(), PrefetchReadError> {
+        // Merge all parts into a single part by pushing them onto the front of the part queue.
+        // This could result in a really big part, but we normally use this only for backward seeks,
+        // so its size should be no bigger than the prefetcher's `max_backward_seek_distance`.
+        for part in parts.into_iter().rev() {
+            self.remaining += part.len();
+            self.part_queue.push_front(part).await?;
         }
+        Ok(())
     }

     pub async fn read(&mut self, length: usize) -> Result> {
         let part = self.part_queue.read(length).await?;
         debug_assert!(part.len() <= self.remaining);
         self.remaining -= part.len();
+
+        // We read some data out of the part queue, so the read window should be moved.
+        self.backpressure_controller.send_feedback(DataRead(part.len())).await?;
+
+        let next_offset = part.offset() + part.len() as u64;
+        let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;
+        // If the part queue is empty, it means we are reading faster than the task can prefetch,
+        // so we should use a larger window for the task.
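+        // (Assumption: on a PartQueueStall event the controller scales the preferred window up by
+        // `read_window_size_multiplier`, capped at `max_read_window_size`; see backpressure_controller.rs.)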
+ if remaining_in_queue == 0 { + self.backpressure_controller.send_feedback(PartQueueStall).await?; + } + Ok(part) } pub fn start_offset(&self) -> u64 { - self.start_offset + self.range.start() } pub fn end_offset(&self) -> u64 { - self.start_offset + self.total_size as u64 + self.range.end() } pub fn total_size(&self) -> usize { - self.total_size + self.range.len() } pub fn remaining(&self) -> usize { @@ -68,12 +85,10 @@ impl RequestTask { /// Maximum offset which data is known to be already in the `self.part_queue` pub fn available_offset(&self) -> u64 { - self.start_offset + self.part_queue.bytes_received() as u64 + self.start_offset() + self.part_queue.bytes_received() as u64 } - /// Some requests aren't actually streaming data (they're fake, created by backwards seeks), and - /// shouldn't be counted for prefetcher progress. - pub fn is_streaming(&self) -> bool { - self.task_handle.is_some() + pub fn read_window_end_offset(&self) -> u64 { + self.backpressure_controller.read_window_end_offset() } } diff --git a/mountpoint-s3/tests/common/fuse.rs b/mountpoint-s3/tests/common/fuse.rs index aa8290fe8..87399531f 100644 --- a/mountpoint-s3/tests/common/fuse.rs +++ b/mountpoint-s3/tests/common/fuse.rs @@ -49,6 +49,7 @@ pub type TestClientBox = Box; pub struct TestSessionConfig { pub part_size: usize, + pub initial_read_window_size: usize, pub filesystem_config: S3FilesystemConfig, pub prefetcher_config: PrefetcherConfig, pub auth_config: S3ClientAuthConfig, @@ -56,8 +57,10 @@ pub struct TestSessionConfig { impl Default for TestSessionConfig { fn default() -> Self { + let part_size = 8 * 1024 * 1024; Self { - part_size: 8 * 1024 * 1024, + part_size, + initial_read_window_size: part_size, filesystem_config: Default::default(), prefetcher_config: Default::default(), auth_config: Default::default(), @@ -125,6 +128,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -162,6 +167,8 @@ pub mod mock_session { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: test_config.part_size, + enable_backpressure: true, + initial_read_window_size: test_config.initial_read_window_size, ..Default::default() }; let client = Arc::new(MockClient::new(client_config)); @@ -284,7 +291,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) .endpoint_config(EndpointConfig::new(®ion)) - .auth_config(test_config.auth_config); + .auth_config(test_config.auth_config) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = default_prefetch(runtime, test_config.prefetcher_config); @@ -316,7 +325,9 @@ pub mod s3_session { let client_config = S3ClientConfig::default() .part_size(test_config.part_size) - .endpoint_config(EndpointConfig::new(®ion)); + .endpoint_config(EndpointConfig::new(®ion)) + .read_backpressure(true) + .initial_read_window(test_config.initial_read_window_size); let client = S3CrtClient::new(client_config).unwrap(); let runtime = client.event_loop_group(); let prefetcher = caching_prefetch(cache, runtime, test_config.prefetcher_config); diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs index 
3afec9c5e..53ee65a7c 100644 --- a/mountpoint-s3/tests/common/mod.rs +++ b/mountpoint-s3/tests/common/mod.rs @@ -37,6 +37,8 @@ pub fn make_test_filesystem( let client_config = MockClientConfig { bucket: bucket.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs index a6ec2ec11..3eeb7eea0 100644 --- a/mountpoint-s3/tests/fs.rs +++ b/mountpoint-s3/tests/fs.rs @@ -735,6 +735,8 @@ async fn test_upload_aborted_on_write_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -812,6 +814,8 @@ async fn test_upload_aborted_on_fsync_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -874,6 +878,8 @@ async fn test_upload_aborted_on_release_failure() { let client_config = MockClientConfig { bucket: BUCKET_NAME.to_string(), part_size: 1024 * 1024, + enable_backpressure: true, + initial_read_window_size: 256 * 1024, ..Default::default() }; @@ -1489,7 +1495,9 @@ async fn test_readdir_rewind_with_local_files_only() { async fn test_lookup_404_not_an_error() { let name = "test_lookup_404_not_an_error"; let (bucket, prefix) = get_test_bucket_and_prefix(name); - let client_config = S3ClientConfig::default().endpoint_config(EndpointConfig::new(&get_test_region())); + let client_config = S3ClientConfig::default() + .endpoint_config(EndpointConfig::new(&get_test_region())) + .read_backpressure(true); let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); let fs = make_test_filesystem_with_client( client, @@ -1523,7 +1531,8 @@ async fn test_lookup_forbidden() { let auth_config = get_crt_client_auth_config(get_scoped_down_credentials(&policy).await); let client_config = S3ClientConfig::default() .auth_config(auth_config) - .endpoint_config(EndpointConfig::new(&get_test_region())); + .endpoint_config(EndpointConfig::new(&get_test_region())) + .read_backpressure(true); let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); // create an empty file diff --git a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs index c7ece49af..4f47571d4 100644 --- a/mountpoint-s3/tests/fuse_tests/prefetch_test.rs +++ b/mountpoint-s3/tests/fuse_tests/prefetch_test.rs @@ -1,6 +1,5 @@ use fuser::BackgroundSession; use mountpoint_s3::data_cache::InMemoryDataCache; -use mountpoint_s3::prefetch::PrefetcherConfig; use std::fs::{File, OpenOptions}; use std::io::Read; use tempfile::TempDir; @@ -65,30 +64,32 @@ fn read_test_mock_with_cache(object_size: usize) { ); } -/// test for checking either prefetching fails or read original object when object is mutated during read. -/// Prefetching of next request occurs when more than half of the current request is being read. -/// So, when we read the first block, it prefetches the requests ti require to fulfill and the next request -/// depending on size of last request. -/// If object is mutated, E-Tag for the new prefetch request will change and hence the request will fail giving IO error. 
-fn prefetch_test_etag(creator_fn: F, prefix: &str, request_size: usize, read_size: usize)
-where
+/// Test that, when the object is mutated during a read, prefetching either fails or reads the original object.
+/// Prefetching of the next read window occurs when more than half of the current window has been read.
+/// When we read the first block, the prefetcher fetches data with a window size just large enough to fulfill
+/// the request, then increases the window size as needed.
+/// If the object is mutated, reading a part from the next read window will fail with a precondition (ETag) error.
+fn prefetch_test_etag(
+    creator_fn: F,
+    prefix: &str,
+    part_size: usize,
+    initial_read_window_size: usize,
+    read_size: usize,
+) where
     F: FnOnce(&str, TestSessionConfig) -> (TempDir, BackgroundSession, TestClientBox),
 {
-    const OBJECT_SIZE: usize = 1024 * 1024;
-
-    let prefetcher_config = PrefetcherConfig {
-        first_request_size: request_size,
-        ..Default::default()
-    };
-
+    // The object needs to be larger than the part size because the CRT returns data in chunks of part size;
+    // otherwise we would not be able to see the failures.
+    let object_size = part_size * 2;
     let (mount_point, _session, mut test_client) = creator_fn(
         prefix,
         TestSessionConfig {
-            prefetcher_config,
+            part_size,
+            initial_read_window_size,
             ..Default::default()
         },
     );

-    let original_data_buf = vec![0u8; OBJECT_SIZE];
+    let original_data_buf = vec![0u8; object_size];

     test_client.put_object("dir/hello.txt", &original_data_buf).unwrap();

@@ -104,7 +105,7 @@ where
         .expect("Should be able to read file to buf");

     // changed the value of data buf to distinguish it from previous data of the object.
-    let final_data_buf = vec![255u8; OBJECT_SIZE];
+    let final_data_buf = vec![255u8; object_size];
     test_client.put_object("dir/hello.txt", &final_data_buf).unwrap();

     let mut dest_buf = vec![0u8; read_size];
@@ -149,11 +150,13 @@ where
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
 #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_mock(request_size: usize, read_size: usize) {
+fn prefetch_test_etag_mock(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 256 * 1024;
     prefetch_test_etag(
         fuse::mock_session::new,
         "prefetch_test_etag_mock",
-        request_size,
+        part_size,
+        initial_read_window_size,
         read_size,
     );
 }
@@ -162,11 +165,13 @@ fn prefetch_test_etag_mock(request_size: usize, read_size: usize) {
 #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")]
 #[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")]
 #[test_case(64 * 1024, 500 * 1024; "first request size smaller than first block read size")]
-fn prefetch_test_etag_mock_with_cache(request_size: usize, read_size: usize) {
+fn prefetch_test_etag_mock_with_cache(initial_read_window_size: usize, read_size: usize) {
+    let part_size = 256 * 1024;
     prefetch_test_etag(
         fuse::mock_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)),
         "prefetch_test_etag_mock",
-        request_size,
+        part_size,
+        initial_read_window_size,
         read_size,
     );
 }

 #[cfg(feature = "s3_tests")]
 #[test_case(256 * 1024, 1024; "default first request size, much larger than first
block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3(request_size: usize, read_size: usize) { - prefetch_test_etag(fuse::s3_session::new, "prefetch_test_etag_s3", request_size, read_size); +fn prefetch_test_etag_s3(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; + prefetch_test_etag( + fuse::s3_session::new, + "prefetch_test_etag_s3", + part_size, + initial_read_window_size, + read_size, + ); } #[cfg(feature = "s3_tests")] #[test_case(256 * 1024, 1024; "default first request size, much larger than first block read size")] #[test_case(64 * 1024, 1024; "first request size smaller than default, much larger than first block read size")] -#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] +#[test_case(512 * 1024, 1024; "first request size greater than default, much larger than first block read size")] #[test_case(256 * 1024, 256 * 1024; "first request size smaller than first block read size")] -fn prefetch_test_etag_s3_with_cache(request_size: usize, read_size: usize) { +fn prefetch_test_etag_s3_with_cache(initial_read_window_size: usize, read_size: usize) { + let part_size = 8 * 1024 * 1024; prefetch_test_etag( fuse::s3_session::new_with_cache(InMemoryDataCache::new(1024 * 1024)), "prefetch_test_etag_s3", - request_size, + part_size, + initial_read_window_size, read_size, ); } diff --git a/mountpoint-s3/tests/mock_s3_tests.rs b/mountpoint-s3/tests/mock_s3_tests.rs index cdf17bb0d..4aaa95dfa 100644 --- a/mountpoint-s3/tests/mock_s3_tests.rs +++ b/mountpoint-s3/tests/mock_s3_tests.rs @@ -134,6 +134,7 @@ fn create_fs_with_mock_s3(bucket: &str) -> (TestS3Filesystem, MockS let client_config = S3ClientConfig::default() .endpoint_config(endpoint_config) .auth_config(S3ClientAuthConfig::NoSigning) + .read_backpressure(true) .max_attempts(NonZeroUsize::new(3).unwrap()); // retry S3 request 3 times (which equals the existing default) let client = S3CrtClient::new(client_config).expect("must be able to create a CRT client"); (