Skip to content

Commit ba80e15

Browse files
roeaprtyler
authored andcommitted
fix: PR feedback
Signed-off-by: Robert Pack <[email protected]>
1 parent aa4f112 commit ba80e15

File tree

3 files changed

+57
-20
lines changed

3 files changed

+57
-20
lines changed

crates/core/src/logstore/config.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
//! Configuration for the Delta Log Store.
2+
//!
3+
//! This module manages the various pieces of configuration for the Delta Log Store.
4+
//! It provides methods for parsing and updating configuration settings. All configuration
5+
//! is parsed from String -> String mappings.
6+
//!
7+
//! Specific pieces of configuration must implement the `TryUpdateKey` trait which
8+
//! defines how to update internal fields based on key-value pairs.
19
use std::collections::HashMap;
210

311
use ::object_store::RetryConfig;
@@ -42,10 +50,15 @@ where
4250
}
4351
}
4452

53+
/// Generic container for parsing configuration
4554
pub(super) struct ParseResult<T> {
55+
/// Parsed configuration
4656
pub config: T,
57+
/// Unrecognized key value pairs.
4758
pub unparsed: HashMap<String, String>,
59+
/// Errors encountered during parsing
4860
pub errors: Vec<(String, String)>,
61+
/// Whether the configuration is defaults only - i.e. no custom values were provided
4962
pub is_default: bool,
5063
}
5164

@@ -112,8 +125,15 @@ pub struct StorageConfig {
112125
/// Configuration to limit the number of concurrent requests to the object store.
113126
pub limit: Option<LimitConfig>,
114127

128+
/// Properties that are not recognized by the storage configuration.
129+
///
130+
/// These properties are ignored by the storage configuration and can be used for custom purposes.
115131
pub unknown_properties: HashMap<String, String>,
116132

133+
/// Original unprocessed properties.
134+
///
135+
/// Since we remove properties during processing, but downstream integrations may
136+
/// use them for their own purposes, we keep a copy of the original properties.
117137
pub raw: HashMap<String, String>,
118138
}
119139

@@ -202,6 +222,15 @@ impl StorageConfig {
202222
self.raw.iter()
203223
}
204224

225+
/// Parse options into a StorageConfig.
226+
///
227+
/// This method will raise if it cannot parse a value. StorageConfig can also
228+
/// be constructed from an iterator of key-value pairs which will ignore any
229+
/// parsing errors.
230+
///
231+
/// # Raises
232+
///
233+
/// Raises a `DeltaError` if any of the options are invalid - i.e. cannot be parsed into target type.
205234
pub fn parse_options<K, V, I>(options: I) -> DeltaResult<Self>
206235
where
207236
I: IntoIterator<Item = (K, V)>,

crates/core/src/logstore/storage/cache.rs

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -99,23 +99,19 @@ impl config::TryUpdateKey for LogCacheConfig {
9999

100100
async fn get_default_cache(
101101
config: LogCacheConfig,
102-
) -> DeltaResult<(DeltaLogCache, Option<TempDir>), CachedStoreError> {
103-
let dir = tempfile::tempdir()?;
104-
105-
Ok((
106-
Arc::new(
107-
HybridCacheBuilder::new()
108-
.memory(config.max_size_memory_mb * 1024 * 1024)
109-
.storage(Engine::Large)
110-
.with_device_options(
111-
DirectFsDeviceOptions::new(dir.path())
112-
.with_capacity(config.max_size_disk_mb * 1024 * 1024),
113-
)
114-
.build()
115-
.await
116-
.map_err(|_| CachedStoreError::CacheInitialization)?,
117-
),
118-
Some(dir),
102+
cache_dir: std::path::PathBuf,
103+
) -> DeltaResult<DeltaLogCache, CachedStoreError> {
104+
Ok(Arc::new(
105+
HybridCacheBuilder::new()
106+
.memory(config.max_size_memory_mb * 1024 * 1024)
107+
.storage(Engine::Large)
108+
.with_device_options(
109+
DirectFsDeviceOptions::new(cache_dir.as_path())
110+
.with_capacity(config.max_size_disk_mb * 1024 * 1024),
111+
)
112+
.build()
113+
.await
114+
.map_err(|_| CachedStoreError::CacheInitialization)?,
119115
))
120116
}
121117

@@ -181,7 +177,7 @@ impl std::fmt::Display for CommitCacheObjectStore {
181177
}
182178
}
183179

184-
fn cache_delta(path: &Url) -> bool {
180+
fn is_delta_commit(path: &Url) -> bool {
185181
let Ok(Some(log_path)) = ParsedLogPath::try_from(path.clone()) else {
186182
return false;
187183
};
@@ -212,7 +208,14 @@ impl CommitCacheObjectStore {
212208
root_url: url::Url,
213209
config: LogCacheConfig,
214210
) -> DeltaResult<Self> {
215-
let (cache, cache_dir) = get_default_cache(config).await?;
211+
let (path, cache_dir) = if let Some(dir) = &config.cache_directory {
212+
let path = std::fs::canonicalize(dir)?;
213+
(path, None)
214+
} else {
215+
let tmp_dir = tempfile::tempdir()?;
216+
(tmp_dir.path().to_path_buf(), Some(tmp_dir))
217+
};
218+
let cache = get_default_cache(config, path).await?;
216219
Ok(Self::new(inner, root_url, cache, cache_dir))
217220
}
218221

@@ -231,7 +234,7 @@ impl CommitCacheObjectStore {
231234
) -> ObjectStoreResult<GetResult> {
232235
let cache_key = self.cache_key(location)?;
233236

234-
if options.range.is_some() || !cache_delta(&cache_key) || options.head {
237+
if options.range.is_some() || !is_delta_commit(&cache_key) || options.head {
235238
return self.inner.get_opts(location, options).await;
236239
}
237240

crates/core/src/logstore/storage/runtime.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,15 @@ fn io_rt(config: Option<&RuntimeConfig>) -> &Runtime {
6868
/// Configuration for Tokio runtime
6969
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
7070
pub struct RuntimeConfig {
71+
/// Whether to use a multi-threaded runtime
7172
pub(crate) multi_threaded: Option<bool>,
73+
/// Number of worker threads to use
7274
pub(crate) worker_threads: Option<usize>,
75+
/// Name of the thread
7376
pub(crate) thread_name: Option<String>,
77+
/// Whether to enable IO
7478
pub(crate) enable_io: Option<bool>,
79+
/// Whether to enable time
7580
pub(crate) enable_time: Option<bool>,
7681
}
7782

0 commit comments

Comments
 (0)