Skip to content

Commit

Permalink
feat: improve performance and fix panic in async parquet reader (#11607)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Oct 9, 2023
1 parent a16ea64 commit 56a8f93
Show file tree
Hide file tree
Showing 7 changed files with 264 additions and 169 deletions.
74 changes: 4 additions & 70 deletions crates/polars-io/src/cloud/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,80 +18,14 @@ use polars_core::prelude::{polars_bail, PolarsError, PolarsResult};
mod adaptors;
#[cfg(feature = "cloud")]
mod glob;
#[cfg(feature = "cloud")]
mod object_store_setup;
pub mod options;

#[cfg(feature = "cloud")]
pub use adaptors::*;
#[cfg(feature = "cloud")]
pub use glob::*;
pub use options::*;

#[cfg(feature = "cloud")]
type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;

/// Bail out with a `ComputeError` telling the user that the cargo feature named
/// `feature` has to be enabled before URLs with the given `scheme` can be used.
///
/// Only called from the `#[cfg(not(feature = ...))]` branches of
/// `build_object_store`, so it is unused (hence `allow(dead_code)`) when every
/// backend feature is compiled in.
#[allow(dead_code)]
#[cfg(feature = "cloud")]
fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
polars_bail!(
ComputeError:
"feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
);
}
/// Bail out with a `ComputeError` telling the user that the configuration named
/// `feature` has to be supplied before URLs with the given `scheme` can be used.
///
/// `build_object_store` calls this for the GCP and Azure backends when no
/// `CloudOptions` were passed in.
#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
fn err_missing_configuration(feature: &str, scheme: &str) -> BuildResult {
polars_bail!(
ComputeError:
"configuration '{}' must be provided in order to use '{}' cloud urls", feature, scheme,
);
}

/// Build an [`ObjectStore`] for `url`, configured with the passed in options.
///
/// Returns the parsed [`CloudLocation`] together with the object store implementation
/// that serves it.
///
/// # Errors
///
/// Fails when `url` cannot be parsed into a location or cloud type, when the cargo
/// feature for the detected backend is not enabled, when GCP/Azure are requested
/// without options, or when the backend specific builder fails.
#[cfg(feature = "cloud")]
pub async fn build_object_store(url: &str, _options: Option<&CloudOptions>) -> BuildResult {
// `_options` carries the underscore because it is only read inside the
// feature-gated branches below and is otherwise unused.
let cloud_location = CloudLocation::new(url)?;
let store = match CloudType::from_str(url)? {
CloudType::File => {
let local = LocalFileSystem::new();
Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
},
CloudType::Aws => {
// AWS works without explicit options: fall back to the default `CloudOptions`.
#[cfg(feature = "aws")]
{
let options = _options
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned(Default::default()));
let store = options.build_aws(url).await?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
}
#[cfg(not(feature = "aws"))]
return err_missing_feature("aws", &cloud_location.scheme);
},
CloudType::Gcp => {
// GCP requires explicit options; without them we bail out.
#[cfg(feature = "gcp")]
match _options {
Some(options) => {
let store = options.build_gcp(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
},
_ => return err_missing_configuration("gcp", &cloud_location.scheme),
}
#[cfg(not(feature = "gcp"))]
return err_missing_feature("gcp", &cloud_location.scheme);
},
CloudType::Azure => {
// Azure requires explicit options as well.
{
#[cfg(feature = "azure")]
match _options {
Some(options) => {
let store = options.build_azure(url)?;
Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
},
_ => return err_missing_configuration("azure", &cloud_location.scheme),
}
}
#[cfg(not(feature = "azure"))]
return err_missing_feature("azure", &cloud_location.scheme);
},
// Every arm that does not return early yields a `Result`; unwrap it here.
}?;
Ok((cloud_location, store))
}
pub use object_store_setup::*;
pub use options::*;
99 changes: 99 additions & 0 deletions crates/polars-io/src/cloud/object_store_setup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use once_cell::sync::Lazy;
pub use options::*;
use tokio::sync::RwLock;

use super::*;

/// Identity of a cached store: the cloud backend, the options it was built with and the
/// `scheme://authority` prefix of the URL it was built for (e.g. `s3://my-bucket`).
///
/// The URL prefix is part of the key because the backend builders receive the URL, so a
/// store built for one bucket/host must not be handed out for another one.
type CacheKey = (CloudType, Option<CloudOptions>, String);

/// A very simple cache that only stores a single object-store.
/// This greatly reduces the query times as multiple object stores (when reading many small files)
/// get rate limited when querying the DNS (can take up to 5s).
#[allow(clippy::type_complexity)]
static OBJECT_STORE_CACHE: Lazy<RwLock<Option<(CacheKey, Arc<dyn ObjectStore>)>>> =
    Lazy::new(Default::default);

/// Outcome of building a store: the parsed location plus the store that serves it.
type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;

/// Bail out with a `ComputeError` telling the user that the cargo feature named
/// `feature` has to be enabled before URLs with the given `scheme` can be used.
///
/// Only called from `#[cfg(not(feature = ...))]` branches, so it is unused (hence
/// `allow(dead_code)`) when every backend feature is compiled in.
#[allow(dead_code)]
fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
    polars_bail!(
        ComputeError:
        "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
    );
}

