diff --git a/crates/polars-io/src/predicates.rs b/crates/polars-io/src/predicates.rs
index 48aec098702a..7c3dca6b654e 100644
--- a/crates/polars-io/src/predicates.rs
+++ b/crates/polars-io/src/predicates.rs
@@ -20,7 +20,7 @@ pub trait StatsEvaluator {
 }
 
 #[cfg(feature = "parquet")]
-pub(crate) fn apply_predicate(
+pub fn apply_predicate(
     df: &mut DataFrame,
     predicate: Option<&dyn PhysicalIoExpr>,
     parallel: bool,
diff --git a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
index 9a15ff1584f2..ef5d7f8da914 100644
--- a/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
+++ b/crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
@@ -312,6 +312,17 @@ impl ParquetExec {
     }
 
     fn read(&mut self) -> PolarsResult<DataFrame> {
+        // FIXME: The row index implementation is incorrect when a predicate is
+        // applied. This code mitigates that by applying the predicate after the
+        // collection of the entire dataframe if a row index is requested. This is
+        // inefficient.
+        let post_predicate = self
+            .file_options
+            .row_index
+            .as_ref()
+            .and_then(|_| self.predicate.take())
+            .map(phys_expr_to_io_expr);
+
         let is_cloud = match self.paths.first() {
             Some(p) => is_cloud_url(p.as_path()),
             None => {
@@ -352,6 +363,9 @@ impl ParquetExec {
         };
 
         let mut out = accumulate_dataframes_vertical(out)?;
+
+        polars_io::predicates::apply_predicate(&mut out, post_predicate.as_deref(), true)?;
+
         if self.file_options.rechunk {
             out.as_single_chunk_par();
         }
diff --git a/crates/polars-lazy/src/scan/csv.rs b/crates/polars-lazy/src/scan/csv.rs
index 89e47a684698..3f6c3665f35a 100644
--- a/crates/polars-lazy/src/scan/csv.rs
+++ b/crates/polars-lazy/src/scan/csv.rs
@@ -331,6 +331,16 @@ impl LazyFileListReader for LazyCsvReader<'_> {
         self
     }
 
+    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
+        self.n_rows = n_rows.into();
+        self
+    }
+
+    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
+        self.row_index = row_index.into();
+        self
+    }
+
     fn rechunk(&self) -> bool {
         self.rechunk
     }
diff --git a/crates/polars-lazy/src/scan/file_list_reader.rs b/crates/polars-lazy/src/scan/file_list_reader.rs
index a7172ce9b74c..8d7942cd9afe 100644
--- a/crates/polars-lazy/src/scan/file_list_reader.rs
+++ b/crates/polars-lazy/src/scan/file_list_reader.rs
@@ -40,6 +40,10 @@ pub trait LazyFileListReader: Clone {
             .map(|r| {
                 let path = r?;
                 self.clone()
+                    // Each individual reader should not apply a row limit.
+                    .with_n_rows(None)
+                    // Each individual reader should not apply a row index.
+                    .with_row_index(None)
                     .with_path(path.clone())
                     .with_rechunk(false)
                     .finish_no_glob()
@@ -100,6 +104,12 @@ pub trait LazyFileListReader: Clone {
     #[must_use]
     fn with_paths(self, paths: Arc<[PathBuf]>) -> Self;
 
+    /// Configure the row limit.
+    fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;
+
+    /// Configure the row index.
+    fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;
+
     /// Rechunk the memory to contiguous chunks when parsing is done.
     fn rechunk(&self) -> bool;
 
diff --git a/crates/polars-lazy/src/scan/ipc.rs b/crates/polars-lazy/src/scan/ipc.rs
index 0cfcb5333a2c..a849bec3d1c8 100644
--- a/crates/polars-lazy/src/scan/ipc.rs
+++ b/crates/polars-lazy/src/scan/ipc.rs
@@ -96,6 +96,16 @@ impl LazyFileListReader for LazyIpcReader {
         self
     }
 
+    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
+        self.args.n_rows = n_rows.into();
+        self
+    }
+
+    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
+        self.args.row_index = row_index.into();
+        self
+    }
+
     fn rechunk(&self) -> bool {
         self.args.rechunk
     }
diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs
index f64cfc5e4efd..e2b2691e4e08 100644
--- a/crates/polars-lazy/src/scan/ndjson.rs
+++ b/crates/polars-lazy/src/scan/ndjson.rs
@@ -123,6 +123,16 @@ impl LazyFileListReader for LazyJsonLineReader {
         self
     }
 
+    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
+        self.n_rows = n_rows.into();
+        self
+    }
+
+    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
+        self.row_index = row_index.into();
+        self
+    }
+
     fn rechunk(&self) -> bool {
         self.rechunk
     }
diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs
index 927a1c2f77ea..aa0dc47b9ca5 100644
--- a/crates/polars-lazy/src/scan/parquet.rs
+++ b/crates/polars-lazy/src/scan/parquet.rs
@@ -115,6 +115,16 @@ impl LazyFileListReader for LazyParquetReader {
         self
     }
 
+    fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
+        self.args.n_rows = n_rows.into();
+        self
+    }
+
+    fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
+        self.args.row_index = row_index.into();
+        self
+    }
+
     fn rechunk(&self) -> bool {
         self.args.rechunk
     }
diff --git a/py-polars/tests/unit/io/test_scan.py b/py-polars/tests/unit/io/test_scan.py
index 5a4e7126a33e..1cccd07f67c1 100644
--- a/py-polars/tests/unit/io/test_scan.py
+++ b/py-polars/tests/unit/io/test_scan.py
@@ -14,14 +14,39 @@
     from polars.type_aliases import SchemaDict
 
 
-def _scan(file_path: Path, schema: SchemaDict | None = None) -> pl.LazyFrame:
+@dataclass
+class _RowIndex:
+    name: str = "index"
+    offset: int = 0
+
+
+def _scan(
+    file_path: Path,
+    schema: SchemaDict | None = None,
+    row_index: _RowIndex | None = None,
+) -> pl.LazyFrame:
     suffix = file_path.suffix
+    row_index_name = None if row_index is None else row_index.name
+    row_index_offset = 0 if row_index is None else row_index.offset
     if suffix == ".ipc":
-        return pl.scan_ipc(file_path)
+        return pl.scan_ipc(
+            file_path,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+        )
     if suffix == ".parquet":
-        return pl.scan_parquet(file_path)
+        return pl.scan_parquet(
+            file_path,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+        )
     if suffix == ".csv":
-        return pl.scan_csv(file_path, schema=schema)
+        return pl.scan_csv(
+            file_path,
+            schema=schema,
+            row_index_name=row_index_name,
+            row_index_offset=row_index_offset,
+        )
     msg = f"Unknown suffix {suffix}"
     raise NotImplementedError(msg)
 
@@ -40,10 +65,10 @@ def _write(df: pl.DataFrame, file_path: Path) -> None:
 
 @pytest.fixture(
     scope="session",
-    params=[".csv", ".ipc", ".parquet"],
+    params=["csv", "ipc", "parquet"],
 )
 def data_file_extension(request: pytest.FixtureRequest) -> str:
-    return request.param  # type: ignore[no-any-return]
+    return f".{request.param}"
 
 
 @pytest.fixture(scope="session")
@@ -70,13 +95,101 @@ def data_file_single(session_tmp_dir: Path, data_file_extension: str) -> _DataFile:
     return _DataFile(path=file_path, df=df)
 
 
+@pytest.fixture(scope="session")
+def data_file_glob(session_tmp_dir: Path, data_file_extension: str) -> _DataFile:
+    row_counts = [
+        100, 186, 95, 185, 90, 84, 115, 81, 87, 217, 126, 85, 98, 122, 129, 122, 1089, 82,
+        234, 86, 93, 90, 91, 263, 87, 126, 86, 161, 191, 1368, 403, 192, 102, 98, 115, 81,
+        111, 305, 92, 534, 431, 150, 90, 128, 152, 118, 127, 124, 229, 368, 81,
+    ]  # fmt: skip
+    assert sum(row_counts) == 10000
+    assert (
+        len(row_counts) < 100
+    )  # need to make sure we pad file names with enough zeros, otherwise the lexicographic ordering of the file names is not what we want.
+
+    df = pl.DataFrame(
+        {
+            "seq_int": range(10000),
+            "seq_str": [str(x) for x in range(10000)],
+        }
+    )
+
+    row_offset = 0
+    for index, row_count in enumerate(row_counts):
+        file_path = (session_tmp_dir / f"data_{index:02}").with_suffix(
+            data_file_extension
+        )
+        _write(df.slice(row_offset, row_count), file_path)
+        row_offset += row_count
+    return _DataFile(
+        path=(session_tmp_dir / "data_*").with_suffix(data_file_extension), df=df
+    )
+
+
+@pytest.fixture(scope="session", params=["single", "glob"])
+def data_file(
+    request: pytest.FixtureRequest,
+    data_file_single: _DataFile,
+    data_file_glob: _DataFile,
+) -> _DataFile:
+    if request.param == "single":
+        return data_file_single
+    if request.param == "glob":
+        return data_file_glob
+    raise NotImplementedError()
+
+
 @pytest.mark.write_disk()
-def test_scan(data_file_single: _DataFile) -> None:
-    df = _scan(data_file_single.path, data_file_single.df.schema).collect()
-    assert_frame_equal(df, data_file_single.df)
+def test_scan(data_file: _DataFile) -> None:
+    df = _scan(data_file.path, data_file.df.schema).collect()
+    assert_frame_equal(df, data_file.df)
+
+
+@pytest.mark.write_disk()
+def test_scan_with_limit(data_file: _DataFile) -> None:
+    df = _scan(data_file.path, data_file.df.schema).limit(100).collect()
+    assert_frame_equal(
+        df,
+        pl.DataFrame(
+            {
+                "seq_int": range(100),
+                "seq_str": [str(x) for x in range(100)],
+            }
+        ),
+    )
 
 
 @pytest.mark.write_disk()
-def test_scan_with_limit(data_file_single: _DataFile) -> None:
-    df = _scan(data_file_single.path, data_file_single.df.schema).limit(100).collect()
-    assert_frame_equal(df, data_file_single.df.limit(100))
+def test_scan_with_row_index(data_file: _DataFile) -> None:
+    df = _scan(data_file.path, data_file.df.schema, row_index=_RowIndex()).collect()
+    assert_frame_equal(
+        df,
+        pl.DataFrame(
+            {
+                "index": range(10000),
+                "seq_int": range(10000),
+                "seq_str": [str(x) for x in range(10000)],
+            },
+            schema_overrides={"index": pl.UInt32},
+        ),
+    )
+
+
+@pytest.mark.write_disk()
+def test_scan_with_row_index_and_predicate(data_file: _DataFile) -> None:
+    df = (
+        _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
+        .filter(pl.col("seq_int") % 2 == 0)
+        .collect()
+    )
+    assert_frame_equal(
+        df,
+        pl.DataFrame(
+            {
+                "index": [2 * x for x in range(5000)],
+                "seq_int": [2 * x for x in range(5000)],
+                "seq_str": [str(2 * x) for x in range(5000)],
+            },
+            schema_overrides={"index": pl.UInt32},
+        ),
+    )
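
Illustrative usage (not part of the patch): a minimal Python sketch of the scenario the new test_scan_with_row_index_and_predicate test exercises, combining a row index with a predicate on a lazy scan. The file path "sample.parquet" and the column name "seq_int" are placeholders for this sketch.

    import polars as pl

    # Write a small sample file to scan (placeholder path).
    pl.DataFrame({"seq_int": range(100)}).write_parquet("sample.parquet")

    # Scan with a row index and a predicate. With this change the predicate is
    # applied after the row index is attached, so "index" keeps the original
    # row positions of the matching rows (0, 2, 4, ...).
    out = (
        pl.scan_parquet("sample.parquet", row_index_name="index")
        .filter(pl.col("seq_int") % 2 == 0)
        .collect()
    )
    print(out)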