Relax inconsistent schema handling in dask_cudf.read_parquet (#17554)
Addresses an issue raised offline by @praateekmahajan: dask_cudf.read_parquet could fail on datasets whose files do not all share the same schema.

Authors:
  - Richard (Rick) Zamora (https://github.com/rjzamora)

Approvers:
  - Tom Augspurger (https://github.com/TomAugspurger)
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #17554
rjzamora authored Feb 26, 2025
1 parent 46b9799 commit 72d5792
Showing 2 changed files with 53 additions and 13 deletions.
18 changes: 6 additions & 12 deletions python/dask_cudf/dask_cudf/_legacy/io/parquet.py
@@ -434,18 +434,12 @@ def set_object_dtypes_from_pa_schema(df, schema):
     # pyarrow schema.
     if schema:
         for col_name, col in df._data.items():
-            if col_name is None:
-                # Pyarrow cannot handle `None` as a field name.
-                # However, this should be a simple range index that
-                # we can ignore anyway
-                continue
-            typ = cudf_dtype_from_pa_type(schema.field(col_name).type)
-            if (
-                col_name in schema.names
-                and not isinstance(typ, (cudf.ListDtype, cudf.StructDtype))
-                and isinstance(col, cudf.core.column.StringColumn)
-            ):
-                df._data[col_name] = col.astype(typ)
+            if col_name in schema.names:
+                typ = cudf_dtype_from_pa_type(schema.field(col_name).type)
+                if not isinstance(
+                    typ, (cudf.ListDtype, cudf.StructDtype)
+                ) and isinstance(col, cudf.core.column.StringColumn):
+                    df._data[col_name] = col.astype(typ)
 
 
 to_parquet = dd.to_parquet
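The crux of the change: the old code called schema.field(col_name) before verifying that col_name was actually in schema.names, so a column present in the DataFrame but absent from the schema raised during the lookup instead of being skipped. The rewrite checks membership first, which also subsumes the old "col_name is None" special case (None is never in schema.names). A minimal standalone sketch of the guarded-lookup pattern, using an illustrative schema and column list rather than the library's internals:

import pyarrow as pa

# Illustrative schema; "meta1" is deliberately absent, mimicking a
# column that only appears in some files of a parquet dataset.
schema = pa.schema([("id", pa.int64()), ("text", pa.string())])

for col_name in ["id", "text", "meta1"]:
    # Check membership *before* calling schema.field(), which raises
    # for names the schema does not contain.
    if col_name in schema.names:
        print(col_name, "->", schema.field(col_name).type)
    else:
        print(col_name, "-> not in schema, left untouched")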
48 changes: 47 additions & 1 deletion python/dask_cudf/dask_cudf/io/tests/test_parquet.py
@@ -6,6 +6,7 @@
 
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import pytest
 
 import dask
@@ -486,6 +487,52 @@ def test_create_metadata_file_inconsistent_schema(tmpdir):
     dd.assert_eq(ddf1.compute(), ddf2.compute())
 
 
+@pytest.mark.parametrize("specify_schema", [True, False])
+def test_read_inconsistent_schema(tmpdir, specify_schema):
+    if specify_schema:
+        # If we specify the expected schema,
+        # we also need to specify the partitioning.
+        kwargs = {
+            "dataset": {
+                "schema": pa.schema(
+                    [
+                        ("id", pa.int64()),
+                        ("text", pa.string()),
+                        ("meta1", pa.struct([("field1", pa.string())])),
+                    ]
+                ),
+                "partitioning": None,
+            },
+        }
+    else:
+        kwargs = {}
+
+    records = [
+        {"id": 123, "text": "foo"},
+        {
+            "text": "bar",
+            "meta1": [{"field1": "cat"}],
+            "id": 456,
+        },
+    ]
+    columns = ["text", "id"]
+    pd.DataFrame(records[:1]).to_parquet(tmpdir / "part.0.parquet")
+    pd.DataFrame(records[1:]).to_parquet(tmpdir / "part.1.parquet")
+    # Check that cuDF and Dask cuDF match
+    dd.assert_eq(
+        cudf.read_parquet(
+            tmpdir, columns=columns, allow_mismatched_pq_schemas=True
+        ),
+        dask_cudf.read_parquet(tmpdir, columns=columns, **kwargs),
+        check_index=False,
+    )
+    # Check that "pandas" and "cudf" backends match
+    dd.assert_eq(
+        dd.read_parquet(tmpdir, columns=columns),
+        dask_cudf.read_parquet(tmpdir, columns=columns, **kwargs),
+    )
+
+
 @pytest.mark.parametrize(
     "data",
     [
@@ -526,7 +573,6 @@ def test_cudf_list_struct_write(tmpdir):
 
 
 def test_null_partition(tmpdir):
-    import pyarrow as pa
     from pyarrow.dataset import HivePartitioning
 
     ids = pd.Series([0, 1, None], dtype="Int64")
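Taken together, the relaxed handling means a directory of parquet files with drifting schemas can be read directly, with or without pre-specifying a unified schema. A minimal repro sketch modeled on the new test above; the dataset path is illustrative, and a CUDA-capable environment with cudf and dask_cudf installed is assumed:

import pathlib

import pandas as pd

import dask_cudf

# Two files whose schemas disagree: the first has no "meta1" column.
path = pathlib.Path("inconsistent_dataset")  # illustrative location
path.mkdir(exist_ok=True)
pd.DataFrame([{"id": 123, "text": "foo"}]).to_parquet(path / "part.0.parquet")
pd.DataFrame(
    [{"id": 456, "text": "bar", "meta1": [{"field1": "cat"}]}]
).to_parquet(path / "part.1.parquet")

# Selecting a subset of columns present in every file now works even
# though the per-file schemas differ.
df = dask_cudf.read_parquet(str(path), columns=["text", "id"])
print(df.compute())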
