Skip to content

Commit

Permalink
Pass multiple arrays for udwf evaluate so we can capture the order_by…
Browse files Browse the repository at this point in the history
… and also multiple columns
  • Loading branch information
timsaucer committed Sep 22, 2024
1 parent 9b2992d commit ef4a338
Show file tree
Hide file tree
Showing 3 changed files with 185 additions and 29 deletions.
168 changes: 157 additions & 11 deletions python/datafusion/tests/test_udwf.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@

from datafusion import SessionContext, column, udwf, lit, functions as f
from datafusion.udf import WindowEvaluator
from datafusion.expr import WindowFrame


class ExponentialSmooth(WindowEvaluator):
class ExponentialSmoothDefault(WindowEvaluator):
"""Interface of a user-defined accumulation."""

def __init__(self, alpha: float) -> None:
self.alpha = alpha

def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array:
def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
results = []
curr_value = 0.0
values = values[0]
for idx in range(num_rows):
if idx == 0:
curr_value = values[idx].as_py()
Expand All @@ -43,6 +45,90 @@ def evaluate_all(self, values: pa.Array, num_rows: int) -> pa.Array:
return pa.array(results)


class ExponentialSmoothBounded(WindowEvaluator):
    """Exponential smoothing computed one row range at a time.

    Reports support for bounded execution and supplies its own per-row
    range through ``get_range``.
    """

    def __init__(self, alpha: float) -> None:
        """Store the smoothing factor ``alpha``."""
        self.alpha = alpha

    def supports_bounded_execution(self) -> bool:
        """Report that this evaluator supports bounded execution."""
        return True

    def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
        """Return the (previous row, current row) pair for row ``idx``.

        The first row has no predecessor, so it maps to ``(0, 0)``.
        """
        # Override the default range of the current row, since
        # uses_window_frame is False. For the purpose of this test we only
        # smooth from the previous row to the current one.
        return (0, 0) if idx == 0 else (idx - 1, idx)

    def evaluate(
        self, values: list[pa.Array], eval_range: tuple[int, int]
    ) -> pa.Scalar:
        """Smooth the first input column over ``eval_range``.

        Both ends of ``eval_range`` are treated as inclusive here.
        """
        first, last = eval_range
        column = values[0]
        smoothed = 0.0
        for position in range(first, last + 1):
            if position == first:
                # Seed the running value with the first sample in the range.
                smoothed = column[position].as_py()
                continue
            smoothed = column[position].as_py() * self.alpha + smoothed * (
                1.0 - self.alpha
            )
        return pa.scalar(smoothed).cast(pa.float64())


class ExponentialSmoothRank(WindowEvaluator):
    """Exponential smoothing over rank-group positions instead of values.

    Each row contributes the 1-based position of the rank span containing it.
    """

    def __init__(self, alpha: float) -> None:
        """Store the smoothing factor ``alpha``."""
        self.alpha = alpha

    def include_rank(self) -> bool:
        """Report that this evaluator wants rank information."""
        return True

    def evaluate_all_with_rank(
        self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
    ) -> pa.Array:
        """Return one smoothed value per row, derived from rank spans.

        Each entry of ``ranks_in_partition`` is read as a ``(start, end)``
        span that contains row ``idx`` when ``start <= idx < end``.
        """
        smoothed = []
        # The previous row's group number; the first row is blended with 1.0.
        previous = 1.0
        for idx in range(num_rows):
            containing = [
                position
                for position, span in enumerate(ranks_in_partition)
                if span[0] <= idx < span[1]
            ]
            # 1-based number of the first span that contains this row.
            group_number = containing[0] + 1
            smoothed.append(
                group_number * self.alpha + previous * (1.0 - self.alpha)
            )
            previous = group_number

        return pa.array(smoothed)


class ExponentialSmoothFrame(WindowEvaluator):
    """Exponential smoothing evaluated over a supplied window frame."""

    def __init__(self, alpha: float) -> None:
        """Store the smoothing factor ``alpha``."""
        self.alpha = alpha

    def uses_window_frame(self) -> bool:
        """Report that this evaluator uses the window frame."""
        return True

    def evaluate(
        self, values: list[pa.Array], eval_range: tuple[int, int]
    ) -> pa.Scalar:
        """Smooth the first input column over ``eval_range``.

        ``eval_range`` is treated as half-open: the end index is excluded.
        """
        begin, end = eval_range
        if len(values) > 1:
            # A second array is present; it is read into ``order_by`` but
            # not used in the computation.
            order_by = values[1]  # noqa: F841
        column = values[0]

        smoothed = 0.0
        for position in range(begin, end):
            sample = column[position].as_py()
            if position == begin:
                # Seed the running value with the first sample in the frame.
                smoothed = sample
            else:
                smoothed = sample * self.alpha + smoothed * (1.0 - self.alpha)
        return pa.scalar(smoothed).cast(pa.float64())


# Empty class that deliberately does not inherit from WindowEvaluator.
# NOTE(review): presumably passed to udwf() by the error tests to check that
# non-evaluator classes are rejected — confirm against test_udwf_errors.
class NotSubclassOfWindowEvaluator:
    pass

Expand Down Expand Up @@ -73,32 +159,92 @@ def test_udwf_errors(df):
)


