diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 4130de68a12..332ab906e27 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3117,7 +3117,12 @@ def _groupby_internal_columns(self, by, drop): else: if not isinstance(by, list): by = [by] if by is not None else [] - internal_by = [o for o in by if hashable(o) and o in self.columns] + internal_by = [] + for o in by: + if isinstance(o, pandas.Grouper) and o.key in self.columns: + internal_by.append(o.key) + elif hashable(o) and o in self.columns: + internal_by.append(o) internal_qc = ( [self.getitem_column_array(internal_by)] if len(internal_by) else [] ) diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 706342a8412..08312906453 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -476,35 +476,42 @@ def groupby( drop = by._parent is self idx_name = by.name by = by._query_compiler + elif isinstance(by, pandas.Grouper): + drop = by.key in self elif is_list_like(by): # fastpath for multi column groupby if axis == 0 and all( ( (hashable(o) and (o in self)) or isinstance(o, Series) + or (isinstance(o, pandas.Grouper) and o.key in self) or (is_list_like(o) and len(o) == len(self.axes[axis])) ) for o in by ): - # We want to split 'by's into those that belongs to the self (internal_by) - # and those that doesn't (external_by) - internal_by, external_by = [], [] + has_external = False + processed_by = [] for current_by in by: - if hashable(current_by): - internal_by.append(current_by) + if isinstance(current_by, pandas.Grouper): + processed_by.append(current_by) + has_external = True + elif hashable(current_by): + processed_by.append(current_by) elif isinstance(current_by, Series): if current_by._parent is self: - internal_by.append(current_by.name) + processed_by.append(current_by.name) else: - external_by.append(current_by._query_compiler) + processed_by.append(current_by._query_compiler) + has_external = True else: - external_by.append(current_by) + has_external = True + processed_by.append(current_by) - by = internal_by + external_by + by = processed_by - if len(external_by) == 0: - by = self[internal_by]._query_compiler + if not has_external: + by = self[processed_by]._query_compiler drop = True else: diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 873e1edb678..122899c0218 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -588,11 +588,19 @@ def _internal_by(self): internal_by = tuple() if self._drop: if is_list_like(self._by): - internal_by = tuple(by for by in self._by if isinstance(by, str)) + internal_by_list = [] + for by in self._by: + if isinstance(by, str): + internal_by_list.append(by) + elif isinstance(by, pandas.Grouper): + internal_by_list.append(by.key) + internal_by = tuple(internal_by_list) + elif isinstance(self._by, pandas.Grouper): + internal_by = tuple([self._by.key]) else: ErrorMessage.catch_bugs_and_request_email( failure_condition=not isinstance(self._by, BaseQueryCompiler), - extra_log=f"When 'drop' is True, 'by' must be either list-like or a QueryCompiler, met: {type(self._by)}.", + extra_log=f"When 'drop' is True, 'by' must be either list-like, Grouper, or a QueryCompiler, met: {type(self._by)}.", ) internal_by = tuple(self._by.columns) diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index 34837989563..1e9fd75ffe2 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -16,6 +16,7 @@ import pandas import numpy as np from unittest import mock +import datetime from modin.config import StorageFormat from modin.config.envvars import Engine, ExperimentalGroupbyImpl @@ -2574,6 +2575,42 @@ def test_skew_corner_cases(): eval_general(modin_df, pandas_df, lambda df: df.groupby("col0").skew()) +@pytest.mark.parametrize( + "by", + [ + pandas.Grouper(key="time_stamp", freq="3D"), + [pandas.Grouper(key="time_stamp", freq="1M"), "count"], + ], +) +def test_groupby_with_grouper(by): + # See https://github.com/modin-project/modin/issues/5091 for more details + # Generate larger data so that it can handle partitioning cases + data = { + "id": [i for i in range(200)], + "time_stamp": [ + pd.Timestamp("2000-01-02") + datetime.timedelta(days=x) for x in range(200) + ], + } + for i in range(200): + data[f"count_{i}"] = [i, i + 1] * 100 + + modin_df, pandas_df = create_test_dfs(data) + eval_general( + modin_df, + pandas_df, + lambda df: df.groupby(by).mean(), + ) + + +def test_groupby_preserves_by_order(): + modin_df, pandas_df = create_test_dfs({"col0": [1, 1, 1], "col1": [10, 10, 10]}) + + modin_res = modin_df.groupby([pd.Series([100, 100, 100]), "col0"]).mean() + pandas_res = pandas_df.groupby([pandas.Series([100, 100, 100]), "col0"]).mean() + + df_equals(modin_res, pandas_res) + + @pytest.mark.parametrize( "method", # test all aggregations from pandas.core.groupby.base.reduction_kernels except