diff --git a/mountpoint-s3/src/checksums.rs b/mountpoint-s3/src/checksums.rs
index b96f1b18a..4761506ef 100644
--- a/mountpoint-s3/src/checksums.rs
+++ b/mountpoint-s3/src/checksums.rs
@@ -63,7 +63,7 @@ impl ChecksummedBytes {
     /// This operation just increases the reference count and sets a few indices,
     /// so there will be no validation and the checksum will not be recomputed.
     pub fn split_off(&mut self, at: usize) -> ChecksummedBytes {
-        assert!(at <= self.len());
+        assert!(at < self.len());
 
         let start = self.range.start;
         let prefix_range = start..(start + at);
@@ -77,27 +77,6 @@ impl ChecksummedBytes {
         }
     }
 
-    /// Splits the checksummed bytes into two at the given index.
-    ///
-    /// Afterwards self contains elements [at, len), and the returned Bytes contains elements [0, at).
-    ///
-    /// This operation just increases the reference count and sets a few indices,
-    /// so there will be no validation and the checksum will not be recomputed.
-    pub fn split_to(&mut self, at: usize) -> ChecksummedBytes {
-        assert!(at <= self.len());
-
-        let start = self.range.start;
-        let prefix_range = start..(start + at);
-        let suffix_range = (start + at)..self.range.end;
-
-        self.range = suffix_range;
-        Self {
-            buffer: self.buffer.clone(),
-            range: prefix_range,
-            checksum: self.checksum,
-        }
-    }
-
     /// Returns a slice of self for the provided range.
     ///
     /// This operation just increases the reference count and sets a few indices,
@@ -342,26 +321,6 @@ mod tests {
         assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
     }
 
-    #[test]
-    fn test_split_to() {
-        let split_to_at = 4;
-        let bytes = Bytes::from_static(b"some bytes");
-        let expected = bytes.clone();
-        let expected_checksum = crc32c::checksum(&expected);
-        let mut checksummed_bytes = ChecksummedBytes::new(bytes);
-
-        let mut expected_part2 = expected.clone();
-        let expected_part1 = expected_part2.split_to(split_to_at);
-        let new_checksummed_bytes = checksummed_bytes.split_to(split_to_at);
-
-        assert_eq!(expected, checksummed_bytes.buffer);
-        assert_eq!(expected, new_checksummed_bytes.buffer);
-        assert_eq!(expected_part1, new_checksummed_bytes.buffer_slice());
-        assert_eq!(expected_part2, checksummed_bytes.buffer_slice());
-        assert_eq!(expected_checksum, checksummed_bytes.checksum);
-        assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
-    }
-
     #[test]
     fn test_slice() {
         let range = 3..7;
diff --git a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
index a39d118fa..47db7f735 100644
--- a/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
+++ b/mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -21,6 +21,12 @@ impl InMemoryDataCache {
             block_size,
         }
     }
+
+    /// Get the number of cached blocks for the given cache key.
+    pub fn block_count(&self, cache_key: &ObjectId) -> usize {
+        let data = self.data.read().unwrap();
+        data.get(cache_key).map_or(0, |cache| cache.len())
+    }
 }
 
 impl DataCache for InMemoryDataCache {
diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index daa10fab5..03cb9c1d5 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -171,7 +171,7 @@ impl Default for PrefetcherConfig {
             // making a guess about where the optimal cut-off point is before it would be faster to
             // just start a new request instead.
             max_forward_seek_wait_distance: 16 * 1024 * 1024,
-            max_backward_seek_distance: 1024 * 1024,
+            max_backward_seek_distance: 1 * 1024 * 1024,
         }
     }
}
diff --git a/mountpoint-s3/src/prefetch/backpressure_controller.rs b/mountpoint-s3/src/prefetch/backpressure_controller.rs
index 542976ab5..947345852 100644
--- a/mountpoint-s3/src/prefetch/backpressure_controller.rs
+++ b/mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -48,6 +48,9 @@ pub struct BackpressureLimiter {
     /// Calling [BackpressureLimiter::wait_for_read_window_increment()] will block current
     /// thread until this value is advanced.
     read_window_end_offset: u64,
+    /// End offset of the request we want to apply backpressure to. The request can return
+    /// data up to this offset *exclusively*.
+    request_end_offset: u64,
 }
 
 /// Creates a [BackpressureController] and its related [BackpressureLimiter].
@@ -76,6 +79,7 @@ pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureC
     let limiter = BackpressureLimiter {
         read_window_incrementing_queue,
         read_window_end_offset,
+        request_end_offset: config.request_range.end,
    };
    (controller, limiter)
 }
@@ -171,7 +175,7 @@ impl BackpressureLimiter {
         }
 
         // Reaching here means there is not enough read window, so we block until it is large enough
-        while self.read_window_end_offset <= offset {
+        while self.read_window_end_offset <= offset && self.read_window_end_offset < self.request_end_offset {
             trace!(
                 offset,
                 read_window_offset = self.read_window_end_offset,
diff --git a/mountpoint-s3/src/prefetch/caching_stream.rs b/mountpoint-s3/src/prefetch/caching_stream.rs
index 630444338..3adc59063 100644
--- a/mountpoint-s3/src/prefetch/caching_stream.rs
+++ b/mountpoint-s3/src/prefetch/caching_stream.rs
@@ -14,7 +14,7 @@ use crate::prefetch::backpressure_controller::{new_backpressure_controller, Back
 use crate::prefetch::part::Part;
 use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
 use crate::prefetch::part_stream::{
-    read_from_request, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
+    read_from_client_stream, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
 };
 use crate::prefetch::task::RequestTask;
 use crate::prefetch::PrefetchReadError;
@@ -100,6 +100,18 @@ where
         }
     }
 
+    // We have changed how often this method is called now that backpressure is used.
+    // Before, every time the prefetcher asked for more data a new RequestTask was
+    // spawned, and we would first check the cache, falling back to the client at the
+    // first cache miss.
+    // Now new RequestTasks are only spawned on out-of-order reads, while sequential data
+    // is requested via backpressure. This means that a fully sequential read will switch
+    // entirely to the client after a single cache miss.
+    //
+    // In theory, that could mean more requests to S3, but in practice the previous behavior
+    // was only better when cached data was scattered across the range and a new RequestTask
+    // happened to start inside one of those cached ranges. This change should only affect
+    // sequential read workloads.
     async fn get_from_cache(
         mut self,
         range: RequestRange,
@@ -183,6 +195,15 @@ where
             "fetching data from client"
         );
 
+        let request_stream = read_from_client_stream(
+            &mut self.backpressure_limiter,
+            &self.client,
+            bucket.clone(),
+            cache_key.clone(),
+            first_read_window_end_offset,
+            block_aligned_byte_range,
+        );
+
         let mut part_composer = CachingPartComposer {
             part_queue_producer,
             cache_key: cache_key.clone(),
@@ -192,36 +213,8 @@ where
             buffer: ChecksummedBytes::default(),
             cache: self.cache.clone(),
         };
-
-        // Start by issuing the first request that has a range up to initial read window offset.
-        // This is an optimization to lower time to first bytes, see more details in [ClientPartStream] about why this is needed.
-        let first_req_range = block_aligned_byte_range.trim_end(first_read_window_end_offset);
-        if !first_req_range.is_empty() {
-            let request_stream = read_from_request(
-                &mut self.backpressure_limiter,
-                self.client.clone(),
-                bucket.clone(),
-                cache_key.clone(),
-                first_req_range.into(),
-            );
-            let part_composer_future = part_composer.try_compose_parts(request_stream);
-            part_composer_future.await;
-        }
-
-        // After the first request is completed we will create the second request for the rest of the stream,
-        // but only if there is something left to be fetched.
-        let range = block_aligned_byte_range.trim_start(first_read_window_end_offset);
-        if !range.is_empty() {
-            let request_stream = read_from_request(
-                &mut self.backpressure_limiter,
-                self.client.clone(),
-                bucket.clone(),
-                cache_key.clone(),
-                range.into(),
-            );
-            let part_composer_future = part_composer.try_compose_parts(request_stream);
-            part_composer_future.await;
-        }
+        let part_composer_future = part_composer.try_compose_parts(request_stream);
+        part_composer_future.await;
 
         part_composer.flush();
     }
@@ -293,26 +286,16 @@ where
                 // We need to return some bytes to the part queue even before we can fill an entire caching block because
                 // we want to start the feedback loop for the flow-control window.
                 //
-                // This is because the read window might be enough to fetch "some data" from S3 but not the entire block.
+                // This is because the read window may not be aligned to block boundaries and therefore may not be enough
+                // to fetch the entire block, but we know it always fetches enough data for the prefetcher to start reading.
                 // For example, consider that we got a file system read request with range 2MB to 4MB and we have to start
                 // reading from block_offset=0 and block_size=5MB. The first read window might have a range up to 4MB which
                 // is enough to serve the read request but if the prefetcher is not able to read anything it cannot tell
                 // the stream to move its read window.
                 //
-                // A side effect from this is delay on the cache updating which is actually good for performance but it makes
-                // testing a bit more complicated because the cache might not be updated immediately.
-                let part_range = self
-                    .original_range
-                    .trim_start(offset)
-                    .trim_end(offset + chunk.len() as u64);
-                if !part_range.is_empty() {
-                    let trim_start = (part_range.start().saturating_sub(offset)) as usize;
-                    let trim_end = (part_range.end().saturating_sub(offset)) as usize;
-                    let part = Part::new(
-                        self.cache_key.clone(),
-                        part_range.start(),
-                        chunk.slice(trim_start..trim_end),
-                    );
+                // A side effect of this is a delay in cache updates, which makes testing a bit more complicated because
+                // the cache is not updated synchronously.
+                if let Some(part) = try_make_part(&chunk, offset, &self.cache_key, &self.original_range) {
                     self.part_queue_producer.push(Ok(part));
                 }
                 offset += chunk.len() as u64;
@@ -362,14 +345,6 @@ where
                 self.block_offset,
                 &self.cache_key,
             );
-            self.part_queue_producer.push(Ok(make_part(
-                self.buffer,
-                self.block_index,
-                self.block_offset,
-                self.cache.block_size(),
-                &self.cache_key,
-                &self.original_range,
-            )));
         }
     }
 }
@@ -392,6 +367,23 @@ fn update_cache(
     metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
 }
 
+/// Creates a Part that can be streamed to the prefetcher if the given bytes
+/// are in the request range, otherwise returns None.
+fn try_make_part(bytes: &ChecksummedBytes, offset: u64, object_id: &ObjectId, range: &RequestRange) -> Option<Part> {
+    let part_range = range.trim_start(offset).trim_end(offset + bytes.len() as u64);
+    if part_range.is_empty() {
+        return None;
+    }
+    trace!(?part_range, "creating part trimmed to the request range");
+    let trim_start = (part_range.start().saturating_sub(offset)) as usize;
+    let trim_end = (part_range.end().saturating_sub(offset)) as usize;
+    Some(Part::new(
+        object_id.clone(),
+        part_range.start(),
+        bytes.slice(trim_start..trim_end),
+    ))
+}
+
 /// Creates a Part that can be streamed to the prefetcher from the given cache block.
 /// If required, trims the block bytes to the request range.
 fn make_part(
@@ -403,24 +395,15 @@ fn make_part(
     range: &RequestRange,
 ) -> Part {
     assert_eq!(block_offset, block_index * block_size, "invalid block offset");
-
-    let block_size = block.len();
-    let part_range = range
-        .trim_start(block_offset)
-        .trim_end(block_offset + block_size as u64);
     trace!(
         ?cache_key,
         block_index,
-        ?part_range,
         block_offset,
         block_size,
         "creating part from block data",
     );
-
-    let trim_start = (part_range.start().saturating_sub(block_offset)) as usize;
-    let trim_end = (part_range.end().saturating_sub(block_offset)) as usize;
-    let bytes = block.slice(trim_start..trim_end);
-    Part::new(cache_key.clone(), part_range.start(), bytes)
+    // Cache blocks always contain bytes in the request range
+    try_make_part(&block, block_offset, cache_key, range).unwrap()
 }
 
 #[cfg(test)]
@@ -428,6 +411,8 @@ mod tests {
     // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
     #![allow(clippy::identity_op)]
 
+    use std::{thread, time::Duration};
+
     use futures::executor::{block_on, ThreadPool};
     use mountpoint_s3_client::{
         mock_client::{MockClient, MockClientConfig, MockObject, Operation},
@@ -481,6 +466,8 @@ mod tests {
         let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
         let stream = CachingPartStream::new(runtime, cache);
         let range = RequestRange::new(object_size, offset as u64, preferred_size);
+        let expected_start_block = (range.start() as usize).div_euclid(block_size);
+        let expected_end_block = (range.end() as usize).div_ceil(block_size);
 
         let first_read_count = {
             // First request (from client)
@@ -500,6 +487,13 @@ mod tests {
         };
         assert!(first_read_count > 0);
 
+        // Wait until all blocks are saved to the cache before spawning a new request
+        let expected_block_count = expected_end_block - expected_start_block;
+        while stream.cache.block_count(&id) < expected_block_count {
+            thread::sleep(Duration::from_millis(10));
+        }
+        assert_eq!(expected_block_count, stream.cache.block_count(&id));
+
         let second_read_count = {
             // Second request (from cache)
             let get_object_counter = mock_client.new_counter(Operation::GetObject);
@@ -516,8 +510,7 @@ mod tests {
            compare_read(&id, &object, request_task);
            get_object_counter.count()
        };
-        // Just check that some blocks are served from cache
-        assert!(second_read_count < first_read_count);
+        assert_eq!(second_read_count, 0);
     }
 
     #[test_case(1 * MB, 8 * MB)]
diff --git a/mountpoint-s3/src/prefetch/part_stream.rs b/mountpoint-s3/src/prefetch/part_stream.rs
index 3111879f3..6b0b56a3c 100644
--- a/mountpoint-s3/src/prefetch/part_stream.rs
+++ b/mountpoint-s3/src/prefetch/part_stream.rs
@@ -195,7 +195,7 @@ where
             read_window_size_multiplier: config.read_window_size_multiplier,
             request_range: range.into(),
         };
-        let (backpressure_controller, backpressure_limiter) = new_backpressure_controller(backpressure_config);
+        let (backpressure_controller, mut backpressure_limiter) = new_backpressure_controller(backpressure_config);
         let (part_queue, part_queue_producer) = unbounded_part_queue();
 
         trace!(?range, "spawning request");
@@ -205,12 +205,23 @@ where
             .runtime
             .spawn_with_handle(
                 async move {
-                    let mut client_request = ClientRequest {
-                        client: client.clone(),
-                        backpressure_limiter,
-                        config,
+                    let first_read_window_end_offset = config.range.start() + config.initial_read_window_size as u64;
+                    let request_stream = read_from_client_stream(
+                        &mut backpressure_limiter,
+                        &client,
+                        config.bucket,
+                        config.object_id.clone(),
+                        first_read_window_end_offset,
+                        config.range,
+                    );
+
+                    let part_composer = ClientPartComposer {
+                        part_queue_producer,
+                        object_id: config.object_id,
+                        preferred_part_size: config.preferred_part_size,
                     };
-                    client_request.get_from_client(part_queue_producer).await;
+                    let part_composer_future = part_composer.try_compose_parts(request_stream);
+                    part_composer_future.await;
                 }
                 .instrument(span),
             )
@@ -220,64 +231,6 @@ where
     }
 }
 
-struct ClientRequest<Client: ObjectClient> {
-    client: Client,
-    backpressure_limiter: BackpressureLimiter,
-    config: RequestTaskConfig,
-}
-
-impl<Client> ClientRequest<Client>
-where
-    Client: ObjectClient + Clone + Send + Sync + 'static,
-{
-    async fn get_from_client(&mut self, part_queue_producer: PartQueueProducer<Client::ClientError>) {
-        let bucket = &self.config.bucket;
-        let object_id = &self.config.object_id;
-        let first_read_window_end_offset = self.config.range.start() + self.config.initial_read_window_size as u64;
-
-        let part_composer = ClientPartComposer {
-            part_queue_producer,
-            object_id: object_id.clone(),
-            preferred_part_size: self.config.preferred_part_size,
-        };
-
-        // Normally, initial read window size should be very small (~1MB) so that we can serve the first read request as soon as possible,
-        // but right now the CRT only returns data in chunks of part size (default to 8MB) even if initial read window is smaller than that.
-        // This makes time to first byte much higher than expected.
-        //
-        // To workaround this issue, we instead create two requests for the part stream where the first request has the range exactly equal to
-        // the initial read window size to force the CRT to return data immediately, and the second request for the rest of the stream.
-        //
-        // Let's start by issuing the first request with a range trimmed to initial read window offset
-        let first_req_range = self.config.range.trim_end(first_read_window_end_offset);
-        let request_stream = read_from_request(
-            &mut self.backpressure_limiter,
-            self.client.clone(),
-            bucket.clone(),
-            object_id.clone(),
-            first_req_range.into(),
-        );
-        let part_composer_future = part_composer.try_compose_parts(request_stream);
-        part_composer_future.await;
-
-        // After the first request is completed we will create the second request for the rest of the stream,
-        // but only if there is something left to be fetched.
-        let range = self.config.range.trim_start(first_read_window_end_offset);
-        if range.is_empty() {
-            return;
-        }
-        let request_stream = read_from_request(
-            &mut self.backpressure_limiter,
-            self.client.clone(),
-            bucket.clone(),
-            object_id.clone(),
-            range.into(),
-        );
-        let part_composer_future = part_composer.try_compose_parts(request_stream);
-        part_composer_future.await;
-    }
-}
-
 struct ClientPartComposer<E: std::error::Error> {
     part_queue_producer: PartQueueProducer<E>,
     object_id: ObjectId,
     preferred_part_size: usize,
@@ -321,11 +274,64 @@ impl ClientPartComposer {
     }
 }
 
+/// Creates a request stream with a given range. The stream will be served from two `GetObject` requests where the first request serves
+/// data up to `first_read_window_end_offset` and the second request serves the rest of the stream.
+/// A [PrefetchReadError] is returned when the request cannot be completed.
+///
+/// This is a workaround for a specific issue where the initial read window size could be very small (~1MB), but the CRT only returns
+/// data in chunks of part size (defaulting to 8MB) even if the initial read window is smaller than that, which makes time to first byte
+/// much higher than expected.
+pub fn read_from_client_stream<'a, Client: ObjectClient + Clone + 'a>(
+    backpressure_limiter: &'a mut BackpressureLimiter,
+    client: &'a Client,
+    bucket: String,
+    object_id: ObjectId,
+    first_read_window_end_offset: u64,
+    range: RequestRange,
+) -> impl Stream<Item = RequestReaderOutput<Client::ClientError>> + 'a {
+    try_stream! {
+        // Let's start by issuing the first request with a range trimmed to initial read window offset
+        let first_req_range = range.trim_end(first_read_window_end_offset);
+        if !first_req_range.is_empty() {
+            let first_request_stream = read_from_request(
+                backpressure_limiter,
+                client,
+                bucket.clone(),
+                object_id.clone(),
+                first_req_range.into(),
+            );
+            pin_mut!(first_request_stream);
+            while let Some(next) = first_request_stream.next().await {
+                let (offset, body) = next?;
+                yield (offset, body);
+            }
+        }
+
+        // After the first request is completed we will create the second request for the rest of the stream,
+        // but only if there is something left to be fetched.
+        let range = range.trim_start(first_read_window_end_offset);
+        if !range.is_empty() {
+            let request_stream = read_from_request(
+                backpressure_limiter,
+                client,
+                bucket.clone(),
+                object_id.clone(),
+                range.into(),
+            );
+            pin_mut!(request_stream);
+            while let Some(next) = request_stream.next().await {
+                let (offset, body) = next?;
+                yield (offset, body);
+            }
+        }
+    }
+}
+
 /// Creates a `GetObject` request with the specified range and sends received body parts to the stream.
 /// A [PrefetchReadError] is returned when the request cannot be completed.
-pub fn read_from_request<'a, Client: ObjectClient + 'a>(
+fn read_from_request<'a, Client: ObjectClient + 'a>(
     backpressure_limiter: &'a mut BackpressureLimiter,
-    client: Client,
+    client: &'a Client,
     bucket: String,
     id: ObjectId,
     request_range: Range<u64>,
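
Note: the core of `read_from_client_stream` above is concatenating two sub-requests split at the initial read window boundary into one contiguous stream. Below is a minimal, self-contained sketch of that pattern, not the crate's actual code: `fake_request` stands in for `read_from_request`, `ReadError` for `PrefetchReadError`, and the byte payloads are dummies. It assumes only the `async-stream` and `futures` crates.

```rust
// Cargo.toml (assumed): async-stream = "0.3", futures = "0.3"
use async_stream::try_stream;
use futures::{pin_mut, Stream, StreamExt};

/// Illustrative stand-in for PrefetchReadError.
#[derive(Debug)]
struct ReadError;

/// Mimics one `GetObject` request: yields an (offset, byte) pair for each offset in `range`.
fn fake_request(range: std::ops::Range<u64>) -> impl Stream<Item = Result<(u64, u8), ReadError>> {
    try_stream! {
        for offset in range {
            yield (offset, 0u8);
        }
    }
}

/// Concatenates two sub-requests split at `first_window_end`, the same shape as
/// `read_from_client_stream` splitting at `first_read_window_end_offset`.
fn split_stream(
    range: std::ops::Range<u64>,
    first_window_end: u64,
) -> impl Stream<Item = Result<(u64, u8), ReadError>> {
    try_stream! {
        let split = first_window_end.clamp(range.start, range.end);
        // First request: trimmed to the initial read window, so data arrives early.
        let first = fake_request(range.start..split);
        pin_mut!(first);
        while let Some(next) = first.next().await {
            let (offset, byte) = next?;
            yield (offset, byte);
        }
        // Second request: the rest of the range, only if anything is left.
        if split < range.end {
            let second = fake_request(split..range.end);
            pin_mut!(second);
            while let Some(next) = second.next().await {
                let (offset, byte) = next?;
                yield (offset, byte);
            }
        }
    }
}

fn main() {
    futures::executor::block_on(async {
        let stream = split_stream(0..10, 4);
        pin_mut!(stream);
        let mut expected_offset = 0u64;
        while let Some(next) = stream.next().await {
            let (offset, _byte) = next.expect("no errors in this sketch");
            // Offsets stay contiguous across the request boundary.
            assert_eq!(offset, expected_offset);
            expected_offset += 1;
        }
        assert_eq!(expected_offset, 10);
    });
}
```

The split point only changes which underlying request serves a byte, never the order or contiguity of yielded offsets, which is why consumers downstream of the stream do not need to know about the two-request workaround.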