-
Notifications
You must be signed in to change notification settings - Fork 826
feat: add coroutine::await_in_coroutine
to await awaitables in coroutine context
#3611
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
e0b6cbf
11d1ec2
a5c2e0f
1b3fa98
28e4875
07c5406
f9084e5
c3881d8
f0203e7
5c8346e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
# Awaiting Python awaitables | ||
|
||
Python awaitables can be awaited on the Rust side | ||
using [`await_in_coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/fn.await_in_coroutine.html). | ||
|
||
```rust | ||
# # ![allow(dead_code)] | ||
# #[cfg(feature = "experimental-async")] { | ||
use pyo3::{prelude::*, coroutine::await_in_coroutine}; | ||
|
||
#[pyfunction] | ||
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> { | ||
Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?.await | ||
} | ||
# } | ||
``` | ||
|
||
Behind the scenes, `await_in_coroutine` calls the `__await__` method of the Python awaitable (or `__iter__` for | ||
generator-based coroutines). | ||
|
||
## Restrictions | ||
|
||
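As the name suggests, the future returned by `await_in_coroutine` can only be awaited in a coroutine context, that is, | ||
within the async call stack of an `async` `#[pyfunction]` or `#[pymethods]` method. Otherwise, it panics. | ||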
|
||
```rust | ||
# # ![allow(dead_code)] | ||
# #[cfg(feature = "experimental-async")] { | ||
use pyo3::{prelude::*, coroutine::await_in_coroutine}; | ||
|
||
#[pyfunction] | ||
fn block_on(awaitable: PyObject) -> PyResult<PyObject> { | ||
let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?; | ||
futures::executor::block_on(future) // ERROR: Python awaitable must be awaited in coroutine context | ||
} | ||
# } | ||
``` | ||
|
||
The future must also be the only one being awaited at a time, which means that awaiting it in a `select!` is forbidden. | ||
Otherwise, it panics. | ||
|
||
```rust | ||
# # ![allow(dead_code)] | ||
# #[cfg(feature = "experimental-async")] { | ||
use futures::FutureExt; | ||
use pyo3::{prelude::*, coroutine::await_in_coroutine}; | ||
|
||
#[pyfunction] | ||
async fn select(awaitable: PyObject) -> PyResult<PyObject> { | ||
let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?; | ||
futures::select_biased! { | ||
_ = std::future::pending::<()>().fuse() => unreachable!(), | ||
res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future | ||
} | ||
} | ||
# } | ||
``` | ||
|
||
These restrictions exist because awaiting an `await_in_coroutine` future strongly binds it to the | ||
enclosing coroutine. The coroutine will then delegate its `send`/`throw`/`close` methods to the | ||
awaited future. If it were awaited in a `select!`, `Coroutine::send` would not be able to know whether | ||
the value passed should be delegated or not. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Add `coroutine::await_in_coroutine` to await awaitables in coroutine context |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,8 +1,9 @@ | ||
use crate::object::*; | ||
use crate::pyport::Py_ssize_t; | ||
use std::os::raw::{c_char, c_int}; | ||
|
||
#[cfg(any(Py_3_12, all(Py_3_8, not(Py_LIMITED_API))))] | ||
use libc::size_t; | ||
use std::os::raw::{c_char, c_int}; | ||
|
||
use crate::{object::*, pyport::Py_ssize_t}; | ||
|
||
#[inline] | ||
#[cfg(all(not(Py_3_13), not(PyPy)))] // CPython exposed as a function in 3.13, in object.h | ||
|
@@ -143,7 +144,11 @@ extern "C" { | |
pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject; | ||
#[cfg(all(not(PyPy), Py_3_10))] | ||
#[cfg_attr(PyPy, link_name = "PyPyIter_Send")] | ||
pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject); | ||
pub fn PyIter_Send( | ||
iter: *mut PyObject, | ||
arg: *mut PyObject, | ||
presult: *mut *mut PyObject, | ||
) -> c_int; | ||
Comment on lines
+147
to
+151
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this was fixed in #4746, will need to rebase. |
||
|
||
#[cfg_attr(PyPy, link_name = "PyPyNumber_Check")] | ||
pub fn PyNumber_Check(o: *mut PyObject) -> c_int; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,28 +11,37 @@ use std::{ | |
use pyo3_macros::{pyclass, pymethods}; | ||
|
||
use crate::{ | ||
coroutine::{cancel::ThrowCallback, waker::AsyncioWaker}, | ||
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration}, | ||
coroutine::waker::CoroutineWaker, | ||
exceptions::{PyAttributeError, PyGeneratorExit, PyRuntimeError, PyStopIteration}, | ||
panic::PanicException, | ||
types::{string::PyStringMethods, PyIterator, PyString}, | ||
Bound, IntoPyObject, IntoPyObjectExt, Py, PyAny, PyErr, PyObject, PyResult, Python, | ||
types::{string::PyStringMethods, PyString}, | ||
Bound, IntoPyObject, IntoPyObjectExt, Py, PyErr, PyObject, PyResult, Python, | ||
}; | ||
|
||
pub(crate) mod cancel; | ||
mod asyncio; | ||
mod awaitable; | ||
mod cancel; | ||
mod waker; | ||
|
||
pub use cancel::CancelHandle; | ||
pub use awaitable::await_in_coroutine; | ||
pub use cancel::{CancelHandle, ThrowCallback}; | ||
|
||
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine"; | ||
|
||
/// Operation requested on a `Coroutine`, passed to `Coroutine::poll`. | ||
pub(crate) enum CoroOp { | ||
/// Value given to `send` (`__next__` sends `None`). | ||
Send(PyObject), | ||
/// Exception object given to `throw`. | ||
Throw(PyObject), | ||
/// Request issued by `close`. | ||
Close, | ||
} | ||
|
||
/// Python coroutine wrapping a [`Future`]. | ||
#[pyclass(crate = "crate")] | ||
pub struct Coroutine { | ||
name: Option<Py<PyString>>, | ||
qualname_prefix: Option<&'static str>, | ||
throw_callback: Option<ThrowCallback>, | ||
future: Option<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send>>>, | ||
waker: Option<Arc<AsyncioWaker>>, | ||
waker: Option<Arc<CoroutineWaker>>, | ||
} | ||
|
||
// Safety: `Coroutine` is allowed to be `Sync` even though the future is not, | ||
|
@@ -71,55 +80,58 @@ impl Coroutine { | |
} | ||
} | ||
|
||
fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> { | ||
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> { | ||
// raise if the coroutine has already been run to completion | ||
let future_rs = match self.future { | ||
Some(ref mut fut) => fut, | ||
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)), | ||
}; | ||
// reraise thrown exception it | ||
match (throw, &self.throw_callback) { | ||
(Some(exc), Some(cb)) => cb.throw(exc), | ||
(Some(exc), None) => { | ||
self.close(); | ||
return Err(PyErr::from_value(exc.into_bound(py))); | ||
} | ||
(None, _) => {} | ||
// if the future is not pending on a Python awaitable, | ||
// execute throw callback or complete on close | ||
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) { | ||
match op { | ||
send @ CoroOp::Send(_) => op = send, | ||
CoroOp::Throw(exc) => match &self.throw_callback { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wonder, can we simplify the logic here by modelling the throw callback as another form of delegation? What if Or maybe There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure to follow you, I don't understand what |
||
Some(cb) => { | ||
cb.throw(exc.clone_ref(py)); | ||
op = CoroOp::Send(py.None()); | ||
} | ||
None => return Err(PyErr::from_value(exc.into_bound(py))), | ||
}, | ||
CoroOp::Close => return Err(PyGeneratorExit::new_err(py.None())), | ||
}; | ||
} | ||
// create a new waker, or try to reset it in place | ||
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) { | ||
waker.reset(); | ||
*waker = CoroutineWaker::new(op); | ||
} else { | ||
self.waker = Some(Arc::new(AsyncioWaker::new())); | ||
self.waker = Some(Arc::new(CoroutineWaker::new(op))); | ||
} | ||
let waker = Waker::from(self.waker.clone().unwrap()); | ||
// poll the Rust future and forward its results if ready | ||
// poll the future and forward its results if ready; otherwise, yield from waker | ||
// polling is UnwindSafe because the future is dropped in case of panic | ||
let waker = Waker::from(self.waker.clone().unwrap()); | ||
let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker)); | ||
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) { | ||
Ok(Poll::Ready(res)) => { | ||
self.close(); | ||
return Err(PyStopIteration::new_err((res?,))); | ||
} | ||
Err(err) => { | ||
self.close(); | ||
return Err(PanicException::from_panic_payload(err)); | ||
} | ||
_ => {} | ||
Err(err) => Err(PanicException::from_panic_payload(err)), | ||
// See #4407: the `PyStopIteration::new_err` argument must be wrapped in a tuple; | ||
// otherwise, when a tuple is returned, its fields would be expanded as error | ||
// arguments | ||
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err((res?,))), | ||
Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) { | ||
Ok(to_yield) => Ok(to_yield), | ||
Err(err) => Err(err), | ||
}, | ||
} | ||
// otherwise, initialize the waker `asyncio.Future` | ||
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? { | ||
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__` | ||
// and will yield itself if its result has not been set in polling above | ||
if let Some(future) = PyIterator::from_object(future).unwrap().next() { | ||
// future has not been leaked into Python for now, and Rust code can only call | ||
// `set_result(None)` in `Wake` implementation, so it's safe to unwrap | ||
return Ok(future.unwrap().into()); | ||
} | ||
} | ||
|
||
fn poll(&mut self, py: Python<'_>, op: CoroOp) -> PyResult<PyObject> { | ||
let result = self.poll_inner(py, op); | ||
if result.is_err() { | ||
// the Rust future is dropped, and the field set to `None` | ||
// to indicate the coroutine has been run to completion | ||
drop(self.future.take()); | ||
} | ||
// if waker has been waken during future polling, this is roughly equivalent to | ||
// `await asyncio.sleep(0)`, so just yield `None`. | ||
Ok(py.None()) | ||
result | ||
} | ||
} | ||
|
||
|
@@ -145,25 +157,27 @@ impl Coroutine { | |
} | ||
} | ||
|
||
fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> { | ||
self.poll(py, None) | ||
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> { | ||
self.poll(py, CoroOp::Send(value)) | ||
} | ||
|
||
fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> { | ||
self.poll(py, Some(exc)) | ||
self.poll(py, CoroOp::Throw(exc)) | ||
} | ||
|
||
fn close(&mut self) { | ||
// the Rust future is dropped, and the field set to `None` | ||
// to indicate the coroutine has been run to completion | ||
drop(self.future.take()); | ||
fn close(&mut self, py: Python<'_>) -> PyResult<()> { | ||
match self.poll(py, CoroOp::Close) { | ||
Ok(_) => Ok(()), | ||
Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because that's how coroutine are supposed to works: https://docs.python.org/3/reference/datamodel.html#coroutine.close, so I mimic the behavior. async def example():
try:
import asyncio
await asyncio.Future()
except GeneratorExit:
print("close")
raise
coro = example()
coro.send(None)
coro.close()
# print "close" but don't reraise the exception There is no |
||
Err(err) => Err(err), | ||
} | ||
} | ||
|
||
/// Returns the coroutine object itself as the result of `__await__`. | ||
fn __await__(self_: Py<Self>) -> Py<Self> { | ||
self_ | ||
} | ||
|
||
fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> { | ||
self.poll(py, None) | ||
self.poll(py, CoroOp::Send(py.None())) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
//! Coroutine implementation compatible with asyncio. | ||
use pyo3_macros::pyfunction; | ||
|
||
use crate::{ | ||
intern, | ||
sync::GILOnceCell, | ||
types::{PyAnyMethods, PyCFunction, PyIterator}, | ||
wrap_pyfunction, Bound, IntoPyObjectExt, Py, PyAny, PyObject, PyResult, Python, | ||
}; | ||
|
||
/// `asyncio.get_running_loop` | ||
fn get_running_loop(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> { | ||
static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new(); | ||
let import = || -> PyResult<_> { | ||
let module = py.import("asyncio")?; | ||
Ok(module.getattr("get_running_loop")?.into()) | ||
}; | ||
GET_RUNNING_LOOP | ||
.get_or_try_init(py, import)? | ||
.bind(py) | ||
.call0() | ||
Comment on lines
+12
to
+21
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same comment that we can use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A very welcomed QOL improvement indeed! |
||
} | ||
|
||
/// Asyncio-compatible coroutine waker. | ||
/// | ||
/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called | ||
/// when `Waker::wake` is called. | ||
pub(super) struct AsyncioWaker { | ||
/// Event loop returned by `asyncio.get_running_loop()` when the waker is built; `wake` | ||
/// schedules the release of `future` on it through `call_soon_threadsafe`. | ||
event_loop: PyObject, | ||
/// Future obtained from the loop's `create_future` method; `yield_` iterates it and | ||
/// `wake` hands it to `release_waiter`. | ||
future: PyObject, | ||
} | ||
|
||
impl AsyncioWaker { | ||
/// Builds a waker tied to the currently running asyncio event loop. | ||
/// | ||
/// Fetches the loop through `get_running_loop` and calls its `create_future` method to | ||
/// obtain the future stored alongside it. Any Python error raised by either call is | ||
/// returned to the caller. | ||
pub(super) fn new(py: Python<'_>) -> PyResult<Self> { | ||
let event_loop = get_running_loop(py)?.into_py_any(py)?; | ||
let future = event_loop.call_method0(py, intern!(py, "create_future"))?; | ||
Ok(Self { event_loop, future }) | ||
} | ||
|
||
pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> { | ||
let __await__; | ||
// `asyncio.Future` must be awaited; in the normal case, it implements `__iter__ = __await__`, | ||
// but `create_future` may have been overridden | ||
let mut iter = match PyIterator::from_object(self.future.bind(py)) { | ||
Comment on lines
+42
to
+44
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sounds like this might go wrong if the overridden future defines There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Indeed, it might go wrong, even if it would be very surprising to do such thing. I only know two alternative event loops:
However, I didn't think about using |
||
Ok(iter) => iter, | ||
Err(_) => { | ||
__await__ = self.future.call_method0(py, intern!(py, "__await__"))?; | ||
PyIterator::from_object(__await__.bind(py))? | ||
} | ||
}; | ||
// future has not been wakened (because `yield_waken` would have been called | ||
// otherwise), so it is expected to yield itself | ||
iter.next().expect("future didn't yield")?.into_py_any(py) | ||
} | ||
|
||
/// Returns `None` as the value to yield. | ||
/// | ||
/// NOTE(review): judging by the comment in `yield_`, this is the path taken when the waker | ||
/// was already woken, so no future has to be awaited; confirm against the caller. | ||
// The `PyResult` wrapper is not needed yet (hence the clippy allow); per the PR discussion | ||
// it is a trace of follow-up work that builds on this signature. | ||
#[allow(clippy::unnecessary_wraps)] | ||
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> { | ||
Ok(py.None()) | ||
} | ||
Comment on lines
+56
to
+59
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe drop the Result wrapping and move that to the caller? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, if you remember, this work is a part of a bigger one that I have tried to split into smaller parts, which explains it has some trace of the parts coming after (in the arbitrary order chosen). |
||
|
||
/// Wakes the coroutine by scheduling `release_waiter(future)` on the stored event loop. | ||
/// | ||
/// If `call_soon_threadsafe` fails, the error is returned unless the event loop reports | ||
/// that it is closed, in which case the failure is ignored. Errors from wrapping | ||
/// `release_waiter`, from calling `is_closed`, or from extracting its result are propagated. | ||
pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> { | ||
// the Python wrapper around `release_waiter` is created on first use and kept in a static cell | ||
static RELEASE_WAITER: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new(); | ||
let release_waiter = RELEASE_WAITER | ||
.get_or_try_init(py, || wrap_pyfunction!(release_waiter, py).map(Into::into))?; | ||
// `Future.set_result` must be called in event loop thread, | ||
// so it requires `call_soon_threadsafe` | ||
let call_soon_threadsafe = self.event_loop.call_method1( | ||
py, | ||
intern!(py, "call_soon_threadsafe"), | ||
(release_waiter, &self.future), | ||
); | ||
if let Err(err) = call_soon_threadsafe { | ||
// `call_soon_threadsafe` will raise if the event loop is closed; | ||
// instead of catching an unspecific `RuntimeError`, check directly if it's closed. | ||
let is_closed = self.event_loop.call_method0(py, "is_closed")?; | ||
if !is_closed.extract(py)? { | ||
return Err(err); | ||
} | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
/// Call `future.set_result` if the future is not done. | ||
/// | ||
/// Future can be cancelled by the event loop before being wakened. | ||
/// See <https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L452C5-L452C5> | ||
/// | ||
/// Errors raised by `done`, by extracting its result as `bool`, or by `set_result` | ||
/// are propagated to the caller. | ||
#[pyfunction(crate = "crate")] | ||
fn release_waiter(future: &Bound<'_, PyAny>) -> PyResult<()> { | ||
let done = future.call_method0(intern!(future.py(), "done"))?; | ||
// only a future that is not done yet gets a result; the result value itself is just `None` | ||
if !done.extract::<bool>()? { | ||
future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?; | ||
} | ||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By "coroutine context", I understand that means "within an async call stack underneath a
#[pyfunction]
or#[pymethods]
"?Maybe we can adjust wording to something like that? It took me a few reads to process this, and it's maybe not obvious to all users that
async
Rust functions become coroutines.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're right, I will try to reword it. But it will maybe more easily understandable when the
Coroutine
type will be stabilized.