
Commit fd8224e

Update Aggregate functions to take builder parameters (#859)
* Add NullTreatment enum wrapper and add filter option to approx_distinct
* Small usability improvement on aggregate
* Add documentation and an additional unit test for approx_median
* Update approx_percentile_cont with the builder parameters it uses (filter but not distinct)
* Update approx_percentile_cont_with_weight with the builder parameters it uses (filter but not distinct)
* Update array_agg to use aggregate options
* Update builder options for the avg aggregate function
* Move bit_and and bit_or to use a macro to generate the Python fn
* Update builder arguments for bitwise operators
* Use a macro for bool_and and bool_or
* Update the Python wrapper with the arguments appropriate to the bool operators
* Set corr to use the macro for the pyfunction
* Update a unit test to make it easier to debug
* Update the corr Python wrapper to expose only the builder parameters used
* Update count and count_star to use the macro for exposing
* Update count and count_star with appropriate aggregation options
* Move covar_pop and covar_samp to use the macro for aggregates
* Update covar_pop and covar_samp with the builder option
* Use the macro for last_value and move first_value to be near it
* Update first_value and last_value with the builder parameters that are relevant
* Remove grouping since it is not actually implemented upstream
* Move median to use the macro
* Expose builder options for median
* Expose nth_value
* Update the linear regression functions to use filter and the macro
* Update stddev and stddev_pop to use filter and the macro
* Expose string_agg
* Add string_agg to the Python wrappers and add a unit test
* Switch sum to use the macro on the Rust side and expose the correct options in the Python wrapper
* Use the macro for exposing var_pop and var_samp
* Add unit tests for filtering on var_pop and var_samp
* Move approximation functions to use the macro when possible
* Update user documentation to explain in detail the options for aggregate functions
* Update a unit test to handle Python 3.10
* Clean up commented code
1 parent 4ea0032 commit fd8224e

File tree

9 files changed: +1470 −619 lines changed


docs/source/user-guide/common-operations/aggregations.rst

+184-22
@@ -20,43 +20,205 @@
 Aggregation
 ============

-An aggregate or aggregation is a function where the values of multiple rows are processed together to form a single summary value.
-For performing an aggregation, DataFusion provides the :py:func:`~datafusion.dataframe.DataFrame.aggregate`
+An aggregate or aggregation is a function where the values of multiple rows are processed together
+to form a single summary value. For performing an aggregation, DataFusion provides the
+:py:func:`~datafusion.dataframe.DataFrame.aggregate`

 .. ipython:: python

+    import urllib.request
     from datafusion import SessionContext
-    from datafusion import column, lit
+    from datafusion import col, lit
     from datafusion import functions as f
-    import random

-    ctx = SessionContext()
-    df = ctx.from_pydict(
-        {
-            "a": ["foo", "bar", "foo", "bar", "foo", "bar", "foo", "foo"],
-            "b": ["one", "one", "two", "three", "two", "two", "one", "three"],
-            "c": [random.randint(0, 100) for _ in range(8)],
-            "d": [random.random() for _ in range(8)],
-        },
-        name="foo_bar"
+    urllib.request.urlretrieve(
+        "https://gist.githubusercontent.com/ritchie46/cac6b337ea52281aa23c049250a4ff03/raw/89a957ff3919d90e6ef2d34235e6bf22304f3366/pokemon.csv",
+        "pokemon.csv",
     )

-    col_a = column("a")
-    col_b = column("b")
-    col_c = column("c")
-    col_d = column("d")
+    ctx = SessionContext()
+    df = ctx.read_csv("pokemon.csv")
+
+    col_type_1 = col('"Type 1"')
+    col_type_2 = col('"Type 2"')
+    col_speed = col('"Speed"')
+    col_attack = col('"Attack"')

-    df.aggregate([], [f.approx_distinct(col_c), f.approx_median(col_d), f.approx_percentile_cont(col_d, lit(0.5))])
+    df.aggregate([col_type_1], [
+        f.approx_distinct(col_speed).alias("Count"),
+        f.approx_median(col_speed).alias("Median Speed"),
+        f.approx_percentile_cont(col_speed, 0.9).alias("90% Speed")])

-When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`. For grouping
-the :code:`group_by` list must contain at least one column
+When the :code:`group_by` list is empty the aggregation is done over the whole :class:`.DataFrame`.
+For grouping the :code:`group_by` list must contain at least one column.

 .. ipython:: python

-    df.aggregate([col_a], [f.sum(col_c), f.max(col_d), f.min(col_d)])
+    df.aggregate([col_type_1], [
+        f.max(col_speed).alias("Max Speed"),
+        f.avg(col_speed).alias("Avg Speed"),
+        f.min(col_speed).alias("Min Speed")])

 More than one column can be used for grouping

 .. ipython:: python

-    df.aggregate([col_a, col_b], [f.sum(col_c), f.max(col_d), f.min(col_d)])
+    df.aggregate([col_type_1, col_type_2], [
+        f.max(col_speed).alias("Max Speed"),
+        f.avg(col_speed).alias("Avg Speed"),
+        f.min(col_speed).alias("Min Speed")])
+
+
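The grouped min/avg/max aggregation in the doc above can be mimicked in plain Python over a toy dataset. This is a sketch of the semantics only, not the DataFusion API; the column names come from the example, the data values are invented.

```python
from collections import defaultdict

# Toy rows standing in for the pokemon.csv example (values invented).
rows = [
    {"Type 1": "Grass", "Speed": 45},
    {"Type 1": "Grass", "Speed": 60},
    {"Type 1": "Fire", "Speed": 65},
    {"Type 1": "Fire", "Speed": 80},
]

# Bucket the Speed values by the group_by key, "Type 1".
groups = defaultdict(list)
for row in rows:
    groups[row["Type 1"]].append(row["Speed"])

# Equivalent of f.max / f.avg / f.min evaluated per group.
summary = {
    key: {"Max Speed": max(v), "Avg Speed": sum(v) / len(v), "Min Speed": min(v)}
    for key, v in groups.items()
}
```

Each aggregate expression collapses one group's rows into a single value, which is why the result has one row per distinct `Type 1`.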
+Setting Parameters
+------------------
+
+Each of the built-in aggregate functions provides arguments for the parameters that affect its
+operation. These can also be overridden using the builder approach to set any of the following
+parameters. When you use the builder, you must call ``build()`` to finish. For example, these two
+expressions are equivalent.
+
+.. ipython:: python
+
+    first_1 = f.first_value(col("a"), order_by=[col("a")])
+    first_2 = f.first_value(col("a")).order_by(col("a")).build()
+
+Ordering
+^^^^^^^^
+
+You can control the order in which rows are processed by aggregate functions by providing
+a list of sort expressions for the ``order_by`` parameter. In the following example, we
+sort the Pokemon by their attack in increasing order and take the first value, which gives us the
+Pokemon with the smallest attack value in each ``Type 1``.
+
+.. ipython:: python
+
+    df.aggregate(
+        [col('"Type 1"')],
+        [f.first_value(
+            col('"Name"'),
+            order_by=[col('"Attack"').sort(ascending=True)]
+        ).alias("Smallest Attack")
+    ])
+
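The keyword-argument form and the ``.order_by(...).build()`` chain in the doc are two spellings of the same expression. A minimal builder sketch (hypothetical classes, not the DataFusion implementation) shows why ``build()`` is needed to finish the chain:

```python
class AggExpr:
    """Toy stand-in for a finished aggregate expression (not the real API)."""

    def __init__(self, name, order_by=None):
        self.name = name
        self.sort_keys = list(order_by or [])

    def order_by(self, *exprs):
        # Switch into "builder" mode; settings accumulate until build().
        return ExprBuilder(self, sort_keys=list(exprs))


class ExprBuilder:
    """Accumulates builder settings; build() yields the finished expression."""

    def __init__(self, expr, sort_keys):
        self.expr = expr
        self.sort_keys = sort_keys

    def build(self):
        return AggExpr(self.expr.name, order_by=self.sort_keys)


# The two spellings produce equivalent expressions.
direct = AggExpr("first_value", order_by=["a"])
built = AggExpr("first_value").order_by("a").build()
```

Until `build()` is called, the chain holds an intermediate builder object rather than a usable expression, which mirrors the error described in `expr.py` when `build()` is called on a non-aggregate expression.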
+Distinct
+^^^^^^^^
+
+When you set the parameter ``distinct`` to ``True``, unique values will only be evaluated one
+time each. Suppose we want to create an array of all of the ``Type 2`` values for each ``Type 1``
+in our Pokemon set. Since there will be many entries of ``Type 2``, we only want each distinct
+value once.
+
+.. ipython:: python
+
+    df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
+
+In the output of the above we can see that there are some ``Type 1`` for which the ``Type 2`` entry
+is ``null``. In reality, we probably want to filter those out. We can do this in two ways. First,
+we can filter DataFrame rows that have no ``Type 2``. If we do this, we might have some ``Type 1``
+entries entirely removed. The second is to use the ``filter`` argument described below.
+
+.. ipython:: python
+
+    df.filter(col_type_2.is_not_null()).aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True).alias("Type 2 List")])
+
+    df.aggregate([col_type_1], [f.array_agg(col_type_2, distinct=True, filter=col_type_2.is_not_null()).alias("Type 2 List")])
+
+Which approach you take should depend on your use case.
+
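The `distinct=True` plus `filter=` combination described above can be sketched in plain Python: the filter drops nulls before aggregation without removing whole groups, and distinct keeps one copy of each value. Toy data, invented for illustration.

```python
# Toy rows: "Type 2" may be None, mirroring the nulls discussed above.
rows = [
    {"Type 1": "Grass", "Type 2": "Poison"},
    {"Type 1": "Grass", "Type 2": "Poison"},
    {"Type 1": "Normal", "Type 2": None},
    {"Type 1": "Normal", "Type 2": "Flying"},
]

type2_lists = {}
for row in rows:
    # Every group keeps a bucket, even if all its Type 2 values are null.
    bucket = type2_lists.setdefault(row["Type 1"], set())
    if row["Type 2"] is not None:  # the filter= predicate
        bucket.add(row["Type 2"])  # a set gives the distinct= de-duplication
```

Note the `Normal` group survives with its non-null value, whereas filtering the whole DataFrame first would drop a group that contained only nulls.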
+Null Treatment
+^^^^^^^^^^^^^^
+
+This option allows you to either respect or ignore null values.
+
+One common usage for handling nulls is the case where you want to find the first value within a
+partition. By setting the null treatment to ignore nulls, we can find the first non-null value
+in our partition.
+
+.. ipython:: python
+
+    from datafusion.common import NullTreatment
+
+    df.aggregate([col_type_1], [
+        f.first_value(
+            col_type_2,
+            order_by=[col_attack],
+            null_treatment=NullTreatment.RESPECT_NULLS
+        ).alias("Lowest Attack Type 2")])
+
+    df.aggregate([col_type_1], [
+        f.first_value(
+            col_type_2,
+            order_by=[col_attack],
+            null_treatment=NullTreatment.IGNORE_NULLS
+        ).alias("Lowest Attack Type 2")])
+
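The difference between the two calls above reduces to whether leading nulls count as the "first" value. A plain-Python sketch of that semantics (not the DataFusion implementation):

```python
def first_value(values, ignore_nulls):
    """Return the first element, optionally skipping leading nulls."""
    for v in values:
        if v is not None or not ignore_nulls:
            return v
    return None  # all values were null and we were ignoring nulls


# Type 2 values already sorted by Attack, as order_by=[col_attack] would do.
type2_by_attack = [None, "Poison", "Flying"]
```

With `RESPECT_NULLS` the null in the lowest-attack row is the answer; with `IGNORE_NULLS` the scan continues to the first non-null value.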
+Filter
+^^^^^^
+
+The ``filter`` option restricts which rows an aggregate function evaluates. As shown in the
+example above, this lets you filter the rows seen by a single aggregate expression without
+filtering rows from the entire DataFrame.
+
+``filter`` takes a single expression.
+
+Suppose we want to find the speed values for only Pokemon that have low Attack values.
+
+.. ipython:: python
+
+    df.aggregate([col_type_1], [
+        f.avg(col_speed).alias("Avg Speed All"),
+        f.avg(col_speed, filter=col_attack < lit(50)).alias("Avg Speed Low Attack")])
+
+
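The side-by-side averages above can be reproduced in plain Python: one aggregate sees every row, the other sees only rows passing the filter predicate. Toy data, invented for illustration.

```python
rows = [
    {"Speed": 45, "Attack": 30},
    {"Speed": 90, "Attack": 100},
    {"Speed": 55, "Attack": 40},
]

all_speeds = [r["Speed"] for r in rows]
# filter=col_attack < lit(50) restricts the rows this aggregate sees.
low_attack_speeds = [r["Speed"] for r in rows if r["Attack"] < 50]

avg_all = sum(all_speeds) / len(all_speeds)
avg_low_attack = sum(low_attack_speeds) / len(low_attack_speeds)
```

Both aggregates run over the same DataFrame in one pass; only the per-expression input differs.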
+Aggregate Functions
+-------------------
+
+The available aggregate functions are:
+
+1. Comparison Functions
+    - :py:func:`datafusion.functions.min`
+    - :py:func:`datafusion.functions.max`
+2. Math Functions
+    - :py:func:`datafusion.functions.sum`
+    - :py:func:`datafusion.functions.avg`
+    - :py:func:`datafusion.functions.median`
+3. Array Functions
+    - :py:func:`datafusion.functions.array_agg`
+4. Logical Functions
+    - :py:func:`datafusion.functions.bit_and`
+    - :py:func:`datafusion.functions.bit_or`
+    - :py:func:`datafusion.functions.bit_xor`
+    - :py:func:`datafusion.functions.bool_and`
+    - :py:func:`datafusion.functions.bool_or`
+5. Statistical Functions
+    - :py:func:`datafusion.functions.count`
+    - :py:func:`datafusion.functions.corr`
+    - :py:func:`datafusion.functions.covar_samp`
+    - :py:func:`datafusion.functions.covar_pop`
+    - :py:func:`datafusion.functions.stddev`
+    - :py:func:`datafusion.functions.stddev_pop`
+    - :py:func:`datafusion.functions.var_samp`
+    - :py:func:`datafusion.functions.var_pop`
+6. Linear Regression Functions
+    - :py:func:`datafusion.functions.regr_count`
+    - :py:func:`datafusion.functions.regr_slope`
+    - :py:func:`datafusion.functions.regr_intercept`
+    - :py:func:`datafusion.functions.regr_r2`
+    - :py:func:`datafusion.functions.regr_avgx`
+    - :py:func:`datafusion.functions.regr_avgy`
+    - :py:func:`datafusion.functions.regr_sxx`
+    - :py:func:`datafusion.functions.regr_syy`
+7. Positional Functions
+    - :py:func:`datafusion.functions.first_value`
+    - :py:func:`datafusion.functions.last_value`
+    - :py:func:`datafusion.functions.nth_value`
+8. String Functions
+    - :py:func:`datafusion.functions.string_agg`
+9. Approximation Functions
+    - :py:func:`datafusion.functions.approx_distinct`
+    - :py:func:`datafusion.functions.approx_median`
+    - :py:func:`datafusion.functions.approx_percentile_cont`
+    - :py:func:`datafusion.functions.approx_percentile_cont_with_weight`

python/datafusion/common.py

+14-1
@@ -17,13 +17,13 @@
 """Common data types used throughout the DataFusion project."""

 from ._internal import common as common_internal
+from enum import Enum

 # TODO these should all have proper wrapper classes

 DFSchema = common_internal.DFSchema
 DataType = common_internal.DataType
 DataTypeMap = common_internal.DataTypeMap
-NullTreatment = common_internal.NullTreatment
 PythonType = common_internal.PythonType
 RexType = common_internal.RexType
 SqlFunction = common_internal.SqlFunction
@@ -47,3 +47,16 @@
     "SqlStatistics",
     "SqlFunction",
 ]
+
+
+class NullTreatment(Enum):
+    """Describe how null values are to be treated by functions.
+
+    This is used primarily by aggregate and window functions. It can be set on
+    these functions using the builder approach described in
+    :ref:`window_functions` and :ref:`aggregation` in the online documentation.
+    """
+
+    RESPECT_NULLS = common_internal.NullTreatment.RESPECT_NULLS
+    IGNORE_NULLS = common_internal.NullTreatment.IGNORE_NULLS
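The new `NullTreatment` wrapper is a Python `Enum` whose members carry the internal values as their payload. A standalone sketch of the same pattern (string payloads invented here, since `common_internal` is only importable inside the package):

```python
from enum import Enum


class NullTreatment(Enum):
    """Sketch of the wrapper; real members wrap common_internal values."""

    RESPECT_NULLS = "respect nulls"
    IGNORE_NULLS = "ignore nulls"


# Callers pass the enum member; internals read .value to get the payload.
chosen = NullTreatment.IGNORE_NULLS
```

Wrapping the internal enum gives users a first-class Python type with docstrings and IDE completion, while `.value` recovers the object the bindings expect.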

python/datafusion/dataframe.py

+6-1
@@ -180,7 +180,9 @@ def with_column_renamed(self, old_name: str, new_name: str) -> DataFrame:
         """
         return DataFrame(self.df.with_column_renamed(old_name, new_name))

-    def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame:
+    def aggregate(
+        self, group_by: list[Expr] | Expr, aggs: list[Expr] | Expr
+    ) -> DataFrame:
         """Aggregates the rows of the current DataFrame.

         Args:
@@ -190,6 +192,9 @@ def aggregate(self, group_by: list[Expr], aggs: list[Expr]) -> DataFrame:
         Returns:
             DataFrame after aggregation.
         """
+        group_by = group_by if isinstance(group_by, list) else [group_by]
+        aggs = aggs if isinstance(aggs, list) else [aggs]
+
         group_by = [e.expr for e in group_by]
         aggs = [e.expr for e in aggs]
         return DataFrame(self.df.aggregate(group_by, aggs))
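The `aggregate` change above widens the signature to accept either a single expression or a list, then normalizes to a list before use. The normalization idiom in isolation:

```python
def normalize_to_list(arg):
    """Accept a single item or a list of items, returning a list either way.

    Mirrors the group_by/aggs normalization added to DataFrame.aggregate.
    """
    return arg if isinstance(arg, list) else [arg]


single = normalize_to_list("col_a")
already_list = normalize_to_list(["col_a", "col_b"])
```

This lets callers write `df.aggregate(col_a, f.sum(col_b))` without wrapping single expressions in brackets, while list-based callers are unaffected.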

python/datafusion/expr.py

+2-2
@@ -473,7 +473,7 @@ def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
         set parameters for either window or aggregate functions. If used on any other
         type of expression, an error will be generated when ``build()`` is called.
         """
-        return ExprFuncBuilder(self.expr.null_treatment(null_treatment))
+        return ExprFuncBuilder(self.expr.null_treatment(null_treatment.value))

     def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
         """Set the partitioning for a window function.
@@ -518,7 +518,7 @@ def distinct(self) -> ExprFuncBuilder:

     def null_treatment(self, null_treatment: NullTreatment) -> ExprFuncBuilder:
         """Set how nulls are treated for either window or aggregate functions."""
-        return ExprFuncBuilder(self.builder.null_treatment(null_treatment))
+        return ExprFuncBuilder(self.builder.null_treatment(null_treatment.value))

     def partition_by(self, *partition_by: Expr) -> ExprFuncBuilder:
         """Set partitioning for window functions."""
