Skip to content

Commit

Permalink
Implement checksums for MockClient upload path (#1102)
Browse files Browse the repository at this point in the history
## Description of change

This change updates the mock client's write path to compute and store
checksums, matching behavior approximate to S3. We want this so that we
can use the mock client for both uploads and downloads and verify
checksum behavior for the client.

The change stores the checksums as strings, as this is the observed
behavior of S3. We can always update if this turns out to be a bad
assumption.

Relevant issues: N/A

## Does this change impact existing behavior?

This change updates the `mountpoint-s3-client` mock client to add
checksum persistence to the write path. This was previously a gap in the
mock client.

## Does this change need a changelog entry in any of the crates?

I do not think this needs a changelog entry, as it does not change the
behavior of the S3 client itself. Happy to discuss.

---

By submitting this pull request, I confirm that my contribution is made
under the terms of the Apache 2.0 license and I agree to the terms of
the [Developer Certificate of Origin
(DCO)](https://developercertificate.org/).

---------

Signed-off-by: Daniel Carl Jones <[email protected]>
  • Loading branch information
dannycjones authored Nov 5, 2024
1 parent 2a95d14 commit db4571f
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 12 deletions.
181 changes: 170 additions & 11 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::fmt::Write;
use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
Expand All @@ -30,7 +31,7 @@ use crate::object_client::{
GetObjectAttributesResult, GetObjectError, GetObjectRequest, HeadObjectError, HeadObjectParams, HeadObjectResult,
ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, PutObjectSingleParams,
PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
PutObjectTrailingChecksums, RestoreStatus, UploadChecksum, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -781,6 +782,14 @@ impl ObjectClient for MockClient {
let mut object: MockObject = contents.into();
object.set_storage_class(params.storage_class.clone());
object.set_object_metadata(params.object_metadata.clone());
if let Some(upload_checksum) = &params.checksum {
let mut checksum = Checksum::empty();
match upload_checksum {
UploadChecksum::Crc32c(crc32c) => checksum.checksum_crc32c = Some(crc32c_to_base64(crc32c)),
}
object.set_checksum(checksum);
}

let etag = object.etag.clone();
add_object(&self.objects, key, object);
Ok(PutObjectResult {
Expand Down Expand Up @@ -811,14 +820,7 @@ impl ObjectClient for MockClient {
for attribute in object_attributes.iter() {
match attribute {
ObjectAttribute::ETag => result.etag = Some("TODO".to_owned()),
ObjectAttribute::Checksum => {
result.checksum = Some(Checksum {
checksum_crc32: Some("TODO".to_owned()),
checksum_crc32c: Some("TODO".to_owned()),
checksum_sha1: Some("TODO".to_owned()),
checksum_sha256: Some("TODO".to_owned()),
})
}
ObjectAttribute::Checksum => result.checksum = Some(object.checksum.clone()),
ObjectAttribute::ObjectParts => {
let parts = match &object.parts {
Some(MockObjectParts::Count(num_parts)) => Some(GetObjectAttributesParts {
Expand Down Expand Up @@ -868,6 +870,9 @@ impl ObjectClient for MockClient {
}
}

/// Mock implementation of a meta [PutObjectRequest], created by [MockClient]'s [ObjectClient::put_object].
///
/// For a single PutObject, see [MockClient]'s implementation of [ObjectClient::put_object_single].
#[derive(Debug)]
pub struct MockPutObjectRequest {
key: String,
Expand Down Expand Up @@ -921,13 +926,25 @@ impl MockPutObjectRequest {
let mut object: MockObject = buffer.into();
object.set_storage_class(self.params.storage_class.clone());
object.set_object_metadata(self.params.object_metadata.clone());

// For S3 Standard, part attributes are only available when additional checksums are used
if self.params.trailing_checksums == PutObjectTrailingChecksums::Enabled {
let whole_obj_checksum = {
let mut whole_obj_checksum = Checksum::empty();
let part_checksums = parts
.iter()
.map(|part| part.checksum.clone())
.map(|checksum| checksum.expect("checksum must be set when using trailing checksums"));
whole_obj_checksum.checksum_crc32c = Some(compute_crc32c_of_crc32c_checksums(part_checksums));
whole_obj_checksum
};
object.set_checksum(whole_obj_checksum);
object.parts = Some(MockObjectParts::Parts(parts));
} else {
object.parts = Some(MockObjectParts::Count(parts.len()));
}
let etag = object.etag.clone();

let etag = object.etag();
add_object(&self.objects, &self.key, object);
Ok(PutObjectResult {
etag,
Expand All @@ -937,6 +954,19 @@ impl MockPutObjectRequest {
}
}

/// Compute a checksum of checksums, mirroring how S3 computes object checksums for MPUs.
fn compute_crc32c_of_crc32c_checksums(individual_checksums: impl IntoIterator<Item = String>) -> String {
let mut checksum = crc32c::Hasher::new();
let mut count = 0;
for individual_checksum in individual_checksums {
count += 1;
checksum.update(individual_checksum.as_bytes());
}
let mut checksum = crc32c_to_base64(&checksum.finalize());
write!(checksum, "-{count}").expect("should be able to append to String");
checksum
}

impl Drop for MockPutObjectRequest {
fn drop(&mut self) {
self.in_progress_uploads.write().unwrap().remove(&self.key);
Expand Down Expand Up @@ -1687,6 +1717,116 @@ mod tests {
assert_eq!(&content, &*actual);
}

#[tokio::test]
async fn test_checksums_set_after_single_put() {
let client = MockClient::new(MockClientConfig {
bucket: "test_bucket".to_string(),
..Default::default()
});

let s3_key = "key1";
let content = vec![42u8; 512];
let content_checksum = crc32c::checksum(&content);
let put_object_params = PutObjectSingleParams::new().checksum(Some(UploadChecksum::Crc32c(content_checksum)));
let _put_result = client
.put_object_single("test_bucket", s3_key, &put_object_params, &content)
.await
.expect("put_object failed");

// Now verify...

let objects = client.objects.read().unwrap();
let stored_object = objects.get(s3_key).expect("object should exist after PutObject");

let mut expected_checksum = Checksum::empty();
expected_checksum.checksum_crc32c = Some(crc32c_to_base64(&content_checksum));
assert_eq!(
stored_object.checksum, expected_checksum,
"stored object checksum should equal expected checksum",
);
}

#[test_case(PutObjectTrailingChecksums::Enabled; "enabled")]
#[test_case(PutObjectTrailingChecksums::ReviewOnly; "review only")]
#[test_case(PutObjectTrailingChecksums::Disabled; "disabled")]
#[tokio::test]
async fn test_checksums_set_after_meta_put(trailing_checksums: PutObjectTrailingChecksums) {
let mut rng = ChaChaRng::seed_from_u64(0x12345678);

let obj = MockObject::ramp(0xaa, 2 * RAMP_BUFFER_SIZE, ETag::for_tests());

let client = MockClient::new(MockClientConfig {
bucket: "test_bucket".to_string(),
part_size: 1024,
..Default::default()
});

let s3_key = "key1";
let put_object_params = PutObjectParams::new().trailing_checksums(trailing_checksums);
let mut put_request = client
.put_object("test_bucket", s3_key, &put_object_params)
.await
.expect("should be able to initiate meta put_object");

// Stream randomly sized parts into put_object_request.
let mut next_offset = 0;
while next_offset < obj.len() {
let part_size = rng.gen_range(0..=obj.len() - next_offset);
let result = obj.read(next_offset as u64, part_size);
next_offset += part_size;
put_request.write(&result).await.unwrap();
}

put_request
.complete()
.await
.expect("should be able to complete meta put_object");

// Now verify...

let objects = client.objects.read().unwrap();
let stored_object = objects.get(s3_key).expect("object should exist after PutObject");

match stored_object
.parts
.as_ref()
.expect("parts must exist when using meta put")
{
MockObjectParts::Parts(_) => {
assert!(
matches!(trailing_checksums, PutObjectTrailingChecksums::Enabled),
"checksums should only be set if trailing checksums were sent to S3",
);
}
MockObjectParts::Count(_) => {
assert!(
!matches!(trailing_checksums, PutObjectTrailingChecksums::Enabled),
"checksums should be set if trailing checksums were sent to S3",
);
}
}

let mut expected_obj_checksum = Checksum::empty();
if let PutObjectTrailingChecksums::Enabled = trailing_checksums {
// Only if the checksums should be persisted should we check part-level checksums were set.
let Some(MockObjectParts::Parts(parts)) = stored_object.parts.as_ref() else {
unreachable!("we know checksums were enabled for this upload");
};

let part_checksums = parts
.iter()
.map(|part| part.checksum.clone())
.map(|checksum| checksum.expect("checksum must be set when using trailing checksums"));
let obj_checksum = compute_crc32c_of_crc32c_checksums(part_checksums);
expected_obj_checksum.checksum_crc32c = Some(obj_checksum);
}

assert_eq!(
stored_object.checksum, expected_obj_checksum,
"stored object checksum should equal expected checksum",
);
}

proptest::proptest! {
#[test]
fn test_ramp(size in 1..2*RAMP_BUFFER_SIZE, read_size in 1..2*RAMP_BUFFER_SIZE, offset in 0..RAMP_BUFFER_SIZE) {
Expand Down Expand Up @@ -1803,7 +1943,13 @@ mod tests {

// GetObjectAttributes returns checksums
let attrs = client
.get_object_attributes(bucket, key, None, None, &[ObjectAttribute::ObjectParts])
.get_object_attributes(
bucket,
key,
None,
None,
&[ObjectAttribute::ObjectParts, ObjectAttribute::Checksum],
)
.await
.unwrap();

Expand All @@ -1830,6 +1976,19 @@ mod tests {
.expect("crc32c should be present");
assert_eq!(&expected_checksum, actual_checksum);
}

// We trust that other tests will cover checksum correctness,
// so let's just check the right checksums are set.
let Checksum {
checksum_crc32,
checksum_crc32c,
checksum_sha1,
checksum_sha256,
} = attrs.checksum.expect("object checksum should be present");
assert!(checksum_crc32.is_none(), "CRC32 should not be set");
assert!(checksum_crc32c.is_some(), "CRC32C should be set");
assert!(checksum_sha1.is_none(), "SHA1 should not be set");
assert!(checksum_sha256.is_none(), "SHA256 should not be set");
} else {
assert!(
parts.parts.is_none(),
Expand Down
2 changes: 1 addition & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ impl fmt::Display for ObjectAttribute {
///
/// See [Checksum](https://docs.aws.amazon.com/AmazonS3/latest/API/API_Checksum.html) in the *Amazon
/// S3 API Reference* for more details.
#[derive(Clone, Debug)]
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Checksum {
/// Base64-encoded, 32-bit CRC32 checksum of the object
pub checksum_crc32: Option<String>,
Expand Down

0 comments on commit db4571f

Please sign in to comment.