
Commit

PR comments
Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk committed Aug 14, 2024
1 parent c6ccb36 commit a0d02d7
Showing 6 changed files with 145 additions and 177 deletions.
43 changes: 1 addition & 42 deletions mountpoint-s3/src/checksums.rs
@@ -63,7 +63,7 @@ impl ChecksummedBytes {
/// This operation just increases the reference count and sets a few indices,
/// so there will be no validation and the checksum will not be recomputed.
pub fn split_off(&mut self, at: usize) -> ChecksummedBytes {
assert!(at <= self.len());
assert!(at < self.len());

let start = self.range.start;
let prefix_range = start..(start + at);
@@ -77,27 +77,6 @@
}
}

/// Splits the checksummed bytes into two at the given index.
///
/// Afterwards self contains elements [at, len), and the returned Bytes contains elements [0, at).
///
/// This operation just increases the reference count and sets a few indices,
/// so there will be no validation and the checksum will not be recomputed.
pub fn split_to(&mut self, at: usize) -> ChecksummedBytes {
assert!(at <= self.len());

let start = self.range.start;
let prefix_range = start..(start + at);
let suffix_range = (start + at)..self.range.end;

self.range = suffix_range;
Self {
buffer: self.buffer.clone(),
range: prefix_range,
checksum: self.checksum,
}
}
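
As an aside, a minimal sketch of how the surviving split_off is used, assuming it mirrors bytes::Bytes::split_off by keeping the prefix in self and returning the suffix; the names come from the existing tests above and the snippet is illustrative only, not part of this diff:

// Illustrative only, not part of this change.
let mut prefix = ChecksummedBytes::new(Bytes::from_static(b"some bytes"));
let suffix = prefix.split_off(4);
// `prefix` now covers b"some" and `suffix` covers b" bytes"; both halves share
// the same underlying buffer and keep the original whole-buffer checksum, so
// no data is copied and the checksum is not recomputed.
assert_eq!(prefix.buffer_slice(), Bytes::from_static(b"some"));
assert_eq!(suffix.buffer_slice(), Bytes::from_static(b" bytes"));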

/// Returns a slice of self for the provided range.
///
/// This operation just increases the reference count and sets a few indices,
@@ -342,26 +321,6 @@ mod tests {
assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
}

#[test]
fn test_split_to() {
let split_to_at = 4;
let bytes = Bytes::from_static(b"some bytes");
let expected = bytes.clone();
let expected_checksum = crc32c::checksum(&expected);
let mut checksummed_bytes = ChecksummedBytes::new(bytes);

let mut expected_part2 = expected.clone();
let expected_part1 = expected_part2.split_to(split_to_at);
let new_checksummed_bytes = checksummed_bytes.split_to(split_to_at);

assert_eq!(expected, checksummed_bytes.buffer);
assert_eq!(expected, new_checksummed_bytes.buffer);
assert_eq!(expected_part1, new_checksummed_bytes.buffer_slice());
assert_eq!(expected_part2, checksummed_bytes.buffer_slice());
assert_eq!(expected_checksum, checksummed_bytes.checksum);
assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
}

#[test]
fn test_slice() {
let range = 3..7;
6 changes: 6 additions & 0 deletions mountpoint-s3/src/data_cache/in_memory_data_cache.rs
@@ -21,6 +21,12 @@ impl InMemoryDataCache {
block_size,
}
}

/// Get the number of cached blocks for the given cache key.
pub fn block_count(&self, cache_key: &ObjectId) -> usize {
let data = self.data.read().unwrap();
data.get(cache_key).map_or(0, |cache| cache.len())
}
}
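
A brief, hypothetical sketch of how this helper can be used to wait for asynchronous cache writes, mirroring the updated test in caching_stream.rs further down; the `cache`, `object_id`, and the expected count are assumed for illustration, with std::thread and Duration in scope:

// Illustrative only: poll until the background cache updates have landed.
let expected_block_count = 4; // assumed value for the example
while cache.block_count(&object_id) < expected_block_count {
    thread::sleep(Duration::from_millis(10));
}
assert_eq!(expected_block_count, cache.block_count(&object_id));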

impl DataCache for InMemoryDataCache {
2 changes: 1 addition & 1 deletion mountpoint-s3/src/prefetch.rs
@@ -171,7 +171,7 @@ impl Default for PrefetcherConfig {
// making a guess about where the optimal cut-off point is before it would be faster to
// just start a new request instead.
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 1024 * 1024,
max_backward_seek_distance: 1 * 1024 * 1024,
}
}
}
6 changes: 5 additions & 1 deletion mountpoint-s3/src/prefetch/backpressure_controller.rs
@@ -48,6 +48,9 @@ pub struct BackpressureLimiter {
/// Calling [BackpressureLimiter::wait_for_read_window_increment()] will block the current
/// thread until this value is advanced.
read_window_end_offset: u64,
/// End offset for the request we want to apply backpressure to. The request can return
/// data up to this offset, *exclusive*.
request_end_offset: u64,
}

/// Creates a [BackpressureController] and its related [BackpressureLimiter].
Expand Down Expand Up @@ -76,6 +79,7 @@ pub fn new_backpressure_controller(config: BackpressureConfig) -> (BackpressureC
let limiter = BackpressureLimiter {
read_window_incrementing_queue,
read_window_end_offset,
request_end_offset: config.request_range.end,
};
(controller, limiter)
}
@@ -171,7 +175,7 @@ impl BackpressureLimiter {
}

// Reaching here means there is not enough read window, so we block until it is large enough
while self.read_window_end_offset <= offset {
while self.read_window_end_offset <= offset && self.read_window_end_offset < self.request_end_offset {
trace!(
offset,
read_window_offset = self.read_window_end_offset,
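
The new loop condition above can be read as: block only while the read window is still behind the requested offset *and* it can still grow. A small sketch of the predicate with worked values (the helper and numbers are hypothetical, for illustration only):

// Illustrative only: once the read window already reaches the end of the
// request, the limiter should not block, since the request cannot return
// data past that offset and no further increment is needed.
fn should_wait(read_window_end_offset: u64, offset: u64, request_end_offset: u64) -> bool {
    read_window_end_offset <= offset && read_window_end_offset < request_end_offset
}

assert!(should_wait(4 * 1024 * 1024, 6 * 1024 * 1024, 8 * 1024 * 1024)); // window behind the reader
assert!(!should_wait(8 * 1024 * 1024, 8 * 1024 * 1024, 8 * 1024 * 1024)); // window already at request end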
127 changes: 60 additions & 67 deletions mountpoint-s3/src/prefetch/caching_stream.rs
@@ -14,7 +14,7 @@ use crate::prefetch::backpressure_controller::{new_backpressure_controller, Back
use crate::prefetch::part::Part;
use crate::prefetch::part_queue::{unbounded_part_queue, PartQueueProducer};
use crate::prefetch::part_stream::{
read_from_request, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
read_from_client_stream, ObjectPartStream, RequestRange, RequestReaderOutput, RequestTaskConfig,
};
use crate::prefetch::task::RequestTask;
use crate::prefetch::PrefetchReadError;
@@ -100,6 +100,18 @@
}
}

// We have changed how often this method is called now that backpressure is used.
// Before, every time the prefetcher asked for more data and a new RequestTask was
// spawned, we would first check the cache and fall back to the client at the first
// cache miss.
// Now new RequestTasks are only spawned on out-of-order reads, while sequential data
// is requested via backpressure. This means that a fully sequential read will switch
// entirely to the client after a single cache miss.
//
// In theory, that could mean more requests to S3, but in practice the previous behavior
// was only better when cached data was scattered across the range and a new RequestTask
// happened to start inside one of those cached ranges. This change should only affect
// sequential read workloads.
async fn get_from_cache(
mut self,
range: RequestRange,
@@ -183,6 +195,15 @@
"fetching data from client"
);

let request_stream = read_from_client_stream(
&mut self.backpressure_limiter,
&self.client,
bucket.clone(),
cache_key.clone(),
first_read_window_end_offset,
block_aligned_byte_range,
);

let mut part_composer = CachingPartComposer {
part_queue_producer,
cache_key: cache_key.clone(),
@@ -192,36 +213,8 @@
buffer: ChecksummedBytes::default(),
cache: self.cache.clone(),
};

// Start by issuing the first request that has a range up to initial read window offset.
// This is an optimization to lower time to first bytes, see more details in [ClientPartStream] about why this is needed.
let first_req_range = block_aligned_byte_range.trim_end(first_read_window_end_offset);
if !first_req_range.is_empty() {
let request_stream = read_from_request(
&mut self.backpressure_limiter,
self.client.clone(),
bucket.clone(),
cache_key.clone(),
first_req_range.into(),
);
let part_composer_future = part_composer.try_compose_parts(request_stream);
part_composer_future.await;
}

// After the first request is completed we will create the second request for the rest of the stream,
// but only if there is something left to be fetched.
let range = block_aligned_byte_range.trim_start(first_read_window_end_offset);
if !range.is_empty() {
let request_stream = read_from_request(
&mut self.backpressure_limiter,
self.client.clone(),
bucket.clone(),
cache_key.clone(),
range.into(),
);
let part_composer_future = part_composer.try_compose_parts(request_stream);
part_composer_future.await;
}
let part_composer_future = part_composer.try_compose_parts(request_stream);
part_composer_future.await;
part_composer.flush();
}

@@ -293,26 +286,16 @@
// We need to return some bytes to the part queue even before we can fill an entire caching block because
// we want to start the feedback loop for the flow-control window.
//
// This is because the read window might be enough to fetch "some data" from S3 but not the entire block.
// This is because the read window may not be aligned to block boundaries and therefore may not be enough
// to fetch the entire block, but we know it always fetches enough data for the prefetcher to start reading.
// For example, consider a file system read request for the range 2MB to 4MB where we have to start
// reading from block_offset=0 with block_size=5MB. The first read window might have a range up to 4MB,
// which is enough to serve the read request, but if the prefetcher is not able to read anything it cannot
// tell the stream to move its read window.
//
// A side effect from this is delay on the cache updating which is actually good for performance but it makes
// testing a bit more complicated because the cache might not be updated immediately.
let part_range = self
.original_range
.trim_start(offset)
.trim_end(offset + chunk.len() as u64);
if !part_range.is_empty() {
let trim_start = (part_range.start().saturating_sub(offset)) as usize;
let trim_end = (part_range.end().saturating_sub(offset)) as usize;
let part = Part::new(
self.cache_key.clone(),
part_range.start(),
chunk.slice(trim_start..trim_end),
);
// A side effect of this is a delay in cache updates, which makes testing a bit more complicated because
// the cache is not updated synchronously.
if let Some(part) = try_make_part(&chunk, offset, &self.cache_key, &self.original_range) {
self.part_queue_producer.push(Ok(part));
}
offset += chunk.len() as u64;
@@ -362,14 +345,6 @@
self.block_offset,
&self.cache_key,
);
self.part_queue_producer.push(Ok(make_part(
self.buffer,
self.block_index,
self.block_offset,
self.cache.block_size(),
&self.cache_key,
&self.original_range,
)));
}
}
}
@@ -392,6 +367,23 @@ fn update_cache<Cache: DataCache + Send + Sync>(
metrics::histogram!("prefetch.cache_update_duration_us").record(start.elapsed().as_micros() as f64);
}

/// Creates a Part that can be streamed to the prefetcher if the given bytes
/// are in the request range; otherwise returns None.
fn try_make_part(bytes: &ChecksummedBytes, offset: u64, object_id: &ObjectId, range: &RequestRange) -> Option<Part> {
let part_range = range.trim_start(offset).trim_end(offset + bytes.len() as u64);
if part_range.is_empty() {
return None;
}
trace!(?part_range, "creating part trimmed to the request range");
let trim_start = (part_range.start().saturating_sub(offset)) as usize;
let trim_end = (part_range.end().saturating_sub(offset)) as usize;
Some(Part::new(
object_id.clone(),
part_range.start(),
bytes.slice(trim_start..trim_end),
))
}
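
A hedged example of the trimming behaviour; the values and the `object_id` binding are assumed for illustration, and RequestRange::new follows the (object_size, offset, preferred_size) signature used in the tests below:

// Illustrative only: an 8 MiB object, a request for 0.5 MiB..1.5 MiB, and a
// 1 MiB chunk starting at offset 0.
let range = RequestRange::new(8 * 1024 * 1024, 512 * 1024, 1024 * 1024);
let chunk = ChecksummedBytes::new(Bytes::from(vec![0u8; 1024 * 1024]));
let part = try_make_part(&chunk, 0, &object_id, &range);
// The chunk overlaps the request range only in 512 KiB..1 MiB, so `part` is
// Some(..) starting at offset 512 KiB; a chunk lying entirely outside the
// request range would yield None instead.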

/// Creates a Part that can be streamed to the prefetcher from the given cache block.
/// If required, trims the block bytes to the request range.
fn make_part(
@@ -403,31 +395,24 @@ fn make_part(
range: &RequestRange,
) -> Part {
assert_eq!(block_offset, block_index * block_size, "invalid block offset");

let block_size = block.len();
let part_range = range
.trim_start(block_offset)
.trim_end(block_offset + block_size as u64);
trace!(
?cache_key,
block_index,
?part_range,
block_offset,
block_size,
"creating part from block data",
);

let trim_start = (part_range.start().saturating_sub(block_offset)) as usize;
let trim_end = (part_range.end().saturating_sub(block_offset)) as usize;
let bytes = block.slice(trim_start..trim_end);
Part::new(cache_key.clone(), part_range.start(), bytes)
// Cache blocks always contain bytes in the request range
try_make_part(&block, block_offset, cache_key, range).unwrap()
}

#[cfg(test)]
mod tests {
// It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
#![allow(clippy::identity_op)]

use std::{thread, time::Duration};

use futures::executor::{block_on, ThreadPool};
use mountpoint_s3_client::{
mock_client::{MockClient, MockClientConfig, MockObject, Operation},
@@ -481,6 +466,8 @@ mod tests {
let runtime = ThreadPool::builder().pool_size(1).create().unwrap();
let stream = CachingPartStream::new(runtime, cache);
let range = RequestRange::new(object_size, offset as u64, preferred_size);
let expected_start_block = (range.start() as usize).div_euclid(block_size);
let expected_end_block = (range.end() as usize).div_ceil(block_size);

let first_read_count = {
// First request (from client)
@@ -500,6 +487,13 @@
};
assert!(first_read_count > 0);

// Wait until all blocks are saved to the cache before spawning a new request
let expected_block_count = expected_end_block - expected_start_block;
while stream.cache.block_count(&id) < expected_block_count {
thread::sleep(Duration::from_millis(10));
}
assert_eq!(expected_block_count, stream.cache.block_count(&id));

let second_read_count = {
// Second request (from cache)
let get_object_counter = mock_client.new_counter(Operation::GetObject);
@@ -516,8 +510,7 @@
compare_read(&id, &object, request_task);
get_object_counter.count()
};
// Just check that some blocks are served from cache
assert!(second_read_count < first_read_count);
assert_eq!(second_read_count, 0);
}

#[test_case(1 * MB, 8 * MB)]