diff --git a/crates/aws/Cargo.toml b/crates/aws/Cargo.toml index 4102f7486f..9fda2beb95 100644 --- a/crates/aws/Cargo.toml +++ b/crates/aws/Cargo.toml @@ -12,14 +12,8 @@ repository.workspace = true rust-version.workspace = true [dependencies] +# path dependencies deltalake-core = { version = "0.25.0", path = "../core", features = ["cloud"] } -aws-smithy-runtime-api = { version="1.7" } -aws-smithy-runtime = { version="1.7", optional = true} -aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]} -aws-config = { version = "1.5", default-features = false, features = ["behavior-version-latest","rt-tokio", "credentials-process", "sso"] } -aws-sdk-dynamodb = {version = "1.45", default-features = false, features = ["behavior-version-latest", "rt-tokio"] } -aws-sdk-sts = {version = "1.42", default-features = false, features = ["behavior-version-latest", "rt-tokio"] } -maplit = "1" # workspace dependencies async-trait = { workspace = true } @@ -27,14 +21,34 @@ bytes = { workspace = true } chrono = { workspace = true } futures = { workspace = true } tracing = { workspace = true } -object_store = { workspace = true, features = ["aws"]} +object_store = { workspace = true, features = ["aws"] } +regex = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } -regex = { workspace = true } uuid = { workspace = true, features = ["serde", "v4"] } url = { workspace = true } -backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] } + +# crates.io dependencies +aws-smithy-runtime-api = { version = "1.7" } +aws-smithy-runtime = { version = "1.7", optional = true } +aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] } +aws-config = { version = "1.5", default-features = false, features = [ + "behavior-version-latest", + "rt-tokio", + "credentials-process", + "sso", +] } +aws-sdk-dynamodb = { version = "1.45", default-features = false, features = [ + "behavior-version-latest", + "rt-tokio", +] } +aws-sdk-sts = { version = "1.42", default-features = false, features = [ + "behavior-version-latest", + "rt-tokio", +] } +backon = { version = "1", default-features = false, features = ["tokio-sleep"] } hyper-tls = { version = "0.5", optional = true } +maplit = "1" [dev-dependencies] deltalake-core = { path = "../core", features = ["datafusion"] } @@ -51,7 +65,7 @@ integration_test = [] native-tls = [ "aws-config/client-hyper", "aws-smithy-runtime/connector-hyper-0-14-x", - "hyper-tls" + "hyper-tls", ] rustls = [ "aws-config/client-hyper", diff --git a/crates/aws/src/constants.rs b/crates/aws/src/constants.rs index 2fdd19dd5d..ecd37b15af 100644 --- a/crates/aws/src/constants.rs +++ b/crates/aws/src/constants.rs @@ -74,19 +74,19 @@ pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str = /// The web identity token file to use when using a web identity provider. /// /// NOTE: web identity related options are set in the environment when -/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions). /// See also . pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE"; /// The role name to use for web identity. /// /// NOTE: web identity related options are set in the environment when -/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions). /// See also . 
pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN"; /// The role session name to use for web identity. /// /// NOTE: web identity related options are set in the environment when -/// creating an instance of [crate::storage::s3::S3StorageOptions]. +/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions). /// See also . pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME"; /// Allow http connections - mainly useful for integration tests @@ -109,7 +109,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD"; /// The list of option keys owned by the S3 module. /// Option keys not contained in this list will be added to the `extra_opts` -/// field of [crate::storage::s3::S3StorageOptions]. +/// field of [S3StorageOptions](crate::storage::S3StorageOptions). #[allow(deprecated)] pub const S3_OPTS: &[&str] = &[ AWS_ENDPOINT_URL, diff --git a/crates/aws/src/credentials.rs b/crates/aws/src/credentials.rs index c5e8b9310e..bcf37ac4bf 100644 --- a/crates/aws/src/credentials.rs +++ b/crates/aws/src/credentials.rs @@ -18,7 +18,6 @@ use deltalake_core::logstore::object_store::aws::{AmazonS3ConfigKey, AwsCredenti use deltalake_core::logstore::object_store::{ CredentialProvider, Error as ObjectStoreError, Result as ObjectStoreResult, }; -use deltalake_core::logstore::StorageOptions; use deltalake_core::DeltaResult; use tokio::sync::Mutex; use tracing::log::*; @@ -117,7 +116,7 @@ const OPTS_PROVIDER: &str = "DeltaStorageOptionsProvider"; /// [aws_config::default_provider::credentials::DefaultCredentialsChain] #[derive(Clone, Debug)] pub(crate) struct OptionsCredentialsProvider { - options: StorageOptions, + options: HashMap, } impl OptionsCredentialsProvider { @@ -130,7 +129,7 @@ impl OptionsCredentialsProvider { // [object_store::aws::AmazonS3ConfigKey] supports a couple different variants for key // names. let config_keys: HashMap = - HashMap::from_iter(self.options.0.iter().filter_map(|(k, v)| { + HashMap::from_iter(self.options.iter().filter_map(|(k, v)| { match AmazonS3ConfigKey::from_str(&k.to_lowercase()) { Ok(k) => Some((k, v.into())), Err(_) => None, @@ -171,7 +170,7 @@ mod options_tests { #[test] fn test_empty_options_error() { - let options = StorageOptions::default(); + let options = HashMap::default(); let provider = OptionsCredentialsProvider { options }; let result = provider.credentials(); assert!( @@ -182,11 +181,10 @@ mod options_tests { #[test] fn test_uppercase_options_resolve() { - let mash = hashmap! { + let options = hashmap! { "AWS_ACCESS_KEY_ID".into() => "key".into(), "AWS_SECRET_ACCESS_KEY".into() => "secret".into(), }; - let options = StorageOptions(mash); let provider = OptionsCredentialsProvider { options }; let result = provider.credentials(); assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve"); @@ -197,11 +195,10 @@ mod options_tests { #[test] fn test_lowercase_options_resolve() { - let mash = hashmap! { + let options = hashmap! 
{ "aws_access_key_id".into() => "key".into(), "aws_secret_access_key".into() => "secret".into(), }; - let options = StorageOptions(mash); let provider = OptionsCredentialsProvider { options }; let result = provider.credentials(); assert!(result.is_ok(), "StorageOptions with at least AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY should resolve"); @@ -219,13 +216,12 @@ fn assume_role_session_name() -> String { } /// Return the configured IAM role ARN or whatever is defined in the environment -fn assume_role_arn(options: &StorageOptions) -> Option { +fn assume_role_arn(options: &HashMap) -> Option { options - .0 .get(constants::AWS_IAM_ROLE_ARN) .or( #[allow(deprecated)] - options.0.get(constants::AWS_S3_ASSUME_ROLE_ARN), + options.get(constants::AWS_S3_ASSUME_ROLE_ARN), ) .or(std::env::var_os(constants::AWS_IAM_ROLE_ARN) .map(|o| { @@ -247,22 +243,20 @@ fn assume_role_arn(options: &StorageOptions) -> Option { } /// Return the configured IAM assume role session name or provide a unique one -fn assume_session_name(options: &StorageOptions) -> String { +fn assume_session_name(options: &HashMap) -> String { let assume_session = options - .0 .get(constants::AWS_IAM_ROLE_SESSION_NAME) .or( #[allow(deprecated)] - options.0.get(constants::AWS_S3_ROLE_SESSION_NAME), + options.get(constants::AWS_S3_ROLE_SESSION_NAME), ) .cloned(); - assume_session.unwrap_or_else(assume_role_session_name) } /// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig] -/// for use with various AWS SDK APIs, such as in our [crate::logstore::S3DynamoDbLogStore] -pub async fn resolve_credentials(options: StorageOptions) -> DeltaResult { +/// for use with various AWS SDK APIs, such as in our [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore) +pub async fn resolve_credentials(options: &HashMap) -> DeltaResult { let default_provider = DefaultCredentialsChain::builder().build().await; let credentials_provider = match assume_role_arn(&options) { @@ -308,12 +302,12 @@ mod tests { #[tokio::test] #[serial] async fn test_options_credentials_provider() { - let options = StorageOptions(hashmap! { + let options = hashmap! { constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), - }); + }; - let config = resolve_credentials(options).await; + let config = resolve_credentials(&options).await; assert!(config.is_ok(), "{config:?}"); let config = config.unwrap(); @@ -340,13 +334,13 @@ mod tests { #[tokio::test] #[serial] async fn test_options_credentials_provider_session_token() { - let options = StorageOptions(hashmap! { + let options = hashmap! { constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), constants::AWS_SESSION_TOKEN.to_string() => "test_token".to_string(), - }); + }; - let config = resolve_credentials(options) + let config = resolve_credentials(&options) .await .expect("Failed to resolve_credentials"); @@ -368,11 +362,11 @@ mod tests { #[tokio::test] #[serial] async fn test_object_store_credential_provider() -> DeltaResult<()> { - let options = StorageOptions(hashmap! { + let options = hashmap! 
{ constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), - }); - let sdk_config = resolve_credentials(options) + }; + let sdk_config = resolve_credentials(&options) .await .expect("Failed to resolve credentials for the test"); let provider = AWSForObjectStore::new(sdk_config); @@ -392,11 +386,11 @@ mod tests { #[tokio::test] #[serial] async fn test_object_store_credential_provider_consistency() -> DeltaResult<()> { - let options = StorageOptions(hashmap! { + let options = hashmap! { constants::AWS_ACCESS_KEY_ID.to_string() => "test_id".to_string(), constants::AWS_SECRET_ACCESS_KEY.to_string() => "test_secret".to_string(), - }); - let sdk_config = resolve_credentials(options) + }; + let sdk_config = resolve_credentials(&options) .await .expect("Failed to resolve credentijals for the test"); let provider = AWSForObjectStore::new(sdk_config); diff --git a/crates/aws/src/lib.rs b/crates/aws/src/lib.rs index 9ccc21a1ea..6f704c7de8 100644 --- a/crates/aws/src/lib.rs +++ b/crates/aws/src/lib.rs @@ -1,7 +1,7 @@ //! AWS S3 and similar tooling for delta-rs //! -//! This module also contains the [S3DynamoDbLogStore] implementation for concurrent writer support -//! with AWS S3 specifically. +//! This module also contains the [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore) +//! implementation for concurrent writer support with AWS S3 specifically. pub mod constants; mod credentials; @@ -10,6 +10,7 @@ pub mod logstore; #[cfg(feature = "native-tls")] mod native; pub mod storage; + use aws_config::Region; use aws_config::SdkConfig; pub use aws_credential_types::provider::SharedCredentialsProvider; @@ -26,8 +27,10 @@ use aws_sdk_dynamodb::{ Client, }; use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey; -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; -use deltalake_core::logstore::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions}; +use deltalake_core::logstore::{ + default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory, + ObjectStoreRef, StorageConfig, +}; use deltalake_core::{DeltaResult, Path}; use errors::{DynamoDbConfigError, LockClientError}; use regex::Regex; @@ -53,11 +56,10 @@ impl LogStoreFactory for S3LogStoreFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { - let store = url_prefix_handler(store, Path::parse(location.path())?); - let options = self.with_env_s3(options); - if options.0.keys().any(|key| { + let s3_options = self.with_env_s3(&options.raw.clone().into()); + if s3_options.keys().any(|key| { let key = key.to_ascii_lowercase(); [ AmazonS3ConfigKey::CopyIfNotExists.as_ref(), @@ -67,15 +69,15 @@ impl LogStoreFactory for S3LogStoreFactory { }) { debug!("S3LogStoreFactory has been asked to create a LogStore where the underlying store has copy-if-not-exists enabled - no locking provider required"); warn!("Most S3 object store support conditional put, remove copy_if_not_exists parameter to use a more performant conditional put."); - return Ok(logstore::default_s3_logstore(store, location, &options)); + return Ok(logstore::default_s3_logstore(store, location, options)); } - let s3_options = S3StorageOptions::from_map(&options.0)?; + let s3_options = S3StorageOptions::from_map(&s3_options)?; if s3_options.locking_provider.as_deref() == Some("dynamodb") { debug!("S3LogStoreFactory has been asked to create a LogStore 
with the dynamodb locking provider"); return Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new( location.clone(), - options.clone(), + options, &s3_options, store, )?)); @@ -84,14 +86,16 @@ impl LogStoreFactory for S3LogStoreFactory { } } -/// Register an [ObjectStoreFactory] for common S3 [Url] schemes +/// Register an [ObjectStoreFactory] for common S3 url schemes. +/// +/// [ObjectStoreFactory]: deltalake_core::logstore::ObjectStoreFactory pub fn register_handlers(_additional_prefixes: Option) { let object_stores = Arc::new(S3ObjectStoreFactory::default()); let log_stores = Arc::new(S3LogStoreFactory::default()); for scheme in ["s3", "s3a"].iter() { let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), object_stores.clone()); - logstores().insert(url.clone(), log_stores.clone()); + object_store_factories().insert(url.clone(), object_stores.clone()); + logstore_factories().insert(url.clone(), log_stores.clone()); } } @@ -763,7 +767,7 @@ mod tests { std::env::remove_var(crate::constants::AWS_S3_LOCKING_PROVIDER); } let logstore = factory - .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new())) + .with_options(Arc::new(store), &url, &Default::default()) .unwrap(); assert_eq!(logstore.name(), "DefaultLogStore"); } @@ -779,7 +783,7 @@ mod tests { } let logstore = factory - .with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new())) + .with_options(Arc::new(store), &url, &Default::default()) .unwrap(); assert_eq!(logstore.name(), "S3DynamoDbLogStore"); } diff --git a/crates/aws/src/logstore/default_logstore.rs b/crates/aws/src/logstore/default_logstore.rs index 3e343def9a..9e13ec965e 100644 --- a/crates/aws/src/logstore/default_logstore.rs +++ b/crates/aws/src/logstore/default_logstore.rs @@ -5,9 +5,7 @@ use std::sync::Arc; use bytes::Bytes; use deltalake_core::logstore::*; use deltalake_core::{ - kernel::transaction::TransactionError, - logstore::{ObjectStoreRef, StorageOptions}, - DeltaResult, + kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult, }; use object_store::{Error as ObjectStoreError, ObjectStore}; use url::Url; @@ -17,7 +15,7 @@ use uuid::Uuid; pub fn default_s3_logstore( store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> Arc { Arc::new(S3LogStore::new( store, diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index 8b3db324c7..26bd00b054 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -14,9 +14,7 @@ use url::Url; use deltalake_core::logstore::*; use deltalake_core::{ - kernel::transaction::TransactionError, - logstore::{ObjectStoreRef, StorageOptions}, - DeltaResult, DeltaTableError, + kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult, DeltaTableError, }; use uuid::Uuid; @@ -41,7 +39,7 @@ impl S3DynamoDbLogStore { /// Create log store pub fn try_new( location: Url, - options: impl Into + Clone, + options: &StorageConfig, s3_options: &S3StorageOptions, object_store: ObjectStoreRef, ) -> DeltaResult { @@ -77,7 +75,7 @@ impl S3DynamoDbLogStore { lock_client, config: LogStoreConfig { location, - options: options.into(), + options: options.clone(), }, table_path, }) diff --git a/crates/aws/src/storage.rs b/crates/aws/src/storage.rs index a5ac27244b..104f15e70c 100644 --- a/crates/aws/src/storage.rs +++ b/crates/aws/src/storage.rs @@ -1,4 +1,10 @@ //! AWS S3 storage backend. 
+use std::collections::HashMap; +use std::fmt::Debug; +use std::ops::Range; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; use aws_config::{Region, SdkConfig}; use bytes::Bytes; @@ -7,26 +13,18 @@ use deltalake_core::logstore::object_store::{ GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, }; -use deltalake_core::logstore::{ - limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, - StorageOptions, -}; +use deltalake_core::logstore::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef}; use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path}; use futures::stream::BoxStream; use futures::Future; -use std::collections::HashMap; -use std::fmt::Debug; -use std::ops::Range; -use std::str::FromStr; -use std::sync::Arc; -use std::time::Duration; +use object_store::aws::AmazonS3; +use object_store::RetryConfig; use tracing::log::*; use url::Url; use crate::constants; +use crate::credentials::AWSForObjectStore; use crate::errors::DynamoDbConfigError; -#[cfg(feature = "native-tls")] -use crate::native; const STORE_NAME: &str = "DeltaS3ObjectStore"; @@ -35,44 +33,38 @@ pub struct S3ObjectStoreFactory {} impl S3StorageOptionsConversion for S3ObjectStoreFactory {} -impl RetryConfigParse for S3ObjectStoreFactory {} - impl ObjectStoreFactory for S3ObjectStoreFactory { fn parse_url_opts( &self, url: &Url, - storage_options: &StorageOptions, + storage_options: &HashMap, + retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { let options = self.with_env_s3(storage_options); // All S3-likes should start their builder the same way - let mut builder = AmazonS3Builder::new().with_url(url.to_string()); - - for (key, value) in options.0.iter() { + let mut builder = AmazonS3Builder::new() + .with_url(url.to_string()) + .with_retry(retry.clone()); + for (key, value) in options.iter() { if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { builder = builder.with_config(key, value.clone()); } } + let s3_options = S3StorageOptions::from_map(&options)?; + if let Some(ref sdk_config) = s3_options.sdk_config { + builder = + builder.with_credentials(Arc::new(AWSForObjectStore::new(sdk_config.clone()))); + } + let (_, path) = ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError { source: Box::new(e), })?; let prefix = Path::parse(path)?; - let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options.0)?; - - if let Some(ref sdk_config) = s3_options.sdk_config { - builder = builder.with_credentials(Arc::new( - crate::credentials::AWSForObjectStore::new(sdk_config.clone()), - )); - } - - let inner = builder - .with_retry(self.parse_retry_config(&options)?) 
- .build()?; - - let store = aws_storage_handler(limit_store_handler(inner, &options), &s3_options)?; + let store = aws_storage_handler(builder.build()?, &s3_options)?; debug!("Initialized the object store: {store:?}"); Ok((store, prefix)) @@ -80,7 +72,7 @@ impl ObjectStoreFactory for S3ObjectStoreFactory { } fn aws_storage_handler( - store: ObjectStoreRef, + store: AmazonS3, s3_options: &S3StorageOptions, ) -> DeltaResult { // Nearly all S3 Object stores support conditional put, so we change the default to always returning an S3 Object store @@ -88,13 +80,13 @@ fn aws_storage_handler( if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename { let store = S3StorageBackend::try_new( - store, + Arc::new(store), Some("dynamodb") == s3_options.locking_provider.as_deref() || s3_options.allow_unsafe_rename, )?; Ok(Arc::new(store)) } else { - Ok(store) + Ok(Arc::new(store)) } } @@ -102,22 +94,22 @@ fn aws_storage_handler( // // This function will return true in the default case since it's most likely that the absence of // options will mean default/S3 configuration -fn is_aws(options: &StorageOptions) -> bool { +fn is_aws(options: &HashMap) -> bool { // Checks storage option first then env var for existence of aws force credential load // .from_s3_env never inserts these into the options because they are delta-rs specific - if str_option(&options.0, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() { + if str_option(options, constants::AWS_FORCE_CREDENTIAL_LOAD).is_some() { return true; } // Checks storage option first then env var for existence of locking provider // .from_s3_env never inserts these into the options because they are delta-rs specific - if str_option(&options.0, constants::AWS_S3_LOCKING_PROVIDER).is_some() { + if str_option(options, constants::AWS_S3_LOCKING_PROVIDER).is_some() { return true; } // Options at this stage should only contain 'aws_endpoint' in lowercase // due to with_env_s3 - !(options.0.contains_key("aws_endpoint") || options.0.contains_key(constants::AWS_ENDPOINT_URL)) + !(options.contains_key("aws_endpoint") || options.contains_key(constants::AWS_ENDPOINT_URL)) } /// Options used to configure the [S3StorageBackend]. @@ -161,7 +153,7 @@ impl PartialEq for S3StorageOptions { } impl S3StorageOptions { - /// Creates an instance of S3StorageOptions from the given HashMap. + /// Creates an instance of [`S3StorageOptions`] from the given HashMap. pub fn from_map(options: &HashMap) -> DeltaResult { let extra_opts: HashMap = options .iter() @@ -198,14 +190,12 @@ impl S3StorageOptions { .map(|val| str_is_truthy(&val)) .unwrap_or(false); - let storage_options = StorageOptions(options.clone()); - - let sdk_config = match is_aws(&storage_options) { + let sdk_config = match is_aws(&options) { false => None, true => { debug!("Detected AWS S3 Storage options, resolving AWS credentials"); Some(execute_sdk_future( - crate::credentials::resolve_credentials(storage_options.clone()), + crate::credentials::resolve_credentials(&options), )??) } }; @@ -416,7 +406,7 @@ impl ObjectStore for S3StorageBackend { } } -/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions]. +/// Storage option keys to use when creating [`S3StorageOptions`]. /// /// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. /// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename. 
@@ -441,27 +431,23 @@ pub(crate) fn str_option(map: &HashMap, key: &str) -> Option StorageOptions { - let mut options = StorageOptions( - options - .0 - .clone() - .into_iter() - .map(|(k, v)| { - if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { - (config_key.as_ref().to_string(), v) - } else { - (k, v) - } - }) - .collect(), - ); + fn with_env_s3(&self, options: &HashMap) -> HashMap { + let mut options: HashMap = options + .clone() + .into_iter() + .map(|(k, v)| { + if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { + (config_key.as_ref().to_string(), v) + } else { + (k, v) + } + }) + .collect(); for (os_key, os_value) in std::env::vars_os() { if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { options - .0 .entry(config_key.as_ref().to_string()) .or_insert(value.to_string()); } @@ -472,7 +458,7 @@ pub(crate) trait S3StorageOptionsConversion { // set this behaviour. We will however assume, when a locking provider/copy-if-not-exists keys are not provided // that PutIfAbsent is supported. // With conditional put in S3-like API we can use the deltalake default logstore which use PutIfAbsent - if !options.0.keys().any(|key| { + if !options.keys().any(|key| { let key = key.to_ascii_lowercase(); [ AmazonS3ConfigKey::ConditionalPut.as_ref(), @@ -480,7 +466,7 @@ pub(crate) trait S3StorageOptionsConversion { ] .contains(&key.as_str()) }) { - options.0.insert("conditional_put".into(), "etag".into()); + options.insert("conditional_put".into(), "etag".into()); } options } @@ -793,13 +779,12 @@ mod tests { std::env::set_var(constants::AWS_ENDPOINT_URL, "env_key"); std::env::set_var(constants::AWS_SECRET_ACCESS_KEY, "env_key"); std::env::set_var(constants::AWS_REGION, "env_key"); - let combined_options = - S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options)); + let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options); // Four and then the conditional_put built-in - assert_eq!(combined_options.0.len(), 5); + assert_eq!(combined_options.len(), 5); - for (key, v) in combined_options.0 { + for (key, v) in combined_options { if key != "conditional_put" { assert_eq!(v, "env_key"); } @@ -823,10 +808,9 @@ mod tests { std::env::set_var("aws_secret_access_key", "env_key"); std::env::set_var("aws_region", "env_key"); - let combined_options = - S3ObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options)); + let combined_options = S3ObjectStoreFactory {}.with_env_s3(&raw_options); - for (key, v) in combined_options.0 { + for (key, v) in combined_options { if key != "conditional_put" { assert_eq!(v, "options_key"); } @@ -838,26 +822,23 @@ mod tests { #[serial] fn test_is_aws() { clear_env_of_aws_keys(); - let options = StorageOptions::default(); + let options = HashMap::default(); assert!(is_aws(&options)); let minio: HashMap = hashmap! { constants::AWS_ENDPOINT_URL.to_string() => "http://minio:8080".to_string(), }; - let options = StorageOptions::from(minio); - assert!(!is_aws(&options)); + assert!(!is_aws(&minio)); let minio: HashMap = hashmap! { "aws_endpoint".to_string() => "http://minio:8080".to_string(), }; - let options = StorageOptions::from(minio); - assert!(!is_aws(&options)); + assert!(!is_aws(&minio)); let localstack: HashMap = hashmap! 
{ constants::AWS_FORCE_CREDENTIAL_LOAD.to_string() => "true".to_string(), "aws_endpoint".to_string() => "http://minio:8080".to_string(), }; - let options = StorageOptions::from(localstack); - assert!(is_aws(&options)); + assert!(is_aws(&localstack)); } } diff --git a/crates/aws/tests/integration_s3_dynamodb.rs b/crates/aws/tests/integration_s3_dynamodb.rs index f78598d6ab..1cef6b49e0 100644 --- a/crates/aws/tests/integration_s3_dynamodb.rs +++ b/crates/aws/tests/integration_s3_dynamodb.rs @@ -12,7 +12,7 @@ use deltalake_aws::storage::S3StorageOptions; use deltalake_aws::{CommitEntry, DynamoDbConfig, DynamoDbLockClient}; use deltalake_core::kernel::transaction::CommitBuilder; use deltalake_core::kernel::{Action, Add, DataType, PrimitiveType, StructField, StructType}; -use deltalake_core::logstore::{commit_uri_from_version, StorageOptions}; +use deltalake_core::logstore::{commit_uri_from_version, StorageConfig}; use deltalake_core::logstore::{logstore_for, CommitOrBytes, LogStore}; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::protocol::{DeltaOperation, SaveMode}; @@ -162,10 +162,10 @@ async fn test_repair_commit_entry() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let client = make_client()?; let table = prepare_table(&context, "repair_needed").await?; - let options: StorageOptions = OPTIONS.clone().into(); + let options: StorageConfig = OPTIONS.clone().into_iter().collect(); let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new( ensure_table_uri(table.table_uri())?, - options.clone(), + &options, &S3_OPTIONS, std::sync::Arc::new(table.object_store()), )?; @@ -237,10 +237,10 @@ async fn test_abort_commit_entry() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let client = make_client()?; let table = prepare_table(&context, "abort_entry").await?; - let options: StorageOptions = OPTIONS.clone().into(); + let options: StorageConfig = OPTIONS.clone().into_iter().collect(); let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new( ensure_table_uri(table.table_uri())?, - options.clone(), + &options, &S3_OPTIONS, std::sync::Arc::new(table.object_store()), )?; @@ -284,10 +284,10 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> { let context = IntegrationContext::new(Box::new(S3Integration::default()))?; let client = make_client()?; let table = prepare_table(&context, "abort_entry_fail").await?; - let options: StorageOptions = OPTIONS.clone().into(); + let options: StorageConfig = OPTIONS.clone().into_iter().collect(); let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new( ensure_table_uri(table.table_uri())?, - options.clone(), + &options, &S3_OPTIONS, std::sync::Arc::new(table.object_store()), )?; diff --git a/crates/aws/tests/repair_s3_rename_test.rs b/crates/aws/tests/repair_s3_rename_test.rs index 1ade998851..6f50c5b8d5 100644 --- a/crates/aws/tests/repair_s3_rename_test.rs +++ b/crates/aws/tests/repair_s3_rename_test.rs @@ -3,8 +3,8 @@ use bytes::Bytes; use deltalake_aws::storage::S3StorageBackend; use deltalake_core::logstore::object_store::{ - DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId, - ObjectMeta, PutOptions, PutResult, Result as ObjectStoreResult, + DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, ObjectMeta, + PutOptions, PutResult, Result as ObjectStoreResult, }; use deltalake_core::{DeltaTableBuilder, ObjectStore, Path}; use 
deltalake_test::utils::IntegrationContext; @@ -13,7 +13,6 @@ use object_store::{MultipartUpload, PutMultipartOpts, PutPayload}; use serial_test::serial; use std::ops::Range; use std::sync::{Arc, Mutex}; -use tokio::io::AsyncWrite; use tokio::task::JoinHandle; use tokio::time::Duration; diff --git a/crates/azure/src/lib.rs b/crates/azure/src/lib.rs index 7046d184b1..124892124c 100644 --- a/crates/azure/src/lib.rs +++ b/crates/azure/src/lib.rs @@ -2,14 +2,13 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; use deltalake_core::logstore::{ - factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, - RetryConfigParse, StorageOptions, + default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory, + ObjectStoreFactory, ObjectStoreRef, StorageConfig, }; use deltalake_core::{DeltaResult, DeltaTableError, Path}; use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder}; -use object_store::ObjectStoreScheme; +use object_store::{ObjectStoreScheme, RetryConfig}; use url::Url; mod config; @@ -19,10 +18,9 @@ trait AzureOptions { fn as_azure_options(&self) -> HashMap; } -impl AzureOptions for StorageOptions { +impl AzureOptions for HashMap { fn as_azure_options(&self) -> HashMap { - self.0 - .iter() + self.iter() .filter_map(|(key, value)| { Some(( AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?, @@ -36,34 +34,30 @@ impl AzureOptions for StorageOptions { #[derive(Clone, Default, Debug)] pub struct AzureFactory {} -impl RetryConfigParse for AzureFactory {} - impl ObjectStoreFactory for AzureFactory { fn parse_url_opts( &self, url: &Url, - options: &StorageOptions, + options: &HashMap, + retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?; + let mut builder = MicrosoftAzureBuilder::new() + .with_url(url.to_string()) + .with_retry(retry.clone()); + for (key, value) in config.iter() { + builder = builder.with_config(*key, value.clone()); + } + let store = builder.build()?; + let (_, path) = ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError { source: Box::new(e), })?; let prefix = Path::parse(path)?; - let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string()); - - for (key, value) in config.iter() { - builder = builder.with_config(*key, value.clone()); - } - - let inner = builder - .with_retry(self.parse_retry_config(options)?) 
- .build()?; - - let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), options); - Ok((store, prefix)) + Ok((Arc::new(store), prefix)) } } @@ -72,7 +66,7 @@ impl LogStoreFactory for AzureFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { Ok(default_logstore(store, location, options)) } @@ -83,7 +77,7 @@ pub fn register_handlers(_additional_prefixes: Option) { let factory = Arc::new(AzureFactory {}); for scheme in ["az", "adl", "azure", "abfs", "abfss"].iter() { let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), factory.clone()); - logstores().insert(url.clone(), factory.clone()); + object_store_factories().insert(url.clone(), factory.clone()); + logstore_factories().insert(url.clone(), factory.clone()); } } diff --git a/crates/azure/tests/integration.rs b/crates/azure/tests/integration.rs index b07a2faf1a..ebf9960650 100644 --- a/crates/azure/tests/integration.rs +++ b/crates/azure/tests/integration.rs @@ -46,7 +46,6 @@ async fn test_concurrency_azure() -> TestResult { // comment](https://github.com/delta-io/delta-rs/pull/1564#issuecomment-1721048753) and we should // figure out a way to re-enable this test at least in the GitHub Actions CI environment #[ignore] -#[cfg(feature = "azure")] #[tokio::test] #[serial] async fn test_object_store_onelake() -> TestResult { @@ -60,7 +59,6 @@ async fn test_object_store_onelake() -> TestResult { // comment](https://github.com/delta-io/delta-rs/pull/1564#issuecomment-1721048753) and we should // figure out a way to re-enable this test at least in the GitHub Actions CI environment #[ignore] -#[cfg(feature = "azure")] #[tokio::test] #[serial] async fn test_object_store_onelake_abfs() -> TestResult { diff --git a/crates/catalog-unity/src/lib.rs b/crates/catalog-unity/src/lib.rs index ea642f8771..3ec2fcaeb4 100644 --- a/crates/catalog-unity/src/lib.rs +++ b/crates/catalog-unity/src/lib.rs @@ -8,7 +8,10 @@ compile_error!( ); use datafusion_common::DataFusionError; -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; +use deltalake_core::logstore::{ + default_logstore, logstore_factories, object_store::RetryConfig, LogStore, LogStoreFactory, + StorageConfig, +}; use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION}; use reqwest::Url; use std::collections::HashMap; @@ -32,8 +35,7 @@ use deltalake_core::{ use crate::client::retry::*; use deltalake_core::logstore::{ - factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, - StorageOptions, + object_store_factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef, }; pub mod client; pub mod credential; @@ -835,29 +837,29 @@ impl UnityCatalog { #[derive(Clone, Default, Debug)] pub struct UnityCatalogFactory {} -impl RetryConfigParse for UnityCatalogFactory {} - impl ObjectStoreFactory for UnityCatalogFactory { fn parse_url_opts( &self, table_uri: &Url, - options: &StorageOptions, + options: &HashMap, + _retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future( UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()), )??; - let mut storage_options = options.0.clone(); + let mut storage_options = options.clone(); storage_options.extend(temp_creds); + // TODO(roeap): we should not have to go through the table here. + // ideally we just create the right storage ... 
let mut builder = DeltaTableBuilder::from_uri(&table_path).with_io_runtime(IORuntime::default()); if !storage_options.is_empty() { builder = builder.with_storage_options(storage_options.clone()); } - let prefix = Path::parse(table_uri.path())?; - let store = builder.build()?.object_store(); + let store = builder.build_storage()?.object_store(None); Ok((store, prefix)) } @@ -868,7 +870,7 @@ impl LogStoreFactory for UnityCatalogFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { Ok(default_logstore(store, location, options)) } @@ -877,10 +879,9 @@ impl LogStoreFactory for UnityCatalogFactory { /// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes pub fn register_handlers(_additional_prefixes: Option) { let factory = Arc::new(UnityCatalogFactory::default()); - let scheme = "uc"; - let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), factory.clone()); - logstores().insert(url.clone(), factory.clone()); + let url = Url::parse(&format!("uc://")).unwrap(); + object_store_factories().insert(url.clone(), factory.clone()); + logstore_factories().insert(url.clone(), factory.clone()); } #[async_trait::async_trait] diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 778a833d9d..e7374d40fd 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,7 +12,7 @@ repository.workspace = true rust-version.workspace = true [package.metadata.docs.rs] -features = ["datafusion", "json", "unity-experimental"] +features = ["datafusion", "json"] [dependencies] delta_kernel.workspace = true @@ -30,6 +30,7 @@ arrow-row = { workspace = true } arrow-schema = { workspace = true, features = ["serde"] } arrow-select = { workspace = true } parquet = { workspace = true, features = ["async", "object_store"] } +object_store = { workspace = true, features = ["cloud"] } pin-project-lite = "^0.2.7" # datafusion @@ -72,6 +73,10 @@ tokio = { workspace = true, features = [ "parking_lot", ] } +# caching +foyer = { version = "0.16.1", optional = true, features = ["serde"] } +tempfile = { version = "3.19.1", optional = true } + # other deps (these should be organized and pulled into workspace.dependencies as necessary) cfg-if = "1" dashmap = "6" @@ -83,7 +88,6 @@ itertools = "0.14" libc = ">=0.2.90, <1" num-bigint = "0.4" num-traits = "0.2.15" -object_store = { workspace = true } parking_lot = "0.12" percent-encoding = "2" roaring = "0.10.1" @@ -127,3 +131,6 @@ python = ["arrow/pyarrow"] native-tls = ["delta_kernel/default-engine"] rustls = ["delta_kernel/default-engine-rustls"] cloud = ["object_store/cloud"] + +# enable caching some file I/O operations when scanning delta logs +delta-cache = ["foyer", "tempfile", "url/serde"] diff --git a/crates/core/src/data_catalog/storage/mod.rs b/crates/core/src/data_catalog/storage/mod.rs index d40bfa6a2b..996331aa46 100644 --- a/crates/core/src/data_catalog/storage/mod.rs +++ b/crates/core/src/data_catalog/storage/mod.rs @@ -13,7 +13,7 @@ use futures::TryStreamExt; use object_store::ObjectStore; use crate::errors::DeltaResult; -use crate::logstore::storage::*; +use crate::logstore::{store_for, StorageConfig}; use crate::open_table_with_storage_options; use crate::table::builder::ensure_table_uri; @@ -38,24 +38,23 @@ pub struct ListingSchemaProvider { /// A map of table names to a fully quilfied storage location tables: DashMap, /// Options used to create underlying object stores - storage_options: StorageOptions, + 
storage_options: StorageConfig, } impl ListingSchemaProvider { /// Create a new [`ListingSchemaProvider`] pub fn try_new( root_uri: impl AsRef, - storage_options: Option>, + options: Option>, ) -> DeltaResult { let uri = ensure_table_uri(root_uri)?; - let storage_options: StorageOptions = storage_options.unwrap_or_default().into(); - // We already parsed the url, so unwrapping is safe. - let store = store_for(&uri, &storage_options)?; + let options = options.unwrap_or_default(); + let store = store_for(&uri, &options)?; Ok(Self { authority: uri.to_string(), store, tables: DashMap::new(), - storage_options, + storage_options: StorageConfig::parse_options(options)?, }) } @@ -116,7 +115,7 @@ impl SchemaProvider for ListingSchemaProvider { return Ok(None); }; let provider = - open_table_with_storage_options(location, self.storage_options.0.clone()).await?; + open_table_with_storage_options(location, self.storage_options.raw.clone()).await?; Ok(Some(Arc::new(provider) as Arc)) } diff --git a/crates/core/src/delta_datafusion/planner.rs b/crates/core/src/delta_datafusion/planner.rs index c167b4bb7c..c0eef2b6c0 100644 --- a/crates/core/src/delta_datafusion/planner.rs +++ b/crates/core/src/delta_datafusion/planner.rs @@ -6,7 +6,7 @@ //! #[derive(Clone)] //! struct MergeMetricExtensionPlanner {} //! -//! #[async_trait] +//! #[macro@async_trait] //! impl ExtensionPlanner for MergeMetricExtensionPlanner { //! async fn plan_extension( //! &self, diff --git a/crates/core/src/errors.rs b/crates/core/src/errors.rs index c97d753408..3a5f494e06 100644 --- a/crates/core/src/errors.rs +++ b/crates/core/src/errors.rs @@ -6,7 +6,7 @@ use crate::kernel::transaction::{CommitBuilderError, TransactionError}; use crate::protocol::ProtocolError; /// A result returned by delta-rs -pub type DeltaResult = Result; +pub type DeltaResult = Result; /// Delta Table specific error #[allow(missing_docs)] diff --git a/crates/core/src/kernel/transaction/mod.rs b/crates/core/src/kernel/transaction/mod.rs index 75abc1dcbf..a7e9024ed6 100644 --- a/crates/core/src/kernel/transaction/mod.rs +++ b/crates/core/src/kernel/transaction/mod.rs @@ -926,7 +926,7 @@ impl std::future::IntoFuture for PostCommit { #[cfg(test)] mod tests { - use std::{collections::HashMap, sync::Arc}; + use std::sync::Arc; use super::*; use crate::logstore::{commit_uri_from_version, default_logstore::DefaultLogStore, LogStore}; @@ -949,7 +949,7 @@ mod tests { store.clone(), crate::logstore::LogStoreConfig { location: url, - options: HashMap::new().into(), + options: Default::default(), }, ); let version_path = Path::from("_delta_log/00000000000000000000.json"); diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index b58d11635a..7be00d458b 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -68,7 +68,6 @@ // #![deny(missing_docs)] #![allow(rustdoc::invalid_html_tags)] #![allow(clippy::nonminimal_bool)] - pub mod data_catalog; pub mod errors; pub mod kernel; @@ -86,6 +85,7 @@ pub mod delta_datafusion; pub mod writer; use std::collections::HashMap; +use std::sync::OnceLock; pub use self::data_catalog::{DataCatalog, DataCatalogError}; pub use self::errors::*; @@ -96,14 +96,14 @@ pub use self::table::config::TableProperty; pub use self::table::DeltaTable; pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore}; pub use operations::DeltaOps; -use std::sync::OnceLock; + +pub use protocol::checkpoints; // convenience exports for consumers to avoid aligning crate versions pub use arrow; #[cfg(feature = 
"datafusion")] pub use datafusion; pub use parquet; -pub use protocol::checkpoints; /// Creates and loads a DeltaTable from the given path with current metadata. /// Infers the storage backend to use from the scheme in the given table path. diff --git a/crates/core/src/logstore/config.rs b/crates/core/src/logstore/config.rs new file mode 100644 index 0000000000..6f63a2c147 --- /dev/null +++ b/crates/core/src/logstore/config.rs @@ -0,0 +1,298 @@ +//! Configuration for the Delta Log Store. +//! +//! This module manages the various pieces of configuration for the Delta Log Store. +//! It provides methods for parsing and updating configuration settings. All configuration +//! is parsed from String -> String mappings. +//! +//! Specific pieces of configuration must implement the `TryUpdateKey` trait which +//! defines how to update internal fields based on key-value pairs. +use std::collections::HashMap; + +use ::object_store::RetryConfig; +use object_store::{path::Path, prefix::PrefixStore, ObjectStore, ObjectStoreScheme}; +use tokio::runtime::Handle; + +use super::storage::runtime::RuntimeConfig; +use super::storage::LimitConfig; +use crate::{DeltaResult, DeltaTableError}; + +pub(super) trait TryUpdateKey { + /// Update an internal field in the configuration. + /// + /// ## Returns + /// - `Ok(Some(()))` if the key was updated. + /// - `Ok(None)` if the key was not found and no internal field was updated. + /// - `Err(_)` if the update failed. Failed updates may include finding a known key, + /// but failing to parse the value into the expected type. + fn try_update_key(&mut self, key: &str, value: &str) -> DeltaResult>; +} + +impl FromIterator<(K, V)> for RuntimeConfig +where + K: AsRef + Into, + V: AsRef + Into, +{ + fn from_iter>(iter: I) -> Self { + ParseResult::from_iter(iter).config + } +} + +/// Generic container for parsing configuration +pub(super) struct ParseResult { + /// Parsed configuration + pub config: T, + /// Unrecognized key value pairs. + pub unparsed: HashMap, + /// Errors encountered during parsing + pub errors: Vec<(String, String)>, + /// Whether the configuration is defaults only - i.e. no custom values were provided + pub is_default: bool, +} + +impl ParseResult { + pub fn raise_errors(&self) -> DeltaResult<()> { + if !self.errors.is_empty() { + return Err(DeltaTableError::Generic(format!( + "Failed to parse config: {:?}", + self.errors + ))); + } + Ok(()) + } +} + +impl FromIterator<(K, V)> for ParseResult +where + T: TryUpdateKey + Default, + K: AsRef + Into, + V: AsRef + Into, +{ + fn from_iter>(iter: I) -> Self { + let mut config = T::default(); + let mut unparsed = HashMap::new(); + let mut errors = Vec::new(); + let mut is_default = true; + for (k, v) in iter { + match config.try_update_key(k.as_ref(), v.as_ref()) { + Ok(None) => { + unparsed.insert(k.into(), v.into()); + } + Ok(Some(_)) => is_default = false, + Err(e) => errors.push((k.into(), e.to_string())), + } + } + ParseResult { + config, + unparsed, + errors, + is_default, + } + } +} + +#[derive(Default, Debug, Clone)] +pub struct StorageConfig { + /// Runtime configuration. + /// + /// Configuration to set up a dedicated IO runtime to execute IO related operations. + pub runtime: Option, + + pub retry: ::object_store::RetryConfig, + + /// Limit configuration. + /// + /// Configuration to limit the number of concurrent requests to the object store. + pub limit: Option, + + /// Properties that are not recognized by the storage configuration. 
+ /// + /// These properties are ignored by the storage configuration and can be used for custom purposes. + pub unknown_properties: HashMap, + + /// Original unprocessed properties. + /// + /// Since we remove properties during processing, but downstream integrations may + /// use them for their own purposes, we keep a copy of the original properties. + pub raw: HashMap, +} + +impl StorageConfig { + /// Wrap an object store with additional layers of functionality. + /// + /// Depending on the configuration, the following layers may be added: + /// - Retry layer: Adds retry logic to the object store. + /// - Limit layer: Limits the number of concurrent requests to the object store. + /// - Runtime layer: Executes IO related operations on a dedicated runtime. + pub fn decorate_store( + &self, + store: T, + table_root: &url::Url, + handle: Option, + ) -> DeltaResult> { + let inner = if let Some(runtime) = &self.runtime { + Box::new(runtime.decorate(store, handle)) as Box + } else { + Box::new(store) as Box + }; + + let inner = Self::decorate_prefix(inner, table_root)?; + + Ok(inner) + } + + pub(crate) fn decorate_prefix( + store: T, + table_root: &url::Url, + ) -> DeltaResult> { + let prefix = match ObjectStoreScheme::parse(table_root) { + Ok((ObjectStoreScheme::AmazonS3, _)) => Path::parse(table_root.path())?, + Ok((_, path)) => path, + _ => Path::parse(table_root.path())?, + }; + Ok(if prefix != Path::from("/") { + Box::new(PrefixStore::new(store, prefix)) as Box + } else { + Box::new(store) as Box + }) + } +} + +impl FromIterator<(K, V)> for StorageConfig +where + K: AsRef + Into, + V: AsRef + Into, +{ + fn from_iter>(iter: I) -> Self { + let mut config = Self::default(); + config.raw = iter + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + let result = ParseResult::::from_iter(&config.raw); + config.runtime = (!result.is_default).then_some(result.config); + + let result = ParseResult::::from_iter(result.unparsed); + config.limit = (!result.is_default).then_some(result.config); + + let remainder = result.unparsed; + + #[cfg(feature = "cloud")] + let remainder = { + let result = ParseResult::::from_iter(remainder); + config.retry = result.config; + result.unparsed + }; + + config.unknown_properties = remainder; + config + } +} + +impl StorageConfig { + pub fn raw(&self) -> impl Iterator { + self.raw.iter() + } + + /// Parse options into a StorageConfig. + /// + /// This method will raise if it cannot parse a value. StorageConfig can also + /// be constructed from an iterator of key-value pairs which will ignore any + /// parsing errors. + /// + /// # Raises + /// + /// Raises a `DeltaError` if any of the options are invalid - i.e. cannot be parsed into target type. 
+ pub fn parse_options(options: I) -> DeltaResult + where + I: IntoIterator, + K: AsRef + Into, + V: AsRef + Into, + { + let mut props = StorageConfig::default(); + props.raw = options + .into_iter() + .map(|(k, v)| (k.into(), v.into())) + .collect(); + + let (runtime, remainder): (RuntimeConfig, _) = try_parse_impl(&props.raw)?; + // NOTE: we only want to assign an actual runtime config we consumed an option + if props.raw.len() > remainder.len() { + props.runtime = Some(runtime); + } + + let result = ParseResult::::from_iter(remainder); + result.raise_errors()?; + props.limit = (!result.is_default).then_some(result.config); + let remainder = result.unparsed; + + #[cfg(feature = "cloud")] + let remainder = { + let (retry, remainder): (RetryConfig, _) = try_parse_impl(remainder)?; + props.retry = retry; + remainder + }; + + props.unknown_properties = remainder; + Ok(props) + } +} + +pub(super) fn try_parse_impl(options: I) -> DeltaResult<(T, HashMap)> +where + I: IntoIterator, + K: AsRef + Into, + V: AsRef + Into, + T: TryUpdateKey + Default, +{ + let result = ParseResult::from_iter(options); + result.raise_errors()?; + Ok((result.config, result.unparsed)) +} + +pub(super) fn parse_usize(value: &str) -> DeltaResult { + value + .parse::() + .map_err(|_| DeltaTableError::Generic(format!("failed to parse \"{value}\" as usize"))) +} + +pub(super) fn parse_f64(value: &str) -> DeltaResult { + value + .parse::() + .map_err(|_| DeltaTableError::Generic(format!("failed to parse \"{value}\" as f64"))) +} + +pub(super) fn parse_duration(value: &str) -> DeltaResult { + humantime::parse_duration(value) + .map_err(|_| DeltaTableError::Generic(format!("failed to parse \"{value}\" as Duration"))) +} + +pub(super) fn parse_bool(value: &str) -> DeltaResult { + Ok(super::storage::utils::str_is_truthy(value)) +} + +#[cfg(all(test, feature = "cloud"))] +mod tests { + use maplit::hashmap; + use object_store::RetryConfig; + use std::time::Duration; + + #[test] + fn test_retry_config_from_options() { + let options = hashmap! 
{ + "max_retries".to_string() => "100".to_string() , + "retry_timeout".to_string() => "300s".to_string() , + "backoff_config.init_backoff".to_string() => "20s".to_string() , + "backoff_config.max_backoff".to_string() => "1h".to_string() , + "backoff_config.base".to_string() => "50.0".to_string() , + }; + let (retry_config, remainder): (RetryConfig, _) = super::try_parse_impl(options).unwrap(); + assert!(remainder.is_empty()); + + assert_eq!(retry_config.max_retries, 100); + assert_eq!(retry_config.retry_timeout, Duration::from_secs(300)); + assert_eq!(retry_config.backoff.init_backoff, Duration::from_secs(20)); + assert_eq!(retry_config.backoff.max_backoff, Duration::from_secs(3600)); + assert_eq!(retry_config.backoff.base, 50_f64); + } +} diff --git a/crates/core/src/logstore/factories.rs b/crates/core/src/logstore/factories.rs new file mode 100644 index 0000000000..9458fc1495 --- /dev/null +++ b/crates/core/src/logstore/factories.rs @@ -0,0 +1,144 @@ +use std::{ + collections::HashMap, + sync::{Arc, OnceLock}, +}; + +use dashmap::DashMap; +use object_store::{path::Path, RetryConfig}; +use url::Url; + +use super::{default_logstore, LogStore, ObjectStoreRef, StorageConfig}; +use crate::{DeltaResult, DeltaTableError}; + +/// Factory registry to manage [`ObjectStoreFactory`] instances +pub type ObjectStoreFactoryRegistry = Arc>>; + +/// Factory trait for creating [`ObjectStore`](::object_store::ObjectStore) instances at runtime +pub trait ObjectStoreFactory: Send + Sync { + /// Parse URL options and create an object store instance. + /// + /// The object store instance returned by this method must point at the root of the storage location. + /// Root in this case means scheme, authority/host and maybe port. + /// The path segment is returned as second element of the tuple. It must point at the path + /// corresponding to the path segment of the URL. 
+ /// + /// The store should __NOT__ apply the decorations via the passed `StorageConfig` + fn parse_url_opts( + &self, + url: &Url, + options: &HashMap, + retry: &RetryConfig, + ) -> DeltaResult<(ObjectStoreRef, Path)>; +} + +#[derive(Clone, Debug, Default)] +pub(crate) struct DefaultObjectStoreFactory {} + +impl ObjectStoreFactory for DefaultObjectStoreFactory { + fn parse_url_opts( + &self, + url: &Url, + options: &HashMap, + _retry: &RetryConfig, + ) -> DeltaResult<(ObjectStoreRef, Path)> { + match url.scheme() { + "memory" | "file" => { + let (store, path) = object_store::parse_url_opts(url, options)?; + tracing::debug!( + "building store with:\n\tParsed URL: {url}\n\tPath in store: {path}" + ); + Ok((Arc::new(store), path)) + } + _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())), + } + } +} + +/// Access global registry of object store factories +pub fn object_store_factories() -> ObjectStoreFactoryRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + let factory = Arc::new(DefaultObjectStoreFactory::default()); + REGISTRY + .get_or_init(|| { + let registry = ObjectStoreFactoryRegistry::default(); + registry.insert(Url::parse("memory://").unwrap(), factory.clone()); + registry.insert(Url::parse("file://").unwrap(), factory); + registry + }) + .clone() +} + +/// Simpler access pattern for the [ObjectStoreFactoryRegistry] to get a single store +pub fn store_for(url: &Url, options: I) -> DeltaResult +where + I: IntoIterator, + K: AsRef + Into, + V: AsRef + Into, +{ + let scheme = Url::parse(&format!("{}://", url.scheme())).unwrap(); + let storage_config = StorageConfig::parse_options(options)?; + if let Some(factory) = object_store_factories().get(&scheme) { + let (store, _prefix) = + factory.parse_url_opts(url, &storage_config.raw, &storage_config.retry)?; + let store = storage_config.decorate_store(store, url, None)?; + Ok(Arc::new(store)) + } else { + Err(DeltaTableError::InvalidTableLocation(url.clone().into())) + } +} + +/// Registry of [`LogStoreFactory`] instances +pub type LogStoreFactoryRegistry = Arc>>; + +/// Trait for generating [LogStore] implementations +pub trait LogStoreFactory: Send + Sync { + /// Create a new [`LogStore`] from options. + /// + /// This method is responsible for creating a new instance of the [LogStore] implementation. + /// + /// ## Parameters + /// - `store`: A reference to the object store. + /// - `location`: A reference to the URL of the location. + /// - `options`: A reference to the storage configuration options. + /// + /// It returns a [DeltaResult] containing an [Arc] to the newly created [LogStore] implementation. + fn with_options( + &self, + store: ObjectStoreRef, + location: &Url, + options: &StorageConfig, + ) -> DeltaResult>; +} + +#[derive(Clone, Debug, Default)] +struct DefaultLogStoreFactory {} + +impl LogStoreFactory for DefaultLogStoreFactory { + fn with_options( + &self, + store: ObjectStoreRef, + location: &Url, + options: &StorageConfig, + ) -> DeltaResult> { + Ok(default_logstore(store, location, options)) + } +} + +/// Access global registry of logstore factories. 
+pub fn logstore_factories() -> LogStoreFactoryRegistry { + static REGISTRY: OnceLock = OnceLock::new(); + REGISTRY + .get_or_init(|| { + let registry = LogStoreFactoryRegistry::default(); + registry.insert( + Url::parse("memory://").unwrap(), + Arc::new(DefaultLogStoreFactory::default()), + ); + registry.insert( + Url::parse("file://").unwrap(), + Arc::new(DefaultLogStoreFactory::default()), + ); + registry + }) + .clone() +} diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index ccd3f82505..5abe439f54 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -1,11 +1,58 @@ -//! Delta log store. -use std::cmp::min; +//! # DeltaLake storage system +//! +//! Interacting with storage systems is a crucial part of any table format. +//! On one had the storage abstractions need to provide certain guarantees +//! (e.g. atomic rename, ...) and meet certain assumptions (e.g. sorted list results) +//! on the other hand can we exploit our knowledge about the general file layout +//! and access patterns to optimize our operations in terms of cost and performance. +//! +//! Two distinct phases are involved in querying a Delta table: +//! - **Metadata**: Fetching metadata about the table, such as schema, partitioning, and statistics. +//! - **Data**: Reading and processing data files based on the metadata. +//! +//! When writing to a table, we see the same phases, just in inverse order: +//! - **Data**: Writing data files that should become part of the table. +//! - **Metadata**: Updating table metadata to incorporate updates. +//! +//! Two main abstractions govern the file operations [`LogStore`] and [`ObjectStore`]. +//! +//! [`LogStore`]s are scoped to individual tables and are responsible for maintaining proper +//! behaviours and ensuring consistency during the metadata phase. The correctness is predicated +//! on the atomicity and durability guarantees of the implementation of this interface. +//! +//! - Atomic visibility: Partial writes must not be visible to readers. +//! - Mutual exclusion: Only one writer must be able to write to a specific log file. +//! - Consistent listing: Once a file has been written, any future list files operation must return +//! the underlying file system entry must immediately. +//! +//!
+//!
+//! While most object stores today provide the required guarantees, the specific
+//! locking mechanics are a table level responsibility. Specific implementations may
+//! decide to refer to a central catalog or other mechanisms for coordination.
+//!
+//! </div>
+//!
+//! [`ObjectStore`]s are responsible for direct interactions with storage systems, either
+//! during the data phase, where additional requirements are imposed on the storage system,
+//! or by specific LogStore implementations for their internal object store interactions.
+//!
+//! ## Managing LogStores and ObjectStores
+//!
+//! Aside from very basic implementations (i.e. in-memory and local file system), we rely
+//! on external integrations to provide [`ObjectStore`] and/or [`LogStore`] implementations.
+//!
+//! At runtime, deltalake needs to produce appropriate [`ObjectStore`]s to access the files
+//! discovered in a table. This is done via the [`ObjectStoreFactory`] and [`LogStoreFactory`]
+//! registries, which map URL schemes to the factories registered for them.
+//!
+//! ## Configuration
+//!
+//! Storage options are passed as plain string key/value pairs. Well-known keys are parsed
+//! into a [`StorageConfig`] (retry behaviour, concurrency limits, an optional dedicated IO
+//! runtime), while the raw options remain available to the registered factories.
+//!
+use std::cmp::{max, min};
+use std::collections::HashMap;
 use std::io::{BufRead, BufReader, Cursor};
-use std::sync::{LazyLock, OnceLock};
-use std::{cmp::max, collections::HashMap, sync::Arc};
+use std::sync::{Arc, LazyLock};
 
 use bytes::Bytes;
-use dashmap::DashMap;
 #[cfg(feature = "datafusion")]
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use delta_kernel::AsAny;
@@ -15,6 +62,7 @@ use regex::Regex;
 use serde::de::{Error, SeqAccess, Visitor};
 use serde::ser::SerializeSeq;
 use serde::{Deserialize, Serialize};
+use tokio::runtime::Handle;
 use tracing::{debug, error, warn};
 use url::Url;
 use uuid::Uuid;
@@ -25,29 +73,57 @@ use crate::kernel::Action;
 use crate::protocol::{get_last_checkpoint, ProtocolError};
 use crate::{DeltaResult, DeltaTableError};
 
-pub(crate) mod default_logstore;
-pub(crate) mod storage;
-
+pub use self::config::StorageConfig;
+pub use self::factories::{
+    logstore_factories, object_store_factories, store_for, LogStoreFactory,
+    LogStoreFactoryRegistry, ObjectStoreFactory, ObjectStoreFactoryRegistry,
+};
 pub use self::storage::utils::{commit_uri_from_version, str_is_truthy};
-#[cfg(feature = "cloud")]
-pub use self::storage::RetryConfigParse;
 pub use self::storage::{
-    factories, limit_store_handler, store_for, url_prefix_handler, DefaultObjectStoreRegistry,
-    DeltaIOStorageBackend, IORuntime, ObjectStoreFactory, ObjectStoreRef, ObjectStoreRegistry,
-    ObjectStoreRetryExt, StorageOptions,
+    DefaultObjectStoreRegistry, DeltaIOStorageBackend, IORuntime, ObjectStoreRef,
+    ObjectStoreRegistry, ObjectStoreRetryExt,
 };
+/// Convenience re-export of the object store crate
 pub use ::object_store;
 
-/// Trait for generating [LogStore] implementations
-pub trait LogStoreFactory: Send + Sync {
-    /// Create a new [LogStore]
-    fn with_options(
+mod config;
+pub(crate) mod default_logstore;
+pub(crate) mod factories;
+pub(crate) mod storage;
+
+/// Internal trait to handle object store configuration and initialization.
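A small illustration of the configuration flow described in the module docs above; this is a usage sketch under the assumption that `StorageConfig::parse_options` and its public `raw` and `retry` fields behave as they are used elsewhere in this diff. The internal helper trait that applies these decorations follows next.

```rust
use std::collections::HashMap;

use deltalake_core::logstore::StorageConfig;
use deltalake_core::DeltaResult;

fn main() -> DeltaResult<()> {
    let raw = HashMap::from([
        ("max_retries".to_string(), "3".to_string()),
        ("OBJECT_STORE_CONCURRENCY_LIMIT".to_string(), "16".to_string()),
        ("some_backend_specific_key".to_string(), "value".to_string()),
    ]);
    // Well-known keys are lifted into typed sub-configurations (retry, limits, runtime, ...).
    let config = StorageConfig::parse_options(raw)?;
    assert_eq!(config.retry.max_retries, 3);
    // Backend-specific keys remain available to the registered object store factories.
    assert!(config.raw.contains_key("some_backend_specific_key"));
    Ok(())
}
```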
+trait LogStoreFactoryExt { + fn with_options_internal( + &self, + store: ObjectStoreRef, + location: &Url, + options: &StorageConfig, + io_runtime: Option, + ) -> DeltaResult>; +} + +impl LogStoreFactoryExt for T { + fn with_options_internal( + &self, + store: ObjectStoreRef, + location: &Url, + options: &StorageConfig, + io_runtime: Option, + ) -> DeltaResult> { + let store = options.decorate_store(store, location, io_runtime.map(|r| r.get_handle()))?; + self.with_options(Arc::new(store), location, options) + } +} + +impl LogStoreFactoryExt for Arc { + fn with_options_internal( &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, + io_runtime: Option, ) -> DeltaResult> { - Ok(default_logstore(store, location, options)) + T::with_options_internal(&self, store, location, options, io_runtime) } } @@ -55,7 +131,7 @@ pub trait LogStoreFactory: Send + Sync { pub fn default_logstore( store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> Arc { Arc::new(default_logstore::DefaultLogStore::new( store, @@ -66,32 +142,6 @@ pub fn default_logstore( )) } -#[derive(Clone, Debug, Default)] -struct DefaultLogStoreFactory {} -impl LogStoreFactory for DefaultLogStoreFactory {} - -/// Registry of [LogStoreFactory] instances -pub type FactoryRegistry = Arc>>; - -/// TODO -pub fn logstores() -> FactoryRegistry { - static REGISTRY: OnceLock = OnceLock::new(); - REGISTRY - .get_or_init(|| { - let registry = FactoryRegistry::default(); - registry.insert( - Url::parse("memory://").unwrap(), - Arc::new(DefaultLogStoreFactory::default()), - ); - registry.insert( - Url::parse("file://").unwrap(), - Arc::new(DefaultLogStoreFactory::default()), - ); - registry - }) - .clone() -} - /// Sharable reference to [`LogStore`] pub type LogStoreRef = Arc; @@ -106,47 +156,58 @@ static DELTA_LOG_PATH: LazyLock = LazyLock::new(|| Path::from("_delta_log" /// # use std::collections::HashMap; /// # use url::Url; /// let location = Url::parse("memory:///").expect("Failed to make location"); -/// let logstore = logstore_for(location, HashMap::new(), None).expect("Failed to get a logstore"); +/// let logstore = logstore_for(location, HashMap::::new(), None).expect("Failed to get a logstore"); /// ``` -pub fn logstore_for( +pub fn logstore_for( location: Url, - options: impl Into + Clone, + options: I, io_runtime: Option, -) -> DeltaResult { +) -> DeltaResult +where + I: IntoIterator, + K: AsRef + Into, + V: AsRef + Into, +{ // turn location into scheme let scheme = Url::parse(&format!("{}://", location.scheme())) .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; - if let Some(entry) = self::storage::factories().get(&scheme) { - debug!("Found a storage provider for {scheme} ({location})"); + let storage_options = StorageConfig::parse_options(options)?; - let (store, _prefix) = entry - .value() - .parse_url_opts(&location, &options.clone().into())?; - return logstore_with(store, location, options, io_runtime); + if let Some(entry) = object_store_factories().get(&scheme) { + debug!("Found a storage provider for {scheme} ({location})"); + let (store, _prefix) = entry.value().parse_url_opts( + &location, + &storage_options.raw, + &storage_options.retry, + )?; + return logstore_with(store, location, storage_options.raw, io_runtime); } + Err(DeltaTableError::InvalidTableLocation(location.into())) } /// Return the [LogStoreRef] using the given [ObjectStoreRef] -pub fn logstore_with( +pub fn logstore_with( store: 
ObjectStoreRef, location: Url, - options: impl Into + Clone, + options: I, io_runtime: Option, -) -> DeltaResult { +) -> DeltaResult +where + I: IntoIterator, + K: AsRef + Into, + V: AsRef + Into, +{ let scheme = Url::parse(&format!("{}://", location.scheme())) .map_err(|_| DeltaTableError::InvalidTableLocation(location.clone().into()))?; - let store = if let Some(io_runtime) = io_runtime { - Arc::new(DeltaIOStorageBackend::new(store, io_runtime.get_handle())) as ObjectStoreRef - } else { - store - }; - - if let Some(factory) = logstores().get(&scheme) { + let config = StorageConfig::parse_options(options)?; + if let Some(factory) = logstore_factories().get(&scheme) { debug!("Found a logstore provider for {scheme}"); - return factory.with_options(store, &location, &options.into()); + return factory + .value() + .with_options_internal(store, &location, &config, io_runtime); } error!("Could not find a logstore for the scheme {scheme}"); @@ -179,8 +240,24 @@ pub enum PeekCommit { pub struct LogStoreConfig { /// url corresponding to the storage location. pub location: Url, - /// Options used for configuring backend storage - pub options: StorageOptions, + // Options used for configuring backend storage + pub options: StorageConfig, +} + +impl LogStoreConfig { + pub fn decorate_store( + &self, + store: T, + table_root: Option<&url::Url>, + handle: Option, + ) -> DeltaResult> { + let table_url = table_root.unwrap_or(&self.location); + self.options.decorate_store(store, table_url, handle) + } + + pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry { + self::factories::object_store_factories() + } } /// Trait for critical operations required to read and write commit entries in Delta logs. @@ -388,7 +465,7 @@ impl Serialize for LogStoreConfig { { let mut seq = serializer.serialize_seq(None)?; seq.serialize_element(&self.location.to_string())?; - seq.serialize_element(&self.options.0)?; + seq.serialize_element(&self.options.raw)?; seq.end() } } @@ -417,10 +494,11 @@ impl<'de> Deserialize<'de> for LogStoreConfig { let options: HashMap = seq .next_element()? 
.ok_or_else(|| A::Error::invalid_length(0, &self))?; - let location = Url::parse(&location_str).unwrap(); + let location = Url::parse(&location_str).map_err(|e| A::Error::custom(e))?; Ok(LogStoreConfig { location, - options: options.into(), + options: StorageConfig::parse_options(options) + .map_err(|e| A::Error::custom(e))?, }) } } @@ -592,24 +670,26 @@ pub async fn abort_commit_entry( pub(crate) mod tests { use super::*; + type Opts = HashMap; + #[test] fn logstore_with_invalid_url() { let location = Url::parse("nonexistent://table").unwrap(); - let store = logstore_for(location, HashMap::default(), None); + let store = logstore_for(location, Opts::default(), None); assert!(store.is_err()); } #[test] fn logstore_with_memory() { - let location = Url::parse("memory://table").unwrap(); - let store = logstore_for(location, HashMap::default(), None); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), None); assert!(store.is_ok()); } #[test] fn logstore_with_memory_and_rt() { - let location = Url::parse("memory://table").unwrap(); - let store = logstore_for(location, HashMap::default(), Some(IORuntime::default())); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), Some(IORuntime::default())); assert!(store.is_ok()); } @@ -617,9 +697,8 @@ pub(crate) mod tests { async fn test_is_location_a_table() { use object_store::path::Path; use object_store::{PutOptions, PutPayload}; - let location = Url::parse("memory://table").unwrap(); - let store = - logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), None).expect("Failed to get logstore"); assert!(!store .is_delta_table_location() .await @@ -647,9 +726,8 @@ pub(crate) mod tests { async fn test_is_location_a_table_commit() { use object_store::path::Path; use object_store::{PutOptions, PutPayload}; - let location = Url::parse("memory://table").unwrap(); - let store = - logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), None).expect("Failed to get logstore"); assert!(!store .is_delta_table_location() .await @@ -677,9 +755,8 @@ pub(crate) mod tests { async fn test_is_location_a_table_checkpoint() { use object_store::path::Path; use object_store::{PutOptions, PutPayload}; - let location = Url::parse("memory://table").unwrap(); - let store = - logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), None).expect("Failed to get logstore"); assert!(!store .is_delta_table_location() .await @@ -707,9 +784,8 @@ pub(crate) mod tests { async fn test_is_location_a_table_crc() { use object_store::path::Path; use object_store::{PutOptions, PutPayload}; - let location = Url::parse("memory://table").unwrap(); - let store = - logstore_for(location, HashMap::default(), None).expect("Failed to get logstore"); + let location = Url::parse("memory:///table").unwrap(); + let store = logstore_for(location, Opts::default(), None).expect("Failed to get logstore"); assert!(!store .is_delta_table_location() .await @@ -771,7 +847,7 @@ pub(crate) mod tests { async fn test_peek_with_invalid_json() -> DeltaResult<()> { use 
crate::logstore::object_store::memory::InMemory; let memory_store = Arc::new(InMemory::new()); - let log_path = Path::from("_delta_log/00000000000000000001.json"); + let log_path = Path::from("delta-table/_delta_log/00000000000000000001.json"); let log_content = r#"{invalid_json"#; diff --git a/crates/core/src/logstore/storage/mod.rs b/crates/core/src/logstore/storage/mod.rs index 6e5f1e3eac..cc61f9d033 100644 --- a/crates/core/src/logstore/storage/mod.rs +++ b/crates/core/src/logstore/storage/mod.rs @@ -1,71 +1,26 @@ //! Object storage backend abstraction layer for Delta Table transaction logs and data -use std::collections::HashMap; -use std::sync::{Arc, LazyLock, OnceLock}; +use std::sync::{Arc, LazyLock}; use dashmap::DashMap; -use object_store::limit::LimitStore; -use object_store::local::LocalFileSystem; -use object_store::memory::InMemory; -use object_store::prefix::PrefixStore; -use serde::{Deserialize, Serialize}; +use object_store::path::Path; +use object_store::{DynObjectStore, ObjectStore}; use url::Url; +use super::config; use crate::{DeltaResult, DeltaTableError}; -use object_store::path::Path; -use object_store::{DynObjectStore, ObjectStore}; pub use retry_ext::ObjectStoreRetryExt; -#[cfg(feature = "cloud")] -pub use retry_ext::RetryConfigParse; pub use runtime::{DeltaIOStorageBackend, IORuntime}; -pub mod retry_ext; -mod runtime; -pub mod utils; +pub(super) mod retry_ext; +pub(super) mod runtime; +pub(super) mod utils; static DELTA_LOG_PATH: LazyLock = LazyLock::new(|| Path::from("_delta_log")); /// Sharable reference to [`ObjectStore`] pub type ObjectStoreRef = Arc; -/// Factory trait for creating [ObjectStoreRef] instances at runtime -pub trait ObjectStoreFactory: Send + Sync { - #[allow(missing_docs)] - fn parse_url_opts( - &self, - url: &Url, - options: &StorageOptions, - ) -> DeltaResult<(ObjectStoreRef, Path)>; -} - -#[derive(Clone, Debug, Default)] -pub(crate) struct DefaultObjectStoreFactory {} - -impl ObjectStoreFactory for DefaultObjectStoreFactory { - fn parse_url_opts( - &self, - url: &Url, - options: &StorageOptions, - ) -> DeltaResult<(ObjectStoreRef, Path)> { - match url.scheme() { - "memory" => { - let path = Path::from_url_path(url.path())?; - let inner = Arc::new(InMemory::new()) as ObjectStoreRef; - let store = limit_store_handler(url_prefix_handler(inner, path.clone()), options); - Ok((store, path)) - } - "file" => { - let inner = Arc::new(LocalFileSystem::new_with_prefix( - url.to_file_path().unwrap(), - )?) 
as ObjectStoreRef; - let store = limit_store_handler(inner, options); - Ok((store, Path::from("/"))) - } - _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())), - } - } -} - pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static + Clone { /// If a store with the same key existed before, it is replaced and returned fn register_store( @@ -143,121 +98,22 @@ impl ObjectStoreRegistry for DefaultObjectStoreRegistry { } } -/// TODO -pub type FactoryRegistry = Arc>>; - -/// TODO -pub fn factories() -> FactoryRegistry { - static REGISTRY: OnceLock = OnceLock::new(); - REGISTRY - .get_or_init(|| { - let registry = FactoryRegistry::default(); - registry.insert( - Url::parse("memory://").unwrap(), - Arc::new(DefaultObjectStoreFactory::default()), - ); - registry.insert( - Url::parse("file://").unwrap(), - Arc::new(DefaultObjectStoreFactory::default()), - ); - registry - }) - .clone() +#[derive(Debug, Clone, Default)] +pub struct LimitConfig { + pub max_concurrency: Option, } -/// Simpler access pattern for the [FactoryRegistry] to get a single store -pub fn store_for(url: &Url, storage_options: &StorageOptions) -> DeltaResult { - let scheme = Url::parse(&format!("{}://", url.scheme())).unwrap(); - if let Some(factory) = factories().get(&scheme) { - let (store, _prefix) = factory.parse_url_opts(url, storage_options)?; - Ok(store) - } else { - Err(DeltaTableError::InvalidTableLocation(url.clone().into())) - } -} - -/// Options used for configuring backend storage -#[derive(Clone, Debug, Serialize, Deserialize, Default)] -pub struct StorageOptions(pub HashMap); - -impl From> for StorageOptions { - fn from(value: HashMap) -> Self { - Self(value) - } -} - -/// Simple function to wrap the given [ObjectStore] in a [PrefixStore] if necessary -/// -/// This simplifies the use of the storage since it ensures that list/get/etc operations -/// start from the prefix in the object storage rather than from the root configured URI of the -/// [ObjectStore] -pub fn url_prefix_handler(store: T, prefix: Path) -> ObjectStoreRef { - if prefix != Path::from("/") { - Arc::new(PrefixStore::new(store, prefix)) - } else { - Arc::new(store) - } -} - -/// Simple function to wrap the given [ObjectStore] in a [LimitStore] if configured -/// -/// Limits the number of concurrent connections the underlying object store -/// Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) for more information -pub fn limit_store_handler(store: T, options: &StorageOptions) -> ObjectStoreRef { - let concurrency_limit = options - .0 - .get(storage_constants::OBJECT_STORE_CONCURRENCY_LIMIT) - .and_then(|v| v.parse().ok()); - - if let Some(limit) = concurrency_limit { - Arc::new(LimitStore::new(store, limit)) - } else { - Arc::new(store) - } -} - -/// Storage option keys to use when creating [ObjectStore]. -/// -/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable. 
-/// Must be implemented for a given storage provider -pub mod storage_constants { - - /// The number of concurrent connections the underlying object store can create - /// Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) for more information - pub const OBJECT_STORE_CONCURRENCY_LIMIT: &str = "OBJECT_STORE_CONCURRENCY_LIMIT"; -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_url_prefix_handler() { - let store = InMemory::new(); - let path = Path::parse("/databases/foo/bar").expect("Failed to parse path"); - - let prefixed = url_prefix_handler(store, path.clone()); - - assert_eq!( - String::from("PrefixObjectStore(databases/foo/bar)"), - format!("{prefixed}") - ); - } - - #[test] - fn test_limit_store_handler() { - let store = InMemory::new(); - - let options = StorageOptions(HashMap::from_iter(vec![( - "OBJECT_STORE_CONCURRENCY_LIMIT".into(), - "500".into(), - )])); - - let limited = limit_store_handler(store, &options); - - assert_eq!( - String::from("LimitStore(500, InMemory)"), - format!("{limited}") - ); +impl config::TryUpdateKey for LimitConfig { + fn try_update_key(&mut self, key: &str, v: &str) -> DeltaResult> { + match key { + // The number of concurrent connections the underlying object store can create + // Reference [LimitStore](https://docs.rs/object_store/latest/object_store/limit/struct.LimitStore.html) + // for more information + "OBJECT_STORE_CONCURRENCY_LIMIT" | "concurrency_limit" => { + self.max_concurrency = Some(config::parse_usize(v)?); + } + _ => return Ok(None), + } + Ok(Some(())) } } diff --git a/crates/core/src/logstore/storage/retry_ext.rs b/crates/core/src/logstore/storage/retry_ext.rs index 1acdc62442..e60033cdb0 100644 --- a/crates/core/src/logstore/storage/retry_ext.rs +++ b/crates/core/src/logstore/storage/retry_ext.rs @@ -2,11 +2,11 @@ use ::object_store::path::Path; use ::object_store::{Error, ObjectStore, PutPayload, PutResult, Result}; -use humantime::parse_duration; use tracing::log::*; -use super::StorageOptions; -use crate::{DeltaResult, DeltaTableError}; +use crate::logstore::config; + +impl ObjectStoreRetryExt for T {} /// Retry extension for [`ObjectStore`] /// @@ -77,78 +77,23 @@ pub trait ObjectStoreRetryExt: ObjectStore { } } -impl ObjectStoreRetryExt for T {} - #[cfg(feature = "cloud")] -pub trait RetryConfigParse { - fn parse_retry_config( - &self, - options: &StorageOptions, - ) -> DeltaResult<::object_store::RetryConfig> { - let mut retry_config = ::object_store::RetryConfig::default(); - if let Some(max_retries) = options.0.get("max_retries") { - retry_config.max_retries = max_retries - .parse::() - .map_err(|e| DeltaTableError::generic(e.to_string()))?; - } - - if let Some(retry_timeout) = options.0.get("retry_timeout") { - retry_config.retry_timeout = parse_duration(retry_timeout).map_err(|_| { - DeltaTableError::generic(format!("failed to parse \"{retry_timeout}\" as Duration")) - })?; - } - - if let Some(bc_init_backoff) = options.0.get("backoff_config.init_backoff") { - retry_config.backoff.init_backoff = parse_duration(bc_init_backoff).map_err(|_| { - DeltaTableError::generic(format!( - "failed to parse \"{bc_init_backoff}\" as Duration" - )) - })?; - } - - if let Some(bc_max_backoff) = options.0.get("backoff_config.max_backoff") { - retry_config.backoff.max_backoff = parse_duration(bc_max_backoff).map_err(|_| { - DeltaTableError::generic(format!( - "failed to parse \"{bc_max_backoff}\" as Duration" - )) - })?; - } - - if let Some(bc_base) = 
options.0.get("backoff_config.base") { - retry_config.backoff.base = bc_base - .parse::() - .map_err(|e| DeltaTableError::generic(e.to_string()))?; +impl config::TryUpdateKey for object_store::RetryConfig { + fn try_update_key(&mut self, key: &str, v: &str) -> crate::DeltaResult> { + match key { + "max_retries" => self.max_retries = config::parse_usize(v)?, + "retry_timeout" => self.retry_timeout = config::parse_duration(v)?, + "init_backoff" | "backoff_config.init_backoff" | "backoff.init_backoff" => { + self.backoff.init_backoff = config::parse_duration(v)? + } + "max_backoff" | "backoff_config.max_backoff" | "backoff.max_backoff" => { + self.backoff.max_backoff = config::parse_duration(v)?; + } + "base" | "backoff_config.base" | "backoff.base" => { + self.backoff.base = config::parse_f64(v)?; + } + _ => return Ok(None), } - - Ok(retry_config) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use std::time::Duration; - - #[test] - fn test_retry_config_from_options() { - struct TestFactory {} - impl RetryConfigParse for TestFactory {} - - let options = maplit::hashmap! { - "max_retries".to_string() => "100".to_string() , - "retry_timeout".to_string() => "300s".to_string() , - "backoff_config.init_backoff".to_string() => "20s".to_string() , - "backoff_config.max_backoff".to_string() => "1h".to_string() , - "backoff_config.base".to_string() => "50.0".to_string() , - }; - let retry_config = TestFactory {} - .parse_retry_config(&StorageOptions(options)) - .unwrap(); - - assert_eq!(retry_config.max_retries, 100); - assert_eq!(retry_config.retry_timeout, Duration::from_secs(300)); - assert_eq!(retry_config.backoff.init_backoff, Duration::from_secs(20)); - assert_eq!(retry_config.backoff.max_backoff, Duration::from_secs(3600)); - assert_eq!(retry_config.backoff.base, 50_f64); + Ok(Some(())) } } diff --git a/crates/core/src/logstore/storage/runtime.rs b/crates/core/src/logstore/storage/runtime.rs index 76068de902..8063f9fa23 100644 --- a/crates/core/src/logstore/storage/runtime.rs +++ b/crates/core/src/logstore/storage/runtime.rs @@ -1,5 +1,5 @@ use std::ops::Range; -use std::sync::{Arc, OnceLock}; +use std::sync::OnceLock; use bytes::Bytes; use futures::future::BoxFuture; @@ -11,12 +11,12 @@ use object_store::{ Error as ObjectStoreError, GetOptions, GetResult, ListResult, ObjectMeta, ObjectStore, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult, }; +use object_store::{MultipartUpload, PutMultipartOpts}; use serde::{Deserialize, Serialize}; use tokio::runtime::{Builder as RuntimeBuilder, Handle, Runtime}; -use object_store::{MultipartUpload, PutMultipartOpts}; - -use super::ObjectStoreRef; +use crate::logstore::config; +use crate::DeltaResult; /// Creates static IO Runtime with optional configuration fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime { @@ -24,24 +24,30 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime { IO_RT.get_or_init(|| { let rt = match config { Some(config) => { - let mut builder = if config.multi_threaded { + let mut builder = if let Some(true) = config.multi_threaded { RuntimeBuilder::new_multi_thread() } else { RuntimeBuilder::new_current_thread() }; - let builder = builder.worker_threads(config.worker_threads); - #[allow(unused_mut)] - let mut builder = if config.enable_io && config.enable_time { - builder.enable_all() - } else if !config.enable_io && config.enable_time { - builder.enable_time() - } else { - builder + + if let Some(threads) = config.worker_threads { + builder.worker_threads(threads); + } + + match (config.enable_io, 
config.enable_time) { + (Some(true), Some(true)) => { + builder.enable_all(); + } + (Some(false), Some(true)) => { + builder.enable_time(); + } + _ => (), }; + #[cfg(unix)] { - if config.enable_io && !config.enable_time { - builder = builder.enable_io(); + if let (Some(true), Some(false)) = (config.enable_io, config.enable_time) { + builder.enable_io(); } } builder @@ -60,13 +66,46 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime { } /// Configuration for Tokio runtime -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct RuntimeConfig { - multi_threaded: bool, - worker_threads: usize, - thread_name: Option, - enable_io: bool, - enable_time: bool, + /// Whether to use a multi-threaded runtime + pub(crate) multi_threaded: Option, + /// Number of worker threads to use + pub(crate) worker_threads: Option, + /// Name of the thread + pub(crate) thread_name: Option, + /// Whether to enable IO + pub(crate) enable_io: Option, + /// Whether to enable time + pub(crate) enable_time: Option, +} + +impl config::TryUpdateKey for RuntimeConfig { + fn try_update_key(&mut self, key: &str, v: &str) -> DeltaResult> { + match key { + "multi_threaded" => self.multi_threaded = Some(config::parse_bool(v)?), + "worker_threads" => self.worker_threads = Some(config::parse_usize(v)?), + "thread_name" => self.thread_name = Some(v.into()), + "enable_io" => self.enable_io = Some(config::parse_bool(v)?), + "enable_time" => self.enable_time = Some(config::parse_bool(v)?), + _ => return Ok(None), + } + Ok(Some(())) + } +} + +impl RuntimeConfig { + pub fn decorate( + &self, + store: T, + handle: Option, + ) -> DeltaIOStorageBackend { + let handle = handle.unwrap_or_else(|| io_rt(Some(&self)).handle().clone()); + DeltaIOStorageBackend { + inner: store, + rt_handle: handle, + } + } } /// Provide custom Tokio RT or a runtime config @@ -96,37 +135,25 @@ impl IORuntime { } /// Wraps any object store and runs IO in it's own runtime [EXPERIMENTAL] -pub struct DeltaIOStorageBackend { - inner: ObjectStoreRef, - rt_handle: Handle, +#[derive(Clone)] +pub struct DeltaIOStorageBackend { + pub(crate) inner: T, + pub(crate) rt_handle: Handle, } -impl DeltaIOStorageBackend { - /// create wrapped object store which spawns tasks in own runtime - pub fn new(storage: ObjectStoreRef, rt_handle: Handle) -> Self { - Self { - inner: storage, - rt_handle, - } - } - +impl DeltaIOStorageBackend { /// spawn tasks on IO runtime pub fn spawn_io_rt( &self, f: F, - store: &Arc, + store: &T, path: Path, ) -> BoxFuture<'_, ObjectStoreResult> where - F: for<'a> FnOnce( - &'a Arc, - &'a Path, - ) -> BoxFuture<'a, ObjectStoreResult> - + Send - + 'static, + F: for<'a> FnOnce(&'a T, &'a Path) -> BoxFuture<'a, ObjectStoreResult> + Send + 'static, O: Send + 'static, { - let store = Arc::clone(store); + let store = store.clone(); let fut = self.rt_handle.spawn(async move { f(&store, &path).await }); fut.unwrap_or_else(|e| match e.try_into_panic() { Ok(p) => std::panic::resume_unwind(p), @@ -139,21 +166,17 @@ impl DeltaIOStorageBackend { pub fn spawn_io_rt_from_to( &self, f: F, - store: &Arc, + store: &T, from: Path, to: Path, ) -> BoxFuture<'_, ObjectStoreResult> where - F: for<'a> FnOnce( - &'a Arc, - &'a Path, - &'a Path, - ) -> BoxFuture<'a, ObjectStoreResult> + F: for<'a> FnOnce(&'a T, &'a Path, &'a Path) -> BoxFuture<'a, ObjectStoreResult> + Send + 'static, O: Send + 'static, { - let store = Arc::clone(store); + let store = store.clone(); let fut = self .rt_handle .spawn(async move { 
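The `LimitConfig`, `RetryConfig`, and `RuntimeConfig` changes above all follow the same crate-internal `TryUpdateKey` contract from the new `config` module: handle the keys you recognize, return `Ok(None)` for everything else. A rough in-crate sketch of that pattern, using a purely hypothetical `CacheConfig` for illustration:

```rust
// Inside crates/core/src/logstore/ — the `config` module is crate-private per this diff.
use crate::logstore::config;
use crate::DeltaResult;

/// Hypothetical config section, shown only to illustrate the TryUpdateKey pattern.
#[derive(Debug, Default)]
struct CacheConfig {
    max_entries: Option<usize>,
}

impl config::TryUpdateKey for CacheConfig {
    fn try_update_key(&mut self, key: &str, v: &str) -> DeltaResult<Option<()>> {
        match key {
            // Accept an upper-case and a lower-case alias, mirroring LimitConfig above.
            "CACHE_MAX_ENTRIES" | "cache_max_entries" => {
                self.max_entries = Some(config::parse_usize(v)?)
            }
            // `Ok(None)` signals "key not handled here" so other config sections
            // (or the raw option map) can pick it up.
            _ => return Ok(None),
        }
        Ok(Some(()))
    }
}
```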
f(&store, &from, &to).await }); @@ -165,20 +188,20 @@ impl DeltaIOStorageBackend { } } -impl std::fmt::Debug for DeltaIOStorageBackend { +impl std::fmt::Debug for DeltaIOStorageBackend { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!(fmt, "DeltaIOStorageBackend") + write!(fmt, "DeltaIOStorageBackend({:?})", self.inner) } } -impl std::fmt::Display for DeltaIOStorageBackend { +impl std::fmt::Display for DeltaIOStorageBackend { fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - write!(fmt, "DeltaIOStorageBackend") + write!(fmt, "DeltaIOStorageBackend({})", self.inner) } } #[async_trait::async_trait] -impl ObjectStore for DeltaIOStorageBackend { +impl ObjectStore for DeltaIOStorageBackend { async fn put(&self, location: &Path, bytes: PutPayload) -> ObjectStoreResult { self.spawn_io_rt( |store, path| store.put(path, bytes), diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index 5096b979cd..63e41b4cb3 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -470,11 +470,8 @@ mod tests { use tempfile::tempdir; use super::*; - use crate::{ - kernel::{DataType, PrimitiveType}, - logstore::StorageOptions, - open_table, Path, - }; + use crate::kernel::{DataType, PrimitiveType}; + use crate::{open_table, Path}; fn schema_field(key: &str, primitive: PrimitiveType, nullable: bool) -> StructField { StructField::new(key.to_string(), DataType::Primitive(primitive), nullable) @@ -501,7 +498,7 @@ mod tests { fn log_store(path: impl Into) -> LogStoreRef { let path: String = path.into(); let location = ensure_table_uri(path).expect("Failed to get the URI from the path"); - crate::logstore::logstore_for(location, StorageOptions::default(), None) + crate::logstore::logstore_for(location, HashMap::::new(), None) .expect("Failed to create an object store") } diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 2cc0e1b892..ca8d8b4830 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -460,7 +460,7 @@ mod tests { async fn test_create_table_metadata() { let schema = get_delta_schema(); let table = CreateBuilder::new() - .with_location("memory://") + .with_location("memory:///") .with_columns(schema.fields().cloned()) .await .unwrap(); @@ -483,7 +483,7 @@ mod tests { reader_features: None, }; let table = CreateBuilder::new() - .with_location("memory://") + .with_location("memory:///") .with_columns(schema.fields().cloned()) .with_actions(vec![Action::Protocol(protocol)]) .await @@ -492,7 +492,7 @@ mod tests { assert_eq!(table.protocol().unwrap().min_writer_version, 0); let table = CreateBuilder::new() - .with_location("memory://") + .with_location("memory:///") .with_columns(schema.fields().cloned()) .with_configuration_property(TableProperty::AppendOnly, Some("true")) .await @@ -610,7 +610,7 @@ mod tests { // Fail to create table with unknown Delta key let table = CreateBuilder::new() - .with_location("memory://") + .with_location("memory:///") .with_columns(schema.fields().cloned()) .with_configuration(config.clone()) .await; @@ -618,7 +618,7 @@ mod tests { // Succeed in creating table with unknown Delta key since we set raise_if_key_not_exists to false let table = CreateBuilder::new() - .with_location("memory://") + .with_location("memory:///") .with_columns(schema.fields().cloned()) .with_raise_if_key_not_exists(false) 
.with_configuration(config) diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index f17f630d98..a3ebad0a7b 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -130,7 +130,7 @@ impl DeltaOps { /// use deltalake_core::DeltaOps; /// /// async { - /// let ops = DeltaOps::try_from_uri("memory://").await.unwrap(); + /// let ops = DeltaOps::try_from_uri("memory:///").await.unwrap(); /// }; /// ``` pub async fn try_from_uri(uri: impl AsRef) -> DeltaResult { @@ -171,7 +171,7 @@ impl DeltaOps { /// ``` #[must_use] pub fn new_in_memory() -> Self { - DeltaTableBuilder::from_uri("memory://") + DeltaTableBuilder::from_uri("memory:///") .build() .unwrap() .into() @@ -183,7 +183,7 @@ impl DeltaOps { /// use deltalake_core::DeltaOps; /// /// async { - /// let ops = DeltaOps::try_from_uri("memory://").await.unwrap(); + /// let ops = DeltaOps::try_from_uri("memory:///").await.unwrap(); /// let table = ops.create().with_table_name("my_table").await.unwrap(); /// assert_eq!(table.version(), 0); /// }; diff --git a/crates/core/src/operations/write/writer.rs b/crates/core/src/operations/write/writer.rs index fac0923a2e..174e17960a 100644 --- a/crates/core/src/operations/write/writer.rs +++ b/crates/core/src/operations/write/writer.rs @@ -534,7 +534,7 @@ mod tests { #[tokio::test] async fn test_write_partition() { - let log_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory:///") .build_storage() .unwrap(); let object_store = log_store.object_store(None); @@ -566,7 +566,7 @@ mod tests { ])); let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - let object_store = DeltaTableBuilder::from_uri("memory://") + let object_store = DeltaTableBuilder::from_uri("memory:///") .build_storage() .unwrap() .object_store(None); @@ -597,7 +597,7 @@ mod tests { ])); let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - let object_store = DeltaTableBuilder::from_uri("memory://") + let object_store = DeltaTableBuilder::from_uri("memory:///") .build_storage() .unwrap() .object_store(None); @@ -624,7 +624,7 @@ mod tests { ])); let batch = RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap(); - let object_store = DeltaTableBuilder::from_uri("memory://") + let object_store = DeltaTableBuilder::from_uri("memory:///") .build_storage() .unwrap() .object_store(None); @@ -639,7 +639,7 @@ mod tests { #[tokio::test] async fn test_write_mismatched_schema() { - let log_store = DeltaTableBuilder::from_uri("memory://") + let log_store = DeltaTableBuilder::from_uri("memory:///") .build_storage() .unwrap(); let object_store = log_store.object_store(None); diff --git a/crates/core/src/table/builder.rs b/crates/core/src/table/builder.rs index cf868401a5..cd7025b719 100644 --- a/crates/core/src/table/builder.rs +++ b/crates/core/src/table/builder.rs @@ -10,33 +10,9 @@ use serde::{Deserialize, Serialize}; use tracing::debug; use url::Url; -use super::DeltaTable; -use crate::errors::{DeltaResult, DeltaTableError}; -use crate::logstore::storage::{factories, IORuntime, StorageOptions}; -use crate::logstore::LogStoreRef; - -#[allow(dead_code)] -#[derive(Debug, thiserror::Error)] -enum BuilderError { - #[error("Store {backend} requires host in storage url, got: {url}")] - MissingHost { backend: String, url: String }, - #[error("Missing configuration {0}")] - Required(String), - #[error("Failed to find valid credential.")] - MissingCredential, - #[error("Failed to decode SAS 
key: {0}\nSAS keys must be percent-encoded. They come encoded in the Azure portal and Azure Storage Explorer.")] - Decode(String), - #[error("Delta-rs must be build with feature '{feature}' to support url: {url}.")] - MissingFeature { feature: &'static str, url: String }, - #[error("Failed to parse table uri")] - TableUri(#[from] url::ParseError), -} - -impl From for DeltaTableError { - fn from(err: BuilderError) -> Self { - DeltaTableError::Generic(err.to_string()) - } -} +use crate::logstore::storage::IORuntime; +use crate::logstore::{object_store_factories, LogStoreRef}; +use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// possible version specifications for loading a delta table #[derive(Debug, Copy, Clone, PartialEq, Eq, Default)] @@ -54,20 +30,13 @@ pub enum DeltaVersion { #[derive(Debug, Serialize, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct DeltaTableConfig { - /// Indicates whether our use case requires tracking tombstones. - /// This defaults to `true` - /// - /// Read-only applications never require tombstones. Tombstones - /// are only required when writing checkpoints, so even many writers - /// may want to skip them. - pub require_tombstones: bool, - /// Indicates whether DeltaTable should track files. /// This defaults to `true` /// /// Some append-only applications might have no need of tracking any files. /// Hence, DeltaTable will be loaded with significant memory reduction. pub require_files: bool, + /// Controls how many files to buffer from the commit log when updating the table. /// This defaults to 4 * number of cpus /// @@ -76,9 +45,11 @@ pub struct DeltaTableConfig { /// last checkpoint, but will also increase memory usage. Possible rate limits of the storage backend should /// also be considered for optimal performance. pub log_buffer_size: usize, + /// Control the number of records to read / process from the commit / checkpoint files /// when processing record batches. pub log_batch_size: usize, + #[serde(skip_serializing, skip_deserializing)] /// When a runtime handler is provided, all IO tasks are spawn in that handle pub io_runtime: Option, @@ -87,7 +58,6 @@ pub struct DeltaTableConfig { impl Default for DeltaTableConfig { fn default() -> Self { Self { - require_tombstones: true, require_files: true, log_buffer_size: num_cpus::get() * 4, log_batch_size: 1024, @@ -98,8 +68,7 @@ impl Default for DeltaTableConfig { impl PartialEq for DeltaTableConfig { fn eq(&self, other: &Self) -> bool { - self.require_tombstones == other.require_tombstones - && self.require_files == other.require_files + self.require_files == other.require_files && self.log_buffer_size == other.log_buffer_size && self.log_batch_size == other.log_batch_size } @@ -168,12 +137,6 @@ impl DeltaTableBuilder { }) } - /// Sets `require_tombstones=false` to the builder - pub fn without_tombstones(mut self) -> Self { - self.table_config.require_tombstones = false; - self - } - /// Sets `require_files=false` to the builder pub fn without_files(mut self) -> Self { self.table_config.require_files = false; @@ -217,7 +180,8 @@ impl DeltaTableBuilder { /// /// # Arguments /// - /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with "/" pointing at delta table root (i.e. where `_delta_log` is located). + /// * `storage` - A shared reference to an [`ObjectStore`](object_store::ObjectStore) with + /// "/" pointing at delta table root (i.e. where `_delta_log` is located). /// * `location` - A url corresponding to the storagle location of `storage`. 
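With the builder changes in this file, storage options are handled as a plain `HashMap<String, String>` end to end. A hedged usage sketch, assuming the existing `from_uri`, `with_storage_options`, and `load` builder methods:

```rust
use std::collections::HashMap;

use deltalake_core::{DeltaResult, DeltaTableBuilder};

async fn open_with_options() -> DeltaResult<()> {
    // The options map is passed through unchanged; `storage_options()` now returns
    // the same plain HashMap (plus `allow_http` when that flag is set).
    let table = DeltaTableBuilder::from_uri("memory:///my/table")
        .with_storage_options(HashMap::from([(
            "OBJECT_STORE_CONCURRENCY_LIMIT".to_string(),
            "8".to_string(),
        )]))
        .load()
        .await?;
    println!("loaded table at version {:?}", table.version());
    Ok(())
}
```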
pub fn with_storage_backend(mut self, storage: Arc, location: Url) -> Self { self.storage_backend = Some((storage, location)); @@ -271,7 +235,7 @@ impl DeltaTableBuilder { } /// Storage options for configuring backend object store - pub fn storage_options(&self) -> StorageOptions { + pub fn storage_options(&self) -> HashMap { let mut storage_options = self.storage_options.clone().unwrap_or_default(); if let Some(allow) = self.allow_http { storage_options.insert( @@ -279,7 +243,7 @@ impl DeltaTableBuilder { if allow { "true" } else { "false" }.into(), ); }; - storage_options.into() + storage_options } /// Build a delta storage backend for the given config @@ -340,7 +304,7 @@ enum UriType { /// Will return an error if the path is not valid. fn resolve_uri_type(table_uri: impl AsRef) -> DeltaResult { let table_uri = table_uri.as_ref(); - let known_schemes: Vec<_> = factories() + let known_schemes: Vec<_> = object_store_factories() .iter() .map(|v| v.key().scheme().to_owned()) .collect(); @@ -432,11 +396,11 @@ fn ensure_file_location_exists(path: PathBuf) -> DeltaResult<()> { #[cfg(test)] mod tests { use super::*; - use crate::logstore::storage::DefaultObjectStoreFactory; + use crate::logstore::factories::DefaultObjectStoreFactory; #[test] fn test_ensure_table_uri() { - factories().insert( + object_store_factories().insert( Url::parse("s3://").unwrap(), Arc::new(DefaultObjectStoreFactory::default()), ); @@ -597,7 +561,7 @@ mod tests { let table = DeltaTableBuilder::from_uri(table_uri).with_storage_options(storage_opts); let found_opts = table.storage_options(); - assert_eq!(expected, found_opts.0.get(key).unwrap()); + assert_eq!(expected, found_opts.get(key).unwrap()); } } } diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 80cc16ff37..3ce8ba57d8 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -284,7 +284,7 @@ impl<'de> Deserialize<'de> for DeltaTable { .ok_or_else(|| A::Error::invalid_length(0, &self))?; let log_store = crate::logstore::logstore_for( storage_config.location, - storage_config.options, + storage_config.options.raw, None, ) .map_err(|_| A::Error::custom("Failed deserializing LogStore"))?; diff --git a/crates/core/tests/fs_common/mod.rs b/crates/core/tests/fs_common/mod.rs index 3f8d155bfb..a3f7d4ebd5 100644 --- a/crates/core/tests/fs_common/mod.rs +++ b/crates/core/tests/fs_common/mod.rs @@ -3,10 +3,7 @@ use deltalake_core::kernel::transaction::CommitBuilder; use deltalake_core::kernel::{ Action, Add, DataType, PrimitiveType, Remove, StructField, StructType, }; -use deltalake_core::logstore::{ - object_store::{GetResult, Result as ObjectStoreResult}, - StorageOptions, -}; +use deltalake_core::logstore::object_store::{GetResult, Result as ObjectStoreResult}; use deltalake_core::operations::create::CreateBuilder; use deltalake_core::protocol::{DeltaOperation, SaveMode}; use deltalake_core::DeltaTable; @@ -150,12 +147,9 @@ impl std::fmt::Display for SlowStore { impl SlowStore { #[allow(dead_code)] - pub fn new( - location: Url, - _options: impl Into + Clone, - ) -> deltalake_core::DeltaResult { + pub fn new(location: Url) -> deltalake_core::DeltaResult { Ok(Self { - inner: deltalake_core::logstore::store_for(&location, &StorageOptions::default())?, + inner: deltalake_core::logstore::store_for(&location, None::<(&str, &str)>)?, }) } } diff --git a/crates/core/tests/read_delta_log_test.rs b/crates/core/tests/read_delta_log_test.rs index f453d3567a..e944f8ef1e 100644 --- a/crates/core/tests/read_delta_log_test.rs +++ 
b/crates/core/tests/read_delta_log_test.rs @@ -1,12 +1,12 @@ use deltalake_core::{DeltaResult, DeltaTableBuilder}; use pretty_assertions::assert_eq; -use std::collections::HashMap; use std::time::SystemTime; #[allow(dead_code)] mod fs_common; #[tokio::test] +#[ignore] async fn test_log_buffering() { let n_commits = 10; let path = "../test/tests/data/simple_table_with_no_checkpoint"; @@ -22,13 +22,7 @@ async fn test_log_buffering() { let location = deltalake_core::table::builder::ensure_table_uri(path).unwrap(); // use storage that sleeps 10ms on every `get` - let store = std::sync::Arc::new( - fs_common::SlowStore::new( - location.clone(), - deltalake_core::logstore::StorageOptions::from(HashMap::new()), - ) - .unwrap(), - ); + let store = std::sync::Arc::new(fs_common::SlowStore::new(location.clone()).unwrap()); let mut seq_version = 0; let t = SystemTime::now(); diff --git a/crates/deltalake/Cargo.toml b/crates/deltalake/Cargo.toml index f6178585fb..ec606a099c 100644 --- a/crates/deltalake/Cargo.toml +++ b/crates/deltalake/Cargo.toml @@ -13,7 +13,16 @@ rust-version.workspace = true [package.metadata.docs.rs] # We cannot use all_features because TLS features are mutually exclusive. -features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"] +features = [ + "azure", + "datafusion", + "gcs", + "hdfs", + "json", + "python", + "s3", + "unity-experimental", +] [dependencies] deltalake-core = { version = "0.25.0", path = "../core" } diff --git a/crates/gcp/src/lib.rs b/crates/gcp/src/lib.rs index 37c00287d6..fa309aae56 100644 --- a/crates/gcp/src/lib.rs +++ b/crates/gcp/src/lib.rs @@ -2,14 +2,13 @@ use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; +use deltalake_core::logstore::object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}; +use deltalake_core::logstore::object_store::{ObjectStoreScheme, RetryConfig}; +use deltalake_core::logstore::{default_logstore, logstore_factories, LogStore, LogStoreFactory}; use deltalake_core::logstore::{ - factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, - RetryConfigParse, StorageOptions, + object_store_factories, ObjectStoreFactory, ObjectStoreRef, StorageConfig, }; use deltalake_core::{DeltaResult, DeltaTableError, Path}; -use object_store::gcp::{GoogleCloudStorageBuilder, GoogleConfigKey}; -use object_store::ObjectStoreScheme; use url::Url; mod config; @@ -20,10 +19,9 @@ trait GcpOptions { fn as_gcp_options(&self) -> HashMap; } -impl GcpOptions for StorageOptions { +impl GcpOptions for HashMap { fn as_gcp_options(&self) -> HashMap { - self.0 - .iter() + self.iter() .filter_map(|(key, value)| { Some(( GoogleConfigKey::from_str(&key.to_ascii_lowercase()).ok()?, @@ -37,13 +35,12 @@ impl GcpOptions for StorageOptions { #[derive(Clone, Default, Debug)] pub struct GcpFactory {} -impl RetryConfigParse for GcpFactory {} - impl ObjectStoreFactory for GcpFactory { fn parse_url_opts( &self, url: &Url, - options: &StorageOptions, + options: &HashMap, + retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { let config = config::GcpConfigHelper::try_new(options.as_gcp_options())?.build()?; @@ -59,13 +56,10 @@ impl ObjectStoreFactory for GcpFactory { builder = builder.with_config(*key, value.clone()); } - let inner = builder - .with_retry(self.parse_retry_config(options)?) 
- .build()?; + let inner = builder.with_retry(retry.clone()).build()?; + let store = crate::storage::GcsStorageBackend::try_new(Arc::new(inner))?; - let gcs_backend = crate::storage::GcsStorageBackend::try_new(Arc::new(inner))?; - let store = limit_store_handler(url_prefix_handler(gcs_backend, prefix.clone()), options); - Ok((store, prefix)) + Ok((Arc::new(store), prefix)) } } @@ -74,7 +68,7 @@ impl LogStoreFactory for GcpFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { Ok(default_logstore(store, location, options)) } @@ -85,6 +79,6 @@ pub fn register_handlers(_additional_prefixes: Option) { let factory = Arc::new(GcpFactory {}); let scheme = &"gs"; let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), factory.clone()); - logstores().insert(url.clone(), factory.clone()); + object_store_factories().insert(url.clone(), factory.clone()); + logstore_factories().insert(url.clone(), factory.clone()); } diff --git a/crates/gcp/tests/context.rs b/crates/gcp/tests/context.rs index c095a27e3f..7174323408 100644 --- a/crates/gcp/tests/context.rs +++ b/crates/gcp/tests/context.rs @@ -129,6 +129,7 @@ pub mod gs_cli { .wait() } + #[allow(unused)] pub fn delete_bucket(container_name: impl AsRef) -> std::io::Result { let endpoint = std::env::var("GOOGLE_ENDPOINT_URL") .expect("variable GOOGLE_ENDPOINT_URL must be set to connect to GCS Emulator"); diff --git a/crates/hdfs/src/lib.rs b/crates/hdfs/src/lib.rs index e99219c884..f308137393 100644 --- a/crates/hdfs/src/lib.rs +++ b/crates/hdfs/src/lib.rs @@ -1,11 +1,13 @@ +use std::collections::HashMap; use std::sync::Arc; -use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory}; use deltalake_core::logstore::{ - factories, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef, StorageOptions, + default_logstore, logstore_factories, LogStore, LogStoreFactory, StorageConfig, }; +use deltalake_core::logstore::{object_store_factories, ObjectStoreFactory, ObjectStoreRef}; use deltalake_core::{DeltaResult, Path}; use hdfs_native_object_store::HdfsObjectStore; +use object_store::RetryConfig; use url::Url; #[derive(Clone, Default, Debug)] @@ -15,14 +17,13 @@ impl ObjectStoreFactory for HdfsFactory { fn parse_url_opts( &self, url: &Url, - options: &StorageOptions, + options: &HashMap, + _retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { - let store: ObjectStoreRef = Arc::new(HdfsObjectStore::with_config( - url.as_str(), - options.0.clone(), - )?); + let store: ObjectStoreRef = + Arc::new(HdfsObjectStore::with_config(url.as_str(), options.clone())?); let prefix = Path::parse(url.path())?; - Ok((url_prefix_handler(store, prefix.clone()), prefix)) + Ok((store, prefix)) } } @@ -31,7 +32,7 @@ impl LogStoreFactory for HdfsFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { Ok(default_logstore(store, location, options)) } @@ -42,7 +43,7 @@ pub fn register_handlers(_additional_prefixes: Option) { let factory = Arc::new(HdfsFactory {}); for scheme in ["hdfs", "viewfs"].iter() { let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), factory.clone()); - logstores().insert(url.clone(), factory.clone()); + object_store_factories().insert(url.clone(), factory.clone()); + logstore_factories().insert(url.clone(), factory.clone()); } } diff --git a/crates/lakefs/src/execute.rs b/crates/lakefs/src/execute.rs index 
61dd5f60a5..1bc4cb8767 100644 --- a/crates/lakefs/src/execute.rs +++ b/crates/lakefs/src/execute.rs @@ -310,8 +310,9 @@ mod tests { #[tokio::test] async fn test_execute_error_with_invalid_log_store() { - let location = Url::parse("memory://table").unwrap(); - let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap(); + let location = Url::parse("memory:///table").unwrap(); + let invalid_default_store = + logstore_for(location, HashMap::::default(), None).unwrap(); let handler = LakeFSCustomExecuteHandler {}; let operation_id = Uuid::new_v4(); @@ -365,8 +366,9 @@ mod tests { async fn test_noop_commit_hook_executor() { // When file operations is false, the commit hook executor is a noop, since we don't need // to create any branches, or commit and merge them back. - let location = Url::parse("memory://table").unwrap(); - let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap(); + let location = Url::parse("memory:///table").unwrap(); + let invalid_default_store = + logstore_for(location, HashMap::::default(), None).unwrap(); let handler = LakeFSCustomExecuteHandler {}; let operation_id = Uuid::new_v4(); diff --git a/crates/lakefs/src/lib.rs b/crates/lakefs/src/lib.rs index 96c03dc2bc..29fed6d5bc 100644 --- a/crates/lakefs/src/lib.rs +++ b/crates/lakefs/src/lib.rs @@ -8,9 +8,9 @@ pub mod errors; pub mod execute; pub mod logstore; pub mod storage; -use deltalake_core::logstore::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions}; -use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory}; -use deltalake_core::{DeltaResult, Path}; +use deltalake_core::logstore::{logstore_factories, LogStore, LogStoreFactory}; +use deltalake_core::logstore::{object_store_factories, ObjectStoreRef, StorageConfig}; +use deltalake_core::DeltaResult; pub use execute::LakeFSCustomExecuteHandler; use logstore::lakefs_logstore; use std::sync::Arc; @@ -29,10 +29,9 @@ impl LogStoreFactory for LakeFSLogStoreFactory { &self, store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + config: &StorageConfig, ) -> DeltaResult> { - let options = self.with_env_s3(options); - let store = url_prefix_handler(store, Path::parse(location.path())?); + let options = StorageConfig::parse_options(self.with_env_s3(&config.raw.clone().into()))?; debug!("LakeFSLogStoreFactory has been asked to create a LogStore"); lakefs_logstore(store, location, &options) } @@ -44,6 +43,6 @@ pub fn register_handlers(_additional_prefixes: Option) { let log_stores = Arc::new(LakeFSLogStoreFactory::default()); let scheme = "lakefs"; let url = Url::parse(&format!("{scheme}://")).unwrap(); - factories().insert(url.clone(), object_stores.clone()); - logstores().insert(url.clone(), log_stores.clone()); + object_store_factories().insert(url.clone(), object_stores.clone()); + logstore_factories().insert(url.clone(), log_stores.clone()); } diff --git a/crates/lakefs/src/logstore.rs b/crates/lakefs/src/logstore.rs index 96e2370d8e..2d25ec5312 100644 --- a/crates/lakefs/src/logstore.rs +++ b/crates/lakefs/src/logstore.rs @@ -1,45 +1,41 @@ //! 
Default implementation of [`LakeFSLogStore`] for LakeFS - use std::sync::{Arc, OnceLock}; -use crate::client::LakeFSConfig; -use crate::errors::LakeFSConfigError; - -use super::client::LakeFSClient; use bytes::Bytes; use deltalake_core::logstore::{ commit_uri_from_version, DefaultObjectStoreRegistry, ObjectStoreRegistry, }; -use deltalake_core::logstore::{url_prefix_handler, DeltaIOStorageBackend, IORuntime}; use deltalake_core::{ - kernel::transaction::TransactionError, - logstore::{ObjectStoreRef, StorageOptions}, - DeltaResult, + kernel::transaction::TransactionError, logstore::ObjectStoreRef, DeltaResult, }; -use deltalake_core::{logstore::*, DeltaTableError, Path}; +use deltalake_core::{logstore::*, DeltaTableError}; use object_store::{Attributes, Error as ObjectStoreError, ObjectStore, PutOptions, TagSet}; use tracing::debug; use url::Url; use uuid::Uuid; +use super::client::LakeFSClient; +use crate::client::LakeFSConfig; +use crate::errors::LakeFSConfigError; + /// Return the [LakeFSLogStore] implementation with the provided configuration options pub fn lakefs_logstore( store: ObjectStoreRef, location: &Url, - options: &StorageOptions, + options: &StorageConfig, ) -> DeltaResult> { let host = options - .0 + .raw .get("aws_endpoint") .ok_or(LakeFSConfigError::EndpointMissing)? .to_string(); let username = options - .0 + .raw .get("aws_access_key_id") .ok_or(LakeFSConfigError::UsernameCredentialMissing)? .to_string(); let password = options - .0 + .raw .get("aws_secret_access_key") .ok_or(LakeFSConfigError::PasswordCredentialMissing)? .to_string(); @@ -87,12 +83,13 @@ impl LakeFSLogStore { let scheme = Url::parse(&format!("{}://", url.scheme())) .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?; - if let Some(entry) = deltalake_core::logstore::factories().get(&scheme) { + if let Some(entry) = self.config().object_store_factory().get(&scheme) { debug!("Creating new storage with storage provider for {scheme} ({url})"); - - let (store, _prefix) = entry - .value() - .parse_url_opts(url, &self.config().options.clone())?; + let (store, _prefix) = entry.value().parse_url_opts( + url, + &self.config().options.raw, + &self.config().options.retry, + )?; return Ok(store); } Err(DeltaTableError::InvalidTableLocation(url.to_string())) @@ -123,13 +120,11 @@ impl LakeFSLogStore { .await?; // Build new object store store using the new lakefs url - let txn_store = url_prefix_handler( - Arc::new(DeltaIOStorageBackend::new( - self.build_new_store(&lakefs_url)?, - IORuntime::default().get_handle(), - )) as ObjectStoreRef, - Path::parse(lakefs_url.path())?, - ); + let txn_store = Arc::new(self.config.decorate_store( + self.build_new_store(&lakefs_url)?, + Some(&lakefs_url), + None, + )?); // Register transaction branch as ObjectStore in log_store storages self.register_object_store(&lakefs_url, txn_store); diff --git a/crates/lakefs/src/storage.rs b/crates/lakefs/src/storage.rs index a8961a1486..b9fc0c0efd 100644 --- a/crates/lakefs/src/storage.rs +++ b/crates/lakefs/src/storage.rs @@ -1,15 +1,14 @@ //! LakeFS storage backend (internally S3). 
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey; -use deltalake_core::logstore::{ - limit_store_handler, ObjectStoreFactory, ObjectStoreRef, RetryConfigParse, StorageOptions, -}; +use deltalake_core::logstore::{ObjectStoreFactory, ObjectStoreRef}; use deltalake_core::{DeltaResult, DeltaTableError, Path}; use object_store::aws::AmazonS3Builder; -use object_store::ObjectStoreScheme; +use object_store::{ObjectStoreScheme, RetryConfig}; use std::collections::HashMap; use std::fmt::Debug; use std::str::FromStr; +use std::sync::Arc; use tracing::log::*; use url::Url; @@ -17,36 +16,31 @@ use url::Url; pub struct LakeFSObjectStoreFactory {} pub(crate) trait S3StorageOptionsConversion { - fn with_env_s3(&self, options: &StorageOptions) -> StorageOptions { - let mut options = StorageOptions( - options - .0 - .clone() - .into_iter() - .map(|(k, v)| { - if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { - (config_key.as_ref().to_string(), v) - } else { - (k, v) - } - }) - .collect(), - ); + fn with_env_s3(&self, options: &HashMap) -> HashMap { + let mut options: HashMap = options + .clone() + .into_iter() + .map(|(k, v)| { + if let Ok(config_key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { + (config_key.as_ref().to_string(), v) + } else { + (k, v) + } + }) + .collect(); for (os_key, os_value) in std::env::vars_os() { if let (Some(key), Some(value)) = (os_key.to_str(), os_value.to_str()) { if let Ok(config_key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) { - if !options.0.contains_key(config_key.as_ref()) { - options - .0 - .insert(config_key.as_ref().to_string(), value.to_string()); + if !options.contains_key(config_key.as_ref()) { + options.insert(config_key.as_ref().to_string(), value.to_string()); } } } } // Conditional put is supported in LakeFS since v1.47 - if !options.0.keys().any(|key| { + if !options.keys().any(|key| { let key = key.to_ascii_lowercase(); [ AmazonS3ConfigKey::ConditionalPut.as_ref(), @@ -54,7 +48,7 @@ pub(crate) trait S3StorageOptionsConversion { ] .contains(&key.as_str()) }) { - options.0.insert("conditional_put".into(), "etag".into()); + options.insert("conditional_put".into(), "etag".into()); } options } @@ -62,26 +56,22 @@ pub(crate) trait S3StorageOptionsConversion { impl S3StorageOptionsConversion for LakeFSObjectStoreFactory {} -impl RetryConfigParse for LakeFSObjectStoreFactory {} - impl ObjectStoreFactory for LakeFSObjectStoreFactory { fn parse_url_opts( &self, url: &Url, - storage_options: &StorageOptions, + storage_config: &HashMap, + retry: &RetryConfig, ) -> DeltaResult<(ObjectStoreRef, Path)> { - let options = self.with_env_s3(storage_options); - // Convert LakeFS URI to equivalent S3 URI. let s3_url = url.to_string().replace("lakefs://", "s3://"); - let s3_url = Url::parse(&s3_url) .map_err(|_| DeltaTableError::InvalidTableLocation(url.clone().into()))?; // All S3-likes should start their builder the same way + let options = self.with_env_s3(storage_config); let config = options .clone() - .0 .into_iter() .filter_map(|(k, v)| { if let Ok(key) = AmazonS3ConfigKey::from_str(&k.to_ascii_lowercase()) { @@ -99,18 +89,14 @@ impl ObjectStoreFactory for LakeFSObjectStoreFactory { let prefix = Path::parse(path)?; let mut builder = AmazonS3Builder::new().with_url(s3_url.to_string()); - for (key, value) in config.iter() { builder = builder.with_config(*key, value.clone()); } - let inner = builder - .with_retry(self.parse_retry_config(&options)?) 
-            .build()?;
+        let store = builder.with_retry(retry.clone()).build()?;
 
-        let store = limit_store_handler(inner, &options);
         debug!("Initialized the object store: {store:?}");
 
-        Ok((store, prefix))
+        Ok((Arc::new(store), prefix))
     }
 }
@@ -160,13 +146,12 @@ mod tests {
         std::env::set_var("ENDPOINT", "env_key");
         std::env::set_var("SECRET_ACCESS_KEY", "env_key");
         std::env::set_var("REGION", "env_key");
-        let combined_options =
-            LakeFSObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options));
+        let combined_options = LakeFSObjectStoreFactory {}.with_env_s3(&raw_options);
 
         // Four and then the conditional_put built-in
-        assert_eq!(combined_options.0.len(), 5);
+        assert_eq!(combined_options.len(), 5);
 
-        for (key, v) in combined_options.0 {
+        for (key, v) in combined_options {
             if key != "conditional_put" {
                 assert_eq!(v, "env_key");
             }
@@ -190,10 +175,9 @@ mod tests {
         std::env::set_var("aws_secret_access_key", "env_key");
         std::env::set_var("aws_region", "env_key");
 
-        let combined_options =
-            LakeFSObjectStoreFactory {}.with_env_s3(&StorageOptions(raw_options));
+        let combined_options = LakeFSObjectStoreFactory {}.with_env_s3(&raw_options);
 
-        for (key, v) in combined_options.0 {
+        for (key, v) in combined_options {
             if key != "conditional_put" {
                 assert_eq!(v, "options_key");
             }
diff --git a/crates/mount/src/file.rs b/crates/mount/src/file.rs
index 9c87bf3b2f..9b202d88b0 100644
--- a/crates/mount/src/file.rs
+++ b/crates/mount/src/file.rs
@@ -108,42 +108,20 @@ impl From<LocalFileSystemError> for ObjectStoreError {
 #[derive(Debug)]
 pub struct MountFileStorageBackend {
     inner: Arc<LocalFileSystem>,
-    root_url: Arc<Url>,
 }
 
 impl MountFileStorageBackend {
     /// Creates a new MountFileStorageBackend.
-    pub fn try_new(path: impl AsRef<std::path::Path>) -> ObjectStoreResult<Self> {
+    pub fn try_new() -> ObjectStoreResult<Self> {
         Ok(Self {
-            root_url: Arc::new(Self::path_to_root_url(path.as_ref())?),
-            inner: Arc::new(LocalFileSystem::new_with_prefix(path)?),
-        })
-    }
-
-    fn path_to_root_url(path: &std::path::Path) -> ObjectStoreResult<Url> {
-        let root_path =
-            std::fs::canonicalize(path).map_err(|e| object_store::Error::InvalidPath {
-                source: object_store::path::Error::Canonicalize {
-                    path: path.into(),
-                    source: e,
-                },
-            })?;
-
-        Url::from_file_path(root_path).map_err(|_| object_store::Error::InvalidPath {
-            source: object_store::path::Error::InvalidPath { path: path.into() },
+            inner: Arc::new(LocalFileSystem::new()),
         })
     }
 
     /// Return an absolute filesystem path of the given location
     fn path_to_filesystem(&self, location: &ObjectStorePath) -> String {
-        let mut url = self.root_url.as_ref().clone();
-        url.path_segments_mut()
-            .expect("url path")
-            // technically not necessary as Path ignores empty segments
-            // but avoids creating paths with "//" which look odd in error messages.
-            .pop_if_empty()
-            .extend(location.parts());
-
+        let mut url = url::Url::parse("file:///").unwrap();
+        url.set_path(location.as_ref());
         url.to_file_path().unwrap().to_str().unwrap().to_owned()
     }
 }
@@ -264,6 +242,19 @@ impl ObjectStore for MountFileStorageBackend {
     }
 }
 
+fn path_to_root_url(path: &std::path::Path) -> ObjectStoreResult<Url> {
+    let root_path = std::fs::canonicalize(path).map_err(|e| object_store::Error::InvalidPath {
+        source: object_store::path::Error::Canonicalize {
+            path: path.into(),
+            source: e,
+        },
+    })?;
+
+    Url::from_file_path(root_path).map_err(|_| object_store::Error::InvalidPath {
+        source: object_store::path::Error::InvalidPath { path: path.into() },
+    })
+}
+
 /// Regular renames `from` to `to`.
 /// `from` has to exist, but `to` is not, otherwise the operation will fail.
 /// It's not atomic and cannot be called in parallel with other operations on the same file.
diff --git a/crates/mount/src/lib.rs b/crates/mount/src/lib.rs
index 4e0e485461..be7fc4cf2c 100644
--- a/crates/mount/src/lib.rs
+++ b/crates/mount/src/lib.rs
@@ -2,12 +2,15 @@
 use std::collections::HashMap;
 use std::str::FromStr;
 use std::sync::Arc;
-use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
 use deltalake_core::logstore::{
-    factories, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
+    default_logstore, logstore_factories, LogStore, LogStoreFactory, StorageConfig,
+};
+use deltalake_core::logstore::{
+    object_store_factories, str_is_truthy, ObjectStoreFactory, ObjectStoreRef,
 };
 use deltalake_core::{DeltaResult, DeltaTableError, Path};
 use object_store::local::LocalFileSystem;
+use object_store::RetryConfig;
 use url::Url;
 
 mod config;
@@ -18,10 +21,9 @@ trait MountOptions {
     fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String>;
 }
 
-impl MountOptions for StorageOptions {
+impl MountOptions for HashMap<String, String> {
     fn as_mount_options(&self) -> HashMap<config::MountConfigKey, String> {
-        self.0
-            .iter()
+        self.iter()
             .filter_map(|(key, value)| {
                 Some((
                     config::MountConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
@@ -39,7 +41,8 @@ impl ObjectStoreFactory for MountFactory {
     fn parse_url_opts(
         &self,
         url: &Url,
-        options: &StorageOptions,
+        options: &HashMap<String, String>,
+        _retry: &RetryConfig,
     ) -> DeltaResult<(ObjectStoreRef, Path)> {
         let config = config::MountConfigHelper::try_new(options.as_mount_options())?.build()?;
 
@@ -57,22 +60,19 @@ impl ObjectStoreFactory for MountFactory {
                 }
                 // We need to convert the dbfs url to a file url
                 let new_url = Url::parse(&format!("file:///dbfs{}", url.path())).unwrap();
-                let store = Arc::new(file::MountFileStorageBackend::try_new(
-                    new_url.to_file_path().unwrap(),
-                )?) as ObjectStoreRef;
+                let store = Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
                 Ok((store, Path::from("/")))
             }
             "file" => {
                 if allow_unsafe_rename {
-                    let store = Arc::new(file::MountFileStorageBackend::try_new(
-                        url.to_file_path().unwrap(),
-                    )?) as ObjectStoreRef;
-                    Ok((store, Path::from("/")))
+                    let store =
+                        Arc::new(file::MountFileStorageBackend::try_new()?) as ObjectStoreRef;
+                    let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
+                    Ok((store, prefix))
                 } else {
-                    let store = Arc::new(LocalFileSystem::new_with_prefix(
-                        url.to_file_path().unwrap(),
-                    )?) as ObjectStoreRef;
-                    Ok((store, Path::from("/")))
+                    let store = Arc::new(LocalFileSystem::new()) as ObjectStoreRef;
+                    let prefix = Path::from_filesystem_path(url.to_file_path().unwrap())?;
+                    Ok((store, prefix))
                 }
             }
             _ => Err(DeltaTableError::InvalidTableLocation(url.clone().into())),
@@ -85,7 +85,7 @@ impl LogStoreFactory for MountFactory {
         &self,
         store: ObjectStoreRef,
         location: &Url,
-        options: &StorageOptions,
+        options: &StorageConfig,
     ) -> DeltaResult<Arc<dyn LogStore>> {
         Ok(default_logstore(store, location, options))
     }
@@ -96,7 +96,7 @@ pub fn register_handlers(_additional_prefixes: Option<Url>) {
     let factory = Arc::new(MountFactory {});
     for scheme in ["dbfs", "file"].iter() {
         let url = Url::parse(&format!("{scheme}://")).unwrap();
-        factories().insert(url.clone(), factory.clone());
-        logstores().insert(url.clone(), factory.clone());
+        object_store_factories().insert(url.clone(), factory.clone());
+        logstore_factories().insert(url.clone(), factory.clone());
     }
 }
diff --git a/crates/mount/tests/integration.rs b/crates/mount/tests/integration.rs
index 14fcbcdc95..729d64261b 100644
--- a/crates/mount/tests/integration.rs
+++ b/crates/mount/tests/integration.rs
@@ -11,7 +11,7 @@ static TEST_PREFIXES: &[&str] = &["my table", "你好/😊"];
 
 #[tokio::test]
 #[serial]
-async fn test_integration_local() -> TestResult {
+async fn test_integration_mount() -> TestResult {
     let context = IntegrationContext::new(Box::<MountIntegration>::default())?;
 
     test_read_tables(&context).await?;
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 4413a381eb..95a91ce578 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -815,7 +815,8 @@ def to_pyarrow_dataset(
         ):
             raise DeltaProtocolError(
                 f"The table's minimum reader version is {table_protocol.min_reader_version} "
-                f"but deltalake only supports version 1 or {MAX_SUPPORTED_READER_VERSION} with these reader features: {SUPPORTED_READER_FEATURES}"
+                f"but deltalake only supports version 1 or {MAX_SUPPORTED_READER_VERSION} "
+                f"with these reader features: {SUPPORTED_READER_FEATURES}"
             )
         if (
             table_protocol.min_reader_version >= 3
@@ -826,7 +827,8 @@ def to_pyarrow_dataset(
             )
             if len(missing_features) > 0:
                 raise DeltaProtocolError(
-                    f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader."
+                    f"The table has set these reader features: {missing_features} "
+                    "but these are not yet supported by the deltalake reader."
                 )
         if not filesystem:
             filesystem = pa_fs.PyFileSystem(
@@ -991,7 +993,7 @@ def delete(
         post_commithook_properties: PostCommitHookProperties | None = None,
         commit_properties: CommitProperties | None = None,
     ) -> dict[str, Any]:
-        """Delete records from a Delta Table that statisfy a predicate.
+        """Delete records from a Delta Table that satisfy a predicate.
 
         When a predicate is not provided then all records are deleted from the Delta
         Table. Otherwise a scan of the Delta table is performed to mark any files
@@ -1590,7 +1592,7 @@ def add_feature(
 
         Args:
             feature: Table Feature e.g. Deletion Vectors, Change Data Feed
-            allow_protocol_versions_increase: Allow the protocol to be implicitily bumped to reader 3 or writer 7
+            allow_protocol_versions_increase: Allow the protocol to be implicitly bumped to reader 3 or writer 7
             commit_properties: properties of the transaction commit. If None, default values are used.
             post_commithook_properties: properties for the post commit hook. If None, default values are used.
 
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 11e237eeaa..f732d8cdf8 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -95,7 +95,7 @@ dev = [
     "pytest-timeout",
     "pytest-benchmark",
    "mypy==1.10.1",
-    "ruff==0.11.2",
+    "ruff>=0.11.2",
 ]
 polars = ["polars==1.17.1"]
 lakefs = ["lakefs==0.8.0"]
@@ -104,10 +104,7 @@ pyspark = [
     "delta-spark",
     "numpy==1.26.4", # pyspark is not compatible with latest numpy
 ]
-docs = [
-    "sphinx<=4.5",
-    "sphinx-rtd-theme",
-]
+docs = ["sphinx<=4.5", "sphinx-rtd-theme"]
 other = [
     "azure-storage-blob==12.20.0",
     "packaging>=20",
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 181ccfe939..c8afe15523 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -28,6 +28,8 @@ use deltalake::arrow::record_batch::RecordBatch;
 use deltalake::arrow::{self, datatypes::Schema as ArrowSchema};
 use deltalake::checkpoints::{cleanup_metadata, create_checkpoint};
 use deltalake::datafusion::catalog::TableProvider;
+use deltalake::datafusion::datasource::provider_as_source;
+use deltalake::datafusion::logical_expr::LogicalPlanBuilder;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::{DeltaCdfTableProvider, DeltaDataChecker};
 use deltalake::errors::DeltaTableError;
@@ -50,6 +52,7 @@ use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType};
 use deltalake::operations::restore::RestoreBuilder;
 use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
 use deltalake::operations::update::UpdateBuilder;
+use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::operations::write::WriteBuilder;
 use deltalake::operations::CustomExecuteHandler;
@@ -59,45 +62,32 @@ use deltalake::parquet::file::properties::{EnabledStatistics, WriterProperties};
 use deltalake::partitions::PartitionFilter;
 use deltalake::protocol::{DeltaOperation, SaveMode};
 use deltalake::table::state::DeltaTableState;
-use deltalake::{init_client_version, DeltaTableBuilder};
-use deltalake::{DeltaOps, DeltaResult};
-use error::DeltaError;
-use reader::convert_stream_to_reader;
+use deltalake::{init_client_version, DeltaOps, DeltaResult, DeltaTableBuilder};
+use pyo3::exceptions::{PyRuntimeError, PyValueError};
+use pyo3::pybacked::PyBackedStr;
+use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
+use pyo3::{prelude::*, IntoPyObjectExt};
+use serde_json::{Map, Value};
 use tracing::log::*;
+use uuid::Uuid;
 
-use crate::writer::to_lazy_table;
-use deltalake::datafusion::datasource::provider_as_source;
-use deltalake::datafusion::logical_expr::LogicalPlanBuilder;
-
-use crate::error::DeltaProtocolError;
-use crate::error::PythonError;
+use crate::error::{DeltaError, DeltaProtocolError, PythonError};
 use crate::features::TableFeatures;
 use crate::filesystem::FsConfig;
 use crate::merge::PyMergeBuilder;
 use crate::query::PyQueryBuilder;
+use crate::reader::convert_stream_to_reader;
 use crate::schema::{schema_to_pyobject, Field};
 use crate::utils::rt;
-use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
-use pyo3::exceptions::{PyRuntimeError, PyValueError};
-use pyo3::pybacked::PyBackedStr;
-use pyo3::types::{PyCapsule, PyDict, PyFrozenSet};
-use pyo3::{prelude::*, IntoPyObjectExt};
-use serde_json::{Map, Value};
-use uuid::Uuid;
-
-#[cfg(all(target_family = "unix", not(target_os = "emscripten")))]
-use jemallocator::Jemalloc;
-
-#[cfg(any(not(target_family = "unix"), target_os = "emscripten"))]
-use mimalloc::MiMalloc;
+use crate::writer::to_lazy_table;
 
 #[global_allocator]
 #[cfg(all(target_family = "unix", not(target_os = "emscripten")))]
-static ALLOC: Jemalloc = Jemalloc;
+static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;
 
 #[global_allocator]
 #[cfg(any(not(target_family = "unix"), target_os = "emscripten"))]
-static ALLOC: MiMalloc = MiMalloc;
+static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
 
 #[derive(FromPyObject)]
 enum PartitionFilterValue {
@@ -1334,14 +1324,6 @@ impl RawDeltaTable {
         })
     }
 
-    pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
-        Ok(filesystem::DeltaFileSystemHandler {
-            inner: self.object_store()?,
-            config: self._config.clone(),
-            known_sizes: None,
-        })
-    }
-
     pub fn create_checkpoint(&self, py: Python) -> PyResult<()> {
         py.allow_threads(|| {
             let operation_id = Uuid::new_v4();