Skip to content

Commit

Permalink
fix(python,rust): Fix file scan bugs for ipc, csv and parquet that oc…
Browse files Browse the repository at this point in the history
…cur with combinations of glob paths, row indices and predicates (#15065)
  • Loading branch information
mickvangelderen authored Mar 15, 2024
1 parent b9a5603 commit 8c34bcc
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub trait StatsEvaluator {
}

#[cfg(feature = "parquet")]
pub(crate) fn apply_predicate(
pub fn apply_predicate(
df: &mut DataFrame,
predicate: Option<&dyn PhysicalIoExpr>,
parallel: bool,
Expand Down
14 changes: 14 additions & 0 deletions crates/polars-lazy/src/physical_plan/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,17 @@ impl ParquetExec {
}

fn read(&mut self) -> PolarsResult<DataFrame> {
// FIXME: The row index implementation is incorrect when a predicate is
// applied. This code mitigates that by applying the predicate after the
// collection of the entire dataframe if a row index is requested. This is
// inefficient.
let post_predicate = self
.file_options
.row_index
.as_ref()
.and_then(|_| self.predicate.take())
.map(phys_expr_to_io_expr);

let is_cloud = match self.paths.first() {
Some(p) => is_cloud_url(p.as_path()),
None => {
Expand Down Expand Up @@ -352,6 +363,9 @@ impl ParquetExec {
};

let mut out = accumulate_dataframes_vertical(out)?;

polars_io::predicates::apply_predicate(&mut out, post_predicate.as_deref(), true)?;

if self.file_options.rechunk {
out.as_single_chunk_par();
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ impl LazyFileListReader for LazyCsvReader<'_> {
self
}

/// Stores the row limit on this CSV reader and hands the reader back.
///
/// Anything convertible into `Option<usize>` is accepted; `None` means no
/// row limit is applied.
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
    let limit: Option<usize> = n_rows.into();
    self.n_rows = limit;
    self
}

/// Stores the row index configuration on this CSV reader and hands the
/// reader back.
///
/// Anything convertible into `Option<RowIndex>` is accepted; `None` means
/// no row index is applied.
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
    let index: Option<RowIndex> = row_index.into();
    self.row_index = index;
    self
}

/// Returns the `rechunk` flag stored on this reader.
fn rechunk(&self) -> bool {
self.rechunk
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/file_list_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ pub trait LazyFileListReader: Clone {
.map(|r| {
let path = r?;
self.clone()
// Each individual reader should not apply a row limit.
.with_n_rows(None)
// Each individual reader should not apply a row index.
.with_row_index(None)
.with_path(path.clone())
.with_rechunk(false)
.finish_no_glob()
Expand Down Expand Up @@ -100,6 +104,12 @@ pub trait LazyFileListReader: Clone {
#[must_use]
fn with_paths(self, paths: Arc<[PathBuf]>) -> Self;

/// Configure the row limit.
///
/// Accepts anything convertible into `Option<usize>`; passing `None` means
/// the reader applies no row limit. The reader is consumed and returned, so
/// the result must be used or the setting is lost.
#[must_use]
fn with_n_rows(self, n_rows: impl Into<Option<usize>>) -> Self;

/// Configure the row index.
///
/// Accepts anything convertible into `Option<RowIndex>`; passing `None`
/// means the reader applies no row index. The reader is consumed and
/// returned, so the result must be used or the setting is lost.
#[must_use]
fn with_row_index(self, row_index: impl Into<Option<RowIndex>>) -> Self;

/// Returns whether the memory is rechunked to contiguous chunks when parsing
/// is done.
fn rechunk(&self) -> bool;

Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ impl LazyFileListReader for LazyIpcReader {
self
}

/// Stores the row limit in this IPC reader's scan arguments and hands the
/// reader back.
///
/// Anything convertible into `Option<usize>` is accepted; `None` means no
/// row limit is applied.
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
    let limit: Option<usize> = n_rows.into();
    self.args.n_rows = limit;
    self
}

/// Stores the row index configuration in this IPC reader's scan arguments
/// and hands the reader back.
///
/// Anything convertible into `Option<RowIndex>` is accepted; `None` means
/// no row index is applied.
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
    let index: Option<RowIndex> = row_index.into();
    self.args.row_index = index;
    self
}

/// Returns the `rechunk` flag stored in this reader's scan arguments (`self.args`).
fn rechunk(&self) -> bool {
self.args.rechunk
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,16 @@ impl LazyFileListReader for LazyJsonLineReader {
self
}

/// Stores the row limit on this JSON-lines reader and hands the reader back.
///
/// Anything convertible into `Option<usize>` is accepted; `None` means no
/// row limit is applied.
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
    let limit: Option<usize> = n_rows.into();
    self.n_rows = limit;
    self
}

/// Stores the row index configuration on this JSON-lines reader and hands
/// the reader back.
///
/// Anything convertible into `Option<RowIndex>` is accepted; `None` means
/// no row index is applied.
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
    let index: Option<RowIndex> = row_index.into();
    self.row_index = index;
    self
}

/// Returns the `rechunk` flag stored on this reader.
fn rechunk(&self) -> bool {
self.rechunk
}
Expand Down
10 changes: 10 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,16 @@ impl LazyFileListReader for LazyParquetReader {
self
}

/// Stores the row limit in this Parquet reader's scan arguments and hands
/// the reader back.
///
/// Anything convertible into `Option<usize>` is accepted; `None` means no
/// row limit is applied.
fn with_n_rows(mut self, n_rows: impl Into<Option<usize>>) -> Self {
    let limit: Option<usize> = n_rows.into();
    self.args.n_rows = limit;
    self
}

/// Stores the row index configuration in this Parquet reader's scan
/// arguments and hands the reader back.
///
/// Anything convertible into `Option<RowIndex>` is accepted; `None` means
/// no row index is applied.
fn with_row_index(mut self, row_index: impl Into<Option<RowIndex>>) -> Self {
    let index: Option<RowIndex> = row_index.into();
    self.args.row_index = index;
    self
}

/// Returns the `rechunk` flag stored in this reader's scan arguments (`self.args`).
fn rechunk(&self) -> bool {
self.args.rechunk
}
Expand Down
137 changes: 125 additions & 12 deletions py-polars/tests/unit/io/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,39 @@
from polars.type_aliases import SchemaDict


def _scan(file_path: Path, schema: SchemaDict | None = None) -> pl.LazyFrame:
# Row index settings that `_scan` forwards to the polars scan functions as
# `row_index_name` / `row_index_offset`.
@dataclass
class _RowIndex:
# Name of the generated row index column.
name: str = "index"
# Value passed on as the row index offset.
offset: int = 0


def _scan(
file_path: Path,
schema: SchemaDict | None = None,
row_index: _RowIndex | None = None,
) -> pl.LazyFrame:
suffix = file_path.suffix
row_index_name = None if row_index is None else row_index.name
row_index_offset = 0 if row_index is None else row_index.offset
if suffix == ".ipc":
return pl.scan_ipc(file_path)
return pl.scan_ipc(
file_path,
row_index_name=row_index_name,
row_index_offset=row_index_offset,
)
if suffix == ".parquet":
return pl.scan_parquet(file_path)
return pl.scan_parquet(
file_path,
row_index_name=row_index_name,
row_index_offset=row_index_offset,
)
if suffix == ".csv":
return pl.scan_csv(file_path, schema=schema)
return pl.scan_csv(
file_path,
schema=schema,
row_index_name=row_index_name,
row_index_offset=row_index_offset,
)
msg = f"Unknown suffix {suffix}"
raise NotImplementedError(msg)

Expand All @@ -40,10 +65,10 @@ def _write(df: pl.DataFrame, file_path: Path) -> None:

@pytest.fixture(
scope="session",
params=[".csv", ".ipc", ".parquet"],
params=["csv", "ipc", "parquet"],
)
def data_file_extension(request: pytest.FixtureRequest) -> str:
return request.param # type: ignore[no-any-return]
return f".{request.param}"


@pytest.fixture(scope="session")
Expand All @@ -70,13 +95,101 @@ def data_file_single(session_tmp_dir: Path, data_file_extension: str) -> _DataFi
return _DataFile(path=file_path, df=df)


@pytest.fixture(scope="session")
def data_file_glob(session_tmp_dir: Path, data_file_extension: str) -> _DataFile:
    """Write one 10000-row frame as consecutive slices spread over many files.

    Returns a `_DataFile` whose path is a glob pattern matching every written
    file and whose frame is the complete, unsliced data.
    """
    # Number of rows stored in each successive file.
    row_counts = [
        100, 186, 95, 185, 90, 84, 115, 81, 87, 217, 126, 85, 98, 122, 129, 122, 1089, 82,
        234, 86, 93, 90, 91, 263, 87, 126, 86, 161, 191, 1368, 403, 192, 102, 98, 115, 81,
        111, 305, 92, 534, 431, 150, 90, 128, 152, 118, 127, 124, 229, 368, 81,
    ]  # fmt: skip
    assert sum(row_counts) == 10000
    # File names carry a two-digit, zero-padded index so that the
    # lexicographical ordering of the names matches the write order; that only
    # holds while there are fewer than 100 files.
    assert len(row_counts) < 100

    values = range(10000)
    df = pl.DataFrame(
        {
            "seq_int": values,
            "seq_str": [str(value) for value in values],
        }
    )

    start = 0
    for file_number, length in enumerate(row_counts):
        target = session_tmp_dir / f"data_{file_number:02}"
        _write(df.slice(start, length), target.with_suffix(data_file_extension))
        start += length

    glob_pattern = (session_tmp_dir / "data_*").with_suffix(data_file_extension)
    return _DataFile(path=glob_pattern, df=df)


@pytest.fixture(scope="session", params=["single", "glob"])
def data_file(
    request: pytest.FixtureRequest,
    data_file_single: _DataFile,
    data_file_glob: _DataFile,
) -> _DataFile:
    """Yield the single-file or the glob data set, depending on the parameter."""
    candidates = (
        ("single", data_file_single),
        ("glob", data_file_glob),
    )
    for kind, candidate in candidates:
        if request.param == kind:
            return candidate
    # Any parameter other than the two known kinds is unsupported.
    raise NotImplementedError()


@pytest.mark.write_disk()
def test_scan(data_file_single: _DataFile) -> None:
df = _scan(data_file_single.path, data_file_single.df.schema).collect()
assert_frame_equal(df, data_file_single.df)
def test_scan(data_file: _DataFile) -> None:
    """Scanning and collecting the data file reproduces the written frame."""
    expected = data_file.df
    lazy_frame = _scan(data_file.path, expected.schema)
    assert_frame_equal(lazy_frame.collect(), expected)


@pytest.mark.write_disk()
def test_scan_with_limit(data_file: _DataFile) -> None:
    """A limit of 100 returns the first 100 rows of the sequence."""
    limited = _scan(data_file.path, data_file.df.schema).limit(100)
    actual = limited.collect()

    first_rows = range(100)
    expected = pl.DataFrame(
        {
            "seq_int": first_rows,
            "seq_str": [str(value) for value in first_rows],
        }
    )
    assert_frame_equal(actual, expected)


@pytest.mark.write_disk()
def test_scan_with_limit(data_file_single: _DataFile) -> None:
df = _scan(data_file_single.path, data_file_single.df.schema).limit(100).collect()
assert_frame_equal(df, data_file_single.df.limit(100))
def test_scan_with_row_index(data_file: _DataFile) -> None:
    """The default row index counts 0..9999 alongside the data columns."""
    lazy_frame = _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
    actual = lazy_frame.collect()

    all_rows = range(10000)
    expected = pl.DataFrame(
        {
            "index": all_rows,
            "seq_int": all_rows,
            "seq_str": [str(value) for value in all_rows],
        },
        schema_overrides={"index": pl.UInt32},
    )
    assert_frame_equal(actual, expected)


@pytest.mark.write_disk()
def test_scan_with_row_index_and_predicate(data_file: _DataFile) -> None:
    """Filtering keeps each surviving row's original row index value."""
    keep_even = pl.col("seq_int") % 2 == 0
    lazy_frame = _scan(data_file.path, data_file.df.schema, row_index=_RowIndex())
    actual = lazy_frame.filter(keep_even).collect()

    # The 5000 even values 0, 2, ..., 9998.
    evens = [2 * x for x in range(5000)]
    expected = pl.DataFrame(
        {
            "index": evens,
            "seq_int": list(evens),
            "seq_str": [str(value) for value in evens],
        },
        schema_overrides={"index": pl.UInt32},
    )
    assert_frame_equal(actual, expected)

0 comments on commit 8c34bcc

Please sign in to comment.