From edccfd01340b7aba490e08272aaf4bc4aa1bca45 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Thu, 30 Mar 2023 15:38:34 +0200 Subject: [PATCH] PERF-#5675: make index calculation for `read_csv` function lazy; introduce `ModinIndex` (#5677) Co-authored-by: Iaroslav Igoshev Co-authored-by: Dmitry Chigarev Signed-off-by: Anatoly Myachev --- .github/workflows/ci-notebooks.yml | 2 +- modin/core/dataframe/algebra/binary.py | 19 +- .../dataframe/pandas/dataframe/dataframe.py | 212 +++++++++++++----- .../dataframe/pandas/metadata/__init__.py | 18 ++ modin/core/dataframe/pandas/metadata/index.py | 144 ++++++++++++ modin/core/io/text/text_file_dispatcher.py | 11 +- .../storage_formats/base/query_compiler.py | 4 +- .../storage_formats/pandas/query_compiler.py | 14 +- .../hdk_on_native/dataframe/dataframe.py | 48 ++-- .../hdk_on_native/dataframe/utils.py | 4 +- .../partitioning/partition_manager.py | 4 +- .../storage_formats/hdk/query_compiler.py | 2 +- modin/pandas/test/dataframe/test_indexing.py | 14 +- modin/pandas/test/dataframe/test_join_sort.py | 8 +- modin/pandas/test/test_groupby.py | 2 +- 15 files changed, 387 insertions(+), 119 deletions(-) create mode 100644 modin/core/dataframe/pandas/metadata/__init__.py create mode 100644 modin/core/dataframe/pandas/metadata/index.py diff --git a/.github/workflows/ci-notebooks.yml b/.github/workflows/ci-notebooks.yml index e515aed5785..8b190a54a78 100644 --- a/.github/workflows/ci-notebooks.yml +++ b/.github/workflows/ci-notebooks.yml @@ -65,7 +65,7 @@ jobs: pip install git+https://github.com/modin-project/modin-spreadsheet.git@49ffd89f683f54c311867d602c55443fb11bf2a5 if: matrix.execution != 'hdk_on_native' # Build Modin from sources for `hdk_on_native` - - run: pip install . + - run: pip install -e . 
if: matrix.execution == 'hdk_on_native' # install test dependencies # NOTE: If you are changing the set of packages installed here, make sure that diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 8c42c48c782..a5d83fd4add 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -197,7 +197,6 @@ def caller( """ axis = kwargs.get("axis", 0) shape_hint = None - self_columns = query_compiler._modin_frame._columns_cache if isinstance(other, type(query_compiler)): if broadcast: assert ( @@ -211,13 +210,13 @@ def caller( other = other.transpose() if ( - self_columns is not None - and other._modin_frame._columns_cache is not None + query_compiler._modin_frame.has_materialized_columns + and other._modin_frame.has_materialized_columns ): if ( - len(self_columns) == 1 + len(query_compiler.columns) == 1 and len(other.columns) == 1 - and self_columns.equals(other.columns) + and query_compiler.columns.equals(other.columns) ): shape_hint = "column" return query_compiler.__constructor__( @@ -246,11 +245,11 @@ def caller( dtypes = compute_dtypes_common_cast(query_compiler, other) dtypes = dtypes.apply(coerce_int_to_float64) if ( - self_columns is not None - and other._modin_frame._columns_cache is not None + query_compiler._modin_frame.has_materialized_columns + and other._modin_frame.has_materialized_columns ): if ( - len(self_columns) == 1 + len(query_compiler.columns) == 1 and len(other.columns) == 1 and query_compiler.columns.equals(other.columns) ): @@ -277,8 +276,8 @@ def caller( ) else: if ( - self_columns is not None - and len(self_columns) == 1 + query_compiler._modin_frame.has_materialized_columns + and len(query_compiler._modin_frame.columns) == 1 and is_scalar(other) ): shape_hint = "column" diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 8a062926b09..1bf35a1fab8 100644 --- 
a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -22,7 +22,7 @@ import pandas import datetime from pandas.api.types import is_object_dtype -from pandas.core.indexes.api import ensure_index, Index, RangeIndex +from pandas.core.indexes.api import Index, RangeIndex from pandas.core.dtypes.common import is_numeric_dtype, is_list_like from pandas._libs.lib import no_default from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING @@ -39,6 +39,7 @@ JoinType, ) from modin.core.dataframe.pandas.dataframe.utils import build_sort_functions +from modin.core.dataframe.pandas.metadata import ModinIndex if TYPE_CHECKING: from modin.core.dataframe.base.interchange.dataframe_protocol.dataframe import ( @@ -154,9 +155,11 @@ class PandasDataframe(ClassLogger): ---------- partitions : np.ndarray A 2D NumPy array of partitions. - index : sequence, optional + index : sequence or callable, optional The index for the dataframe. Converted to a ``pandas.Index``. Is computed from partitions on demand if not specified. + If ``callable() -> (pandas.Index, list of row lengths or None)`` type, + then the calculation will be delayed until `self.index` is called. columns : sequence, optional The columns object for the dataframe. Converted to a ``pandas.Index``. Is computed from partitions on demand if not specified. 
@@ -197,8 +200,8 @@ def __init__( dtypes=None, ): self._partitions = partitions - self._index_cache = ensure_index(index) if index is not None else None - self._columns_cache = ensure_index(columns) if columns is not None else None + self.set_index_cache(index) + self.set_columns_cache(columns) self._row_lengths_cache = row_lengths self._column_widths_cache = column_widths self._dtypes = dtypes @@ -215,8 +218,8 @@ def _validate_axes_lengths(self): num_rows = sum(self._row_lengths_cache) if num_rows > 0: ErrorMessage.catch_bugs_and_request_email( - num_rows != len(self._index_cache), - f"Row lengths: {num_rows} != {len(self._index_cache)}", + num_rows != len(self.index), + f"Row lengths: {num_rows} != {len(self.index)}", ) ErrorMessage.catch_bugs_and_request_email( any(val < 0 for val in self._row_lengths_cache), @@ -229,8 +232,8 @@ def _validate_axes_lengths(self): num_columns = sum(self._column_widths_cache) if num_columns > 0: ErrorMessage.catch_bugs_and_request_email( - num_columns != len(self._columns_cache), - f"Column widths: {num_columns} != {len(self._columns_cache)}", + num_columns != len(self.columns), + f"Column widths: {num_columns} != {len(self.columns)}", ) ErrorMessage.catch_bugs_and_request_email( any(val < 0 for val in self._column_widths_cache), @@ -328,6 +331,104 @@ def dtype_builder(df): _index_cache = None _columns_cache = None + def set_index_cache(self, index): + """ + Set index cache. + + Parameters + ---------- + index : sequence, callable or None + """ + if isinstance(index, ModinIndex) or index is None: + self._index_cache = index + else: + self._index_cache = ModinIndex(index) + + def set_columns_cache(self, columns): + """ + Set columns cache. + + Parameters + ---------- + columns : sequence, callable or None + """ + if isinstance(columns, ModinIndex) or columns is None: + self._columns_cache = columns + else: + self._columns_cache = ModinIndex(columns) + + @property + def has_index_cache(self): + """ + Check if the index cache exists. 
+ + Returns + ------- + bool + """ + return self._index_cache is not None + + def copy_index_cache(self): + """ + Copy the index cache. + + Returns + ------- + pandas.Index, callable or None + If there is an pandas.Index in the cache, then copying occurs. + """ + idx_cache = self._index_cache + if self.has_index_cache: + idx_cache = self._index_cache.copy() + return idx_cache + + @property + def has_columns_cache(self): + """ + Check if the columns cache exists. + + Returns + ------- + bool + """ + return self._columns_cache is not None + + def copy_columns_cache(self): + """ + Copy the columns cache. + + Returns + ------- + pandas.Index or None + If there is an pandas.Index in the cache, then copying occurs. + """ + columns_cache = self._columns_cache + if columns_cache is not None: + columns_cache = columns_cache.copy() + return columns_cache + + @property + def has_materialized_index(self): + """ + Check if dataframe has materialized index cache. + + Returns + ------- + bool + """ + return self.has_index_cache and self._index_cache.is_materialized + + @property + def has_materialized_columns(self): + """ + Check if dataframe has materialized columns cache. + + Returns + ------- + bool + """ + return self.has_columns_cache and self._columns_cache.is_materialized + def _validate_set_axis(self, new_labels, old_labels): """ Validate the possibility of replacement of old labels with the new labels. @@ -344,7 +445,11 @@ def _validate_set_axis(self, new_labels, old_labels): list-like The validated labels. """ - new_labels = ensure_index(new_labels) + new_labels = ( + ModinIndex(new_labels) + if not isinstance(new_labels, ModinIndex) + else new_labels + ) old_len = len(old_labels) new_len = len(new_labels) if old_len != new_len: @@ -363,11 +468,14 @@ def _get_index(self): pandas.Index An index object containing the row labels. 
""" - if self._index_cache is None: - self._index_cache, row_lengths = self._compute_axis_labels_and_lengths(0) - if self._row_lengths_cache is None: - self._row_lengths_cache = row_lengths - return self._index_cache + if self.has_index_cache: + index, row_lengths = self._index_cache.get(return_lengths=True) + else: + index, row_lengths = self._compute_axis_labels_and_lengths(0) + self.set_index_cache(index) + if self._row_lengths_cache is None: + self._row_lengths_cache = row_lengths + return index def _get_columns(self): """ @@ -378,13 +486,14 @@ def _get_columns(self): pandas.Index An index object containing the column labels. """ - if self._columns_cache is None: - self._columns_cache, column_widths = self._compute_axis_labels_and_lengths( - 1 - ) - if self._column_widths_cache is None: - self._column_widths_cache = column_widths - return self._columns_cache + if self.has_columns_cache: + columns, column_widths = self._columns_cache.get(return_lengths=True) + else: + columns, column_widths = self._compute_axis_labels_and_lengths(1) + self.set_columns_cache(columns) + if self._column_widths_cache is None: + self._column_widths_cache = column_widths + return columns def _set_index(self, new_index): """ @@ -395,11 +504,9 @@ def _set_index(self, new_index): new_index : list-like The new row labels. """ - if self._index_cache is None: - self._index_cache = ensure_index(new_index) - else: + if self.has_materialized_index: new_index = self._validate_set_axis(new_index, self._index_cache) - self._index_cache = new_index + self.set_index_cache(new_index) self.synchronize_labels(axis=0) def _set_columns(self, new_columns): @@ -411,13 +518,11 @@ def _set_columns(self, new_columns): new_columns : list-like The new column labels. 
""" - if self._columns_cache is None: - self._columns_cache = ensure_index(new_columns) - else: + if self.has_materialized_columns: new_columns = self._validate_set_axis(new_columns, self._columns_cache) - self._columns_cache = new_columns if self._dtypes is not None: self._dtypes.index = new_columns + self.set_columns_cache(new_columns) self.synchronize_labels(axis=1) columns = property(_get_columns, _set_columns) @@ -469,8 +574,8 @@ def _filter_empties(self, compute_metadata=True): Trigger the computations for partition sizes and labels if they're not done already. """ if not compute_metadata and ( - self._index_cache is None - or self._columns_cache is None + not self.has_materialized_index + or not self.has_materialized_columns or self._row_lengths_cache is None or self._column_widths_cache is None ): @@ -826,7 +931,7 @@ def _take_2d_positional( else: row_partitions_dict = {i: slice(None) for i in range(len(self._partitions))} new_row_lengths = self._row_lengths_cache - new_index = self._index_cache + new_index = self.copy_index_cache() if col_positions is not None: sorted_col_positions = self._get_sorted_positions(col_positions) @@ -854,7 +959,7 @@ def _take_2d_positional( i: slice(None) for i in range(len(self._partitions.T)) } new_col_widths = self._column_widths_cache - new_columns = self._columns_cache + new_columns = self.copy_columns_cache() new_dtypes = self._dtypes new_partitions = np.array( @@ -1176,8 +1281,8 @@ def copy(self): """ return self.__constructor__( self._partitions, - self._index_cache.copy() if self._index_cache is not None else None, - self._columns_cache.copy() if self._columns_cache is not None else None, + self.copy_index_cache(), + self.copy_columns_cache(), self._row_lengths_cache, self._column_widths_cache, self._dtypes.copy() if self._dtypes is not None else None, @@ -1237,8 +1342,8 @@ def astype_builder(df): ) return self.__constructor__( new_frame, - self._index_cache, - self._columns_cache, + self.copy_index_cache(), + 
self.copy_columns_cache(), self._row_lengths_cache, self._column_widths_cache, new_dtypes, @@ -1764,8 +1869,8 @@ def map(self, func: Callable, dtypes: Optional[str] = None) -> "PandasDataframe" ) return self.__constructor__( new_partitions, - self._index_cache, - self._columns_cache, + self.copy_index_cache(), + self.copy_columns_cache(), self._row_lengths_cache, self._column_widths_cache, dtypes=dtypes, @@ -1833,19 +1938,19 @@ def fold(self, axis, func, new_columns=None): The data shape is not changed (length and width of the table). """ if new_columns is not None: - if self._columns_cache is not None: - assert len(self._columns_cache) == len( + if self.has_materialized_columns: + assert len(self.columns) == len( new_columns ), "The length of `new_columns` doesn't match the columns' length of `self`" - self._columns_cache = new_columns + self.set_columns_cache(new_columns) new_partitions = self._partition_mgr_cls.map_axis_partitions( axis, self._partitions, func, keep_partitioning=True ) return self.__constructor__( new_partitions, - self._index_cache, - self._columns_cache, + self.copy_index_cache(), + self.copy_columns_cache(), self._row_lengths_cache, self._column_widths_cache, ) @@ -1889,8 +1994,8 @@ def infer_types(self, col_labels: List[str]) -> "PandasDataframe": new_dtypes[col_labels] = new_cols_dtypes return self.__constructor__( self._partitions, - self._index_cache, - self._columns_cache, + self.copy_index_cache(), + self.copy_columns_cache(), self._row_lengths_cache, self._column_widths_cache, new_dtypes, @@ -2214,7 +2319,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> "PandasDatafram new_axes, new_lengths = [0, 0], [0, 0] new_axes[axis.value] = ( - self._index_cache if axis.value == 0 else self._columns_cache + self.copy_index_cache() if axis.value == 0 else self.copy_columns_cache() ) new_lengths[axis.value] = ( self._row_lengths_cache if axis.value == 0 else self._column_widths_cache @@ -3074,7 +3179,7 @@ def n_ary_op( 
sort=True, ) else: - joined_columns = self._columns_cache + joined_columns = self.copy_columns_cache() column_widths = self._column_widths_cache new_frame = ( @@ -3429,12 +3534,13 @@ def to_pandas(self): if df.empty: df = pandas.DataFrame(columns=self.columns, index=self.index) else: - for axis, external_index in enumerate( - [self._index_cache, self._columns_cache] + for axis, has_external_index in enumerate( + ["has_materialized_index", "has_materialized_columns"] ): # no need to check external and internal axes since in that case # external axes will be computed from internal partitions - if external_index is not None: + if getattr(self, has_external_index): + external_index = self.columns if axis else self.index ErrorMessage.catch_bugs_and_request_email( not df.axes[axis].equals(external_index), f"Internal and external indices on axis {axis} do not match.", @@ -3486,8 +3592,8 @@ def transpose(self): new_dtypes = None return self.__constructor__( new_partitions, - self._columns_cache, - self._index_cache, + self.copy_columns_cache(), + self.copy_index_cache(), self._column_widths_cache, self._row_lengths_cache, dtypes=new_dtypes, diff --git a/modin/core/dataframe/pandas/metadata/__init__.py b/modin/core/dataframe/pandas/metadata/__init__.py new file mode 100644 index 00000000000..3d678411bd2 --- /dev/null +++ b/modin/core/dataframe/pandas/metadata/__init__.py @@ -0,0 +1,18 @@ +# Licensed to Modin Development Team under one or more contributor license agreements. +# See the NOTICE file distributed with this work for additional information regarding +# copyright ownership. The Modin Development Team licenses this file to you under the +# Apache License, Version 2.0 (the "License"); you may not use this file except in +# compliance with the License. 
class ModinIndex:
    """
    A class that hides the various implementations of the index needed for optimization.

    The wrapped value is either an already materialized ``pandas.Index`` or a
    callable that produces one on demand. Attribute access that is not resolved
    on the wrapper itself is forwarded to the materialized ``pandas.Index``.

    Parameters
    ----------
    value : sequence or callable
        Either index labels (converted with ``ensure_index``) or a
        ``callable() -> (index labels, list of partition lengths or None)``
        whose evaluation is postponed until the index is actually needed.
    """

    def __init__(self, value):
        if callable(value):
            self._value = value
        else:
            self._value = ensure_index(value)
        # Filled in by `get()` when a lazy callable is evaluated.
        self._lengths_cache = None

    @classmethod
    def _from_materialized(cls, index, lengths):
        """
        Rebuild a materialized object together with its lengths cache.

        Used as the reconstruction callable in ``__reduce__``.

        Parameters
        ----------
        index : pandas.Index
        lengths : list or None

        Returns
        -------
        ModinIndex
        """
        obj = cls(index)
        obj._lengths_cache = lengths
        return obj

    @property
    def is_materialized(self) -> bool:
        """
        Check if the internal representation is materialized.

        Returns
        -------
        bool
        """
        return isinstance(self._value, pandas.Index)

    def get(self, return_lengths=False):
        """
        Get the materialized internal representation.

        Parameters
        ----------
        return_lengths : bool, default: False
            In some cases, during the index calculation, it's possible to get
            the lengths of the partitions. This flag allows this data to be used
            for optimization.

        Returns
        -------
        pandas.Index
            If `return_lengths` is False.
        tuple of (pandas.Index, list or None)
            If `return_lengths` is True. The second element is whatever lengths
            the lazy callable reported, or None if no callable was evaluated.
        """
        if not self.is_materialized:
            if callable(self._value):
                index, self._lengths_cache = self._value()
                self._value = ensure_index(index)
            else:
                raise NotImplementedError(type(self._value))
        if return_lengths:
            return self._value, self._lengths_cache
        return self._value

    def __len__(self):
        """
        Redirect the 'len' request to the internal representation.

        Returns
        -------
        int

        Notes
        -----
        Executing this function materializes the data.
        """
        return len(self.get())

    def __reduce__(self):
        """
        Serialize an object of this class.

        Returns
        -------
        tuple

        Notes
        -----
        The default implementation generates a recursion error. In short:
        during the construction of the object, `__getattr__` function is called, which
        is not intended to be used in situations where the object is not initialized.
        """
        if self._lengths_cache is not None:
            # The lengths are only known after materialization, so rebuild the
            # object directly from the plain values. No closure over `self` is
            # involved, hence the result is serializable by reference.
            return (
                self.__class__._from_materialized,
                (self._value, self._lengths_cache),
            )
        return (self.__class__, (self._value,))

    def __getattr__(self, name):
        """
        Redirect access to non-existent attributes to the internal representation.

        This is necessary so that objects of this class in most cases mimic the behavior
        of the ``pandas.Index``. The main limitations of the current approach are type
        checking and the use of this object where pandas indexes are supposed to be used.

        Parameters
        ----------
        name : str
            Attribute name.

        Returns
        -------
        object
            Attribute.

        Raises
        ------
        AttributeError
            If the wrapper's own storage is requested before it was initialized,
            or if the internal ``pandas.Index`` has no such attribute.

        Notes
        -----
        Executing this function materializes the data.
        """
        # `__getattr__` is only reached when normal lookup failed. If that happens
        # for the wrapper's own slots the object is not initialized yet, and going
        # through `is_materialized` would re-enter this method forever.
        if name in ("_value", "_lengths_cache"):
            raise AttributeError(name)
        return getattr(self.get(), name)

    def copy(self) -> "ModinIndex":
        """
        Copy an object without materializing the internal representation.

        Returns
        -------
        ModinIndex

        Notes
        -----
        A lazy callable is shared with the copy as is; a materialized index is
        copied. The partition-lengths cache is not carried over to the copy.
        """
        idx_cache = self._value
        if not callable(idx_cache):
            idx_cache = idx_cache.copy()
        return ModinIndex(idx_cache)
We use pandas to compute the exact dtype @@ -936,9 +935,9 @@ def _get_new_qc( new_frame = cls.frame_cls( partition_ids, - new_index, + lambda: cls._define_index(index_ids, index_name), column_names, - row_lengths, + None, column_widths, dtypes=dtypes, ) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 1e1a99ca04b..9469cd2e612 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -5225,8 +5225,8 @@ def repartition(self, axis=None): new_query_compiler._modin_frame.apply_full_axis( _ax, lambda df: df, - new_index=self._modin_frame._index_cache, - new_columns=self._modin_frame._columns_cache, + new_index=self._modin_frame.copy_index_cache(), + new_columns=self._modin_frame.copy_columns_cache(), keep_partitioning=False, sync_labels=False, ) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index e828954b141..e85669a6e09 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -263,7 +263,7 @@ def lazy_execution(self): bool """ frame = self._modin_frame - return frame._index_cache is None or frame._columns_cache is None + return not frame.has_materialized_index or not frame.has_materialized_columns def finalize(self): self._modin_frame.finalize() @@ -511,7 +511,7 @@ def map_func(left, right=right_pandas, kwargs=kwargs): # it's fine too, we can also decide that by columns, which tend to be already # materialized quite often compared to the indexes. 
keep_index = False - if self._modin_frame._index_cache is not None: + if self._modin_frame.has_materialized_index: if left_on is not None and right_on is not None: keep_index = any( o in self.index.names @@ -614,8 +614,8 @@ def _reset(df, *axis_lengths, partition_idx): df.index = pandas.RangeIndex(start, stop) return df - if self._modin_frame._columns_cache is not None and kwargs["drop"]: - new_columns = self._modin_frame._columns_cache + if self._modin_frame.has_columns_cache and kwargs["drop"]: + new_columns = self._modin_frame.copy_columns_cache() else: new_columns = None @@ -2327,8 +2327,8 @@ def _set_item(df, row_loc): axis=1, func=_set_item, other=row_loc._modin_frame, - new_index=self._modin_frame._index_cache, - new_columns=self._modin_frame._columns_cache, + new_index=self._modin_frame.copy_index_cache(), + new_columns=self._modin_frame.copy_columns_cache(), keep_partitioning=False, ) return self.__constructor__(new_modin_frame) @@ -2550,7 +2550,7 @@ def _compute_duplicated(df): new_modin_frame = hashed_modin_frame.apply_full_axis( axis=0, func=_compute_duplicated, - new_index=self._modin_frame._index_cache, + new_index=self._modin_frame.copy_index_cache(), new_columns=[MODIN_UNNAMED_SERIES_LABEL], dtypes=np.bool_, keep_partitioning=False, diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py index a9ba6c95213..e993775581f 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/dataframe.py @@ -30,7 +30,7 @@ ) from ..partitioning.partition_manager import HdkOnNativeDataframePartitionManager -from pandas.core.indexes.api import ensure_index, Index, MultiIndex, RangeIndex +from pandas.core.indexes.api import Index, MultiIndex, RangeIndex from pandas.core.dtypes.common import ( 
get_dtype, is_list_like, @@ -136,8 +136,10 @@ class HdkOnNativeDataframe(PandasDataframe): _table_cols : list of str A list of all frame's columns. It includes index columns if any. Index columns are always in the head of the list. - _index_cache : pandas.Index or None + _index_cache : pandas.Index, callable or None Materialized index of the frame or None when index is not materialized. + If ``callable() -> (pandas.Index, list of row lengths or None)`` type, + then the calculation will be done in `__init__`. _has_unsupported_data : bool True for frames holding data not supported by Arrow or HDK storage format. Operations on such frames are not allowed and should be defaulted @@ -178,14 +180,14 @@ def __init__( self.id = str(type(self)._next_id[0]) type(self)._next_id[0] += 1 - if index is not None: - index = ensure_index(index) - columns = ensure_index(columns) self._op = op self._index_cols = index_cols self._partitions = partitions - self._index_cache = index - self._columns_cache = columns + self.set_index_cache(index) + self.set_columns_cache(columns) + # The following code assumes that the type of `columns` is pandas.Index. + # The initial type of `columns` might be callable. + columns = self._columns_cache.get() self._row_lengths_cache = row_lengths self._column_widths_cache = column_widths self._has_unsupported_data = has_unsupported_data @@ -1828,7 +1830,7 @@ def _materialize_rowid(self): HdkOnNativeDataframe The new frame. 
""" - name = None if self._index_cache is None else self._index_cache.name + name = None if not self.has_index_cache else self._index_cache.name name = "__index__" if name is None else self._mangle_index_names([name])[0] exprs = OrderedDict() exprs[name] = self.ref("__rowid__") @@ -2109,17 +2111,17 @@ def _build_index_cache(self): assert isinstance(self._op, FrameNode) if self._partitions.size == 0: - self._index_cache = Index.__new__(Index) + self.set_index_cache(Index.__new__(Index)) else: assert self._partitions.size == 1 obj = self._partitions[0][0].get() if isinstance(obj, (pd.DataFrame, pd.Series)): - self._index_cache = obj.index + self.set_index_cache(obj.index) else: assert isinstance(obj, pyarrow.Table) if self._index_cols is None: - self._index_cache = Index.__new__( - RangeIndex, data=range(obj.num_rows) + self.set_index_cache( + Index.__new__(RangeIndex, data=range(obj.num_rows)) ) else: index_at = obj.drop([f"F_{col}" for col in self.columns]) @@ -2130,7 +2132,7 @@ def _build_index_cache(self): index_df.index.rename( self._index_names(self._index_cols), inplace=True ) - self._index_cache = index_df.index + self.set_index_cache(index_df.index) def _get_index(self): """ @@ -2143,9 +2145,9 @@ def _get_index(self): pandas.Index """ self._execute() - if self._index_cache is None: + if not self.has_index_cache: self._build_index_cache() - return self._index_cache + return self._index_cache.get() def _set_index(self, new_index): """ @@ -2410,8 +2412,8 @@ def has_multiindex(self): ------- bool """ - if self._index_cache is not None: - return isinstance(self._index_cache, MultiIndex) + if self.has_materialized_index: + return isinstance(self.index, MultiIndex) return self._index_cols is not None and len(self._index_cols) > 1 def get_index_name(self): @@ -2424,7 +2426,7 @@ def get_index_name(self): ------- str or None """ - if self._index_cache is not None: + if self.has_index_cache: return self._index_cache.name if self._index_cols is None: return None @@ 
-2482,7 +2484,7 @@ def get_index_names(self): ------- list of str """ - if self._index_cache is not None: + if self.has_index_cache: return self._index_cache.names if self.has_multiindex(): return self._index_cols.copy() @@ -2566,9 +2568,9 @@ def to_pandas(self): if len(df.columns) != len(self.columns): assert self._index_cols idx_col_names = [f"F_{col}" for col in self._index_cols] - if self._index_cache is not None: + if self.has_materialized_index: df.drop(columns=idx_col_names, inplace=True) - df.index = self._index_cache.copy() + df.index = self.index else: df.set_index(idx_col_names, inplace=True) df.index.rename(self._index_names(self._index_cols), inplace=True) @@ -2576,8 +2578,8 @@ def to_pandas(self): else: assert self._index_cols is None assert df.index.name is None, f"index name '{df.index.name}' is not None" - if self._index_cache is not None: - df.index = self._index_cache.copy() + if self.has_materialized_index: + df.index = self.index # Restore original column labels encoded in HDK to meet its # restrictions on column names. diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py index 5a408d4ceba..82fa16dfaeb 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/dataframe/utils.py @@ -135,7 +135,7 @@ def check_cols_to_join(what, df, col_names): if col == df._index_name(c): new_name = c break - elif df._index_cache is not None: + elif df.has_index_cache: new_name = f"__index__{0}_{col}" df = df._maybe_materialize_rowid() if new_name is None: @@ -189,7 +189,7 @@ def get_data_for_join_by_index( def to_empty_pandas_df(df): # Create an empty pandas frame with the same columns and index. 
- idx = df._index_cache + idx = df._index_cache.get() if df.has_index_cache else None if idx is not None: idx = idx[:1] elif df._index_cols is not None: diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py index c5793b57854..9c534838a33 100644 --- a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py +++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py @@ -278,8 +278,8 @@ def run_exec_plan(cls, plan, index_cols, dtypes, columns): # Tables without columns are not supported. # Creating an empty table with index columns only. idx_names = ( - frame._index_cache.names - if frame._index_cache is not None + frame.index.names + if frame.has_materialized_index else [None] ) idx_names = frame._mangle_index_names(idx_names) diff --git a/modin/experimental/core/storage_formats/hdk/query_compiler.py b/modin/experimental/core/storage_formats/hdk/query_compiler.py index d7f39849485..b8e4e79683e 100644 --- a/modin/experimental/core/storage_formats/hdk/query_compiler.py +++ b/modin/experimental/core/storage_formats/hdk/query_compiler.py @@ -579,7 +579,7 @@ def drop(self, index=None, columns=None, errors: str = "raise"): # In this case, we copy the index from the current frame. 
if len(columns) == 0 and new_frame._index_cols is None: assert index is None, "Can't copy old indexes as there was a row drop" - new_frame._index_cache = self._modin_frame.index.copy() + new_frame.set_index_cache(self._modin_frame.index.copy()) return self.__constructor__(new_frame) diff --git a/modin/pandas/test/dataframe/test_indexing.py b/modin/pandas/test/dataframe/test_indexing.py index 28ee98fecc1..41c70444401 100644 --- a/modin/pandas/test/dataframe/test_indexing.py +++ b/modin/pandas/test/dataframe/test_indexing.py @@ -1368,7 +1368,7 @@ def test_reindex_multiindex(): def test_reset_index(data, test_async_reset_index): modin_df, pandas_df = create_test_dfs(data) if test_async_reset_index: - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) modin_result = modin_df.reset_index(inplace=False) pandas_result = pandas_df.reset_index(inplace=False) df_equals(modin_result, pandas_result) @@ -1376,7 +1376,7 @@ def test_reset_index(data, test_async_reset_index): modin_df_cp = modin_df.copy() pd_df_cp = pandas_df.copy() if test_async_reset_index: - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) modin_df_cp.reset_index(inplace=True) pd_df_cp.reset_index(inplace=True) df_equals(modin_df_cp, pd_df_cp) @@ -1547,7 +1547,7 @@ def test_reset_index_with_multi_index_no_drop( if col_fill != "no_col_fill": kwargs["col_fill"] = col_fill if test_async_reset_index: - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) eval_general(modin_df, pandas_df, lambda df: df.reset_index(**kwargs)) @@ -1668,7 +1668,7 @@ def test_reset_index_with_named_index( modin_df.index = modin_df.index modin_df._to_pandas() - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) df_equals(modin_df.reset_index(drop=False), 
pandas_df.reset_index(drop=False)) if test_async_reset_index: @@ -1676,7 +1676,7 @@ def test_reset_index_with_named_index( modin_df.index = modin_df.index modin_df._to_pandas() - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) modin_df.reset_index(drop=True, inplace=True) pandas_df.reset_index(drop=True, inplace=True) df_equals(modin_df, pandas_df) @@ -1689,7 +1689,7 @@ def test_reset_index_with_named_index( modin_df.index = modin_df.index modin_df._to_pandas() - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) df_equals(modin_df.reset_index(drop=False), pandas_df.reset_index(drop=False)) @@ -1712,7 +1712,7 @@ def test_reset_index_metadata_update(index, test_async_reset_index): modin_df.index = modin_df.index modin_df._to_pandas() - modin_df._query_compiler._modin_frame._index_cache = None + modin_df._query_compiler._modin_frame.set_index_cache(None) eval_general(modin_df, pandas_df, lambda df: df.reset_index()) diff --git a/modin/pandas/test/dataframe/test_join_sort.py b/modin/pandas/test/dataframe/test_join_sort.py index 1b0f8b8e3be..1434b927114 100644 --- a/modin/pandas/test/dataframe/test_join_sort.py +++ b/modin/pandas/test/dataframe/test_join_sort.py @@ -369,17 +369,17 @@ def setup_cache(): if has_index_cache: modin_df1.index # triggering index materialization modin_df2.index - assert modin_df1._query_compiler._modin_frame._index_cache is not None - assert modin_df2._query_compiler._modin_frame._index_cache is not None + assert modin_df1._query_compiler._modin_frame.has_index_cache + assert modin_df2._query_compiler._modin_frame.has_index_cache else: # Propagate deferred indices to partitions # The change in index is not automatically handled by Modin. See #3941. 
modin_df1.index = modin_df1.index modin_df1._to_pandas() - modin_df1._query_compiler._modin_frame._index_cache = None + modin_df1._query_compiler._modin_frame.set_index_cache(None) modin_df2.index = modin_df2.index modin_df2._to_pandas() - modin_df2._query_compiler._modin_frame._index_cache = None + modin_df2._query_compiler._modin_frame.set_index_cache(None) for on in ( ["col_key1", "idx_key1"], diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 5b883614fe7..536a7d46a72 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -2220,7 +2220,7 @@ def frame_with_deferred_index(self): df = pd.DataFrame(**self._df_kwargs) try: # The frame would stop being lazy once index computation is triggered - df._query_compiler._modin_frame._index_cache = None + df._query_compiler._modin_frame.set_index_cache(None) except AttributeError: pytest.skip( reason="Selected execution doesn't support deferred indices."