Skip to content

Commit a878029

Browse files
committed
refactor: isolate factories for storage / log store integrations
Signed-off-by: Robert Pack <[email protected]>
1 parent 5566b67 commit a878029

File tree

24 files changed: 352 additions and 268 deletions

crates/aws/Cargo.toml

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,43 @@ repository.workspace = true
1212
rust-version.workspace = true
1313

1414
[dependencies]
15+
# path dependencies
1516
deltalake-core = { version = "0.25.0", path = "../core", features = ["cloud"] }
16-
aws-smithy-runtime-api = { version="1.7" }
17-
aws-smithy-runtime = { version="1.7", optional = true}
18-
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
19-
aws-config = { version = "1.5", default-features = false, features = ["behavior-version-latest","rt-tokio", "credentials-process", "sso"] }
20-
aws-sdk-dynamodb = {version = "1.45", default-features = false, features = ["behavior-version-latest", "rt-tokio"] }
21-
aws-sdk-sts = {version = "1.42", default-features = false, features = ["behavior-version-latest", "rt-tokio"] }
22-
maplit = "1"
2317

2418
# workspace dependencies
2519
async-trait = { workspace = true }
2620
bytes = { workspace = true }
2721
chrono = { workspace = true }
2822
futures = { workspace = true }
2923
tracing = { workspace = true }
30-
object_store = { workspace = true, features = ["aws"]}
24+
object_store = { workspace = true, features = ["aws"] }
25+
regex = { workspace = true }
3126
thiserror = { workspace = true }
3227
tokio = { workspace = true }
33-
regex = { workspace = true }
3428
uuid = { workspace = true, features = ["serde", "v4"] }
3529
url = { workspace = true }
36-
backon = { version = "1",default-features = false, features = [ "tokio-sleep" ] }
30+
31+
# crates.io dependencies
32+
aws-smithy-runtime-api = { version = "1.7" }
33+
aws-smithy-runtime = { version = "1.7", optional = true }
34+
aws-credential-types = { version = "1.2", features = ["hardcoded-credentials"] }
35+
aws-config = { version = "1.5", default-features = false, features = [
36+
"behavior-version-latest",
37+
"rt-tokio",
38+
"credentials-process",
39+
"sso",
40+
] }
41+
aws-sdk-dynamodb = { version = "1.45", default-features = false, features = [
42+
"behavior-version-latest",
43+
"rt-tokio",
44+
] }
45+
aws-sdk-sts = { version = "1.42", default-features = false, features = [
46+
"behavior-version-latest",
47+
"rt-tokio",
48+
] }
49+
backon = { version = "1", default-features = false, features = ["tokio-sleep"] }
3750
hyper-tls = { version = "0.5", optional = true }
51+
maplit = "1"
3852

