Optimize part size for checksummed read
The prefetcher stores data received from each input stream as a part in
the part queue structure. The part size is usually quite large (8 MB or
more), and checksum validation always has to cover an entire part even if
we only read a small portion of it.

This makes checksummed reads much slower than non-checksummed reads. We can
make them more efficient by using smaller parts, or ideally by aligning the
part size to the read size, so that we don't have to compute checksums over
unnecessary bytes.
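
As a rough illustration (a standalone sketch, not the actual change below:
the ChecksummedChunk type and split_into_chunks function here are hypothetical,
while the real code works with the prefetcher's Part and ChecksummedBytes types),
splitting a received body into read-sized chunks with one CRC32C checksum per
chunk looks like this:

use bytes::Bytes;

/// Hypothetical stand-in for the prefetcher's Part: an offset, the data, and its CRC32C.
struct ChecksummedChunk {
    offset: u64,
    data: Bytes,
    crc32c: u32,
}

/// Split a body received at `base_offset` into chunks of at most `preferred_part_size`
/// bytes (assumed > 0), checksumming each chunk independently so that a later read of
/// N bytes only has to validate roughly N bytes instead of an entire 8 MB part.
fn split_into_chunks(body: Bytes, base_offset: u64, preferred_part_size: usize) -> Vec<ChecksummedChunk> {
    let mut chunks = Vec::new();
    let mut start = 0;
    while start < body.len() {
        let end = (start + preferred_part_size).min(body.len());
        let data = body.slice(start..end);
        let crc32c = crc32c::checksum(&data);
        chunks.push(ChecksummedChunk {
            offset: base_offset + start as u64,
            data,
            crc32c,
        });
        start = end;
    }
    chunks
}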

Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk committed Jun 26, 2023
1 parent b85e1f6 commit 1ec1796
Showing 1 changed file with 27 additions and 7 deletions.
34 changes: 27 additions & 7 deletions mountpoint-s3/src/prefetch.rs
@@ -112,6 +112,8 @@ pub struct PrefetchGetObject<Client: ObjectClient, Runtime> {
future_tasks: Arc<RwLock<VecDeque<RequestTask<TaskError<Client>>>>>,
bucket: String,
key: String,
// preferred part size in the prefetcher's part queue, not the object part
preferred_part_size: usize,
next_sequential_read_offset: u64,
next_request_size: usize,
next_request_offset: u64,
@@ -130,6 +132,7 @@
inner: inner.clone(),
current_task: None,
future_tasks: Default::default(),
preferred_part_size: 0,
next_request_size: inner.config.first_request_size,
next_sequential_read_offset: 0,
next_request_offset: 0,
@@ -155,6 +158,15 @@
"read"
);

// Currently, we set the preferred part size to the current read size.
// Our assumption is that the read size will be the same for most sequential
// reads, so the prefetched chunks can be aligned to it.
//
// We also put a lower bound on this value in case the read size
// is too small.
let min_part_size = 128 * 1024;
self.preferred_part_size = length.max(min_part_size);

let remaining = self.size.saturating_sub(offset);
if remaining == 0 {
return Ok(ChecksummedBytes::default());
@@ -269,6 +281,7 @@ where

let request_task = {
let client = Arc::clone(&self.inner.client);
let preferred_part_size = self.preferred_part_size;
let bucket = self.bucket.to_owned();
let key = self.key.to_owned();
let etag = self.etag.clone();
@@ -285,13 +298,20 @@
loop {
match request.next().await {
Some(Ok((offset, body))) => {
let bytes: Bytes = body.into();
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
// so we're computing our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, offset, checksum_bytes);
part_queue_producer.push(Ok(part));
// Pre-split the body into multiple parts of the preferred part size
// to avoid validating checksums on large parts at read time.
let chunks = body.chunks(preferred_part_size);
let mut curr_offset = offset;
for chunk in chunks {
let bytes: Bytes = Bytes::copy_from_slice(chunk);
// S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
// so we're computing our own checksum here.
let checksum = crc32c::checksum(&bytes);
let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
let part = Part::new(&key, curr_offset, checksum_bytes);
curr_offset += part.len() as u64;
part_queue_producer.push(Ok(part));
}
}
Some(Err(e)) => {
error!(error=?e, "RequestTask body part failed");
