diff --git a/crates/polars-io/src/cloud/glob.rs b/crates/polars-io/src/cloud/glob.rs index 4d40f31c9d65..b61ac58baa0a 100644 --- a/crates/polars-io/src/cloud/glob.rs +++ b/crates/polars-io/src/cloud/glob.rs @@ -164,7 +164,6 @@ impl Matcher { } } -#[tokio::main(flavor = "current_thread")] /// List files with a prefix derived from the pattern. pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult> { // Find the fixed prefix, up to the first '*'. @@ -178,16 +177,25 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu }, store, ) = super::build_object_store(url, cloud_options).await?; - let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?; + let matcher = Matcher::new( + if scheme == "file" { + // For local paths the returned location has the leading slash stripped. + prefix[1..].to_string() + } else { + prefix.clone() + }, + expansion.as_deref(), + )?; let list_stream = store .list(Some(&Path::from(prefix))) .map_err(to_compute_err); - let locations: Vec = list_stream + let mut locations: Vec = list_stream .then(|entry| async { Ok::<_, PolarsError>(entry.map_err(to_compute_err)?.location) }) .filter(|name| ready(name.as_ref().map_or(true, |name| matcher.is_matching(name)))) .try_collect() .await?; + locations.sort_unstable(); Ok(locations .into_iter() .map(|l| full_url(&scheme, &bucket, l)) diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs index 5d7cc53f7b7e..5ffb79fd1c63 100644 --- a/crates/polars-lazy/src/scan/csv.rs +++ b/crates/polars-lazy/src/scan/csv.rs @@ -332,4 +332,9 @@ impl LazyFileListReader for LazyCsvReader { }; concat_impl(&lfs, args) } + + /// [CloudOptions] used to list files. + fn cloud_options(&self) -> Option<&CloudOptions> { + self.cloud_options.as_ref() + } } diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs index cbde453987d9..904716a371b4 100644 --- a/crates/polars-lazy/src/scan/file_list_reader.rs +++ b/crates/polars-lazy/src/scan/file_list_reader.rs @@ -1,6 +1,7 @@ use std::collections::VecDeque; use std::path::PathBuf; +use polars_core::config; use polars_core::error::to_compute_err; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; @@ -51,48 +52,117 @@ fn expand_paths( } }; - if is_cloud { + if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } { #[cfg(feature = "async")] { - use polars_io::cloud::{CloudLocation, PolarsObjectStore}; + let format_path = |scheme: &str, bucket: &str, location: &str| { + if is_cloud { + format!("{}://{}/{}", scheme, bucket, location) + } else { + format!("/{}", location) + } + }; - fn is_file_cloud( - path: &str, - cloud_options: Option<&CloudOptions>, - ) -> PolarsResult { + let expand_path_cloud = |path: &str, + cloud_options: Option<&CloudOptions>| + -> PolarsResult<(usize, Vec)> { polars_io::pl_async::get_runtime().block_on_potential_spawn(async { - let (CloudLocation { prefix, .. }, store) = + let (cloud_location, store) = polars_io::cloud::build_object_store(path, cloud_options).await?; - let store = PolarsObjectStore::new(store); - PolarsResult::Ok(store.head(&prefix.into()).await.is_ok()) + + let prefix = cloud_location.prefix.clone().into(); + + let out = if !path.ends_with("/") + && cloud_location.expansion.is_none() + && store.head(&prefix).await.is_ok() + { + ( + 0, + vec![PathBuf::from(format_path( + &cloud_location.scheme, + &cloud_location.bucket, + &cloud_location.prefix, + ))], + ) + } else { + use futures::{StreamExt, TryStreamExt}; + + if !is_cloud { + // FORCE_ASYNC in the test suite wants us to raise a proper error message + // for non-existent file paths. Note we can't do this for cloud paths as + // there is no concept of a "directory" - a non-existent path is + // indistinguishable from an empty directory. + let path = PathBuf::from(path); + if !path.is_dir() { + path.metadata().map_err(|err| { + let msg = + Some(format!("{}: {}", err, path.to_str().unwrap()).into()); + PolarsError::IO { + error: err.into(), + msg, + } + })?; + } + } + + let mut paths = store + .list(Some(&prefix)) + .map(|x| { + x.map(|x| { + PathBuf::from({ + format_path( + &cloud_location.scheme, + &cloud_location.bucket, + x.location.as_ref(), + ) + }) + }) + }) + .try_collect::>() + .await + .map_err(to_compute_err)?; + + paths.sort_unstable(); + ( + format_path( + &cloud_location.scheme, + &cloud_location.bucket, + &cloud_location.prefix, + ) + .len(), + paths, + ) + }; + + PolarsResult::Ok(out) }) - } + }; for (path_idx, path) in paths.iter().enumerate() { let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes()); let path = if glob_start_idx.is_some() { path.clone() - } else if !path.ends_with("/") - && is_file_cloud(path.to_str().unwrap(), cloud_options)? - { - update_expand_start_idx(0, path_idx)?; - out_paths.push(path.clone()); - continue; - } else if !glob { - polars_bail!(ComputeError: "not implemented: did not find cloud file at path = {} and `glob` was set to false", path.to_str().unwrap()); } else { - // FIXME: This will fail! See https://github.com/pola-rs/polars/issues/17105 - path.join("**/*") + let (expand_start_idx, paths) = + expand_path_cloud(path.to_str().unwrap(), cloud_options)?; + out_paths.extend_from_slice(&paths); + update_expand_start_idx(expand_start_idx, path_idx)?; + continue; }; update_expand_start_idx(0, path_idx)?; - out_paths.extend( - polars_io::async_glob(path.to_str().unwrap(), cloud_options)? - .into_iter() - .map(PathBuf::from), - ); + let iter = polars_io::pl_async::get_runtime().block_on_potential_spawn( + polars_io::async_glob(path.to_str().unwrap(), cloud_options), + )?; + + if is_cloud { + out_paths.extend(iter.into_iter().map(PathBuf::from)); + } else { + // FORCE_ASYNC, remove leading file:// as not all readers support it. + out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from)) + } } } #[cfg(not(feature = "async"))] @@ -141,7 +211,10 @@ fn expand_paths( }; for path in paths { - out_paths.push(path.map_err(to_compute_err)?); + let path = path.map_err(to_compute_err)?; + if !path.is_dir() { + out_paths.push(path); + } } } else { update_expand_start_idx(0, path_idx)?; diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs index eb3069e0de82..41a5b7b066de 100644 --- a/crates/polars-lazy/src/scan/ipc.rs +++ b/crates/polars-lazy/src/scan/ipc.rs @@ -109,6 +109,11 @@ impl LazyFileListReader for LazyIpcReader { fn row_index(&self) -> Option<&RowIndex> { self.args.row_index.as_ref() } + + /// [CloudOptions] used to list files. + fn cloud_options(&self) -> Option<&CloudOptions> { + self.args.cloud_options.as_ref() + } } impl LazyFrame { diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs index b1afe34f90fe..c4c503539784 100644 --- a/crates/polars-lazy/src/scan/parquet.rs +++ b/crates/polars-lazy/src/scan/parquet.rs @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf}; use polars_core::prelude::*; use polars_io::cloud::CloudOptions; use polars_io::parquet::read::ParallelStrategy; +use polars_io::utils::is_cloud_url; use polars_io::{HiveOptions, RowIndex}; use crate::prelude::*; @@ -65,7 +66,10 @@ impl LazyFileListReader for LazyParquetReader { self.paths.len() == 1 && get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none() && !paths.is_empty() - && paths[0] != self.paths[0] + && { + (!is_cloud_url(&paths[0]) && paths[0].is_dir()) + || (paths[0] != self.paths[0]) + } })); self.args.hive_options.hive_start_idx = hive_start_idx; diff --git a/crates/polars-mem-engine/src/executors/scan/ipc.rs b/crates/polars-mem-engine/src/executors/scan/ipc.rs index eaafe10b8c9b..a1a449afdefc 100644 --- a/crates/polars-mem-engine/src/executors/scan/ipc.rs +++ b/crates/polars-mem-engine/src/executors/scan/ipc.rs @@ -25,7 +25,9 @@ pub struct IpcExec { impl IpcExec { fn read(&mut self, verbose: bool) -> PolarsResult { let is_cloud = self.paths.iter().any(is_cloud_url); - let mut out = if is_cloud || config::force_async() { + let force_async = config::force_async(); + + let mut out = if is_cloud || force_async { #[cfg(not(feature = "cloud"))] { panic!("activate cloud feature") @@ -33,7 +35,7 @@ impl IpcExec { #[cfg(feature = "cloud")] { - if !is_cloud && verbose { + if force_async && verbose { eprintln!("ASYNC READING FORCED"); } diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs index 30c2a5a1fc27..89acc3de4fc7 100644 --- a/crates/polars-mem-engine/src/executors/scan/parquet.rs +++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs @@ -338,7 +338,7 @@ impl ParquetExec { #[cfg(feature = "cloud")] { - if !is_cloud && config::verbose() { + if force_async && config::verbose() { eprintln!("ASYNC READING FORCED"); } diff --git a/py-polars/tests/unit/conftest.py b/py-polars/tests/unit/conftest.py index f5f848ef4573..828ad87684cb 100644 --- a/py-polars/tests/unit/conftest.py +++ b/py-polars/tests/unit/conftest.py @@ -212,6 +212,9 @@ def memory_usage_without_pyarrow() -> Generator[MemoryUsage, Any, Any]: if not pl.build_info()["compiler"]["debug"]: pytest.skip("Memory usage only available in debug/dev builds.") + if os.getenv("POLARS_FORCE_ASYNC", "0") == "1": + pytest.skip("Hangs when combined with async glob") + if sys.platform == "win32": # abi3 wheels don't have the tracemalloc C APIs, which breaks linking # on Windows. diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py index 701ebebc1417..e3966a2ca8d1 100644 --- a/py-polars/tests/unit/io/test_scan.py +++ b/py-polars/tests/unit/io/test_scan.py @@ -471,3 +471,21 @@ def test_scan_directory( out = scan(tmp_path).collect() assert_frame_equal(out, df) + + +def test_scan_glob_excludes_directories(tmp_path: Path) -> None: + for dir in ["dir1", "dir2", "dir3"]: + (tmp_path / dir).mkdir() + + df = pl.DataFrame({"a": [1, 2, 3]}) + + df.write_parquet(tmp_path / "dir1/data.bin") + df.write_parquet(tmp_path / "dir2/data.parquet") + df.write_parquet(tmp_path / "data.parquet") + + assert_frame_equal(pl.scan_parquet(tmp_path / "**/*.bin").collect(), df) + assert_frame_equal(pl.scan_parquet(tmp_path / "**/data*.bin").collect(), df) + assert_frame_equal( + pl.scan_parquet(tmp_path / "**/*").collect(), pl.concat(3 * [df]) + ) + assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)