-
Notifications
You must be signed in to change notification settings - Fork 833
feat: support async fn
in macros with coroutine implementation
#3540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
# Using `async` and `await` | ||
|
||
*This feature is still in active development. See [the related issue](https://github.com/PyO3/pyo3/issues/1632).* | ||
|
||
`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`. | ||
|
||
```rust | ||
# #![allow(dead_code)] | ||
use std::{thread, time::Duration}; | ||
use futures::channel::oneshot; | ||
use pyo3::prelude::*; | ||
|
||
#[pyfunction] | ||
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> { | ||
let (tx, rx) = oneshot::channel(); | ||
thread::spawn(move || { | ||
thread::sleep(Duration::from_secs_f64(seconds)); | ||
tx.send(()).unwrap(); | ||
}); | ||
rx.await.unwrap(); | ||
result | ||
} | ||
``` | ||
|
||
*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.* | ||
|
||
## `Send + 'static` constraint | ||
|
||
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object. | ||
|
||
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile(arg: &PyAny, py: Python<'_>) -> &PyAny`. | ||
|
||
It also means that methods cannot use `&self`/`&mut self`, *but this restriction should be dropped in the future.* | ||
|
||
|
||
## Implicit GIL holding | ||
|
||
Even if it is not possible to pass a `py: Python<'_>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'_>`/`&PyAny` parameter, yet the GIL is held. | ||
|
||
It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible. | ||
|
||
## Release the GIL across `.await` | ||
|
||
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*. | ||
|
||
Here is the advised workaround for now: | ||
|
||
```rust,ignore | ||
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}}; | ||
use pyo3::prelude::*; | ||
|
||
struct AllowThreads<F>(F); | ||
|
||
impl<F> Future for AllowThreads<F> | ||
where | ||
F: Future + Unpin + Send, | ||
F::Output: Send, | ||
{ | ||
type Output = F::Output; | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
let waker = cx.waker(); | ||
Python::with_gil(|gil| { | ||
gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker))) | ||
}) | ||
} | ||
} | ||
``` | ||
|
||
## Cancellation | ||
|
||
*To be implemented* | ||
|
||
## The `Coroutine` type | ||
|
||
To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). Each `coroutine.send` call is translated to `Future::poll` call, while `coroutine.throw` call reraise the exception *(this behavior will be configurable with cancellation support)*. | ||
|
||
*The type does not yet have a public constructor until the design is finalized.* |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Support `async fn` in macros with coroutine implementation |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
//! Python coroutine implementation, used notably when wrapping `async fn` | ||
//! with `#[pyfunction]`/`#[pymethods]`. | ||
use std::{ | ||
any::Any, | ||
future::Future, | ||
panic, | ||
pin::Pin, | ||
sync::Arc, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
use futures_util::FutureExt; | ||
use pyo3_macros::{pyclass, pymethods}; | ||
|
||
use crate::{ | ||
coroutine::waker::AsyncioWaker, | ||
exceptions::{PyRuntimeError, PyStopIteration}, | ||
panic::PanicException, | ||
pyclass::IterNextOutput, | ||
types::PyIterator, | ||
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python, | ||
}; | ||
|
||
mod waker; | ||
|
||
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine"; | ||
|
||
type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>; | ||
|
||
/// Python coroutine wrapping a [`Future`]. | ||
#[pyclass(crate = "crate")] | ||
pub struct Coroutine { | ||
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>, | ||
waker: Option<Arc<AsyncioWaker>>, | ||
} | ||
|
||
impl Coroutine { | ||
/// Wrap a future into a Python coroutine. | ||
/// | ||
/// Coroutine `send` polls the wrapped future, ignoring the value passed | ||
/// (should always be `None` anyway). | ||
davidhewitt marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// | ||
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed | ||
pub(crate) fn from_future<F, T, E>(future: F) -> Self | ||
where | ||
F: Future<Output = Result<T, E>> + Send + 'static, | ||
T: IntoPy<PyObject>, | ||
PyErr: From<E>, | ||
{ | ||
let wrap = async move { | ||
let obj = future.await?; | ||
// SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`) | ||
Ok(obj.into_py(unsafe { Python::assume_gil_acquired() })) | ||
}; | ||
Self { | ||
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())), | ||
waker: None, | ||
} | ||
} | ||
|
||
fn poll( | ||
&mut self, | ||
py: Python<'_>, | ||
throw: Option<PyObject>, | ||
) -> PyResult<IterNextOutput<PyObject, PyObject>> { | ||
// raise if the coroutine has already been run to completion | ||
let future_rs = match self.future { | ||
Some(ref mut fut) => fut, | ||
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)), | ||
}; | ||
// reraise thrown exception it | ||
if let Some(exc) = throw { | ||
self.close(); | ||
return Err(PyErr::from_value(exc.as_ref(py))); | ||
} | ||
// create a new waker, or try to reset it in place | ||
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) { | ||
waker.reset(); | ||
} else { | ||
self.waker = Some(Arc::new(AsyncioWaker::new())); | ||
} | ||
let waker = futures_util::task::waker(self.waker.clone().unwrap()); | ||
// poll the Rust future and forward its results if ready | ||
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) { | ||
self.close(); | ||
return match res { | ||
Ok(res) => Ok(IterNextOutput::Return(res?)), | ||
Err(err) => Err(PanicException::from_panic_payload(err)), | ||
}; | ||
} | ||
// otherwise, initialize the waker `asyncio.Future` | ||
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? { | ||
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__` | ||
// and will yield itself if its result has not been set in polling above | ||
Comment on lines
+93
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can't help but wonder if somehow deep down in the Rust future, if it awaits a Python awaitable, whether we want to yield whatever the Python awaitable that blocked it here, rather than make a new However, it needs some way to lift whatever that object was up to here. Maybe that's done through the waker, or maybe that's done through our own mini thread-local "runtime"? Of course, there's also the possibility that this Rust future never awaits on anything from Python, and if that's the case, doing this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, I've an idea to make this possible, and in a quite elegant way, but it would require a nightly feature.
This would indeed be a way to support arbitrary awaitables. You would still need to target a particular runtime for arbitrary Rust futures (but you could also have a runtime-agnostic coroutine which would only support Python awaitables). However, it would not be possible to do a An additional side effect would be cancellation, because if you delegate to the Python awaitable, you should also delegate cancellation, and I really like this idea though! I may implement a POC in the next days, maybe in pyo3-async to begin with. EDIT: Actually, you can't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be interesting to explore, and if we find that this design makes the most sense but requires nightly, we can put it behind our |
||
if let Some(future) = PyIterator::from_object(future).unwrap().next() { | ||
// future has not been leaked into Python for now, and Rust code can only call | ||
// `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap | ||
return Ok(IterNextOutput::Yield(future.unwrap().into())); | ||
} | ||
} | ||
// if waker has been waken during future polling, this is roughly equivalent to | ||
// `await asyncio.sleep(0)`, so just yield `None`. | ||
Ok(IterNextOutput::Yield(py.None().into())) | ||
} | ||
} | ||
|
||
pub(crate) fn iter_result(result: IterNextOutput<PyObject, PyObject>) -> PyResult<PyObject> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This will make our handling of iterators even more of a mess, won't it? So this is basically what we want to do with all iterators but it would apply only to coroutines? Maybe we should align all of this if we go ahead? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (Sorry if this not clear, but this is mainly directed at @davidhewitt, not so much at @wyfo.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I honestly think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
match result { | ||
IterNextOutput::Yield(ob) => Ok(ob), | ||
IterNextOutput::Return(ob) => Err(PyStopIteration::new_err(ob)), | ||
} | ||
} | ||
|
||
#[pymethods(crate = "crate")] | ||
impl Coroutine { | ||
fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult<PyObject> { | ||
iter_result(self.poll(py, None)?) | ||
wyfo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> { | ||
iter_result(self.poll(py, Some(exc))?) | ||
} | ||
|
||
fn close(&mut self) { | ||
// the Rust future is dropped, and the field set to `None` | ||
// to indicate the coroutine has been run to completion | ||
drop(self.future.take()); | ||
} | ||
|
||
fn __await__(self_: Py<Self>) -> Py<Self> { | ||
self_ | ||
} | ||
|
||
fn __next__(&mut self, py: Python<'_>) -> PyResult<IterNextOutput<PyObject, PyObject>> { | ||
self.poll(py, None) | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.