Optimize part size for checksummed read #315

Merged
merged 2 commits on Jun 29, 2023
Changes from 1 commit
34 changes: 27 additions & 7 deletions mountpoint-s3/src/prefetch.rs
@@ -112,6 +112,8 @@ pub struct PrefetchGetObject<Client: ObjectClient, Runtime> {
future_tasks: Arc<RwLock<VecDeque<RequestTask<TaskError<Client>>>>>,
bucket: String,
key: String,
// preferred part size in the prefetcher's part queue, not the object part size
preferred_part_size: usize,
next_sequential_read_offset: u64,
next_request_size: usize,
next_request_offset: u64,
@@ -130,6 +132,7 @@ where
inner: inner.clone(),
current_task: None,
future_tasks: Default::default(),
preferred_part_size: 0,
next_request_size: inner.config.first_request_size,
next_sequential_read_offset: 0,
next_request_offset: 0,
@@ -155,6 +158,15 @@ where
"read"
);

// Currently, we set the preferred part size to the current read size.
// Our assumption is that the read size will be the same for most sequential
// reads, so the prefetched chunks can be aligned to it.
//
// We also put a lower bound on this value in case the read size
// is too small.
let min_part_size = 128 * 1024;
self.preferred_part_size = length.max(min_part_size);
Member

I think if you initialize preferred_part_size to 128KiB, this can just be

Suggested change:
- let min_part_size = 128 * 1024;
- self.preferred_part_size = length.max(min_part_size);
+ self.preferred_part_size = self.preferred_part_size.max(length);

Also worth a comment on why we chose 128KiB.

Member

Also, should we set a maximum here, like 1MiB?

Contributor Author

For choosing 128k: it's the Linux readahead size and seems to be a reasonable minimum value. I will update the comment, and I'm also happy to change it if you have a better suggestion.

I'm not sure we should put a maximum value though. If the read size is really big then we will have to combine data from multiple parts and the extend operation is quite expensive.
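
For context on why the extend cost matters: a read that spans several prefetched parts has to concatenate them into one contiguous buffer, copying every byte (and, for checksummed buffers, doing extra checksum work). A rough, simplified sketch of that cost, using plain `Vec<u8>` in place of the crate's `ChecksummedBytes`:

```rust
// Rough illustration only: concatenating N prefetched parts into one buffer
// copies all of their bytes, so a large read assembled from many small parts
// pays an O(total bytes) copy on every read.
fn combine_parts(parts: &[Vec<u8>]) -> Vec<u8> {
    let mut combined = Vec::with_capacity(parts.iter().map(Vec::len).sum());
    for part in parts {
        combined.extend_from_slice(part); // per-part byte copy
    }
    combined
}
```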

Contributor Author

But if we change the logic to use the max of the last preferred_part_size and the current length, I think there should be a maximum, otherwise it will keep growing.

Member

The effective maximum is the client's part size anyway, so probably we should enforce it here just to be explicit.
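
Putting the thread together: a floor of 128 KiB and a cap at the client's part size. A minimal sketch of that clamping; `clamp_preferred_part_size` and its `client_part_size` parameter are hypothetical names for illustration, not code from this PR:

```rust
// Hypothetical helper (not in this PR) sketching the bounds discussed above:
// keep the preferred part size at least 128 KiB (the Linux readahead size)
// and at most the client's part size, which is the effective maximum anyway.
fn clamp_preferred_part_size(read_size: usize, client_part_size: usize) -> usize {
    const MIN_PART_SIZE: usize = 128 * 1024;
    // Raising the upper bound to at least MIN_PART_SIZE guards against a client
    // part size below 128 KiB, which would otherwise make `clamp` panic (min > max).
    read_size.clamp(MIN_PART_SIZE, client_part_size.max(MIN_PART_SIZE))
}
```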


let remaining = self.size.saturating_sub(offset);
if remaining == 0 {
return Ok(ChecksummedBytes::default());
@@ -269,6 +281,7 @@ where

let request_task = {
let client = Arc::clone(&self.inner.client);
let preferred_part_size = self.preferred_part_size;
let bucket = self.bucket.to_owned();
let key = self.key.to_owned();
let etag = self.etag.clone();
@@ -285,13 +298,20 @@
loop {
match request.next().await {
Some(Ok((offset, body))) => {
let bytes: Bytes = body.into();
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
// so we're computing our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, offset, checksum_bytes);
part_queue_producer.push(Ok(part));
// pre-split the body into multiple parts as suggested by the preferred part size
// in order to avoid validating checksums on large parts at read time.
let chunks = body.chunks(preferred_part_size);
let mut curr_offset = offset;
for chunk in chunks {
let bytes: Bytes = Bytes::copy_from_slice(chunk);
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
// so we're computing our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, curr_offset, checksum_bytes);
curr_offset += part.len() as u64;
part_queue_producer.push(Ok(part));
}
}
Some(Err(e)) => {
error!(error=?e, "RequestTask body part failed");
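
For readers outside the codebase, here is a small self-contained sketch of the new code path above: pre-splitting a response body into preferred-part-size chunks and checksumming each chunk individually. It uses the crates.io `bytes` and `crc32c` crates and a `ChecksummedChunk` struct as stand-ins for the crate's own `Part` and `ChecksummedBytes` types, which are not reproduced here:

```rust
use bytes::Bytes;

/// Stand-in for the prefetcher's part: a chunk of the object, its offset,
/// and a CRC32C computed over exactly that chunk.
struct ChecksummedChunk {
    offset: u64,
    data: Bytes,
    checksum: u32,
}

/// Split `body` (received at `offset`) into chunks of at most `preferred_part_size`
/// bytes and checksum each chunk individually, so a later read only validates the
/// chunks it actually touches instead of the whole body.
fn split_and_checksum(body: Bytes, offset: u64, preferred_part_size: usize) -> Vec<ChecksummedChunk> {
    assert!(preferred_part_size > 0, "part size must be non-zero");
    let mut parts = Vec::new();
    let mut curr_offset = offset;
    // `Bytes` derefs to `[u8]`, so `chunks` yields slices of at most the preferred size.
    for chunk in body.chunks(preferred_part_size) {
        let data = Bytes::copy_from_slice(chunk);
        let checksum = crc32c::crc32c(&data);
        parts.push(ChecksummedChunk { offset: curr_offset, data, checksum });
        curr_offset += chunk.len() as u64;
    }
    parts
}
```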