Skip to content

Commit 5566b67

Browse files
committed
feat: centrally apply object store layers
Signed-off-by: Robert Pack <[email protected]>
1 parent 5a04abe commit 5566b67

File tree

26 files changed

+307
-320
lines changed

26 files changed

+307
-320
lines changed

crates/aws/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use aws_sdk_dynamodb::{
2727
};
2828
use deltalake_core::logstore::object_store::aws::AmazonS3ConfigKey;
2929
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
30-
use deltalake_core::logstore::{factories, url_prefix_handler, ObjectStoreRef, StorageConfig};
30+
use deltalake_core::logstore::{factories, ObjectStoreRef, StorageConfig};
3131
use deltalake_core::{DeltaResult, Path};
3232
use errors::{DynamoDbConfigError, LockClientError};
3333
use regex::Regex;
@@ -55,7 +55,6 @@ impl LogStoreFactory for S3LogStoreFactory {
5555
location: &Url,
5656
options: &StorageConfig,
5757
) -> DeltaResult<Arc<dyn LogStore>> {
58-
let store = url_prefix_handler(store, Path::parse(location.path())?);
5958
let s3_options = self.with_env_s3(&options.raw.clone().into());
6059
if s3_options.keys().any(|key| {
6160
let key = key.to_ascii_lowercase();

crates/aws/src/storage.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
//! AWS S3 storage backend.
2+
use std::collections::HashMap;
3+
use std::fmt::Debug;
4+
use std::ops::Range;
5+
use std::str::FromStr;
6+
use std::sync::Arc;
7+
use std::time::Duration;
28

39
use aws_config::{Region, SdkConfig};
410
use bytes::Bytes;
@@ -7,18 +13,12 @@ use deltalake_core::logstore::object_store::{
713
GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, ObjectStoreScheme,
814
PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
915
};
10-
use deltalake_core::logstore::{
11-
limit_store_handler, str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageConfig,
12-
};
16+
use deltalake_core::logstore::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef};
1317
use deltalake_core::{DeltaResult, DeltaTableError, ObjectStoreError, Path};
1418
use futures::stream::BoxStream;
1519
use futures::Future;
16-
use std::collections::HashMap;
17-
use std::fmt::Debug;
18-
use std::ops::Range;
19-
use std::str::FromStr;
20-
use std::sync::Arc;
21-
use std::time::Duration;
20+
use object_store::aws::AmazonS3;
21+
use object_store::RetryConfig;
2222
use tracing::log::*;
2323
use url::Url;
2424

@@ -36,9 +36,10 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
3636
fn parse_url_opts(
3737
&self,
3838
url: &Url,
39-
storage_options: &StorageConfig,
39+
storage_options: &HashMap<String, String>,
40+
retry: &RetryConfig,
4041
) -> DeltaResult<(ObjectStoreRef, Path)> {
41-
let options = self.with_env_s3(&storage_options.raw);
42+
let options = self.with_env_s3(storage_options);
4243

4344
// All S3-likes should start their builder the same way
4445
let mut builder = AmazonS3Builder::new().with_url(url.to_string());
@@ -63,31 +64,30 @@ impl ObjectStoreFactory for S3ObjectStoreFactory {
6364
));
6465
}
6566

66-
let inner = builder.with_retry(storage_options.retry.clone()).build()?;
67-
let limit = storage_options.limit.clone().unwrap_or_default();
68-
let store = aws_storage_handler(limit_store_handler(inner, &limit), &s3_options)?;
67+
let inner = builder.with_retry(retry.clone()).build()?;
68+
let store = aws_storage_handler(inner, &s3_options)?;
6969
debug!("Initialized the object store: {store:?}");
7070

7171
Ok((store, prefix))
7272
}
7373
}
7474

7575
fn aws_storage_handler(
76-
store: ObjectStoreRef,
76+
store: AmazonS3,
7777
s3_options: &S3StorageOptions,
7878
) -> DeltaResult<ObjectStoreRef> {
7979
// Nearly all S3-compatible object stores support conditional put, so by default we return the plain S3 object store.
8080
// Only when the "dynamodb" locking provider or allow_unsafe_rename is configured do we wrap it in the older S3StorageBackend.
8181
if s3_options.locking_provider.as_deref() == Some("dynamodb") || s3_options.allow_unsafe_rename
8282
{
8383
let store = S3StorageBackend::try_new(
84-
store,
84+
Arc::new(store),
8585
Some("dynamodb") == s3_options.locking_provider.as_deref()
8686
|| s3_options.allow_unsafe_rename,
8787
)?;
8888
Ok(Arc::new(store))
8989
} else {
90-
Ok(store)
90+
Ok(Arc::new(store))
9191
}
9292
}
9393

