Re-implement the prefetcher using backpressure mechanism
The prefetcher now uses only one GetObject request to fetch data in advance.
This request has a range of the entire object but uses the backpressure
mechanism to control how much data it fetches into the part queue, instead
of spawning up to two requests in parallel.

This should make the throughput more stable, because previously the two
request tasks could compete with each other when fetching data from S3.
It also makes it easier to control how much data we want to store in the
part queue.

Signed-off-by: Monthon Klongklaew <[email protected]>
monthonk committed Aug 9, 2024
1 parent 4f8af81 commit c6ccb36
Showing 16 changed files with 1,020 additions and 348 deletions.
43 changes: 42 additions & 1 deletion mountpoint-s3/src/checksums.rs
@@ -63,7 +63,7 @@ impl ChecksummedBytes {
/// This operation just increases the reference count and sets a few indices,
/// so there will be no validation and the checksum will not be recomputed.
pub fn split_off(&mut self, at: usize) -> ChecksummedBytes {
- assert!(at < self.len());
+ assert!(at <= self.len());

let start = self.range.start;
let prefix_range = start..(start + at);
@@ -77,6 +77,27 @@ }
}
}

/// Splits the checksummed bytes into two at the given index.
///
/// Afterwards self contains elements [at, len), and the returned Bytes contains elements [0, at).
///
/// This operation just increases the reference count and sets a few indices,
/// so there will be no validation and the checksum will not be recomputed.
pub fn split_to(&mut self, at: usize) -> ChecksummedBytes {
assert!(at <= self.len());

let start = self.range.start;
let prefix_range = start..(start + at);
let suffix_range = (start + at)..self.range.end;

self.range = suffix_range;
Self {
buffer: self.buffer.clone(),
range: prefix_range,
checksum: self.checksum,
}
}

/// Returns a slice of self for the provided range.
///
/// This operation just increases the reference count and sets a few indices,
@@ -321,6 +342,26 @@ mod tests {
assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
}

#[test]
fn test_split_to() {
let split_to_at = 4;
let bytes = Bytes::from_static(b"some bytes");
let expected = bytes.clone();
let expected_checksum = crc32c::checksum(&expected);
let mut checksummed_bytes = ChecksummedBytes::new(bytes);

let mut expected_part2 = expected.clone();
let expected_part1 = expected_part2.split_to(split_to_at);
let new_checksummed_bytes = checksummed_bytes.split_to(split_to_at);

assert_eq!(expected, checksummed_bytes.buffer);
assert_eq!(expected, new_checksummed_bytes.buffer);
assert_eq!(expected_part1, new_checksummed_bytes.buffer_slice());
assert_eq!(expected_part2, checksummed_bytes.buffer_slice());
assert_eq!(expected_checksum, checksummed_bytes.checksum);
assert_eq!(expected_checksum, new_checksummed_bytes.checksum);
}

#[test]
fn test_slice() {
let range = 3..7;
14 changes: 14 additions & 0 deletions mountpoint-s3/src/cli.rs
@@ -649,11 +649,25 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
user_agent.key_value("mp-nw-interfaces", &interfaces.len().to_string());
}

// This is a weird looking number! We really want our first request size to be 1MiB,
// which is a common IO size. But Linux's readahead will try to read an extra 128k on
// top of a 1MiB read, which we'd have to wait for a second request to service. Because
// FUSE doesn't know the difference between regular reads and readahead reads, it will
// send us a READ request for that 128k, so we'll have to block waiting for it even if
// the application doesn't want it. This is all in the noise for sequential IO, but
// waiting for the readahead hurts random IO. So we add 128k to the first request size
// to avoid the latency hit of the second request.
//
// Note that the CRT does not respect this value right now; it always returns chunks of
// part size, but this is the first window size we prefer.
let initial_read_window_size = 1024 * 1024 + 128 * 1024;
let mut client_config = S3ClientConfig::new()
.auth_config(auth_config)
.throughput_target_gbps(throughput_target_gbps)
.read_part_size(args.read_part_size.unwrap_or(args.part_size) as usize)
.write_part_size(args.write_part_size.unwrap_or(args.part_size) as usize)
.read_backpressure(true)
.initial_read_window(initial_read_window_size)
.user_agent(user_agent);
#[cfg(feature = "multiple-nw-iface")]
if let Some(interfaces) = &args.bind {
2 changes: 2 additions & 0 deletions mountpoint-s3/src/fs.rs
@@ -1349,6 +1349,8 @@ mod tests {
let bucket = "bucket";
let client = Arc::new(MockClient::new(MockClientConfig {
bucket: bucket.to_owned(),
enable_backpressure: true,
initial_read_window_size: 1024 * 1024,
..Default::default()
}));
// Create "dir1" in the client to avoid creating it locally
5 changes: 4 additions & 1 deletion mountpoint-s3/src/fs/error.rs
@@ -129,9 +129,12 @@ impl<E: std::error::Error + Send + Sync + 'static> From<PrefetchReadError<E>> fo
GetObjectError::PreconditionFailed,
)) => err!(libc::ESTALE, "object was mutated remotely"),
PrefetchReadError::Integrity(e) => err!(libc::EIO, source:e, "integrity error"),
+ PrefetchReadError::PartReadFailed(e) => err!(libc::EIO, source:e, "part read failed"),
PrefetchReadError::GetRequestFailed(_)
| PrefetchReadError::GetRequestTerminatedUnexpectedly
- | PrefetchReadError::GetRequestReturnedWrongOffset { .. } => {
+ | PrefetchReadError::GetRequestReturnedWrongOffset { .. }
+ | PrefetchReadError::BackpressurePreconditionFailed
+ | PrefetchReadError::ReadWindowIncrement => {
err!(libc::EIO, source:err, "get request failed")
}
}
