diff --git a/crates/polars-io/src/cloud/mod.rs b/crates/polars-io/src/cloud/mod.rs
index f22842f23076..6118a4bb9a76 100644
--- a/crates/polars-io/src/cloud/mod.rs
+++ b/crates/polars-io/src/cloud/mod.rs
@@ -18,80 +18,14 @@ use polars_core::prelude::{polars_bail, PolarsError, PolarsResult};
 mod adaptors;
 #[cfg(feature = "cloud")]
 mod glob;
+#[cfg(feature = "cloud")]
+mod object_store_setup;
 pub mod options;
 
 #[cfg(feature = "cloud")]
 pub use adaptors::*;
 #[cfg(feature = "cloud")]
 pub use glob::*;
-pub use options::*;
-
-#[cfg(feature = "cloud")]
-type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;
-
-#[allow(dead_code)]
-#[cfg(feature = "cloud")]
-fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
-    polars_bail!(
-        ComputeError:
-        "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
-    );
-}
-#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
-fn err_missing_configuration(feature: &str, scheme: &str) -> BuildResult {
-    polars_bail!(
-        ComputeError:
-        "configuration '{}' must be provided in order to use '{}' cloud urls", feature, scheme,
-    );
-}
-
-/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
 #[cfg(feature = "cloud")]
-pub async fn build_object_store(url: &str, _options: Option<&CloudOptions>) -> BuildResult {
-    let cloud_location = CloudLocation::new(url)?;
-    let store = match CloudType::from_str(url)? {
-        CloudType::File => {
-            let local = LocalFileSystem::new();
-            Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
-        },
-        CloudType::Aws => {
-            #[cfg(feature = "aws")]
-            {
-                let options = _options
-                    .map(Cow::Borrowed)
-                    .unwrap_or_else(|| Cow::Owned(Default::default()));
-                let store = options.build_aws(url).await?;
-                Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
-            }
-            #[cfg(not(feature = "aws"))]
-            return err_missing_feature("aws", &cloud_location.scheme);
-        },
-        CloudType::Gcp => {
-            #[cfg(feature = "gcp")]
-            match _options {
-                Some(options) => {
-                    let store = options.build_gcp(url)?;
-                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
-                },
-                _ => return err_missing_configuration("gcp", &cloud_location.scheme),
-            }
-            #[cfg(not(feature = "gcp"))]
-            return err_missing_feature("gcp", &cloud_location.scheme);
-        },
-        CloudType::Azure => {
-            {
-                #[cfg(feature = "azure")]
-                match _options {
-                    Some(options) => {
-                        let store = options.build_azure(url)?;
-                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
-                    },
-                    _ => return err_missing_configuration("azure", &cloud_location.scheme),
-                }
-            }
-            #[cfg(not(feature = "azure"))]
-            return err_missing_feature("azure", &cloud_location.scheme);
-        },
-    }?;
-    Ok((cloud_location, store))
-}
+pub use object_store_setup::*;
+pub use options::*;
diff --git a/crates/polars-io/src/cloud/object_store_setup.rs b/crates/polars-io/src/cloud/object_store_setup.rs
new file mode 100644
index 000000000000..91119dbbf248
--- /dev/null
+++ b/crates/polars-io/src/cloud/object_store_setup.rs
@@ -0,0 +1,99 @@
+use once_cell::sync::Lazy;
+pub use options::*;
+use tokio::sync::RwLock;
+
+use super::*;
+
+type CacheKey = (CloudType, Option<CloudOptions>);
+
+/// A very simple cache that only stores a single object-store.
+/// This greatly reduces the query times as multiple object stores (when reading many small files)
+/// get rate limited when querying the DNS (can take up to 5s).
+#[allow(clippy::type_complexity)]
+static OBJECT_STORE_CACHE: Lazy<RwLock<Option<(CacheKey, Arc<dyn ObjectStore>)>>> =
+    Lazy::new(Default::default);
+
+type BuildResult = PolarsResult<(CloudLocation, Arc<dyn ObjectStore>)>;
+
+#[allow(dead_code)]
+fn err_missing_feature(feature: &str, scheme: &str) -> BuildResult {
+    polars_bail!(
+        ComputeError:
+        "feature '{}' must be enabled in order to use '{}' cloud urls", feature, scheme,
+    );
+}
+#[cfg(any(feature = "azure", feature = "aws", feature = "gcp"))]
+fn err_missing_configuration(feature: &str, scheme: &str) -> BuildResult {
+    polars_bail!(
+        ComputeError:
+        "configuration '{}' must be provided in order to use '{}' cloud urls", feature, scheme,
+    );
+}
+
+/// Build an [`ObjectStore`] based on the URL and passed in url. Return the cloud location and an implementation of the object store.
+pub async fn build_object_store(url: &str, options: Option<&CloudOptions>) -> BuildResult {
+    let cloud_location = CloudLocation::new(url)?;
+
+    let cloud_type = CloudType::from_str(url)?;
+    let options = options.cloned();
+    let key = (cloud_type, options);
+
+    {
+        let cache = OBJECT_STORE_CACHE.read().await;
+        if let Some((stored_key, store)) = cache.as_ref() {
+            if stored_key == &key {
+                return Ok((cloud_location, store.clone()));
+            }
+        }
+    }
+
+    let store = match key.0 {
+        CloudType::File => {
+            let local = LocalFileSystem::new();
+            Ok::<_, PolarsError>(Arc::new(local) as Arc<dyn ObjectStore>)
+        },
+        CloudType::Aws => {
+            #[cfg(feature = "aws")]
+            {
+                let options = key
+                    .1
+                    .as_ref()
+                    .map(Cow::Borrowed)
+                    .unwrap_or_else(|| Cow::Owned(Default::default()));
+                let store = options.build_aws(url).await?;
+                Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+            }
+            #[cfg(not(feature = "aws"))]
+            return err_missing_feature("aws", &cloud_location.scheme);
+        },
+        CloudType::Gcp => {
+            #[cfg(feature = "gcp")]
+            match key.1.as_ref() {
+                Some(options) => {
+                    let store = options.build_gcp(url)?;
+                    Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                },
+                _ => return err_missing_configuration("gcp", &cloud_location.scheme),
+            }
+            #[cfg(not(feature = "gcp"))]
+            return err_missing_feature("gcp", &cloud_location.scheme);
+        },
+        CloudType::Azure => {
+            {
+                #[cfg(feature = "azure")]
+                match key.1.as_ref() {
+                    Some(options) => {
+                        let store = options.build_azure(url)?;
+                        Ok::<_, PolarsError>(Arc::new(store) as Arc<dyn ObjectStore>)
+                    },
+                    _ => return err_missing_configuration("azure", &cloud_location.scheme),
+                }
+            }
+            #[cfg(not(feature = "azure"))]
+            return err_missing_feature("azure", &cloud_location.scheme);
+        },
+    }?;
+    let mut cache = OBJECT_STORE_CACHE.write().await;
+    *cache = Some((key, store.clone()));
+    Ok((cloud_location, store))
+}
diff --git a/crates/polars-io/src/cloud/options.rs b/crates/polars-io/src/cloud/options.rs
index e052024848ae..6ef88754c405 100644
--- a/crates/polars-io/src/cloud/options.rs
+++ b/crates/polars-io/src/cloud/options.rs
@@ -75,6 +75,7 @@ where
         .collect::<PolarsResult<Configs<T>>>()
 }
 
+#[derive(PartialEq)]
 pub enum CloudType {
     Aws,
     Azure,
diff --git a/crates/polars-io/src/parquet/async_impl.rs b/crates/polars-io/src/parquet/async_impl.rs
index b8520f528378..ce5ecb7ce7f4 100644
--- a/crates/polars-io/src/parquet/async_impl.rs
+++ b/crates/polars-io/src/parquet/async_impl.rs
@@ -79,6 +79,7 @@ impl ParquetObjectStore {
         let path = self.path.clone();
         let length = self.length;
         let mut reader = CloudReader::new(length, object_store, path);
+
         parquet2_read::read_metadata_async(&mut reader)
             .await
             .map_err(to_compute_err)
@@ -87,7 +88,6 @@ impl ParquetObjectStore {
     /// Fetch and memoize the metadata of the parquet file.
     pub async fn get_metadata(&mut self) -> PolarsResult<&Arc<FileMetaData>> {
         if self.metadata.is_none() {
-            self.initialize_length().await?;
             self.metadata = Some(Arc::new(self.fetch_metadata().await?));
         }
         Ok(self.metadata.as_ref().unwrap())
diff --git a/crates/polars-io/src/parquet/read_impl.rs b/crates/polars-io/src/parquet/read_impl.rs
index 84d4aaf88ba1..66a84fcb114a 100644
--- a/crates/polars-io/src/parquet/read_impl.rs
+++ b/crates/polars-io/src/parquet/read_impl.rs
@@ -19,8 +19,6 @@ use crate::parquet::async_impl::FetchRowGroupsFromObjectStore;
 use crate::parquet::mmap::mmap_columns;
 use crate::parquet::predicates::read_this_row_group;
 use crate::parquet::{mmap, ParallelStrategy};
-#[cfg(feature = "async")]
-use crate::pl_async::get_runtime;
 use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
 use crate::utils::{apply_projection, get_reader_bytes};
 use crate::RowCount;
@@ -111,7 +109,6 @@ fn materialize_hive_partitions(
 }
 
 #[allow(clippy::too_many_arguments)]
-// might parallelize over columns
 fn rg_to_dfs(
     store: &mmap::ColumnStore,
     previous_row_count: &mut IdxSize,
@@ -126,6 +123,57 @@ fn rg_to_dfs(
     projection: &[usize],
     use_statistics: bool,
     hive_partition_columns: Option<&[Series]>,
+) -> PolarsResult<Vec<DataFrame>> {
+    if let ParallelStrategy::Columns | ParallelStrategy::None = parallel {
+        rg_to_dfs_optionally_par_over_columns(
+            store,
+            previous_row_count,
+            row_group_start,
+            row_group_end,
+            remaining_rows,
+            file_metadata,
+            schema,
+            predicate,
+            row_count,
+            parallel,
+            projection,
+            use_statistics,
+            hive_partition_columns,
+        )
+    } else {
+        rg_to_dfs_par_over_rg(
+            store,
+            row_group_start,
+            row_group_end,
+            previous_row_count,
+            remaining_rows,
+            file_metadata,
+            schema,
+            predicate,
+            row_count,
+            projection,
+            use_statistics,
+            hive_partition_columns,
+        )
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+// might parallelize over columns
+fn rg_to_dfs_optionally_par_over_columns(
+    store: &mmap::ColumnStore,
+    previous_row_count: &mut IdxSize,
+    row_group_start: usize,
+    row_group_end: usize,
+    remaining_rows: &mut usize,
+    file_metadata: &FileMetaData,
+    schema: &ArrowSchema,
+    predicate: Option<Arc<dyn PhysicalIoExpr>>,
+    row_count: Option<RowCount>,
+    parallel: ParallelStrategy,
+    projection: &[usize],
+    use_statistics: bool,
+    hive_partition_columns: Option<&[Series]>,
 ) -> PolarsResult<Vec<DataFrame>> {
     let mut dfs = Vec::with_capacity(row_group_end - row_group_start);
 
@@ -191,7 +239,7 @@
 
 #[allow(clippy::too_many_arguments)]
 // parallelizes over row groups
-fn rg_to_dfs_par(
+fn rg_to_dfs_par_over_rg(
     store: &mmap::ColumnStore,
     row_group_start: usize,
     row_group_end: usize,
@@ -314,39 +362,22 @@ pub fn read_parquet(
     let reader = ReaderBytes::from(&reader);
     let bytes = reader.deref();
     let store = mmap::ColumnStore::Local(bytes);
-    let dfs = match parallel {
-        ParallelStrategy::Columns | ParallelStrategy::None => rg_to_dfs(
-            &store,
-            &mut 0,
-            0,
-            n_row_groups,
-            &mut limit,
-            &file_metadata,
-            schema,
-            predicate,
-            row_count,
-            parallel,
-            &projection,
-            use_statistics,
-            hive_partition_columns,
-        )?,
-        ParallelStrategy::RowGroups => rg_to_dfs_par(
-            &store,
-            0,
-            file_metadata.row_groups.len(),
-            &mut 0,
-            &mut limit,
-            &file_metadata,
-            schema,
-            predicate,
-            row_count,
-            &projection,
-            use_statistics,
-            hive_partition_columns,
-        )?,
-        // auto should already be replaced by Columns or RowGroups
-        ParallelStrategy::Auto => unimplemented!(),
-    };
+
+    let dfs = rg_to_dfs(
+        &store,
+        &mut 0,
+        0,
+        n_row_groups,
+        &mut limit,
+        &file_metadata,
+        schema,
+        predicate,
+        row_count,
+        parallel,
+        &projection,
+        use_statistics,
+        hive_partition_columns,
+    )?;
 
     if dfs.is_empty() {
         let schema = if let Cow::Borrowed(_) = projection {
@@ -424,7 +455,7 @@ pub struct BatchedParquetReader {
     row_group_fetcher: RowGroupFetcher,
     limit: usize,
     projection: Vec<usize>,
-    schema: ArrowSchema,
+    schema: Arc<ArrowSchema>,
     metadata: Arc<FileMetaData>,
     row_count: Option<RowCount>,
     rows_read: IdxSize,
@@ -449,7 +480,7 @@ impl BatchedParquetReader {
         use_statistics: bool,
         hive_partition_columns: Option<Vec<Series>>,
     ) -> PolarsResult<Self> {
-        let schema = read::schema::infer_schema(&metadata)?;
+        let schema = Arc::new(read::schema::infer_schema(&metadata)?);
         let n_row_groups = metadata.row_groups.len();
         let projection = projection
             .unwrap_or_else(|| (0usize..schema.fields.len()).collect::<Vec<_>>());
@@ -488,46 +519,24 @@ impl BatchedParquetReader {
             .row_group_fetcher
             .fetch_row_groups(row_group_start..row_group_end)
             .await?;
-        let dfs = match self.parallel {
-            ParallelStrategy::Columns => {
-                let dfs = rg_to_dfs(
-                    &store,
-                    &mut self.rows_read,
-                    row_group_start,
-                    row_group_end,
-                    &mut self.limit,
-                    &self.metadata,
-                    &self.schema,
-                    None,
-                    self.row_count.clone(),
-                    ParallelStrategy::Columns,
-                    &self.projection,
-                    self.use_statistics,
-                    self.hive_partition_columns.as_deref(),
-                )?;
-                self.row_group_offset += n;
-                dfs
-            },
-            ParallelStrategy::RowGroups => {
-                let dfs = rg_to_dfs_par(
-                    &store,
-                    self.row_group_offset,
-                    std::cmp::min(self.row_group_offset + n, self.n_row_groups),
-                    &mut self.rows_read,
-                    &mut self.limit,
-                    &self.metadata,
-                    &self.schema,
-                    None,
-                    self.row_count.clone(),
-                    &self.projection,
-                    self.use_statistics,
-                    self.hive_partition_columns.as_deref(),
-                )?;
-                self.row_group_offset += n;
-                dfs
-            },
-            _ => unimplemented!(),
-        };
+
+        let dfs = rg_to_dfs(
+            &store,
+            &mut self.rows_read,
+            row_group_start,
+            row_group_end,
+            &mut self.limit,
+            &self.metadata,
+            &self.schema,
+            None,
+            self.row_count.clone(),
+            self.parallel,
+            &self.projection,
+            self.use_statistics,
+            self.hive_partition_columns.as_deref(),
+        )?;
+
+        self.row_group_offset += n;
         // case where there is no data in the file
         // the streaming engine needs at least a single chunk
         if self.rows_read == 0 && dfs.is_empty() {
@@ -611,12 +620,3 @@ impl BatchedParquetIter {
         }
     }
 }
-
-#[cfg(feature = "async")]
-impl Iterator for BatchedParquetIter {
-    type Item = PolarsResult<DataFrame>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        get_runtime().block_on(self.next_())
-    }
-}
diff --git a/crates/polars-io/src/pl_async.rs b/crates/polars-io/src/pl_async.rs
index 53bbcbfab896..42f56fa1acca 100644
--- a/crates/polars-io/src/pl_async.rs
+++ b/crates/polars-io/src/pl_async.rs
@@ -1,18 +1,79 @@
+use std::collections::BTreeSet;
+use std::future::Future;
 use std::ops::Deref;
+use std::sync::RwLock;
 
 use once_cell::sync::Lazy;
 use polars_core::POOL;
 use tokio::runtime::{Builder, Runtime};
 
-static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
-    Builder::new_multi_thread()
-        .worker_threads(std::cmp::max(POOL.current_num_threads() / 2, 4))
-        .enable_io()
-        .enable_time()
-        .build()
-        .unwrap()
-});
+pub struct RuntimeManager {
+    rt: Runtime,
+    blocking_rayon_threads: RwLock<BTreeSet<usize>>,
+}
+
+impl RuntimeManager {
+    fn new() -> Self {
+        let rt = Builder::new_multi_thread()
+            .worker_threads(std::cmp::max(POOL.current_num_threads() / 2, 4))
+            .enable_io()
+            .enable_time()
+            .build()
+            .unwrap();
+
+        Self {
+            rt,
+            blocking_rayon_threads: Default::default(),
+        }
+    }
+
+    /// Keep track of rayon threads that drive the runtime. Every thread
+    /// only allows a single runtime. If this thread calls block_on and this
+    /// rayon thread is already driving an async execution we must start a new thread
+    /// otherwise we panic. This can happen when we parallelize reads over 100s of files.
+    pub fn block_on_potential_spawn<F>(&'static self, future: F) -> F::Output
+    where
+        F: Future + Send,
+        F::Output: Send,
+    {
+        if let Some(thread_id) = POOL.current_thread_index() {
+            if self
+                .blocking_rayon_threads
+                .read()
+                .unwrap()
+                .contains(&thread_id)
+            {
+                std::thread::scope(|s| s.spawn(|| self.rt.block_on(future)).join().unwrap())
+            } else {
+                self.blocking_rayon_threads
+                    .write()
+                    .unwrap()
+                    .insert(thread_id);
+                let out = self.rt.block_on(future);
+                self.blocking_rayon_threads
+                    .write()
+                    .unwrap()
+                    .remove(&thread_id);
+                out
+            }
+        }
+        // Assumption that the main thread never runs rayon tasks, so we wouldn't be rescheduled
+        // on the main thread and thus we can always block.
+        else {
+            self.rt.block_on(future)
+        }
+    }
+
+    pub fn block_on<F>(&self, future: F) -> F::Output
+    where
+        F: Future,
+    {
+        self.rt.block_on(future)
+    }
+}
+
+static RUNTIME: Lazy<RuntimeManager> = Lazy::new(RuntimeManager::new);
 
-pub fn get_runtime() -> &'static Runtime {
+pub fn get_runtime() -> &'static RuntimeManager {
     RUNTIME.deref()
 }
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 3f83c90f23e5..6a4e6c51e1c7 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -68,7 +68,7 @@ impl ParquetExec {
         } else if is_cloud_url(self.path.as_path()) {
             #[cfg(feature = "cloud")]
             {
-                polars_io::pl_async::get_runtime().block_on(async {
+                polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
                     let reader = ParquetAsyncReader::from_uri(
                         &self.path.to_string_lossy(),
                         self.cloud_options.as_ref(),
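
Note on the caching approach in `object_store_setup.rs` above: the diff stores at most one `(CacheKey, Arc<dyn ObjectStore>)` pair behind an async `RwLock`, reading it under a shared lock and overwriting it under a write lock on a key miss. The sketch below is a minimal, self-contained illustration of that single-slot pattern, not polars-io code: the names `CACHE` and `get_or_build`, and the `String` standing in for an object store, are illustrative assumptions, and it presumes `tokio` (with `rt-multi-thread` and `macros` features) and `once_cell` as dependencies.

```rust
use std::sync::Arc;

use once_cell::sync::Lazy;
use tokio::sync::RwLock;

// Single-slot cache: at most one (key, value) pair at a time, mirroring OBJECT_STORE_CACHE.
static CACHE: Lazy<RwLock<Option<(String, Arc<String>)>>> = Lazy::new(Default::default);

async fn get_or_build(key: &str) -> Arc<String> {
    // Fast path: take the shared lock and reuse the stored value if the key matches.
    {
        let cache = CACHE.read().await;
        if let Some((stored_key, value)) = cache.as_ref() {
            if stored_key == key {
                return value.clone();
            }
        }
    }
    // Slow path: build the value (stands in for constructing an ObjectStore, which can hit
    // rate-limited DNS/credential endpoints) and overwrite the single slot.
    let value = Arc::new(format!("store for {key}"));
    *CACHE.write().await = Some((key.to_string(), value.clone()));
    value
}

#[tokio::main]
async fn main() {
    let a = get_or_build("s3://bucket").await;
    let b = get_or_build("s3://bucket").await;
    // A second call with the same key reuses the cached Arc instead of rebuilding.
    assert!(Arc::ptr_eq(&a, &b));
}
```

A single slot appears to be enough here because a query typically reads many files from one store with one set of options; a map keyed by `(CloudType, CloudOptions)` would presumably only help when mixing providers within a single query.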