Skip to content

Commit

Permalink
Enable trailing checksums on PUT (#320)
Browse files Browse the repository at this point in the history
Signed-off-by: Alessandro Passaro <[email protected]>
  • Loading branch information
passaro authored Jun 29, 2023
1 parent 005b590 commit 5c1c831
Show file tree
Hide file tree
Showing 10 changed files with 145 additions and 8 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions mountpoint-s3-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ anyhow = { version = "1.0.64", features = ["backtrace"] }
aws-config = "0.54.1"
aws-credential-types = "0.54.1"
aws-sdk-s3 = "0.24.0"
base64ct = { version = "1.6.0", features = ["std"] }
bytes = "1.2.1"
clap = "3.2.12"
ctor = "0.1.23"
Expand Down
18 changes: 17 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,23 @@ pub enum GetObjectAttributesError {
/// TODO: Populate this struct with parameters from the S3 API, e.g., storage class, encryption.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
pub struct PutObjectParams {}
pub struct PutObjectParams {
/// Enable Crc32c trailing checksums.
pub trailing_checksums: bool,
}

impl PutObjectParams {
/// Create a default [PutObjectParams].
pub fn new() -> Self {
Self::default()
}

/// Set Crc32c trailing checksums.
pub fn trailing_checksums(mut self, value: bool) -> Self {
self.trailing_checksums = value;
self
}
}

/// A streaming put request which allows callers to asynchronously write
/// the body of the request.
Expand Down
14 changes: 12 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
init_default_signing_config, Client, ClientConfig, MetaRequestOptions, MetaRequestResult, MetaRequestType,
RequestType,
init_default_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequestOptions, MetaRequestResult,
MetaRequestType, RequestType,
};

use async_trait::async_trait;
Expand Down Expand Up @@ -246,6 +246,7 @@ impl S3CrtClientInner {
inner: message,
uri,
path_prefix,
checksum_config: None,
})
}

