From a932d1853b38c85946bde38b3244a9f1d4b6affa Mon Sep 17 00:00:00 2001 From: Konstantin Malanchev Date: Wed, 29 May 2024 17:45:38 -0400 Subject: [PATCH 1/2] NestedExtensionArray._pa_array -> _chunked_array --- src/nested_pandas/series/accessor.py | 4 +- src/nested_pandas/series/ext_array.py | 94 ++++++++++---------- tests/nested_pandas/series/test_ext_array.py | 4 +- 3 files changed, 51 insertions(+), 51 deletions(-) diff --git a/src/nested_pandas/series/accessor.py b/src/nested_pandas/series/accessor.py index 4cd67fd..f6253bd 100644 --- a/src/nested_pandas/series/accessor.py +++ b/src/nested_pandas/series/accessor.py @@ -55,7 +55,7 @@ def to_lists(self, fields: list[str] | None = None) -> pd.DataFrame: raise ValueError("Cannot convert a struct with no fields to lists") list_chunks = defaultdict(list) - for chunk in self._series.array._pa_array.iterchunks(): + for chunk in self._series.array._chunked_array.iterchunks(): struct_array = cast(pa.StructArray, chunk) for field in fields: list_array = cast(pa.ListArray, struct_array.field(field)) @@ -94,7 +94,7 @@ def to_flat(self, fields: list[str] | None = None) -> pd.DataFrame: index = pd.Series(self.get_flat_index(), name=self._series.index.name) flat_chunks = defaultdict(list) - for chunk in self._series.array._pa_array.iterchunks(): + for chunk in self._series.array._chunked_array.iterchunks(): struct_array = cast(pa.StructArray, chunk) for field in fields: list_array = cast(pa.ListArray, struct_array.field(field)) diff --git a/src/nested_pandas/series/ext_array.py b/src/nested_pandas/series/ext_array.py index cc968ae..04def7c 100644 --- a/src/nested_pandas/series/ext_array.py +++ b/src/nested_pandas/series/ext_array.py @@ -142,12 +142,12 @@ def __getitem__(self, item): if isinstance(item, np.ndarray): if len(item) == 0: - return type(self)(pa.chunked_array([], type=self._pa_array.type), validate=False) + return type(self)(pa.chunked_array([], type=self._chunked_array.type), validate=False) pa_item = pa.array(item) if item.dtype.kind in "iu": - return type(self)(self._pa_array.take(pa_item), validate=False) + return type(self)(self._chunked_array.take(pa_item), validate=False) if item.dtype.kind == "b": - return type(self)(self._pa_array.filter(pa_item), validate=False) + return type(self)(self._chunked_array.filter(pa_item), validate=False) # It should be covered by check_array_indexer above raise IndexError( "Only integers, slices and integer or " "boolean arrays are valid indices." 
@@ -159,7 +159,7 @@ def __getitem__(self, item): if item is Ellipsis: item = slice(None) - scalar_or_array = self._pa_array[item] + scalar_or_array = self._chunked_array[item] if isinstance(scalar_or_array, pa.StructScalar): return self._convert_struct_scalar_to_df(scalar_or_array, copy=False) # Logically, it must be a pa.ChunkedArray if it is not a scalar @@ -212,14 +212,14 @@ def __setitem__(self, key, value) -> None: # We cannot use pa.compute.replace_with_mask(), it is not implemented for struct arrays: # https://github.com/apache/arrow/issues/29558 - # self._pa_array = pa.compute.replace_with_mask(self._pa_array, pa_mask, value) - self._pa_array = replace_with_mask(self._pa_array, pa_mask, value) + # self._chunked_array = pa.compute.replace_with_mask(self._chunked_array, pa_mask, value) + self._chunked_array = replace_with_mask(self._chunked_array, pa_mask, value) def __len__(self) -> int: - return len(self._pa_array) + return len(self._chunked_array) def __iter__(self) -> Iterator[pd.DataFrame]: - for value in self._pa_array: + for value in self._chunked_array: yield self._convert_struct_scalar_to_df(value, copy=False) # We do not implement it yet, because pa.compute.equal does not work for struct arrays @@ -255,7 +255,7 @@ def to_numpy(self, dtype: None = None, copy: bool = False, na_value: Any = no_de # Hack with np.empty is the only way to force numpy to create 1-d array of objects result = np.empty(shape=len(self), dtype=object) - for i, value in enumerate(self._pa_array): + for i, value in enumerate(self._chunked_array): result[i] = self._convert_struct_scalar_to_df(value, copy=copy, na_value=na_value) return result @@ -268,22 +268,22 @@ def dtype(self) -> NestedDtype: @property def nbytes(self) -> int: """Number of bytes consumed by the data in memory.""" - return self._pa_array.nbytes + return self._chunked_array.nbytes def isna(self) -> np.ndarray: """Boolean NumPy array indicating if each value is missing.""" # Fast paths adopted from ArrowExtensionArray - null_count = self._pa_array.null_count + null_count = self._chunked_array.null_count if null_count == 0: return np.zeros(len(self), dtype=bool) if null_count == len(self): return np.ones(len(self), dtype=bool) - return self._pa_array.is_null().to_numpy() + return self._chunked_array.is_null().to_numpy() @property def _hasna(self) -> bool: - return self._pa_array.null_count > 0 + return self._chunked_array.null_count > 0 # We do not implement it yet, neither ArrowExtensionArray does for struct arrays def interpolate( @@ -355,9 +355,9 @@ def take( indices_array = np.asanyarray(indices) - if len(self._pa_array) == 0 and (indices_array >= 0).any(): + if len(self._chunked_array) == 0 and (indices_array >= 0).any(): raise IndexError("cannot do a non-empty take from empty array") - if indices_array.size > 0 and indices_array.max() >= len(self._pa_array): + if indices_array.size > 0 and indices_array.max() >= len(self._chunked_array): raise IndexError("out of bounds value in 'indices'.") if allow_fill: @@ -366,11 +366,11 @@ def take( fill_mask = indices_array < 0 if not fill_mask.any(): # Nothing to fill - return type(self)(self._pa_array.take(indices)) - validate_indices(indices_array, len(self._pa_array)) + return type(self)(self._chunked_array.take(indices)) + validate_indices(indices_array, len(self._chunked_array)) indices_array = pa.array(indices_array, mask=fill_mask) - result = self._pa_array.take(indices_array) + result = self._chunked_array.take(indices_array) if not pa.compute.is_null(fill_value).as_py(): result = 
pa.compute.if_else(fill_mask, fill_value, result) return type(self)(result) @@ -378,8 +378,8 @@ def take( if (indices_array < 0).any(): # Don't modify in-place indices_array = np.copy(indices_array) - indices_array[indices_array < 0] += len(self._pa_array) - return type(self)(self._pa_array.take(indices_array)) + indices_array[indices_array < 0] += len(self._chunked_array) + return type(self)(self._chunked_array.take(indices_array)) def copy(self) -> Self: # type: ignore[name-defined] # noqa: F821 """Return a copy of the extension array. @@ -387,7 +387,7 @@ def copy(self) -> Self: # type: ignore[name-defined] # noqa: F821 This implementation returns a shallow copy of the extension array, because the underlying PyArrow array is immutable. """ - return type(self)(self._pa_array) + return type(self)(self._chunked_array) def _formatter(self, boxed: bool = False) -> Callable[[Any], str | None]: # TODO: make formatted strings more pretty @@ -405,7 +405,7 @@ def box_formatter(value): @classmethod def _concat_same_type(cls, to_concat: Sequence[Self]) -> Self: # type: ignore[name-defined] # noqa: F821 - chunks = [chunk for array in to_concat for chunk in array._pa_array.iterchunks()] + chunks = [chunk for array in to_concat for chunk in array._chunked_array.iterchunks()] pa_array = pa.chunked_array(chunks) return cls(pa_array) @@ -425,14 +425,14 @@ def equals(self, other) -> bool: """ if not isinstance(other, type(self)): return False - return self._pa_array == other._pa_array + return self._chunked_array == other._chunked_array def dropna(self) -> Self: """Return a new ExtensionArray with missing values removed. Note that this applies to the top-level struct array, not to the list arrays. """ - return type(self)(pa.compute.drop_null(self._pa_array)) + return type(self)(pa.compute.drop_null(self._chunked_array)) # End of ExtensionArray overrides # @@ -441,8 +441,8 @@ def dropna(self) -> Self: def __arrow_array__(self, type=None): """Convert the extension array to a PyArrow array.""" if type is None: - return self._pa_array - return self._pa_array.cast(type) + return self._chunked_array + return self._chunked_array.cast(type) def __array__(self, dtype=None): """Convert the extension array to a numpy array.""" @@ -451,12 +451,12 @@ def __array__(self, dtype=None): # Adopted from ArrowExtensionArray def __getstate__(self): state = self.__dict__.copy() - state["_pa_array"] = self._pa_array.combine_chunks() + state["_chunked_array"] = self._chunked_array.combine_chunks() return state # Adopted from ArrowExtensionArray def __setstate__(self, state): - state["_pa_array"] = pa.chunked_array(state["_pa_array"]) + state["_chunked_array"] = pa.chunked_array(state["_chunked_array"]) self.__dict__.update(state) # End of Additional magic methods # @@ -478,7 +478,7 @@ def _box_pa_scalar(cls, value, *, pa_type: pa.DataType | None) -> pa.Scalar: def _box_pa_array(cls, value, *, pa_type: pa.DataType | None) -> pa.Array | pa.ChunkedArray: """Convert a value to a PyArrow array with the specified type.""" if isinstance(value, cls): - pa_array = value._pa_array + pa_array = value._chunked_array elif isinstance(value, (pa.Array, pa.ChunkedArray)): pa_array = value else: @@ -520,7 +520,7 @@ def _convert_struct_scalar_to_df(cls, value: pa.StructScalar, *, copy: bool, na_ d = {name: pd.Series(list_scalar.values, copy=copy) for name, list_scalar in value.items()} return pd.DataFrame(d, copy=False) - _pa_array: pa.ChunkedArray + _chunked_array: pa.ChunkedArray _dtype: NestedDtype def __init__(self, values: pa.Array | 
pa.ChunkedArray, *, validate: bool = True) -> None: @@ -530,7 +530,7 @@ def __init__(self, values: pa.Array | pa.ChunkedArray, *, validate: bool = True) if validate: self._validate(values) - self._pa_array = values + self._chunked_array = values self._dtype = NestedDtype(values.type) @classmethod @@ -589,12 +589,12 @@ def from_arrow_ext_array(cls, array: ArrowExtensionArray) -> Self: # type: igno def to_arrow_ext_array(self) -> ArrowExtensionArray: """Convert the extension array to pandas' ArrowExtensionArray""" - return ArrowExtensionArray(self._pa_array) + return ArrowExtensionArray(self._chunked_array) - def _replace_pa_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> None: + def _replace_chunked_array(self, pa_array: pa.ChunkedArray, *, validate: bool) -> None: if validate: self._validate(pa_array) - self._pa_array = pa_array + self._chunked_array = pa_array self._dtype = NestedDtype(pa_array.chunk(0).type) @property @@ -610,8 +610,8 @@ def list_offsets(self) -> pa.Array: The list offsets of the field arrays. """ # Quick and cheap path for a single chunk - if self._pa_array.num_chunks == 1: - struct_array = cast(pa.StructArray, self._pa_array.chunk(0)) + if self._chunked_array.num_chunks == 1: + struct_array = cast(pa.StructArray, self._chunked_array.chunk(0)) return cast(pa.ListArray, struct_array.field(0)).offsets chunks = [] @@ -619,7 +619,7 @@ def list_offsets(self) -> pa.Array: # It is 0 for the first chunk, and the last offset of the previous chunk for the next chunks, # as a pa.Scalar. chunk_offset: pa.Scalar | int = 0 - for chunk in self._pa_array.iterchunks(): + for chunk in self._chunked_array.iterchunks(): list_array = cast(pa.ListArray, chunk.field(0)) if chunk_offset == 0: offsets = list_array.offsets @@ -632,17 +632,17 @@ def list_offsets(self) -> pa.Array: @property def field_names(self) -> list[str]: """Names of the nested columns""" - return [field.name for field in self._pa_array.chunk(0).type] + return [field.name for field in self._chunked_array.chunk(0).type] @property def flat_length(self) -> int: """Length of the flat arrays""" - return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._pa_array.iterchunks()) + return sum(chunk.field(0).value_lengths().sum().as_py() for chunk in self._chunked_array.iterchunks()) @property def num_chunks(self) -> int: """Number of chunks in underlying pyarrow.ChunkedArray""" - return self._pa_array.num_chunks + return self._chunked_array.num_chunks def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-defined] # noqa: F821 """Get a view of the series with only the specified fields @@ -665,7 +665,7 @@ def view_fields(self, fields: str | list[str]) -> Self: # type: ignore[name-def raise ValueError(f"Some fields are not found, given: {fields}, available: {self.field_names}") chunks = [] - for chunk in self._pa_array.iterchunks(): + for chunk in self._chunked_array.iterchunks(): chunk = cast(pa.StructArray, chunk) struct_dict = {} for field in fields: @@ -703,7 +703,7 @@ def set_flat_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal f"Got: {field}, available: {self.field_names}" ) # Get the current element type of list-array - pa_type = self._pa_array.chunk(0).field(field).type.value_type + pa_type = self._chunked_array.chunk(0).field(field).type.value_type else: pa_type = None @@ -752,7 +752,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal "If keep_dtype is True, the field must exist in the series. 
" f"Got: {field}, available: {self.field_names}" ) - pa_type = self._pa_array.chunk(0).field(field).type + pa_type = self._chunked_array.chunk(0).field(field).type else: pa_type = None @@ -772,7 +772,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal raise ValueError("The length of the list-array must be equal to the length of the series") chunks = [] - for sl, chunk in enumerate_chunks(self._pa_array): + for sl, chunk in enumerate_chunks(self._chunked_array): chunk = cast(pa.StructArray, chunk) # Build a new struct array. We collect all existing fields and add/replace the new one. @@ -785,7 +785,7 @@ def set_list_field(self, field: str, value: ArrayLike, *, keep_dtype: bool = Fal chunks.append(struct_array) chunked_array = pa.chunked_array(chunks) - self._replace_pa_array(chunked_array, validate=True) + self._replace_chunked_array(chunked_array, validate=True) def pop_fields(self, fields: Iterable[str]): """Delete fields from the struct array @@ -806,7 +806,7 @@ def pop_fields(self, fields: Iterable[str]): raise ValueError("Cannot delete all fields") chunks = [] - for chunk in self._pa_array.iterchunks(): + for chunk in self._chunked_array.iterchunks(): chunk = cast(pa.StructArray, chunk) struct_dict = {} for pa_field in chunk.type: @@ -816,4 +816,4 @@ def pop_fields(self, fields: Iterable[str]): chunks.append(struct_array) pa_array = pa.chunked_array(chunks) - self._replace_pa_array(pa_array, validate=False) + self._replace_chunked_array(pa_array, validate=False) diff --git a/tests/nested_pandas/series/test_ext_array.py b/tests/nested_pandas/series/test_ext_array.py index 555362d..136f478 100644 --- a/tests/nested_pandas/series/test_ext_array.py +++ b/tests/nested_pandas/series/test_ext_array.py @@ -204,7 +204,7 @@ def test_from_sequence_with_arrow_array_and_dtype(): type=pa_type, ) - actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type)._pa_array + actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type)._chunked_array desired = pa.chunked_array([pa_array.cast(new_pa_type)]) # pyarrow doesn't convert pandas boxed missing values to nulls in nested arrays assert actual == desired @@ -525,7 +525,7 @@ def test___setitem___series_of_dfs(): ) desired = NestedExtensionArray(desired_struct_array) - assert ext_array._pa_array == desired._pa_array + assert ext_array._chunked_array == desired._chunked_array assert ext_array.equals(desired) From d1938262e4ad81fc5afeee2334b559523c28127c Mon Sep 17 00:00:00 2001 From: Konstantin Malanchev Date: Wed, 29 May 2024 17:31:45 -0400 Subject: [PATCH 2/2] NestedExtensionArray.chunked_array --- src/nested_pandas/series/ext_array.py | 5 +++++ tests/nested_pandas/series/test_ext_array.py | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/src/nested_pandas/series/ext_array.py b/src/nested_pandas/series/ext_array.py index 04def7c..388e786 100644 --- a/src/nested_pandas/series/ext_array.py +++ b/src/nested_pandas/series/ext_array.py @@ -558,6 +558,11 @@ def _pyarrow_dtype(self) -> pa.DataType: """PyArrow data type of the extension array""" return self._dtype.pyarrow_dtype + @property + def chunked_array(self) -> pa.ChunkedArray: + """The underlying PyArrow ChunkedArray""" + return self._chunked_array + @staticmethod def _validate(array: pa.ChunkedArray) -> None: """Raises ValueError if the input array is not a struct array with all fields being diff --git a/tests/nested_pandas/series/test_ext_array.py b/tests/nested_pandas/series/test_ext_array.py index 
136f478..c44732c 100644 --- a/tests/nested_pandas/series/test_ext_array.py +++ b/tests/nested_pandas/series/test_ext_array.py @@ -204,7 +204,7 @@ def test_from_sequence_with_arrow_array_and_dtype(): type=pa_type, ) - actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type)._chunked_array + actual = NestedExtensionArray.from_sequence(pa_array, dtype=new_pa_type).chunked_array desired = pa.chunked_array([pa_array.cast(new_pa_type)]) # pyarrow doesn't convert pandas boxed missing values to nulls in nested arrays assert actual == desired @@ -525,7 +525,7 @@ def test___setitem___series_of_dfs(): ) desired = NestedExtensionArray(desired_struct_array) - assert ext_array._chunked_array == desired._chunked_array + assert ext_array.chunked_array == desired.chunked_array assert ext_array.equals(desired) @@ -588,6 +588,21 @@ def test_series_built_raises(data): _array = NestedExtensionArray(pa_array) +def test_chunked_array(): + """Test that the .chunked_array property is correct.""" + struct_array = pa.StructArray.from_arrays( + arrays=[ + pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])]), + pa.array([-np.array([4.0, 5.0, 6.0]), -np.array([3.0, 4.0, 5.0])]), + ], + names=["a", "b"], + ) + ext_array = NestedExtensionArray(struct_array) + + # pyarrow returns a single bool for == + assert ext_array.chunked_array == pa.chunked_array(struct_array) + + def test_list_offsets_single_chunk(): """Test that the .list_offset property is correct for a single chunk.""" struct_array = pa.StructArray.from_arrays(
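
Usage sketch for the new public property added in PATCH 2/2. This is a minimal example, not part of the patches themselves: the import path nested_pandas.series.ext_array is inferred from the src/ layout shown in the diffs, and the fixture mirrors the one used in test_chunked_array.

    import numpy as np
    import pyarrow as pa

    # Import path assumed from the src/nested_pandas/series/ext_array.py layout in the diff.
    from nested_pandas.series.ext_array import NestedExtensionArray

    # Two-row struct-of-lists array, mirroring the fixture used in test_chunked_array.
    struct_array = pa.StructArray.from_arrays(
        arrays=[
            pa.array([np.array([1, 2, 3]), np.array([1, 2, 1])]),
            pa.array([np.array([4.0, 5.0, 6.0]), np.array([3.0, 4.0, 5.0])]),
        ],
        names=["a", "b"],
    )

    # Wrap in a single-chunk ChunkedArray; the tests also construct directly from the StructArray.
    ext_array = NestedExtensionArray(pa.chunked_array([struct_array]))

    # The new read-only property exposes the underlying pyarrow.ChunkedArray,
    # so callers no longer need to reach for the private _chunked_array attribute.
    chunked = ext_array.chunked_array
    assert isinstance(chunked, pa.ChunkedArray)
    assert chunked.num_chunks == ext_array.num_chunks == 1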