3953
[dev-dependencies]
4054
deltalake-core = { path = "../core", features = ["datafusion"] }
@@ -51,7 +65,7 @@ integration_test = []
5165
native-tls = [
5266
"aws-config/client-hyper",
5367
"aws-smithy-runtime/connector-hyper-0-14-x",
54-
"hyper-tls"
68+
"hyper-tls",
5569
]
5670
rustls = [
5771
"aws-config/client-hyper",

crates/aws/src/constants.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,19 @@ pub const AWS_S3_GET_INTERNAL_SERVER_ERROR_RETRIES: &str =
7474
/// The web identity token file to use when using a web identity provider.
7575
///
7676
/// NOTE: web identity related options are set in the environment when
77-
/// creating an instance of [crate::storage::s3::S3StorageOptions].
77+
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
7878
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
7979
pub const AWS_WEB_IDENTITY_TOKEN_FILE: &str = "AWS_WEB_IDENTITY_TOKEN_FILE";
8080
/// The role name to use for web identity.
8181
///
8282
/// NOTE: web identity related options are set in the environment when
83-
/// creating an instance of [crate::storage::s3::S3StorageOptions].
83+
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
8484
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
8585
pub const AWS_ROLE_ARN: &str = "AWS_ROLE_ARN";
8686
/// The role session name to use for web identity.
8787
///
8888
/// NOTE: web identity related options are set in the environment when
89-
/// creating an instance of [crate::storage::s3::S3StorageOptions].
89+
/// creating an instance of [S3StorageOptions](crate::storage::S3StorageOptions).
9090
/// See also <https://docs.rs/rusoto_sts/0.47.0/rusoto_sts/struct.WebIdentityProvider.html#method.from_k8s_env>.
9191
pub const AWS_ROLE_SESSION_NAME: &str = "AWS_ROLE_SESSION_NAME";
9292
/// Allow http connections - mainly useful for integration tests
@@ -109,7 +109,7 @@ pub const AWS_FORCE_CREDENTIAL_LOAD: &str = "AWS_FORCE_CREDENTIAL_LOAD";
109109

110110
/// The list of option keys owned by the S3 module.
111111
/// Option keys not contained in this list will be added to the `extra_opts`
112-
/// field of [crate::storage::s3::S3StorageOptions].
112+
/// field of [S3StorageOptions](crate::storage::S3StorageOptions).
113113
#[allow(deprecated)]
114114
pub const S3_OPTS: &[&str] = &[
115115
AWS_ENDPOINT_URL,

crates/aws/src/credentials.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ fn assume_session_name(options: &HashMap<String, String>) -> String {
255255
}
256256

257257
/// Take a set of [StorageOptions] and produce an appropriate AWS SDK [SdkConfig]
258-
/// for use with various AWS SDK APIs, such as in our [crate::logstore::S3DynamoDbLogStore]
258+
/// for use with various AWS SDK APIs, such as in our [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore)
259259
pub async fn resolve_credentials(options: &HashMap<String, String>) -> DeltaResult<SdkConfig> {
260260
let default_provider = DefaultCredentialsChain::builder().build().await;
261261

crates/aws/src/lib.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! AWS S3 and similar tooling for delta-rs
22
//!
3-
//! This module also contains the [S3DynamoDbLogStore] implementation for concurrent writer support
4-
//! with AWS S3 specifically.
3+
//! This module also contains the [S3DynamoDbLogStore](crate::logstore::S3DynamoDbLogStore)
4+
//! implementation for concurrent writer support with AWS S3 specifically.
55
66
pub mod constants;
77
mod credentials;
@@ -10,6 +10,7 @@ pub mod logstore;
1010
#[cfg(feature = "native-tls")]
1111
mod native;
1212
pub mod storage;
13+
1314
use aws_config::Region;
1415
use aws_config::SdkConfig;
1516
pub use aws_credential_types::provider::SharedCredentialsProvider;
@@ -26,8 +27,10 @@ use aws_sdk_dynamodb::{
2627
Client,
2728
};
2829
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey;
29-
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
30-
use deltalake_core::logstore::{factories, ObjectStoreRef, StorageConfig};
30+
use deltalake_core::logstore::{
31+
default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory,
32+
ObjectStoreRef, StorageConfig,
33+
};
3134
use deltalake_core::{DeltaResult, Path};
3235
use errors::{DynamoDbConfigError, LockClientError};
3336
use regex::Regex;
@@ -83,14 +86,16 @@ impl LogStoreFactory for S3LogStoreFactory {
8386
}
8487
}
8588

86-
/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
89+
/// Register an [ObjectStoreFactory] for common S3 url schemes.
90+
///
91+
/// [ObjectStoreFactory]: deltalake_core::logstore::ObjectStoreFactory
8792
pub fn register_handlers(_additional_prefixes: Option<Url>) {
8893
let object_stores = Arc::new(S3ObjectStoreFactory::default());
8994
let log_stores = Arc::new(S3LogStoreFactory::default());
9095
for scheme in ["s3", "s3a"].iter() {
9196
let url = Url::parse(&format!("{scheme}://")).unwrap();
92-
factories().insert(url.clone(), object_stores.clone());
93-
logstores().insert(url.clone(), log_stores.clone());
97+
object_store_factories().insert(url.clone(), object_stores.clone());
98+
logstore_factories().insert(url.clone(), log_stores.clone());
9499
}
95100
}
96101

crates/aws/src/storage.rs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use tracing::log::*;
2323
use url::Url;
2424

2525
use crate::constants;
26+
use crate::credentials::AWSForObjectStore;
2627
use crate::errors::DynamoDbConfigError;
2728

2829
const STORE_NAME: &str = "DeltaS3ObjectStore";
@@ -42,30 +43,28 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
4243
let options = self.with_env_s3(storage_options);
4344

4445
// All S3-likes should start their builder the same way
45-
let mut builder = AmazonS3Builder::new().with_url(url.to_string());
46-
46+
let mut builder = AmazonS3Builder::new()
47+
.with_url(url.to_string())
48+
.with_retry(retry.clone());
4749
for (key, value) in options.iter() {
4850
if let Ok(key) = AmazonS3ConfigKey::from_str(&key.to_ascii_lowercase()) {
4951
builder = builder.with_config(key, value.clone());
5052
}
5153
}
5254

55+
let s3_options = S3StorageOptions::from_map(&options)?;
56+
if let Some(ref sdk_config) = s3_options.sdk_config {
57+
builder =
58+
builder.with_credentials(Arc::new(AWSForObjectStore::new(sdk_config.clone())));
59+
}
60+
5361
let (_, path) =
5462
ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
5563
source: Box::new(e),
5664
})?;
5765
let prefix = Path::parse(path)?;
5866

