Skip to content

Commit

Permalink
Allow cancellation of all capability contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
LasseBlaauwbroek authored and haata committed Nov 8, 2023
1 parent 49bda5c commit 0ec4d0b
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 4 deletions.
17 changes: 16 additions & 1 deletion capnp/helpers/capabilityHelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,19 @@ class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server
kj::Own<PyRefCounter> py_server;
kj::Own<PyRefCounter> kj_loop;

#if (CAPNP_VERSION_MAJOR < 1)
PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema,
kj::Own<PyRefCounter> _py_server,
kj::Own<PyRefCounter> kj_loop)
: capnp::DynamicCapability::Server(schema), py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { }
: capnp::DynamicCapability::Server(schema),
py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { }
#else
PythonInterfaceDynamicImpl(capnp::InterfaceSchema & schema,
kj::Own<PyRefCounter> _py_server,
kj::Own<PyRefCounter> kj_loop)
: capnp::DynamicCapability::Server(schema, { true }),
py_server(kj::mv(_py_server)), kj_loop(kj::mv(kj_loop)) { }
#endif

~PythonInterfaceDynamicImpl() {
}
Expand All @@ -87,6 +96,12 @@ class PythonInterfaceDynamicImpl final: public capnp::DynamicCapability::Server
capnp::CallContext< capnp::DynamicStruct, capnp::DynamicStruct> context);
};

inline void allowCancellation(capnp::CallContext<capnp::DynamicStruct, capnp::DynamicStruct> context) {
#if (CAPNP_VERSION_MAJOR < 1)
context.allowCancellation();
#endif
}

class PyAsyncIoStream: public kj::AsyncIoStream {
public:
kj::Own<PyRefCounter> protocol;
Expand Down
5 changes: 3 additions & 2 deletions capnp/helpers/helpers.pxd
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from capnp.includes.capnp_cpp cimport (
Maybe, PyPromise, VoidPromise, RemotePromise,
DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability,
RpcSystem, MessageBuilder, Own, PyRefCounter, Node, DynamicStruct
DynamicCapability, InterfaceSchema, EnumSchema, StructSchema, DynamicValue, Capability,
RpcSystem, MessageBuilder, Own, PyRefCounter, Node, DynamicStruct, CallContext
)

from capnp.includes.schema_cpp cimport ByteArray
Expand All @@ -22,6 +22,7 @@ cdef extern from "capnp/helpers/capabilityHelper.h":
PyPromise convert_to_pypromise(RemotePromise)
PyPromise convert_to_pypromise(VoidPromise)
VoidPromise taskToPromise(Own[PyRefCounter] coroutine, PyObject* callback)
void allowCancellation(CallContext context) except +reraise_kj_exception nogil
void init_capnp_api()

cdef extern from "capnp/helpers/rpcHelper.h":
Expand Down
3 changes: 2 additions & 1 deletion capnp/lib/capnp.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ async def kj_loop():

async def run(coro):
"""Ensure that the coroutine runs while the KJ event loop is running
This is a shortcut for wrapping the coroutine in a :py:meth:`capnp.kj_loop` context manager.
:param coro: Coroutine to run
Expand All @@ -1949,6 +1949,7 @@ cdef class _CallContext:
cdef CallContext * thisptr

cdef _init(self, CallContext other):
helpers.allowCancellation(other)
self.thisptr = new CallContext(move(other))
return self

Expand Down
20 changes: 20 additions & 0 deletions test/test_capability.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
import asyncio

import capnp
import test_capability_capnp as capability
Expand Down Expand Up @@ -389,3 +390,22 @@ async def test_generic():
obj = capnp._MallocMessageBuilder().get_root_as_any()
obj.set_as_text("anypointer_")
assert (await client.foo(obj)).b == "anypointer_test"


class CancelServer(capability.TestInterface.Server):
def __init__(self, val=1):
self.val = val

async def foo(self, i, j, **kwargs):
with pytest.raises(asyncio.CancelledError):
await asyncio.sleep(10)


async def test_cancel2():
client = capability.TestInterface._new_client(CancelServer())

task = asyncio.ensure_future(client.foo(1, True))
await asyncio.sleep(0) # Make sure that the task runs
task.cancel()
with pytest.raises(asyncio.CancelledError):
await task

0 comments on commit 0ec4d0b

Please sign in to comment.