Skip to content

feat!: update storage configuration system #3383

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,43 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
# path dependencies
deltalake-core = { version = "0.25.0", path = "../core", features = ["cloud"] }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
aws-config = { version = "1.5", default-features = false, features = ["behavior-version-latest","rt-tokio", "credentials-process", "sso"] }
aws-sdk-dynamodb = {version = "1.45", default-features = false, features = ["behavior-version-latest", "rt-tokio"] }
aws-sdk-sts = {version = "1.42", default-features = false, features = ["behavior-version-latest", "rt-tokio"] }
maplit = "1"

# workspace dependencies
async-trait = { workspace = true }
bytes = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
object_store = { workspace = true, features = ["aws"]}
object_store = { workspace = true, features = ["aws"] }
regex = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
regex = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }
url = { workspace = true }
backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] }

# crates.io dependencies
aws-smithy-runtime-api = { version = "1.7" }
aws-smithy-runtime = { version = "1.7", optional = true }
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
aws-config = { version = "1.5", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
"credentials-process",
"sso",
] }
aws-sdk-dynamodb = { version = "1.45", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
] }
aws-sdk-sts = { version = "1.42", default-features = false, features = [
"behavior-version-latest",
"rt-tokio",
] }
backon = { version = "1", default-features = false, features = ["tokio-sleep"] }
hyper-tls = { version = "0.5", optional = true }
maplit = "1"