59-
let s3_options: S3StorageOptions = S3StorageOptions::from_map(&options)?;
60-
61-
if let Some(ref sdk_config) = s3_options.sdk_config {
62-
builder = builder.with_credentials(Arc::new(
63-
crate::credentials::AWSForObjectStore::new(sdk_config.clone()),
64-
));
65-
}
66-
67-
let inner = builder.with_retry(retry.clone()).build()?;
68-
let store = aws_storage_handler(inner, &s3_options)?;
67+
let store = aws_storage_handler(builder.build()?, &s3_options)?;
6968
debug!("Initialized the object store: {store:?}");
7069

7170
Ok((store, prefix))
@@ -154,7 +153,7 @@ impl PartialEq for S3StorageOptions {
154153
}
155154

156155
impl S3StorageOptions {
157-
/// Creates an instance of S3StorageOptions from the given HashMap.
156+
/// Creates an instance of [`S3StorageOptions`] from the given HashMap.
158157
pub fn from_map(options: &HashMap<String, String>) -> DeltaResult<S3StorageOptions> {
159158
let extra_opts: HashMap<String, String> = options
160159
.iter()
@@ -407,7 +406,7 @@ impl ObjectStore for S3StorageBackend {
407406
}
408407
}
409408

410-
/// Storage option keys to use when creating [crate::storage::s3::S3StorageOptions].
409+
/// Storage option keys to use when creating [`S3StorageOptions`].
411410
///
412411
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
413412
/// Provided keys may include configuration for the S3 backend and also the optional DynamoDb lock used for atomic rename.

crates/aws/tests/repair_s3_rename_test.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
use bytes::Bytes;
44
use deltalake_aws::storage::S3StorageBackend;
55
use deltalake_core::logstore::object_store::{
6-
DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, MultipartId,
7-
ObjectMeta, PutOptions, PutResult, Result as ObjectStoreResult,
6+
DynObjectStore, Error as ObjectStoreError, GetOptions, GetResult, ListResult, ObjectMeta,
7+
PutOptions, PutResult, Result as ObjectStoreResult,
88
};
99
use deltalake_core::{DeltaTableBuilder, ObjectStore, Path};
1010
use deltalake_test::utils::IntegrationContext;
@@ -13,7 +13,6 @@ use object_store::{MultipartUpload, PutMultipartOpts, PutPayload};
1313
use serial_test::serial;
1414
use std::ops::Range;
1515
use std::sync::{Arc, Mutex};
16-
use tokio::io::AsyncWrite;
1716
use tokio::task::JoinHandle;
1817
use tokio::time::Duration;
1918

crates/azure/src/lib.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use std::collections::HashMap;
22
use std::str::FromStr;
33
use std::sync::Arc;
44

5-
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
6-
use deltalake_core::logstore::{factories, ObjectStoreFactory, ObjectStoreRef, StorageConfig};
5+
use deltalake_core::logstore::{
6+
default_logstore, logstore_factories, object_store_factories, LogStore, LogStoreFactory,
7+
ObjectStoreFactory, ObjectStoreRef, StorageConfig,
8+
};
79
use deltalake_core::{DeltaResult, DeltaTableError, Path};
810
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
911
use object_store::{ObjectStoreScheme, RetryConfig};
@@ -41,18 +43,20 @@ impl ObjectStoreFactory for AzureFactory {
4143
) -> DeltaResult<(ObjectStoreRef, Path)> {
4244
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
4345

46+
let mut builder = MicrosoftAzureBuilder::new()
47+
.with_url(url.to_string())
48+
.with_retry(retry.clone());
49+
for (key, value) in config.iter() {
50+
builder = builder.with_config(*key, value.clone());
51+
}
52+
let store = builder.build()?;
53+
4454
let (_, path) =
4555
ObjectStoreScheme::parse(url).map_err(|e| DeltaTableError::GenericError {
4656
source: Box::new(e),
4757
})?;
4858
let prefix = Path::parse(path)?;
4959

50-
let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string());
51-
for (key, value) in config.iter() {
52-
builder = builder.with_config(*key, value.clone());
53-
}
54-
let store = builder.with_retry(retry.clone()).build()?;
55-
5660
Ok((Arc::new(store), prefix))
5761
}
5862
}
@@ -73,7 +77,7 @@ pub fn register_handlers(_additional_prefixes: Option<Url>) {
7377
let factory = Arc::new(AzureFactory {});
7478
for scheme in ["az", "adl", "azure", "abfs", "abfss"].iter() {
7579
let url = Url::parse(&format!("{scheme}://")).unwrap();
76-
factories().insert(url.clone(), factory.clone());
77-
logstores().insert(url.clone(), factory.clone());
80+
object_store_factories().insert(url.clone(), factory.clone());
81+
logstore_factories().insert(url.clone(), factory.clone());
7882
}
7983
}