/// Bail out with a `ComputeError` telling the user that the configuration named
/// `feature` has to be supplied before URLs with the given `scheme` can be used.
#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
fn err_missing_configuration(feature: &str, scheme: &str) -> BuildResult {
    polars_bail!(
        ComputeError:
        "configuration '{}' must be provided in order to use '{}' cloud urls", feature, scheme,
    );
}

/// Return the `scheme://authority` prefix of `url`, i.e. everything up to (but excluding)
/// the first `/`, `?` or `#` that follows the `://` separator.
///
/// When `url` has no `://` separator, or nothing follows the authority, the whole input is
/// returned. Erring on the side of a longer prefix can only cause extra cache misses, never
/// a wrong cache hit.
fn store_cache_scope(url: &str) -> &str {
    const SEPARATOR: &str = "://";
    match url.find(SEPARATOR) {
        Some(scheme_end) => {
            let authority_start = scheme_end + SEPARATOR.len();
            match url[authority_start..].find(|c: char| matches!(c, '/' | '?' | '#')) {
                Some(authority_len) => &url[..authority_start + authority_len],
                None => url,
            }
        },
        None => url,
    }
}

/// Build an [`ObjectStore`] for `url`, configured with the passed in options.
///
/// Returns the parsed [`CloudLocation`] together with the object store implementation
/// that serves it. The most recently built store is cached and reused as long as the
/// cloud type, the options and the `scheme://authority` part of the URL are unchanged.
///
/// # Errors
///
/// Fails when `url` cannot be parsed into a location or cloud type, when the cargo
/// feature for the detected backend is not enabled, when GCP/Azure are requested
/// without options, or when the backend specific builder fails.
pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
    let cloud_location = CloudLocation::new(url)?;

    let cloud_type = CloudType::from_str(url)?;
    let options = options.cloned();
    // Include the bucket/host part of the URL: the builders below are given `url`, so the
    // resulting store is specific to that location and not just to (type, options).
    let key = (cloud_type, options, store_cache_scope(url).to_owned());

    {
        // Scope the read guard so it is released before a new store is built.
        let cache = OBJECT_STORE_CACHE.read().await;
        if let Some((stored_key, store)) = cache.as_ref() {
            if stored_key == &key {
                return Ok((cloud_location, store.clone()));
            }
        }
    }

    let store = match key.0 {
        CloudType::File => {
            let local = LocalFileSystem::new();
            Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
        },
        CloudType::Aws => {
            // AWS works without explicit options: fall back to the default `CloudOptions`.
            #[cfg(feature = "aws")]
            {
                let options = key
                    .1
                    .as_ref()
                    .map(Cow::Borrowed)
                    .unwrap_or_else(|| Cow::Owned(Default::default()));
                let store = options.build_aws(url).await?;
                Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
            }
            #[cfg(not(feature = "aws"))]
            return err_missing_feature("aws", &cloud_location.scheme);
        },
        CloudType::Gcp => {
            // GCP requires explicit options; without them we bail out.
            #[cfg(feature = "gcp")]
            match key.1.as_ref() {
                Some(options) => {
                    let store = options.build_gcp(url)?;
                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                },
                _ => return err_missing_configuration("gcp", &cloud_location.scheme),
            }
            #[cfg(not(feature = "gcp"))]
            return err_missing_feature("gcp", &cloud_location.scheme);
        },
        CloudType::Azure => {
            // Azure requires explicit options as well.
            {
                #[cfg(feature = "azure")]
                match key.1.as_ref() {
                    Some(options) => {
                        let store = options.build_azure(url)?;
                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
                    },
                    _ => return err_missing_configuration("azure", &cloud_location.scheme),
                }
            }
            #[cfg(not(feature = "azure"))]
            return err_missing_feature("azure", &cloud_location.scheme);
        },
    }?;
    // Remember the freshly built store; it replaces whatever was cached before.
    let mut cache = OBJECT_STORE_CACHE.write().await;
    *cache = Some((key, store.clone()));
    Ok((cloud_location, store))
}
1 change: 1 addition & 0 deletions crates/polars-io/src/cloud/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ where
.collect::<PolarsResult<Configs<T>>>()
}

#[derive(PartialEq)]
pub enum CloudType {
Aws,
Azure,
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ impl ParquetObjectStore {
let path = self.path.clone();
let length = self.length;
let mut reader = CloudReader::new(length, object_store, path);

parquet2_read::read_metadata_async(&mut reader)
.await
.map_err(to_compute_err)
Expand All @@ -87,7 +88,6 @@ impl ParquetObjectStore {
/// Fetch and memoize the metadata of the parquet file.
pub async fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
if self.metadata.is_none() {
self.initialize_length().await?;
self.metadata = Some(Arc::new(self.fetch_metadata().await?));
}
Ok(self.metadata.as_ref().unwrap())
Expand Down
Loading

0 comments on commit 56a8f93

Please sign in to comment.