From f9351858d7abebe8e25a84172a5e2e7ce89e4a50 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 29 Feb 2024 11:44:27 -0500 Subject: [PATCH 01/11] Add support for generalized ufuncs that return a scalar. --- py-polars/polars/series/series.py | 41 ++++++++++++++++--- py-polars/requirements-dev.txt | 1 + py-polars/src/series/numpy_ufunc.rs | 8 ++-- .../unit/interop/numpy/test_ufunc_expr.py | 36 +++++++++++++++- 4 files changed, 75 insertions(+), 11 deletions(-) diff --git a/py-polars/polars/series/series.py b/py-polars/polars/series/series.py index e2500eb91c87..2aca60388e50 100644 --- a/py-polars/polars/series/series.py +++ b/py-polars/polars/series/series.py @@ -1464,6 +1464,25 @@ def __array_ufunc__( else dtype_char_minimum ) + # Figure out the size of the output. We don't support the full + # signature options yet, in particular NEP 20, so this will get + # things wrong in some cases. The default is the length of the + # Series. + # + # Documentation of the signature syntax is found in NEPs 5 and 20 + # in the NumPy docs, and perhaps additional ones + # (https://numpy.org/neps/). + output_length = s.len() + result_is_series = True + if ufunc.signature: + # Non-generalized ufuncs won't have signature set, so it's + # going to be the length of the input. If it is set we need to + # parse it. + result_signature = ufunc.signature.split("->", 1)[-1].strip() + if result_signature == "()": + output_length = 1 + result_is_series = False + f = get_ffi_func("apply_ufunc_<>", numpy_char_code_to_dtype(dtype_char), s) if f is None: @@ -1473,13 +1492,23 @@ def __array_ufunc__( ) raise NotImplementedError(msg) - series = f(lambda out: ufunc(*args, out=out, dtype=dtype_char, **kwargs)) - return ( - self._from_pyseries(series) - .to_frame() - .select(F.when(validity_mask).then(F.col(self.name))) - .to_series(0) + series = f( + lambda out: ufunc(*args, out=out, dtype=dtype_char, **kwargs), + output_length, ) + if result_is_series: + return ( + self._from_pyseries(series) + .to_frame() + .select(F.when(validity_mask).then(F.col(self.name))) + .to_series(0) + ) + else: + getter = get_ffi_func( + "get_<>", numpy_char_code_to_dtype(dtype_char), series + ) + assert getter is not None + return getter(0) else: msg = ( "only `__call__` is implemented for numpy ufuncs on a Series, got " diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt index 4a3785e77f15..4ffbbbe0feee 100644 --- a/py-polars/requirements-dev.txt +++ b/py-polars/requirements-dev.txt @@ -18,6 +18,7 @@ numpy pandas pyarrow pydantic>=2.0.0 +numba; python_version < '3.13' # Numba takes a while to support new Python versions # Datetime / time zones backports.zoneinfo; python_version < '3.9' tzdata; platform_system == 'Windows' diff --git a/py-polars/src/series/numpy_ufunc.rs b/py-polars/src/series/numpy_ufunc.rs index 91265aca0789..04b014d85693 100644 --- a/py-polars/src/series/numpy_ufunc.rs +++ b/py-polars/src/series/numpy_ufunc.rs @@ -66,12 +66,12 @@ macro_rules! impl_ufuncs { // the out array is allocated in this method, send to Python and once the ufunc is applied // ownership is taken by Rust again to prevent memory leak. // if the ufunc fails, we first must take ownership back. 
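// NOTE: the output buffer length is now supplied by the caller as
// `output_size` instead of being taken from `self.len()`: for a
// generalized ufunc with signature "(n)->()" the Python side passes 1,
// so only a single-element output array is allocated.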
- fn $name(&self, lambda: &PyAny) -> PyResult { + fn $name(&self, lambda: &PyAny, output_size: usize) -> PyResult { // numpy array object, and a *mut ptr Python::with_gil(|py| { - let size = self.len(); - let (out_array, av) = - unsafe { aligned_array::<<$type as PolarsNumericType>::Native>(py, size) }; + let (out_array, av) = unsafe { + aligned_array::<<$type as PolarsNumericType>::Native>(py, output_size) + }; debug_assert_eq!(get_refcnt(out_array), 1); // inserting it in a tuple increase the reference count by 1. diff --git a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py index 8695d8d7e4b5..d9238114d6d8 100644 --- a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py +++ b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py @@ -1,8 +1,16 @@ from __future__ import annotations -from typing import Any, cast +from typing import Any, Callable, cast import numpy as np +import pytest + +try: + import numba +except ImportError: + # Numba can take a while to support new Python versions, so we don't want a + # hard dependency on it. + numba = None import polars as pl from polars.testing import assert_frame_equal, assert_series_equal @@ -130,3 +138,29 @@ def test_ufunc_multiple_expressions() -> None: def test_grouped_ufunc() -> None: df = pl.DataFrame({"id": ["a", "a", "b", "b"], "values": [0.1, 0.1, -0.1, -0.1]}) df.group_by("id").agg(pl.col("values").log1p().sum().pipe(np.expm1)) + + +@pytest.mark.skipif(numba is None, reason="Numba is not available") +def test_generalized_ufunc() -> None: + assert numba is not None # to pacify type checkers + + @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()") + def my_custom_sum(arr, result): + total = 0 + for value in arr: + total += value + result[0] = total + + # Make type checkers happy: + custom_sum = cast(Callable[[object], object], my_custom_sum) + + # Demonstrate NumPy as the canonical expected behavior: + assert custom_sum(np.array([10, 2, 3], dtype=np.int64)) == 15 + + # Direct call of the gufunc: + df = pl.DataFrame({"values": [10, 2, 3]}) + assert custom_sum(df.get_column("values")) == 15 + + # Indirect call of the gufunc: + indirect = df.select(pl.col("values").map_batches(custom_sum)) + assert_frame_equal(indirect, pl.DataFrame({"values": 15})) From db866daf64fad6c2600f4a7481849f6598bef071 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 29 Feb 2024 12:42:11 -0500 Subject: [PATCH 02/11] Pacify mypy. --- py-polars/tests/unit/interop/numpy/test_ufunc_expr.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py index d9238114d6d8..f283009904e9 100644 --- a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py +++ b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py @@ -6,7 +6,7 @@ import pytest try: - import numba + import numba # type: ignore[import-untyped] except ImportError: # Numba can take a while to support new Python versions, so we don't want a # hard dependency on it. 
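As background for the signature check added in PATCH 01: plain element-wise ufuncs carry no core-dimension signature, while generalized ufuncs expose one as a string, and the "?" dimensions are the NEP 20 extensions that PATCH 01's comment notes are not fully supported yet. A minimal sketch of the same check outside Polars, using only NumPy (the `output_is_scalar` helper is illustrative, not part of the patch):

    import numpy as np

    # Element-wise ufuncs have no core-dimension signature.
    assert np.add.signature is None
    # Generalized ufuncs do; "?" marks NEP 20 flexible dimensions.
    assert np.matmul.signature == "(n?,k),(k,m?)->(n?,m?)"

    def output_is_scalar(ufunc: np.ufunc) -> bool:
        # Mirrors PATCH 01: the result is a scalar iff the output
        # core signature is "()".
        sig = ufunc.signature
        return sig is not None and sig.split("->", 1)[-1].strip() == "()"

    assert not output_is_scalar(np.add)     # no signature at all
    assert not output_is_scalar(np.matmul)  # "(n?,m?)" output, not "()"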
@@ -144,8 +144,8 @@ def test_grouped_ufunc() -> None: def test_generalized_ufunc() -> None: assert numba is not None # to pacify type checkers - @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()") - def my_custom_sum(arr, result): + @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()") # type: ignore[misc] + def my_custom_sum(arr, result) -> None: # type: ignore[no-untyped-def] total = 0 for value in arr: total += value From aa908e4ac2101f404d2017a0fa0924c4ce5a4b43 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 21 Mar 2024 14:26:49 -0400 Subject: [PATCH 03/11] Sketch of mapping to scalar as a separate Python API function. --- crates/polars-plan/src/dsl/python_udf.rs | 11 +++++++--- py-polars/polars/expr/expr.py | 22 +++++++++++++++++++ py-polars/src/expr/general.rs | 5 +++-- py-polars/src/map/lazy.rs | 3 ++- .../unit/interop/numpy/test_ufunc_expr.py | 15 +++++++++++-- 5 files changed, 48 insertions(+), 8 deletions(-) diff --git a/crates/polars-plan/src/dsl/python_udf.rs b/crates/polars-plan/src/dsl/python_udf.rs index e1fa05d419f0..b96e81c4b8ac 100644 --- a/crates/polars-plan/src/dsl/python_udf.rs +++ b/crates/polars-plan/src/dsl/python_udf.rs @@ -103,14 +103,16 @@ pub struct PythonUdfExpression { python_function: PyObject, output_type: Option, is_elementwise: bool, + returns_scalar: bool } impl PythonUdfExpression { - pub fn new(lambda: PyObject, output_type: Option, is_elementwise: bool) -> Self { + pub fn new(lambda: PyObject, output_type: Option, is_elementwise: bool, returns_scalar: bool) -> Self { Self { python_function: lambda, output_type, is_elementwise, + returns_scalar } } @@ -120,7 +122,7 @@ impl PythonUdfExpression { // skip header let buf = &buf[MAGIC_BYTE_MARK.len()..]; let mut reader = Cursor::new(buf); - let (output_type, is_elementwise): (Option, bool) = + let (output_type, is_elementwise, returns_scalar): (Option, bool, bool) = ciborium::de::from_reader(&mut reader).map_err(map_err)?; let remainder = &buf[reader.position() as usize..]; @@ -137,6 +139,7 @@ impl PythonUdfExpression { python_function.into(), output_type, is_elementwise, + returns_scalar )) as Arc) }) } @@ -172,7 +175,7 @@ impl SeriesUdf for PythonUdfExpression { #[cfg(feature = "serde")] fn try_serialize(&self, buf: &mut Vec) -> PolarsResult<()> { buf.extend_from_slice(MAGIC_BYTE_MARK); - ciborium::ser::into_writer(&(self.output_type.clone(), self.is_elementwise), &mut *buf) + ciborium::ser::into_writer(&(self.output_type.clone(), self.is_elementwise, self.returns_scalar), &mut *buf) .unwrap(); Python::with_gil(|py| { @@ -213,6 +216,7 @@ impl Expr { (ApplyOptions::GroupWise, "python_udf") }; + let returns_scalar = func.returns_scalar; let return_dtype = func.output_type.clone(); let output_type = GetOutput::map_field(move |fld| match return_dtype { Some(ref dt) => Field::new(fld.name(), dt.clone()), @@ -230,6 +234,7 @@ impl Expr { options: FunctionOptions { collect_groups, fmt_str: name, + returns_scalar, ..Default::default() }, } diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index 641ff283325e..cd9c9d69b438 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -4167,6 +4167,28 @@ def map_batches( return_dtype, agg_list, is_elementwise, + False, + ) + ) + + def map_to_scalar( + self, + function: Callable[[Series], Series | Any], + return_dtype: PolarsDataType | None = None, + *, + agg_list: bool = False, + is_elementwise: bool = False, + ): + if return_dtype is not None: + return_dtype = 
py_type_to_dtype(return_dtype) + + return self._from_pyexpr( + self._pyexpr.map_batches( + self._map_batches_wrapper(function, return_dtype), + return_dtype, + agg_list, + is_elementwise, + True, ) ) diff --git a/py-polars/src/expr/general.rs b/py-polars/src/expr/general.rs index 269236b7b2d4..53736d115751 100644 --- a/py-polars/src/expr/general.rs +++ b/py-polars/src/expr/general.rs @@ -659,15 +659,16 @@ impl PyExpr { self.inner.clone().shrink_dtype().into() } - #[pyo3(signature = (lambda, output_type, agg_list, is_elementwise))] + #[pyo3(signature = (lambda, output_type, agg_list, is_elementwise, returns_scalar))] fn map_batches( &self, lambda: PyObject, output_type: Option>, agg_list: bool, is_elementwise: bool, + returns_scalar: bool, ) -> Self { - map_single(self, lambda, output_type, agg_list, is_elementwise) + map_single(self, lambda, output_type, agg_list, is_elementwise, returns_scalar) } fn dot(&self, other: Self) -> Self { diff --git a/py-polars/src/map/lazy.rs b/py-polars/src/map/lazy.rs index 75084783a295..20b83d915da0 100644 --- a/py-polars/src/map/lazy.rs +++ b/py-polars/src/map/lazy.rs @@ -129,10 +129,11 @@ pub fn map_single( output_type: Option>, agg_list: bool, is_elementwise: bool, + returns_scalar: bool ) -> PyExpr { let output_type = output_type.map(|wrap| wrap.0); - let func = python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise); + let func = python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar); pyexpr.inner.clone().map_python(func, agg_list).into() } diff --git a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py index f283009904e9..9791a73e0bbe 100644 --- a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py +++ b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py @@ -141,7 +141,7 @@ def test_grouped_ufunc() -> None: @pytest.mark.skipif(numba is None, reason="Numba is not available") -def test_generalized_ufunc() -> None: +def test_generalized_ufunc_scalar() -> None: assert numba is not None # to pacify type checkers @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()") # type: ignore[misc] @@ -162,5 +162,16 @@ def my_custom_sum(arr, result) -> None: # type: ignore[no-untyped-def] assert custom_sum(df.get_column("values")) == 15 # Indirect call of the gufunc: - indirect = df.select(pl.col("values").map_batches(custom_sum)) + indirect = df.select(pl.col("values").map_to_scalar(custom_sum)) assert_frame_equal(indirect, pl.DataFrame({"values": 15})) + + # group_by() + df = pl.DataFrame({"labels": ["a", "b", "a", "b"], "values": [10, 2, 3, 30]}) + indirect = ( + df.group_by("labels") + .agg(pl.col("values").map_to_scalar(custom_sum)) + .sort("labels") + ) + assert_frame_equal( + indirect, pl.DataFrame({"labels": ["a", "b"], "values": [13, 32]}) + ) From 692dea282e790b580e52e47811feba95df1fc2d2 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Thu, 21 Mar 2024 14:44:26 -0400 Subject: [PATCH 04/11] Documentation for the new API. --- py-polars/polars/expr/expr.py | 131 +++++++++++++++++++++++++++++----- 1 file changed, 112 insertions(+), 19 deletions(-) diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index cd9c9d69b438..4067627ef08a 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -4067,11 +4067,8 @@ def map_batches( A reasonable use case for `map` functions is transforming the values represented by an expression using a third-party library. - .. 
warning:: - If you are looking to map a function over a window function or group_by - context, refer to :func:`map_elements` instead. - Read more in `the book - `_. + If your function returns a scalar, for example a float, use + :func:`map_to_scalar` instead. Parameters ---------- @@ -4097,6 +4094,7 @@ def map_batches( See Also -------- + map_to_scalar map_elements replace @@ -4130,7 +4128,7 @@ def map_batches( ... } ... ) >>> df.group_by("a").agg( - ... pl.col("b").map_batches(lambda x: x.max(), agg_list=False) + ... pl.col("b").map_batches(lambda x: x + 2, agg_list=False) ... ) # doctest: +IGNORE_RESULT shape: (2, 2) ┌─────┬───────────┐ @@ -4138,25 +4136,28 @@ def map_batches( │ --- ┆ --- │ │ i64 ┆ list[i64] │ ╞═════╪═══════════╡ - │ 1 ┆ [4] │ - │ 0 ┆ [3] │ + │ 1 ┆ [4, 6] │ + │ 0 ┆ [3, 5] │ └─────┴───────────┘ Using `agg_list=True` would be more efficient. In this example, the input of the function is a Series of type `List(Int64)`. >>> df.group_by("a").agg( - ... pl.col("b").map_batches(lambda x: x.list.max(), agg_list=True) + ... pl.col("b").map_batches( + ... lambda x: x.list.eval(pl.element() + 2), + ... agg_list=True + ... ) ... ) # doctest: +IGNORE_RESULT shape: (2, 2) - ┌─────┬─────┐ - │ a ┆ b │ - │ --- ┆ --- │ - │ i64 ┆ i64 │ - ╞═════╪═════╡ - │ 0 ┆ 3 │ - │ 1 ┆ 4 │ - └─────┴─────┘ + ┌─────┬───────────┐ + │ a ┆ b │ + │ --- ┆ --- │ + │ i64 ┆ list[i64] │ + ╞═════╪═══════════╡ + │ 0 ┆ [3, 5] │ + │ 1 ┆ [4, 6] │ + └─────┴───────────┘ """ if return_dtype is not None: return_dtype = py_type_to_dtype(return_dtype) @@ -4173,12 +4174,104 @@ def map_batches( def map_to_scalar( self, - function: Callable[[Series], Series | Any], + function: Callable[[Series], Any], return_dtype: PolarsDataType | None = None, *, agg_list: bool = False, is_elementwise: bool = False, - ): + ) -> Self: + """ + Apply a custom python function to a whole Series or sequence of Series. + + The output of this custom function must be a scalar. If your function + returns a Series, use :func:`map_batches` instead. + + Parameters + ---------- + function + Lambda/function to apply. + return_dtype + Dtype of the output value. + If not set, the dtype will be inferred based on the value that is + returned by the function. + is_elementwise + If set to true this can run in the streaming engine, but may yield + incorrect results in group-by. Ensure you know what you are doing! + agg_list + Aggregate the values of the expression into a list before applying the + function. This parameter only works in a group-by context. + The function will be invoked only once on a list of groups, rather than + once per group. + + Warnings + -------- + If `return_dtype` is not provided, this may lead to unexpected results. + We allow this, but it is considered a bug in the user's query. + + See Also + -------- + map_batches + map_elements + replace + + Examples + -------- + >>> df = pl.DataFrame( + ... { + ... "sine": [0.0, 1.0, 0.0, -1.0], + ... "cosine": [1.0, 0.0, -1.0, 0.0], + ... } + ... ) + >>> df.select(pl.all().map_batches(lambda x: x.to_numpy().argmax())) + shape: (1, 2) + ┌──────┬────────┐ + │ sine ┆ cosine │ + │ --- ┆ --- │ + │ i64 ┆ i64 │ + ╞══════╪════════╡ + │ 1 ┆ 0 │ + └──────┴────────┘ + + In a group-by context, the `agg_list` parameter can improve performance if used + correctly. The following example has `agg_list` set to `False`, which causes + the function to be applied once per group. The input of the function is a + Series of type `Int64`. This is less efficient. + + >>> df = pl.DataFrame( + ... { + ... 
"a": [0, 1, 0, 1], + ... "b": [1, 2, 3, 4], + ... } + ... ) + >>> df.group_by("a").agg( + ... pl.col("b").map_to_scalar(lambda x: x.max(), agg_list=False) + ... ) # doctest: +IGNORE_RESULT + shape: (2, 2) + ┌─────┬─────┐ + │ a ┆ b │ + │ --- ┆ --- │ + │ i64 ┆ i64 │ + ╞═════╪═════╡ + │ 1 ┆ 4 │ + │ 0 ┆ 3 │ + └─────┴─────┘ + + Using `agg_list=True` would be more efficient. In this example, the input of + the function is a Series of type `List(Int64)`. + + >>> df.group_by("a").agg( + ... pl.col("b").map_to_scalar(lambda x: x.list.max(), agg_list=True) + ... ) # doctest: +IGNORE_RESULT + shape: (2, 2) + ┌─────┬─────┐ + │ a ┆ b │ + │ --- ┆ --- │ + │ i64 ┆ i64 │ + ╞═════╪═════╡ + │ 0 ┆ 3 │ + │ 1 ┆ 4 │ + └─────┴─────┘ + """ if return_dtype is not None: return_dtype = py_type_to_dtype(return_dtype) From 67368ab987e4f93f5eb617601588d454c59a2412 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:31:56 -0400 Subject: [PATCH 05/11] Switch to a more consistent API. --- py-polars/polars/expr/expr.py | 110 ++---------------- .../unit/interop/numpy/test_ufunc_expr.py | 6 +- 2 files changed, 15 insertions(+), 101 deletions(-) diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index 0bd1dd33feb5..27774edd1ab6 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -4524,12 +4524,15 @@ def map_batches( *, agg_list: bool = False, is_elementwise: bool = False, + returns_scalar: bool = False, ) -> Self: - """ - Apply a custom python function to a whole Series or sequence of Series. + """Apply a custom python function to a whole Series or sequence of Series. - The output of this custom function must be a Series (or a NumPy array, in which - case it will be automatically converted into a Series). If you want to apply a + The output of this custom function is presumed to be either a Series, + or a NumPy array (in which case it will be automatically converted into + a Series), or a scalar that will be converted into a Series. If the + result is a scalar and you want it to stay as a scalr, pass in + ``returns_scalar=True``. If you want to apply a custom function elementwise over single values, see :func:`map_elements`. A reasonable use case for `map` functions is transforming the values represented by an expression using a third-party library. @@ -4561,7 +4564,6 @@ def map_batches( See Also -------- - map_to_scalar map_elements replace @@ -4624,84 +4626,9 @@ def map_batches( │ 0 ┆ [3, 5] │ │ 1 ┆ [4, 6] │ └─────┴───────────┘ - """ - if return_dtype is not None: - return_dtype = py_type_to_dtype(return_dtype) - - return self._from_pyexpr( - self._pyexpr.map_batches( - self._map_batches_wrapper(function, return_dtype), - return_dtype, - agg_list, - is_elementwise, - False, - ) - ) - - def map_to_scalar( - self, - function: Callable[[Series], Any], - return_dtype: PolarsDataType | None = None, - *, - agg_list: bool = False, - is_elementwise: bool = False, - ) -> Self: - """ - Apply a custom python function to a whole Series or sequence of Series. - - The output of this custom function must be a scalar. If your function - returns a Series, use :func:`map_batches` instead. - - Parameters - ---------- - function - Lambda/function to apply. - return_dtype - Dtype of the output value. - If not set, the dtype will be inferred based on the value that is - returned by the function. - is_elementwise - If set to true this can run in the streaming engine, but may yield - incorrect results in group-by. Ensure you know what you are doing! 
- agg_list - Aggregate the values of the expression into a list before applying the - function. This parameter only works in a group-by context. - The function will be invoked only once on a list of groups, rather than - once per group. - - Warnings - -------- - If `return_dtype` is not provided, this may lead to unexpected results. - We allow this, but it is considered a bug in the user's query. - - See Also - -------- - map_batches - map_elements - replace - - Examples - -------- - >>> df = pl.DataFrame( - ... { - ... "sine": [0.0, 1.0, 0.0, -1.0], - ... "cosine": [1.0, 0.0, -1.0, 0.0], - ... } - ... ) - >>> df.select(pl.all().map_batches(lambda x: x.to_numpy().argmax())) - shape: (1, 2) - ┌──────┬────────┐ - │ sine ┆ cosine │ - │ --- ┆ --- │ - │ i64 ┆ i64 │ - ╞══════╪════════╡ - │ 1 ┆ 0 │ - └──────┴────────┘ - In a group-by context, the `agg_list` parameter can improve performance if used - correctly. The following example has `agg_list` set to `False`, which causes - the function to be applied once per group. The input of the function is a - Series of type `Int64`. This is less efficient. + Here's an example of a function that returns a scalar, where we want it + to stay as a scalar: >>> df = pl.DataFrame( ... { @@ -4710,7 +4637,7 @@ def map_to_scalar( ... } ... ) >>> df.group_by("a").agg( - ... pl.col("b").map_to_scalar(lambda x: x.max(), agg_list=False) + ... pl.col("b").map_batches(lambda x: x.max(), returns_scalar=True) ... ) # doctest: +IGNORE_RESULT shape: (2, 2) ┌─────┬─────┐ @@ -4722,21 +4649,6 @@ def map_to_scalar( │ 0 ┆ 3 │ └─────┴─────┘ - Using `agg_list=True` would be more efficient. In this example, the input of - the function is a Series of type `List(Int64)`. - - >>> df.group_by("a").agg( - ... pl.col("b").map_to_scalar(lambda x: x.list.max(), agg_list=True) - ... 
) # doctest: +IGNORE_RESULT - shape: (2, 2) - ┌─────┬─────┐ - │ a ┆ b │ - │ --- ┆ --- │ - │ i64 ┆ i64 │ - ╞═════╪═════╡ - │ 0 ┆ 3 │ - │ 1 ┆ 4 │ - └─────┴─────┘ """ if return_dtype is not None: return_dtype = py_type_to_dtype(return_dtype) @@ -4747,7 +4659,7 @@ def map_to_scalar( return_dtype, agg_list, is_elementwise, - True, + returns_scalar, ) ) diff --git a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py index 2aafcf950811..ad288c03593d 100644 --- a/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py +++ b/py-polars/tests/unit/interop/numpy/test_ufunc_expr.py @@ -154,14 +154,16 @@ def my_custom_sum(arr, result) -> None: # type: ignore[no-untyped-def] assert custom_sum(df.get_column("values")) == 15 # Indirect call of the gufunc: - indirect = df.select(pl.col("values").map_to_scalar(custom_sum)) + indirect = df.select(pl.col("values").map_batches(custom_sum, returns_scalar=True)) assert_frame_equal(indirect, pl.DataFrame({"values": 15})) + indirect = df.select(pl.col("values").map_batches(custom_sum, returns_scalar=False)) + assert_frame_equal(indirect, pl.DataFrame({"values": [15]})) # group_by() df = pl.DataFrame({"labels": ["a", "b", "a", "b"], "values": [10, 2, 3, 30]}) indirect = ( df.group_by("labels") - .agg(pl.col("values").map_to_scalar(custom_sum)) + .agg(pl.col("values").map_batches(custom_sum, returns_scalar=True)) .sort("labels") ) assert_frame_equal( From a6b785ff72ebdf2d58bd83e537e144d9330afa89 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:35:06 -0400 Subject: [PATCH 06/11] Already there --- py-polars/requirements-dev.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/py-polars/requirements-dev.txt b/py-polars/requirements-dev.txt index 6c95200c031f..4f8f1dbcee99 100644 --- a/py-polars/requirements-dev.txt +++ b/py-polars/requirements-dev.txt @@ -21,7 +21,6 @@ numba; python_version < '3.13' # Numba can lag Python releases pandas pyarrow pydantic>=2.0.0 -numba; python_version < '3.13' # Numba takes a while to support new Python versions # Datetime / time zones backports.zoneinfo; python_version < '3.9' tzdata; platform_system == 'Windows' From 9e0b09120625a85cf47a4b2c2e8376658cae48f0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:41:03 -0400 Subject: [PATCH 07/11] Bit more testing. 
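A sketch of the behavior difference exercised by the new assertion, with illustrative data chosen to reproduce the sums it expects (not the test file's exact fixture):

    import polars as pl

    df = pl.DataFrame({"id": [0, 0, 1], "t": [2, 9, 35]})

    # Default: the per-group scalar result is wrapped in a list column.
    df.group_by("id").agg(pl.col("t").map_batches(lambda s: s.sum())).sort("id")
    # t is list[i64]: [[11], [35]]

    # returns_scalar=True keeps each group's result as a plain value.
    df.group_by("id").agg(
        pl.col("t").map_batches(lambda s: s.sum(), returns_scalar=True)
    ).sort("id")
    # t is i64: [11, 35]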
--- .../tests/unit/operations/map/test_map_batches.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/py-polars/tests/unit/operations/map/test_map_batches.py b/py-polars/tests/unit/operations/map/test_map_batches.py index 457df189fa00..e4929a6cb13e 100644 --- a/py-polars/tests/unit/operations/map/test_map_batches.py +++ b/py-polars/tests/unit/operations/map/test_map_batches.py @@ -68,6 +68,10 @@ def test_map_batches_group() -> None: assert df.group_by("id").agg(pl.col("t").map_batches(lambda s: s.sum())).sort( "id" ).to_dict(as_series=False) == {"id": [0, 1], "t": [[11], [35]]} + # If returns_scalar is True, the result won't be wrapped in a list: + assert df.group_by("id").agg( + pl.col("t").map_batches(lambda s: s.sum(), returns_scalar=True) + ).sort("id").to_dict(as_series=False) == {"id": [0, 1], "t": [11, 35]} def test_map_deprecated() -> None: @@ -82,16 +86,10 @@ def test_map_deprecated() -> None: def test_ufunc_args() -> None: df = pl.DataFrame({"a": [1, 2, 3], "b": [2, 4, 6]}) result = df.select( - z=np.add( # type: ignore[call-overload] - pl.col("a"), pl.col("b") - ) + z=np.add(pl.col("a"), pl.col("b")) # type: ignore[call-overload] ) expected = pl.DataFrame({"z": [3, 6, 9]}) assert_frame_equal(result, expected) - result = df.select( - z=np.add( # type: ignore[call-overload] - 2, pl.col("a") - ) - ) + result = df.select(z=np.add(2, pl.col("a"))) # type: ignore[call-overload] expected = pl.DataFrame({"z": [3, 4, 5]}) assert_frame_equal(result, expected) From f236001cefaf5c68fcdc08469543c42d04ed77c9 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:46:31 -0400 Subject: [PATCH 08/11] Document the argument. --- py-polars/polars/expr/expr.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index 27774edd1ab6..a292bcec78c0 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -4526,7 +4526,8 @@ def map_batches( is_elementwise: bool = False, returns_scalar: bool = False, ) -> Self: - """Apply a custom python function to a whole Series or sequence of Series. + """ + Apply a custom python function to a whole Series or sequence of Series. The output of this custom function is presumed to be either a Series, or a NumPy array (in which case it will be automatically converted into @@ -4556,6 +4557,11 @@ def map_batches( function. This parameter only works in a group-by context. The function will be invoked only once on a list of groups, rather than once per group. + returns_scalar + If the function returns a scalar, by default it will be wrapped in + a list in the output, since the assumption is that the function + always returns something Series-like. If you want to keep the + result as a scalar, set this argument to True. Warnings -------- From 35f10be5384956c4d49462c454ccff96c8991b3c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:56:35 -0400 Subject: [PATCH 09/11] Reformat. 
--- py-polars/src/expr/general.rs | 9 ++++++++- py-polars/src/map/lazy.rs | 5 +++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/py-polars/src/expr/general.rs b/py-polars/src/expr/general.rs index d43f32b1faf9..9e2475d99658 100644 --- a/py-polars/src/expr/general.rs +++ b/py-polars/src/expr/general.rs @@ -764,7 +764,14 @@ impl PyExpr { is_elementwise: bool, returns_scalar: bool, ) -> Self { - map_single(self, lambda, output_type, agg_list, is_elementwise, returns_scalar) + map_single( + self, + lambda, + output_type, + agg_list, + is_elementwise, + returns_scalar, + ) } fn dot(&self, other: Self) -> Self { diff --git a/py-polars/src/map/lazy.rs b/py-polars/src/map/lazy.rs index 388d4088cafd..5779ca0c021b 100644 --- a/py-polars/src/map/lazy.rs +++ b/py-polars/src/map/lazy.rs @@ -129,11 +129,12 @@ pub fn map_single( output_type: Option>, agg_list: bool, is_elementwise: bool, - returns_scalar: bool + returns_scalar: bool, ) -> PyExpr { let output_type = output_type.map(|wrap| wrap.0); - let func = python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar); + let func = + python_udf::PythonUdfExpression::new(lambda, output_type, is_elementwise, returns_scalar); pyexpr.inner.clone().map_python(func, agg_list).into() } From a70d3be2d9424a23f6612602a9fca76a6993c89c Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 12:56:52 -0400 Subject: [PATCH 10/11] Fix typo --- py-polars/polars/expr/expr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py-polars/polars/expr/expr.py b/py-polars/polars/expr/expr.py index a292bcec78c0..e39d2efe82c1 100644 --- a/py-polars/polars/expr/expr.py +++ b/py-polars/polars/expr/expr.py @@ -4532,7 +4532,7 @@ def map_batches( The output of this custom function is presumed to be either a Series, or a NumPy array (in which case it will be automatically converted into a Series), or a scalar that will be converted into a Series. If the - result is a scalar and you want it to stay as a scalr, pass in + result is a scalar and you want it to stay as a scalar, pass in ``returns_scalar=True``. If you want to apply a custom function elementwise over single values, see :func:`map_elements`. A reasonable use case for `map` functions is transforming the values From cc5a1862e067db78761b856660caddb5eb3027c6 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Tue, 28 May 2024 13:05:05 -0400 Subject: [PATCH 11/11] Fix formatting. 
--- crates/polars-plan/src/dsl/python_udf.rs | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/polars-plan/src/dsl/python_udf.rs b/crates/polars-plan/src/dsl/python_udf.rs index 0a4cdf93260c..fc063e8fbaf9 100644 --- a/crates/polars-plan/src/dsl/python_udf.rs +++ b/crates/polars-plan/src/dsl/python_udf.rs @@ -104,16 +104,21 @@ pub struct PythonUdfExpression { python_function: PyObject, output_type: Option, is_elementwise: bool, - returns_scalar: bool + returns_scalar: bool, } impl PythonUdfExpression { - pub fn new(lambda: PyObject, output_type: Option, is_elementwise: bool, returns_scalar: bool) -> Self { + pub fn new( + lambda: PyObject, + output_type: Option, + is_elementwise: bool, + returns_scalar: bool, + ) -> Self { Self { python_function: lambda, output_type, is_elementwise, - returns_scalar + returns_scalar, } } @@ -140,7 +145,7 @@ impl PythonUdfExpression { python_function.into(), output_type, is_elementwise, - returns_scalar + returns_scalar, )) as Arc) }) } @@ -184,8 +189,15 @@ impl SeriesUdf for PythonUdfExpression { #[cfg(feature = "serde")] fn try_serialize(&self, buf: &mut Vec) -> PolarsResult<()> { buf.extend_from_slice(MAGIC_BYTE_MARK); - ciborium::ser::into_writer(&(self.output_type.clone(), self.is_elementwise, self.returns_scalar), &mut *buf) - .unwrap(); + ciborium::ser::into_writer( + &( + self.output_type.clone(), + self.is_elementwise, + self.returns_scalar, + ), + &mut *buf, + ) + .unwrap(); Python::with_gil(|py| { let pickle = PyModule::import_bound(py, "cloudpickle")
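Taken together, the series makes a "(n)->()" gufunc collapse a Series to a scalar both when called directly (through `__array_ufunc__`, PATCH 01) and through `map_batches(..., returns_scalar=True)` (PATCH 05). A sketch assuming Numba is installed, mirroring the tests added above (`gu_sum` is an illustrative name):

    import numba
    import polars as pl

    @numba.guvectorize([(numba.int64[:], numba.int64[:])], "(n)->()")
    def gu_sum(arr, result):
        total = 0
        for value in arr:
            total += value
        result[0] = total

    # Direct call on a Series returns a Python scalar.
    assert gu_sum(pl.Series("values", [10, 2, 3])) == 15

    # Per-group scalar aggregation via map_batches.
    df = pl.DataFrame({"labels": ["a", "b", "a", "b"], "values": [10, 2, 3, 30]})
    out = (
        df.group_by("labels")
        .agg(pl.col("values").map_batches(gu_sum, returns_scalar=True))
        .sort("labels")
    )
    # labels: ["a", "b"], values: [13, 32]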