
feat: add coroutine::await_in_coroutine to await awaitables in coroutine context #3611

Open
wants to merge 10 commits into
base: main
4 changes: 2 additions & 2 deletions Cargo.toml
@@ -41,7 +41,7 @@ hashbrown = { version = ">= 0.14.5, < 0.16", optional = true }
indexmap = { version = ">= 2.5.0, < 3", optional = true }
num-bigint = { version = "0.4.2", optional = true }
num-complex = { version = ">= 0.4.6, < 0.5", optional = true }
num-rational = {version = "0.4.1", optional = true }
num-rational = { version = "0.4.1", optional = true }
rust_decimal = { version = "1.15", default-features = false, optional = true }
serde = { version = "1.0", optional = true }
smallvec = { version = "1.0", optional = true }
@@ -63,7 +63,7 @@ rayon = "1.6.1"
futures = "0.3.28"
tempfile = "3.12.0"
static_assertions = "1.1.0"
uuid = {version = "1.10.0", features = ["v4"] }
uuid = { version = "1.10.0", features = ["v4"] }

[build-dependencies]
pyo3-build-config = { path = "pyo3-build-config", version = "=0.23.3", features = ["resolve-config"] }
1 change: 1 addition & 0 deletions guide/src/SUMMARY.md
@@ -25,6 +25,7 @@
- [Mapping of Rust types to Python types](conversions/tables.md)
- [Conversion traits](conversions/traits.md)
- [Using `async` and `await`](async-await.md)
- [Awaiting Python awaitables](async-await/awaiting_python_awaitables.md)
- [Parallelism](parallelism.md)
- [Supporting Free-Threaded Python](free-threading.md)
- [Debugging](debugging.md)
62 changes: 62 additions & 0 deletions guide/src/async-await/awaiting_python_awaitables.md
@@ -0,0 +1,62 @@
# Awaiting Python awaitables

Python awaitables can be awaited on the Rust side
using [`await_in_coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/fn.await_in_coroutine.html).

```rust
# #![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
async fn wrap_awaitable(awaitable: PyObject) -> PyResult<PyObject> {
    Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?.await
}
# }
```

Behind the scenes, `await_in_coroutine` calls the `__await__` method of the Python awaitable (or `__iter__` for
generator-based coroutines).
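
As a rough illustration in plain Python (not PyO3 code), the two entry points mentioned above behave as follows. The `Ready` class, the `legacy` function and the `drive` helper are hypothetical names introduced only for this sketch; it shows the protocol being relied on, not how `await_in_coroutine` is implemented.

```python
import types


class Ready:
    """Hypothetical awaitable that completes immediately with a value."""

    def __init__(self, value):
        self.value = value

    def __await__(self):
        # `__await__` must return an iterator; a generator that never
        # suspends finishes at once and carries the result in StopIteration.
        return self.value
        yield  # unreachable, but makes this function a generator


@types.coroutine
def legacy(value):
    """Generator-based coroutine: it has no `__await__`, only `__iter__`."""
    yield  # suspend once
    return value


def drive(iterator):
    """Step an iterator until it finishes and return its StopIteration value."""
    try:
        while True:
            next(iterator)
    except StopIteration as stop:
        return stop.value


result_native = drive(Ready(1).__await__())
result_legacy = drive(iter(legacy(2)))
```

In both cases the caller ends up with an iterator that it steps until `StopIteration` delivers the result.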

## Restrictions

As the name suggests, the future returned by `await_in_coroutine` can only be awaited in a coroutine context.
Otherwise, it panics.

Member

By "coroutine context", I understand that to mean "within an async call stack underneath a #[pyfunction] or #[pymethods]"?

Maybe we can adjust the wording to something like that? It took me a few reads to process this, and it's maybe not obvious to all users that async Rust functions become coroutines.

Contributor Author

You're right, I will try to reword it. It may also be easier to understand once the Coroutine type is stabilized.

```rust
# #![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
fn block_on(awaitable: PyObject) -> PyResult<PyObject> {
    let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?;
    futures::executor::block_on(future) // ERROR: Python awaitable must be awaited in coroutine context
}
# }
```

The future must also be the only one awaited at a time, which means it cannot be awaited in a `select!`.
Otherwise, it panics.

```rust
# #![allow(dead_code)]
# #[cfg(feature = "experimental-async")] {
use futures::FutureExt;
use pyo3::{prelude::*, coroutine::await_in_coroutine};

#[pyfunction]
async fn select(awaitable: PyObject) -> PyResult<PyObject> {
    let future = Python::with_gil(|gil| await_in_coroutine(awaitable.bind(gil)))?;
    futures::select_biased! {
        _ = std::future::pending::<()>().fuse() => unreachable!(),
        res = future.fuse() => res, // ERROR: Python awaitable mixed with Rust future
    }
}
# }
```

These restrictions exist because awaiting an `await_in_coroutine` future strongly binds it to the
enclosing coroutine. The coroutine then delegates its `send`/`throw`/`close` methods to the
awaited future. If the future were awaited in a `select!`, `Coroutine::send` would not be able to know
whether the value passed should be delegated or not.
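
The kind of delegation described here can be pictured with a plain-Python analogy, where `yield from` forwards `send`/`throw`/`close` from an outer generator to an inner one. This is only a sketch of the idea, not the PyO3 implementation; `inner`, `outer` and `log` are hypothetical names.

```python
log = []


def inner():
    """Stands in for the iterator of the awaited Python awaitable."""
    try:
        while True:
            try:
                received = yield "inner-yield"
                log.append(("send", received))
            except ValueError as exc:
                log.append(("throw", str(exc)))
    finally:
        # Runs when GeneratorExit is raised at the `yield` by `close()`.
        log.append(("close", None))


def outer():
    """Stands in for the enclosing coroutine: every operation is delegated."""
    yield from inner()


coro = outer()
first = coro.send(None)          # start: runs until inner's first yield
coro.send("hello")               # forwarded to inner as the value of `yield`
coro.throw(ValueError("boom"))   # forwarded to inner, which handles it
coro.close()                     # forwarded as GeneratorExit, runs inner's cleanup
```

While the delegation is active there is exactly one receiver for each operation, which is the property that awaiting several futures at once would break.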
1 change: 1 addition & 0 deletions newsfragments/3611.added.md
@@ -0,0 +1 @@
Add `coroutine::await_in_coroutine` to await awaitables in coroutine context
13 changes: 9 additions & 4 deletions pyo3-ffi/src/abstract_.rs
@@ -1,8 +1,9 @@
use crate::object::*;
use crate::pyport::Py_ssize_t;
use std::os::raw::{c_char, c_int};

#[cfg(any(Py_3_12, all(Py_3_8, not(Py_LIMITED_API))))]
use libc::size_t;
use std::os::raw::{c_char, c_int};

use crate::{object::*, pyport::Py_ssize_t};

#[inline]
#[cfg(all(not(Py_3_13), not(PyPy)))] // CPython exposed as a function in 3.13, in object.h
@@ -143,7 +144,11 @@ extern "C" {
pub fn PyIter_Next(arg1: *mut PyObject) -> *mut PyObject;
#[cfg(all(not(PyPy), Py_3_10))]
#[cfg_attr(PyPy, link_name = "PyPyIter_Send")]
pub fn PyIter_Send(iter: *mut PyObject, arg: *mut PyObject, presult: *mut *mut PyObject);
pub fn PyIter_Send(
iter: *mut PyObject,
arg: *mut PyObject,
presult: *mut *mut PyObject,
) -> c_int;
Comment on lines +147 to +151
Member

I think this was fixed in #4746; this will need a rebase.


#[cfg_attr(PyPy, link_name = "PyPyNumber_Check")]
pub fn PyNumber_Check(o: *mut PyObject) -> c_int;
112 changes: 63 additions & 49 deletions src/coroutine.rs
@@ -11,28 +11,37 @@ use std::{
use pyo3_macros::{pyclass, pymethods};

use crate::{
coroutine::{cancel::ThrowCallback, waker::AsyncioWaker},
exceptions::{PyAttributeError, PyRuntimeError, PyStopIteration},
coroutine::waker::CoroutineWaker,
exceptions::{PyAttributeError, PyGeneratorExit, PyRuntimeError, PyStopIteration},
panic::PanicException,
types::{string::PyStringMethods, PyIterator, PyString},
Bound, IntoPyObject, IntoPyObjectExt, Py, PyAny, PyErr, PyObject, PyResult, Python,
types::{string::PyStringMethods, PyString},
Bound, IntoPyObject, IntoPyObjectExt, Py, PyErr, PyObject, PyResult, Python,
};

pub(crate) mod cancel;
mod asyncio;
mod awaitable;
mod cancel;
mod waker;

pub use cancel::CancelHandle;
pub use awaitable::await_in_coroutine;
pub use cancel::{CancelHandle, ThrowCallback};

const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";

pub(crate) enum CoroOp {
Send(PyObject),
Throw(PyObject),
Close,
}

/// Python coroutine wrapping a [`Future`].
#[pyclass(crate = "crate")]
pub struct Coroutine {
name: Option<Py<PyString>>,
qualname_prefix: Option<&'static str>,
throw_callback: Option<ThrowCallback>,
future: Option<Pin<Box<dyn Future<Output = PyResult<PyObject>> + Send>>>,
waker: Option<Arc<AsyncioWaker>>,
waker: Option<Arc<CoroutineWaker>>,
}

// Safety: `Coroutine` is allowed to be `Sync` even though the future is not,
@@ -71,55 +80,58 @@ impl Coroutine {
}
}

fn poll(&mut self, py: Python<'_>, throw: Option<PyObject>) -> PyResult<PyObject> {
fn poll_inner(&mut self, py: Python<'_>, mut op: CoroOp) -> PyResult<PyObject> {
// raise if the coroutine has already been run to completion
let future_rs = match self.future {
Some(ref mut fut) => fut,
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
};
// reraise the thrown exception
match (throw, &self.throw_callback) {
(Some(exc), Some(cb)) => cb.throw(exc),
(Some(exc), None) => {
self.close();
return Err(PyErr::from_value(exc.into_bound(py)));
}
(None, _) => {}
// if the future is not pending on a Python awaitable,
// execute throw callback or complete on close
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) {
match op {
send @ CoroOp::Send(_) => op = send,
CoroOp::Throw(exc) => match &self.throw_callback {
Member

I wonder, can we simplify the logic here by modelling the throw callback as another form of delegation? What if CancelHandle also had an .await_in_coroutine() method?

Or maybe CancelHandle can be generalized and become CoroutineContext, which has a .py_await() method, or something like that? This would also help with communicating the idea of "coroutine context" if it were a physical type.

Contributor Author

I'm not sure I follow; I don't understand what CancelHandle::await_in_coroutine would do.
I don't think the logic here is complex: if there is no delegation, we call the throw_callback when throw is called; otherwise, the delegated throw is called, as you would expect.
However, building on 1.83, I would in fact like to simply remove CancelHandle (more details in the review comment).

Some(cb) => {
cb.throw(exc.clone_ref(py));
op = CoroOp::Send(py.None());
}
None => return Err(PyErr::from_value(exc.into_bound(py))),
},
CoroOp::Close => return Err(PyGeneratorExit::new_err(py.None())),
};
}
// create a new waker, or try to reset it in place
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
waker.reset();
*waker = CoroutineWaker::new(op);
} else {
self.waker = Some(Arc::new(AsyncioWaker::new()));
self.waker = Some(Arc::new(CoroutineWaker::new(op)));
}
let waker = Waker::from(self.waker.clone().unwrap());
// poll the Rust future and forward its results if ready
// poll the future and forward its results if ready; otherwise, yield from waker
// polling is UnwindSafe because the future is dropped in case of panic
let waker = Waker::from(self.waker.clone().unwrap());
let poll = || future_rs.as_mut().poll(&mut Context::from_waker(&waker));
match panic::catch_unwind(panic::AssertUnwindSafe(poll)) {
Ok(Poll::Ready(res)) => {
self.close();
return Err(PyStopIteration::new_err((res?,)));
}
Err(err) => {
self.close();
return Err(PanicException::from_panic_payload(err));
}
_ => {}
Err(err) => Err(PanicException::from_panic_payload(err)),
// See #4407, `PyStopIteration::new_err` argument must be wrapped in a tuple,
// otherwise, when a tuple is returned, its fields would be expanded as error
// arguments
Ok(Poll::Ready(res)) => Err(PyStopIteration::new_err((res?,))),
Ok(Poll::Pending) => match self.waker.as_ref().unwrap().yield_(py) {
Ok(to_yield) => Ok(to_yield),
Err(err) => Err(err),
},
}
// otherwise, initialize the waker `asyncio.Future`
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
// and will yield itself if its result has not been set in polling above
if let Some(future) = PyIterator::from_object(future).unwrap().next() {
// future has not been leaked into Python for now, and Rust code can only call
// `set_result(None)` in `Wake` implementation, so it's safe to unwrap
return Ok(future.unwrap().into());
}
}

fn poll(&mut self, py: Python<'_>, op: CoroOp) -> PyResult<PyObject> {
let result = self.poll_inner(py, op);
if result.is_err() {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
}
// if waker has been woken during future polling, this is roughly equivalent to
// `await asyncio.sleep(0)`, so just yield `None`.
Ok(py.None())
result
}
}

@@ -145,25 +157,27 @@ impl Coroutine {
}
}

fn send(&mut self, py: Python<'_>, _value: &Bound<'_, PyAny>) -> PyResult<PyObject> {
self.poll(py, None)
fn send(&mut self, py: Python<'_>, value: PyObject) -> PyResult<PyObject> {
self.poll(py, CoroOp::Send(value))
}

fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
self.poll(py, Some(exc))
self.poll(py, CoroOp::Throw(exc))
}

fn close(&mut self) {
// the Rust future is dropped, and the field set to `None`
// to indicate the coroutine has been run to completion
drop(self.future.take());
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
match self.poll(py, CoroOp::Close) {
Ok(_) => Ok(()),
Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()),
Member

Why is PyGeneratorExit a special case here?

Contributor Author

Because that's how coroutines are supposed to work: https://docs.python.org/3/reference/datamodel.html#coroutine.close, so I mimic that behavior.
For example, you can execute this code:

async def example():
    try:
        import asyncio
        await asyncio.Future()
    except GeneratorExit:
        print("close")
        raise

coro = example()
coro.send(None)
coro.close()
# prints "close" but doesn't reraise the exception

There is no close_callback for now, but if there were, the error could be caught by the code and reraised. And with the changes I want to make, it might indeed become possible to catch it in the code, so why not reraise it.

Err(err) => Err(err),
}
}

fn __await__(self_: Py<Self>) -> Py<Self> {
self_
}

fn __next__(&mut self, py: Python<'_>) -> PyResult<PyObject> {
self.poll(py, None)
self.poll(py, CoroOp::Send(py.None()))
}
}
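
To make the tuple-wrapping comment on `PyStopIteration::new_err` above more concrete, here is a minimal plain-Python sketch (not PyO3 code; the `result` value is made up) of how `StopIteration` treats its arguments. It assumes, as that comment describes, that a tuple passed directly ends up spread over the exception's arguments.

```python
result = (1, 2)  # hypothetical coroutine result that happens to be a tuple

# Spreading the tuple over the argument list keeps only the first item
# as the value seen by the awaiting code.
spread = StopIteration(*result)

# Wrapping it in a one-element argument tuple keeps the result intact.
wrapped = StopIteration(*(result,))
```

Here `spread.value` is the first element only, while `wrapped.value` is the whole tuple, which is what a caller awaiting the coroutine expects to receive.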
95 changes: 95 additions & 0 deletions src/coroutine/asyncio.rs
@@ -0,0 +1,95 @@
//! Coroutine implementation compatible with asyncio.
use pyo3_macros::pyfunction;

use crate::{
intern,
sync::GILOnceCell,
types::{PyAnyMethods, PyCFunction, PyIterator},
wrap_pyfunction, Bound, IntoPyObjectExt, Py, PyAny, PyObject, PyResult, Python,
};

/// `asyncio.get_running_loop`
fn get_running_loop(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new();
let import = || -> PyResult<_> {
let module = py.import("asyncio")?;
Ok(module.getattr("get_running_loop")?.into())
};
GET_RUNNING_LOOP
.get_or_try_init(py, import)?
.bind(py)
.call0()
Comment on lines +12 to +21
Member

Same comment: we can use GILOnceCell::import to simplify here.

Contributor Author

A very welcome QOL improvement indeed!

}

/// Asyncio-compatible coroutine waker.
///
/// Polling a Rust future yields an `asyncio.Future`, whose `set_result` method is called
/// when `Waker::wake` is called.
pub(super) struct AsyncioWaker {
event_loop: PyObject,
future: PyObject,
}

impl AsyncioWaker {
pub(super) fn new(py: Python<'_>) -> PyResult<Self> {
let event_loop = get_running_loop(py)?.into_py_any(py)?;
let future = event_loop.call_method0(py, intern!(py, "create_future"))?;
Ok(Self { event_loop, future })
}

pub(super) fn yield_(&self, py: Python<'_>) -> PyResult<PyObject> {
let __await__;
// `asyncio.Future` must be awaited; in the normal case, it implements `__iter__ = __await__`,
// but `create_future` may have been overridden
let mut iter = match PyIterator::from_object(self.future.bind(py)) {
Comment on lines +42 to +44
Member

Sounds like this might go wrong if the overridden future defines __next__ and also has important logic in its __await__ method. Should we just always do the call to __await__ (maybe via a slot call again to avoid Python dispatch)?

Contributor Author

Indeed, it might go wrong, even if it would be very surprising to do such a thing. I only know two alternative event loops:

  • uvloop, by far the most used after asyncio I believe, reuses asyncio.Future;
  • leviathan defines __await__ as __iter__.

Also, alternative implementations would try to be compatible with asyncio.Future, which defines __iter__ = __await__. That's why I think it is a reasonable assumption to rely on the iterator protocol for an object returned by create_future. And I obviously chose the iterator protocol over Python dispatch for performance reasons.

However, I didn't think about using the am_await slot (I don't know how to do that for now, will dig), so I should maybe benchmark both approaches to decide which one to keep.

Ok(iter) => iter,
Err(_) => {
__await__ = self.future.call_method0(py, intern!(py, "__await__"))?;
PyIterator::from_object(__await__.bind(py))?
}
};
// future has not been wakened (because `yield_waken` would have been called
// otherwise), so it is expected to yield itself
iter.next().expect("future didn't yield")?.into_py_any(py)
}
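
As a small illustration of the iterator-protocol assumption discussed in the thread above, the following plain-Python sketch (standard `asyncio` only, not PyO3 code) checks that iterating a pending future created by `create_future` yields the future itself, and that the result is delivered through `StopIteration` once it is set. The variable names are illustrative.

```python
import asyncio

loop = asyncio.new_event_loop()
try:
    future = loop.create_future()
    # Use the iterator protocol directly, as `yield_` does in the common case.
    via_iter = iter(future)
    yielded = next(via_iter)  # a pending future yields itself
    future.set_result("done")
    try:
        next(via_iter)
        finished_with = None
    except StopIteration as stop:
        finished_with = stop.value  # the result travels in StopIteration
finally:
    loop.close()
```

A custom future returned by an overridden `create_future` is not guaranteed to behave this way, which is the concern raised in the review.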

#[allow(clippy::unnecessary_wraps)]
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
Ok(py.None())
}
Comment on lines +56 to +59
Member

Maybe drop the Result wrapping and move that to the caller?

Contributor Author

Actually, if you remember, this work is part of a bigger one that I have tried to split into smaller parts, which explains why it has some traces of the parts coming after (in the arbitrary order chosen).
If you look at the next PR, you will understand the reason for this signature, which is the common API between the asyncio and trio wakers.
That's why I would like to keep it as is, if you agree.


pub(super) fn wake(&self, py: Python<'_>) -> PyResult<()> {
static RELEASE_WAITER: GILOnceCell<Py<PyCFunction>> = GILOnceCell::new();
let release_waiter = RELEASE_WAITER
.get_or_try_init(py, || wrap_pyfunction!(release_waiter, py).map(Into::into))?;
// `Future.set_result` must be called in event loop thread,
// so it requires `call_soon_threadsafe`
let call_soon_threadsafe = self.event_loop.call_method1(
py,
intern!(py, "call_soon_threadsafe"),
(release_waiter, &self.future),
);
if let Err(err) = call_soon_threadsafe {
// `call_soon_threadsafe` will raise if the event loop is closed;
// instead of catching an unspecific `RuntimeError`, check directly if it's closed.
let is_closed = self.event_loop.call_method0(py, "is_closed")?;
if !is_closed.extract(py)? {
return Err(err);
}
}
Ok(())
}
}

/// Call `future.set_result` if the future is not done.
///
/// Future can be cancelled by the event loop before being wakened.
/// See <https://github.com/python/cpython/blob/main/Lib/asyncio/tasks.py#L452C5-L452C5>
#[pyfunction(crate = "crate")]
fn release_waiter(future: &Bound<'_, PyAny>) -> PyResult<()> {
let done = future.call_method0(intern!(future.py(), "done"))?;
if !done.extract::<bool>()? {
future.call_method1(intern!(future.py(), "set_result"), (future.py().None(),))?;
}
Ok(())
}
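
The wake path above can be sketched in plain Python to show why both `call_soon_threadsafe` and the `done()` check matter. This is a hedged analogy using only the standard library: the worker thread stands in for Rust code calling `Waker::wake`, and the Python `release_waiter` below mirrors the Rust function of the same name rather than being part of any API.

```python
import asyncio
import threading


def release_waiter(future):
    """Set a result only if the future is not already done (e.g. cancelled)."""
    if not future.done():
        future.set_result(None)


async def main():
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    # `set_result` is not thread-safe, so the worker thread only schedules
    # the call; it then runs on the event loop thread.
    worker = threading.Thread(
        target=loop.call_soon_threadsafe, args=(release_waiter, future)
    )
    worker.start()
    result = await future
    worker.join()

    # Releasing a future that was cancelled in the meantime must be a no-op.
    cancelled = loop.create_future()
    cancelled.cancel()
    release_waiter(cancelled)
    return result, cancelled.cancelled()


outcome = asyncio.run(main())
```

Without the `done()` check, calling `set_result` on the cancelled future would raise `asyncio.InvalidStateError`.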