Optimize part size for checksummed read #315

Merged · 2 commits · Jun 29, 2023
35 changes: 28 additions & 7 deletions mountpoint-s3/src/prefetch.rs
@@ -112,6 +112,8 @@ pub struct PrefetchGetObject<Client: ObjectClient, Runtime> {
future_tasks: Arc<RwLock<VecDeque<RequestTask<TaskError<Client>>>>>,
bucket: String,
key: String,
// preferred part size in the prefetcher's part queue, not the object part size
preferred_part_size: usize,
next_sequential_read_offset: u64,
next_request_size: usize,
next_request_offset: u64,
@@ -130,6 +132,7 @@ where
inner: inner.clone(),
current_task: None,
future_tasks: Default::default(),
preferred_part_size: 128 * 1024,
next_request_size: inner.config.first_request_size,
next_sequential_read_offset: 0,
next_request_offset: 0,
@@ -155,6 +158,16 @@ where
"read"
);

// Currently, we set the preferred part size to the current read size.
// Our assumption is that the read size will be the same for most sequential
// reads, so the prefetched chunks can be aligned to it.
//
// We initialize this value to 128 KiB because it is Linux's readahead size,
// and it also serves as a lower bound in case the read size is too small.
// The upper bound is 1 MiB, since that is a common IO size.
let max_preferred_part_size = 1024 * 1024;
self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);

let remaining = self.size.saturating_sub(offset);
if remaining == 0 {
return Ok(ChecksummedBytes::default());
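
The comment above describes a simple clamping heuristic. A minimal standalone sketch of that logic, with an illustrative function name and constant that are not part of the PR itself:

```rust
/// Sketch of the preferred-part-size heuristic described above: start from the
/// 128 KiB readahead default, grow to the observed read length, cap at 1 MiB.
fn update_preferred_part_size(current: usize, read_length: usize) -> usize {
    const MAX_PREFERRED_PART_SIZE: usize = 1024 * 1024; // 1 MiB upper bound
    current.max(read_length).min(MAX_PREFERRED_PART_SIZE)
}

fn main() {
    // Small reads keep the 128 KiB floor.
    assert_eq!(update_preferred_part_size(128 * 1024, 4 * 1024), 128 * 1024);
    // Larger sequential reads grow the preferred part size to match.
    assert_eq!(update_preferred_part_size(128 * 1024, 512 * 1024), 512 * 1024);
    // Very large reads are capped at the 1 MiB upper bound.
    assert_eq!(update_preferred_part_size(512 * 1024, 4 * 1024 * 1024), 1024 * 1024);
}
```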
Expand Down Expand Up @@ -269,6 +282,7 @@ where

let request_task = {
let client = Arc::clone(&self.inner.client);
let preferred_part_size = self.preferred_part_size;
let bucket = self.bucket.to_owned();
let key = self.key.to_owned();
let etag = self.etag.clone();
@@ -285,13 +299,20 @@
loop {
match request.next().await {
Some(Ok((offset, body))) => {
let bytes: Bytes = body.into();
// S3 doesn't provide a checksum for us if the request range is not aligned to object part boundaries,
// so we compute our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, offset, checksum_bytes);
part_queue_producer.push(Ok(part));
// Pre-split the body into multiple parts according to the preferred part size,
// in order to avoid validating checksums on large parts at read time.
let chunks = body.chunks(preferred_part_size);
let mut curr_offset = offset;
for chunk in chunks {
let bytes: Bytes = Bytes::copy_from_slice(chunk);
// S3 doesn't provide a checksum for us if the request range is not aligned to object part boundaries,
// so we compute our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, curr_offset, checksum_bytes);
curr_offset += part.len() as u64;
part_queue_producer.push(Ok(part));
}
}
Some(Err(e)) => {
error!(error=?e, "RequestTask body part failed");
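
For reference, a self-contained sketch of the pre-split-and-checksum loop added above. It uses the crates.io `bytes` and `crc32c` crates instead of the project's own `ChecksummedBytes` and `Part` types, and the function name is illustrative only:

```rust
use bytes::Bytes;

/// Split a response body into `preferred_part_size` chunks and checksum each chunk
/// independently, so a later read only re-validates one small chunk instead of the
/// whole GetObject part.
fn split_and_checksum(body: Bytes, offset: u64, preferred_part_size: usize) -> Vec<(u64, Bytes, u32)> {
    let mut parts = Vec::new();
    let mut curr_offset = offset;
    for chunk in body.chunks(preferred_part_size) {
        let bytes = Bytes::copy_from_slice(chunk);
        // crc32c crate from crates.io; the PR uses the project's own checksum helper.
        let checksum = crc32c::crc32c(&bytes);
        let len = bytes.len() as u64;
        parts.push((curr_offset, bytes, checksum));
        curr_offset += len;
    }
    parts
}
```

For example, a 1 MiB body with a 128 KiB preferred part size yields eight parts, each carrying its own CRC32C, so a 128 KiB read validates only the chunk it touches.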