crates/aws/tests/integration_s3_dynamodb.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ async fn test_repair_commit_entry() -> TestResult<()> {
165165
let options: StorageConfig = OPTIONS.clone().into_iter().collect();
166166
let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
167167
ensure_table_uri(table.table_uri())?,
168-
options.clone(),
168+
&options,
169169
&S3_OPTIONS,
170170
std::sync::Arc::new(table.object_store()),
171171
)?;
@@ -240,7 +240,7 @@ async fn test_abort_commit_entry() -> TestResult<()> {
240240
let options: StorageConfig = OPTIONS.clone().into_iter().collect();
241241
let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
242242
ensure_table_uri(table.table_uri())?,
243-
options.clone(),
243+
&options,
244244
&S3_OPTIONS,
245245
std::sync::Arc::new(table.object_store()),
246246
)?;
@@ -287,7 +287,7 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
287287
let options: StorageConfig = OPTIONS.clone().into_iter().collect();
288288
let log_store: S3DynamoDbLogStore = S3DynamoDbLogStore::try_new(
289289
ensure_table_uri(table.table_uri())?,
290-
options.clone(),
290+
&options,
291291
&S3_OPTIONS,
292292
std::sync::Arc::new(table.object_store()),
293293
)?;

crates/azure/src/lib.rs

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@ use std::str::FromStr;
33
use std::sync::Arc;
44

55
use deltalake_core::logstore::{default_logstore, logstores, LogStore, LogStoreFactory};
6-
use deltalake_core::logstore::{
7-
factories, limit_store_handler, url_prefix_handler, ObjectStoreFactory, ObjectStoreRef,
8-
StorageConfig,
9-
};
6+
use deltalake_core::logstore::{factories, ObjectStoreFactory, ObjectStoreRef, StorageConfig};
107
use deltalake_core::{DeltaResult, DeltaTableError, Path};
118
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
12-
use object_store::ObjectStoreScheme;
9+
use object_store::{ObjectStoreScheme, RetryConfig};
1310
use url::Url;
1411

1512
mod config;
@@ -19,10 +16,9 @@ trait AzureOptions {
1916
fn as_azure_options(&self) -> HashMap<AzureConfigKey, String>;
2017
}
2118

