Skip to content

Commit

Permalink
fix(python): Release GIL in Python APIs, part 1 (#19705)
Browse files (browse the repository at this point in the history)
Co-authored-by: Itamar Turner-Trauring <[email protected]>
  • Loading branch information
itamarst and pythonspeed authored Nov 13, 2024
1 parent 87367e9 commit 18786ac
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 214 deletions.
118 changes: 51 additions & 67 deletions crates/polars-python/src/series/aggregation.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -8,37 +8,39 @@ use crate::error::PyPolarsErr;

#[pymethods]
impl PySeries {
fn any(&self, ignore_nulls: bool) -> PyResult<Option<bool>> {
let s = self.series.bool().map_err(PyPolarsErr::from)?;
Ok(if ignore_nulls {
Some(s.any())
} else {
s.any_kleene()
fn any(&self, py: Python, ignore_nulls: bool) -> PyResult<Option<bool>> {
py.allow_threads(|| {
let s = self.series.bool().map_err(PyPolarsErr::from)?;
Ok(if ignore_nulls {
Some(s.any())
} else {
s.any_kleene()
})
})
}

fn all(&self, ignore_nulls: bool) -> PyResult<Option<bool>> {
let s = self.series.bool().map_err(PyPolarsErr::from)?;
Ok(if ignore_nulls {
Some(s.all())
} else {
s.all_kleene()
fn all(&self, py: Python, ignore_nulls: bool) -> PyResult<Option<bool>> {
py.allow_threads(|| {
let s = self.series.bool().map_err(PyPolarsErr::from)?;
Ok(if ignore_nulls {
Some(s.all())
} else {
s.all_kleene()
})
})
}

fn arg_max(&self) -> Option<usize> {
self.series.arg_max()
fn arg_max(&self, py: Python) -> Option<usize> {
py.allow_threads(|| self.series.arg_max())
}

fn arg_min(&self) -> Option<usize> {
self.series.arg_min()
fn arg_min(&self, py: Python) -> Option<usize> {
py.allow_threads(|| self.series.arg_min())
}

fn max(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.max_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.max_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
Expand All @@ -47,124 +49,110 @@ impl PySeries {
fn mean(&self, py: Python) -> PyResult<PyObject> {
match self.series.dtype() {
Boolean => Ok(Wrap(
self.series
.cast(&DataType::UInt8)
.unwrap()
.mean_reduce()
py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().mean_reduce())
.as_any_value(),
)
.into_py(py)),
// For non-numeric output types we require mean_reduce.
dt if dt.is_temporal() => {
Ok(Wrap(self.series.mean_reduce().as_any_value()).into_py(py))
},
_ => Ok(self.series.mean().into_py(py)),
dt if dt.is_temporal() => Ok(Wrap(
py.allow_threads(|| self.series.mean_reduce())
.as_any_value(),
)
.into_py(py)),
_ => Ok(py.allow_threads(|| self.series.mean()).into_py(py)),
}
}

fn median(&self, py: Python) -> PyResult<PyObject> {
match self.series.dtype() {
Boolean => Ok(Wrap(
self.series
.cast(&DataType::UInt8)
.unwrap()
.median_reduce()
py.allow_threads(|| self.series.cast(&DataType::UInt8).unwrap().median_reduce())
.map_err(PyPolarsErr::from)?
.as_any_value(),
)
.into_py(py)),
// For non-numeric output types we require median_reduce.
dt if dt.is_temporal() => Ok(Wrap(
self.series
.median_reduce()
py.allow_threads(|| self.series.median_reduce())
.map_err(PyPolarsErr::from)?
.as_any_value(),
)
.into_py(py)),
_ => Ok(self.series.median().into_py(py)),
_ => Ok(py.allow_threads(|| self.series.median()).into_py(py)),
}
}

fn min(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.min_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.min_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
}

fn product(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.product()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.product().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
}

fn quantile(&self, quantile: f64, interpolation: Wrap<QuantileMethod>) -> PyResult<PyObject> {
let bind = self.series.quantile_reduce(quantile, interpolation.0);
fn quantile(
&self,
py: Python,
quantile: f64,
interpolation: Wrap<QuantileMethod>,
) -> PyResult<PyObject> {
let bind = py.allow_threads(|| self.series.quantile_reduce(quantile, interpolation.0));
let sc = bind.map_err(PyPolarsErr::from)?;

Ok(Python::with_gil(|py| Wrap(sc.as_any_value()).into_py(py)))
Ok(Wrap(sc.as_any_value()).into_py(py))
}

fn std(&self, py: Python, ddof: u8) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.std_reduce(ddof)
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.std_reduce(ddof).map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
}

fn var(&self, py: Python, ddof: u8) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.var_reduce(ddof)
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.var_reduce(ddof).map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
}

fn sum(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.sum_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.sum_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
}

fn first(&self, py: Python) -> PyObject {
Wrap(self.series.first().as_any_value()).into_py(py)
Wrap(py.allow_threads(|| self.series.first()).as_any_value()).into_py(py)
}

fn last(&self, py: Python) -> PyObject {
Wrap(self.series.last().as_any_value()).into_py(py)
Wrap(py.allow_threads(|| self.series.last()).as_any_value()).into_py(py)
}

#[cfg(feature = "approx_unique")]
fn approx_n_unique(&self, py: Python) -> PyResult<PyObject> {
Ok(self
.series
.approx_n_unique()
.map_err(PyPolarsErr::from)?
Ok(py
.allow_threads(|| self.series.approx_n_unique().map_err(PyPolarsErr::from))?
.into_py(py))
}

#[cfg(feature = "bitwise")]
fn bitwise_and(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.and_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.and_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
Expand All @@ -173,9 +161,7 @@ impl PySeries {
#[cfg(feature = "bitwise")]
fn bitwise_or(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.or_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.or_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
Expand All @@ -184,9 +170,7 @@ impl PySeries {
#[cfg(feature = "bitwise")]
fn bitwise_xor(&self, py: Python) -> PyResult<PyObject> {
Ok(Wrap(
self.series
.xor_reduce()
.map_err(PyPolarsErr::from)?
py.allow_threads(|| self.series.xor_reduce().map_err(PyPolarsErr::from))?
.as_any_value(),
)
.into_py(py))
Expand Down
33 changes: 19 additions & 14 deletions crates/polars-python/src/series/arithmetic.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -6,28 +6,33 @@ use crate::error::PyPolarsErr;

#[pymethods]
impl PySeries {
fn add(&self, other: &PySeries) -> PyResult<Self> {
Ok((&self.series + &other.series)
fn add(&self, py: Python, other: &PySeries) -> PyResult<Self> {
Ok(py
.allow_threads(|| &self.series + &other.series)
.map(Into::into)
.map_err(PyPolarsErr::from)?)
}
fn sub(&self, other: &PySeries) -> PyResult<Self> {
Ok((&self.series - &other.series)
fn sub(&self, py: Python, other: &PySeries) -> PyResult<Self> {
Ok(py
.allow_threads(|| &self.series - &other.series)
.map(Into::into)
.map_err(PyPolarsErr::from)?)
}
fn div(&self, other: &PySeries) -> PyResult<Self> {
Ok((&self.series / &other.series)
fn div(&self, py: Python, other: &PySeries) -> PyResult<Self> {
Ok(py
.allow_threads(|| &self.series / &other.series)
.map(Into::into)
.map_err(PyPolarsErr::from)?)
}
fn mul(&self, other: &PySeries) -> PyResult<Self> {
Ok((&self.series * &other.series)
fn mul(&self, py: Python, other: &PySeries) -> PyResult<Self> {
Ok(py
.allow_threads(|| &self.series * &other.series)
.map(Into::into)
.map_err(PyPolarsErr::from)?)
}
fn rem(&self, other: &PySeries) -> PyResult<Self> {
Ok((&self.series % &other.series)
fn rem(&self, py: Python, other: &PySeries) -> PyResult<Self> {
Ok(py
.allow_threads(|| &self.series % &other.series)
.map(Into::into)
.map_err(PyPolarsErr::from)?)
}
Expand All @@ -37,8 +42,8 @@ macro_rules! impl_arithmetic {
($name:ident, $type:ty, $operand:tt) => {
#[pymethods]
impl PySeries {
fn $name(&self, other: $type) -> PyResult<Self> {
Ok((&self.series $operand other).into())
fn $name(&self, py: Python, other: $type) -> PyResult<Self> {
Ok(py.allow_threads(|| {&self.series $operand other}).into())
}
}
};
Expand Down Expand Up @@ -103,8 +108,8 @@ macro_rules! impl_rhs_arithmetic {
($name:ident, $type:ty, $operand:ident) => {
#[pymethods]
impl PySeries {
fn $name(&self, other: $type) -> PyResult<Self> {
Ok(other.$operand(&self.series).into())
fn $name(&self, py: Python, other: $type) -> PyResult<Self> {
Ok(py.allow_threads(|| other.$operand(&self.series)).into())
}
}
};
Expand Down
9 changes: 5 additions & 4 deletions crates/polars-python/src/series/buffers.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -82,17 +82,17 @@ impl PySeries {
}

/// Return the underlying values, validity, and offsets buffers as Series.
fn _get_buffers(&self) -> PyResult<(Self, Option<Self>, Option<Self>)> {
fn _get_buffers(&self, py: Python) -> PyResult<(Self, Option<Self>, Option<Self>)> {
let s = &self.series;
match s.dtype().to_physical() {
py.allow_threads(|| match s.dtype().to_physical() {
dt if dt.is_numeric() => get_buffers_from_primitive(s),
DataType::Boolean => get_buffers_from_primitive(s),
DataType::String => get_buffers_from_string(s),
dt => {
let msg = format!("`_get_buffers` not implemented for `dtype` {dt}");
Err(PyTypeError::new_err(msg))
},
}
})
}
}

Expand Down Expand Up @@ -253,6 +253,7 @@ impl PySeries {
#[staticmethod]
#[pyo3(signature = (dtype, data, validity=None))]
unsafe fn _from_buffers(
py: Python,
dtype: Wrap<DataType>,
data: Vec<PySeries>,
validity: Option<PySeries>,
Expand Down Expand Up @@ -320,7 +321,7 @@ impl PySeries {
)),
};
let values = series_to_buffer::<UInt8Type>(values);
from_buffers_string_impl(values, validity, offsets)?
py.allow_threads(|| from_buffers_string_impl(values, validity, offsets))?
},
dt => {
let msg = format!("`_from_buffers` not implemented for `dtype` {dt}");
Expand Down
Loading

0 comments on commit 18786ac

Please sign in to comment.