feat: add `coroutine::await_in_coroutine` to await awaitables in coroutine context #3611
CodSpeed Performance Report: Merging #3611 will not alter performance.
d1e9e17 to 53cccb4
@wyfo I'm sorry for the painfully slow review here. Given that Does this patch rely on #3610 getting merged, or is it possible to rebase this on main? That would allow us to move forward with some of these remaining async PRs.
2eb4c74 to 732bb84
There is one thing bothering me with this implementation: the name I've chosen
e15675f to 053bfc2
The error in CI (other than
2ebb268 to 18f59ea
As written above, the name
@davidhewitt As promised, the PR has been rebased on main and is now ready to review.
Thanks for this epic piece of engineering, and I'm very sorry for my long delay in review; it's a complex piece of code and I've only now reviewed!
Overall this looks great and makes sense to proceed with. I asked a ton of questions and had suggestions for cleanup, I think once these are addressed there will likely be another round of review. Now that my head is much more engaged with this code I promise the next review will come much sooner! 🙏
## Restrictions

As the name suggests, `await_in_coroutine` resulting future can only be awaited in coroutine context. Otherwise, it
By "coroutine context", I understand that means "within an async call stack underneath a `#[pyfunction]` or `#[pymethods]`"?
Maybe we can adjust wording to something like that? It took me a few reads to process this, and it's maybe not obvious to all users that `async` Rust functions become coroutines.
You're right, I will try to reword it. But it may be easier to understand once the `Coroutine` type is stabilized.
pub fn PyIter_Send(
    iter: *mut PyObject,
    arg: *mut PyObject,
    presult: *mut *mut PyObject,
) -> c_int;
I think this was fixed in #4746, will need to rebase.
/// })
/// # }
/// ```
pub fn await_in_coroutine(
API question: do you think it's better to have it this way or as a method on `PyAnyMethods`? e.g. `obj.bind(py).await_in_coroutine()`?
Is there ever a reasonable way to await a Python awaitable without being in coroutine context? I guess that there would be no way to `send` / `throw` / `close` because the enclosing context would not have such a mechanism. Even more, I suppose that driving a Python awaitable really requires you to be in a Python event loop (i.e. in coroutine context).
That makes me think, is the key difference that "coroutine context" means something to be run on the Python event loop, as opposed to in a Rust runtime?
Coroutine context technically means that the `Waker` inside the `Context` used to `poll` the `Future` is the one instantiated in `Coroutine::poll_inner`. That's not really related to being run on the Python event loop, even if you expect your `Coroutine` object to be run on it.
That concept should be properly documented, as you mentioned in another comment.
> Is there ever a reasonable way to await a Python awaitable without being in coroutine context?
Awaiting a Python awaitable means delegating `send`/`throw`/`close` when possible. That's the goal of this PR: to make things behave as closely as possible to pure Python.
Actually, it would be technically feasible to "await" a Python awaitable in a Rust runtime, but the main issue is that the Python async stack is not supposed to be thread-safe, and I assume that most awaitables expect to be run by a Python event loop through the asyncio API, so you should expect issues if you don't use the Python runtime.
Another point is that Python awaitables have different semantics from Rust futures regarding cancellation: a Python awaitable should always be run to completion; there is no such thing as being "dropped in the middle of execution" like a Rust future (otherwise, `finally` blocks are not executed, which is very counterintuitive to debug; I speak from experience, as I've already encountered nasty things like this). So wrapping an awaitable in a Rust future that is not guaranteed to be polled to completion is not the best thing to do. `Coroutine` objects are not technically guaranteed to be run to completion either, but again, they are supposed to be run on the Python event loop.
That's why I didn't think at the time that it was worth allowing something almost guaranteed to behave or fail badly, and I put in this conservative protection, i.e. only allowing `await_in_coroutine` inside "coroutine context"; I do think that panicking is better than returning a `PyErr` wrapping things like `RuntimeError: no running event loop`.
> API question: do you think it's better to have it this way or as a method on PyAnyMethods?
I didn't think about it, but yes, it could be better than a badly named free function like this one.
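For readers less familiar with what "delegating `send`/`throw`/`close`" looks like in pure Python, here is a minimal sketch. `Suspend`, `outer`, and `drive` are hypothetical names used only for illustration; they are not part of PyO3 or of this PR. The sketch only shows that a plain Python `await` forwards each of the three operations to the awaited object.

```python
class Suspend:
    """Hypothetical awaitable that suspends once and records how it is resumed."""

    def __init__(self, log):
        self.log = log

    def __await__(self):
        try:
            value = yield "suspended"
        except GeneratorExit:
            self.log.append("close")
            raise
        except BaseException as exc:
            self.log.append(f"throw {type(exc).__name__}")
            raise
        self.log.append(f"send {value}")
        return value


async def outer(log):
    # `await` forwards send/throw/close from the caller to Suspend.__await__().
    return await Suspend(log)


def drive(action):
    """Start the coroutine, apply one operation, and return what the awaitable saw."""
    log = []
    coro = outer(log)
    coro.send(None)  # run until the awaitable suspends
    try:
        action(coro)
    except (StopIteration, ValueError):
        pass  # completion or the re-raised exception; only the log matters here
    return log


sent = drive(lambda coro: coro.send(42))
thrown = drive(lambda coro: coro.throw(ValueError("boom")))
closed = drive(lambda coro: coro.close())
```

In each case the operation applied to the outer coroutine is observed by the inner awaitable, which is the behavior the PR aims to reproduce for Rust futures wrapped in a `Coroutine`.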
src/lib.rs
Outdated
There seem to be a lot of unrelated formatting changes in this file. I'm not strongly against them, but they seem likely to cause rebase conflicts, can they be reverted please?
My bad, I forgot to disable my import formatting when editing this file...
By the way, I think I will open an issue on this subject, because not having import formatting rules in a big project like PyO3 can be a pain for contributors because of this kind of conflict.
if !matches!(self.waker, Some(ref w) if w.is_delegated(py)) {
    match op {
        send @ CoroOp::Send(_) => op = send,
        CoroOp::Throw(exc) => match &self.throw_callback {
I wonder, can we simplify the logic here by modelling the throw callback as another form of delegation? What if `CancelHandle` also had a `.await_in_coroutine()` method?
Or maybe `CancelHandle` can be generalized and becomes `CoroutineContext` which has a `.py_await()` method, or something like that? This would also help with communicating the idea of "coroutine context" if it was a physical type.
I'm not sure I follow you; I don't understand what `CancelHandle::await_in_coroutine` would do.
I don't think the logic here is complex: if there is no delegation, we call the `throw_callback` when `throw` is called; otherwise, the delegated `throw` is called, as you would expect.
However, building on 1.83, I would in fact like to simply remove `CancelHandle` (more details in the review comment).
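The dispatch rule described in this reply can be modelled in a few lines of Python. This is only an illustrative sketch: `CoroutineModel` and its attributes are hypothetical names, and the fallback of re-raising when neither a delegate nor a callback is present is an assumption, not something stated in the thread.

```python
class CoroutineModel:
    """Hypothetical pure-Python model of the throw dispatch; not PyO3's implementation."""

    def __init__(self, throw_callback=None):
        self.throw_callback = throw_callback
        self.delegate = None  # generator-like object currently being awaited, if any

    def throw(self, exc):
        if self.delegate is not None:
            # Delegation is active: forward the exception to the awaited object.
            return self.delegate.throw(exc)
        if self.throw_callback is not None:
            # No delegation: hand the exception to the registered callback.
            self.throw_callback(exc)
            return None
        # Assumption: with neither a delegate nor a callback, the exception propagates.
        raise exc


def inner():
    try:
        yield "waiting"
    except KeyError:
        yield "handled by delegate"


# Case 1: delegation active, the delegate sees the exception.
gen = inner()
next(gen)
delegating = CoroutineModel()
delegating.delegate = gen
delegated_result = delegating.throw(KeyError("x"))

# Case 2: no delegation, the callback sees the exception.
seen = []
with_callback = CoroutineModel(throw_callback=seen.append)
callback_result = with_callback.throw(KeyError("y"))
```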
fn get_running_loop(py: Python<'_>) -> PyResult<Bound<'_, PyAny>> {
    static GET_RUNNING_LOOP: GILOnceCell<PyObject> = GILOnceCell::new();
    let import = || -> PyResult<_> {
        let module = py.import("asyncio")?;
        Ok(module.getattr("get_running_loop")?.into())
    };
    GET_RUNNING_LOOP
        .get_or_try_init(py, import)?
        .bind(py)
        .call0()
Same comment that we can use `GILOnceCell::import` to simplify here.
A very welcome QOL improvement indeed!
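For context on the helper under discussion, the following sketch shows how `asyncio.get_running_loop()` behaves on the Python side: it returns the loop when called from inside a running loop and raises `RuntimeError` otherwise. `loop_or_error` and `inside` are hypothetical helper names used only for this illustration.

```python
import asyncio


def loop_or_error():
    """Return the running loop, or the error message if there is none."""
    try:
        return asyncio.get_running_loop()
    except RuntimeError as exc:
        return str(exc)


# Called from plain synchronous code: no loop is running.
outside = loop_or_error()


async def inside():
    # Called while asyncio.run() is driving this coroutine.
    return loop_or_error()


inside_result = asyncio.run(inside())
```

This is the `RuntimeError: no running event loop` mentioned earlier in the thread.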
// `asyncio.Future` must be awaited; in normal case, it implements `__iter__ = __await__`,
// but `create_future` may have been overridden
let mut iter = match PyIterator::from_object(self.future.bind(py)) {
Sounds like this might go wrong if the overridden future defines `__next__` and also has important logic in its `__await__` method. Should we just always do the call to `__await__` (maybe via a slot call again to avoid Python dispatch)?
Indeed, it might go wrong, even if it would be very surprising to do such a thing. I only know two alternative event loops:
- `uvloop`, by far the most used after asyncio I believe, reuses `asyncio.Future`;
- `leviathan` defines `__await__` as `__iter__`.
Also, alternative implementations would try to be compatible with `asyncio.Future`, and this one defines `__iter__ = __await__`. That's why I think it is a reasonable assumption to rely on the iterator protocol for an object returned by `create_future`. And I obviously chose the iterator protocol over Python dispatch for performance reasons.
However, I didn't think about using the `am_await` slot (I don't know how to do it for now, will dig), so I should maybe benchmark both approaches to decide which one to keep.
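The trade-off discussed here can be seen from Python directly. The sketch below drives a standard asyncio future through the iterator protocol only, then tries the same on an awaitable that defines `__await__` without `__iter__`. `OnlyAwait` and `drive_with_iter` are hypothetical names for illustration, not anything from the PR.

```python
import asyncio


class OnlyAwait:
    """Hypothetical awaitable that defines __await__ but not __iter__."""

    def __await__(self):
        yield self
        return "done"


def drive_with_iter(awaitable):
    """Start driving an object through the iterator protocol only."""
    it = iter(awaitable)
    first = next(it)
    return it, first


loop = asyncio.new_event_loop()
try:
    fut = loop.create_future()
    it, first = drive_with_iter(fut)
    fut_yields_itself = first is fut  # a pending future yields itself
    fut.set_result(1)
    try:
        next(it)
    except StopIteration as stop:
        fut_result = stop.value  # the result travels in StopIteration
finally:
    loop.close()

try:
    drive_with_iter(OnlyAwait())
    only_await_iterable = True
except TypeError:
    # iter() fails: the object is awaitable but not iterable.
    only_await_iterable = False
```

So relying on the iterator protocol works for `asyncio.Future`-compatible objects, but would fail for a future type that only implements `__await__`, which is the case the review comment raises.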
#[allow(clippy::unnecessary_wraps)]
pub(super) fn yield_waken(py: Python<'_>) -> PyResult<PyObject> {
    Ok(py.None())
}
Maybe drop the Result wrapping and move that to the caller?
Actually, if you remember, this work is part of a bigger one that I have tried to split into smaller parts, which explains why it has some traces of the parts coming after (in the arbitrary order chosen).
If you look at the next PR, you will understand the reason for this signature, which is the common API between asyncio and trio wakers.
That's why I would like to keep it as is, if you agree.
Co-authored-by: David Hewitt <[email protected]>
Glad to read you again! And thank you very much for this detailed review!
As I've written in some comments, Rust 1.83 changes things radically. My hack is no longer needed, but there is more. In fact, `Waker::vtable` could be used to detect this much-written-about "coroutine context", and it could be used to handle cancellation in a much better way.
`CancelHandle` is indeed no longer needed. We could just provide an asynchronous function that waits for a thrown exception, and awaiting it would register the exception catch in the `Coroutine` state machine. We could also do the same for `close` and the `GeneratorExit` exception.
In fact, I would like to use `Coroutine` static methods for that, because it would make it clearer that these features are related to being polled in a coroutine context. For example:
impl Coroutine {
    /// Returns the argument of `Coroutine::throw` whenever it's called.
    async fn catch_throw() -> PyResult<()> {
        todo!()
    }
    /// Returns whenever `Coroutine::close` is called.
    async fn catch_close() {
        todo!()
    }
    /// Wrap a Python awaitable into a Rust `Future` that can be awaited in the context of a `Coroutine`.
    async fn delegate(
        awaitable: &Bound<'_, PyAny>,
    ) -> PyResult<impl Future<Output = PyResult<PyObject>> + Send + Sync + 'static> {
        todo!()
    }
}
What do you think about that idea?
fn close(&mut self, py: Python<'_>) -> PyResult<()> {
    match self.poll(py, CoroOp::Close) {
        Ok(_) => Ok(()),
        Err(err) if err.is_instance_of::<PyGeneratorExit>(py) => Ok(()),
Because that's how coroutines are supposed to work: https://docs.python.org/3/reference/datamodel.html#coroutine.close, so I mimic the behavior.
For example, you can execute this code:
async def example():
    try:
        import asyncio
        await asyncio.Future()
    except GeneratorExit:
        print("close")
        raise

coro = example()
coro.send(None)
coro.close()
# prints "close" but doesn't reraise the exception
There is no `close_callback` now, but if there were, the error could be caught by the code and reraised. And with the changes I want to make, it might indeed be possible to catch it in the code, so why not reraise it.
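To spell out the `close()` semantics being mimicked, here is a small sketch that does not depend on an event loop. `Suspend`, `reraises`, `swallows`, `converts`, and `start_and_close` are hypothetical names for illustration. It shows that `close()` returns normally when the coroutine re-raises `GeneratorExit` or simply returns, while any other exception raised during cleanup propagates to the caller.

```python
class Suspend:
    """Hypothetical awaitable that suspends exactly once."""

    def __await__(self):
        yield


async def reraises():
    try:
        await Suspend()
    except GeneratorExit:
        raise  # close() swallows the re-raised GeneratorExit


async def swallows():
    try:
        await Suspend()
    except GeneratorExit:
        return  # returning normally is also accepted by close()


async def converts():
    try:
        await Suspend()
    except GeneratorExit:
        raise ValueError("cleanup failed")  # other exceptions propagate


def start_and_close(coro_fn):
    """Run a coroutine to its first suspension, close it, and report the outcome."""
    coro = coro_fn()
    coro.send(None)
    try:
        coro.close()
        return "ok"
    except Exception as exc:
        return type(exc).__name__


outcomes = [start_and_close(fn) for fn in (reraises, swallows, converts)]
```

This matches the two `Ok(())` arms of the quoted `close` function: a successful poll and a `GeneratorExit` error are both treated as a clean close.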
"CancelledError" | ||
    )
});
assert!(!cancel.is_cancelled());
Indeed, the equivalent Python code would be
async def wrap_cancellable(awaitable):
    try:
        await awaitable
    except Exception as err:
        assert type(err).__name__ == "CancelledError"
Because cancellation is delegated to the awaitable, it is the awaitable that reraises it (and not the `CancelHandle`). Then, if we catch the error in `wrap_cancellable` and don't reraise it, the outer task is not cancelled.
But your question makes me realize I should add some comments to better explain the tests.
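The same effect can be reproduced with asyncio alone. In this sketch (all names are illustrative, and it is not the PR's test), cancelling the outer task cancels the future it is waiting on; the wrapper catches the resulting error and returns normally, so the task itself does not end up cancelled. Since `asyncio.CancelledError` has derived from `BaseException` since Python 3.8, the sketch catches it explicitly.

```python
import asyncio


async def wrap_cancellable(awaitable):
    try:
        await awaitable
    except asyncio.CancelledError:
        # The cancellation was delegated to the awaitable and re-raised here;
        # swallowing it lets the wrapper finish normally.
        return "caught"


async def main():
    inner = asyncio.get_running_loop().create_future()
    task = asyncio.ensure_future(wrap_cancellable(inner))
    await asyncio.sleep(0)  # let the task start and suspend on `inner`
    task.cancel()           # forwarded to the future the task is waiting on
    result = await task
    return result, task.cancelled(), inner.cancelled()


result, task_cancelled, inner_cancelled = asyncio.run(main())
```

Here the inner future ends up cancelled while the outer task completes with a normal result, which mirrors the `assert!(!cancel.is_cancelled())` line in the quoted test.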
res = future.fuse() => res,
_ = checkpoint().fuse() => unreachable!(),
The same panic is indeed raised at two different places in the code depending on the polling order. I will add some comments to clarify what happens.
enum WakerHack {
    Argument(PyObject),
    Result(Poll<PyResult<PyObject>>),
}
Yes, I'm completely for raising the MSRV of the async feature to 1.83!
When I wrote this code, back in 1.74, the feature was far from stabilized, which is why I came up with this hack. But using 1.83 would in fact bring other benefits (more details in the review comment).
Relates to #1632
Draft based on #3610