smooth = udwf(
ExponentialSmooth(0.9),
smooth_default = udwf(
ExponentialSmoothDefault(0.9),
pa.float64(),
pa.float64(),
volatility="immutable",
)

smooth_bounded = udwf(
ExponentialSmoothBounded(0.9),
pa.float64(),
pa.float64(),
volatility="immutable",
)

smooth_rank = udwf(
ExponentialSmoothRank(0.9),
pa.utf8(),
pa.float64(),
volatility="immutable",
)

smooth_frame = udwf(
ExponentialSmoothFrame(0.9),
pa.float64(),
pa.float64(),
volatility="immutable",
)

data_test_udwf_functions = [
("smooth_udwf", smooth(column("a")), [0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889]),
(
"partitioned_udwf",
smooth(column("a")).partition_by(column("c")).build(),
"default_udwf",
smooth_default(column("a")),
[0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889],
),
(
"default_udwf_partitioned",
smooth_default(column("a")).partition_by(column("c")).build(),
[0, 0.9, 1.89, 2.889, 4.0, 4.9, 5.89],
),
(
"ordered_udwf",
smooth(column("a")).order_by(column("b")).build(),
"default_udwf_ordered",
smooth_default(column("a")).order_by(column("b")).build(),
[0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513],
),
(
"bounded_udwf",
smooth_bounded(column("a")),
[0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9],
),
(
"bounded_udwf_ignores_frame",
smooth_bounded(column("a"))
.window_frame(WindowFrame("rows", None, None))
.build(),
[0, 0.9, 1.9, 2.9, 3.9, 4.9, 5.9],
),
(
"rank_udwf",
smooth_rank(column("c")).order_by(column("c")).build(),
[1, 1, 1, 1, 1.9, 2, 2],
),
(
"frame_unbounded_udwf",
smooth_frame(column("a")).window_frame(WindowFrame("rows", None, None)).build(),
[5.889, 5.889, 5.889, 5.889, 5.889, 5.889, 5.889],
),
(
"frame_bounded_udwf",
smooth_frame(column("a")).window_frame(WindowFrame("rows", None, 0)).build(),
[0.0, 0.9, 1.89, 2.889, 3.889, 4.889, 5.889],
),
(
"frame_bounded_udwf",
smooth_frame(column("a"))
.window_frame(WindowFrame("rows", None, 0))
.order_by(column("b"))
.build(),
[0.551, 1.13, 2.3, 2.755, 3.876, 5.0, 5.513],
),
]


@pytest.mark.parametrize("name,expr,expected", data_test_udwf_functions)
def test_udwf_functions(df, name, expr, expected):
df = df.select("a", f.round(expr, lit(3)).alias(name))

df = df.select("a", "b", f.round(expr, lit(3)).alias(name))
df.sort(column("a")).show()
# execute and collect the first (and only) batch
result = df.sort(column("a")).select(column(name)).collect()[0]

Expand Down
10 changes: 6 additions & 4 deletions python/datafusion/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ def memoize(self) -> None:
"""
pass

def get_range(self, idx: int, n_rows: int) -> tuple[int, int]:
def get_range(self, idx: int, num_rows: int) -> tuple[int, int]:
"""Return the range for the window fuction.
If `uses_window_frame` flag is `false`. This method is used to
Expand All @@ -288,15 +288,15 @@ def get_range(self, idx: int, n_rows: int) -> tuple[int, int]:
Args:
idx: Current index
n_rows: Number of rows.
num_rows: Number of rows.
"""
return (idx, idx + 1)

def is_causal(self) -> bool:
"""Get whether evaluator needs future data for its result."""
return False

def evaluate_all(self, values: pyarrow.Array, num_rows: int) -> pyarrow.Array:
def evaluate_all(self, values: list[pyarrow.Array], num_rows: int) -> pyarrow.Array:
"""Evaluate a window function on an entire input partition.
This function is called once per input *partition* for window
Expand Down Expand Up @@ -336,7 +336,9 @@ def evaluate_all(self, values: pyarrow.Array, num_rows: int) -> pyarrow.Array:
"""
pass

def evaluate(self, values: pyarrow.Array, range: tuple[int, int]) -> pyarrow.Scalar:
def evaluate(
self, values: list[pyarrow.Array], eval_range: tuple[int, int]
) -> pyarrow.Scalar:
"""Evaluate window function on a range of rows in an input partition.
This is the simplest and most general function to implement
Expand Down
36 changes: 22 additions & 14 deletions src/udwf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::num;

Check warning on line 18 in src/udwf.rs

View workflow job for this annotation

GitHub Actions / test-matrix (3.10, stable)

unused import: `std::num`
use std::ops::Range;
use std::sync::Arc;

Expand Down Expand Up @@ -89,12 +90,17 @@ impl PartitionEvaluator for RustPartitionEvaluator {

fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
Python::with_gil(|py| {
let mut py_args = values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap())
.collect::<Vec<_>>();
py_args.push(num_rows.to_object(py));
let py_args = PyTuple::new_bound(py, py_args);
let py_values = PyList::new_bound(
py,
values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
);
let py_num_rows = num_rows.to_object(py).into_bound(py);
let py_args = PyTuple::new_bound(
py,
PyTuple::new_bound(py, vec![py_values.as_any(), &py_num_rows]),
);

self.evaluator
.bind(py)
Expand All @@ -109,17 +115,19 @@ impl PartitionEvaluator for RustPartitionEvaluator {

fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
Python::with_gil(|py| {
// 1. cast args to Pyarrow array
let mut py_args = values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap())
.collect::<Vec<_>>();
let py_values = PyList::new_bound(
py,
values
.iter()
.map(|arg| arg.into_data().to_pyarrow(py).unwrap()),
);
let range_tuple =
PyTuple::new_bound(py, vec![range.start.to_object(py), range.end.to_object(py)]);
py_args.push(range_tuple.into());
let py_args = PyTuple::new_bound(py, py_args);
let py_args = PyTuple::new_bound(
py,
PyTuple::new_bound(py, vec![py_values.as_any(), range_tuple.as_any()]),
);

// 2. call function
self.evaluator
.bind(py)
.call_method1("evaluate", py_args)
Expand Down

0 comments on commit ef4a338

Please sign in to comment.