Skip to content

Commit

Permalink
[SPARK-40332][PS] Implement GroupBy.quantile
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Implement `GroupBy.quantile`

### Why are the changes needed?
Improve PS API coverage

### Does this PR introduce _any_ user-facing change?
yes, new API
```python
>>> df = ps.DataFrame([
...     ['a', 1], ['a', 2], ['a', 3],
...     ['b', 1], ['b', 3], ['b', 5]
... ], columns=['key', 'val'])
>>> df.groupby('key').quantile()
     val
key
a    2.0
b    3.0
```

### How was this patch tested?
UT

Closes apache#37816 from Yikun/SPARK-40332.

Authored-by: Yikun Jiang <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
  • Loading branch information
Yikun authored and zhengruifeng committed Sep 9, 2022
1 parent f885418 commit 6577c43
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 3 deletions.
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.pandas/groupby.rst
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Computations / Descriptive Stats
GroupBy.sum
GroupBy.var
GroupBy.nunique
GroupBy.quantile
GroupBy.size
GroupBy.diff
GroupBy.idxmax
Expand Down
64 changes: 63 additions & 1 deletion python/pyspark/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import warnings

import pandas as pd
from pandas.api.types import is_hashable, is_list_like # type: ignore[attr-defined]
from pandas.api.types import is_number, is_hashable, is_list_like # type: ignore[attr-defined]

if LooseVersion(pd.__version__) >= LooseVersion("1.3.0"):
from pandas.core.common import _builtin_table # type: ignore[attr-defined]
Expand All @@ -58,6 +58,7 @@
from pyspark.sql.types import (
BooleanType,
DataType,
DoubleType,
NumericType,
StructField,
StructType,
Expand Down Expand Up @@ -581,6 +582,67 @@ def mean(self, numeric_only: Optional[bool] = True) -> FrameLike:
F.mean, accepted_spark_types=(NumericType,), bool_to_numeric=True
)

# TODO: 'q' accepts list like type
def quantile(self, q: float = 0.5, accuracy: int = 10000) -> FrameLike:
    """
    Return group values at the given quantile.

    .. versionadded:: 3.4.0

    Parameters
    ----------
    q : float, default 0.5 (50% quantile)
        Value between 0 and 1 providing the quantile to compute.
    accuracy : int, optional
        Default accuracy of approximation. Larger value means better accuracy.
        The relative error can be deduced by 1.0 / accuracy.
        This is a pandas-on-Spark specific parameter.

    Returns
    -------
    pyspark.pandas.Series or pyspark.pandas.DataFrame
        Return type determined by caller of GroupBy object.

    Notes
    -----
    `quantile` in pandas-on-Spark uses a distributed percentile approximation
    algorithm unlike pandas, so the result might differ from pandas; also the
    `interpolation` parameter is not supported yet.

    See Also
    --------
    pyspark.pandas.Series.quantile
    pyspark.pandas.DataFrame.quantile
    pyspark.sql.functions.percentile_approx

    Examples
    --------
    >>> df = ps.DataFrame([
    ...     ['a', 1], ['a', 2], ['a', 3],
    ...     ['b', 1], ['b', 3], ['b', 5]
    ... ], columns=['key', 'val'])

    Groupby one column and return the quantile of the remaining columns in
    each group.

    >>> df.groupby('key').quantile()
         val
    key
    a    2.0
    b    3.0
    """
    # Validate `q` before touching any Spark machinery so error behavior
    # matches pandas: list-like -> NotImplementedError (TODO above),
    # non-number -> TypeError, out of [0, 1] (incl. NaN) -> ValueError.
    if is_list_like(q):
        raise NotImplementedError("q doesn't support for list like type for now")
    if not is_number(q):
        raise TypeError("must be real number, not %s" % type(q).__name__)
    if not 0 <= q <= 1:
        raise ValueError("'q' must be between 0 and 1. Got '%s' instead" % q)
    # Cast to double so integer/boolean columns produce a float result like
    # pandas; bool_to_numeric lets boolean columns participate at all.
    return self._reduce_for_stat_function(
        lambda col: F.percentile_approx(col.cast(DoubleType()), q, accuracy),
        accepted_spark_types=(NumericType, BooleanType),
        bool_to_numeric=True,
    )

def min(self, numeric_only: Optional[bool] = False) -> FrameLike:
"""
Compute min of group values.
Expand Down
2 changes: 0 additions & 2 deletions python/pyspark/pandas/missing/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class MissingPandasLikeDataFrameGroupBy:
indices = _unsupported_property("indices")
ngroups = _unsupported_property("ngroups")
plot = _unsupported_property("plot")
quantile = _unsupported_property("quantile")
tshift = _unsupported_property("tshift")

# Deprecated properties
Expand Down Expand Up @@ -81,7 +80,6 @@ class MissingPandasLikeSeriesGroupBy:
is_monotonic_increasing = _unsupported_property("is_monotonic_increasing")
ngroups = _unsupported_property("ngroups")
plot = _unsupported_property("plot")
quantile = _unsupported_property("quantile")
tshift = _unsupported_property("tshift")

# Deprecated properties
Expand Down
42 changes: 42 additions & 0 deletions python/pyspark/pandas/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,48 @@ def test_mean(self):
with self.assertRaises(TypeError):
psdf.groupby("A")["C"].mean()

def test_quantile(self):
    """GroupBy.quantile: parity with pandas plus input validation."""
    numeric_pdf = pd.DataFrame(
        [["a", 1], ["a", 2], ["a", 3], ["b", 1], ["b", 3], ["b", 5]], columns=["key", "val"]
    )
    boolean_pdf = pd.DataFrame(
        [["a", True], ["a", True], ["a", False], ["b", True], ["b", True], ["b", False]],
        columns=["key", "val"],
    )
    for pdf in (numeric_pdf, boolean_pdf):
        psdf = ps.from_pandas(pdf)
        # q accepts floats and ints anywhere in [0, 1]
        for quantile_q in (0, 0.1, 0.5, 1):
            self.assert_eq(
                pdf.groupby("key").quantile(q=quantile_q, interpolation="lower"),
                psdf.groupby("key").quantile(q=quantile_q),
                almost=True,
            )
            self.assert_eq(
                pdf.groupby("key")["val"].quantile(q=quantile_q, interpolation="lower"),
                psdf.groupby("key")["val"].quantile(q=quantile_q),
                almost=True,
            )
        # q outside [0, 1] (NaN included) raises ValueError
        for invalid_q in (1.1, -0.1, 2, np.nan):
            with self.assertRaises(ValueError):
                psdf.groupby("key").quantile(q=invalid_q)
        # non-numeric q raises TypeError
        with self.assertRaises(TypeError):
            psdf.groupby("key").quantile(q="0.1")
        # list-like q is not implemented yet
        for listlike_q in ((0.1, 0.5), [0.1, 0.5]):
            with self.assertRaises(NotImplementedError):
                psdf.groupby("key").quantile(q=listlike_q)

def test_min(self):
self._test_stat_func(lambda groupby_obj: groupby_obj.min())
self._test_stat_func(lambda groupby_obj: groupby_obj.min(numeric_only=None))
Expand Down

0 comments on commit 6577c43

Please sign in to comment.