crates/azure/tests/integration.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ async fn test_concurrency_azure() -> TestResult {
4646
// comment](https://github.com/delta-io/delta-rs/pull/1564#issuecomment-1721048753) and we should
4747
// figure out a way to re-enable this test at least in the GitHub Actions CI environment
4848
#[ignore]
49-
#[cfg(feature = "azure")]
5049
#[tokio::test]
5150
#[serial]
5251
async fn test_object_store_onelake() -> TestResult {
@@ -60,7 +59,6 @@ async fn test_object_store_onelake() -> TestResult {
6059
// comment](https://github.com/delta-io/delta-rs/pull/1564#issuecomment-1721048753) and we should
6160
// figure out a way to re-enable this test at least in the GitHub Actions CI environment
6261
#[ignore]
63-
#[cfg(feature = "azure")]
6462
#[tokio::test]
6563
#[serial]
6664
async fn test_object_store_onelake_abfs() -> TestResult {

crates/catalog-unity/src/lib.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ compile_error!(
99

1010
use datafusion_common::DataFusionError;
1111
use deltalake_core::logstore::{
12-
default_logstore, logstores, object_store::RetryConfig, LogStore, LogStoreFactory,
12+
default_logstore, logstore_factories, object_store::RetryConfig, LogStore, LogStoreFactory,
1313
StorageConfig,
1414
};
1515
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
@@ -35,7 +35,7 @@ use deltalake_core::{
3535

3636
use crate::client::retry::*;
3737
use deltalake_core::logstore::{
38-
factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef,
38+
object_store_factories, str_is_truthy, IORuntime, ObjectStoreFactory, ObjectStoreRef,
3939
};
4040
pub mod client;
4141
pub mod credential;
@@ -879,10 +879,9 @@ impl LogStoreFactory for UnityCatalogFactory {
879879
/// Register an [ObjectStoreFactory] for common UnityCatalogFactory [Url] schemes
880880
pub fn register_handlers(_additional_prefixes: Option<Url>) {
881881
let factory = Arc::new(UnityCatalogFactory::default());
882-
let scheme = "uc";
883-
let url = Url::parse(&format!("{scheme}://")).unwrap();
884-
factories().insert(url.clone(), factory.clone());
885-
logstores().insert(url.clone(), factory.clone());
882+
let url = Url::parse(&format!("uc://")).unwrap();
883+
object_store_factories().insert(url.clone(), factory.clone());
884+
logstore_factories().insert(url.clone(), factory.clone());
886885
}
887886

888887
#[async_trait::async_trait]

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ repository.workspace = true
1212
rust-version.workspace = true
1313

1414
[package.metadata.docs.rs]
15-
features = ["datafusion", "json", "unity-experimental"]
15+
features = ["datafusion", "json"]
1616

1717
[dependencies]
1818
delta_kernel.workspace = true

crates/core/src/data_catalog/storage/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use futures::TryStreamExt;
1313
use object_store::ObjectStore;
1414

1515
use crate::errors::DeltaResult;
16-
use crate::logstore::{storage::*, StorageConfig};
16+
use crate::logstore::{store_for, StorageConfig};
1717
use crate::open_table_with_storage_options;
1818
use crate::table::builder::ensure_table_uri;
1919

crates/core/src/delta_datafusion/planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! #[derive(Clone)]
77
//! struct MergeMetricExtensionPlanner {}
88
//!
9-
//! #[async_trait]
9+
//! #[macro@async_trait]
1010
//! impl ExtensionPlanner for MergeMetricExtensionPlanner {
1111
//! async fn plan_extension(
1212
//! &self,

crates/core/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
// #![deny(missing_docs)]
6969
#![allow(rustdoc::invalid_html_tags)]
7070
#![allow(clippy::nonminimal_bool)]
71-
7271
pub mod data_catalog;
7372
pub mod errors;
7473
pub mod kernel;
@@ -86,6 +85,7 @@ pub mod delta_datafusion;
8685
pub mod writer;
8786

8887
use std::collections::HashMap;
88+
use std::sync::OnceLock;
8989

9090
pub use self::data_catalog::{DataCatalog, DataCatalogError};
9191
pub use self::errors::*;
@@ -96,14 +96,14 @@ pub use self::table::config::TableProperty;
9696
pub use self::table::DeltaTable;
9797
pub use object_store::{path::Path, Error as ObjectStoreError, ObjectMeta, ObjectStore};
9898
pub use operations::DeltaOps;
99-
use std::sync::OnceLock;
99+
100+
pub use protocol::checkpoints;
100101

101102
// convenience exports for consumers to avoid aligning crate versions
102103
pub use arrow;
103104
#[cfg(feature = "datafusion")]
104105
pub use datafusion;
105106
pub use parquet;
106-
pub use protocol::checkpoints;
107107

108108
/// Creates and loads a DeltaTable from the given path with current metadata.
109109
/// Infers the storage backend to use from the scheme in the given table path.

0 commit comments

Comments
 (0)