Skip to content

Commit 627841f

Browse files
Joseph Perezwyfo
Joseph Perez
authored andcommitted
feat: support async fn in macros with coroutine implementation
1 parent abe518d commit 627841f

20 files changed

+474
-61
lines changed

Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ unindent = { version = "0.2.1", optional = true }
3131
# support crate for multiple-pymethods feature
3232
inventory = { version = "0.3.0", optional = true }
3333

34+
# coroutine implementation
35+
futures-util = "0.3"
36+
3437
# crate integrations that can be added using the eponymous features
3538
anyhow = { version = "1.0", optional = true }
3639
chrono = { version = "0.4.25", default-features = false, optional = true }
@@ -54,6 +57,7 @@ serde = { version = "1.0", features = ["derive"] }
5457
serde_json = "1.0.61"
5558
rayon = "1.6.1"
5659
widestring = "0.5.1"
60+
futures = "0.3.28"
5761

5862
[build-dependencies]
5963
pyo3-build-config = { path = "pyo3-build-config", version = "0.21.0-dev", features = ["resolve-config"] }

guide/src/SUMMARY.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
- [Conversion traits](conversions/traits.md)]
2020
- [Python exceptions](exception.md)
2121
- [Calling Python from Rust](python_from_rust.md)
22+
- [Using `async` and `await`](async-await.md)
2223
- [GIL, mutability and object types](types.md)
2324
- [Parallelism](parallelism.md)
2425
- [Debugging](debugging.md)