22-
impl AzureOptions for StorageConfig {
19+
impl AzureOptions for HashMap<String, String> {
2320
fn as_azure_options(&self) -> HashMap<AzureConfigKey, String> {
24-
self.raw
25-
.iter()
21+
self.iter()
2622
.filter_map(|(key, value)| {
2723
Some((
2824
AzureConfigKey::from_str(&key.to_ascii_lowercase()).ok()?,
@@ -40,7 +36,8 @@ impl ObjectStoreFactory for AzureFactory {
4036
fn parse_url_opts(
4137
&self,
4238
url: &Url,
43-
options: &StorageConfig,
39+
options: &HashMap<String, String>,
40+
retry: &RetryConfig,
4441
) -> DeltaResult<(ObjectStoreRef, Path)> {
4542
let config = config::AzureConfigHelper::try_new(options.as_azure_options())?.build()?;
4643

@@ -51,15 +48,12 @@ impl ObjectStoreFactory for AzureFactory {
5148
let prefix = Path::parse(path)?;
5249

5350
let mut builder = MicrosoftAzureBuilder::new().with_url(url.to_string());
54-
5551
for (key, value) in config.iter() {
5652
builder = builder.with_config(*key, value.clone());
5753
}
54+
let store = builder.with_retry(retry.clone()).build()?;
5855

59-
let inner = builder.with_retry(options.retry.clone()).build()?;
60-
let limit = options.limit.clone().unwrap_or_default();
61-
let store = limit_store_handler(url_prefix_handler(inner, prefix.clone()), &limit);
62-
Ok((store, prefix))
56+
Ok((Arc::new(store), prefix))
6357
}
6458
}
6559

crates/catalog-unity/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ compile_error!(
99

1010
use datafusion_common::DataFusionError;
1111
use deltalake_core::logstore::{
12-
default_logstore, logstores, LogStore, LogStoreFactory, StorageConfig,
12+
default_logstore, logstores, object_store::RetryConfig, LogStore, LogStoreFactory,
13+
StorageConfig,
1314
};
1415
use reqwest::header::{HeaderValue, InvalidHeaderValue, AUTHORIZATION};
1516
use reqwest::Url;
@@ -840,23 +841,25 @@ impl ObjectStoreFactory for UnityCatalogFactory {
840841
fn parse_url_opts(
841842
&self,
842843
table_uri: &Url,
843-
options: &StorageConfig,
844+
options: &HashMap<String, String>,
845+
_retry: &RetryConfig,
844846
) -> DeltaResult<(ObjectStoreRef, Path)> {
845847
let (table_path, temp_creds) = UnityCatalogBuilder::execute_uc_future(
846848
UnityCatalogBuilder::get_uc_location_and_token(table_uri.as_str()),
847849
)??;
848850

849-
let mut storage_options = options.raw.clone();
851+
let mut storage_options = options.clone();
850852
storage_options.extend(temp_creds);
851853

854+
// TODO(roeap): we should not have to go through the table here.
855+
// ideally we just create the right storage ...
852856
let mut builder =
853857
DeltaTableBuilder::from_uri(&table_path).with_io_runtime(IORuntime::default());
854858
if !storage_options.is_empty() {
855859
builder = builder.with_storage_options(storage_options.clone());
856860
}
857-
858861
let prefix = Path::parse(table_uri.path())?;
859-
let store = builder.build()?.object_store();
862+
let store = builder.build_storage()?.object_store(None);
860863

861864
Ok((store, prefix))
862865
}

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ arrow-row = { workspace = true }
3030
arrow-schema = { workspace = true, features = ["serde"] }
3131
arrow-select = { workspace = true }
3232
parquet = { workspace = true, features = ["async", "object_store"] }
33+
object_store = { workspace = true, features = ["cloud"] }
3334
pin-project-lite = "^0.2.7"
3435

3536
# datafusion
@@ -87,7 +88,6 @@ itertools = "0.14"
8788
libc = ">=0.2.90, <1"
8889
num-bigint = "0.4"
8990
num-traits = "0.2.15"
90-
object_store = { workspace = true }
9191
parking_lot = "0.12"
9292
percent-encoding = "2"
9393
roaring = "0.10.1"

crates/core/src/data_catalog/storage/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ impl ListingSchemaProvider {
4949
) -> DeltaResult<Self> {
5050
let uri = ensure_table_uri(root_uri)?;
5151
let options = options.unwrap_or_default();
52-
// We already parsed the url, so unwrapping is safe.
5352
let store = store_for(&uri, &options)?;
5453
Ok(Self {
5554
authority: uri.to_string(),

crates/core/src/logstore/config.rs

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::collections::HashMap;
22

3-
#[cfg(feature = "cloud")]
43
use ::object_store::RetryConfig;
4+
use object_store::{path::Path, prefix::PrefixStore, ObjectStore};
5+
use tokio::runtime::Handle;
56

67
#[cfg(feature = "delta-cache")]
78
use super::storage::cache::LogCacheConfig;
@@ -104,16 +105,56 @@ pub struct StorageConfig {
104105
/// Configuration to set up a dedicated IO runtime to execute IO related operations.
105106
pub runtime: Option<RuntimeConfig>,
106107

107-
#[cfg(feature = "cloud")]
108108
pub retry: ::object_store::RetryConfig,
109109

110+
/// Limit configuration.
111+
///
112+
/// Configuration to limit the number of concurrent requests to the object store.
110113
pub limit: Option<LimitConfig>,
111114

112115
pub unknown_properties: HashMap<String, String>,
113116

114117
pub raw: HashMap<String, String>,
115118
}
116119

120+
impl StorageConfig {
121+
/// Wrap an object store with additional layers of functionality.
122+
///
123+
/// Depending on the configuration, the following layers may be added:
124+
/// - Runtime layer: Executes IO related operations on a dedicated runtime.
125+
/// - Prefix layer: Resolves paths relative to the table root (skipped when the root is `/`).
126+
/// NOTE(review): retry is handed to the store builders by the factories and no limit layer is added here - confirm where `limit` is applied.
127+
pub fn decorate_store<T: ObjectStore + Clone>(
128+
&self,
129+
store: T,
130+
table_root: &url::Url,
131+
handle: Option<Handle>,
132+
) -> DeltaResult<Box<dyn ObjectStore>> {
133+
let inner = if let Some(runtime) = &self.runtime {
134+
Box::new(runtime.decorate(store, handle)) as Box<dyn ObjectStore>
135+
} else {
136+
Box::new(store) as Box<dyn ObjectStore>
137+
};
138+
139+
let prefix = if table_root.scheme() == "file" {
140+
Path::from_filesystem_path(
141+
table_root
142+
.to_file_path()
143+
.map_err(|_| DeltaTableError::generic("failed to convert fs"))?,
144+
)?
145+
} else {
146+
Path::from_url_path(table_root.path())?
147+
};
148+
let inner = if prefix != Path::from("/") {
149+
Box::new(PrefixStore::new(inner, prefix)) as Box<dyn ObjectStore>
150+
} else {
151+
Box::new(inner) as Box<dyn ObjectStore>
152+
};
153+
154+
Ok(inner)
155+
}
156+
}
157+
117158
impl<K, V> FromIterator<(K, V)> for StorageConfig
118159
where
119160
K: AsRef<str> + Into<String>,

0 commit comments

Comments
 (0)