Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Jun 27, 2024
1 parent 4d2f928 commit d5a82fc
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 33 deletions.
14 changes: 11 additions & 3 deletions crates/polars-io/src/cloud/glob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ impl Matcher {
}
}

#[tokio::main(flavor = "current_thread")]
/// List files with a prefix derived from the pattern.
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
// Find the fixed prefix, up to the first '*'.
Expand All @@ -178,16 +177,25 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
},
store,
) = super::build_object_store(url, cloud_options).await?;
let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?;
let matcher = Matcher::new(
if scheme == "file" {
// For local paths the returned location has the leading slash stripped.
prefix[1..].to_string()
} else {
prefix.clone()
},
expansion.as_deref(),
)?;

let list_stream = store
.list(Some(&Path::from(prefix)))
.map_err(to_compute_err);
let locations: Vec<Path> = list_stream
let mut locations: Vec<Path> = list_stream
.then(|entry| async { Ok::<_, PolarsError>(entry.map_err(to_compute_err)?.location) })
.filter(|name| ready(name.as_ref().map_or(true, |name| matcher.is_matching(name))))
.try_collect()
.await?;
locations.sort_unstable();
Ok(locations
.into_iter()
.map(|l| full_url(&scheme, &bucket, l))
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,9 @@ impl LazyFileListReader for LazyCsvReader {
};
concat_impl(&lfs, args)
}

/// [CloudOptions] used to list files.
fn cloud_options(&self) -> Option<&CloudOptions> {
self.cloud_options.as_ref()
}
}
125 changes: 99 additions & 26 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::collections::VecDeque;
use std::path::PathBuf;

use polars_core::config;
use polars_core::error::to_compute_err;
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
Expand Down Expand Up @@ -51,48 +52,117 @@ fn expand_paths(
}
};

