Skip to content

Commit

Permalink
feat: support STRUCT data type with Series.struct.field to extract child fields (googleapis#71)
Browse files Browse the repository at this point in the history

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Fixes #<issue_number_goes_here> 🦕
  • Loading branch information
tswast authored Oct 3, 2023
1 parent 8f3b5b2 commit 17afac9
Show file tree
Hide file tree
Showing 15 changed files with 318 additions and 8 deletions.
3 changes: 3 additions & 0 deletions .kokoro/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ cd "${PROJECT_ROOT}"
# Disable buffering, so that the logs stream through.
export PYTHONUNBUFFERED=1

# Workaround https://github.com/pytest-dev/pytest/issues/9567
export PY_IGNORE_IMPORTMISMATCH=1

# Debug: show build environment
env | grep KOKORO

Expand Down
10 changes: 9 additions & 1 deletion bigframes/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,15 @@ def __init__(
columns=columns, # type:ignore
dtype=dtype, # type:ignore
)
if pd_dataframe.size < MAX_INLINE_DF_SIZE:
if (
pd_dataframe.size < MAX_INLINE_DF_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pandas.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
63 changes: 59 additions & 4 deletions bigframes/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@

BIDIRECTIONAL_MAPPINGS: Iterable[Tuple[IbisDtype, Dtype]] = (
(ibis_dtypes.boolean, pd.BooleanDtype()),
(ibis_dtypes.date, pd.ArrowDtype(pa.date32())),
(ibis_dtypes.float64, pd.Float64Dtype()),
(ibis_dtypes.int64, pd.Int64Dtype()),
(ibis_dtypes.string, pd.StringDtype(storage="pyarrow")),
(ibis_dtypes.date, pd.ArrowDtype(pa.date32())),
(ibis_dtypes.time, pd.ArrowDtype(pa.time64("us"))),
(ibis_dtypes.Timestamp(timezone=None), pd.ArrowDtype(pa.timestamp("us"))),
(
Expand All @@ -100,6 +100,19 @@
pandas: ibis for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}

# Scalar Ibis dtypes and their pyarrow equivalents. Composite types
# (ARRAY, STRUCT) are deliberately absent: they are converted recursively
# by ibis_dtype_to_arrow_dtype / arrow_dtype_to_ibis_dtype instead.
IBIS_TO_ARROW: Dict[ibis_dtypes.DataType, pa.DataType] = {
    ibis_dtypes.boolean: pa.bool_(),
    ibis_dtypes.date: pa.date32(),
    ibis_dtypes.float64: pa.float64(),
    ibis_dtypes.int64: pa.int64(),
    ibis_dtypes.string: pa.string(),
    ibis_dtypes.time: pa.time64("us"),
    ibis_dtypes.Timestamp(timezone=None): pa.timestamp("us"),
    ibis_dtypes.Timestamp(timezone="UTC"): pa.timestamp("us", tz="UTC"),
}

# Inverse lookup table; valid because the mapping above is one-to-one.
ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}

IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
}
Expand Down Expand Up @@ -148,11 +161,12 @@ def ibis_dtype_to_bigframes_dtype(
# Special cases: Ibis supports variations on these types, but currently
# our IO returns them as objects. Eventually, we should support them as
# ArrowDType (and update the IO accordingly)
if isinstance(ibis_dtype, ibis_dtypes.Array) or isinstance(
ibis_dtype, ibis_dtypes.Struct
):
if isinstance(ibis_dtype, ibis_dtypes.Array):
return np.dtype("O")

if isinstance(ibis_dtype, ibis_dtypes.Struct):
return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

if ibis_dtype in IBIS_TO_BIGFRAMES:
return IBIS_TO_BIGFRAMES[ibis_dtype]
elif isinstance(ibis_dtype, ibis_dtypes.Null):
Expand All @@ -164,6 +178,26 @@ def ibis_dtype_to_bigframes_dtype(
)


def ibis_dtype_to_arrow_dtype(ibis_dtype: ibis_dtypes.DataType) -> pa.DataType:
    """Convert an Ibis data type to the equivalent pyarrow data type.

    Array and Struct types are converted recursively; scalar types are
    looked up in the ``IBIS_TO_ARROW`` table.

    Raises:
        ValueError: If the Ibis type has no known pyarrow equivalent.
    """
    # Composite types first: they must be rebuilt element-by-element.
    if isinstance(ibis_dtype, ibis_dtypes.Array):
        element_type = ibis_dtype_to_arrow_dtype(ibis_dtype.value_type)
        return pa.list_(element_type)

    if isinstance(ibis_dtype, ibis_dtypes.Struct):
        converted_fields = [
            (field_name, ibis_dtype_to_arrow_dtype(field_type))
            for field_name, field_type in ibis_dtype.fields.items()
        ]
        return pa.struct(converted_fields)

    # Scalar types: fail loudly on anything we don't know how to map.
    if ibis_dtype not in IBIS_TO_ARROW:
        raise ValueError(
            f"Unexpected Ibis data type {ibis_dtype}. {constants.FEEDBACK_LINK}"
        )
    return IBIS_TO_ARROW[ibis_dtype]


def ibis_value_to_canonical_type(value: ibis_types.Value) -> ibis_types.Value:
"""Converts an Ibis expression to canonical type.
Expand All @@ -187,6 +221,24 @@ def ibis_table_to_canonical_types(table: ibis_types.Table) -> ibis_types.Table:
return table.select(*casted_columns)


def arrow_dtype_to_ibis_dtype(arrow_dtype: pa.DataType) -> ibis_dtypes.DataType:
    """Convert a pyarrow data type to the equivalent Ibis data type.

    Struct and list types are converted recursively; scalar types are
    looked up in the ``ARROW_TO_IBIS`` table. List handling mirrors the
    Array branch of ``ibis_dtype_to_arrow_dtype`` so that composite
    dtypes round-trip between the two representations.

    Raises:
        ValueError: If the Arrow type has no known Ibis equivalent.
    """
    if pa.types.is_struct(arrow_dtype):
        struct_dtype = typing.cast(pa.StructType, arrow_dtype)
        return ibis_dtypes.Struct.from_tuples(
            [
                (field.name, arrow_dtype_to_ibis_dtype(field.type))
                for field in struct_dtype
            ]
        )

    # Inverse of the Array -> pa.list_ conversion in
    # ibis_dtype_to_arrow_dtype; previously this fell through to the
    # ValueError below, so list-typed ArrowDtypes could not round-trip.
    if pa.types.is_list(arrow_dtype):
        list_dtype = typing.cast(pa.ListType, arrow_dtype)
        return ibis_dtypes.Array(
            value_type=arrow_dtype_to_ibis_dtype(list_dtype.value_type)
        )

    if arrow_dtype in ARROW_TO_IBIS:
        return ARROW_TO_IBIS[arrow_dtype]
    else:
        raise ValueError(
            f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
        )


def bigframes_dtype_to_ibis_dtype(
bigframes_dtype: Union[DtypeString, Dtype, np.dtype[Any]]
) -> ibis_dtypes.DataType:
Expand All @@ -202,6 +254,9 @@ def bigframes_dtype_to_ibis_dtype(
Raises:
ValueError: If passed a dtype not supported by BigQuery DataFrames.
"""
if isinstance(bigframes_dtype, pd.ArrowDtype):
return arrow_dtype_to_ibis_dtype(bigframes_dtype.pyarrow_dtype)

type_string = str(bigframes_dtype)
if type_string in BIGFRAMES_STRING_TO_BIGFRAMES:
bigframes_dtype = BIGFRAMES_STRING_TO_BIGFRAMES[
Expand Down
10 changes: 9 additions & 1 deletion bigframes/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,15 @@ def __init__(
if pd_series.name is None:
# to_frame will set default numeric column label if unnamed, but we do not support int column label, so must rename
pd_dataframe = pd_dataframe.set_axis(["unnamed_col"], axis=1)
if pd_dataframe.size < MAX_INLINE_SERIES_SIZE:
if (
pd_dataframe.size < MAX_INLINE_SERIES_SIZE
# TODO(swast): Workaround data types limitation in inline data.
and not any(
dt.pyarrow_dtype
for dt in pd_dataframe.dtypes
if isinstance(dt, pd.ArrowDtype)
)
):
self._block = blocks.block_from_local(
pd_dataframe, session or bigframes.pandas.get_global_session()
)
Expand Down
61 changes: 61 additions & 0 deletions bigframes/operations/structs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import typing

import ibis.expr.types as ibis_types

import bigframes.dataframe
import bigframes.operations
import bigframes.operations.base
import bigframes.series
import third_party.bigframes_vendored.pandas.core.arrays.arrow.accessors as vendoracessors


class StructField(bigframes.operations.UnaryOp):
    """Unary operation that selects one child field of a STRUCT value.

    The field may be addressed either by name or by positional index.
    """

    def __init__(self, name_or_index: str | int):
        self._name_or_index = name_or_index

    def _as_ibis(self, x: ibis_types.Value):
        struct = typing.cast(ibis_types.StructValue, x)
        key = self._name_or_index
        # An int key is resolved to the field name at that position.
        field_name = key if isinstance(key, str) else struct.names[key]
        return struct[field_name].name(field_name)


class StructAccessor(
    bigframes.operations.base.SeriesMethods, vendoracessors.StructAccessor
):
    __doc__ = vendoracessors.StructAccessor.__doc__

    def field(self, name_or_index: str | int) -> bigframes.series.Series:
        """Extract one child field of the struct Series, by name or index."""
        result = self._apply_unary_op(StructField(name_or_index))
        if isinstance(name_or_index, str):
            field_name = name_or_index
        else:
            # Look up the name of the positionally-addressed field from
            # the pyarrow struct type so the result Series is labeled.
            field_name = self._dtype.pyarrow_dtype[name_or_index].name
        return result.rename(field_name)

    def explode(self) -> bigframes.dataframe.DataFrame:
        """Extract every child field into its own DataFrame column."""
        import bigframes.pandas

        struct_type = self._dtype.pyarrow_dtype
        columns = [self.field(index) for index in range(struct_type.num_fields)]
        return bigframes.pandas.concat(columns, axis="columns")
5 changes: 5 additions & 0 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import bigframes.operations.base
import bigframes.operations.datetimes as dt
import bigframes.operations.strings as strings
import bigframes.operations.structs as structs
import third_party.bigframes_vendored.pandas.core.series as vendored_pandas_series

LevelType = typing.Union[str, int]
Expand Down Expand Up @@ -118,6 +119,10 @@ def query_job(self) -> Optional[bigquery.QueryJob]:
self._set_internal_query_job(self._compute_dry_run())
return self._query_job

@property
def struct(self) -> structs.StructAccessor:
    """Accessor for struct-typed (STRUCT dtype) properties of the Series values."""
    return structs.StructAccessor(self._block)

def _set_internal_query_job(self, query_job: bigquery.QueryJob):
self._query_job = query_job

Expand Down
2 changes: 1 addition & 1 deletion noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def doctest(session: nox.sessions.Session):
run_system(
session=session,
prefix_name="doctest",
extra_pytest_options=("--doctest-modules",),
extra_pytest_options=("--doctest-modules", "third_party"),
test_folder="bigframes",
check_cov=True,
)
Expand Down
14 changes: 13 additions & 1 deletion tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -917,7 +917,19 @@ def test_get_dtypes_array_struct(session):
dtypes = df.dtypes
pd.testing.assert_series_equal(
dtypes,
pd.Series({"array_column": np.dtype("O"), "struct_column": np.dtype("O")}),
pd.Series(
{
"array_column": np.dtype("O"),
"struct_column": pd.ArrowDtype(
pa.struct(
[
("string_field", pa.string()),
("float_field", pa.float64()),
]
)
),
}
),
)


Expand Down
64 changes: 64 additions & 0 deletions tests/unit/test_dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,70 @@ def test_ibis_float32_raises_unexpected_datatype():
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(ibis_dtypes.float32)


# Equivalent (ibis dtype, arrow dtype) pairs, exercised in both conversion
# directions by the parametrized tests below. Includes scalar, timezone-aware
# timestamp, flat struct, and nested struct cases.
IBIS_ARROW_DTYPES = (
    (ibis_dtypes.boolean, pa.bool_()),
    (ibis_dtypes.date, pa.date32()),
    (ibis_dtypes.Timestamp(), pa.timestamp("us")),
    (ibis_dtypes.float64, pa.float64()),
    (
        ibis_dtypes.Timestamp(timezone="UTC"),
        pa.timestamp("us", tz="UTC"),
    ),
    (
        ibis_dtypes.Struct.from_tuples(
            [
                ("name", ibis_dtypes.string()),
                ("version", ibis_dtypes.int64()),
            ]
        ),
        pa.struct(
            [
                ("name", pa.string()),
                ("version", pa.int64()),
            ]
        ),
    ),
    (
        ibis_dtypes.Struct.from_tuples(
            [
                (
                    "nested",
                    ibis_dtypes.Struct.from_tuples(
                        [
                            ("field", ibis_dtypes.string()),
                        ]
                    ),
                ),
            ]
        ),
        pa.struct(
            [
                (
                    "nested",
                    pa.struct(
                        [
                            ("field", pa.string()),
                        ]
                    ),
                ),
            ]
        ),
    ),
)


@pytest.mark.parametrize(("ibis_dtype", "arrow_dtype"), IBIS_ARROW_DTYPES)
def test_arrow_dtype_to_ibis_dtype(ibis_dtype, arrow_dtype):
    """Arrow -> Ibis conversion produces the paired Ibis dtype."""
    assert bigframes.dtypes.arrow_dtype_to_ibis_dtype(arrow_dtype) == ibis_dtype


@pytest.mark.parametrize(("ibis_dtype", "arrow_dtype"), IBIS_ARROW_DTYPES)
def test_ibis_dtype_to_arrow_dtype(ibis_dtype, arrow_dtype):
    """Ibis -> Arrow conversion produces the paired Arrow dtype."""
    assert bigframes.dtypes.ibis_dtype_to_arrow_dtype(ibis_dtype) == arrow_dtype


@pytest.mark.parametrize(
["bigframes_dtype", "ibis_dtype"],
[
Expand Down
Empty file.
Empty file.
Loading

0 comments on commit 17afac9

Please sign in to comment.