From 18786acd8d1eb68fc87982b07ce29ecbae0923f0 Mon Sep 17 00:00:00 2001 From: Itamar Turner-Trauring Date: Wed, 13 Nov 2024 03:01:31 -0500 Subject: [PATCH] fix(python): Release GIL in Python APIs, part 1 (#19705) Co-authored-by: Itamar Turner-Trauring --- .../polars-python/src/series/aggregation.rs | 118 +++++----- crates/polars-python/src/series/arithmetic.rs | 33 +-- crates/polars-python/src/series/buffers.rs | 9 +- crates/polars-python/src/series/comparison.rs | 79 ++++--- .../polars-python/src/series/construction.rs | 18 +- crates/polars-python/src/series/export.rs | 14 +- crates/polars-python/src/series/general.rs | 205 +++++++++++------- crates/polars-python/src/series/scatter.rs | 5 +- 8 files changed, 267 insertions(+), 214 deletions(-) diff --git a/crates/polars-python/src/series/aggregation.rs b/crates/polars-python/src/series/aggregation.rs index 5aa8ee16639e..c4fe8d3447ec 100644 --- a/crates/polars-python/src/series/aggregation.rs +++ b/crates/polars-python/src/series/aggregation.rs @@ -8,37 +8,39 @@ use crate::error::PyPolarsErr; #[pymethods] impl PySeries { - fn any(&self, ignore_nulls: bool) -> PyResult> { - let s = self.series.bool().map_err(PyPolarsErr::from)?; - Ok(if ignore_nulls { - Some(s.any()) - } else { - s.any_kleene() + fn any(&self, py: Python, ignore_nulls: bool) -> PyResult> { + py.allow_threads(|| { + let s = self.series.bool().map_err(PyPolarsErr::from)?; + Ok(if ignore_nulls { + Some(s.any()) + } else { + s.any_kleene() + }) }) } - fn all(&self, ignore_nulls: bool) -> PyResult> { - let s = self.series.bool().map_err(PyPolarsErr::from)?; - Ok(if ignore_nulls { - Some(s.all()) - } else { - s.all_kleene() + fn all(&self, py: Python, ignore_nulls: bool) -> PyResult> { + py.allow_threads(|| { + let s = self.series.bool().map_err(PyPolarsErr::from)?; + Ok(if ignore_nulls { + Some(s.all()) + } else { + s.all_kleene() + }) }) } - fn arg_max(&self) -> Option { - self.series.arg_max() + fn arg_max(&self, py: Python) -> Option { + py.allow_threads(|| self.series.arg_max()) } - fn arg_min(&self) -> Option { - self.series.arg_min() + fn arg_min(&self, py: Python) -> Option { + py.allow_threads(|| self.series.arg_min()) } fn max(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .max_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.max_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -47,49 +49,42 @@ impl PySeries { fn mean(&self, py: Python) -> PyResult { match self.series.dtype() { Boolean => Ok(Wrap( - self.series - .cast(&DataType::UInt8) - .unwrap() - .mean_reduce() + py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().mean_reduce()) .as_any_value(), ) .into_py(py)), // For non-numeric output types we require mean_reduce. - dt if dt.is_temporal() => { - Ok(Wrap(self.series.mean_reduce().as_any_value()).into_py(py)) - }, - _ => Ok(self.series.mean().into_py(py)), + dt if dt.is_temporal() => Ok(Wrap( + py.allow_threads(|| self.series.mean_reduce()) + .as_any_value(), + ) + .into_py(py)), + _ => Ok(py.allow_threads(|| self.series.mean()).into_py(py)), } } fn median(&self, py: Python) -> PyResult { match self.series.dtype() { Boolean => Ok(Wrap( - self.series - .cast(&DataType::UInt8) - .unwrap() - .median_reduce() + py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().median_reduce()) .map_err(PyPolarsErr::from)? .as_any_value(), ) .into_py(py)), // For non-numeric output types we require median_reduce. dt if dt.is_temporal() => Ok(Wrap( - self.series - .median_reduce() + py.allow_threads(|| self.series.median_reduce()) .map_err(PyPolarsErr::from)? .as_any_value(), ) .into_py(py)), - _ => Ok(self.series.median().into_py(py)), + _ => Ok(py.allow_threads(|| self.series.median()).into_py(py)), } } fn min(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .min_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.min_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -97,26 +92,27 @@ impl PySeries { fn product(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .product() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.product().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) } - fn quantile(&self, quantile: f64, interpolation: Wrap) -> PyResult { - let bind = self.series.quantile_reduce(quantile, interpolation.0); + fn quantile( + &self, + py: Python, + quantile: f64, + interpolation: Wrap, + ) -> PyResult { + let bind = py.allow_threads(|| self.series.quantile_reduce(quantile, interpolation.0)); let sc = bind.map_err(PyPolarsErr::from)?; - Ok(Python::with_gil(|py| Wrap(sc.as_any_value()).into_py(py))) + Ok(Wrap(sc.as_any_value()).into_py(py)) } fn std(&self, py: Python, ddof: u8) -> PyResult { Ok(Wrap( - self.series - .std_reduce(ddof) - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.std_reduce(ddof).map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -124,9 +120,7 @@ impl PySeries { fn var(&self, py: Python, ddof: u8) -> PyResult { Ok(Wrap( - self.series - .var_reduce(ddof) - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.var_reduce(ddof).map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -134,37 +128,31 @@ impl PySeries { fn sum(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .sum_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.sum_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) } fn first(&self, py: Python) -> PyObject { - Wrap(self.series.first().as_any_value()).into_py(py) + Wrap(py.allow_threads(|| self.series.first()).as_any_value()).into_py(py) } fn last(&self, py: Python) -> PyObject { - Wrap(self.series.last().as_any_value()).into_py(py) + Wrap(py.allow_threads(|| self.series.last()).as_any_value()).into_py(py) } #[cfg(feature = "approx_unique")] fn approx_n_unique(&self, py: Python) -> PyResult { - Ok(self - .series - .approx_n_unique() - .map_err(PyPolarsErr::from)? + Ok(py + .allow_threads(|| self.series.approx_n_unique().map_err(PyPolarsErr::from))? .into_py(py)) } #[cfg(feature = "bitwise")] fn bitwise_and(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .and_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.and_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -173,9 +161,7 @@ impl PySeries { #[cfg(feature = "bitwise")] fn bitwise_or(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .or_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.or_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) @@ -184,9 +170,7 @@ impl PySeries { #[cfg(feature = "bitwise")] fn bitwise_xor(&self, py: Python) -> PyResult { Ok(Wrap( - self.series - .xor_reduce() - .map_err(PyPolarsErr::from)? + py.allow_threads(|| self.series.xor_reduce().map_err(PyPolarsErr::from))? .as_any_value(), ) .into_py(py)) diff --git a/crates/polars-python/src/series/arithmetic.rs b/crates/polars-python/src/series/arithmetic.rs index c5483aced1e7..62edd00a7656 100644 --- a/crates/polars-python/src/series/arithmetic.rs +++ b/crates/polars-python/src/series/arithmetic.rs @@ -6,28 +6,33 @@ use crate::error::PyPolarsErr; #[pymethods] impl PySeries { - fn add(&self, other: &PySeries) -> PyResult { - Ok((&self.series + &other.series) + fn add(&self, py: Python, other: &PySeries) -> PyResult { + Ok(py + .allow_threads(|| &self.series + &other.series) .map(Into::into) .map_err(PyPolarsErr::from)?) } - fn sub(&self, other: &PySeries) -> PyResult { - Ok((&self.series - &other.series) + fn sub(&self, py: Python, other: &PySeries) -> PyResult { + Ok(py + .allow_threads(|| &self.series - &other.series) .map(Into::into) .map_err(PyPolarsErr::from)?) } - fn div(&self, other: &PySeries) -> PyResult { - Ok((&self.series / &other.series) + fn div(&self, py: Python, other: &PySeries) -> PyResult { + Ok(py + .allow_threads(|| &self.series / &other.series) .map(Into::into) .map_err(PyPolarsErr::from)?) } - fn mul(&self, other: &PySeries) -> PyResult { - Ok((&self.series * &other.series) + fn mul(&self, py: Python, other: &PySeries) -> PyResult { + Ok(py + .allow_threads(|| &self.series * &other.series) .map(Into::into) .map_err(PyPolarsErr::from)?) } - fn rem(&self, other: &PySeries) -> PyResult { - Ok((&self.series % &other.series) + fn rem(&self, py: Python, other: &PySeries) -> PyResult { + Ok(py + .allow_threads(|| &self.series % &other.series) .map(Into::into) .map_err(PyPolarsErr::from)?) } @@ -37,8 +42,8 @@ macro_rules! impl_arithmetic { ($name:ident, $type:ty, $operand:tt) => { #[pymethods] impl PySeries { - fn $name(&self, other: $type) -> PyResult { - Ok((&self.series $operand other).into()) + fn $name(&self, py: Python, other: $type) -> PyResult { + Ok(py.allow_threads(|| {&self.series $operand other}).into()) } } }; @@ -103,8 +108,8 @@ macro_rules! impl_rhs_arithmetic { ($name:ident, $type:ty, $operand:ident) => { #[pymethods] impl PySeries { - fn $name(&self, other: $type) -> PyResult { - Ok(other.$operand(&self.series).into()) + fn $name(&self, py: Python, other: $type) -> PyResult { + Ok(py.allow_threads(|| other.$operand(&self.series)).into()) } } }; diff --git a/crates/polars-python/src/series/buffers.rs b/crates/polars-python/src/series/buffers.rs index 939159220277..e3b9402d4d47 100644 --- a/crates/polars-python/src/series/buffers.rs +++ b/crates/polars-python/src/series/buffers.rs @@ -82,9 +82,9 @@ impl PySeries { } /// Return the underlying values, validity, and offsets buffers as Series. - fn _get_buffers(&self) -> PyResult<(Self, Option, Option)> { + fn _get_buffers(&self, py: Python) -> PyResult<(Self, Option, Option)> { let s = &self.series; - match s.dtype().to_physical() { + py.allow_threads(|| match s.dtype().to_physical() { dt if dt.is_numeric() => get_buffers_from_primitive(s), DataType::Boolean => get_buffers_from_primitive(s), DataType::String => get_buffers_from_string(s), @@ -92,7 +92,7 @@ impl PySeries { let msg = format!("`_get_buffers` not implemented for `dtype` {dt}"); Err(PyTypeError::new_err(msg)) }, - } + }) } } @@ -253,6 +253,7 @@ impl PySeries { #[staticmethod] #[pyo3(signature = (dtype, data, validity=None))] unsafe fn _from_buffers( + py: Python, dtype: Wrap, data: Vec, validity: Option, @@ -320,7 +321,7 @@ impl PySeries { )), }; let values = series_to_buffer::(values); - from_buffers_string_impl(values, validity, offsets)? + py.allow_threads(|| from_buffers_string_impl(values, validity, offsets))? }, dt => { let msg = format!("`_from_buffers` not implemented for `dtype` {dt}"); diff --git a/crates/polars-python/src/series/comparison.rs b/crates/polars-python/src/series/comparison.rs index 7064edb7698a..2b7de37931f9 100644 --- a/crates/polars-python/src/series/comparison.rs +++ b/crates/polars-python/src/series/comparison.rs @@ -6,36 +6,45 @@ use crate::PySeries; #[pymethods] impl PySeries { - fn eq(&self, rhs: &PySeries) -> PyResult { - let s = self.series.equal(&rhs.series).map_err(PyPolarsErr::from)?; + fn eq(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.equal(&rhs.series)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } - fn neq(&self, rhs: &PySeries) -> PyResult { - let s = self - .series - .not_equal(&rhs.series) + fn neq(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.not_equal(&rhs.series)) .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } - fn gt(&self, rhs: &PySeries) -> PyResult { - let s = self.series.gt(&rhs.series).map_err(PyPolarsErr::from)?; + fn gt(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.gt(&rhs.series)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } - fn gt_eq(&self, rhs: &PySeries) -> PyResult { - let s = self.series.gt_eq(&rhs.series).map_err(PyPolarsErr::from)?; + fn gt_eq(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.gt_eq(&rhs.series)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } - fn lt(&self, rhs: &PySeries) -> PyResult { - let s = self.series.lt(&rhs.series).map_err(PyPolarsErr::from)?; + fn lt(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.lt(&rhs.series)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } - fn lt_eq(&self, rhs: &PySeries) -> PyResult { - let s = self.series.lt_eq(&rhs.series).map_err(PyPolarsErr::from)?; + fn lt_eq(&self, py: Python, rhs: &PySeries) -> PyResult { + let s = py + .allow_threads(|| self.series.lt_eq(&rhs.series)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -44,8 +53,10 @@ macro_rules! impl_eq_num { ($name:ident, $type:ty) => { #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.equal(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.equal(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -69,8 +80,10 @@ macro_rules! impl_neq_num { #[allow(clippy::nonstandard_macro_braces)] #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.not_equal(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.not_equal(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -93,8 +106,10 @@ macro_rules! impl_gt_num { ($name:ident, $type:ty) => { #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.gt(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.gt(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -117,8 +132,10 @@ macro_rules! impl_gt_eq_num { ($name:ident, $type:ty) => { #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.gt_eq(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.gt_eq(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -142,8 +159,10 @@ macro_rules! impl_lt_num { #[allow(clippy::nonstandard_macro_braces)] #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.lt(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.lt(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -166,8 +185,10 @@ macro_rules! impl_lt_eq_num { ($name:ident, $type:ty) => { #[pymethods] impl PySeries { - fn $name(&self, rhs: $type) -> PyResult { - let s = self.series.lt_eq(rhs).map_err(PyPolarsErr::from)?; + fn $name(&self, py: Python, rhs: $type) -> PyResult { + let s = py + .allow_threads(|| self.series.lt_eq(rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } @@ -226,12 +247,14 @@ macro_rules! impl_decimal { ($name:ident, $method:ident) => { #[pymethods] impl PySeries { - fn $name(&self, rhs: PyDecimal) -> PyResult { + fn $name(&self, py: Python, rhs: PyDecimal) -> PyResult { let rhs = Series::new( PlSmallStr::from_static("decimal"), &[AnyValue::Decimal(rhs.0, rhs.1)], ); - let s = self.series.$method(&rhs).map_err(PyPolarsErr::from)?; + let s = py + .allow_threads(|| self.series.$method(&rhs)) + .map_err(PyPolarsErr::from)?; Ok(s.into_series().into()) } } diff --git a/crates/polars-python/src/series/construction.rs b/crates/polars-python/src/series/construction.rs index 5935f1e7b0ce..e9dbdf264d8c 100644 --- a/crates/polars-python/src/series/construction.rs +++ b/crates/polars-python/src/series/construction.rs @@ -71,10 +71,11 @@ impl PySeries { if nan_is_null { let array = array.readonly(); let vals = array.as_slice().unwrap(); - let ca: Float32Chunked = vals - .iter() - .map(|&val| if f32::is_nan(val) { None } else { Some(val) }) - .collect_trusted(); + let ca: Float32Chunked = py.allow_threads(|| { + vals.iter() + .map(|&val| if f32::is_nan(val) { None } else { Some(val) }) + .collect_trusted() + }); ca.with_name(name.into()).into_series().into() } else { mmap_numpy_array(py, name, array) @@ -86,10 +87,11 @@ impl PySeries { if nan_is_null { let array = array.readonly(); let vals = array.as_slice().unwrap(); - let ca: Float64Chunked = vals - .iter() - .map(|&val| if f64::is_nan(val) { None } else { Some(val) }) - .collect_trusted(); + let ca: Float64Chunked = py.allow_threads(|| { + vals.iter() + .map(|&val| if f64::is_nan(val) { None } else { Some(val) }) + .collect_trusted() + }); ca.with_name(name.into()).into_series().into() } else { mmap_numpy_array(py, name, array) diff --git a/crates/polars-python/src/series/export.rs b/crates/polars-python/src/series/export.rs index 886b6114427a..959b2dd47293 100644 --- a/crates/polars-python/src/series/export.rs +++ b/crates/polars-python/src/series/export.rs @@ -147,17 +147,11 @@ impl PySeries { /// Return the underlying Arrow array. #[allow(clippy::wrong_self_convention)] - fn to_arrow(&mut self, compat_level: PyCompatLevel) -> PyResult { - self.rechunk(true); - Python::with_gil(|py| { - let pyarrow = py.import_bound("pyarrow")?; + fn to_arrow(&mut self, py: Python, compat_level: PyCompatLevel) -> PyResult { + self.rechunk(py, true); + let pyarrow = py.import_bound("pyarrow")?; - interop::arrow::to_py::to_py_array( - self.series.to_arrow(0, compat_level.0), - py, - &pyarrow, - ) - }) + interop::arrow::to_py::to_py_array(self.series.to_arrow(0, compat_level.0), py, &pyarrow) } #[allow(unused_variables)] diff --git a/crates/polars-python/src/series/general.rs b/crates/polars-python/src/series/general.rs index 7312995d7606..3134f5354f09 100644 --- a/crates/polars-python/src/series/general.rs +++ b/crates/polars-python/src/series/general.rs @@ -16,9 +16,9 @@ use crate::py_modules::POLARS; #[pymethods] impl PySeries { - fn struct_unnest(&self) -> PyResult { + fn struct_unnest(&self, py: Python) -> PyResult { let ca = self.series.struct_().map_err(PyPolarsErr::from)?; - let df: DataFrame = ca.clone().unnest(); + let df: DataFrame = py.allow_threads(|| ca.clone().unnest()); Ok(df.into()) } @@ -56,9 +56,9 @@ impl PySeries { Ok(ca.get_rev_map().is_local()) } - pub fn cat_to_local(&self) -> PyResult { + pub fn cat_to_local(&self, py: Python) -> PyResult { let ca = self.series.categorical().map_err(PyPolarsErr::from)?; - Ok(ca.to_local().into_series().into()) + Ok(py.allow_threads(|| ca.to_local().into_series().into())) } fn estimated_size(&self) -> usize { @@ -78,15 +78,14 @@ impl PySeries { } #[cfg(feature = "dtype-array")] - fn reshape(&self, dims: Vec) -> PyResult { + fn reshape(&self, py: Python, dims: Vec) -> PyResult { let dims = dims .into_iter() .map(ReshapeDimension::new) .collect::>(); - let out = self - .series - .reshape_array(&dims) + let out = py + .allow_threads(|| self.series.reshape_array(&dims)) .map_err(PyPolarsErr::from)?; Ok(out.into()) } @@ -114,8 +113,8 @@ impl PySeries { } } - pub fn rechunk(&mut self, in_place: bool) -> Option { - let series = self.series.rechunk(); + pub fn rechunk(&mut self, py: Python, in_place: bool) -> Option { + let series = py.allow_threads(|| self.series.rechunk()); if in_place { self.series = series; None @@ -167,16 +166,23 @@ impl PySeries { self.get_index(py, index) } - fn bitand(&self, other: &PySeries) -> PyResult { - let out = (&self.series & &other.series).map_err(PyPolarsErr::from)?; + fn bitand(&self, py: Python, other: &PySeries) -> PyResult { + let out = py + .allow_threads(|| &self.series & &other.series) + .map_err(PyPolarsErr::from)?; Ok(out.into()) } - fn bitor(&self, other: &PySeries) -> PyResult { - let out = (&self.series | &other.series).map_err(PyPolarsErr::from)?; + + fn bitor(&self, py: Python, other: &PySeries) -> PyResult { + let out = py + .allow_threads(|| &self.series | &other.series) + .map_err(PyPolarsErr::from)?; Ok(out.into()) } - fn bitxor(&self, other: &PySeries) -> PyResult { - let out = (&self.series ^ &other.series).map_err(PyPolarsErr::from)?; + fn bitxor(&self, py: Python, other: &PySeries) -> PyResult { + let out = py + .allow_threads(|| &self.series ^ &other.series) + .map_err(PyPolarsErr::from)?; Ok(out.into()) } @@ -217,48 +223,58 @@ impl PySeries { Ok(()) } - fn extend(&mut self, other: &PySeries) -> PyResult<()> { - self.series - .extend(&other.series) + fn extend(&mut self, py: Python, other: &PySeries) -> PyResult<()> { + py.allow_threads(|| self.series.extend(&other.series)) .map_err(PyPolarsErr::from)?; Ok(()) } - fn new_from_index(&self, index: usize, length: usize) -> PyResult { + fn new_from_index(&self, py: Python, index: usize, length: usize) -> PyResult { if index >= self.series.len() { Err(PyValueError::new_err("index is out of bounds")) } else { - Ok(self.series.new_from_index(index, length).into()) + Ok(py.allow_threads(|| self.series.new_from_index(index, length).into())) } } - fn filter(&self, filter: &PySeries) -> PyResult { + fn filter(&self, py: Python, filter: &PySeries) -> PyResult { let filter_series = &filter.series; if let Ok(ca) = filter_series.bool() { - let series = self.series.filter(ca).map_err(PyPolarsErr::from)?; + let series = py + .allow_threads(|| self.series.filter(ca)) + .map_err(PyPolarsErr::from)?; Ok(PySeries { series }) } else { Err(PyRuntimeError::new_err("Expected a boolean mask")) } } - fn sort(&mut self, descending: bool, nulls_last: bool, multithreaded: bool) -> PyResult { - Ok(self - .series - .sort( - SortOptions::default() - .with_order_descending(descending) - .with_nulls_last(nulls_last) - .with_multithreaded(multithreaded), - ) + fn sort( + &mut self, + py: Python, + descending: bool, + nulls_last: bool, + multithreaded: bool, + ) -> PyResult { + Ok(py + .allow_threads(|| { + self.series.sort( + SortOptions::default() + .with_order_descending(descending) + .with_nulls_last(nulls_last) + .with_multithreaded(multithreaded), + ) + }) .map_err(PyPolarsErr::from)? .into()) } - fn gather_with_series(&self, indices: &PySeries) -> PyResult { - let indices = indices.series.idx().map_err(PyPolarsErr::from)?; - let s = self.series.take(indices).map_err(PyPolarsErr::from)?; - Ok(s.into()) + fn gather_with_series(&self, py: Python, indices: &PySeries) -> PyResult { + py.allow_threads(|| { + let indices = indices.series.idx().map_err(PyPolarsErr::from)?; + let s = self.series.take(indices).map_err(PyPolarsErr::from)?; + Ok(s.into()) + }) } fn null_count(&self) -> PyResult { @@ -271,6 +287,7 @@ impl PySeries { fn equals( &self, + py: Python, other: &PySeries, check_dtypes: bool, check_names: bool, @@ -283,9 +300,9 @@ impl PySeries { return false; } if null_equal { - self.series.equals_missing(&other.series) + py.allow_threads(|| self.series.equals_missing(&other.series)) } else { - self.series.equals(&other.series) + py.allow_threads(|| self.series.equals(&other.series)) } } @@ -300,8 +317,10 @@ impl PySeries { /// Rechunk and return a pointer to the start of the Series. /// Only implemented for numeric types - fn as_single_ptr(&mut self) -> PyResult { - let ptr = self.series.as_single_ptr().map_err(PyPolarsErr::from)?; + fn as_single_ptr(&mut self, py: Python) -> PyResult { + let ptr = py + .allow_threads(|| self.series.as_single_ptr()) + .map_err(PyPolarsErr::from)?; Ok(ptr) } @@ -309,20 +328,23 @@ impl PySeries { self.series.clone().into() } - fn zip_with(&self, mask: &PySeries, other: &PySeries) -> PyResult { + fn zip_with(&self, py: Python, mask: &PySeries, other: &PySeries) -> PyResult { let mask = mask.series.bool().map_err(PyPolarsErr::from)?; - let s = self - .series - .zip_with(mask, &other.series) + let s = py + .allow_threads(|| self.series.zip_with(mask, &other.series)) .map_err(PyPolarsErr::from)?; Ok(s.into()) } #[pyo3(signature = (separator, drop_first=false))] - fn to_dummies(&self, separator: Option<&str>, drop_first: bool) -> PyResult { - let df = self - .series - .to_dummies(separator, drop_first) + fn to_dummies( + &self, + py: Python, + separator: Option<&str>, + drop_first: bool, + ) -> PyResult { + let df = py + .allow_threads(|| self.series.to_dummies(separator, drop_first)) .map_err(PyPolarsErr::from)?; Ok(df.into()) } @@ -332,18 +354,22 @@ impl PySeries { Some(ca.get_as_series(index)?.into()) } - fn n_unique(&self) -> PyResult { - let n = self.series.n_unique().map_err(PyPolarsErr::from)?; + fn n_unique(&self, py: Python) -> PyResult { + let n = py + .allow_threads(|| self.series.n_unique()) + .map_err(PyPolarsErr::from)?; Ok(n) } - fn floor(&self) -> PyResult { - let s = self.series.floor().map_err(PyPolarsErr::from)?; + fn floor(&self, py: Python) -> PyResult { + let s = py + .allow_threads(|| self.series.floor()) + .map_err(PyPolarsErr::from)?; Ok(s.into()) } - fn shrink_to_fit(&mut self) { - self.series.shrink_to_fit(); + fn shrink_to_fit(&mut self, py: Python) { + py.allow_threads(|| self.series.shrink_to_fit()); } fn dot(&self, other: &PySeries, py: Python) -> PyResult { @@ -358,15 +384,11 @@ impl PySeries { } let result: AnyValue = if lhs_dtype.is_float() || rhs_dtype.is_float() { - (&self.series * &other.series) - .map_err(PyPolarsErr::from)? - .sum::() + py.allow_threads(|| (&self.series * &other.series)?.sum::()) .map_err(PyPolarsErr::from)? .into() } else { - (&self.series * &other.series) - .map_err(PyPolarsErr::from)? - .sum::() + py.allow_threads(|| (&self.series * &other.series)?.sum::()) .map_err(PyPolarsErr::from)? .into() }; @@ -413,20 +435,27 @@ impl PySeries { } } - fn skew(&self, bias: bool) -> PyResult> { - let out = self.series.skew(bias).map_err(PyPolarsErr::from)?; + fn skew(&self, py: Python, bias: bool) -> PyResult> { + let out = py + .allow_threads(|| self.series.skew(bias)) + .map_err(PyPolarsErr::from)?; Ok(out) } - fn kurtosis(&self, fisher: bool, bias: bool) -> PyResult> { - let out = self - .series - .kurtosis(fisher, bias) + fn kurtosis(&self, py: Python, fisher: bool, bias: bool) -> PyResult> { + let out = py + .allow_threads(|| self.series.kurtosis(fisher, bias)) .map_err(PyPolarsErr::from)?; Ok(out) } - fn cast(&self, dtype: Wrap, strict: bool, wrap_numerical: bool) -> PyResult { + fn cast( + &self, + py: Python, + dtype: Wrap, + strict: bool, + wrap_numerical: bool, + ) -> PyResult { let options = if wrap_numerical { CastOptions::Overflowing } else if strict { @@ -436,7 +465,7 @@ impl PySeries { }; let dtype = dtype.0; - let out = self.series.cast_with_options(&dtype, options); + let out = py.allow_threads(|| self.series.cast_with_options(&dtype, options)); let out = out.map_err(PyPolarsErr::from)?; Ok(out.into()) } @@ -451,7 +480,7 @@ impl PySeries { }) } - fn is_sorted(&self, descending: bool, nulls_last: bool) -> PyResult { + fn is_sorted(&self, py: Python, descending: bool, nulls_last: bool) -> PyResult { let options = SortOptions { descending, nulls_last, @@ -459,31 +488,36 @@ impl PySeries { maintain_order: false, limit: None, }; - Ok(self.series.is_sorted(options).map_err(PyPolarsErr::from)?) + Ok(py + .allow_threads(|| self.series.is_sorted(options)) + .map_err(PyPolarsErr::from)?) } fn clear(&self) -> Self { self.series.clear().into() } - fn head(&self, n: usize) -> Self { - self.series.head(Some(n)).into() + fn head(&self, py: Python, n: usize) -> Self { + py.allow_threads(|| self.series.head(Some(n))).into() } - fn tail(&self, n: usize) -> Self { - self.series.tail(Some(n)).into() + fn tail(&self, py: Python, n: usize) -> Self { + py.allow_threads(|| self.series.tail(Some(n))).into() } fn value_counts( &self, + py: Python, sort: bool, parallel: bool, name: String, normalize: bool, ) -> PyResult { - let out = self - .series - .value_counts(sort, parallel, name.into(), normalize) + let out = py + .allow_threads(|| { + self.series + .value_counts(sort, parallel, name.into(), normalize) + }) .map_err(PyPolarsErr::from)?; Ok(out.into()) } @@ -494,8 +528,10 @@ impl PySeries { self.series.slice(offset, length).into() } - pub fn not_(&self) -> PyResult { - let out = polars_ops::series::negate_bitwise(&self.series).map_err(PyPolarsErr::from)?; + pub fn not_(&self, py: Python) -> PyResult { + let out = py + .allow_threads(|| polars_ops::series::negate_bitwise(&self.series)) + .map_err(PyPolarsErr::from)?; Ok(out.into()) } } @@ -516,8 +552,15 @@ macro_rules! impl_set_with_mask { #[pymethods] impl PySeries { #[pyo3(signature = (filter, value))] - fn $name(&self, filter: &PySeries, value: Option<$native>) -> PyResult { - let series = $name(&self.series, filter, value).map_err(PyPolarsErr::from)?; + fn $name( + &self, + py: Python, + filter: &PySeries, + value: Option<$native>, + ) -> PyResult { + let series = py + .allow_threads(|| $name(&self.series, filter, value)) + .map_err(PyPolarsErr::from)?; Ok(Self::new(series)) } } diff --git a/crates/polars-python/src/series/scatter.rs b/crates/polars-python/src/series/scatter.rs index 97df60ef205b..798cd189a9b6 100644 --- a/crates/polars-python/src/series/scatter.rs +++ b/crates/polars-python/src/series/scatter.rs @@ -7,11 +7,12 @@ use crate::error::PyPolarsErr; #[pymethods] impl PySeries { - fn scatter(&mut self, idx: PySeries, values: PySeries) -> PyResult<()> { + fn scatter(&mut self, py: Python, idx: PySeries, values: PySeries) -> PyResult<()> { // we take the value because we want a ref count of 1 so that we can // have mutable access cheaply via _get_inner_mut(). let s = std::mem::take(&mut self.series); - match scatter(s, &idx.series, &values.series) { + let result = py.allow_threads(|| scatter(s, &idx.series, &values.series)); + match result { Ok(out) => { self.series = out; Ok(())