if is_cloud {
if is_cloud || { cfg!(not(target_family = "windows")) && config::force_async() } {
#[cfg(feature = "async")]
{
use polars_io::cloud::{CloudLocation, PolarsObjectStore};
let format_path = |scheme: &str, bucket: &str, location: &str| {
if is_cloud {
format!("{}://{}/{}", scheme, bucket, location)
} else {
format!("/{}", location)
}
};

fn is_file_cloud(
path: &str,
cloud_options: Option<&CloudOptions>,
) -> PolarsResult<bool> {
let expand_path_cloud = |path: &str,
cloud_options: Option<&CloudOptions>|
-> PolarsResult<(usize, Vec<PathBuf>)> {
polars_io::pl_async::get_runtime().block_on_potential_spawn(async {
let (CloudLocation { prefix, .. }, store) =
let (cloud_location, store) =
polars_io::cloud::build_object_store(path, cloud_options).await?;
let store = PolarsObjectStore::new(store);
PolarsResult::Ok(store.head(&prefix.into()).await.is_ok())

let prefix = cloud_location.prefix.clone().into();

let out = if !path.ends_with("/")
&& cloud_location.expansion.is_none()
&& store.head(&prefix).await.is_ok()
{
(
0,
vec![PathBuf::from(format_path(
&cloud_location.scheme,
&cloud_location.bucket,
&cloud_location.prefix,
))],
)
} else {
use futures::{StreamExt, TryStreamExt};

if !is_cloud {
// FORCE_ASYNC in the test suite wants us to raise a proper error message
// for non-existent file paths. Note we can't do this for cloud paths as
// there is no concept of a "directory" - a non-existent path is
// indistinguishable from an empty directory.
let path = PathBuf::from(path);
if !path.is_dir() {
path.metadata().map_err(|err| {
let msg =
Some(format!("{}: {}", err, path.to_str().unwrap()).into());
PolarsError::IO {
error: err.into(),
msg,
}
})?;
}
}

let mut paths = store
.list(Some(&prefix))
.map(|x| {
x.map(|x| {
PathBuf::from({
format_path(
&cloud_location.scheme,
&cloud_location.bucket,
x.location.as_ref(),
)
})
})
})
.try_collect::<Vec<_>>()
.await
.map_err(to_compute_err)?;

paths.sort_unstable();
(
format_path(
&cloud_location.scheme,
&cloud_location.bucket,
&cloud_location.prefix,
)
.len(),
paths,
)
};

PolarsResult::Ok(out)
})
}
};

for (path_idx, path) in paths.iter().enumerate() {
let glob_start_idx = get_glob_start_idx(path.to_str().unwrap().as_bytes());

let path = if glob_start_idx.is_some() {
path.clone()
} else if !path.ends_with("/")
&& is_file_cloud(path.to_str().unwrap(), cloud_options)?
{
update_expand_start_idx(0, path_idx)?;
out_paths.push(path.clone());
continue;
} else if !glob {
polars_bail!(ComputeError: "not implemented: did not find cloud file at path = {} and `glob` was set to false", path.to_str().unwrap());
} else {
// FIXME: This will fail! See https://github.com/pola-rs/polars/issues/17105
path.join("**/*")
let (expand_start_idx, paths) =
expand_path_cloud(path.to_str().unwrap(), cloud_options)?;
out_paths.extend_from_slice(&paths);
update_expand_start_idx(expand_start_idx, path_idx)?;
continue;
};

update_expand_start_idx(0, path_idx)?;

out_paths.extend(
polars_io::async_glob(path.to_str().unwrap(), cloud_options)?
.into_iter()
.map(PathBuf::from),
);
let iter = polars_io::pl_async::get_runtime().block_on_potential_spawn(
polars_io::async_glob(path.to_str().unwrap(), cloud_options),
)?;

if is_cloud {
out_paths.extend(iter.into_iter().map(PathBuf::from));
} else {
// FORCE_ASYNC, remove leading file:// as not all readers support it.
out_paths.extend(iter.iter().map(|x| &x[7..]).map(PathBuf::from))
}
}
}
#[cfg(not(feature = "async"))]
Expand Down Expand Up @@ -141,7 +211,10 @@ fn expand_paths(
};

for path in paths {
out_paths.push(path.map_err(to_compute_err)?);
let path = path.map_err(to_compute_err)?;
if !path.is_dir() {
out_paths.push(path);
}
}
} else {
update_expand_start_idx(0, path_idx)?;
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ impl LazyFileListReader for LazyIpcReader {
fn row_index(&self) -> Option<&RowIndex> {
self.args.row_index.as_ref()
}

/// [CloudOptions] used to list files.
fn cloud_options(&self) -> Option<&CloudOptions> {
self.args.cloud_options.as_ref()
}
}

impl LazyFrame {
Expand Down
6 changes: 5 additions & 1 deletion crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::{Path, PathBuf};
use polars_core::prelude::*;
use polars_io::cloud::CloudOptions;
use polars_io::parquet::read::ParallelStrategy;
use polars_io::utils::is_cloud_url;
use polars_io::{HiveOptions, RowIndex};

use crate::prelude::*;
Expand Down Expand Up @@ -65,7 +66,10 @@ impl LazyFileListReader for LazyParquetReader {
self.paths.len() == 1
&& get_glob_start_idx(self.paths[0].to_str().unwrap().as_bytes()).is_none()
&& !paths.is_empty()
&& paths[0] != self.paths[0]
&& {
(!is_cloud_url(&paths[0]) && paths[0].is_dir())
|| (paths[0] != self.paths[0])
}
}));
self.args.hive_options.hive_start_idx = hive_start_idx;

Expand Down
6 changes: 4 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,17 @@ pub struct IpcExec {
impl IpcExec {
fn read(&mut self, verbose: bool) -> PolarsResult<DataFrame> {
let is_cloud = self.paths.iter().any(is_cloud_url);
let mut out = if is_cloud || config::force_async() {
let force_async = config::force_async();

let mut out = if is_cloud || force_async {
#[cfg(not(feature = "cloud"))]
{
panic!("activate cloud feature")
}

#[cfg(feature = "cloud")]
{
if !is_cloud && verbose {
if force_async && verbose {
eprintln!("ASYNC READING FORCED");
}

Expand Down
2 changes: 1 addition & 1 deletion crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ impl ParquetExec {

#[cfg(feature = "cloud")]
{
if !is_cloud && config::verbose() {
if force_async && config::verbose() {
eprintln!("ASYNC READING FORCED");
}

Expand Down
3 changes: 3 additions & 0 deletions py-polars/tests/unit/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ def memory_usage_without_pyarrow() -> Generator[MemoryUsage, Any, Any]:
if not pl.build_info()["compiler"]["debug"]:
pytest.skip("Memory usage only available in debug/dev builds.")

if os.getenv("POLARS_FORCE_ASYNC", "0") == "1":
pytest.skip("Hangs when combined with async glob")

if sys.platform == "win32":
# abi3 wheels don't have the tracemalloc C APIs, which breaks linking
# on Windows.
Expand Down
18 changes: 18 additions & 0 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,3 +471,21 @@ def test_scan_directory(

out = scan(tmp_path).collect()
assert_frame_equal(out, df)


def test_scan_glob_excludes_directories(tmp_path: Path) -> None:
for dir in ["dir1", "dir2", "dir3"]:
(tmp_path / dir).mkdir()

df = pl.DataFrame({"a": [1, 2, 3]})

df.write_parquet(tmp_path / "dir1/data.bin")
df.write_parquet(tmp_path / "dir2/data.parquet")
df.write_parquet(tmp_path / "data.parquet")

assert_frame_equal(pl.scan_parquet(tmp_path / "**/*.bin").collect(), df)
assert_frame_equal(pl.scan_parquet(tmp_path / "**/data*.bin").collect(), df)
assert_frame_equal(
pl.scan_parquet(tmp_path / "**/*").collect(), pl.concat(3 * [df])
)
assert_frame_equal(pl.scan_parquet(tmp_path / "*").collect(), df)

0 comments on commit d5a82fc

Please sign in to comment.