diff --git a/mountpoint-s3/src/prefetch.rs b/mountpoint-s3/src/prefetch.rs
index 70d0fdc70..ecc1f2a8b 100644
--- a/mountpoint-s3/src/prefetch.rs
+++ b/mountpoint-s3/src/prefetch.rs
@@ -112,6 +112,8 @@ pub struct PrefetchGetObject {
     future_tasks: Arc>>>>,
     bucket: String,
     key: String,
+    // preferred size of parts in the prefetcher's part queue, not the object's part size
+    preferred_part_size: usize,
     next_sequential_read_offset: u64,
     next_request_size: usize,
     next_request_offset: u64,
@@ -130,6 +132,7 @@ where
             inner: inner.clone(),
             current_task: None,
             future_tasks: Default::default(),
+            preferred_part_size: 128 * 1024,
             next_request_size: inner.config.first_request_size,
             next_sequential_read_offset: 0,
             next_request_offset: 0,
@@ -155,6 +158,16 @@ where
             "read"
         );
 
+        // Currently, we set the preferred part size to the current read size, and
+        // never shrink it. Our assumption is that the read size will be the same for
+        // most sequential reads, so prefetched chunks can be aligned to it.
+        //
+        // We initialize this value to 128 KiB, the Linux readahead size, which also
+        // serves as a lower bound in case the read size is too small. The upper
+        // bound is 1 MiB, since that should be a common IO size.
+        let max_preferred_part_size = 1024 * 1024;
+        self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);
+
         let remaining = self.size.saturating_sub(offset);
         if remaining == 0 {
             return Ok(ChecksummedBytes::default());
@@ -269,6 +282,7 @@ where
 
         let request_task = {
             let client = Arc::clone(&self.inner.client);
+            let preferred_part_size = self.preferred_part_size;
            let bucket = self.bucket.to_owned();
            let key = self.key.to_owned();
            let etag = self.etag.clone();
@@ -285,13 +299,20 @@ where
                loop {
                    match request.next().await {
                        Some(Ok((offset, body))) => {
-                            let bytes: Bytes = body.into();
-                            // S3 doesn't provide checksum for us if the request range is not aligned to object part boundaries,
-                            // so we're computing our own checksum here.
-                            let checksum = crc32c::checksum(&bytes);
-                            let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
-                            let part = Part::new(&key, offset, checksum_bytes);
-                            part_queue_producer.push(Ok(part));
+                            // Pre-split the body into multiple parts of the preferred part size
+                            // to avoid validating checksums over large parts at read time.
+                            let chunks = body.chunks(preferred_part_size);
+                            let mut curr_offset = offset;
+                            for chunk in chunks {
+                                let bytes: Bytes = Bytes::copy_from_slice(chunk);
+                                // S3 doesn't provide a checksum for us if the request range is not aligned
+                                // to object part boundaries, so we're computing our own checksum here.
+                                let checksum = crc32c::checksum(&bytes);
+                                let checksum_bytes = ChecksummedBytes::new(bytes, checksum);
+                                let part = Part::new(&key, curr_offset, checksum_bytes);
+                                curr_offset += part.len() as u64;
+                                part_queue_producer.push(Ok(part));
+                            }
                        }
                        Some(Err(e)) => {
                            error!(error=?e, "RequestTask body part failed");
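
For reference, the offset bookkeeping in the last hunk can be read in isolation as the sketch below. This is illustrative only, not code from the change: the split_body helper and its signature are hypothetical, and only the bytes crate is assumed.

    use bytes::Bytes;

    // Hypothetical helper mirroring the new loop: split a response body into
    // chunks of at most `preferred_part_size` bytes and pair each chunk with
    // its absolute offset within the object.
    fn split_body(body: &Bytes, offset: u64, preferred_part_size: usize) -> Vec<(u64, Bytes)> {
        let mut parts = Vec::new();
        let mut curr_offset = offset;
        for chunk in body.chunks(preferred_part_size) {
            parts.push((curr_offset, Bytes::copy_from_slice(chunk)));
            curr_offset += chunk.len() as u64;
        }
        parts
    }

Note that Bytes::copy_from_slice copies each chunk, so every queued part owns its own buffer; that copy appears to be the trade-off for being able to compute and later validate a checksum per smaller part rather than over the whole body.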