Enable PUT checksums #320

Merged
merged 1 commit into from
Jun 29, 2023
7 changes: 7 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions mountpoint-s3-client/Cargo.toml
@@ -38,6 +38,7 @@ anyhow = { version = "1.0.64", features = ["backtrace"] }
aws-config = "0.54.1"
aws-credential-types = "0.54.1"
aws-sdk-s3 = "0.24.0"
base64ct = { version = "1.6.0", features = ["std"] }
bytes = "1.2.1"
clap = "3.2.12"
ctor = "0.1.23"
18 changes: 17 additions & 1 deletion mountpoint-s3-client/src/object_client.rs
@@ -247,7 +247,23 @@ pub enum GetObjectAttributesError {
/// TODO: Populate this struct with parameters from the S3 API, e.g., storage class, encryption.
#[derive(Debug, Default, Clone)]
#[non_exhaustive]
-pub struct PutObjectParams {}
pub struct PutObjectParams {
    /// Enable Crc32c trailing checksums.
    pub trailing_checksums: bool,
Member

Needs a doc comment (specifically that it's crc32c).

Member

jamesbornholt, Jun 28, 2023

Actually, maybe this should be pub checksum_config: Option<ChecksumConfig> and we re-export mountpoint_s3_crt::ChecksumConfig from this crate, so that we don't end up duplicating stuff if we add more options here? I don't feel strongly about it either way, especially since the CRT combines PUT and GET checksum config into the same struct.

Contributor Author

I thought about that, but it's unclear to me whether we want to expose ChecksumConfig in the long run (vs something tailored to specific requests: PutObject, MPU, GetObject).
I'd rather keep it as a simple bool for now and think about a more complete API when/if we introduce more options.

Contributor

I'd probably second having an enum/struct over a boolean. Something like Option<ChecksumAlgorithm>, even if ChecksumAlgorithm will only be CRC32C today?

I'm thinking the API we're putting here matches up to this one in Java SDK: https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/s3/model/PutObjectRequest.Builder.html#checksumAlgorithm(software.amazon.awssdk.services.s3.model.ChecksumAlgorithm)

We don't have to re-export the CRT config struct, we could just use our own in mountpoint-s3-client if it makes more sense.

Contributor Author

@dannycjones, we could start adding a ChecksumAlgorithm enum, but what about the location (trailer vs header)? That's part of ChecksumConfig but (I believe) only trailer is supported on multipart uploads (which is what put_object uses). Do we want to limit the API to what a specific request allows, or just error on misconfiguration? Related: will we have a separate put_object for non-MPU? Or maybe that would also be configurable?

I don't think we need to answer any of these questions now, so my preference is for the simplest possible option, knowing we will replace it later.

Contributor

I probably prefer to keep it simple for now, so I'm OK with using a boolean. Maybe we open a new issue to review this configuration before we publish a new version of mountpoint-s3-client? I think that's the point where we say there will be no breaking changes after this and you can safely use this new config.

}

impl PutObjectParams {
    /// Create a default [PutObjectParams].
    pub fn new() -> Self {
        Self::default()
    }

    /// Set Crc32c trailing checksums.
    pub fn trailing_checksums(mut self, value: bool) -> Self {
        self.trailing_checksums = value;
        self
    }
}
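
For readers comparing the two API shapes weighed in the review thread on `trailing_checksums`, here is a minimal standalone sketch. `PutObjectParams` mirrors the struct in this diff (without `#[non_exhaustive]`); `ChecksumAlgorithm`, `PutObjectParamsAlt`, and `wants_trailing_crc32c` are hypothetical names used only for illustration and are not part of this PR or of mountpoint-s3-client.

```rust
/// Hypothetical enum from the review discussion; only CRC32C would exist today.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumAlgorithm {
    Crc32c,
}

/// The shape used in this diff: a plain boolean switch.
#[derive(Debug, Default, Clone)]
pub struct PutObjectParams {
    pub trailing_checksums: bool,
}

impl PutObjectParams {
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter: consumes and returns the params.
    pub fn trailing_checksums(mut self, value: bool) -> Self {
        self.trailing_checksums = value;
        self
    }
}

/// Hypothetical alternative: `None` means no checksum, `Some(..)` picks an algorithm.
#[derive(Debug, Default, Clone)]
pub struct PutObjectParamsAlt {
    pub checksum_algorithm: Option<ChecksumAlgorithm>,
}

impl PutObjectParamsAlt {
    pub fn new() -> Self {
        Self::default()
    }

    pub fn checksum_algorithm(mut self, value: Option<ChecksumAlgorithm>) -> Self {
        self.checksum_algorithm = value;
        self
    }

    /// Maps the enum form back onto the boolean used in this diff.
    pub fn wants_trailing_crc32c(&self) -> bool {
        matches!(self.checksum_algorithm, Some(ChecksumAlgorithm::Crc32c))
    }
}
```

With the boolean form a caller writes `PutObjectParams::new().trailing_checksums(true)`; with the enum form the equivalent would be `PutObjectParamsAlt::new().checksum_algorithm(Some(ChecksumAlgorithm::Crc32c))`. The enum leaves room for more algorithms, but it does not answer the checksum-location question (trailer vs header) raised in the thread.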

/// A streaming put request which allows callers to asynchronously write
/// the body of the request.
14 changes: 12 additions & 2 deletions mountpoint-s3-client/src/s3_crt_client.rs
@@ -20,8 +20,8 @@ use mountpoint_s3_crt::io::event_loop::EventLoopGroup;
use mountpoint_s3_crt::io::host_resolver::{HostResolver, HostResolverDefaultOptions};
use mountpoint_s3_crt::io::retry_strategy::{ExponentialBackoffJitterMode, RetryStrategy, StandardRetryOptions};
use mountpoint_s3_crt::s3::client::{
-init_default_signing_config, Client, ClientConfig, MetaRequestOptions, MetaRequestResult, MetaRequestType,
-RequestType,
init_default_signing_config, ChecksumConfig, Client, ClientConfig, MetaRequestOptions, MetaRequestResult,
MetaRequestType, RequestType,
};

use async_trait::async_trait;
@@ -246,6 +246,7 @@ impl S3CrtClientInner {
inner: message,
uri,
path_prefix,
checksum_config: None,
})
}

@@ -273,6 +274,9 @@ impl S3CrtClientInner {
let first_body_part_clone = Arc::clone(&first_body_part);

let mut options = MetaRequestOptions::new();
if let Some(checksum_config) = message.checksum_config {
options.checksum_config(checksum_config);
}
options
.message(message.inner)
.endpoint(message.uri)
@@ -457,6 +461,7 @@ struct S3Message {
inner: Message,
uri: Uri,
path_prefix: String,
checksum_config: Option<ChecksumConfig>,
}

impl S3Message {
@@ -528,6 +533,11 @@ impl S3Message {
fn set_body_stream(&mut self, input_stream: Option<AsyncInputStream>) -> Option<AsyncInputStream> {
self.inner.set_body_stream(input_stream)
}

/// Sets the checksum configuration for this message.
fn set_checksum_config(&mut self, checksum_config: Option<ChecksumConfig>) {
self.checksum_config = checksum_config;
}
}

#[derive(Debug)]
7 changes: 6 additions & 1 deletion mountpoint-s3-client/src/s3_crt_client/put_object.rs
@@ -5,7 +5,7 @@ use crate::{ObjectClientError, PutObjectRequest, PutObjectResult, S3CrtClient, S
use async_trait::async_trait;
use mountpoint_s3_crt::http::request_response::Header;
use mountpoint_s3_crt::io::async_stream::{self, AsyncStreamWriter};
-use mountpoint_s3_crt::s3::client::MetaRequestType;
use mountpoint_s3_crt::s3::client::{ChecksumConfig, MetaRequestType};
use tracing::{debug, Span};

use super::{S3CrtClientInner, S3HttpRequest};
@@ -67,6 +67,11 @@ impl S3PutObjectRequest {
.set_request_path(&key)
.map_err(S3RequestError::construction_failure)?;

if self.params.trailing_checksums {
let checksum_config = ChecksumConfig::trailing_crc32c();
message.set_checksum_config(Some(checksum_config));
}

let (body_async_stream, writer) = async_stream::new_stream(&self.client.allocator);
message.set_body_stream(Some(body_async_stream));
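
Taken together, the hunks in `object_client.rs`, `s3_crt_client.rs`, and `put_object.rs` thread one optional setting through three layers: the caller's params, the message, and the meta request options. The sketch below traces that flow with simplified stand-in types. `Message`, `RequestOptions`, `build_message`, and `build_options` are hypothetical, and this `ChecksumConfig` is a plain struct rather than the CRT wrapper added in this PR.

```rust
/// Stand-in for the CRT checksum config (hypothetical, simplified fields).
#[derive(Debug, Clone, PartialEq)]
struct ChecksumConfig {
    algorithm: &'static str,
    trailing: bool,
}

impl ChecksumConfig {
    fn trailing_crc32c() -> Self {
        Self { algorithm: "CRC32C", trailing: true }
    }
}

/// Caller-facing parameters, as in the diff.
#[derive(Debug, Default)]
struct PutObjectParams {
    trailing_checksums: bool,
}

/// Stand-in for `S3Message`: carries the config until the request is built.
#[derive(Debug, Default)]
struct Message {
    checksum_config: Option<ChecksumConfig>,
}

/// Stand-in for `MetaRequestOptions`.
#[derive(Debug, Default)]
struct RequestOptions {
    checksum_config: Option<ChecksumConfig>,
}

/// Step 1: the PUT path turns the boolean into a concrete config on the message.
fn build_message(params: &PutObjectParams) -> Message {
    let mut message = Message::default();
    if params.trailing_checksums {
        message.checksum_config = Some(ChecksumConfig::trailing_crc32c());
    }
    message
}

/// Step 2: the request builder applies the config only when the message has one.
fn build_options(message: Message) -> RequestOptions {
    let mut options = RequestOptions::default();
    if let Some(checksum_config) = message.checksum_config {
        options.checksum_config = Some(checksum_config);
    }
    options
}
```

In this sketch, requests that never set the flag produce options with no checksum config at all, which matches the `checksum_config: None` default the diff adds when a message is created.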

51 changes: 51 additions & 0 deletions mountpoint-s3-client/tests/put_object.rs
@@ -2,12 +2,18 @@

pub mod common;

use base64ct::Base64;
use base64ct::Encoding;
use common::*;
use futures::{pin_mut, StreamExt};
use mountpoint_s3_client::GetObjectError;
use mountpoint_s3_client::ObjectClient;
use mountpoint_s3_client::ObjectClientResult;
use mountpoint_s3_client::PutObjectParams;
use mountpoint_s3_client::PutObjectRequest;
use mountpoint_s3_client::S3ClientConfig;
use mountpoint_s3_client::S3CrtClient;
use mountpoint_s3_crt::checksums::crc32c;
use rand::Rng;

// Simple test for PUT object. Puts a single, small object as a single part and checks that the
@@ -184,3 +190,48 @@ async fn test_put_object_abort() {
let uploads_in_progress = get_mpu_count_for_key(&sdk_client, &bucket, &key).await.unwrap();
assert_eq!(uploads_in_progress, 0);
}

#[tokio::test]
async fn test_put_checksums() {
    const PART_SIZE: usize = 5 * 1024 * 1024;
    let (bucket, prefix) = get_test_bucket_and_prefix("test_put_checksums");
    let client_config = S3ClientConfig {
        throughput_target_gbps: Some(10.0),
        part_size: Some(PART_SIZE),
        ..Default::default()
    };
    let client = S3CrtClient::new(&get_test_region(), client_config).expect("could not create test client");
    let key = format!("{prefix}hello");

    let mut rng = rand::thread_rng();
    let mut contents = vec![0u8; PART_SIZE * 2];
    rng.fill(&mut contents[..]);

    let params = PutObjectParams::new().trailing_checksums(true);
    let mut request = client
        .put_object(&bucket, &key, &params)
        .await
        .expect("put_object should succeed");

    request.write(&contents).await.unwrap();
    request.complete().await.unwrap();

    let sdk_client = get_test_sdk_client().await;
    let attributes = sdk_client
        .get_object_attributes()
        .bucket(bucket)
        .key(key)
        .object_attributes(aws_sdk_s3::model::ObjectAttributes::ObjectParts)
        .send()
        .await
        .unwrap();
    let parts = attributes.object_parts().unwrap().parts().unwrap();
    let checksums: Vec<_> = parts.iter().map(|p| p.checksum_crc32_c().unwrap()).collect();
    let expected_checksums: Vec<_> = contents.chunks(PART_SIZE).map(crc32c::checksum).collect();

    assert_eq!(checksums.len(), expected_checksums.len());
    for (checksum, expected_checksum) in checksums.into_iter().zip(expected_checksums.into_iter()) {
        let encoded = Base64::encode_string(&expected_checksum.value().to_be_bytes());
        assert_eq!(checksum, encoded);
    }
}
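
The comparison at the end of `test_put_checksums` encodes each expected CRC32C value as the Base64 of its four big-endian bytes before comparing it with the per-part checksum returned by S3. A dependency-free sketch of that encoding follows. `crc32c`, `base64_encode`, and `expected_part_checksums` are illustrative stand-ins for `mountpoint_s3_crt::checksums::crc32c::checksum` and `base64ct`, written bit by bit for clarity rather than speed.

```rust
/// Bitwise CRC32C (Castagnoli), using the reflected polynomial 0x82F63B78.
fn crc32c(data: &[u8]) -> u32 {
    let mut crc = !0u32;
    for &byte in data {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 != 0 { (crc >> 1) ^ 0x82F6_3B78 } else { crc >> 1 };
        }
    }
    !crc
}

/// Standard Base64 (RFC 4648) with `=` padding.
fn base64_encode(input: &[u8]) -> String {
    const ALPHABET: &[u8; 64] =
        b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
    let mut out = String::with_capacity((input.len() + 2) / 3 * 4);
    for chunk in input.chunks(3) {
        // Pack up to three bytes into a 24-bit group, zero-filling the tail.
        let b1 = *chunk.get(1).unwrap_or(&0);
        let b2 = *chunk.get(2).unwrap_or(&0);
        let n = (u32::from(chunk[0]) << 16) | (u32::from(b1) << 8) | u32::from(b2);
        out.push(ALPHABET[(n >> 18) as usize & 63] as char);
        out.push(ALPHABET[(n >> 12) as usize & 63] as char);
        out.push(if chunk.len() > 1 { ALPHABET[(n >> 6) as usize & 63] as char } else { '=' });
        out.push(if chunk.len() > 2 { ALPHABET[n as usize & 63] as char } else { '=' });
    }
    out
}

/// One Base64 string per part: CRC32C of the part, big-endian bytes, then Base64.
fn expected_part_checksums(contents: &[u8], part_size: usize) -> Vec<String> {
    contents
        .chunks(part_size)
        .map(|part| base64_encode(&crc32c(part).to_be_bytes()))
        .collect()
}
```

For the standard check input `b"123456789"`, `crc32c` yields `0xE3069283`, which encodes to `4waSgw==`. The byte order matters: encoding the little-endian bytes instead would produce a different string and the comparison in the test would fail.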
5 changes: 5 additions & 0 deletions mountpoint-s3-crt/src/checksums/crc32.rs
@@ -9,6 +9,11 @@ impl Crc32 {
pub fn new(value: u32) -> Crc32 {
Crc32(value)
}

/// The CRC32 checksum value.
pub fn value(&self) -> u32 {
self.0
}
}

/// Computes the CRC32 checksum of a byte slice.
5 changes: 5 additions & 0 deletions mountpoint-s3-crt/src/checksums/crc32c.rs
@@ -9,6 +9,11 @@ impl Crc32c {
pub fn new(value: u32) -> Crc32c {
Crc32c(value)
}

/// The CRC32C checksum value.
pub fn value(&self) -> u32 {
self.0
}
}

/// Computes the CRC32C checksum of a byte slice.
39 changes: 39 additions & 0 deletions mountpoint-s3-crt/src/s3/client.rs
@@ -136,6 +136,9 @@ struct MetaRequestOptionsInner {
/// Owned signing config, if provided.
signing_config: Option<SigningConfig>,

/// Owned checksum config, if provided.
checksum_config: Option<ChecksumConfig>,

/// Telemetry callback, if provided
on_telemetry: Option<TelemetryCallback>,

@@ -210,6 +213,7 @@ impl MetaRequestOptions {
message: None,
endpoint: None,
signing_config: None,
checksum_config: None,
on_telemetry: None,
on_headers: None,
on_body: None,
@@ -255,6 +259,16 @@ impl MetaRequestOptions {
self
}

    /// Set the checksum config used for this message.
    pub fn checksum_config(&mut self, checksum_config: ChecksumConfig) -> &mut Self {
        // SAFETY: we aren't moving out of the struct.
        let options = unsafe { Pin::get_unchecked_mut(Pin::as_mut(&mut self.0)) };
        options.checksum_config = Some(checksum_config);
        options.inner.checksum_config =
            options.checksum_config.as_mut().unwrap().to_inner_ptr() as *mut aws_s3_checksum_config;
        self
    }
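
The `SAFETY` comment in `checksum_config` appears to rely on the options struct living behind a pinned allocation: the owned config is stored next to the raw C struct, and the C struct holds a pointer to it, so the pointer stays valid only as long as the allocation never moves. A minimal standalone sketch of that pattern follows, assuming a `Pin<Box<..>>` layout. `RawChecksumConfig`, `RawOptions`, `OptionsInner`, `Options`, and `raw_checksum_config` are hypothetical stand-ins, not the types in mountpoint-s3-crt.

```rust
use std::marker::PhantomPinned;
use std::pin::Pin;
use std::ptr;

/// Stand-in for the C checksum config struct (hypothetical fields).
#[derive(Debug, Clone, Default, PartialEq)]
struct RawChecksumConfig {
    location: u32,
    algorithm: u32,
}

/// Stand-in for the C options struct, which only borrows the config by pointer.
struct RawOptions {
    checksum_config: *const RawChecksumConfig,
}

struct OptionsInner {
    raw: RawOptions,
    /// Owned config that `raw.checksum_config` points into.
    checksum_config: Option<RawChecksumConfig>,
    /// Opts out of `Unpin`, because the struct refers to its own field.
    _pinned: PhantomPinned,
}

struct Options(Pin<Box<OptionsInner>>);

impl Options {
    fn new() -> Self {
        Options(Box::pin(OptionsInner {
            raw: RawOptions { checksum_config: ptr::null() },
            checksum_config: None,
            _pinned: PhantomPinned,
        }))
    }

    fn checksum_config(&mut self, config: RawChecksumConfig) -> &mut Self {
        // SAFETY: fields are only written in place; nothing is moved out of
        // the pinned allocation.
        let inner = unsafe { Pin::get_unchecked_mut(self.0.as_mut()) };
        inner.checksum_config = Some(config);
        inner.raw.checksum_config = inner
            .checksum_config
            .as_ref()
            .map_or(ptr::null(), |c| c as *const RawChecksumConfig);
        self
    }

    fn raw_checksum_config(&self) -> Option<&RawChecksumConfig> {
        // SAFETY: the pointer is either null or points at the owned config in
        // the same pinned heap allocation, which lives as long as `self`.
        unsafe { self.0.raw.checksum_config.as_ref() }
    }
}
```

Moving an `Options` value only moves the box pointer, not the heap allocation, so the stored raw pointer still refers to the owned config afterwards. Keeping the owned config inside the options also means the caller does not have to keep a separate value alive for the duration of the request.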

/// Set the signing config used for this message. Not public because we copy it from the client
/// when making a request.
fn signing_config(&mut self, signing_config: SigningConfig) -> &mut Self {
@@ -829,3 +843,28 @@ pub fn init_default_signing_config(region: &str, credentials_provider: Credentia

SigningConfig(Arc::new(Box::into_pin(signing_config)))
}

/// The checksum configuration.
#[derive(Debug, Clone, Default)]
pub struct ChecksumConfig {
    /// The struct we can pass into the CRT's functions.
    inner: aws_s3_checksum_config,
}

impl ChecksumConfig {
    /// Create a [ChecksumConfig] enabling Crc32c trailing checksums in PUT requests.
    pub fn trailing_crc32c() -> Self {
        Self {
            inner: aws_s3_checksum_config {
                location: aws_s3_checksum_location::AWS_SCL_TRAILER,
                checksum_algorithm: aws_s3_checksum_algorithm::AWS_SCA_CRC32C,
                ..Default::default()
            },
        }
    }

    /// Get out the inner pointer to the checksum config
    pub(crate) fn to_inner_ptr(&self) -> *const aws_s3_checksum_config {
        &self.inner
    }
}
6 changes: 2 additions & 4 deletions mountpoint-s3/src/upload.rs
@@ -68,10 +68,8 @@ where
bucket: &str,
key: &str,
) -> ObjectClientResult<Self, PutObjectError, Client::ClientError> {
-let request = inner
-.client
-.put_object(bucket, key, &PutObjectParams::default())
-.await?;
let params = PutObjectParams::new().trailing_checksums(true);
let request = inner.client.put_object(bucket, key, &params).await?;

Ok(Self {
bucket: bucket.to_owned(),