Skip to content

Commit

Permalink
Automatically disable checksums on S3 on Outposts
Browse files Browse the repository at this point in the history
I refactored our personality detection a little to track the various
"quirks" of each S3 implementation. I also added new tests to make sure
checksums are still enabled by default. This test probably fails when
targeting an Outposts bucket, but we can cross that bridge if we ever
start running CI against Outposts.

Signed-off-by: James Bornholt <[email protected]>
  • Loading branch information
jamesbornholt committed Apr 12, 2024
1 parent 1d09ce8 commit 7908cde
Show file tree
Hide file tree
Showing 10 changed files with 159 additions and 58 deletions.
5 changes: 4 additions & 1 deletion mountpoint-s3-client/src/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ impl ObjectClient for MockClient {
Some(MockObjectParts::Parts(parts)) => Some(GetObjectAttributesParts {
is_truncated: Some(false),
max_parts: Some(10000),
next_part_number_marker: None,
next_part_number_marker: Some(parts.len()),
part_number_marker: Some(0),
parts: Some(
parts
Expand Down Expand Up @@ -861,6 +861,9 @@ struct MockObjectPartAttributes {
checksum: Option<String>,
}

/// Some S3 implementations only report per-part data from GetObjectAttributes if parts were
/// uploaded with additional checksums. This enum is how we remember whether additional checksums
/// were used; if not, the only thing we report is the number of parts.
#[derive(Debug, Clone)]
enum MockObjectParts {
Count(usize),
Expand Down
8 changes: 4 additions & 4 deletions mountpoint-s3/src/bin/mock-mount-s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
//! This binary is intended only for use in testing and development of Mountpoint.

use futures::executor::ThreadPool;
use mountpoint_s3::cli::{CliArgs, S3PersonalityArg};
use mountpoint_s3::fs::S3Personality;
use mountpoint_s3::cli::CliArgs;
use mountpoint_s3::s3::S3Personality;
use mountpoint_s3_client::mock_client::throughput_client::ThroughputMockClient;
use mountpoint_s3_client::mock_client::{MockClientConfig, MockObject};
use mountpoint_s3_client::types::ETag;
Expand Down Expand Up @@ -44,8 +44,8 @@ fn create_mock_client(args: &CliArgs) -> anyhow::Result<(ThroughputMockClient, T

let runtime = ThreadPool::builder().name_prefix("runtime").create()?;

let s3_personality = if let Some(S3PersonalityArg(personality)) = args.bucket_type {
personality
let s3_personality = if let Some(bucket_type) = &args.bucket_type {
bucket_type.to_personality()
} else {
S3Personality::Standard
};
Expand Down
47 changes: 33 additions & 14 deletions mountpoint-s3/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@ use regex::Regex;
use crate::build_info;
use crate::data_cache::{CacheLimit, DiskDataCache, DiskDataCacheConfig, ManagedCacheDir};
use crate::fs::ServerSideEncryption;
use crate::fs::{CacheConfig, S3FilesystemConfig, S3Personality};
use crate::fs::{CacheConfig, S3FilesystemConfig};
use crate::fuse::session::FuseSession;
use crate::fuse::S3FuseFilesystem;
use crate::logging::{init_logging, LoggingConfig};
use crate::prefetch::{caching_prefetch, default_prefetch, Prefetch};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
use crate::{autoconfigure, metrics};

const CLIENT_OPTIONS_HEADER: &str = "Client options";
Expand Down Expand Up @@ -87,7 +88,7 @@ pub struct CliArgs {
pub requester_pays: bool,

#[clap(long, help = "Type of S3 bucket to use [default: inferred from bucket name]", help_heading = BUCKET_OPTIONS_HEADER)]
pub bucket_type: Option<S3PersonalityArg>,
pub bucket_type: Option<BucketType>,

#[clap(
long,
Expand Down Expand Up @@ -296,17 +297,29 @@ pub struct CliArgs {
}

#[derive(Debug, Clone)]
pub struct S3PersonalityArg(pub S3Personality);
pub enum BucketType {
GeneralPurpose,
Directory,
}

impl BucketType {
pub fn to_personality(&self) -> S3Personality {
match self {
Self::GeneralPurpose => S3Personality::Standard,
Self::Directory => S3Personality::ExpressOneZone,
}
}
}

impl ValueEnum for S3PersonalityArg {
impl ValueEnum for BucketType {
fn value_variants<'a>() -> &'a [Self] {
&[Self(S3Personality::Standard), Self(S3Personality::ExpressOneZone)]
&[Self::GeneralPurpose, Self::Directory]
}

fn to_possible_value(&self) -> Option<clap::builder::PossibleValue> {
match self.0 {
S3Personality::Standard => Some(clap::builder::PossibleValue::new("general-purpose")),
S3Personality::ExpressOneZone => Some(clap::builder::PossibleValue::new("directory")),
match self {
Self::GeneralPurpose => Some(clap::builder::PossibleValue::new("general-purpose")),
Self::Directory => Some(clap::builder::PossibleValue::new("directory")),
}
}
}
Expand Down Expand Up @@ -599,7 +612,7 @@ pub fn create_s3_client(args: &CliArgs) -> anyhow::Result<(S3CrtClient, EventLoo
)
.context("Failed to create S3 client")?;
let runtime = client.event_loop_group();
let s3_personality = get_s3_personality(args.bucket_type.clone(), &args.bucket_name, client.endpoint_config());
let s3_personality = infer_s3_personality(args.bucket_type.clone(), &args.bucket_name, client.endpoint_config());

Ok((client, runtime, s3_personality))
}
Expand Down Expand Up @@ -639,8 +652,12 @@ where
filesystem_config.storage_class = args.storage_class;
filesystem_config.allow_delete = args.allow_delete;
filesystem_config.allow_overwrite = args.allow_overwrite;
filesystem_config.s3_personality = s3_personality;
filesystem_config.use_upload_checksums = !args.disable_upload_checksums;
if !s3_personality.supports_additional_checksums() {
tracing::info!("disabling upload checksums because target S3 personality does not support them");
filesystem_config.use_upload_checksums = false;
}
filesystem_config.s3_personality = s3_personality;
filesystem_config.server_side_encryption = ServerSideEncryption::new(args.sse, args.sse_kms_key_id);

let prefetcher_config = Default::default();
Expand Down Expand Up @@ -872,13 +889,13 @@ fn get_region(args_region: Option<String>, instance_info: &InstanceInfo) -> (Str
(DEFAULT_REGION.to_owned(), false)
}

fn get_s3_personality(
args_personality: Option<S3PersonalityArg>,
fn infer_s3_personality(
bucket_type: Option<BucketType>,
bucket: &str,
endpoint_config: EndpointConfig,
) -> S3Personality {
if let Some(S3PersonalityArg(personality)) = args_personality {
return personality;
if let Some(bucket_type) = bucket_type {
return bucket_type.to_personality();
}

let Ok(resolved) = endpoint_config.resolve_for_bucket(bucket) else {
Expand All @@ -889,6 +906,8 @@ fn get_s3_personality(
};
if auth_scheme.scheme_name() == SigningAlgorithm::SigV4Express {
S3Personality::ExpressOneZone
} else if auth_scheme.signing_name() == "s3-outposts" {
S3Personality::Outposts
} else {
S3Personality::Standard
}
Expand Down
24 changes: 2 additions & 22 deletions mountpoint-s3/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::inode::{Inode, InodeError, InodeKind, LookedUp, ReaddirHandle, Superb
use crate::logging;
use crate::prefetch::{Prefetch, PrefetchReadError, PrefetchResult};
use crate::prefix::Prefix;
use crate::s3::S3Personality;
use crate::sync::atomic::{AtomicI64, AtomicU64, Ordering};
use crate::sync::{Arc, AsyncMutex, AsyncRwLock};
use crate::upload::{UploadRequest, Uploader};
Expand Down Expand Up @@ -377,34 +378,13 @@ impl Default for S3FilesystemConfig {
allow_delete: false,
allow_overwrite: false,
storage_class: None,
s3_personality: S3Personality::Standard,
s3_personality: S3Personality::default(),
server_side_encryption: Default::default(),
use_upload_checksums: true,
}
}
}

/// The type of S3 we're talking to. S3 Standard and S3 Express One Zone have slightly different
/// semantics around ListObjects (ordered versus unordered) that this enum captures.
///
/// This enum intentionally doesn't implement PartialEq/Eq. You shouldn't test it directly. Instead,
/// use its methods like `is_list_ordered` to check the actual behavior you're looking for.
#[derive(Debug, Clone, Copy, Default)]
pub enum S3Personality {
#[default]
Standard,
ExpressOneZone,
}

impl S3Personality {
pub fn is_list_ordered(self) -> bool {
match self {
Self::Standard => true,
Self::ExpressOneZone => false,
}
}
}

/// Server-side encryption configuration for newly created objects
#[derive(Debug, Clone)]
pub struct ServerSideEncryption {
Expand Down
3 changes: 2 additions & 1 deletion mountpoint-s3/src/inode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ use thiserror::Error;
use time::OffsetDateTime;
use tracing::{debug, error, trace, warn};

use crate::fs::{CacheConfig, S3Personality};
use crate::fs::CacheConfig;
use crate::logging;
use crate::prefix::Prefix;
use crate::s3::S3Personality;
use crate::sync::atomic::{AtomicU64, Ordering};
use crate::sync::RwLockReadGuard;
use crate::sync::RwLockWriteGuard;
Expand Down
1 change: 1 addition & 0 deletions mountpoint-s3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub mod metrics;
mod object;
pub mod prefetch;
pub mod prefix;
pub mod s3;
mod sync;
mod upload;

Expand Down
32 changes: 32 additions & 0 deletions mountpoint-s3/src/s3.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! Personalities of different S3 implementations. We use this to auto-configure some sensible
//! defaults that differ between implementations.

/// The type of S3 we're talking to.
///
/// This enum intentionally doesn't implement PartialEq/Eq. You shouldn't test it directly. Instead,
/// use its methods like `is_list_ordered` to check the actual behavior you're looking for.
#[derive(Debug, Clone, Copy, Default)]
pub enum S3Personality {
#[default]
Standard,
ExpressOneZone,
Outposts,
}

impl S3Personality {
pub fn is_list_ordered(&self) -> bool {
match self {
S3Personality::Standard => true,
S3Personality::ExpressOneZone => false,
S3Personality::Outposts => true,
}
}

pub fn supports_additional_checksums(&self) -> bool {
match self {
S3Personality::Standard => true,
S3Personality::ExpressOneZone => true,
S3Personality::Outposts => false,
}
}
}
3 changes: 2 additions & 1 deletion mountpoint-s3/tests/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

use fuser::FileType;
use libc::S_IFREG;
use mountpoint_s3::fs::{CacheConfig, S3Personality, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::fs::{CacheConfig, ToErrno, FUSE_ROOT_INODE};
use mountpoint_s3::prefix::Prefix;
use mountpoint_s3::s3::S3Personality;
use mountpoint_s3::S3FilesystemConfig;
use mountpoint_s3_client::failure_client::countdown_failure_client;
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, Operation};
Expand Down
76 changes: 69 additions & 7 deletions mountpoint-s3/tests/fuse_tests/fork_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,23 @@ use aws_config::BehaviorVersion;
use aws_sdk_s3::primitives::ByteStream;
#[cfg(not(feature = "s3express_tests"))]
use aws_sdk_sts::config::Region;
use std::fs;
use std::io::{BufRead, BufReader};
use std::fs::{self, File};
#[cfg(not(feature = "s3express_tests"))]
use std::io::{Read, Write};
use std::io::Read;
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::process::{Child, ExitStatus, Stdio};
use std::time::{Duration, Instant};
use std::{path::PathBuf, process::Command};
use test_case::test_case;

use crate::common::fuse::read_dir_to_entry_names;
use crate::common::s3::{create_objects, get_test_bucket_and_prefix, get_test_bucket_forbidden, get_test_region};
use crate::common::s3::{
create_objects, get_test_bucket_and_prefix, get_test_bucket_forbidden, get_test_region, get_test_sdk_client,
tokio_block_on,
};
#[cfg(not(feature = "s3express_tests"))]
use crate::common::s3::{get_scoped_down_credentials, get_test_kms_key_id, get_test_sdk_client};
#[cfg(not(feature = "s3express_tests"))]
use crate::common::s3::{get_subsession_iam_role, tokio_block_on};
use crate::common::s3::{get_scoped_down_credentials, get_subsession_iam_role, get_test_kms_key_id};

const MAX_WAIT_DURATION: std::time::Duration = std::time::Duration::from_secs(10);

Expand Down Expand Up @@ -312,6 +313,67 @@ fn mount_allow_delete(allow_delete: bool) -> Result<(), Box<dyn std::error::Erro
Ok(())
}

#[test_case(true; "checksums disabled")]
#[test_case(false; "default checksums")]
fn mount_disable_checksums(disable_checksums: bool) -> Result<(), Box<dyn std::error::Error>> {
let (bucket, prefix) = get_test_bucket_and_prefix("mount_enable_checksums");
let mount_point = assert_fs::TempDir::new()?;
let region = get_test_region();

let mut cmd = Command::cargo_bin("mount-s3")?;
cmd.arg(&bucket)
.arg(mount_point.path())
.arg(format!("--prefix={prefix}"))
.arg("--auto-unmount")
.arg(format!("--region={region}"));
if disable_checksums {
cmd.arg("--disable-upload-checksums");
}
let child = cmd.spawn().expect("unable to spawn child");

let exit_status = wait_for_exit(child);

// verify mount status and mount entry
assert!(exit_status.success());
assert!(mount_exists("mountpoint-s3", mount_point.path().to_str().unwrap()));

// try to upload an object
{
let mut f = File::create(mount_point.path().join("file.txt")).unwrap();
f.write_all(b"hello world").unwrap();
f.sync_all().unwrap();
}

// check it's there and has checksums if we expected it to
let sdk_client = tokio_block_on(get_test_sdk_client(&region));
let attrs = tokio_block_on(
sdk_client
.get_object_attributes()
.bucket(&bucket)
.key(format!("{prefix}file.txt"))
.object_attributes(aws_sdk_s3::types::ObjectAttributes::ObjectParts)
.send(),
)
.unwrap();
let parts = attrs.object_parts().unwrap();
// Parts may not be present when checksums are disabled, but if they are (on Express), they
// shouldn't be crc32c which is our default
if !disable_checksums {
assert!(
!parts.parts().is_empty(),
"checksums must be present when checksums enabled"
);
}
for part in parts.parts() {
let checksum_present = part.checksum_crc32_c().is_some();
assert_eq!(checksum_present, !disable_checksums, "checksum presence mismatch");
}

unmount(mount_point.path());

Ok(())
}

#[test]
// S3 Express One Zone doesn't support scoped credentials
#[cfg(not(feature = "s3express_tests"))]
Expand Down
18 changes: 10 additions & 8 deletions mountpoint-s3/tests/fuse_tests/write_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1155,17 +1155,19 @@ where
let _crc32c = checksum.checksum_crc32c.expect("crc32c is used for trailing checksums");
}
} else {
// For S3 Express, crc32 is used by default, otherwise no checksums are used by default. So
// allow either case -- the important thing is that crc32c (which we use for trailing
// checksums) isn't present because we disabled it.
// For S3 Express, crc32 is used by default. For S3 Standard, no checksums are used by
// default and the list of parts is empty in GetObjectAttributes. So allow either case --
// the important thing is that crc32c checksums aren't present because we disabled those by
// disabling our upload checksums.
if let Some(parts) = parts {
assert_eq!(parts.len(), (OBJECT_SIZE + PART_SIZE - 1) / PART_SIZE);
for part in parts {
let checksum = part.checksum.expect("checksum should be present");
assert!(
checksum.checksum_crc32c.is_none(),
"crc32c is not default for trailing checksums"
);
if let Some(checksum) = part.checksum {
assert!(
checksum.checksum_crc32c.is_none(),
"crc32c should not be present when upload checksums are disabled"
);
}
}
}
}
Expand Down

0 comments on commit 7908cde

Please sign in to comment.