feat!: update storage configuration system #3383
Conversation
Signed-off-by: Robert Pack <[email protected]>
Codecov Report
Attention: Patch coverage is

@@ Coverage Diff @@
##             main    #3383      +/-   ##
==========================================
- Coverage   71.97%   71.90%   -0.07%
==========================================
  Files         145      147       +2
  Lines       45774    45848      +74
  Branches    45774    45848      +74
==========================================
+ Hits        32944    32966      +22
- Misses      10746    10786      +40
- Partials     2084     2096      +12

View full report in Codecov by Sentry.
Signed-off-by: Robert Pack <[email protected]>
Looks very good, happy to see the simplification of StorageConfig; it also makes it easier to extend in the future!
Just left some minor comments
Force-pushed from 2e8c4fe to dba8d52
Signed-off-by: Robert Pack <[email protected]>
Force-pushed from 85e9e93 to f70d4d8
Signed-off-by: Robert Pack <[email protected]>
```rust
let prefix = match ObjectStoreScheme::parse(table_root) {
    Ok((ObjectStoreScheme::AmazonS3, _)) => Path::parse(table_root.path())?,
    Ok((_, path)) => path,
    _ => Path::parse(table_root.path())?,
};
```
This part concerns me a bit: why does AWS need special treatment? However, as we move to working with URLs, I hope we can create a very narrow code path and convert URLs to paths in exactly one way ...
Seems unnecessary; both arms return the same thing.
Agreed! But if we don't do this, tests start to fail ... I was also wondering if maybe the AWS CLI encodes paths differently when uploading files?
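For illustration, a minimal sketch of the encoding difference that may be at play here (the URL is made up; `Path::parse` vs. `Path::from_url_path` from the `object_store` crate):

```rust
use object_store::path::Path;
use url::Url;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A table key containing a percent-encoded space, as an upload tool
    // might have produced it.
    let url = Url::parse("s3://bucket/table%20name/_delta_log")?;

    // Path::parse keeps the URL path verbatim, percent-encoding included...
    let verbatim = Path::parse(url.path())?;
    // ...while Path::from_url_path percent-decodes it, which is what
    // ObjectStoreScheme::parse does for the path it returns.
    let decoded = Path::from_url_path(url.path())?;

    println!("verbatim: {verbatim}"); // table%20name/_delta_log
    println!("decoded:  {decoded}");  // table name/_delta_log
    Ok(())
}
```

If objects were uploaded with the literal encoded key, only the verbatim form resolves them, which would explain the S3-specific branch above.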
@roeap @ion-elgreco Why do we always wrap with a PrefixStore? This totally breaks the DeltaTableBuilder:

```rust
/// Set the storage backend.
///
/// If a backend is not provided then it is derived from `table_uri`.
///
/// # Arguments
///
/// * `storage` - A shared reference to an `ObjectStore` with "/" pointing at
///   the delta table root (i.e. where `_delta_log` is located).
/// * `location` - A url corresponding to the storage location of `storage`.
pub fn with_storage_backend(mut self, storage: Arc<dyn ObjectStore>, location: Url) -> Self {
    self.storage_backend = Some((storage, location));
    self
}
```

This API is now broken: the URL paths will all be wrong, since the passed location is now prefixed and no longer relative to the assumed root of the storage backend.
This is to prepare for supporting absolute paths, but if you are seeing something wrong, please create a repro :)
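A hypothetical repro sketch of the reported concern (in-memory store and URL invented for illustration; builder API as documented in the comment above):

```rust
use std::sync::Arc;
use deltalake::{DeltaResult, DeltaTableBuilder};
use object_store::memory::InMemory;
use url::Url;

// The caller hands over a store whose root already *is* the table root.
// If the builder additionally wraps it in PrefixStore(location.path()),
// "_delta_log" resolves to "my/table/_delta_log" inside a store that is
// already rooted at "my/table", and every lookup misses.
async fn open_with_custom_backend() -> DeltaResult<()> {
    let store = Arc::new(InMemory::new());
    let location = Url::parse("memory:///my/table").unwrap();
    let _table = DeltaTableBuilder::from_uri("memory:///my/table")
        .with_storage_backend(store, location)
        .load()
        .await?;
    Ok(())
}
```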
```rust
pub trait ObjectStoreFactory: Send + Sync {
    /// Parse URL options and create an object store instance.
    ///
    /// The object store instance returned by this method must point at the root of the storage location.
    /// Root in this case means scheme, authority/host and maybe port.
    /// The path segment is returned as second element of the tuple. It must point at the path
    /// corresponding to the path segment of the URL.
    ///
    /// The store should __NOT__ apply the decorations via the passed `StorageConfig`
    fn parse_url_opts(
        &self,
        url: &Url,
        options: &HashMap<String, String>,
        retry: &RetryConfig,
    ) -> DeltaResult<(ObjectStoreRef, Path)>;
}
```
Here we document the new expected behavior of this method.
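For instance, a minimal factory honoring that contract might look like this sketch (the `MemoryFactory` type is hypothetical; `ObjectStoreRef` and `DeltaResult` are the aliases used in the trait above):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::RetryConfig;
use url::Url;

struct MemoryFactory;

impl ObjectStoreFactory for MemoryFactory {
    fn parse_url_opts(
        &self,
        url: &Url,
        _options: &HashMap<String, String>,
        _retry: &RetryConfig,
    ) -> DeltaResult<(ObjectStoreRef, Path)> {
        // The returned store points at the root of the storage location
        // (scheme + authority), not at the table prefix...
        let store: ObjectStoreRef = Arc::new(InMemory::new());
        // ...and the URL's path segment is handed back separately, leaving
        // prefixing and all other decoration to the caller.
        let prefix = Path::parse(url.path())?;
        Ok((store, prefix))
    }
}
```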
```rust
trait LogStoreFactoryExt {
    fn with_options_internal(
        &self,
        store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
        io_runtime: Option<IORuntime>,
    ) -> DeltaResult<Arc<dyn LogStore>>;
}

impl<T: LogStoreFactory + ?Sized> LogStoreFactoryExt for T {
    fn with_options_internal(
        &self,
        store: ObjectStoreRef,
        location: &Url,
        options: &StorageConfig,
        io_runtime: Option<IORuntime>,
    ) -> DeltaResult<Arc<dyn LogStore>> {
        let store = options.decorate_store(store, location, io_runtime.map(|r| r.get_handle()))?;
        self.with_options(Arc::new(store), location, options)
    }
}
```
Right now we use the internal trait to make sure we are injecting ourselves into the flow to wrap our layers around the object store. I feel this can be improved, but I'm hoping to do that in a follow-up where we can have a more targeted discussion around this.
```rust
impl LogStoreConfig {
    pub fn decorate_store<T: ObjectStore + Clone>(
        &self,
        store: T,
        table_root: Option<&url::Url>,
        handle: Option<Handle>,
    ) -> DeltaResult<Box<dyn ObjectStore>> {
        let table_url = table_root.unwrap_or(&self.location);
        self.options.decorate_store(store, table_url, handle)
    }

    pub fn object_store_factory(&self) -> ObjectStoreFactoryRegistry {
        self::factories::object_store_factories()
    }
}
```
the logstore config is a good way for us to inject functionality into the logstore, since the API dictates log stores also expose it again, but we control the implementation... this should also give us a way to just inject a registry taken from, say, a datafusion session, and wire it through.
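A sketch of what such injection could look like, assuming the registry behaves like a shared map keyed by scheme URL (the factory type is the hypothetical one from the sketch above):

```rust
use std::sync::Arc;
use url::Url;

// Hypothetical wiring: grab the registry exposed via LogStoreConfig and
// register a custom factory for a scheme, e.g. one supplied by a
// datafusion session.
fn register_custom_factory(config: &LogStoreConfig) -> Result<(), url::ParseError> {
    let registry = config.object_store_factory();
    registry.insert(Url::parse("memory://")?, Arc::new(MemoryFactory));
    Ok(())
}
```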
```rust
/// Storage option keys to use when creating [ObjectStore].
///
/// The same key should be used whether passing a key in the hashmap or setting it as an environment variable.
/// Must be implemented for a given storage provider
pub mod storage_constants {
```
I could not find any place where we actually inspect the environment to pick up this variable. That said, with the updated configuration parsing we should now have a clear way to enable much more config via the environment in a manageable way.
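A minimal sketch of what such env pickup could look like, assuming a convention where explicitly passed options always win over environment variables (helper name invented):

```rust
use std::collections::HashMap;

// Fill well-known provider keys from the process environment, without
// overriding anything the user passed explicitly.
fn with_env_defaults(mut options: HashMap<String, String>) -> HashMap<String, String> {
    for (key, value) in std::env::vars() {
        let normalized = key.to_ascii_lowercase();
        if ["aws_", "azure_", "google_"]
            .iter()
            .any(|prefix| normalized.starts_with(prefix))
        {
            // entry().or_insert() keeps explicit options intact.
            options.entry(normalized).or_insert(value);
        }
    }
    options
}
```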
```rust
use super::DeltaTable;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::logstore::storage::{factories, IORuntime, StorageOptions};
use crate::logstore::LogStoreRef;

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
enum BuilderError {
    #[error("Store {backend} requires host in storage url, got: {url}")]
    MissingHost { backend: String, url: String },
    #[error("Missing configuration {0}")]
    Required(String),
    #[error("Failed to find valid credential.")]
    MissingCredential,
    #[error("Failed to decode SAS key: {0}\nSAS keys must be percent-encoded. They come encoded in the Azure portal and Azure Storage Explorer.")]
    Decode(String),
    #[error("Delta-rs must be build with feature '{feature}' to support url: {url}.")]
    MissingFeature { feature: &'static str, url: String },
    #[error("Failed to parse table uri")]
    TableUri(#[from] url::ParseError),
}

impl From<BuilderError> for DeltaTableError {
    fn from(err: BuilderError) -> Self {
        DeltaTableError::Generic(err.to_string())
    }
}
```
we never used any of these 😄
```rust
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true`
///
/// Read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
pub require_tombstones: bool,
```
We never reference this config ... reading tombstones has been lazy for a while now.
```rust
pub fn get_py_storage_backend(&self) -> PyResult<filesystem::DeltaFileSystemHandler> {
    Ok(filesystem::DeltaFileSystemHandler {
        inner: self.object_store()?,
        config: self._config.clone(),
        known_sizes: None,
    })
}
```
also a relic ...
Can you resolve these open comments? Then we are good to go
Signed-off-by: Robert Pack <[email protected]>
@ion-elgreco - your comments should be addressed now.
```rust
    let path = std::fs::canonicalize(dir)?;
    (path, None)
} else {
    let tmp_dir = tempfile::tempdir()?;
```
Have you tested this? I feel like this will still get destroyed before you actually access those tmp files.
this is not hooked up yet, the plan is to actually wire this up in the next PR, then I'll also add tests.
Happy to also just remove the cache impl from this PR and put it all in the next one, as a follow-up.
Went with just removing it ... better than introducing dead code. Should hopefully make for a cleaner review in the next one 😄.
Yeah, because the docs say once `TempDir` goes out of scope, it is destroyed and the directory gets deleted.
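For reference, a minimal sketch of how the guard would need to be kept alive (helper invented; mirrors the snippet above):

```rust
use std::path::PathBuf;
use tempfile::TempDir;

// Returning the TempDir guard alongside the path keeps the directory
// alive for as long as the caller holds on to it; dropping the guard
// deletes the directory and everything in it.
fn cache_dir(explicit: Option<PathBuf>) -> std::io::Result<(PathBuf, Option<TempDir>)> {
    match explicit {
        Some(dir) => Ok((std::fs::canonicalize(dir)?, None)),
        None => {
            let tmp_dir = tempfile::tempdir()?;
            let path = tmp_dir.path().to_path_buf();
            Ok((path, Some(tmp_dir)))
        }
    }
}
```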
Signed-off-by: Robert Pack <[email protected]>
Description
Along with the addition of a caching layer, this PR also attempts to formalise how we handle configuration a bit more. With the caching, we can now configure four layers that can be applied to object stores. Additionally, we currently need to wrap stores in a `PrefixStore` pointing at the table root.

Each of these layers has its own configuration that is parsed from the same global map of configuration keys. The configuration for all of these is now aggregated in a new struct `StorageConfig`. Our previous abstraction `StorageOptions` had over time lost its utility, since no implementations lived in the core crate anymore and we were just wrapping a `HashMap`.

The same configuration is also passed again to `LogStore` and `ObjectStore` factories, where again some parsing happens. With this PR we now present the already parsed configuration (`StorageConfig`), which exposes utility methods for integrations to decorate / augment the stores they produce.

The new caching store is fairly contained, and I'd be happy to carve that out if it helps reviews.
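As a usage sketch of the flow described above (the `parse_options` constructor name is an assumption based on this description, not confirmed API; `decorate_store` as shown in the diff):

```rust
use std::collections::HashMap;
use std::sync::Arc;
use object_store::memory::InMemory;
use object_store::ObjectStore;
use url::Url;

// Hypothetical end-to-end flow: parse the global option map once into a
// StorageConfig, then let it apply all configured layers (prefix, retry,
// runtime, caching) to a raw root store.
fn build_store(raw_options: HashMap<String, String>) -> DeltaResult<()> {
    let config = StorageConfig::parse_options(raw_options)?; // assumed name
    let table_url = Url::parse("memory:///my/table").unwrap();
    let raw_store: Arc<dyn ObjectStore> = Arc::new(InMemory::new());
    // (store, table_root, handle) -> decorated Box<dyn ObjectStore>
    let _decorated = config.decorate_store(raw_store, &table_url, None)?;
    Ok(())
}
```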
) which exposes utility methods for integrations to decorate / augment the storages the produce.The new caching store is fairly contained, and I'd be happy to carve that out if it helps reviews.