Skip to content

Commit

Permalink
Validate part checksums without sending them to S3
Browse files Browse the repository at this point in the history
Even if we can't send the headers to S3, we should still retain them for
our own internal checks. This change adopts a new CRT ability to
decouple upload review checksums from the actual S3 headers, so that we
can still validate upload checksums locally without sending them to S3.

Signed-off-by: James Bornholt <[email protected]>
  • Loading branch information
jamesbornholt committed Apr 12, 2024
1 parent 7908cde commit ba893a3
Show file tree
Hide file tree
Showing 9 changed files with 112 additions and 42 deletions.
3 changes: 2 additions & 1 deletion mountpoint-s3-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ pub mod types {
pub use super::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesParts,
GetObjectAttributesResult, HeadObjectResult, ListObjectsResult, ObjectAttribute, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, RestoreStatus, UploadReview, UploadReviewPart,
ObjectInfo, ObjectPart, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, RestoreStatus,
UploadReview, UploadReviewPart,
};
}

Expand Down
36 changes: 25 additions & 11 deletions mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use crate::object_client::{
Checksum, ChecksumAlgorithm, DeleteObjectError, DeleteObjectResult, ETag, GetBodyPart, GetObjectAttributesError,
GetObjectAttributesParts, GetObjectAttributesResult, GetObjectError, HeadObjectError, HeadObjectResult,
ListObjectsError, ListObjectsResult, ObjectAttribute, ObjectClient, ObjectClientError, ObjectClientResult,
ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult, RestoreStatus,
UploadReview, UploadReviewPart,
ObjectInfo, ObjectPart, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult,
PutObjectTrailingChecksums, RestoreStatus, UploadReview, UploadReviewPart,
};

