From 0559fa2d36f35d6f7d53f07453cd406f0c0ee309 Mon Sep 17 00:00:00 2001 From: Anatoly Myachev Date: Mon, 13 May 2024 09:28:22 +0200 Subject: [PATCH] REFACTOR-#7242: Add type hints for `modin/core/dataframe/algebra/` (#7243) Signed-off-by: Anatoly Myachev --- modin/core/dataframe/algebra/binary.py | 74 ++++++--- modin/core/dataframe/algebra/fold.py | 32 +++- modin/core/dataframe/algebra/groupby.py | 152 +++++++++++------- modin/core/dataframe/algebra/map.py | 26 ++- modin/core/dataframe/algebra/operator.py | 10 +- modin/core/dataframe/algebra/reduce.py | 22 ++- modin/core/dataframe/algebra/tree_reduce.py | 28 +++- .../dataframe/pandas/dataframe/dataframe.py | 65 ++++---- .../storage_formats/base/query_compiler.py | 10 +- .../storage_formats/pandas/query_compiler.py | 10 +- .../dataframe/pandas/partitions.py | 4 +- 11 files changed, 283 insertions(+), 150 deletions(-) diff --git a/modin/core/dataframe/algebra/binary.py b/modin/core/dataframe/algebra/binary.py index 6af31ab826c..7f098e746ac 100644 --- a/modin/core/dataframe/algebra/binary.py +++ b/modin/core/dataframe/algebra/binary.py @@ -13,8 +13,10 @@ """Module houses builder class for Binary operator.""" +from __future__ import annotations + import warnings -from typing import Optional +from typing import TYPE_CHECKING, Any, Callable, Optional, Union import numpy as np import pandas @@ -24,13 +26,20 @@ from .operator import Operator +if TYPE_CHECKING: + from pandas._typing import DtypeObj + + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + def maybe_compute_dtypes_common_cast( - first, - second, - trigger_computations=False, - axis=0, - func=None, + first: PandasQueryCompiler, + second: Union[PandasQueryCompiler, dict, list, tuple, np.ndarray, str, DtypeObj], + trigger_computations: bool = False, + axis: int = 0, + func: Optional[ + Callable[[pandas.DataFrame, pandas.DataFrame], pandas.DataFrame] + ] = None, ) -> Optional[pandas.Series]: """ Precompute data types for binary operations by finding common type between operands. @@ -39,7 +48,7 @@ def maybe_compute_dtypes_common_cast( ---------- first : PandasQueryCompiler First operand for which the binary operation would be performed later. - second : PandasQueryCompiler, list-like or scalar + second : PandasQueryCompiler, dict, list, tuple, np.ndarray, str or DtypeObj Second operand for which the binary operation would be performed later. trigger_computations : bool, default: False Whether to trigger computation of the lazy metadata for `first` and `second`. @@ -155,7 +164,7 @@ def maybe_compute_dtypes_common_cast( ], index=common_columns, ) - dtypes = pandas.concat( + dtypes: pandas.Series = pandas.concat( [ dtypes, pandas.Series( @@ -168,7 +177,10 @@ def maybe_compute_dtypes_common_cast( def maybe_build_dtypes_series( - first, second, dtype, trigger_computations=False + first: PandasQueryCompiler, + second: Union[PandasQueryCompiler, Any], + dtype: DtypeObj, + trigger_computations: bool = False, ) -> Optional[pandas.Series]: """ Build a ``pandas.Series`` describing dtypes of the result of a binary operation. @@ -179,7 +191,7 @@ def maybe_build_dtypes_series( First operand for which the binary operation would be performed later. second : PandasQueryCompiler, list-like or scalar Second operand for which the binary operation would be performed later. - dtype : np.dtype + dtype : DtypeObj Dtype of the result. trigger_computations : bool, default: False Whether to trigger computation of the lazy metadata for `first` and `second`. 
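A note on what `maybe_compute_dtypes_common_cast` computes, before the next hunk: for each column shared by both operands, the result dtype of the binary operation is the common (promoted) type of the two input dtypes. A toy sketch of that promotion with plain pandas/NumPy; the frames and column names here are invented for illustration:

    import numpy as np
    import pandas

    # Per-column dtypes of two hypothetical binary-op operands.
    left = pandas.Series({"a": np.dtype("int64"), "b": np.dtype("float32")})
    right = pandas.Series({"a": np.dtype("float64"), "b": np.dtype("float32")})

    # For every common column, the result dtype is the promoted common type.
    common = pandas.Series(
        [np.promote_types(lhs, rhs) for lhs, rhs in zip(left, right)],
        index=left.index,
    )
    # a    float64
    # b    float32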
@@ -217,8 +229,15 @@ def maybe_build_dtypes_series( def try_compute_new_dtypes( - first, second, infer_dtypes=None, result_dtype=None, axis=0, func=None -): + first: PandasQueryCompiler, + second: Union[PandasQueryCompiler, Any], + infer_dtypes: Optional[str] = None, + result_dtype: Optional[Union[DtypeObj, str]] = None, + axis: int = 0, + func: Optional[ + Callable[[pandas.DataFrame, pandas.DataFrame], pandas.DataFrame] + ] = None, +) -> Optional[pandas.Series]: """ Precompute resulting dtypes of the binary operation if possible. @@ -285,11 +304,11 @@ class Binary(Operator): @classmethod def register( cls, - func, - join_type="outer", - labels="replace", - infer_dtypes=None, - ): + func: Callable[..., pandas.DataFrame], + join_type: str = "outer", + labels: str = "replace", + infer_dtypes: Optional[str] = None, + ) -> Callable[..., PandasQueryCompiler]: """ Build template binary operator. @@ -318,34 +337,39 @@ def register( """ def caller( - query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs - ): + query_compiler: PandasQueryCompiler, + other: Union[PandasQueryCompiler, Any], + broadcast: bool = False, + *args: tuple, + dtypes: Optional[Union[DtypeObj, str]] = None, + **kwargs: dict, + ) -> PandasQueryCompiler: """ Apply binary `func` to passed operands. Parameters ---------- - query_compiler : QueryCompiler + query_compiler : PandasQueryCompiler Left operand of `func`. - other : QueryCompiler, list-like object or scalar + other : PandasQueryCompiler, list-like object or scalar Right operand of `func`. broadcast : bool, default: False If `other` is a one-column query compiler, indicates whether it is a Series or not. Frames and Series have to be processed differently, however we can't distinguish them at the query compiler level, so this parameter is a hint that passed from a high level API. - *args : args, + *args : tuple, Arguments that will be passed to `func`. dtypes : "copy", scalar dtype or None, default: None Dtypes of the result. "copy" to keep old dtypes and None to compute them on demand. - **kwargs : kwargs, + **kwargs : dict, Arguments that will be passed to `func`. Returns ------- - QueryCompiler + PandasQueryCompiler Result of binary function. """ - axis = kwargs.get("axis", 0) + axis: int = kwargs.get("axis", 0) if isinstance(other, type(query_compiler)) and broadcast: assert ( len(other.columns) == 1 diff --git a/modin/core/dataframe/algebra/fold.py b/modin/core/dataframe/algebra/fold.py index 419a0b56903..9be11b454d2 100644 --- a/modin/core/dataframe/algebra/fold.py +++ b/modin/core/dataframe/algebra/fold.py @@ -13,20 +13,31 @@ """Module houses builder class for Fold operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Optional + from .operator import Operator +if TYPE_CHECKING: + import pandas + + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class Fold(Operator): """Builder class for Fold functions.""" @classmethod - def register(cls, fold_function): + def register( + cls, fold_function: Callable[..., pandas.DataFrame] + ) -> Callable[..., PandasQueryCompiler]: """ Build Fold operator that will be performed across rows/columns. Parameters ---------- - fold_function : callable(pandas.DataFrame) -> pandas.DataFrame + fold_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame Function to apply across rows/columns. Returns @@ -35,25 +46,30 @@ def register(cls, fold_function): Function that takes query compiler and executes Fold function. 
""" - def caller(query_compiler, fold_axis=None, *args, **kwargs): + def caller( + query_compiler: PandasQueryCompiler, + fold_axis: Optional[int] = None, + *args: tuple, + **kwargs: dict, + ) -> PandasQueryCompiler: """ Execute Fold function against passed query compiler. Parameters ---------- - query_compiler : BaseQueryCompiler + query_compiler : PandasQueryCompiler The query compiler to execute the function on. fold_axis : int, optional 0 or None means apply across full column partitions. 1 means apply across full row partitions. - *args : iterable - Additional arguments passed to fold_function. + *args : tuple + Additional arguments passed to `fold_function`. **kwargs: dict - Additional keyword arguments passed to fold_function. + Additional keyword arguments passed to `fold_function`. Returns ------- - BaseQueryCompiler + PandasQueryCompiler A new query compiler representing the result of executing the function. """ diff --git a/modin/core/dataframe/algebra/groupby.py b/modin/core/dataframe/algebra/groupby.py index 6967f904c10..cc9196a422a 100644 --- a/modin/core/dataframe/algebra/groupby.py +++ b/modin/core/dataframe/algebra/groupby.py @@ -13,6 +13,10 @@ """Module houses builder class for GroupByReduce operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Optional, Union + import pandas from modin.core.dataframe.pandas.metadata import ModinIndex @@ -22,6 +26,9 @@ from .default2pandas.groupby import GroupBy, GroupByDefault from .tree_reduce import TreeReduce +if TYPE_CHECKING: + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class GroupByReduce(TreeReduce): """ @@ -41,11 +48,16 @@ class GroupByReduce(TreeReduce): arbitrary aggregation. Note: this attribute should be considered private. """ - ID_LEVEL_NAME = "__ID_LEVEL_NAME__" - _GROUPBY_REDUCE_IMPL_FLAG = "__groupby_reduce_impl_func__" + ID_LEVEL_NAME: str = "__ID_LEVEL_NAME__" + _GROUPBY_REDUCE_IMPL_FLAG: str = "__groupby_reduce_impl_func__" @classmethod - def register(cls, map_func, reduce_func=None, **call_kwds): + def register( + cls, + map_func: Union[str, dict, Callable[..., pandas.DataFrame]], + reduce_func: Optional[Union[str, dict, Callable[..., pandas.DataFrame]]] = None, + **call_kwds: dict, + ) -> Callable[..., PandasQueryCompiler]: """ Build template GroupBy aggregation function. @@ -59,7 +71,7 @@ def register(cls, map_func, reduce_func=None, **call_kwds): reduce_func : str, dict or callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame, optional Function to apply to the ``DataFrameGroupBy`` at the reduce phase. If not specified will be set the same as 'map_func'. - **call_kwds : kwargs + **call_kwds : dict Kwargs that will be passed to the returned function. Returns @@ -90,7 +102,11 @@ def build_fn(name): ) @classmethod - def register_implementation(cls, map_func, reduce_func): + def register_implementation( + cls, + map_func: Callable[..., pandas.DataFrame], + reduce_func: Callable[..., pandas.DataFrame], + ) -> None: """ Register callables to be recognized as an implementations of tree-reduce phases. 
@@ -107,16 +123,16 @@ def register_implementation(cls, map_func, reduce_func): @classmethod def map( cls, - df, - map_func, - axis, - groupby_kwargs, - agg_args, - agg_kwargs, - other=None, + df: pandas.DataFrame, + map_func: Callable[..., pandas.DataFrame], + axis: int, + groupby_kwargs: dict, + agg_args: list, + agg_kwargs: dict, + other: Optional[pandas.DataFrame] = None, by=None, - drop=False, - ): + drop: bool = False, + ) -> pandas.DataFrame: """ Execute Map phase of GroupByReduce. @@ -194,17 +210,17 @@ def map( @classmethod def reduce( cls, - df, - reduce_func, - axis, - groupby_kwargs, - agg_args, - agg_kwargs, - partition_idx=0, - drop=False, - method=None, - finalizer_fn=None, - ): + df: pandas.DataFrame, + reduce_func: Union[dict, Callable[..., pandas.DataFrame]], + axis: int, + groupby_kwargs: dict, + agg_args: list, + agg_kwargs: dict, + partition_idx: int = 0, + drop: bool = False, + method: Optional[str] = None, + finalizer_fn: Optional[Callable[[pandas.DataFrame], pandas.DataFrame]] = None, + ) -> pandas.DataFrame: """ Execute Reduce phase of GroupByReduce. @@ -231,7 +247,7 @@ def reduce( Indicates whether or not by-data came from the `self` frame. method : str, optional Name of the groupby function. This is a hint to be able to do special casing. - finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, default: None + finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, optional A callable to execute at the end a groupby kernel against groupby result. Returns @@ -286,27 +302,27 @@ def reduce( @classmethod def caller( cls, - query_compiler, + query_compiler: PandasQueryCompiler, by, - map_func, - reduce_func, - axis, - groupby_kwargs, - agg_args, - agg_kwargs, - drop=False, - method=None, - default_to_pandas_func=None, - finalizer_fn=None, - ): + map_func: Union[dict, Callable[..., pandas.DataFrame]], + reduce_func: Union[dict, Callable[..., pandas.DataFrame]], + axis: int, + groupby_kwargs: dict, + agg_args: list, + agg_kwargs: dict, + drop: bool = False, + method: Optional[str] = None, + default_to_pandas_func: Optional[Callable[..., pandas.DataFrame]] = None, + finalizer_fn: Optional[Callable[[pandas.DataFrame], pandas.DataFrame]] = None, + ) -> PandasQueryCompiler: """ Execute GroupBy aggregation with TreeReduce approach. Parameters ---------- - query_compiler : BaseQueryCompiler + query_compiler : PandasQueryCompiler Frame to group. - by : BaseQueryCompiler, column or index label, Grouper or list of such + by : PandasQueryCompiler, column or index label, Grouper or list of such Object that determine groups. map_func : dict or callable(pandas.DataFrameGroupBy) -> pandas.DataFrame Function to apply to the `GroupByObject` at the Map phase. @@ -328,12 +344,12 @@ def caller( default_to_pandas_func : callable(pandas.DataFrameGroupBy) -> pandas.DataFrame, optional The pandas aggregation function equivalent to the `map_func + reduce_func`. Used in case of defaulting to pandas. If not specified `map_func` is used. - finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, default: None + finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, optional A callable to execute at the end a groupby kernel against groupby result. Returns ------- - The same type as `query_compiler` + PandasQueryCompiler QueryCompiler which carries the result of GroupBy aggregation. 
""" is_unsupported_axis = axis != 0 @@ -434,7 +450,12 @@ def caller( return result @classmethod - def get_callable(cls, agg_func, df, preserve_aggregation_order=True): + def get_callable( + cls, + agg_func: Union[dict, Callable[..., pandas.DataFrame]], + df: pandas.DataFrame, + preserve_aggregation_order: bool = True, + ) -> Callable[..., pandas.DataFrame]: """ Build aggregation function to apply to each group at this particular partition. @@ -473,8 +494,11 @@ def get_callable(cls, agg_func, df, preserve_aggregation_order=True): @classmethod def _build_callable_for_dict( - cls, agg_dict, preserve_aggregation_order=True, grp_has_id_level=False - ): + cls, + agg_dict: dict, + preserve_aggregation_order: bool = True, + grp_has_id_level: bool = False, + ) -> Callable[..., pandas.DataFrame]: """ Build callable for an aggregation dictionary. @@ -643,7 +667,7 @@ def aggregate_on_dict(grp_obj, *args, **kwargs): return aggregate_on_dict @classmethod - def is_registered_implementation(cls, func): + def is_registered_implementation(cls, func: Callable) -> bool: """ Check whether the passed `func` was registered as a TreeReduce implementation. @@ -661,16 +685,16 @@ def is_registered_implementation(cls, func): def build_map_reduce_functions( cls, by, - axis, - groupby_kwargs, - map_func, - reduce_func, - agg_args, - agg_kwargs, - drop=False, - method=None, - finalizer_fn=None, - ): + axis: int, + groupby_kwargs: dict, + map_func: Union[dict, Callable[..., pandas.DataFrame]], + reduce_func: Union[dict, Callable[..., pandas.DataFrame]], + agg_args: list, + agg_kwargs: dict, + drop: bool = False, + method: Optional[str] = None, + finalizer_fn: Callable[[pandas.DataFrame], pandas.DataFrame] = None, + ) -> tuple[Callable, Callable]: """ Bind appropriate arguments to map and reduce functions. @@ -695,7 +719,7 @@ def build_map_reduce_functions( Indicates whether or not by-data came from the `self` frame. method : str, optional Name of the GroupBy aggregation function. This is a hint to be able to do special casing. - finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, default: None + finalizer_fn : callable(pandas.DataFrame) -> pandas.DataFrame, optional A callable to execute at the end a groupby kernel against groupby result. 
Returns @@ -709,8 +733,14 @@ def build_map_reduce_functions( if hasattr(by, "_modin_frame"): by = None - def _map(df, other=None, **kwargs): - def wrapper(df, other=None): + def _map( + df: pandas.DataFrame, + other: Optional[pandas.DataFrame] = None, + **kwargs: dict, + ) -> pandas.DataFrame: + def wrapper( + df: pandas.DataFrame, other: Optional[pandas.DataFrame] = None + ) -> pandas.DataFrame: return cls.map( df, other=other, @@ -732,8 +762,8 @@ def wrapper(df, other=None): result = wrapper(df.copy(), other if other is None else other.copy()) return result - def _reduce(df, **call_kwargs): - def wrapper(df): + def _reduce(df: pandas.DataFrame, **call_kwargs: dict) -> pandas.DataFrame: + def wrapper(df: pandas.DataFrame): return cls.reduce( df, axis=axis, diff --git a/modin/core/dataframe/algebra/map.py b/modin/core/dataframe/algebra/map.py index 57b21f6e1b0..42e4eaef42d 100644 --- a/modin/core/dataframe/algebra/map.py +++ b/modin/core/dataframe/algebra/map.py @@ -13,26 +13,40 @@ """Module houses builder class for Map operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable + from .operator import Operator +if TYPE_CHECKING: + import pandas + + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class Map(Operator): """Builder class for Map operator.""" @classmethod - def register(cls, function, *call_args, **call_kwds): + def register( + cls, + function: Callable[..., pandas.DataFrame], + *call_args: tuple, + **call_kwds: dict, + ) -> Callable[..., PandasQueryCompiler]: """ Build Map operator that will be performed across each partition. Parameters ---------- - function : callable(pandas.DataFrame) -> pandas.DataFrame + function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame Function that will be applied to the each partition. Function takes `pandas.DataFrame` and returns `pandas.DataFrame` of the same shape. - *call_args : args + *call_args : tuple Args that will be passed to the returned function. - **call_kwds : kwargs + **call_kwds : dict Kwargs that will be passed to the returned function. Returns @@ -41,7 +55,9 @@ def register(cls, function, *call_args, **call_kwds): Function that takes query compiler and executes map function. """ - def caller(query_compiler, *args, **kwargs): + def caller( + query_compiler: PandasQueryCompiler, *args: tuple, **kwargs: dict + ) -> PandasQueryCompiler: """Execute Map function against passed query compiler.""" shape_hint = call_kwds.pop("shape_hint", None) or query_compiler._shape_hint return query_compiler.__constructor__( diff --git a/modin/core/dataframe/algebra/operator.py b/modin/core/dataframe/algebra/operator.py index cc093e6720b..5261a2c45cf 100644 --- a/modin/core/dataframe/algebra/operator.py +++ b/modin/core/dataframe/algebra/operator.py @@ -13,13 +13,15 @@ """Module contains an interface for operator builder classes.""" -from typing import Optional +from __future__ import annotations + +from typing import Callable, Optional class Operator(object): """Interface for building operators that can execute in parallel across partitions.""" - def __init__(self): + def __init__(self) -> None: raise ValueError( "Please use {}.register instead of the constructor".format( type(self).__name__ @@ -27,7 +29,7 @@ def __init__(self): ) @classmethod - def register(cls, func, **kwargs): + def register(cls, func: Callable, **kwargs: dict): """ Build operator that applies source function across the entire dataset. 
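The `Callable` hints added to `register` codify the factory shape every algebra builder shares: a classmethod that closes over a kernel function and returns a caller. A self-contained sketch of that shape; the `Shout` operator and its frame-level caller are hypothetical, since the real builders take and return a `PandasQueryCompiler` rather than a raw frame:

    from __future__ import annotations

    from typing import Any, Callable

    import pandas

    class Shout:  # hypothetical Operator subclass, purely illustrative
        @classmethod
        def register(
            cls, func: Callable[[pandas.DataFrame], pandas.DataFrame]
        ) -> Callable[..., pandas.DataFrame]:
            # `register` never runs `func` itself; it builds and returns a
            # caller that applies the kernel when invoked.
            def caller(
                df: pandas.DataFrame, *args: Any, **kwargs: Any
            ) -> pandas.DataFrame:
                return func(df)

            return caller

    upper = Shout.register(lambda df: df.apply(lambda col: col.str.upper()))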
@@ -35,7 +37,7 @@ def register(cls, func, **kwargs): ---------- func : callable Source function. - **kwargs : kwargs + **kwargs : dict Kwargs that will be passed to the builder function. Returns diff --git a/modin/core/dataframe/algebra/reduce.py b/modin/core/dataframe/algebra/reduce.py index 156b7c07e44..7bafff0ddc2 100644 --- a/modin/core/dataframe/algebra/reduce.py +++ b/modin/core/dataframe/algebra/reduce.py @@ -13,14 +13,28 @@ """Module houses builder class for Reduce operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Optional + from .operator import Operator +if TYPE_CHECKING: + import pandas + + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class Reduce(Operator): """Builder class for Reduce operator.""" @classmethod - def register(cls, reduce_function, axis=None, shape_hint=None): + def register( + cls, + reduce_function: Callable[..., pandas.Series], + axis: Optional[int] = None, + shape_hint: Optional[str] = None, + ) -> Callable[..., PandasQueryCompiler]: """ Build Reduce operator that will be performed across rows/columns. @@ -28,7 +42,7 @@ def register(cls, reduce_function, axis=None, shape_hint=None): Parameters ---------- - reduce_function : callable(pandas.DataFrame) -> pandas.Series + reduce_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.Series Source function. axis : int, optional Axis to apply function along. @@ -41,7 +55,9 @@ def register(cls, reduce_function, axis=None, shape_hint=None): Function that takes query compiler and executes Reduce function. """ - def caller(query_compiler, *args, **kwargs): + def caller( + query_compiler: PandasQueryCompiler, *args: tuple, **kwargs: dict + ) -> PandasQueryCompiler: """Execute Reduce function against passed query compiler.""" _axis = kwargs.get("axis") if axis is None else axis return query_compiler.__constructor__( diff --git a/modin/core/dataframe/algebra/tree_reduce.py b/modin/core/dataframe/algebra/tree_reduce.py index fa7b731e6f5..9dd42fa46b2 100644 --- a/modin/core/dataframe/algebra/tree_reduce.py +++ b/modin/core/dataframe/algebra/tree_reduce.py @@ -13,28 +13,42 @@ """Module houses builder class for TreeReduce operator.""" +from __future__ import annotations + +from typing import TYPE_CHECKING, Callable, Optional + from .operator import Operator +if TYPE_CHECKING: + import pandas + from pandas._typing import DtypeObj + + from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler + class TreeReduce(Operator): """Builder class for TreeReduce operator.""" @classmethod def register( - cls, map_function, reduce_function=None, axis=None, compute_dtypes=None - ): + cls, + map_function: Optional[Callable[..., pandas.DataFrame]], + reduce_function: Optional[Callable[..., pandas.Series]] = None, + axis: Optional[int] = None, + compute_dtypes: Optional[Callable[..., DtypeObj]] = None, + ) -> Callable[..., PandasQueryCompiler]: """ Build TreeReduce operator. Parameters ---------- - map_function : callable(pandas.DataFrame) -> pandas.DataFrame + map_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.DataFrame Source map function. - reduce_function : callable(pandas.DataFrame) -> pandas.Series, optional + reduce_function : callable(pandas.DataFrame, *args, **kwargs) -> pandas.Series, optional Source reduce function. axis : int, optional Specifies axis to apply function along. 
- compute_dtypes : callable(pandas.Series, *func_args, **func_kwargs) -> np.dtype, optional + compute_dtypes : callable(pandas.Series, *func_args, **func_kwargs) -> DtypeObj, optional Callable for computing dtypes. Returns @@ -46,7 +60,9 @@ def register( if reduce_function is None: reduce_function = map_function - def caller(query_compiler, *args, **kwargs): + def caller( + query_compiler: PandasQueryCompiler, *args: tuple, **kwargs: dict + ) -> PandasQueryCompiler: """Execute TreeReduce function against passed query compiler.""" _axis = kwargs.get("axis") if axis is None else axis diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index e07d725a2bc..a5327f3b484 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -58,6 +58,9 @@ ProtocolDataframe, ) from pandas._typing import npt + from modin.core.dataframe.pandas.partitioning.partition_manager import ( + PandasDataframePartitionManager, + ) from modin.logging import ClassLogger from modin.logging.config import LogLevel @@ -96,7 +99,7 @@ class PandasDataframe( The data types for the dataframe columns. """ - _partition_mgr_cls = None + _partition_mgr_cls: PandasDataframePartitionManager _query_compiler_cls = PandasQueryCompiler # These properties flag whether or not we are deferring the metadata synchronization _deferred_index = False @@ -299,7 +302,7 @@ def _get_axis_lengths(self, axis: int = 0) -> List[int]: return self.row_lengths if axis == 0 else self.column_widths @property - def has_dtypes_cache(self): + def has_dtypes_cache(self) -> bool: """ Check if the dtypes cache exists. @@ -310,7 +313,7 @@ def has_dtypes_cache(self): return self._dtypes is not None @property - def has_materialized_dtypes(self): + def has_materialized_dtypes(self) -> bool: """ Check if dataframe has materialized index cache. @@ -875,7 +878,7 @@ def synchronize_labels(self, axis=None): else: self._deferred_column = True - def _propagate_index_objs(self, axis=None): + def _propagate_index_objs(self, axis=None) -> None: """ Synchronize labels by applying the index object for specific `axis` to the `self._partitions` lazily. @@ -988,7 +991,7 @@ def take_2d_labels_or_positional( row_positions: Optional[List[int]] = None, col_labels: Optional[List[Hashable]] = None, col_positions: Optional[List[int]] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Lazily select columns or rows from given indices. @@ -1107,7 +1110,7 @@ def _get_new_lengths(self, partitions_dict, *, axis: int) -> List[int]: def _get_new_index_obj( self, positions, sorted_positions, axis: int - ) -> "tuple[pandas.Index, slice | npt.NDArray[np.intp]]": + ) -> tuple[pandas.Index, slice | npt.NDArray[np.intp]]: """ Find the new Index object for take_2d_positional result. @@ -1144,7 +1147,7 @@ def _take_2d_positional( self, row_positions: Optional[List[int]] = None, col_positions: Optional[List[int]] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Lazily select columns or rows from given indices. @@ -1317,10 +1320,10 @@ def _take_2d_positional( def _maybe_reorder_labels( self, - intermediate: "PandasDataframe", + intermediate: PandasDataframe, row_positions, col_positions, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Call re-order labels on take_2d_labels_or_positional result if necessary. 
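The quote-stripping in this file (`"PandasDataframe"` becoming `PandasDataframe`, `"tuple[pandas.Index, ...]"` becoming bare `tuple[...]`) is safe because of postponed evaluation of annotations (PEP 563): with the `__future__` import in effect, annotations are stored as strings instead of being evaluated while the class body executes. A minimal standalone illustration:

    from __future__ import annotations  # PEP 563: postponed annotation evaluation

    class Frame:
        # Without the __future__ import this annotation would raise NameError,
        # because `Frame` is not bound yet when the method is being defined.
        def copy(self) -> Frame:
            return Frame()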
@@ -1384,7 +1387,7 @@ def _maybe_reorder_labels( ) @lazy_metadata_decorator(apply_axis="rows") - def from_labels(self) -> "PandasDataframe": + def from_labels(self) -> PandasDataframe: """ Convert the row labels to a column of data, inserted at the first position. @@ -1486,7 +1489,7 @@ def from_labels_executor(df, **kwargs): result.synchronize_labels(axis=0) return result - def to_labels(self, column_list: List[Hashable]) -> "PandasDataframe": + def to_labels(self, column_list: List[Hashable]) -> PandasDataframe: """ Move one or more columns into the row labels. Previous labels are dropped. @@ -2097,7 +2100,7 @@ def reduce( axis: Union[int, Axis], function: Callable, dtypes: Optional[str] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Perform a user-defined aggregation on the specified axis, where the axis reduces down to a singleton. Requires knowledge of the full axis for the reduction. @@ -2135,7 +2138,7 @@ def tree_reduce( map_func: Callable, reduce_func: Optional[Callable] = None, dtypes: Optional[str] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Apply function that will reduce the data to a pandas Series. @@ -2182,7 +2185,7 @@ def map( func_args=None, func_kwargs=None, lazy=False, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Perform a function that maps across the entire dataset. @@ -2281,7 +2284,7 @@ def window( reduce_fn: Callable, window_size: int, result_schema: Optional[Dict[Hashable, type]] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Apply a sliding window operator that acts as a GROUPBY on each window, and reduces down to a single row (column) per window. @@ -2354,7 +2357,7 @@ def fold(self, axis, func, new_columns=None): self._column_widths_cache, ) - def infer_objects(self) -> "PandasDataframe": + def infer_objects(self) -> PandasDataframe: """ Attempt to infer better dtypes for object columns. @@ -2372,7 +2375,7 @@ def infer_objects(self) -> "PandasDataframe": ] return self.infer_types(obj_cols) - def infer_types(self, col_labels: List[str]) -> "PandasDataframe": + def infer_types(self, col_labels: List[str]) -> PandasDataframe: """ Determine the compatible type shared by all values in the specified columns, and coerce them to that type. @@ -2406,7 +2409,7 @@ def join( condition: Callable, other: ModinDataframe, join_type: Union[str, JoinType], - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Join this dataframe with the other. @@ -2442,7 +2445,7 @@ def rename( self, new_row_labels: Optional[Union[Dict[Hashable, Hashable], Callable]] = None, new_col_labels: Optional[Union[Dict[Hashable, Hashable], Callable]] = None, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Replace the row and column labels with the specified new labels. @@ -2698,7 +2701,7 @@ def sort_by( columns: Union[str, List[str]], ascending: bool = True, **kwargs, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Logically reorder rows (columns if axis=1) lexicographically by the data in a column or set of columns. @@ -2766,7 +2769,7 @@ def sort_function(df): # pragma: no cover return result @lazy_metadata_decorator(apply_axis="both") - def filter(self, axis: Union[Axis, int], condition: Callable) -> "PandasDataframe": + def filter(self, axis: Union[Axis, int], condition: Callable) -> PandasDataframe: """ Filter data based on the function provided along an entire axis. 
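A note on the `if TYPE_CHECKING:` blocks used throughout the patch (binary.py, fold.py, map.py, this file's `PandasDataframePartitionManager` import, and the query-compiler modules below): the guarded import is only executed by static type checkers, so annotation-only dependencies cannot introduce runtime import cycles. Schematically:

    from __future__ import annotations  # annotations stay unevaluated strings

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:
        # Seen by mypy and friends, never executed at runtime, so importing
        # PandasQueryCompiler here cannot create a circular import.
        from modin.core.storage_formats.pandas.query_compiler import (
            PandasQueryCompiler,
        )

    def caller(query_compiler: PandasQueryCompiler) -> PandasQueryCompiler:
        return query_compiler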
@@ -2808,7 +2811,7 @@ def filter(self, axis: Union[Axis, int], condition: Callable) -> "PandasDatafram self.copy_dtypes_cache() if axis == Axis.COL_WISE else None, ) - def filter_by_types(self, types: List[Hashable]) -> "PandasDataframe": + def filter_by_types(self, types: List[Hashable]) -> PandasDataframe: """ Allow the user to specify a type or set of types by which to filter the columns. @@ -2827,7 +2830,7 @@ def filter_by_types(self, types: List[Hashable]) -> "PandasDataframe": ) @lazy_metadata_decorator(apply_axis="both") - def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe": + def explode(self, axis: Union[int, Axis], func: Callable) -> PandasDataframe: """ Explode list-like entries along an entire axis. @@ -2862,7 +2865,7 @@ def explode(self, axis: Union[int, Axis], func: Callable) -> "PandasDataframe": partitions, new_index, new_columns, row_lengths, column_widths ) - def combine(self) -> "PandasDataframe": + def combine(self) -> PandasDataframe: """ Create a single partition PandasDataframe from the partitions of the current dataframe. @@ -2905,7 +2908,7 @@ def apply_full_axis( num_splits=None, sync_labels=True, pass_axis_lengths_to_partitions=False, - ): + ) -> PandasDataframe: """ Perform a function across an entire axis. @@ -3598,7 +3601,7 @@ def broadcast_apply_full_axis( result.synchronize_labels(axis=1) return result - def _check_if_axes_identical(self, other: "PandasDataframe", axis: int = 0) -> bool: + def _check_if_axes_identical(self, other: PandasDataframe, axis: int = 0) -> bool: """ Check whether indices/partitioning along the specified `axis` are identical when compared with `other`. @@ -3863,10 +3866,10 @@ def n_ary_op( def concat( self, axis: Union[int, Axis], - others: Union["PandasDataframe", List["PandasDataframe"]], + others: Union[PandasDataframe, List[PandasDataframe]], how, sort, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Concatenate `self` with one or more other Modin DataFrames. @@ -4059,7 +4062,7 @@ def groupby( self, axis: Union[int, Axis], internal_by: List[str], - external_by: List["PandasDataframe"], + external_by: List[PandasDataframe], by_positions: List[int], operator: Callable, result_schema: Optional[Dict[Hashable, type]] = None, @@ -4067,7 +4070,7 @@ def groupby( series_groupby: bool = False, add_missing_cats: bool = False, **kwargs: dict, - ) -> "PandasDataframe": + ) -> PandasDataframe: """ Generate groups based on values in the input column(s) and perform the specified operation on each. @@ -4718,7 +4721,7 @@ def __dataframe__(self, nan_as_null: bool = False, allow_copy: bool = True): ) @classmethod - def from_dataframe(cls, df: "ProtocolDataframe") -> "PandasDataframe": + def from_dataframe(cls, df: ProtocolDataframe) -> PandasDataframe: """ Convert a DataFrame implementing the dataframe exchange protocol to a Core Modin Dataframe. diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index f9bd784726d..06b2ed17aea 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -22,7 +22,7 @@ import abc import warnings from functools import cached_property -from typing import Hashable, List, Optional +from typing import TYPE_CHECKING, Hashable, List, Optional import numpy as np import pandas @@ -53,6 +53,11 @@ from . 
import doc_utils +if TYPE_CHECKING: + # TODO: should be ModinDataframe + # https://github.com/modin-project/modin/issues/7244 + from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe + def _get_axis(axis): """ @@ -127,6 +132,9 @@ class BaseQueryCompiler( for a list of requirements for subclassing this object. """ + _modin_frame: PandasDataframe + _shape_hint: Optional[str] + def __wrap_in_qc(self, obj): """ Wrap `obj` in query compiler. diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index feb766e38c5..234cc58133b 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -18,6 +18,8 @@ queries for the ``PandasDataframe``. """ +from __future__ import annotations + import ast import hashlib import re @@ -313,11 +315,11 @@ def from_dataframe(cls, df, data_cls): # END Dataframe exchange protocol - index = property(_get_axis(0), _set_axis(0)) - columns = property(_get_axis(1), _set_axis(1)) + index: pandas.Index = property(_get_axis(0), _set_axis(0)) + columns: pandas.Index = property(_get_axis(1), _set_axis(1)) @property - def dtypes(self): + def dtypes(self) -> pandas.Series: return self._modin_frame.dtypes def get_dtypes_set(self): @@ -811,7 +813,7 @@ def set_index_from_columns( # data inside. Sometimes we have to reverse this transposition of blocks # for simplicity of implementation. - def transpose(self, *args, **kwargs): + def transpose(self, *args, **kwargs) -> PandasQueryCompiler: # Switch the index and columns and transpose the data within the blocks. return self.__constructor__(self._modin_frame.transpose()) diff --git a/modin/distributed/dataframe/pandas/partitions.py b/modin/distributed/dataframe/pandas/partitions.py index cac3bec93b6..62a05ff81d3 100644 --- a/modin/distributed/dataframe/pandas/partitions.py +++ b/modin/distributed/dataframe/pandas/partitions.py @@ -90,7 +90,7 @@ def unwrap_partitions( f"Only API Layer objects may be passed in here, got {type(api_layer_object)} instead." ) - modin_frame = api_layer_object._query_compiler._modin_frame # type: ignore[attr-defined] + modin_frame = api_layer_object._query_compiler._modin_frame modin_frame._propagate_index_objs(None) if axis is None: @@ -122,7 +122,7 @@ def get_block(partition: PartitionUnionType) -> np.ndarray: ] actual_engine = type( - api_layer_object._query_compiler._modin_frame._partitions[0][0] # type: ignore[attr-defined] + api_layer_object._query_compiler._modin_frame._partitions[0][0] ).__name__ if actual_engine in ( "PandasOnRayDataframePartition",
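A closing note on the two `# type: ignore[attr-defined]` removals above: they line up with the new class-level annotations on `BaseQueryCompiler` (`_modin_frame: PandasDataframe`, `_shape_hint: Optional[str]`); once an attribute is declared, mypy resolves accesses like `api_layer_object._query_compiler._modin_frame` without suppression. A minimal sketch of the mechanism, with stand-in classes:

    from typing import Optional

    class PandasDataframe:  # stand-in for the real Modin frame class
        pass

    class BaseQueryCompiler:
        # Declaring attributes at class level is enough for static checkers,
        # even though the values are only assigned later at runtime.
        _modin_frame: PandasDataframe
        _shape_hint: Optional[str]

    def unwrap(qc: BaseQueryCompiler) -> PandasDataframe:
        return qc._modin_frame  # no "attr-defined" error, no ignore needed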