From 4fb4ef848e26d1d47f04867eb888904fa451a282 Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Thu, 19 Dec 2024 21:55:42 -0500 Subject: [PATCH 01/13] feat: support coroutine functions --- src/psygnal/_weak_callback.py | 79 ++++++++++++++++++++++++++++++----- tests/test_weak_callable.py | 62 +++++++++++++++++++++++---- 2 files changed, 122 insertions(+), 19 deletions(-) diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index 332ccfd3..172a49ed 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import sys import weakref from functools import partial @@ -19,6 +20,8 @@ from ._mypyc import mypyc_attr if TYPE_CHECKING: + from collections.abc import Coroutine + import toolz from typing_extensions import TypeAlias, TypeGuard # py310 @@ -28,6 +31,9 @@ _T = TypeVar("_T") _R = TypeVar("_R") # return type of cb +# reference to all background tasks created by Coroutine WeakCallbacks +_BACKGROUND_TASKS: set[asyncio.Task] = set() + def _is_toolz_curry(obj: Any) -> TypeGuard[toolz.curry]: """Return True if obj is a toolz.curry object.""" @@ -124,14 +130,17 @@ def _on_delete(weak_cb): kwargs = cb.keywords cb = cb.func + is_coro = asyncio.iscoroutinefunction(cb) + if isinstance(cb, FunctionType): - return ( - StrongFunction(cb, max_args, args, kwargs, priority=priority) - if strong_func - else WeakFunction( + if strong_func: + cls = StrongCoroutineFunction if is_coro else StrongFunction + return cls(cb, max_args, args, kwargs, priority=priority) + else: + wcls = WeakCoroutineFunction if is_coro else WeakFunction + return wcls( cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority ) - ) if isinstance(cb, MethodType): if getattr(cb, "__name__", None) == "__setitem__": @@ -145,7 +154,8 @@ def _on_delete(weak_cb): return WeakSetitem( obj, key, max_args, finalize, on_ref_error, priority=priority ) - return WeakMethod( + mcls = WeakCoroutineMethod if is_coro else WeakMethod + return mcls( cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority ) @@ -225,7 +235,7 @@ def __init__( self.priority: int = priority - def cb(self, args: tuple[Any, ...] = ()) -> None: + def cb(self, args: tuple[Any, ...] = ()) -> Any: """Call the callback with `args`. Args will be spread when calling the func.""" raise NotImplementedError() @@ -334,6 +344,8 @@ def _cb(_: weakref.ReferenceType) -> None: class StrongFunction(WeakCallback): """Wrapper around a strong function reference.""" + _f: Callable + def __init__( self, obj: Callable, @@ -351,7 +363,7 @@ def __init__( if args: self._object_repr = f"{self._object_repr}{(*args,)!r}".replace(")", " ...)") - def cb(self, args: tuple[Any, ...] = ()) -> None: + def cb(self, args: tuple[Any, ...] = ()) -> Any: if self._max_args is not None: args = args[: self._max_args] self._f(*self._args, *args, **self._kwargs) @@ -370,6 +382,21 @@ def __setstate__(self, state: dict) -> None: setattr(self, k, v) +class StrongCoroutineFunction(StrongFunction): + """Wrapper around a strong coroutine function reference.""" + + _f: Callable[..., Coroutine] + + def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + if self._max_args is not None: + args = args[: self._max_args] + coroutine = self._f(*self._args, *args, **self._kwargs) + task = asyncio.create_task(coroutine) + _BACKGROUND_TASKS.add(task) + task.add_done_callback(_BACKGROUND_TASKS.discard) + return coroutine + + class WeakFunction(WeakCallback): """Wrapper around a weak function reference.""" @@ -391,7 +418,7 @@ def __init__( if args: self._object_repr = f"{self._object_repr}{(*args,)!r}".replace(")", " ...)") - def cb(self, args: tuple[Any, ...] = ()) -> None: + def cb(self, args: tuple[Any, ...] = ()) -> Any: f = self._f() if f is None: raise ReferenceError("weakly-referenced object no longer exists") @@ -408,6 +435,21 @@ def dereference(self) -> Callable | None: return f +class WeakCoroutineFunction(WeakFunction): + def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + f = self._f() + if f is None: + raise ReferenceError("weakly-referenced object no longer exists") + if self._max_args is not None: + args = args[: self._max_args] + coroutine = f(*self._args, *args, **self._kwargs) + + task = asyncio.create_task(coroutine) + _BACKGROUND_TASKS.add(task) + task.add_done_callback(_BACKGROUND_TASKS.discard) + return coroutine + + class WeakMethod(WeakCallback): """Wrapper around a method bound to a weakly-referenced object. @@ -442,7 +484,7 @@ def slot_repr(self) -> str: func_name = getattr(self._func_ref(), "__name__", "") return f"{self._obj_module}.{obj.__class__.__qualname__}.{func_name}" - def cb(self, args: tuple[Any, ...] = ()) -> None: + def cb(self, args: tuple[Any, ...] = ()) -> Any: obj = self._obj_ref() func = self._func_ref() if obj is None or func is None: @@ -463,6 +505,23 @@ def dereference(self) -> MethodType | partial | None: return method +class WeakCoroutineMethod(WeakMethod): + def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + obj = self._obj_ref() + func = self._func_ref() + if obj is None or func is None: + raise ReferenceError("weakly-referenced object no longer exists") + + if self._max_args is not None: + args = args[: self._max_args] + coroutine = func(obj, *self._args, *args, **self._kwargs) + + task = asyncio.create_task(coroutine) + _BACKGROUND_TASKS.add(task) + task.add_done_callback(_BACKGROUND_TASKS.discard) + return coroutine + + class WeakBuiltin(WeakCallback): """Wrapper around a c-based method on a weakly-referenced object. diff --git a/tests/test_weak_callable.py b/tests/test_weak_callable.py index e52e7e85..c24a2751 100644 --- a/tests/test_weak_callable.py +++ b/tests/test_weak_callable.py @@ -1,3 +1,4 @@ +import asyncio import gc import re from functools import partial @@ -15,10 +16,13 @@ "type_", [ "function", + "coroutinefunc", "toolz_function", "weak_func", + "weak_coroutinefunc", "lambda", "method", + "coroutinemethod", "partial_method", "toolz_method", "setattr", @@ -33,7 +37,11 @@ def test_slot_types(type_: str, capsys: Any) -> None: final_mock = Mock() class MyObj: - def method(self, x: int) -> None: + def method(self, x: int) -> int: + mock(x) + return x + + async def coroutine_method(self, x: int) -> int: mock(x) return x @@ -41,12 +49,12 @@ def __setitem__(self, key, value): mock(value) return value - def __setattr__(self, __name: str, __value) -> None: + def __setattr__(self, __name: str, __value: Any) -> Any: if __name == "x": mock(__value) return __value - obj = MyObj() + obj: Any = MyObj() if type_ == "setattr": cb = weak_callback(setattr, obj, "x", finalize=final_mock) @@ -54,16 +62,25 @@ def __setattr__(self, __name: str, __value) -> None: cb = weak_callback(obj.__setitem__, "x", finalize=final_mock) elif type_ in {"function", "weak_func"}: - def obj(x: int) -> None: + def obj(x: int) -> int: mock(x) return x cb = weak_callback(obj, strong_func=(type_ == "function"), finalize=final_mock) + elif type_ in {"coroutinefunc", "weak_coroutinefunc"}: + + async def obj(x: int) -> int: + mock(x) + return x + + cb = weak_callback( + obj, strong_func=(type_ == "coroutinefunc"), finalize=final_mock + ) elif type_ == "toolz_function": toolz = pytest.importorskip("toolz") @toolz.curry - def obj(z: int, x: int) -> None: + def obj(z: int, x: int) -> int: mock(x) return x @@ -72,6 +89,8 @@ def obj(z: int, x: int) -> None: cb = weak_callback(lambda x: mock(x) and x, finalize=final_mock) elif type_ == "method": cb = weak_callback(obj.method, finalize=final_mock) + elif type_ == "coroutinemethod": + cb = weak_callback(obj.coroutine_method, finalize=final_mock) elif type_ == "partial_method": cb = weak_callback(partial(obj.method, 2), max_args=0, finalize=final_mock) elif type_ == "toolz_method": @@ -87,7 +106,15 @@ def obj(z: int, x: int) -> None: assert isinstance(cb, WeakCallback) assert isinstance(cb.slot_repr(), str) - cb.cb((2,)) + + if "coroutine" in type_: + + async def main() -> None: + await cb.cb((2,)) + + asyncio.run(main()) + else: + cb.cb((2,)) assert cb.dereference() is not None if type_ == "print": assert capsys.readouterr().out == "2\n" @@ -95,14 +122,24 @@ def obj(z: int, x: int) -> None: mock.assert_called_once_with(2) mock.reset_mock() - result = cb(2) + + if "coroutine" in type_: + result: Any = None + + async def main() -> None: + nonlocal result + result = await cb(2) + + asyncio.run(main()) + else: + result = cb(2) if type_ not in ("setattr", "mock"): assert result == 2 mock.assert_called_once_with(2) del obj - if type_ not in ("function", "toolz_function", "lambda", "mock"): + if type_ not in ("function", "coroutinefunc", "toolz_function", "lambda", "mock"): final_mock.assert_called_once_with(cb) assert cb.dereference() is None with pytest.raises(ReferenceError): @@ -110,7 +147,14 @@ def obj(z: int, x: int) -> None: with pytest.raises(ReferenceError): cb(2) else: - cb.cb((4,)) + if "coroutine" in type_: + + async def main() -> None: + await cb.cb((4,)) + + asyncio.run(main()) + else: + cb.cb((4,)) mock.assert_called_with(4) From 533b838c124da0e19d93476930b5dcb872cd00a9 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Sun, 16 Feb 2025 22:45:56 +0100 Subject: [PATCH 02/13] Add async support for other event loops --- src/psygnal/__init__.py | 3 ++ src/psygnal/_async.py | 79 +++++++++++++++++++++++++++++++++++ src/psygnal/_weak_callback.py | 43 +++++++------------ 3 files changed, 98 insertions(+), 27 deletions(-) create mode 100644 src/psygnal/_async.py diff --git a/src/psygnal/__init__.py b/src/psygnal/__init__.py index 8ffd9483..68c5a180 100644 --- a/src/psygnal/__init__.py +++ b/src/psygnal/__init__.py @@ -31,8 +31,10 @@ "debounced", "emit_queued", "evented", + "get_async_backend", "get_evented_namespace", "is_evented", + "set_async_backend", "throttled", ] @@ -48,6 +50,7 @@ stacklevel=2, ) +from ._async import get_async_backend, set_async_backend from ._evented_decorator import evented from ._exceptions import EmitLoopError from ._group import EmissionInfo, SignalGroup diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py new file mode 100644 index 00000000..42df67d5 --- /dev/null +++ b/src/psygnal/_async.py @@ -0,0 +1,79 @@ +_async_backend = None + + +def get_async_backend(): + return _async_backend + + +def set_async_backend(backend: str = "asyncio"): + global _async_backend + + if _async_backend is not None: + raise RuntimeError(f"Async backend already set to: {_async_backend._backend}") + + if backend == "asyncio": + + import asyncio + + class AsyncBackend: + def __init__(self): + self._backend = backend + self._queue = asyncio.Queue() + self.__running = False + + @property + def _running(self) -> bool: + return self.__running + + def _put(self, item) -> None: + self._queue.put_nowait(item) + + async def _get(self): + return await self._queue.get() + + async def run(self) -> None: + self.__running = True + while True: + item = await self._get() + cb = item[0] + args = item[1:-1] + kwargs = item[-1] + await cb(*args, **kwargs) + + _async_backend = AsyncBackend() + + elif backend == "anyio": + + import anyio + + class AsyncBackend: + def __init__(self): + self._backend = backend + self._send_stream, self._receive_stream = anyio.create_memory_object_stream(max_buffer_size=float("inf")) + self.__running = False + + @property + def _running(self) -> bool: + return self.__running + + def _put(self, item) -> None: + self._send_stream.send_nowait(item) + + async def _get(self): + return await self._receive_stream.receive() + + async def run(self) -> None: + self.__running = True + async with self._receive_stream: + async for item in self._receive_stream: + cb = item[0] + args = item[1:-1] + kwargs = item[-1] + await cb(*args, **kwargs) + + _async_backend = AsyncBackend() + + else: + raise RuntimeError(f"Async backend not supported: {backend}") + + return _async_backend diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index 480f4192..341ad30c 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import inspect import sys import weakref from functools import partial @@ -8,6 +9,7 @@ from typing import ( TYPE_CHECKING, Any, + Awaitable, Callable, Generic, Literal, @@ -17,11 +19,10 @@ ) from warnings import warn +from ._async import get_async_backend from ._mypyc import mypyc_attr if TYPE_CHECKING: - from collections.abc import Coroutine - import toolz from typing_extensions import TypeAlias, TypeGuard # py310 @@ -31,9 +32,6 @@ _T = TypeVar("_T") _R = TypeVar("_R") # return type of cb -# reference to all background tasks created by Coroutine WeakCallbacks -_BACKGROUND_TASKS: set[asyncio.Task] = set() - def _is_toolz_curry(obj: Any) -> TypeGuard[toolz.curry]: """Return True if obj is a toolz.curry object.""" @@ -130,7 +128,12 @@ def _on_delete(weak_cb): kwargs = cb.keywords cb = cb.func - is_coro = asyncio.iscoroutinefunction(cb) + is_coro = inspect.iscoroutinefunction(cb) + if is_coro: + if get_async_backend() is None: + raise RuntimeError("No async backend set: call `set_async_backend()`") + if not get_async_backend()._running: + raise RuntimeError("Async backend not running (launch `get_async_backend().run()` in a background task)") if isinstance(cb, FunctionType): if strong_func: @@ -386,16 +389,12 @@ def __setstate__(self, state: dict) -> None: class StrongCoroutineFunction(StrongFunction): """Wrapper around a strong coroutine function reference.""" - _f: Callable[..., Coroutine] + _f: Awaitable[Any] - def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + def cb(self, args: tuple[Any, ...] = ()) -> Any: if self._max_args is not None: args = args[: self._max_args] - coroutine = self._f(*self._args, *args, **self._kwargs) - task = asyncio.create_task(coroutine) - _BACKGROUND_TASKS.add(task) - task.add_done_callback(_BACKGROUND_TASKS.discard) - return coroutine + get_async_backend()._put((self._f, *self._args, *args, self._kwargs)) class WeakFunction(WeakCallback): @@ -437,18 +436,13 @@ def dereference(self) -> Callable | None: class WeakCoroutineFunction(WeakFunction): - def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + def cb(self, args: tuple[Any, ...] = ()) -> Any: f = self._f() if f is None: raise ReferenceError("weakly-referenced object no longer exists") if self._max_args is not None: args = args[: self._max_args] - coroutine = f(*self._args, *args, **self._kwargs) - - task = asyncio.create_task(coroutine) - _BACKGROUND_TASKS.add(task) - task.add_done_callback(_BACKGROUND_TASKS.discard) - return coroutine + get_async_backend()._put((f, *self._args, *args, self._kwargs)) class WeakMethod(WeakCallback): @@ -507,7 +501,7 @@ def dereference(self) -> MethodType | partial | None: class WeakCoroutineMethod(WeakMethod): - def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: + def cb(self, args: tuple[Any, ...] = ()) -> Any: obj = self._obj_ref() func = self._func_ref() if obj is None or func is None: @@ -515,12 +509,7 @@ def cb(self, args: tuple[Any, ...] = ()) -> Coroutine: if self._max_args is not None: args = args[: self._max_args] - coroutine = func(obj, *self._args, *args, **self._kwargs) - - task = asyncio.create_task(coroutine) - _BACKGROUND_TASKS.add(task) - task.add_done_callback(_BACKGROUND_TASKS.discard) - return coroutine + get_async_backend()._put((func, obj, *self._args, *args, self._kwargs)) class WeakBuiltin(WeakCallback): From 6958f62b8a0c485f703be08689bce1e28e182150 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 17 Feb 2025 10:00:12 +0100 Subject: [PATCH 03/13] Automatically run async backend for asyncio --- src/psygnal/_async.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 42df67d5..838b01f1 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -20,6 +20,7 @@ def __init__(self): self._backend = backend self._queue = asyncio.Queue() self.__running = False + self._task = asyncio.create_task(self.run()) @property def _running(self) -> bool: @@ -32,6 +33,9 @@ async def _get(self): return await self._queue.get() async def run(self) -> None: + if self.__running: + return + self.__running = True while True: item = await self._get() @@ -63,6 +67,9 @@ async def _get(self): return await self._receive_stream.receive() async def run(self) -> None: + if self.__running: + return + self.__running = True async with self._receive_stream: async for item in self._receive_stream: From 2d40a6cf3ab3b146f21d1b220de0442900ac0217 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Mon, 17 Feb 2025 10:29:35 +0100 Subject: [PATCH 04/13] Add trio backend --- src/psygnal/_async.py | 88 +++++++++++++++++++++++++++++++------------ 1 file changed, 63 insertions(+), 25 deletions(-) diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 838b01f1..00db979f 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -1,3 +1,7 @@ +from math import inf +from abc import ABC, abstractmethod + + _async_backend = None @@ -15,17 +19,12 @@ def set_async_backend(backend: str = "asyncio"): import asyncio - class AsyncBackend: + class AsyncBackend(_AsyncBackend): def __init__(self): - self._backend = backend + super().__init__(backend) self._queue = asyncio.Queue() - self.__running = False self._task = asyncio.create_task(self.run()) - @property - def _running(self) -> bool: - return self.__running - def _put(self, item) -> None: self._queue.put_nowait(item) @@ -39,26 +38,16 @@ async def run(self) -> None: self.__running = True while True: item = await self._get() - cb = item[0] - args = item[1:-1] - kwargs = item[-1] - await cb(*args, **kwargs) - - _async_backend = AsyncBackend() + await self.call_back(item) elif backend == "anyio": import anyio - class AsyncBackend: + class AsyncBackend(_AsyncBackend): def __init__(self): - self._backend = backend - self._send_stream, self._receive_stream = anyio.create_memory_object_stream(max_buffer_size=float("inf")) - self.__running = False - - @property - def _running(self) -> bool: - return self.__running + super().__init__(backend) + self._send_stream, self._receive_stream = anyio.create_memory_object_stream(max_buffer_size=inf) def _put(self, item) -> None: self._send_stream.send_nowait(item) @@ -73,14 +62,63 @@ async def run(self) -> None: self.__running = True async with self._receive_stream: async for item in self._receive_stream: - cb = item[0] - args = item[1:-1] - kwargs = item[-1] - await cb(*args, **kwargs) + await self.call_back(item) + + elif backend == "trio": + + import trio + + class AsyncBackend(_AsyncBackend): + def __init__(self): + super().__init__(backend) + self._send_channel, self._receive_channel = trio.open_memory_channel(max_buffer_size=inf) + + def _put(self, item) -> None: + self._send_channel.send_nowait(item) + + async def _get(self): + return await self._receive_channel.receive() + + async def run(self) -> None: + if self.__running: + return + + self.__running = True + async for item in self._receive_channel: + await self.call_back(item) _async_backend = AsyncBackend() else: raise RuntimeError(f"Async backend not supported: {backend}") + _async_backend = AsyncBackend() return _async_backend + + +class _AsyncBackend(ABC): + def __init__(self, backend: str): + self._backend = backend + self.__running = False + + @property + def _running(self) -> bool: + return self.__running + + @abstractmethod + def _put(self, item) -> None: + ... + + @abstractmethod + async def _get(self): + ... + + @abstractmethod + async def run(self) -> None: + ... + + async def call_back(self, item) -> None: + cb = item[0] + args = item[1:-1] + kwargs = item[-1] + await cb(*args, **kwargs) From 5d870c81987ed03560579015453758b0e94891ec Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 17 Feb 2025 13:25:23 +0000 Subject: [PATCH 05/13] style(pre-commit.ci): auto fixes [...] --- src/psygnal/_async.py | 23 ++++++++++------------- src/psygnal/_weak_callback.py | 8 +++++--- 2 files changed, 15 insertions(+), 16 deletions(-) diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 00db979f..4a45c63b 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -1,6 +1,5 @@ -from math import inf from abc import ABC, abstractmethod - +from math import inf _async_backend = None @@ -16,7 +15,6 @@ def set_async_backend(backend: str = "asyncio"): raise RuntimeError(f"Async backend already set to: {_async_backend._backend}") if backend == "asyncio": - import asyncio class AsyncBackend(_AsyncBackend): @@ -41,13 +39,14 @@ async def run(self) -> None: await self.call_back(item) elif backend == "anyio": - import anyio class AsyncBackend(_AsyncBackend): def __init__(self): super().__init__(backend) - self._send_stream, self._receive_stream = anyio.create_memory_object_stream(max_buffer_size=inf) + self._send_stream, self._receive_stream = ( + anyio.create_memory_object_stream(max_buffer_size=inf) + ) def _put(self, item) -> None: self._send_stream.send_nowait(item) @@ -65,13 +64,14 @@ async def run(self) -> None: await self.call_back(item) elif backend == "trio": - import trio class AsyncBackend(_AsyncBackend): def __init__(self): super().__init__(backend) - self._send_channel, self._receive_channel = trio.open_memory_channel(max_buffer_size=inf) + self._send_channel, self._receive_channel = trio.open_memory_channel( + max_buffer_size=inf + ) def _put(self, item) -> None: self._send_channel.send_nowait(item) @@ -106,16 +106,13 @@ def _running(self) -> bool: return self.__running @abstractmethod - def _put(self, item) -> None: - ... + def _put(self, item) -> None: ... @abstractmethod - async def _get(self): - ... + async def _get(self): ... @abstractmethod - async def run(self) -> None: - ... + async def run(self) -> None: ... async def call_back(self, item) -> None: cb = item[0] diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index 341ad30c..b3fbfec6 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -1,6 +1,5 @@ from __future__ import annotations -import asyncio import inspect import sys import weakref @@ -9,7 +8,6 @@ from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, Generic, Literal, @@ -23,6 +21,8 @@ from ._mypyc import mypyc_attr if TYPE_CHECKING: + from collections.abc import Awaitable + import toolz from typing_extensions import TypeAlias, TypeGuard # py310 @@ -133,7 +133,9 @@ def _on_delete(weak_cb): if get_async_backend() is None: raise RuntimeError("No async backend set: call `set_async_backend()`") if not get_async_backend()._running: - raise RuntimeError("Async backend not running (launch `get_async_backend().run()` in a background task)") + raise RuntimeError( + "Async backend not running (launch `get_async_backend().run()` in a background task)" + ) if isinstance(cb, FunctionType): if strong_func: From 0426bc5ea3aad2d4c0ee31ae049426492874e2e2 Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:06:54 -0500 Subject: [PATCH 06/13] tests working, but ugly --- pyproject.toml | 1 + src/psygnal/_async.py | 192 ++++++++++++++++++++-------------- src/psygnal/_weak_callback.py | 20 ++-- tests/test_coroutines.py | 111 ++++++++++++++++++++ tests/test_weak_callable.py | 62 ++--------- 5 files changed, 241 insertions(+), 145 deletions(-) create mode 100644 tests/test_coroutines.py diff --git a/pyproject.toml b/pyproject.toml index 4111073e..61578b4b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ test = [ "pyinstaller>=4.0", "pytest>=6.0", "pytest-cov", + "pytest-asyncio", "wrapt", "msgspec", "toolz", diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 4a45c63b..c4571678 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -1,121 +1,151 @@ +from __future__ import annotations + from abc import ABC, abstractmethod from math import inf +from typing import TYPE_CHECKING, Any, Literal, TypeAlias, overload + +if TYPE_CHECKING: + from psygnal._weak_callback import WeakCallback -_async_backend = None +SupportedBackend = Literal["asyncio", "anyio", "trio"] +_async_backend: _AsyncBackend | None = None -def get_async_backend(): +def get_async_backend() -> _AsyncBackend | None: return _async_backend -def set_async_backend(backend: str = "asyncio"): +@overload +def set_async_backend(backend: Literal["asyncio"]) -> AsyncioBackend: ... +@overload +def set_async_backend(backend: Literal["anyio"]) -> AnyioBackend: ... +@overload +def set_async_backend(backend: Literal["trio"]) -> TrioBackend: ... +def set_async_backend(backend: SupportedBackend = "asyncio") -> _AsyncBackend: global _async_backend - if _async_backend is not None: + if _async_backend is not None and _async_backend._backend != backend: raise RuntimeError(f"Async backend already set to: {_async_backend._backend}") if backend == "asyncio": - import asyncio + _async_backend = AsyncioBackend() + elif backend == "anyio": + _async_backend = AnyioBackend() + elif backend == "trio": + _async_backend = TrioBackend() + else: + raise RuntimeError( + f"Async backend not supported: {backend}. " + "Must be one of: 'asyncio', 'anyio', 'trio'" + ) - class AsyncBackend(_AsyncBackend): - def __init__(self): - super().__init__(backend) - self._queue = asyncio.Queue() - self._task = asyncio.create_task(self.run()) + return _async_backend - def _put(self, item) -> None: - self._queue.put_nowait(item) - async def _get(self): - return await self._queue.get() +QueueItem: TypeAlias = tuple["WeakCallback", tuple[Any, ...]] - async def run(self) -> None: - if self.__running: - return - self.__running = True - while True: - item = await self._get() - await self.call_back(item) +class _AsyncBackend(ABC): + def __init__(self, backend: str): + self._backend = backend + self._running = False - elif backend == "anyio": - import anyio + @property + def running(self) -> bool: + return self._running - class AsyncBackend(_AsyncBackend): - def __init__(self): - super().__init__(backend) - self._send_stream, self._receive_stream = ( - anyio.create_memory_object_stream(max_buffer_size=inf) - ) + @abstractmethod + def _put(self, item: QueueItem) -> None: ... - def _put(self, item) -> None: - self._send_stream.send_nowait(item) + @abstractmethod + async def _get(self) -> QueueItem: ... - async def _get(self): - return await self._receive_stream.receive() + @abstractmethod + async def run(self) -> None: ... - async def run(self) -> None: - if self.__running: - return + async def call_back(self, item: QueueItem) -> None: + cb, args = item + if func := cb.dereference(): + await func(*args) - self.__running = True - async with self._receive_stream: - async for item in self._receive_stream: - await self.call_back(item) - elif backend == "trio": - import trio +class AsyncioBackend(_AsyncBackend): + def __init__(self) -> None: + super().__init__("asyncio") + import asyncio - class AsyncBackend(_AsyncBackend): - def __init__(self): - super().__init__(backend) - self._send_channel, self._receive_channel = trio.open_memory_channel( - max_buffer_size=inf - ) + self._queue: asyncio.Queue[tuple] = asyncio.Queue() + self._task = asyncio.create_task(self.run()) + self._loop = asyncio.get_running_loop() - def _put(self, item) -> None: - self._send_channel.send_nowait(item) + def _put(self, item: QueueItem) -> None: + self._queue.put_nowait(item) - async def _get(self): - return await self._receive_channel.receive() + async def _get(self) -> QueueItem: + return await self._queue.get() - async def run(self) -> None: - if self.__running: - return + async def run(self) -> None: + import asyncio - self.__running = True - async for item in self._receive_channel: - await self.call_back(item) + if self.running: + return + + self._running = True + try: + while True: + item = await self._get() + await self.call_back(item) + except asyncio.CancelledError: + pass + except RuntimeError as e: + if self._loop.is_closed(): + pass + else: + raise e + + +class AnyioBackend(_AsyncBackend): + def __init__(self) -> None: + super().__init__("anyio") + import anyio - _async_backend = AsyncBackend() + streams = anyio.create_memory_object_stream(max_buffer_size=inf) + self._send_stream, self._receive_stream = streams - else: - raise RuntimeError(f"Async backend not supported: {backend}") + def _put(self, item: QueueItem) -> None: + self._send_stream.send_nowait(item) - _async_backend = AsyncBackend() - return _async_backend + async def _get(self) -> QueueItem: + return await self._receive_stream.receive() + async def run(self) -> None: + if self.running: + return -class _AsyncBackend(ABC): - def __init__(self, backend: str): - self._backend = backend - self.__running = False + self._running = True + async with self._receive_stream: + async for item in self._receive_stream: + await self.call_back(item) - @property - def _running(self) -> bool: - return self.__running - @abstractmethod - def _put(self, item) -> None: ... +class TrioBackend(_AsyncBackend): + def __init__(self) -> None: + super().__init__("trio") + import trio - @abstractmethod - async def _get(self): ... + streams = trio.open_memory_channel(max_buffer_size=inf) + self._send_channel, self._receive_channel = streams - @abstractmethod - async def run(self) -> None: ... + def _put(self, item: tuple) -> None: + self._send_channel.send_nowait(item) + + async def _get(self) -> tuple: + return await self._receive_channel.receive() + + async def run(self) -> None: + if self.running: + return - async def call_back(self, item) -> None: - cb = item[0] - args = item[1:-1] - kwargs = item[-1] - await cb(*args, **kwargs) + self._running = True + async for item in self._receive_channel: + await self.call_back(item) diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index b3fbfec6..301f4a17 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -130,11 +130,12 @@ def _on_delete(weak_cb): is_coro = inspect.iscoroutinefunction(cb) if is_coro: - if get_async_backend() is None: + if (backend := get_async_backend()) is None: raise RuntimeError("No async backend set: call `set_async_backend()`") - if not get_async_backend()._running: + if not backend.running: raise RuntimeError( - "Async backend not running (launch `get_async_backend().run()` in a background task)" + "Async backend not running (launch `get_async_backend().run()` " + "in a background task)" ) if isinstance(cb, FunctionType): @@ -396,7 +397,7 @@ class StrongCoroutineFunction(StrongFunction): def cb(self, args: tuple[Any, ...] = ()) -> Any: if self._max_args is not None: args = args[: self._max_args] - get_async_backend()._put((self._f, *self._args, *args, self._kwargs)) + get_async_backend()._put((self, args)) class WeakFunction(WeakCallback): @@ -439,12 +440,11 @@ def dereference(self) -> Callable | None: class WeakCoroutineFunction(WeakFunction): def cb(self, args: tuple[Any, ...] = ()) -> Any: - f = self._f() - if f is None: + if self._f() is None: raise ReferenceError("weakly-referenced object no longer exists") if self._max_args is not None: args = args[: self._max_args] - get_async_backend()._put((f, *self._args, *args, self._kwargs)) + get_async_backend()._put((self, args)) class WeakMethod(WeakCallback): @@ -504,14 +504,12 @@ def dereference(self) -> MethodType | partial | None: class WeakCoroutineMethod(WeakMethod): def cb(self, args: tuple[Any, ...] = ()) -> Any: - obj = self._obj_ref() - func = self._func_ref() - if obj is None or func is None: + if self._obj_ref() is None or self._func_ref() is None: raise ReferenceError("weakly-referenced object no longer exists") if self._max_args is not None: args = args[: self._max_args] - get_async_backend()._put((func, obj, *self._args, *args, self._kwargs)) + get_async_backend()._put((self, args)) class WeakBuiltin(WeakCallback): diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py new file mode 100644 index 00000000..d88e4a2a --- /dev/null +++ b/tests/test_coroutines.py @@ -0,0 +1,111 @@ +import asyncio +import gc +from typing import Any +from unittest.mock import Mock + +import pytest + +from psygnal import _async +from psygnal._weak_callback import WeakCallback, weak_callback + + +@pytest.mark.parametrize( + "type_", + [ + "coroutinefunc", + "weak_coroutinefunc", + "coroutinemethod", + ], +) +@pytest.mark.asyncio +async def test_slot_types(type_: str, capsys: Any) -> None: + backend = _async.set_async_backend("asyncio") + assert backend is _async.get_async_backend() is not None + + await asyncio.sleep(0) + + mock = Mock() + final_mock = Mock() + + if type_ in {"coroutinefunc", "weak_coroutinefunc"}: + + async def obj(x: int) -> int: + mock(x) + return x + + cb = weak_callback( + obj, strong_func=(type_ == "coroutinefunc"), finalize=final_mock + ) + elif type_ == "coroutinemethod": + + class MyObj: + async def coroutine_method(self, x: int) -> int: + mock(x) + return x + + obj = MyObj() + cb = weak_callback(obj.coroutine_method, finalize=final_mock) + + assert isinstance(cb, WeakCallback) + assert isinstance(cb.slot_repr(), str) + assert cb.dereference() is not None + + cb.cb((2,)) + await asyncio.sleep(0.01) + mock.assert_called_once_with(2) + + mock.reset_mock() + assert await cb(4) == 4 + mock.assert_called_once_with(4) + + del obj + gc.collect() + await asyncio.sleep(0.01) + + if type_ == "coroutinefunc": # strong_func + cb.cb((4,)) + await asyncio.sleep(0.01) + mock.assert_called_with(4) + + else: + final_mock.assert_called_once_with(cb) + assert cb.dereference() is None + with pytest.raises(ReferenceError): + cb.cb((2,)) + await asyncio.sleep(0.01) + with pytest.raises(ReferenceError): + cb(2) + await asyncio.sleep(0.01) + + backend._task.cancel() + + +@pytest.mark.asyncio +async def testsimple() -> None: + backend = _async.set_async_backend("asyncio") + assert backend is _async.get_async_backend() is not None + await asyncio.sleep(0) + + final_mock = Mock() + mock = Mock() + + class MyObj: + async def coroutine_method(self, x: int) -> int: + mock(x) + return x + + obj = MyObj() + cb = weak_callback(obj.coroutine_method, finalize=final_mock) + + assert isinstance(cb, WeakCallback) + assert isinstance(cb.slot_repr(), str) + assert cb.dereference() is not None + + cb.cb((2,)) + await asyncio.sleep(0.01) + mock.assert_called_once_with(2) + + del obj + gc.collect() + await asyncio.sleep(0.01) + final_mock.assert_called_once() diff --git a/tests/test_weak_callable.py b/tests/test_weak_callable.py index c24a2751..e52e7e85 100644 --- a/tests/test_weak_callable.py +++ b/tests/test_weak_callable.py @@ -1,4 +1,3 @@ -import asyncio import gc import re from functools import partial @@ -16,13 +15,10 @@ "type_", [ "function", - "coroutinefunc", "toolz_function", "weak_func", - "weak_coroutinefunc", "lambda", "method", - "coroutinemethod", "partial_method", "toolz_method", "setattr", @@ -37,11 +33,7 @@ def test_slot_types(type_: str, capsys: Any) -> None: final_mock = Mock() class MyObj: - def method(self, x: int) -> int: - mock(x) - return x - - async def coroutine_method(self, x: int) -> int: + def method(self, x: int) -> None: mock(x) return x @@ -49,12 +41,12 @@ def __setitem__(self, key, value): mock(value) return value - def __setattr__(self, __name: str, __value: Any) -> Any: + def __setattr__(self, __name: str, __value) -> None: if __name == "x": mock(__value) return __value - obj: Any = MyObj() + obj = MyObj() if type_ == "setattr": cb = weak_callback(setattr, obj, "x", finalize=final_mock) @@ -62,25 +54,16 @@ def __setattr__(self, __name: str, __value: Any) -> Any: cb = weak_callback(obj.__setitem__, "x", finalize=final_mock) elif type_ in {"function", "weak_func"}: - def obj(x: int) -> int: + def obj(x: int) -> None: mock(x) return x cb = weak_callback(obj, strong_func=(type_ == "function"), finalize=final_mock) - elif type_ in {"coroutinefunc", "weak_coroutinefunc"}: - - async def obj(x: int) -> int: - mock(x) - return x - - cb = weak_callback( - obj, strong_func=(type_ == "coroutinefunc"), finalize=final_mock - ) elif type_ == "toolz_function": toolz = pytest.importorskip("toolz") @toolz.curry - def obj(z: int, x: int) -> int: + def obj(z: int, x: int) -> None: mock(x) return x @@ -89,8 +72,6 @@ def obj(z: int, x: int) -> int: cb = weak_callback(lambda x: mock(x) and x, finalize=final_mock) elif type_ == "method": cb = weak_callback(obj.method, finalize=final_mock) - elif type_ == "coroutinemethod": - cb = weak_callback(obj.coroutine_method, finalize=final_mock) elif type_ == "partial_method": cb = weak_callback(partial(obj.method, 2), max_args=0, finalize=final_mock) elif type_ == "toolz_method": @@ -106,15 +87,7 @@ def obj(z: int, x: int) -> int: assert isinstance(cb, WeakCallback) assert isinstance(cb.slot_repr(), str) - - if "coroutine" in type_: - - async def main() -> None: - await cb.cb((2,)) - - asyncio.run(main()) - else: - cb.cb((2,)) + cb.cb((2,)) assert cb.dereference() is not None if type_ == "print": assert capsys.readouterr().out == "2\n" @@ -122,24 +95,14 @@ async def main() -> None: mock.assert_called_once_with(2) mock.reset_mock() - - if "coroutine" in type_: - result: Any = None - - async def main() -> None: - nonlocal result - result = await cb(2) - - asyncio.run(main()) - else: - result = cb(2) + result = cb(2) if type_ not in ("setattr", "mock"): assert result == 2 mock.assert_called_once_with(2) del obj - if type_ not in ("function", "coroutinefunc", "toolz_function", "lambda", "mock"): + if type_ not in ("function", "toolz_function", "lambda", "mock"): final_mock.assert_called_once_with(cb) assert cb.dereference() is None with pytest.raises(ReferenceError): @@ -147,14 +110,7 @@ async def main() -> None: with pytest.raises(ReferenceError): cb(2) else: - if "coroutine" in type_: - - async def main() -> None: - await cb.cb((4,)) - - asyncio.run(main()) - else: - cb.cb((4,)) + cb.cb((4,)) mock.assert_called_with(4) From d4053470b10b0e7945691d419dc2bf4fbf3836dd Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:17:37 -0500 Subject: [PATCH 07/13] fix typing --- .pre-commit-config.yaml | 2 ++ src/psygnal/_async.py | 26 ++++++++++---- src/psygnal/_weak_callback.py | 68 ++++++++++++++++++----------------- tests/test_coroutines.py | 35 ++---------------- 4 files changed, 58 insertions(+), 73 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 86f323c8..071e84f8 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,3 +32,5 @@ repos: - types-attrs - pydantic - typing_extensions + - anyio + - trio diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index c4571678..668965d4 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -98,19 +98,24 @@ async def run(self) -> None: except asyncio.CancelledError: pass except RuntimeError as e: - if self._loop.is_closed(): - pass - else: + if not self._loop.is_closed(): raise e class AnyioBackend(_AsyncBackend): + if TYPE_CHECKING: + import anyio.streams.memory + + _send_stream: anyio.streams.memory.MemoryObjectSendStream[QueueItem] + _receive_stream: anyio.streams.memory.MemoryObjectReceiveStream[QueueItem] + def __init__(self) -> None: super().__init__("anyio") import anyio - streams = anyio.create_memory_object_stream(max_buffer_size=inf) - self._send_stream, self._receive_stream = streams + self._send_stream, self._receive_stream = anyio.create_memory_object_stream( + max_buffer_size=inf + ) def _put(self, item: QueueItem) -> None: self._send_stream.send_nowait(item) @@ -129,12 +134,19 @@ async def run(self) -> None: class TrioBackend(_AsyncBackend): + if TYPE_CHECKING: + import trio + + _send_channel: trio._channel.MemorySendChannel[QueueItem] + _receive_channel: trio.abc.ReceiveChannel[QueueItem] + def __init__(self) -> None: super().__init__("trio") import trio - streams = trio.open_memory_channel(max_buffer_size=inf) - self._send_channel, self._receive_channel = streams + self._send_channel, self._receive_channel = trio.open_memory_channel( + max_buffer_size=inf + ) def _put(self, item: tuple) -> None: self._send_channel.send_nowait(item) diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index 301f4a17..005363a1 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -17,12 +17,10 @@ ) from warnings import warn -from ._async import get_async_backend +from ._async import _AsyncBackend, get_async_backend from ._mypyc import mypyc_attr if TYPE_CHECKING: - from collections.abc import Awaitable - import toolz from typing_extensions import TypeAlias, TypeGuard # py310 @@ -389,17 +387,6 @@ def __setstate__(self, state: dict) -> None: setattr(self, k, v) -class StrongCoroutineFunction(StrongFunction): - """Wrapper around a strong coroutine function reference.""" - - _f: Awaitable[Any] - - def cb(self, args: tuple[Any, ...] = ()) -> Any: - if self._max_args is not None: - args = args[: self._max_args] - get_async_backend()._put((self, args)) - - class WeakFunction(WeakCallback): """Wrapper around a weak function reference.""" @@ -438,15 +425,6 @@ def dereference(self) -> Callable | None: return f -class WeakCoroutineFunction(WeakFunction): - def cb(self, args: tuple[Any, ...] = ()) -> Any: - if self._f() is None: - raise ReferenceError("weakly-referenced object no longer exists") - if self._max_args is not None: - args = args[: self._max_args] - get_async_backend()._put((self, args)) - - class WeakMethod(WeakCallback): """Wrapper around a method bound to a weakly-referenced object. @@ -502,16 +480,6 @@ def dereference(self) -> MethodType | partial | None: return method -class WeakCoroutineMethod(WeakMethod): - def cb(self, args: tuple[Any, ...] = ()) -> Any: - if self._obj_ref() is None or self._func_ref() is None: - raise ReferenceError("weakly-referenced object no longer exists") - - if self._max_args is not None: - args = args[: self._max_args] - get_async_backend()._put((self, args)) - - class WeakBuiltin(WeakCallback): """Wrapper around a c-based method on a weakly-referenced object. @@ -628,3 +596,37 @@ def cb(self, args: tuple[Any, ...] = ()) -> None: def dereference(self) -> partial | None: obj = self._obj_ref() return None if obj is None else partial(obj.__setitem__, self._itemkey) + + +# --------------------------- Coroutines -------------------- + + +class WeakCoroutineFunction(WeakFunction): + def cb(self, args: tuple[Any, ...] = ()) -> Any: + if self._f() is None: + raise ReferenceError("weakly-referenced object no longer exists") + if self._max_args is not None: + args = args[: self._max_args] + + cast("_AsyncBackend", get_async_backend())._put((self, args)) + + +class StrongCoroutineFunction(StrongFunction): + """Wrapper around a strong coroutine function reference.""" + + def cb(self, args: tuple[Any, ...] = ()) -> Any: + if self._max_args is not None: + args = args[: self._max_args] + + cast("_AsyncBackend", get_async_backend())._put((self, args)) + + +class WeakCoroutineMethod(WeakMethod): + def cb(self, args: tuple[Any, ...] = ()) -> Any: + if self._obj_ref() is None or self._func_ref() is None: + raise ReferenceError("weakly-referenced object no longer exists") + + if self._max_args is not None: + args = args[: self._max_args] + + cast("_AsyncBackend", get_async_backend())._put((self, args)) diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py index d88e4a2a..46b640e6 100644 --- a/tests/test_coroutines.py +++ b/tests/test_coroutines.py @@ -27,6 +27,7 @@ async def test_slot_types(type_: str, capsys: Any) -> None: mock = Mock() final_mock = Mock() + obj: Any if type_ in {"coroutinefunc", "weak_coroutinefunc"}: async def obj(x: int) -> int: @@ -74,38 +75,6 @@ async def coroutine_method(self, x: int) -> int: cb.cb((2,)) await asyncio.sleep(0.01) with pytest.raises(ReferenceError): - cb(2) - await asyncio.sleep(0.01) + await cb(2) backend._task.cancel() - - -@pytest.mark.asyncio -async def testsimple() -> None: - backend = _async.set_async_backend("asyncio") - assert backend is _async.get_async_backend() is not None - await asyncio.sleep(0) - - final_mock = Mock() - mock = Mock() - - class MyObj: - async def coroutine_method(self, x: int) -> int: - mock(x) - return x - - obj = MyObj() - cb = weak_callback(obj.coroutine_method, finalize=final_mock) - - assert isinstance(cb, WeakCallback) - assert isinstance(cb.slot_repr(), str) - assert cb.dereference() is not None - - cb.cb((2,)) - await asyncio.sleep(0.01) - mock.assert_called_once_with(2) - - del obj - gc.collect() - await asyncio.sleep(0.01) - final_mock.assert_called_once() From 19337c821f1256585a8ea8756e15ce5043b35eca Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:32:48 -0500 Subject: [PATCH 08/13] docs --- src/psygnal/_async.py | 42 +++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 668965d4..16ded2c4 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -2,17 +2,23 @@ from abc import ABC, abstractmethod from math import inf -from typing import TYPE_CHECKING, Any, Literal, TypeAlias, overload +from typing import TYPE_CHECKING, Any, overload if TYPE_CHECKING: + from typing import Literal, TypeAlias + from psygnal._weak_callback import WeakCallback -SupportedBackend = Literal["asyncio", "anyio", "trio"] -_async_backend: _AsyncBackend | None = None + SupportedBackend: TypeAlias = Literal["asyncio", "anyio", "trio"] + QueueItem: TypeAlias = tuple["WeakCallback", tuple[Any, ...]] + + +_ASYNC_BACKEND: _AsyncBackend | None = None def get_async_backend() -> _AsyncBackend | None: - return _async_backend + """Get the current async backend. Returns None if no backend is set.""" + return _ASYNC_BACKEND @overload @@ -22,27 +28,30 @@ def set_async_backend(backend: Literal["anyio"]) -> AnyioBackend: ... @overload def set_async_backend(backend: Literal["trio"]) -> TrioBackend: ... def set_async_backend(backend: SupportedBackend = "asyncio") -> _AsyncBackend: - global _async_backend + """Set the async backend to use. Must be one of: 'asyncio', 'anyio', 'trio'. + + This should be done as early as possible, and *must* be called before calling + `SignalInstance.connect` with a coroutine function. + """ + global _ASYNC_BACKEND - if _async_backend is not None and _async_backend._backend != backend: - raise RuntimeError(f"Async backend already set to: {_async_backend._backend}") + if _ASYNC_BACKEND is not None and _ASYNC_BACKEND._backend != backend: + # allow setting the same backend multiple times, for tests + raise RuntimeError(f"Async backend already set to: {_ASYNC_BACKEND._backend}") if backend == "asyncio": - _async_backend = AsyncioBackend() + _ASYNC_BACKEND = AsyncioBackend() elif backend == "anyio": - _async_backend = AnyioBackend() + _ASYNC_BACKEND = AnyioBackend() elif backend == "trio": - _async_backend = TrioBackend() + _ASYNC_BACKEND = TrioBackend() else: raise RuntimeError( f"Async backend not supported: {backend}. " "Must be one of: 'asyncio', 'anyio', 'trio'" ) - return _async_backend - - -QueueItem: TypeAlias = tuple["WeakCallback", tuple[Any, ...]] + return _ASYNC_BACKEND class _AsyncBackend(ABC): @@ -74,6 +83,7 @@ def __init__(self) -> None: super().__init__("asyncio") import asyncio + self._asyncio = asyncio self._queue: asyncio.Queue[tuple] = asyncio.Queue() self._task = asyncio.create_task(self.run()) self._loop = asyncio.get_running_loop() @@ -85,8 +95,6 @@ async def _get(self) -> QueueItem: return await self._queue.get() async def run(self) -> None: - import asyncio - if self.running: return @@ -95,7 +103,7 @@ async def run(self) -> None: while True: item = await self._get() await self.call_back(item) - except asyncio.CancelledError: + except self._asyncio.CancelledError: pass except RuntimeError as e: if not self._loop.is_closed(): From fa18cdd817dc27829656c9ab503f5c73c3a3cd53 Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:33:35 -0500 Subject: [PATCH 09/13] fix typing --- src/psygnal/_async.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 16ded2c4..29f6c1cc 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Any, overload if TYPE_CHECKING: - from typing import Literal, TypeAlias + from typing_extensions import Literal, TypeAlias from psygnal._weak_callback import WeakCallback From ab105cc571f56161cecab35e533b5a3b2fa214ee Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:35:33 -0500 Subject: [PATCH 10/13] while not running --- tests/test_coroutines.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py index 46b640e6..1d639a6b 100644 --- a/tests/test_coroutines.py +++ b/tests/test_coroutines.py @@ -21,8 +21,8 @@ async def test_slot_types(type_: str, capsys: Any) -> None: backend = _async.set_async_backend("asyncio") assert backend is _async.get_async_backend() is not None - - await asyncio.sleep(0) + while not backend.running: + await asyncio.sleep(0) mock = Mock() final_mock = Mock() From b8af6b673b1656f0b7dc24f00ddebd231217fa88 Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 10:49:54 -0500 Subject: [PATCH 11/13] don't compile async --- pyproject.toml | 1 + src/psygnal/_async.py | 16 ++++++---------- tests/test_coroutines.py | 2 -- 3 files changed, 7 insertions(+), 12 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 61578b4b..ee4228fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,6 +111,7 @@ exclude = [ "src/psygnal/qt.py", "src/psygnal/_pyinstaller_util", "src/psygnal/_throttler.py", + "src/psygnal/_async.py", ] [tool.cibuildwheel] diff --git a/src/psygnal/_async.py b/src/psygnal/_async.py index 29f6c1cc..42c7c53e 100644 --- a/src/psygnal/_async.py +++ b/src/psygnal/_async.py @@ -5,6 +5,8 @@ from typing import TYPE_CHECKING, Any, overload if TYPE_CHECKING: + import anyio.streams.memory + import trio from typing_extensions import Literal, TypeAlias from psygnal._weak_callback import WeakCallback @@ -111,11 +113,8 @@ async def run(self) -> None: class AnyioBackend(_AsyncBackend): - if TYPE_CHECKING: - import anyio.streams.memory - - _send_stream: anyio.streams.memory.MemoryObjectSendStream[QueueItem] - _receive_stream: anyio.streams.memory.MemoryObjectReceiveStream[QueueItem] + _send_stream: anyio.streams.memory.MemoryObjectSendStream[QueueItem] + _receive_stream: anyio.streams.memory.MemoryObjectReceiveStream[QueueItem] def __init__(self) -> None: super().__init__("anyio") @@ -142,11 +141,8 @@ async def run(self) -> None: class TrioBackend(_AsyncBackend): - if TYPE_CHECKING: - import trio - - _send_channel: trio._channel.MemorySendChannel[QueueItem] - _receive_channel: trio.abc.ReceiveChannel[QueueItem] + _send_channel: trio._channel.MemorySendChannel[QueueItem] + _receive_channel: trio.abc.ReceiveChannel[QueueItem] def __init__(self) -> None: super().__init__("trio") diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py index 1d639a6b..506cff1e 100644 --- a/tests/test_coroutines.py +++ b/tests/test_coroutines.py @@ -76,5 +76,3 @@ async def coroutine_method(self, x: int) -> int: await asyncio.sleep(0.01) with pytest.raises(ReferenceError): await cb(2) - - backend._task.cancel() From 4f02ab70699a12442c73413c0c03855198c8be9a Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 12:26:38 -0500 Subject: [PATCH 12/13] working --- pyproject.toml | 1 + src/psygnal/_weak_callback.py | 51 ++++++++++++++++++++++++----------- tests/test_coroutines.py | 4 +-- 3 files changed, 39 insertions(+), 17 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index ee4228fd..def54ead 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -173,6 +173,7 @@ docstring-code-format = true [tool.pytest.ini_options] minversion = "6.0" testpaths = ["tests"] +asyncio_default_fixture_loop_scope = "function" filterwarnings = [ "error", "ignore:The distutils package is deprecated:DeprecationWarning:", diff --git a/src/psygnal/_weak_callback.py b/src/psygnal/_weak_callback.py index 005363a1..f5a65e53 100644 --- a/src/psygnal/_weak_callback.py +++ b/src/psygnal/_weak_callback.py @@ -17,13 +17,15 @@ ) from warnings import warn -from ._async import _AsyncBackend, get_async_backend +from ._async import get_async_backend from ._mypyc import mypyc_attr if TYPE_CHECKING: import toolz from typing_extensions import TypeAlias, TypeGuard # py310 + from ._async import _AsyncBackend + RefErrorChoice: TypeAlias = Literal["raise", "warn", "ignore"] __all__ = ["WeakCallback", "weak_callback"] @@ -137,12 +139,27 @@ def _on_delete(weak_cb): ) if isinstance(cb, FunctionType): + # NOTE: I know it looks like this should be easy to express in much shorter + # syntax ... but mypyc will likely fail at runtime. + # Make sure to test compiled version if you change this. if strong_func: - cls = StrongCoroutineFunction if is_coro else StrongFunction - return cls(cb, max_args, args, kwargs, priority=priority) + if is_coro: + return StrongCoroutineFunction( + cb, max_args, args, kwargs, priority=priority + ) + return StrongFunction(cb, max_args, args, kwargs, priority=priority) else: - wcls = WeakCoroutineFunction if is_coro else WeakFunction - return wcls( + if is_coro: + return WeakCoroutineFunction( + cb, + max_args, + args, + kwargs, + finalize, + on_ref_error=on_ref_error, + priority=priority, + ) + return WeakFunction( cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority ) @@ -158,8 +175,12 @@ def _on_delete(weak_cb): return WeakSetitem( obj, key, max_args, finalize, on_ref_error, priority=priority ) - mcls = WeakCoroutineMethod if is_coro else WeakMethod - return mcls( + + if is_coro: + return WeakCoroutineMethod( + cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority + ) + return WeakMethod( cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority ) @@ -239,7 +260,7 @@ def __init__( self.priority: int = priority - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: """Call the callback with `args`. Args will be spread when calling the func.""" raise NotImplementedError() @@ -368,7 +389,7 @@ def __init__( if args: self._object_repr = f"{self._object_repr}{(*args,)!r}".replace(")", " ...)") - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: if self._max_args is not None: args = args[: self._max_args] self._f(*self._args, *args, **self._kwargs) @@ -408,7 +429,7 @@ def __init__( if args: self._object_repr = f"{self._object_repr}{(*args,)!r}".replace(")", " ...)") - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: f = self._f() if f is None: raise ReferenceError("weakly-referenced object no longer exists") @@ -459,7 +480,7 @@ def slot_repr(self) -> str: func_name = getattr(self._func_ref(), "__name__", "") return f"{self._obj_module}.{obj.__class__.__qualname__}.{func_name}" - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: obj = self._obj_ref() func = self._func_ref() if obj is None or func is None: @@ -598,11 +619,11 @@ def dereference(self) -> partial | None: return None if obj is None else partial(obj.__setitem__, self._itemkey) -# --------------------------- Coroutines -------------------- +# --------------------------- Coroutines --------------------------- class WeakCoroutineFunction(WeakFunction): - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: if self._f() is None: raise ReferenceError("weakly-referenced object no longer exists") if self._max_args is not None: @@ -614,7 +635,7 @@ def cb(self, args: tuple[Any, ...] = ()) -> Any: class StrongCoroutineFunction(StrongFunction): """Wrapper around a strong coroutine function reference.""" - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: if self._max_args is not None: args = args[: self._max_args] @@ -622,7 +643,7 @@ def cb(self, args: tuple[Any, ...] = ()) -> Any: class WeakCoroutineMethod(WeakMethod): - def cb(self, args: tuple[Any, ...] = ()) -> Any: + def cb(self, args: tuple[Any, ...] = ()) -> None: if self._obj_ref() is None or self._func_ref() is None: raise ReferenceError("weakly-referenced object no longer exists") diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py index 506cff1e..12e34cdd 100644 --- a/tests/test_coroutines.py +++ b/tests/test_coroutines.py @@ -12,9 +12,9 @@ @pytest.mark.parametrize( "type_", [ - "coroutinefunc", + # "coroutinefunc", "weak_coroutinefunc", - "coroutinemethod", + # "coroutinemethod", ], ) @pytest.mark.asyncio From 037cc17b1ad6a8014357574b721fa97105d6d20d Mon Sep 17 00:00:00 2001 From: Talley Lambert Date: Mon, 17 Feb 2025 12:29:26 -0500 Subject: [PATCH 13/13] uncomment test --- tests/test_coroutines.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_coroutines.py b/tests/test_coroutines.py index 12e34cdd..506cff1e 100644 --- a/tests/test_coroutines.py +++ b/tests/test_coroutines.py @@ -12,9 +12,9 @@ @pytest.mark.parametrize( "type_", [ - # "coroutinefunc", + "coroutinefunc", "weak_coroutinefunc", - # "coroutinemethod", + "coroutinemethod", ], ) @pytest.mark.asyncio