[dev-dependencies]
deltalake-core = { path = "../core", features = ["datafusion"] }
Expand All @@ -51,7 +65,7 @@ integration_test = []
native-tls = [
"aws-config/client-hyper",
"aws-smithy-runtime/connector-hyper-0-14-x",
"hyper-tls"
"hyper-tls",
]
rustls = [
"aws-config/client-hyper",
Expand Down
8 changes: 4 additions & 4 deletions crates/aws/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,19 @@ pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str =
/// The web identity token file to use when using a web identity provider.
///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE";
/// The role name to use for web identity.
///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN";
/// The role session name to use for web identity.
///
/// NOTE: web identity related options are set in the environment when
/// creating an instance of [crate::storage::s3::S3StorageOptions].
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME";
/// Allow http connections - mainly useful for integration tests
Expand All @@ -109,7 +109,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";

/// The list of option keys owned by the S3 module.
/// Option keys not contained in this list will be added to the `extra_opts`
/// field of [crate::storage::s3::S3StorageOptions].
/// field of [S3StorageOptions](crate::storage::S3StorageOptions).
#[allow(deprecated)]
pub const S3_OPTS: &[&str] = &[
AWS_ENDPOINT_URL,
Expand Down
52 changes: 23 additions & 29 deletions crates/aws/src/credentials.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use deltalake_core::logstore::object_store::aws::{AmazonS3ConfigKey, AwsCredenti
use deltalake_core::logstore::object_store::{
CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult,
};
use deltalake_core::logstore::StorageOptions;
use deltalake_core::DeltaResult;
use tokio::sync::Mutex;
use tracing::log::*;
Expand Down Expand Up @@ -117,7 +116,7 @@ const OPTS_PROVIDER: &str = "DeltaStorageOptionsProvider";
/// [aws_config::default_provider::credentials::DefaultCredentialsChain]
#[derive(Clone, Debug)]
pub(crate) struct OptionsCredentialsProvider {
options: StorageOptions,
options: HashMap<String, String>,
}

impl OptionsCredentialsProvider {
Expand All @@ -130,7 +129,7 @@ impl OptionsCredentialsProvider {
// [object_store::aws::AmazonS3ConfigKey] supports a couple different variants for key
// names.
let config_keys: HashMap<AmazonS3ConfigKey, String> =
HashMap::from_iter(self.options.0.iter().filter_map(|(k, v)| {
HashMap::from_iter(self.options.iter().filter_map(|(k, v)| {
match AmazonS3ConfigKey::from_str(&k.to_lowercase()) {
Ok(k) => Some((k, v.into())),
Err(_) => None,
Expand Down Expand Up @@ -171,7 +170,7 @@ mod options_tests {

#[test]
fn test_empty_options_error() {
let options = StorageOptions::default();
let options = HashMap::default();
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(
Expand All @@ -182,11 +181,10 @@ mod options_tests {

#[test]
fn test_uppercase_options_resolve() {
let mash = hashmap! {
let options = hashmap! {
"AWS_ACCESS_KEY_ID".into() => "key".into(),
"AWS_SECRET_ACCESS_KEY".into() => "secret".into(),
};
let options = StorageOptions(mash);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
Expand All @@ -197,11 +195,10 @@ mod options_tests {

#[test]
fn test_lowercase_options_resolve() {
let mash = hashmap! {
let options = hashmap! {
"aws_access_key_id".into() => "key".into(),
"aws_secret_access_key".into() => "secret".into(),
};
let options = StorageOptions(mash);
let provider = OptionsCredentialsProvider { options };
let result = provider.credentials();
assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve");
Expand All @@ -219,13 +216,12 @@ fn assume_role_session_name() -> String {
}

/// Return the configured IAM role ARN or whatever is defined in the environment
fn assume_role_arn(options: &StorageOptions) -> Option<String> {
fn assume_role_arn(options: &HashMap<String, String>) -> Option<String> {
options
.0
.get(constants::AWS_IAM_ROLE_ARN)
.or(
#[allow(deprecated)]
options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN),
options.get(constants::AWS_S3_ASSUME_ROLE_ARN),
)
.or(std::env::var_os(constants::AWS_IAM_ROLE_ARN)
.map(|o| {
Expand All @@ -247,22 +243,20 @@ fn assume_role_arn(options: &StorageOptions) -> Option<String> {
}

/// Return the configured IAM assume role session name or provide a unique one
fn assume_session_name(options: &StorageOptions) -> String {
fn assume_session_name(options: &HashMap<String, String>) -> String {
let assume_session = options
.0
.get(constants::AWS_IAM_ROLE_SESSION_NAME)
.or(
#[allow(deprecated)]
options.0.get(constants::AWS_S3_ROLE_SESSION_NAME),
options.get(constants::AWS_S3_ROLE_SESSION_NAME),
)
.cloned();

assume_session.unwrap_or_else(assume_role_session_name)
}

/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig]
/// for use with various AWS SDK APIs, such as in our [crate::logstore::S3DynamoDbLogStore]
pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult<SdkConfig> {
/// for use with various AWS SDK APIs, such as in our [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore)
pub async fn resolve_credentials(options: &HashMap<String, String>) -> DeltaResult<SdkConfig> {
let default_provider = DefaultCredentialsChain::builder().build().await;

let credentials_provider = match assume_role_arn(&options) {
Expand Down Expand Up @@ -308,12 +302,12 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_options_credentials_provider() {
let options = StorageOptions(hashmap! {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
});
};

let config = resolve_credentials(options).await;
let config = resolve_credentials(&options).await;
assert!(config.is_ok(), "{config:?}");
let config = config.unwrap();

Expand All @@ -340,13 +334,13 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_options_credentials_provider_session_token() {
let options = StorageOptions(hashmap! {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
constants::AWS_SESSION_TOKEN.to_string() => "test_token".to_string(),
});
};

let config = resolve_credentials(options)
let config = resolve_credentials(&options)
.await
.expect("Failed to resolve_credentials");

Expand All @@ -368,11 +362,11 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider() -> DeltaResult<()> {
let options = StorageOptions(hashmap! {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
});
let sdk_config = resolve_credentials(options)
};
let sdk_config = resolve_credentials(&options)
.await
.expect("Failed to resolve credentials for the test");
let provider = AWSForObjectStore::new(sdk_config);
Expand All @@ -392,11 +386,11 @@ mod tests {
#[tokio::test]
#[serial]
async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> {
let options = StorageOptions(hashmap! {
let options = hashmap! {
constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(),
constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(),
});
let sdk_config = resolve_credentials(options)
};
let sdk_config = resolve_credentials(&options)
.await
.expect("Failed to resolve credentijals for the test");
let provider = AWSForObjectStore::new(sdk_config);
Expand Down
36 changes: 20 additions & 16 deletions crates/aws/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! AWS S3 and similar tooling for delta-rs
//!
//! This module also contains the [S3DynamoDbLogStore] implementation for concurrent writer support
//! with AWS S3 specifically.
//! This module also contains the [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore)
//! implementation for concurrent writer support with AWS S3 specifically.

pub mod constants;
mod credentials;
Expand All @@ -10,6 +10,7 @@ pub mod logstore;
#[cfg(feature = "native-tls")]
mod native;
pub mod storage;

use aws_config::Region;
use aws_config::SdkConfig;
pub use aws_credential_types::provider::SharedCredentialsProvider;
Expand All @@ -26,8 +27,10 @@ use aws_sdk_dynamodb::{
Client,
};
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey;
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
use deltalake_core::logstore::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::logstore::{
default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory,
ObjectStoreRef, StorageConfig,
};
use deltalake_core::{DeltaResult, Path};
use errors::{DynamoDbConfigError, LockClientError};
use regex::Regex;
Expand All @@ -53,11 +56,10 @@ impl LogStoreFactory for S3LogStoreFactory {
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
options: &StorageConfig,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?);
let options = self.with_env_s3(options);
if options.0.keys().any(|key| {
let s3_options = self.with_env_s3(&options.raw.clone().into());
if s3_options.keys().any(|key| {
let key = key.to_ascii_lowercase();
[
AmazonS3ConfigKey::CopyIfNotExists.as_ref(),
Expand All @@ -67,15 +69,15 @@ impl LogStoreFactory for S3LogStoreFactory {
}) {
debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required");
warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put.");
return Ok(logstore::default_s3_logstore(store, location, &options));
return Ok(logstore::default_s3_logstore(store, location, options));
}

let s3_options = S3StorageOptions::from_map(&options.0)?;
let s3_options = S3StorageOptions::from_map(&s3_options)?;
if s3_options.locking_provider.as_deref() == Some("dynamodb") {
debug!("S3LogStoreFactory has been asked to create a LogStore with the dynamodb locking provider");
return Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
options,
&s3_options,
store,
)?));
Expand All @@ -84,14 +86,16 @@ impl LogStoreFactory for S3LogStoreFactory {
}
}

/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
/// Register an [ObjectStoreFactory] for common S3 url schemes.
///
/// [ObjectStoreFactory]: deltalake_core::logstore::ObjectStoreFactory
pub fn register_handlers(_additional_prefixes: Option<Url>) {
let object_stores = Arc::new(S3ObjectStoreFactory::default());
let log_stores = Arc::new(S3LogStoreFactory::default());
for scheme in ["s3", "s3a"].iter() {
let url = Url::parse(&format!("{scheme}://")).unwrap();
factories().insert(url.clone(), object_stores.clone());
logstores().insert(url.clone(), log_stores.clone());
object_store_factories().insert(url.clone(), object_stores.clone());
logstore_factories().insert(url.clone(), log_stores.clone());
}
}

Expand Down Expand Up @@ -763,7 +767,7 @@ mod tests {
std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER);
}
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.with_options(Arc::new(store), &url, &Default::default())
.unwrap();
assert_eq!(logstore.name(), "DefaultLogStore");
}
Expand All @@ -779,7 +783,7 @@ mod tests {
}

let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.with_options(Arc::new(store), &url, &Default::default())
.unwrap();
assert_eq!(logstore.name(), "S3DynamoDbLogStore");
}
Expand Down
6 changes: 2 additions & 4 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use std::sync::Arc;
use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
kernel::transaction::TransactionError,
logstore::{ObjectStoreRef, StorageOptions},
DeltaResult,
kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult,
};
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;
Expand All @@ -17,7 +15,7 @@ use uuid::Uuid;
pub fn default_s3_logstore(
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
options: &StorageConfig,
) -> Arc<dyn LogStore> {
Arc::new(S3LogStore::new(
store,
Expand Down
Loading
Loading