mod leaky_bucket;
Expand Down Expand Up @@ -775,7 +775,7 @@ impl MockPutObjectRequest {
.chunks(self.part_size)
.map(|part| {
let size = part.len();
let checksum = if self.params.trailing_checksums {
let checksum = if self.params.trailing_checksums != PutObjectTrailingChecksums::Disabled {
let checksum = crc32c::checksum(part);
Some(crc32c_to_base64(&checksum))
} else {
Expand All @@ -794,7 +794,7 @@ impl MockPutObjectRequest {
let mut object: MockObject = buffer.into();
object.set_storage_class(self.params.storage_class.clone());
// For S3 Standard, part attributes are only available when additional checksums are used
if self.params.trailing_checksums {
if self.params.trailing_checksums == PutObjectTrailingChecksums::Enabled {
object.parts = Some(MockObjectParts::Parts(parts));
} else {
object.parts = Some(MockObjectParts::Count(parts.len()));
Expand Down Expand Up @@ -831,7 +831,7 @@ impl PutObjectRequest for MockPutObjectRequest {
self,
review_callback: impl FnOnce(UploadReview) -> bool + Send + 'static,
) -> ObjectClientResult<PutObjectResult, PutObjectError, Self::ClientError> {
let checksum_algorithm = if self.params.trailing_checksums {
let checksum_algorithm = if self.params.trailing_checksums != PutObjectTrailingChecksums::Disabled {
Some(ChecksumAlgorithm::Crc32c)
} else {
None
Expand Down Expand Up @@ -1428,10 +1428,11 @@ mod tests {
assert_eq!(1, head_counter_2.count());
}

#[test_case(true; "enabled")]
#[test_case(false; "disabled")]
#[test_case(PutObjectTrailingChecksums::Enabled; "enabled")]
#[test_case(PutObjectTrailingChecksums::ReviewOnly; "review only")]
#[test_case(PutObjectTrailingChecksums::Disabled; "disabled")]
#[tokio::test]
async fn test_checksum_attributes(enable_checksums: bool) {
async fn test_checksum_attributes(trailing_checksums: PutObjectTrailingChecksums) {
const OBJECT_SIZE: usize = 500 * 1024;
const PART_SIZE: usize = 16 * 1024;

Expand All @@ -1446,12 +1447,25 @@ mod tests {

let key = "key1";
let put_params = PutObjectParams {
trailing_checksums: enable_checksums,
trailing_checksums,
..Default::default()
};
let mut put_request = client.put_object(bucket, key, &put_params).await.unwrap();
put_request.write(&body).await.unwrap();
put_request.complete().await.unwrap();

put_request
.review_and_complete(move |review| {
let parts = review.parts;
if trailing_checksums == PutObjectTrailingChecksums::Disabled {
assert!(review.checksum_algorithm.is_none());
assert!(parts.iter().all(|p| p.checksum.is_none()));
} else {
assert_eq!(review.checksum_algorithm, Some(ChecksumAlgorithm::Crc32c));
}
true
})
.await
.unwrap();

// GetObjectAttributes returns checksums
let attrs = client
Expand All @@ -1464,7 +1478,7 @@ mod tests {
let expected_parts = (OBJECT_SIZE + PART_SIZE - 1) / PART_SIZE;
assert_eq!(parts.total_parts_count, Some(expected_parts));

if enable_checksums {
if trailing_checksums == PutObjectTrailingChecksums::Enabled {
let part_attributes = parts
.parts
.expect("part attributes should be returned if checksums enabled");
Expand Down
16 changes: 14 additions & 2 deletions mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub enum GetObjectAttributesError {
#[non_exhaustive]
pub struct PutObjectParams {
/// Enable Crc32c trailing checksums.
pub trailing_checksums: bool,
pub trailing_checksums: PutObjectTrailingChecksums,
/// Storage class to be used when creating new S3 object
pub storage_class: Option<String>,
/// The server-side encryption algorithm to be used for this object in Amazon S3 (for example, AES256, aws:kms, aws:kms:dsse)
Expand All @@ -289,7 +289,7 @@ impl PutObjectParams {
}

/// Set Crc32c trailing checksums.
pub fn trailing_checksums(mut self, value: bool) -> Self {
pub fn trailing_checksums(mut self, value: PutObjectTrailingChecksums) -> Self {
self.trailing_checksums = value;
self
}
Expand All @@ -313,6 +313,18 @@ impl PutObjectParams {
}
}

/// How CRC32c checksums are used for parts of a multi-part PutObject request
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum PutObjectTrailingChecksums {
/// Checksums are computed, passed to upload review, and also sent to S3
Enabled,
/// Checksums are computed, passed to upload review, but not sent to S3
ReviewOnly,
/// Checksums are not computed
#[default]
Disabled,
}

/// Info for the caller to review before an upload completes.
pub type UploadReview = mountpoint_s3_crt::s3::client::UploadReview;

Expand Down
12 changes: 7 additions & 5 deletions mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::{Arc, Mutex};
use std::time::Instant;

use crate::object_client::{ObjectClientResult, PutObjectError, PutObjectParams, PutObjectRequest, PutObjectResult};
use crate::s3_crt_client::{emit_throughput_metric, S3CrtClient, S3RequestError};
use crate::s3_crt_client::{emit_throughput_metric, PutObjectTrailingChecksums, S3CrtClient, S3RequestError};
use async_trait::async_trait;
use futures::channel::oneshot;
use mountpoint_s3_crt::http::request_response::{Header, Headers};
Expand Down Expand Up @@ -32,10 +32,12 @@ impl S3CrtClient {
.set_request_path(&key)
.map_err(S3RequestError::construction_failure)?;

if params.trailing_checksums {
let checksum_config = ChecksumConfig::trailing_crc32c();
message.set_checksum_config(Some(checksum_config));
}
let checksum_config = match params.trailing_checksums {
PutObjectTrailingChecksums::Enabled => Some(ChecksumConfig::trailing_crc32c()),
PutObjectTrailingChecksums::ReviewOnly => Some(ChecksumConfig::upload_review_crc32c()),
PutObjectTrailingChecksums::Disabled => None,
};
message.set_checksum_config(checksum_config);

let review_callback = ReviewCallbackBox::default();
let callback = review_callback.clone();
Expand Down
53 changes: 42 additions & 11 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::checksums::crc32c_to_base64;
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::types::{ObjectClientResult, PutObjectParams, PutObjectResult};
use mountpoint_s3_client::types::{
ChecksumAlgorithm, ObjectClientResult, PutObjectParams, PutObjectResult, PutObjectTrailingChecksums,
};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest, S3CrtClient, S3RequestError};
use mountpoint_s3_crt::checksums::crc32c;
use mountpoint_s3_crt_sys::aws_s3_errors;
Expand Down Expand Up @@ -274,8 +276,11 @@ async fn test_put_object_initiate_failure() {
assert_eq!(uploads_in_progress, 0);
}

#[test_case(PutObjectTrailingChecksums::Enabled; "enabled")]
#[test_case(PutObjectTrailingChecksums::ReviewOnly; "review only")]
#[test_case(PutObjectTrailingChecksums::Disabled; "disabled")]
#[tokio::test]
async fn test_put_checksums() {
async fn test_put_checksums(trailing_checksums: PutObjectTrailingChecksums) {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_checksums");
let client_config = S3ClientConfig::new()
Expand All @@ -288,14 +293,26 @@ async fn test_put_checksums() {
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectParams::new().trailing_checksums(true);
let params = PutObjectParams::new().trailing_checksums(trailing_checksums);
let mut request = client
.put_object(&bucket, &key, &params)
.await
.expect("put_object should succeed");

request.write(&contents).await.unwrap();
request.complete().await.unwrap();
request
.review_and_complete(move |review| {
let parts = review.parts;
if trailing_checksums == PutObjectTrailingChecksums::Disabled {
assert!(review.checksum_algorithm.is_none());
assert!(parts.iter().all(|p| p.checksum.is_none()));
} else {
assert_eq!(review.checksum_algorithm, Some(ChecksumAlgorithm::Crc32c));
}
true
})
.await
.unwrap();

let sdk_client = get_test_sdk_client().await;
let attributes = sdk_client
Expand All @@ -307,13 +324,27 @@ async fn test_put_checksums() {
.await
.unwrap();
let parts = attributes.object_parts().unwrap().parts();
let checksums: Vec<_> = parts.iter().map(|p| p.checksum_crc32_c().unwrap()).collect();
let expected_checksums: Vec<_> = contents.chunks(PART_SIZE).map(crc32c::checksum).collect();

assert_eq!(checksums.len(), expected_checksums.len());
for (checksum, expected_checksum) in checksums.into_iter().zip(expected_checksums.into_iter()) {
let encoded = crc32c_to_base64(&expected_checksum);
assert_eq!(checksum, encoded);
if trailing_checksums == PutObjectTrailingChecksums::Enabled {
let checksums: Vec<_> = parts.iter().map(|p| p.checksum_crc32_c().unwrap()).collect();
let expected_checksums: Vec<_> = contents.chunks(PART_SIZE).map(crc32c::checksum).collect();

assert_eq!(checksums.len(), expected_checksums.len());
for (checksum, expected_checksum) in checksums.into_iter().zip(expected_checksums.into_iter()) {
let encoded = crc32c_to_base64(&expected_checksum);
assert_eq!(checksum, encoded);
}
} else {
// For S3 Express, crc32 is used by default. For S3 Standard, no checksums are used by
// default and the list of parts is empty in GetObjectAttributes. So allow either case --
// the important thing is that crc32c checksums aren't present because we disabled those by
// disabling our upload checksums.
for part in parts {
assert!(
part.checksum_crc32_c().is_none(),
"crc32c should not be present when upload checksums are disabled"
);
}
}
}

Expand All @@ -333,7 +364,7 @@ async fn test_put_review(pass_review: bool) {
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectParams::new().trailing_checksums(true);
let params = PutObjectParams::new().trailing_checksums(PutObjectTrailingChecksums::Enabled);
let mut request = client
.put_object(&bucket, &key, &params)
.await
Expand Down
11 changes: 11 additions & 0 deletions mountpoint-s3-crt/src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1276,6 +1276,17 @@ impl ChecksumConfig {
}
}

/// Create a [ChecksumConfig] enabling Crc32 trailing checksums only for upload review.
pub fn upload_review_crc32c() -> Self {
Self {
inner: aws_s3_checksum_config {
location: aws_s3_checksum_location::AWS_SCL_NONE,
checksum_algorithm: aws_s3_checksum_algorithm::AWS_SCA_CRC32C,
..Default::default()
},
}
}

/// Get out the inner pointer to the checksum config
pub(crate) fn to_inner_ptr(&self) -> *const aws_s3_checksum_config {
&self.inner
Expand Down
17 changes: 8 additions & 9 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{fmt::Debug, sync::Arc};

use mountpoint_s3_client::checksums::crc32c_from_base64;
use mountpoint_s3_client::error::{ObjectClientError, PutObjectError};
use mountpoint_s3_client::types::{PutObjectParams, PutObjectResult, UploadReview};
use mountpoint_s3_client::types::{PutObjectParams, PutObjectResult, PutObjectTrailingChecksums, UploadReview};
use mountpoint_s3_client::{ObjectClient, PutObjectRequest};

use mountpoint_s3_crt::checksums::crc32c::{Crc32c, Hasher};
Expand Down Expand Up @@ -96,7 +96,6 @@ pub struct UploadRequest<Client: ObjectClient> {
request: Client::PutObjectRequest,
maximum_upload_size: Option<usize>,
sse: ServerSideEncryption,
use_additional_checksums: bool,
}

impl<Client: ObjectClient> UploadRequest<Client> {
Expand All @@ -108,7 +107,9 @@ impl<Client: ObjectClient> UploadRequest<Client> {
let mut params = PutObjectParams::new();

if inner.use_additional_checksums {
params = params.trailing_checksums(true);
params = params.trailing_checksums(PutObjectTrailingChecksums::Enabled);
} else {
params = params.trailing_checksums(PutObjectTrailingChecksums::ReviewOnly);
}

if let Some(storage_class) = &inner.storage_class {
Expand All @@ -133,7 +134,6 @@ impl<Client: ObjectClient> UploadRequest<Client> {
request,
maximum_upload_size,
sse: inner.server_side_encryption.clone(),
use_additional_checksums: inner.use_additional_checksums,
})
}

Expand Down Expand Up @@ -168,10 +168,9 @@ impl<Client: ObjectClient> UploadRequest<Client> {
pub async fn complete(self) -> Result<PutObjectResult, PutRequestError<Client>> {
let size = self.size();
let checksum = self.hasher.finalize();
let use_checksums = self.use_additional_checksums;
let result = self
.request
.review_and_complete(move |review| !use_checksums || verify_checksums(review, size, checksum))
.review_and_complete(move |review| verify_checksums(review, size, checksum))
.await?;
if let Err(err) = self
.sse
Expand Down Expand Up @@ -203,17 +202,17 @@ impl<Client: ObjectClient> Debug for UploadRequest<Client> {
fn verify_checksums(review: UploadReview, expected_size: u64, expected_checksum: Crc32c) -> bool {
let mut uploaded_size = 0u64;
let mut uploaded_checksum = Crc32c::new(0);
for part in review.parts {
for (i, part) in review.parts.iter().enumerate() {
uploaded_size += part.size;

let Some(checksum) = &part.checksum else {
error!("missing part checksum");
error!(part_number = i, "missing part checksum");
return false;
};
let checksum = match crc32c_from_base64(checksum) {
Ok(checksum) => checksum,
Err(error) => {
error!(?error, "error decoding part checksum");
error!(part_number = i, ?error, "error decoding part checksum");
return false;
}
};
Expand Down
4 changes: 2 additions & 2 deletions mountpoint-s3/tests/common/fuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ pub mod s3_session {
use aws_sdk_s3::Client;
use mountpoint_s3::prefetch::{caching_prefetch, default_prefetch};
use mountpoint_s3_client::config::{EndpointConfig, S3ClientConfig};
use mountpoint_s3_client::types::Checksum;
use mountpoint_s3_client::types::{Checksum, PutObjectTrailingChecksums};
use mountpoint_s3_client::S3CrtClient;

use crate::common::s3::{get_test_bucket_and_prefix, get_test_region, get_test_sdk_client, tokio_block_on};
Expand Down Expand Up @@ -377,7 +377,7 @@ pub mod s3_session {
if let Some(storage_class) = params.storage_class {
request = request.set_storage_class(Some(storage_class.as_str().into()));
}
if params.trailing_checksums {
if params.trailing_checksums == PutObjectTrailingChecksums::Enabled {
request = request.set_checksum_algorithm(Some(ChecksumAlgorithm::Crc32C));
}
Ok(tokio_block_on(request.send()).map(|_| ())?)
Expand Down

0 comments on commit ba893a3

Please sign in to comment.