guide/src/async-await.md

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
# Using `async` and `await`
2+
3+
*This feature is still in active development. See [the related issue](https://github.com/PyO3/pyo3/issues/1632).*
4+
5+
`#[pyfunction]` and `#[pymethods]` attributes also support `async fn`.
6+
7+
```rust
8+
# #![allow(dead_code)]
9+
use std::{thread, time::Duration};
10+
use futures::channel::oneshot;
11+
use pyo3::prelude::*;
12+
13+
#[pyfunction]
14+
async fn sleep(seconds: f64, result: Option<PyObject>) -> Option<PyObject> {
15+
let (tx, rx) = oneshot::channel();
16+
thread::spawn(move || {
17+
thread::sleep(Duration::from_secs_f64(seconds));
18+
tx.send(()).unwrap();
19+
});
20+
rx.await.unwrap();
21+
result
22+
}
23+
```
24+
25+
*Python awaitables instantiated with this method can only be awaited in *asyncio* context. Other Python async runtime may be supported in the future.*
26+
27+
## `Send + 'static` constraint
28+
29+
Resulting future of an `async fn` decorated by `#[pyfunction]` must be `Send + 'static` to be embedded in a Python object.
30+
31+
As a consequence, `async fn` parameters and return types must also be `Send + 'static`, so it is not possible to have a signature like `async fn does_not_compile(arg: &PyAny, py: Python<'_>) -> &PyAny`.
32+
33+
It also means that methods cannot use `&self`/`&mut self`, *but this restriction should be dropped in the future.*
34+
35+
36+
## Implicit GIL holding
37+
38+
Even if it is not possible to pass a `py: Python<'_>` parameter to `async fn`, the GIL is still held during the execution of the future – it's also the case for regular `fn` without `Python<'_>`/`&PyAny` parameter, yet the GIL is held.
39+
40+
It is still possible to get a `Python` marker using [`Python::with_gil`]({{#PYO3_DOCS_URL}}/pyo3/struct.Python.html#method.with_gil); because `with_gil` is reentrant and optimized, the cost will be negligible.
41+
42+
## Release the GIL across `.await`
43+
44+
There is currently no simple way to release the GIL when awaiting a future, *but solutions are currently in development*.
45+
46+
Here is the advised workaround for now:
47+
48+
```rust,ignore
49+
use std::{future::Future, pin::{Pin, pin}, task::{Context, Poll}};
50+
use pyo3::prelude::*;
51+
52+
struct AllowThreads<F>(F);
53+
54+
impl<F> Future for AllowThreads<F>
55+
where
56+
F: Future + Unpin + Send,
57+
F::Output: Send,
58+
{
59+
type Output = F::Output;
60+
61+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
62+
let waker = cx.waker();
63+
Python::with_gil(|gil| {
64+
gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker)))
65+
})
66+
}
67+
}
68+
```
69+
70+
## Cancellation
71+
72+
*To be implemented*
73+
74+
## The `Coroutine` type
75+
76+
To make a Rust future awaitable in Python, PyO3 defines a [`Coroutine`]({{#PYO3_DOCS_URL}}/pyo3/coroutine/struct.Coroutine.html) type, which implements the Python [coroutine protocol](https://docs.python.org/3/library/collections.abc.html#collections.abc.Coroutine). Each `coroutine.send` call is translated to `Future::poll` call, while `coroutine.throw` call reraise the exception *(this behavior will be configurable with cancellation support)*.
77+
78+
*The type does not yet have a public constructor until the design is finalized.*

guide/src/ecosystem/async-await.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Using `async` and `await`
22

3+
*`async`/`await` support is currently being integrated in PyO3. See the [dedicated documentation](../async-await.md)*
4+
35
If you are working with a Python library that makes use of async functions or wish to provide
46
Python bindings for an async Rust library, [`pyo3-asyncio`](https://github.com/awestlake87/pyo3-asyncio)
57
likely has the tools you need. It provides conversions between async functions in both Python and

newsfragments/3540.added.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Support `async fn` in macros with coroutine implementation

pyo3-macros-backend/src/method.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ pub struct FnSpec<'a> {
228228
pub output: syn::Type,
229229
pub convention: CallingConvention,
230230
pub text_signature: Option<TextSignatureAttribute>,
231+
pub asyncness: Option<syn::Token![async]>,
231232
pub unsafety: Option<syn::Token![unsafe]>,
232233
pub deprecations: Deprecations,
233234
}
@@ -317,6 +318,7 @@ impl<'a> FnSpec<'a> {
317318
signature,
318319
output: ty,
319320
text_signature,
321+
asyncness: sig.asyncness,
320322
unsafety: sig.unsafety,
321323
deprecations,
322324
})
@@ -445,7 +447,11 @@ impl<'a> FnSpec<'a> {
445447
let func_name = &self.name;
446448

447449
let rust_call = |args: Vec<TokenStream>| {
448-
quotes::map_result_into_ptr(quotes::ok_wrap(quote! { function(#self_arg #(#args),*) }))
450+
let mut call = quote! { function(#self_arg #(#args),*) };
451+
if self.asyncness.is_some() {
452+
call = quote! { _pyo3::impl_::coroutine::wrap_future(#call) };
453+
}
454+
quotes::map_result_into_ptr(quotes::ok_wrap(call))
449455
};
450456

451457
let rust_name = if let Some(cls) = cls {

pyo3-macros-backend/src/pyfunction.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::{
66
deprecations::Deprecations,
77
method::{self, CallingConvention, FnArg},
88
pymethod::check_generic,
9-
utils::{ensure_not_async_fn, get_pyo3_crate},
9+
utils::get_pyo3_crate,
1010
};
1111
use proc_macro2::TokenStream;
1212
use quote::{format_ident, quote};
@@ -179,8 +179,6 @@ pub fn impl_wrap_pyfunction(
179179
options: PyFunctionOptions,
180180
) -> syn::Result<TokenStream> {
181181
check_generic(&func.sig)?;
182-
ensure_not_async_fn(&func.sig)?;
183-
184182
let PyFunctionOptions {
185183
pass_module,
186184
name,
@@ -230,6 +228,7 @@ pub fn impl_wrap_pyfunction(
230228
signature,
231229
output: ty,
232230
text_signature,
231+
asyncness: func.sig.asyncness,
233232
unsafety: func.sig.unsafety,
234233
deprecations: Deprecations::new(),
235234
};

pyo3-macros-backend/src/pymethod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::borrow::Cow;
22

33
use crate::attributes::{NameAttribute, RenamingRule};
44
use crate::method::{CallingConvention, ExtractErrorMode};
5-
use crate::utils::{ensure_not_async_fn, PythonDoc};
5+
use crate::utils::PythonDoc;
66
use crate::{
77
method::{FnArg, FnSpec, FnType, SelfType},
88
pyfunction::PyFunctionOptions,
@@ -188,7 +188,6 @@ pub fn gen_py_method(
188188
options: PyFunctionOptions,
189189
) -> Result<GeneratedPyMethod> {
190190
check_generic(sig)?;
191-
ensure_not_async_fn(sig)?;
192191
ensure_function_options_valid(&options)?;
193192
let method = PyMethod::parse(sig, meth_attrs, options)?;
194193
let spec = &method.spec;

pyo3-macros-backend/src/utils.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use proc_macro2::{Span, TokenStream};
22
use quote::ToTokens;
3-
use syn::{punctuated::Punctuated, spanned::Spanned, Token};
3+
use syn::{punctuated::Punctuated, Token};
44

55
use crate::attributes::{CrateAttribute, RenamingRule};
66

@@ -137,17 +137,6 @@ impl quote::ToTokens for PythonDoc {
137137
}
138138
}
139139

140-
pub fn ensure_not_async_fn(sig: &syn::Signature) -> syn::Result<()> {
141-
if let Some(asyncness) = &sig.asyncness {
142-
bail_spanned!(
143-
asyncness.span() => "`async fn` is not yet supported for Python functions.\n\n\
144-
Additional crates such as `pyo3-asyncio` can be used to integrate async Rust and \
145-
Python. For more information, see https://github.com/PyO3/pyo3/issues/1632"
146-
);
147-
};
148-
Ok(())
149-
}
150-
151140
pub fn unwrap_ty_group(mut ty: &syn::Type) -> &syn::Type {
152141
while let syn::Type::Group(g) = ty {
153142
ty = &*g.elem;

src/coroutine.rs

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
//! Python coroutine implementation, used notably when wrapping `async fn`
2+
//! with `#[pyfunction]`/`#[pymethods]`.
3+
use std::{
4+
any::Any,
5+
future::Future,
6+
panic,
7+
pin::Pin,
8+
sync::Arc,
9+
task::{Context, Poll},
10+
};
11+
12+
use futures_util::FutureExt;
13+
use pyo3_macros::{pyclass, pymethods};
14+
15+
use crate::{
16+
coroutine::waker::AsyncioWaker,
17+
exceptions::{PyRuntimeError, PyStopIteration},
18+
panic::PanicException,
19+
pyclass::IterNextOutput,
20+
types::PyIterator,
21+
IntoPy, Py, PyAny, PyErr, PyObject, PyResult, Python,
22+
};
23+
24+
mod waker;
25+
26+
const COROUTINE_REUSED_ERROR: &str = "cannot reuse already awaited coroutine";
27+
28+
type FutureOutput = Result<PyResult<PyObject>, Box<dyn Any + Send>>;
29+
30+
/// Python coroutine wrapping a [`Future`].
31+
#[pyclass(crate = "crate")]
32+
pub struct Coroutine {
33+
future: Option<Pin<Box<dyn Future<Output = FutureOutput> + Send>>>,
34+
waker: Option<Arc<AsyncioWaker>>,
35+
}
36+
37+
impl Coroutine {
38+
/// Wrap a future into a Python coroutine.
39+
///
40+
/// Coroutine `send` polls the wrapped future, ignoring the value passed
41+
/// (should always be `None` anyway).
42+
///
43+
/// `Coroutine `throw` drop the wrapped future and reraise the exception passed
44+
pub(crate) fn from_future<F, T, E>(future: F) -> Self
45+
where
46+
F: Future<Output = Result<T, E>> + Send + 'static,
47+
T: IntoPy<PyObject>,
48+
PyErr: From<E>,
49+
{
50+
let wrap = async move {
51+
let obj = future.await?;
52+
// SAFETY: GIL is acquired when future is polled (see `Coroutine::poll`)
53+
Ok(obj.into_py(unsafe { Python::assume_gil_acquired() }))
54+
};
55+
Self {
56+
future: Some(Box::pin(panic::AssertUnwindSafe(wrap).catch_unwind())),
57+
waker: None,
58+
}
59+
}
60+
61+
fn poll(
62+
&mut self,
63+
py: Python<'_>,
64+
throw: Option<PyObject>,
65+
) -> PyResult<IterNextOutput<PyObject, PyObject>> {
66+
// raise if the coroutine has already been run to completion
67+
let future_rs = match self.future {
68+
Some(ref mut fut) => fut,
69+
None => return Err(PyRuntimeError::new_err(COROUTINE_REUSED_ERROR)),
70+
};
71+
// reraise thrown exception it
72+
if let Some(exc) = throw {
73+
self.close();
74+
return Err(PyErr::from_value(exc.as_ref(py)));
75+
}
76+
// create a new waker, or try to reset it in place
77+
if let Some(waker) = self.waker.as_mut().and_then(Arc::get_mut) {
78+
waker.reset();
79+
} else {
80+
self.waker = Some(Arc::new(AsyncioWaker::new()));
81+
}
82+
let waker = futures_util::task::waker(self.waker.clone().unwrap());
83+
// poll the Rust future and forward its results if ready
84+
if let Poll::Ready(res) = future_rs.as_mut().poll(&mut Context::from_waker(&waker)) {
85+
self.close();
86+
return match res {
87+
Ok(res) => Ok(IterNextOutput::Return(res?)),
88+
Err(err) => Err(PanicException::from_panic_payload(err)),
89+
};
90+
}
91+
// otherwise, initialize the waker `asyncio.Future`
92+
if let Some(future) = self.waker.as_ref().unwrap().initialize_future(py)? {
93+
// `asyncio.Future` must be awaited; fortunately, it implements `__iter__ = __await__`
94+
// and will yield itself if its result has not been set in polling above
95+
if let Some(future) = PyIterator::from_object(future).unwrap().next() {
96+
// future has not been leaked into Python for now, and Rust code can only call
97+
// `set_result(None)` in `ArcWake` implementation, so it's safe to unwrap
98+
return Ok(IterNextOutput::Yield(future.unwrap().into()));
99+
}
100+
}
101+
// if waker has been waken during future polling, this is roughly equivalent to
102+
// `await asyncio.sleep(0)`, so just yield `None`.
103+
Ok(IterNextOutput::Yield(py.None().into()))
104+
}
105+
}
106+
107+
pub(crate) fn iter_result(result: IterNextOutput<PyObject, PyObject>) -> PyResult<PyObject> {
108+
match result {
109+
IterNextOutput::Yield(ob) => Ok(ob),
110+
IterNextOutput::Return(ob) => Err(PyStopIteration::new_err(ob)),
111+
}
112+
}
113+
114+
#[pymethods(crate = "crate")]
115+
impl Coroutine {
116+
fn send(&mut self, py: Python<'_>, _value: &PyAny) -> PyResult<PyObject> {
117+
iter_result(self.poll(py, None)?)
118+
}
119+
120+
fn throw(&mut self, py: Python<'_>, exc: PyObject) -> PyResult<PyObject> {
121+
iter_result(self.poll(py, Some(exc))?)
122+
}
123+
124+
fn close(&mut self) {
125+
// the Rust future is dropped, and the field set to `None`
126+
// to indicate the coroutine has been run to completion
127+
drop(self.future.take());
128+
}
129+
130+
fn __await__(self_: Py<Self>) -> Py<Self> {
131+
self_
132+
}
133+
134+
fn __next__(&mut self, py: Python<'_>) -> PyResult<IterNextOutput<PyObject, PyObject>> {
135+
self.poll(py, None)
136+
}
137+
}

0 commit comments

Comments
 (0)