Skip to content

Commit 90f5b5b

Browse files
authored
Add Window Functions for use with function builder (#808)
* Add window function as template for others and function builder * Adding docstrings * Change last_value to use function builder instead of explicitly passing values * Allow any value for lead function default value and add unit test * Add lead window function and unit tests * Temporarily commenting out deprecated functions in documenation so builder will pass * Expose row_number window function * Add rank window function * Add percent rank and dense rank * Add cume_dist * Add ntile window function * Add comment to update when upstream merges * Window frame required calling inner value * Add unit test for avg as window function * Working on documentation for window functions * Add pyo build config file to git ignore since this is user specific * Add examples to docstring * Optionally add window function parameters during function call * Update sort and order_by to apply automatic ordering if any other expression is given * Update unit tests to be cleaner and use default sort on expressions * Ignore vscode folder specific settings * Window frames should only apply to aggregate functions used as window functions. Also pass in scalar pyarrow values so we can set a range other than a uint * Remove deprecated warning until we actually have a way to use all functions without calling window() * Built in window functions do not have any impact by setting null_treatment so remove from user facing * Update user documentation on how to pass parameters for different window functions and what their impacts are * Make first_value and last_value identical in the interface
1 parent 003eea8 commit 90f5b5b

File tree

12 files changed

+1059
-128
lines changed

12 files changed

+1059
-128
lines changed

.gitignore

+4
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ target
44
/docs/temp
55
/docs/build
66
.DS_Store
7+
.vscode
78

89
# Byte-compiled / optimized / DLL files
910
__pycache__/
@@ -31,3 +32,6 @@ apache-rat-*.jar
3132
CHANGELOG.md.bak
3233

3334
docs/mdbook/book
35+
36+
.pyo3_build_config
37+

docs/source/user-guide/common-operations/aggregations.rst

+2
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
.. specific language governing permissions and limitations
1616
.. under the License.
1717
18+
.. _aggregation:
19+
1820
Aggregation
1921
============
2022

docs/source/user-guide/common-operations/windows.rst

+156-31
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,16 @@
1515
.. specific language governing permissions and limitations
1616
.. under the License.
1717
18+
.. _window_functions:
19+
1820
Window Functions
1921
================
2022

21-
In this section you will learn about window functions. A window function utilizes values from one or multiple rows to
22-
produce a result for each individual row, unlike an aggregate function that provides a single value for multiple rows.
23+
In this section you will learn about window functions. A window function utilizes values from one or
24+
multiple rows to produce a result for each individual row, unlike an aggregate function that
25+
provides a single value for multiple rows.
2326

24-
The functionality of window functions in DataFusion is supported by the dedicated :py:func:`~datafusion.functions.window` function.
27+
The window functions are availble in the :py:mod:`~datafusion.functions` module.
2528

2629
We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
2730

@@ -40,54 +43,176 @@ We'll use the pokemon dataset (from Ritchie Vink) in the following examples.
4043
ctx = SessionContext()
4144
df = ctx.read_csv("pokemon.csv")
4245
43-
Here is an example that shows how to compare each pokemons’s attack power with the average attack power in its ``"Type 1"``
46+
Here is an example that shows how you can compare each pokemon's speed to the speed of the
47+
previous row in the DataFrame.
4448

4549
.. ipython:: python
4650
4751
df.select(
4852
col('"Name"'),
49-
col('"Attack"'),
50-
f.alias(
51-
f.window("avg", [col('"Attack"')], partition_by=[col('"Type 1"')]),
52-
"Average Attack",
53-
)
53+
col('"Speed"'),
54+
f.lag(col('"Speed"')).alias("Previous Speed")
5455
)
5556
56-
You can also control the order in which rows are processed by window functions by providing
57+
Setting Parameters
58+
------------------
59+
60+
61+
Ordering
62+
^^^^^^^^
63+
64+
You can control the order in which rows are processed by window functions by providing
5765
a list of ``order_by`` functions for the ``order_by`` parameter.
5866

5967
.. ipython:: python
6068
6169
df.select(
6270
col('"Name"'),
6371
col('"Attack"'),
64-
f.alias(
65-
f.window(
66-
"rank",
67-
[],
68-
partition_by=[col('"Type 1"')],
69-
order_by=[f.order_by(col('"Attack"'))],
70-
),
71-
"rank",
72-
),
72+
col('"Type 1"'),
73+
f.rank(
74+
partition_by=[col('"Type 1"')],
75+
order_by=[col('"Attack"').sort(ascending=True)],
76+
).alias("rank"),
77+
).sort(col('"Type 1"'), col('"Attack"'))
78+
79+
Partitions
80+
^^^^^^^^^^
81+
82+
A window function can take a list of ``partition_by`` columns similar to an
83+
:ref:`Aggregation Function<aggregation>`. This will cause the window values to be evaluated
84+
independently for each of the partitions. In the example above, we found the rank of each
85+
Pokemon per ``Type 1`` partitions. We can see the first couple of each partition if we do
86+
the following:
87+
88+
.. ipython:: python
89+
90+
df.select(
91+
col('"Name"'),
92+
col('"Attack"'),
93+
col('"Type 1"'),
94+
f.rank(
95+
partition_by=[col('"Type 1"')],
96+
order_by=[col('"Attack"').sort(ascending=True)],
97+
).alias("rank"),
98+
).filter(col("rank") < lit(3)).sort(col('"Type 1"'), col("rank"))
99+
100+
Window Frame
101+
^^^^^^^^^^^^
102+
103+
When using aggregate functions, the Window Frame of defines the rows over which it operates.
104+
If you do not specify a Window Frame, the frame will be set depending on the following
105+
criteria.
106+
107+
* If an ``order_by`` clause is set, the default window frame is defined as the rows between
108+
unbounded preceeding and the current row.
109+
* If an ``order_by`` is not set, the default frame is defined as the rows betwene unbounded
110+
and unbounded following (the entire partition).
111+
112+
Window Frames are defined by three parameters: unit type, starting bound, and ending bound.
113+
114+
The unit types available are:
115+
116+
* Rows: The starting and ending boundaries are defined by the number of rows relative to the
117+
current row.
118+
* Range: When using Range, the ``order_by`` clause must have exactly one term. The boundaries
119+
are defined bow how close the rows are to the value of the expression in the ``order_by``
120+
parameter.
121+
* Groups: A "group" is the set of all rows that have equivalent values for all terms in the
122+
``order_by`` clause.
123+
124+
In this example we perform a "rolling average" of the speed of the current Pokemon and the
125+
two preceeding rows.
126+
127+
.. ipython:: python
128+
129+
from datafusion.expr import WindowFrame
130+
131+
df.select(
132+
col('"Name"'),
133+
col('"Speed"'),
134+
f.window("avg",
135+
[col('"Speed"')],
136+
order_by=[col('"Speed"')],
137+
window_frame=WindowFrame("rows", 2, 0)
138+
).alias("Previous Speed")
139+
)
140+
141+
Null Treatment
142+
^^^^^^^^^^^^^^
143+
144+
When using aggregate functions as window functions, it is often useful to specify how null values
145+
should be treated. In order to do this you need to use the builder function. In future releases
146+
we expect this to be simplified in the interface.
147+
148+
One common usage for handling nulls is the case where you want to find the last value up to the
149+
current row. In the following example we demonstrate how setting the null treatment to ignore
150+
nulls will fill in with the value of the most recent non-null row. To do this, we also will set
151+
the window frame so that we only process up to the current row.
152+
153+
In this example, we filter down to one specific type of Pokemon that does have some entries in
154+
it's ``Type 2`` column that are null.
155+
156+
.. ipython:: python
157+
158+
from datafusion.common import NullTreatment
159+
160+
df.filter(col('"Type 1"') == lit("Bug")).select(
161+
'"Name"',
162+
'"Type 2"',
163+
f.window("last_value", [col('"Type 2"')])
164+
.window_frame(WindowFrame("rows", None, 0))
165+
.order_by(col('"Speed"'))
166+
.null_treatment(NullTreatment.IGNORE_NULLS)
167+
.build()
168+
.alias("last_wo_null"),
169+
f.window("last_value", [col('"Type 2"')])
170+
.window_frame(WindowFrame("rows", None, 0))
171+
.order_by(col('"Speed"'))
172+
.null_treatment(NullTreatment.RESPECT_NULLS)
173+
.build()
174+
.alias("last_with_null")
175+
)
176+
177+
Aggregate Functions
178+
-------------------
179+
180+
You can use any :ref:`Aggregation Function<aggregation>` as a window function. Currently
181+
aggregate functions must use the deprecated
182+
:py:func:`datafusion.functions.window` API but this should be resolved in
183+
DataFusion 42.0 (`Issue Link <https://github.com/apache/datafusion-python/issues/833>`_). Here
184+
is an example that shows how to compare each pokemons’s attack power with the average attack
185+
power in its ``"Type 1"`` using the :py:func:`datafusion.functions.avg` function.
186+
187+
.. ipython:: python
188+
:okwarning:
189+
190+
df.select(
191+
col('"Name"'),
192+
col('"Attack"'),
193+
col('"Type 1"'),
194+
f.window("avg", [col('"Attack"')])
195+
.partition_by(col('"Type 1"'))
196+
.build()
197+
.alias("Average Attack"),
73198
)
74199
200+
Available Functions
201+
-------------------
202+
75203
The possible window functions are:
76204

77205
1. Rank Functions
78-
- rank
79-
- dense_rank
80-
- row_number
81-
- ntile
206+
- :py:func:`datafusion.functions.rank`
207+
- :py:func:`datafusion.functions.dense_rank`
208+
- :py:func:`datafusion.functions.ntile`
209+
- :py:func:`datafusion.functions.row_number`
82210

83211
2. Analytical Functions
84-
- cume_dist
85-
- percent_rank
86-
- lag
87-
- lead
88-
- first_value
89-
- last_value
90-
- nth_value
212+
- :py:func:`datafusion.functions.cume_dist`
213+
- :py:func:`datafusion.functions.percent_rank`
214+
- :py:func:`datafusion.functions.lag`
215+
- :py:func:`datafusion.functions.lead`
91216

92217
3. Aggregate Functions
93-
- All aggregate functions can be used as window functions.
218+
- All :ref:`Aggregation Functions<aggregation>` can be used as window functions.

python/datafusion/dataframe.py

+3-4
Original file line numberDiff line numberDiff line change
@@ -123,11 +123,10 @@ def select(self, *exprs: Expr | str) -> DataFrame:
123123
df = df.select("a", col("b"), col("a").alias("alternate_a"))
124124
125125
"""
126-
exprs = [
127-
arg.expr if isinstance(arg, Expr) else Expr.column(arg).expr
128-
for arg in exprs
126+
exprs_internal = [
127+
Expr.column(arg).expr if isinstance(arg, str) else arg.expr for arg in exprs
129128
]
130-
return DataFrame(self.df.select(*exprs))
129+
return DataFrame(self.df.select(*exprs_internal))
131130

132131
def filter(self, *predicates: Expr) -> DataFrame:
133132
"""Return a DataFrame for which ``predicate`` evaluates to ``True``.

0 commit comments

Comments
 (0)