Expand Down Expand Up @@ -273,6 +274,9 @@ impl S3CrtClientInner {
let first_body_part_clone = Arc::clone(&first_body_part);

let mut options = MetaRequestOptions::new();
if let Some(checksum_config) = message.checksum_config {
options.checksum_config(checksum_config);
}
options
.message(message.inner)
.endpoint(message.uri)
Expand Down Expand Up @@ -457,6 +461,7 @@ struct S3Message {
inner: Message,
uri: Uri,
path_prefix: String,
checksum_config: Option<ChecksumConfig>,
}

impl S3Message {
Expand Down Expand Up @@ -528,6 +533,11 @@ impl S3Message {
fn set_body_stream(&mut self, input_stream: Option<AsyncInputStream>) -> Option<AsyncInputStream> {
self.inner.set_body_stream(input_stream)
}

/// Sets the checksum configuration for this message.
fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
self.checksum_config = checksum_config;
}
}

#[derive(Debug)]
Expand Down
7 changes: 6 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{ObjectClientError, PutObjectRequest, PutObjectResult, S3CrtClient, S
use async_trait::async_trait;
use mountpoint_s3_crt::http::request_response::Header;
use mountpoint_s3_crt::io::async_stream::{self, AsyncStreamWriter};
use mountpoint_s3_crt::s3::client::MetaRequestType;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType};
use tracing::{debug, Span};

use super::{S3CrtClientInner, S3HttpRequest};
Expand Down Expand Up @@ -67,6 +67,11 @@ impl S3PutObjectRequest {
.set_request_path(&key)
.map_err(S3RequestError::construction_failure)?;

if self.params.trailing_checksums {
let checksum_config = ChecksumConfig::trailing_crc32c();
message.set_checksum_config(Some(checksum_config));
}

let (body_async_stream, writer) = async_stream::new_stream(&self.client.allocator);
message.set_body_stream(Some(body_async_stream));

Expand Down
51 changes: 51 additions & 0 deletions mountpoint-s3-client/tests/put_object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,18 @@

pub mod common;

use base64ct::Base64;
use base64ct::Encoding;
use common::*;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::GetObjectError;
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::ObjectClientResult;
use mountpoint_s3_client::PutObjectParams;
use mountpoint_s3_client::PutObjectRequest;
use mountpoint_s3_client::S3ClientConfig;
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::checksums::crc32c;
use rand::Rng;

// Simple test for PUT object. Puts a single, small object as a single part and checks that the
Expand Down Expand Up @@ -184,3 +190,48 @@ async fn test_put_object_abort() {
let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &key).await.unwrap();
assert_eq!(uploads_in_progress, 0);
}

#[tokio::test]
async fn test_put_checksums() {
const PART_SIZE: usize = 5 * 1024 * 1024;
let (bucket, prefix) = get_test_bucket_and_prefix("test_put_checksums");
let client_config = S3ClientConfig {
throughput_target_gbps: Some(10.0),
part_size: Some(PART_SIZE),
..Default::default()
};
let client = S3CrtClient::new(&get_test_region(), client_config).expect("could not create test client");
let key = format!("{prefix}hello");

let mut rng = rand::thread_rng();
let mut contents = vec![0u8; PART_SIZE * 2];
rng.fill(&mut contents[..]);

let params = PutObjectParams::new().trailing_checksums(true);
let mut request = client
.put_object(&bucket, &key, &params)
.await
.expect("put_object should succeed");

request.write(&contents).await.unwrap();
request.complete().await.unwrap();

let sdk_client = get_test_sdk_client().await;
let attributes = sdk_client
.get_object_attributes()
.bucket(bucket)
.key(key)
.object_attributes(aws_sdk_s3::model::ObjectAttributes::ObjectParts)
.send()
.await
.unwrap();
let parts = attributes.object_parts().unwrap().parts().unwrap();
let checksums: Vec<_> = parts.iter().map(|p| p.checksum_crc32_c().unwrap()).collect();
let expected_checksums: Vec<_> = contents.chunks(PART_SIZE).map(crc32c::checksum).collect();

assert_eq!(checksums.len(), expected_checksums.len());
for (checksum, expected_checksum) in checksums.into_iter().zip(expected_checksums.into_iter()) {
let encoded = Base64::encode_string(&expected_checksum.value().to_be_bytes());
assert_eq!(checksum, encoded);
}
}
5 changes: 5 additions & 0 deletions mountpoint-s3-crt/src/checksums/crc32.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ impl Crc32 {
pub fn new(value: u32) -> Crc32 {
Crc32(value)
}

/// The CRC32 checksum value.
pub fn value(&self) -> u32 {
self.0
}
}

/// Computes the CRC32 checksum of a byte slice.
Expand Down
5 changes: 5 additions & 0 deletions mountpoint-s3-crt/src/checksums/crc32c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ impl Crc32c {
pub fn new(value: u32) -> Crc32c {
Crc32c(value)
}

/// The CRC32C checksum value.
pub fn value(&self) -> u32 {
self.0
}
}

/// Computes the CRC32C checksum of a byte slice.
Expand Down
39 changes: 39 additions & 0 deletions mountpoint-s3-crt/src/s3/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ struct MetaRequestOptionsInner {
/// Owned signing config, if provided.
signing_config: Option<SigningConfig>,

/// Owned checksum config, if provided.
checksum_config: Option<ChecksumConfig>,

/// Telemetry callback, if provided
on_telemetry: Option<TelemetryCallback>,

Expand Down Expand Up @@ -210,6 +213,7 @@ impl MetaRequestOptions {
message: None,
endpoint: None,
signing_config: None,
checksum_config: None,
on_telemetry: None,
on_headers: None,
on_body: None,
Expand Down Expand Up @@ -255,6 +259,16 @@ impl MetaRequestOptions {
self
}

/// Set the checksum config used for this message.
pub fn checksum_config(&mut self, checksum_config: ChecksumConfig) -> &mut Self {
// SAFETY: we aren't moving out of the struct.
let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
options.checksum_config = Some(checksum_config);
options.inner.checksum_config =
options.checksum_config.as_mut().unwrap().to_inner_ptr() as *mut aws_s3_checksum_config;
self
}

/// Set the signing config used for this message. Not public because we copy it from the client
/// when making a request.
fn signing_config(&mut self, signing_config: SigningConfig) -> &mut Self {
Expand Down Expand Up @@ -829,3 +843,28 @@ pub fn init_default_signing_config(region: &str, credentials_provider: Credentia

SigningConfig(Arc::new(Box::into_pin(signing_config)))
}

/// The checksum configuration.
#[derive(Debug, Clone, Default)]
pub struct ChecksumConfig {
/// The struct we can pass into the CRT's functions.
inner: aws_s3_checksum_config,
}

impl ChecksumConfig {
/// Create a [ChecksumConfig] enabling Crc32c trailing checksums in PUT requests.
pub fn trailing_crc32c() -> Self {
Self {
inner: aws_s3_checksum_config {
location: aws_s3_checksum_location::AWS_SCL_TRAILER,
checksum_algorithm: aws_s3_checksum_algorithm::AWS_SCA_CRC32C,
..Default::default()
},
}
}

/// Get out the inner pointer to the checksum config
pub(crate) fn to_inner_ptr(&self) -> *const aws_s3_checksum_config {
&self.inner
}
}
6 changes: 2 additions & 4 deletions mountpoint-s3/src/upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ where
bucket: &str,
key: &str,
) -> ObjectClientResult<Self, PutObjectError, Client::ClientError> {
let request = inner
.client
.put_object(bucket, key, &PutObjectParams::default())
.await?;
let params = PutObjectParams::new().trailing_checksums(true);
let request = inner.client.put_object(bucket, key, &params).await?;

Ok(Self {
bucket: bucket.to_owned(),
Expand Down

0 comments on commit 5c1c831

Please sign in to comment.