From 341f62e34d31b5a78696d735e882b0286a804baa Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 8 Aug 2023 15:26:54 +0000 Subject: [PATCH 1/4] PERF-#6433: Implement '.dropna()' using map-reduce pattern Signed-off-by: Dmitry Chigarev --- .../dataframe/pandas/dataframe/dataframe.py | 79 +++++++++++++++---- .../storage_formats/pandas/query_compiler.py | 47 +++++++++++ 2 files changed, 110 insertions(+), 16 deletions(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index 2f49c4b2404..d889caf2853 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -231,6 +231,22 @@ def _set_axis_lengths_cache(self, value, axis=0): else: self._column_widths_cache = value + def _get_axis_lengths_cache(self, axis=0): + """ + Get partition's shape caches along the specified axis if avaliable. + + Parameters + ---------- + axis : int, default 0 + 0 - get row lengths cache, 1 - get column widths cache. + + Returns + ------- + list of ints or None + If the cache is computed return a list of ints, ``None`` otherwise. + """ + return self._row_lengths_cache if axis == 0 else self._column_widths_cache + @property def has_dtypes_cache(self): """ @@ -2814,7 +2830,14 @@ def apply_select_indices( @lazy_metadata_decorator(apply_axis="both") def broadcast_apply( - self, axis, func, other, join_type="left", labels="keep", dtypes=None + self, + axis, + func, + other, + join_type="left", + copartition=True, + labels="keep", + dtypes=None, ): """ Broadcast axis partitions of `other` to partitions of `self` and apply a function. @@ -2829,6 +2852,13 @@ def broadcast_apply( Modin DataFrame to broadcast. join_type : str, default: "left" Type of join to apply. + copartition : bool, default: True + Whether to align indices/partitioning of the `self` and `other` frame. + Disabling this may save some time, however, you have to be 100% sure that + the indexing and partitioning are identical along the broadcasting axis, + this might be the case for example if `other` is a projection of the `self` + or vice-versa. If copartitioning is disabled and partitioning/indexing are + incompatible then you may end up with undefined behavior. labels : {"keep", "replace", "drop"}, default: "keep" Whether keep labels from `self` Modin DataFrame, replace them with labels from joined DataFrame or drop altogether to make them be computed lazily later. @@ -2840,17 +2870,28 @@ def broadcast_apply( PandasDataframe New Modin DataFrame. """ - # Only sort the indices if they do not match - ( - left_parts, - right_parts, - joined_index, - partition_sizes_along_axis, - ) = self._copartition( - axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis]) - ) - # unwrap list returned by `copartition`. - right_parts = right_parts[0] + if copartition: + # Only sort the indices if they do not match + ( + left_parts, + right_parts, + joined_index, + partition_sizes_along_axis, + ) = self._copartition( + axis, + other, + join_type, + sort=not self.axes[axis].equals(other.axes[axis]), + ) + # unwrap list returned by `copartition`. + right_parts = right_parts[0] + else: + left_parts = self._partitions + right_parts = other._partitions + partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache( + axis + ), self.copy_axis_cache(axis) + new_frame = self._partition_mgr_cls.broadcast_apply( axis, func, left_parts, right_parts ) @@ -2868,13 +2909,19 @@ def _pick_axis(get_axis, sizes_cache): if axis == 0: # Pass shape caches instead of values in order to not trigger shape computation. new_index, new_row_lengths = _pick_axis( - self._get_index, self._row_lengths_cache + self.copy_index_cache, self._row_lengths_cache + ) + new_columns, new_column_widths = ( + self.copy_columns_cache(), + self._column_widths_cache, ) - new_columns, new_column_widths = self.columns, self._column_widths_cache else: - new_index, new_row_lengths = self.index, self._row_lengths_cache + new_index, new_row_lengths = ( + self.copy_index_cache(), + self._row_lengths_cache, + ) new_columns, new_column_widths = _pick_axis( - self._get_columns, self._column_widths_cache + self.copy_columns_cache, self._column_widths_cache ) return self.__constructor__( diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b4df19a88d2..0a601a7963a 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2828,6 +2828,53 @@ def setitem_builder(df, internal_indices=[]): # pragma: no cover # Drop/Dropna # This will change the shape of the resulting data. def dropna(self, **kwargs): + if kwargs.get("axis", 0) == 1 and kwargs.get("thresh", no_default) in ( + no_default, + None, + ): + how = kwargs.get("how", "any") + subset = kwargs.get("subset") + how = "any" if how in (no_default, None) else how + condition = lambda df: getattr(df, how)() # noqa: E731 (lambda assignment) + + def mapper(df: pandas.DataFrame): + if subset is not None: + subset_mask = condition( + df.loc[df.index.intersection(subset)].isna() + ) + mask = pandas.Series( + np.zeros(df.shape[1], dtype=bool), index=df.columns + ) + mask.update(subset_mask) + else: + mask = condition(df.isna()) + return mask.to_frame().T + + masks = self._modin_frame.apply_full_axis( + func=mapper, axis=1, keep_partitioning=True + ) + + def reduce(df: pandas.DataFrame, mask: pandas.DataFrame): + to_take_mask = ~condition(mask) + + to_take = [] + for col, value in to_take_mask.items(): + if value and col in df: + to_take.append(col) + + return df[to_take] + + result = self._modin_frame.broadcast_apply( + # 'masks' have identical partitioning as we specified 'keep_partitioning=True' before, + # this means that we can safely skip the 'co-partitioning' stage + axis=1, + func=reduce, + other=masks, + copartition=False, + labels="drop", + ) + return self.__constructor__(result) + return self.__constructor__( self._modin_frame.filter( kwargs.get("axis", 0) ^ 1, From 0b4960884f8818d71c123903bfe0e4d2b1ed06b1 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Tue, 8 Aug 2023 16:21:52 +0000 Subject: [PATCH 2/4] fix pydocstyle Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/dataframe/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index d889caf2853..eca3dbd89cf 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -237,7 +237,7 @@ def _get_axis_lengths_cache(self, axis=0): Parameters ---------- - axis : int, default 0 + axis : int, default: 0 0 - get row lengths cache, 1 - get column widths cache. Returns From e13e4eeed95f2a5f44035fb64eb40691a209440b Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 9 Aug 2023 10:27:54 +0000 Subject: [PATCH 3/4] tune implementation for square-like frames Signed-off-by: Dmitry Chigarev --- .../core/storage_formats/pandas/query_compiler.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 0a601a7963a..b5261f1c161 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -43,7 +43,7 @@ from pandas.core.groupby.base import transformation_kernels from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler -from modin.config import ExperimentalGroupbyImpl +from modin.config import ExperimentalGroupbyImpl, CpuCount from modin.error_message import ErrorMessage from modin.utils import ( try_cast_to_pandas, @@ -2828,10 +2828,19 @@ def setitem_builder(df, internal_indices=[]): # pragma: no cover # Drop/Dropna # This will change the shape of the resulting data. def dropna(self, **kwargs): - if kwargs.get("axis", 0) == 1 and kwargs.get("thresh", no_default) in ( + is_column_wise = kwargs.get("axis", 0) == 1 + no_thresh_passed = kwargs.get("thresh", no_default) in ( no_default, None, - ): + ) + # FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394 + # if there are too many partitions then all non-full-axis implementations start acting very badly. + # The here threshold is pretty random though it works fine on simple scenarios + processable_amount_of_partitions = ( + np.prod(self._modin_frame._partitions.shape) < CpuCount.get() * 32 + ) + + if is_column_wise and no_thresh_passed and processable_amount_of_partitions: how = kwargs.get("how", "any") subset = kwargs.get("subset") how = "any" if how in (no_default, None) else how From 11084807c53c251f913a5170de4ec2fb1840ca09 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Sun, 20 Aug 2023 19:17:22 +0000 Subject: [PATCH 4/4] Add more comments Signed-off-by: Dmitry Chigarev --- .../core/dataframe/pandas/dataframe/dataframe.py | 11 +++++++++++ .../storage_formats/pandas/query_compiler.py | 16 +++++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py index eca3dbd89cf..7e7996ecd60 100644 --- a/modin/core/dataframe/pandas/dataframe/dataframe.py +++ b/modin/core/dataframe/pandas/dataframe/dataframe.py @@ -168,6 +168,17 @@ def _validate_axes_lengths(self): f"Column widths cannot be negative: {self._column_widths_cache}", ) + @property + def num_parts(self) -> int: + """ + Get the total number of partitions for this frame. + + Returns + ------- + int + """ + return np.prod(self._partitions.shape) + @property def row_lengths(self): """ diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index b5261f1c161..305dd614e66 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2837,7 +2837,7 @@ def dropna(self, **kwargs): # if there are too many partitions then all non-full-axis implementations start acting very badly. # The here threshold is pretty random though it works fine on simple scenarios processable_amount_of_partitions = ( - np.prod(self._modin_frame._partitions.shape) < CpuCount.get() * 32 + self._modin_frame.num_parts < CpuCount.get() * 32 ) if is_column_wise and no_thresh_passed and processable_amount_of_partitions: @@ -2847,16 +2847,21 @@ def dropna(self, **kwargs): condition = lambda df: getattr(df, how)() # noqa: E731 (lambda assignment) def mapper(df: pandas.DataFrame): + """Compute a mask indicating whether there are all/any NaN values in each column.""" if subset is not None: subset_mask = condition( df.loc[df.index.intersection(subset)].isna() ) + # we have to keep other columns so setting their mask + # values with `False` mask = pandas.Series( np.zeros(df.shape[1], dtype=bool), index=df.columns ) mask.update(subset_mask) else: mask = condition(df.isna()) + # for proper partitioning at the 'reduce' phase each partition has to + # represent a one-row frame rather than a one-column frame, so calling `.T` here return mask.to_frame().T masks = self._modin_frame.apply_full_axis( @@ -2864,6 +2869,15 @@ def mapper(df: pandas.DataFrame): ) def reduce(df: pandas.DataFrame, mask: pandas.DataFrame): + """Drop columns from `df` that satisfy the NaN `mask`.""" + # `mask` here consists of several rows each representing the masks result + # for a certain row partition: + # col1 col2 col3 + # 0 True True False col1 True + # 1 False True False ---> mask.any() ---> col2 True + # 2 True True False col3 False + # in order to get the proper 1D mask we have to reduce the partition's + # results by applying the condition one more time to_take_mask = ~condition(mask) to_take = []