PERF-#6433: Implement '.dropna()' using map-reduce pattern (#6472)
Signed-off-by: Dmitry Chigarev <[email protected]>
dchigarev authored Aug 21, 2023
1 parent cf335c3 commit cfee6b4
Showing 2 changed files with 145 additions and 17 deletions.
90 changes: 74 additions & 16 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -168,6 +168,17 @@ def _validate_axes_lengths(self):
f"Column widths cannot be negative: {self._column_widths_cache}",
)

@property
def num_parts(self) -> int:
"""
Get the total number of partitions for this frame.

Returns
-------
int
"""
return np.prod(self._partitions.shape)

@property
def row_lengths(self):
"""
@@ -231,6 +242,22 @@ def _set_axis_lengths_cache(self, value, axis=0):
else:
self._column_widths_cache = value

def _get_axis_lengths_cache(self, axis=0):
"""
Get the partitions' shape cache along the specified axis, if available.

Parameters
----------
axis : int, default: 0
    0 - get row lengths cache, 1 - get column widths cache.

Returns
-------
list of ints or None
    If the cache is computed, return a list of ints; ``None`` otherwise.
"""
return self._row_lengths_cache if axis == 0 else self._column_widths_cache

@property
def has_dtypes_cache(self):
"""
@@ -2823,7 +2850,14 @@ apply_select_indices(

@lazy_metadata_decorator(apply_axis="both")
def broadcast_apply(
-    self, axis, func, other, join_type="left", labels="keep", dtypes=None
+    self,
+    axis,
+    func,
+    other,
+    join_type="left",
+    copartition=True,
+    labels="keep",
+    dtypes=None,
):
"""
Broadcast axis partitions of `other` to partitions of `self` and apply a function.
@@ -2838,6 +2872,13 @@
    Modin DataFrame to broadcast.
join_type : str, default: "left"
    Type of join to apply.
copartition : bool, default: True
    Whether to align the indices/partitioning of the `self` and `other` frames.
    Disabling this may save some time; however, you have to be 100% sure that
    the indexing and partitioning are identical along the broadcasting axis,
    which may be the case, for example, if `other` is a projection of `self`
    or vice versa. If copartitioning is disabled and the partitioning/indexing
    are incompatible, you may end up with undefined behavior.
labels : {"keep", "replace", "drop"}, default: "keep"
    Whether to keep labels from the `self` Modin DataFrame, replace them with labels
    from the joined DataFrame, or drop them altogether so that they are computed lazily later.
@@ -2849,17 +2890,28 @@
PandasDataframe
New Modin DataFrame.
"""
-# Only sort the indices if they do not match
-(
-    left_parts,
-    right_parts,
-    joined_index,
-    partition_sizes_along_axis,
-) = self._copartition(
-    axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis])
-)
-# unwrap list returned by `copartition`.
-right_parts = right_parts[0]
+if copartition:
+    # Only sort the indices if they do not match
+    (
+        left_parts,
+        right_parts,
+        joined_index,
+        partition_sizes_along_axis,
+    ) = self._copartition(
+        axis,
+        other,
+        join_type,
+        sort=not self.axes[axis].equals(other.axes[axis]),
+    )
+    # unwrap list returned by `copartition`.
+    right_parts = right_parts[0]
+else:
+    left_parts = self._partitions
+    right_parts = other._partitions
+    partition_sizes_along_axis, joined_index = self._get_axis_lengths_cache(
+        axis
+    ), self.copy_axis_cache(axis)

new_frame = self._partition_mgr_cls.broadcast_apply(
axis, func, left_parts, right_parts
)
@@ -2877,13 +2929,19 @@ def _pick_axis(get_axis, sizes_cache):
if axis == 0:
# Pass shape caches instead of values in order to not trigger shape computation.
    new_index, new_row_lengths = _pick_axis(
-        self._get_index, self._row_lengths_cache
+        self.copy_index_cache, self._row_lengths_cache
    )
-    new_columns, new_column_widths = self.columns, self._column_widths_cache
+    new_columns, new_column_widths = (
+        self.copy_columns_cache(),
+        self._column_widths_cache,
+    )
else:
-    new_index, new_row_lengths = self.index, self._row_lengths_cache
+    new_index, new_row_lengths = (
+        self.copy_index_cache(),
+        self._row_lengths_cache,
+    )
    new_columns, new_column_widths = _pick_axis(
-        self._get_columns, self._column_widths_cache
+        self.copy_columns_cache, self._column_widths_cache
    )

return self.__constructor__(
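
The `copartition` flag added above lets `broadcast_apply` skip the alignment step when `other` is already known to share the frame's partitioning along the broadcast axis. Below is a minimal single-process sketch of the idea in plain pandas; the partition lists, example data, and helper names are illustrative assumptions, not Modin internals.

import pandas as pd

# Two row partitions of a frame (hypothetical example data).
left_parts = [
    pd.DataFrame({"x": [1, 2]}, index=[0, 1]),
    pd.DataFrame({"x": [3, 4]}, index=[2, 3]),
]
# 'other' is derived from the same frame, so its partitioning and indexing
# along the broadcast axis already match 'left_parts' exactly.
other_parts = [part * 10 for part in left_parts]

def apply_pairwise(lhs, rhs):
    # With copartitioning disabled we simply trust that partitions line up one-to-one.
    return lhs + rhs

# copartition=False amounts to zipping the partitions pairwise, skipping the
# reindex/repartition step that copartitioning would otherwise perform.
result_parts = [apply_pairwise(lhs, rhs) for lhs, rhs in zip(left_parts, other_parts)]
print(pd.concat(result_parts))

If the partitioning or indexing of the two frames actually differs, this pairwise zip silently produces wrong results, which is exactly the undefined behavior the docstring warns about.
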
72 changes: 71 additions & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
@@ -43,7 +43,7 @@
from pandas.core.groupby.base import transformation_kernels

from modin.core.storage_formats.base.query_compiler import BaseQueryCompiler
-from modin.config import ExperimentalGroupbyImpl
+from modin.config import ExperimentalGroupbyImpl, CpuCount
from modin.error_message import ErrorMessage
from modin.utils import (
try_cast_to_pandas,
@@ -2848,6 +2848,76 @@ def setitem_builder(df, internal_indices=[]):  # pragma: no cover
# Drop/Dropna
# This will change the shape of the resulting data.
def dropna(self, **kwargs):
is_column_wise = kwargs.get("axis", 0) == 1
no_thresh_passed = kwargs.get("thresh", no_default) in (
no_default,
None,
)
# FIXME: this is a naive workaround for this problem: https://github.com/modin-project/modin/issues/5394
# If there are too many partitions, all non-full-axis implementations start behaving very badly.
# The threshold here is fairly arbitrary, though it works well in simple scenarios.
processable_amount_of_partitions = (
self._modin_frame.num_parts < CpuCount.get() * 32
)

if is_column_wise and no_thresh_passed and processable_amount_of_partitions:
how = kwargs.get("how", "any")
subset = kwargs.get("subset")
how = "any" if how in (no_default, None) else how
condition = lambda df: getattr(df, how)() # noqa: E731 (lambda assignment)

def mapper(df: pandas.DataFrame):
"""Compute a mask indicating whether there are all/any NaN values in each column."""
if subset is not None:
subset_mask = condition(
df.loc[df.index.intersection(subset)].isna()
)
# we have to keep the other columns, so set their mask
# values to `False`
mask = pandas.Series(
np.zeros(df.shape[1], dtype=bool), index=df.columns
)
mask.update(subset_mask)
else:
mask = condition(df.isna())
# for proper partitioning at the 'reduce' phase, each partition has to
# represent a one-row frame rather than a one-column frame, hence the `.T` call here
return mask.to_frame().T

masks = self._modin_frame.apply_full_axis(
func=mapper, axis=1, keep_partitioning=True
)

def reduce(df: pandas.DataFrame, mask: pandas.DataFrame):
"""Drop columns from `df` that satisfy the NaN `mask`."""
# `mask` here consists of several rows, each representing the mask result
# for a certain row partition:
# col1 col2 col3
# 0 True True False col1 True
# 1 False True False ---> mask.any() ---> col2 True
# 2 True True False col3 False
# in order to get the proper 1D mask we have to reduce the partitions'
# results by applying the condition one more time
to_take_mask = ~condition(mask)

to_take = []
for col, value in to_take_mask.items():
if value and col in df:
to_take.append(col)

return df[to_take]

result = self._modin_frame.broadcast_apply(
# 'masks' has identical partitioning since we specified 'keep_partitioning=True' above,
# which means we can safely skip the 'co-partitioning' stage
axis=1,
func=reduce,
other=masks,
copartition=False,
labels="drop",
)
return self.__constructor__(result)

return self.__constructor__(
self._modin_frame.filter(
kwargs.get("axis", 0) ^ 1,
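
To make the map-reduce flow above concrete, here is a minimal single-process sketch of the same pattern in plain pandas; the example frame, the two-way partition split, and the helper names are assumptions for illustration, and Modin's real partitions are distributed objects rather than local DataFrames.

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {"a": [1.0, np.nan, 3.0], "b": [np.nan, np.nan, np.nan], "c": [7.0, 8.0, 9.0]}
)
how = "any"  # drop a column if it has any NaN; how="all" would require all values to be NaN

def condition(frame):
    return getattr(frame, how)()

# Map phase: each row partition emits a one-row frame holding its per-column NaN mask.
row_parts = [df.iloc[:2], df.iloc[2:]]
masks = [condition(part.isna()).to_frame().T for part in row_parts]

# Reduce phase: stack the per-partition masks and apply the condition once more
# to obtain the final 1D column mask, then keep only the columns that are not flagged.
combined = condition(pd.concat(masks, ignore_index=True))
result = df.loc[:, ~combined]
print(result)  # only column 'c' survives with how="any"

The map phase is embarrassingly parallel across row partitions, and only the tiny one-row masks travel to the reduce phase, which is the intent of this rewrite relative to the full-axis `filter` fallback shown just above for the remaining cases.
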
