Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support coroutine functions #346

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@ repos:
- types-attrs
- pydantic
- typing_extensions
- anyio
- trio
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ test = [
"pyinstaller>=4.0",
"pytest>=6.0",
"pytest-cov",
"pytest-asyncio",
"wrapt",
"msgspec",
"toolz",
Expand Down Expand Up @@ -110,6 +111,7 @@ exclude = [
"src/psygnal/qt.py",
"src/psygnal/_pyinstaller_util",
"src/psygnal/_throttler.py",
"src/psygnal/_async.py",
]

[tool.cibuildwheel]
Expand Down Expand Up @@ -171,6 +173,7 @@ docstring-code-format = true
[tool.pytest.ini_options]
minversion = "6.0"
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"error",
"ignore:The distutils package is deprecated:DeprecationWarning:",
Expand Down
3 changes: 3 additions & 0 deletions src/psygnal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
"debounced",
"emit_queued",
"evented",
"get_async_backend",
"get_evented_namespace",
"is_evented",
"set_async_backend",
"throttled",
]

Expand All @@ -48,6 +50,7 @@
stacklevel=2,
)

from ._async import get_async_backend, set_async_backend
from ._evented_decorator import evented
from ._exceptions import EmitLoopError
from ._group import EmissionInfo, SignalGroup
Expand Down
167 changes: 167 additions & 0 deletions src/psygnal/_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from math import inf
from typing import TYPE_CHECKING, Any, overload

if TYPE_CHECKING:
import anyio.streams.memory
import trio
from typing_extensions import Literal, TypeAlias

from psygnal._weak_callback import WeakCallback

SupportedBackend: TypeAlias = Literal["asyncio", "anyio", "trio"]
QueueItem: TypeAlias = tuple["WeakCallback", tuple[Any, ...]]


_ASYNC_BACKEND: _AsyncBackend | None = None


def get_async_backend() -> _AsyncBackend | None:
"""Get the current async backend. Returns None if no backend is set."""
return _ASYNC_BACKEND


@overload
def set_async_backend(backend: Literal["asyncio"]) -> AsyncioBackend: ...
@overload
def set_async_backend(backend: Literal["anyio"]) -> AnyioBackend: ...
@overload
def set_async_backend(backend: Literal["trio"]) -> TrioBackend: ...
def set_async_backend(backend: SupportedBackend = "asyncio") -> _AsyncBackend:
"""Set the async backend to use. Must be one of: 'asyncio', 'anyio', 'trio'.

This should be done as early as possible, and *must* be called before calling
`SignalInstance.connect` with a coroutine function.
"""
global _ASYNC_BACKEND

if _ASYNC_BACKEND is not None and _ASYNC_BACKEND._backend != backend:
# allow setting the same backend multiple times, for tests
raise RuntimeError(f"Async backend already set to: {_ASYNC_BACKEND._backend}")

Check warning on line 42 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L42

Added line #L42 was not covered by tests

if backend == "asyncio":
_ASYNC_BACKEND = AsyncioBackend()
elif backend == "anyio":
_ASYNC_BACKEND = AnyioBackend()
elif backend == "trio":
_ASYNC_BACKEND = TrioBackend()

Check warning on line 49 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L46-L49

Added lines #L46 - L49 were not covered by tests
else:
raise RuntimeError(

Check warning on line 51 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L51

Added line #L51 was not covered by tests
f"Async backend not supported: {backend}. "
"Must be one of: 'asyncio', 'anyio', 'trio'"
)

return _ASYNC_BACKEND


class _AsyncBackend(ABC):
def __init__(self, backend: str):
self._backend = backend
self._running = False

@property
def running(self) -> bool:
return self._running

@abstractmethod
def _put(self, item: QueueItem) -> None: ...

@abstractmethod
async def _get(self) -> QueueItem: ...

@abstractmethod
async def run(self) -> None: ...

async def call_back(self, item: QueueItem) -> None:
cb, args = item
if func := cb.dereference():
await func(*args)


class AsyncioBackend(_AsyncBackend):
def __init__(self) -> None:
super().__init__("asyncio")
import asyncio

self._asyncio = asyncio
self._queue: asyncio.Queue[tuple] = asyncio.Queue()
self._task = asyncio.create_task(self.run())
self._loop = asyncio.get_running_loop()

def _put(self, item: QueueItem) -> None:
self._queue.put_nowait(item)

async def _get(self) -> QueueItem:
return await self._queue.get()

async def run(self) -> None:
if self.running:
return

Check warning on line 101 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L101

Added line #L101 was not covered by tests

self._running = True
try:
while True:
item = await self._get()
await self.call_back(item)
except self._asyncio.CancelledError:
pass

Check warning on line 109 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L109

Added line #L109 was not covered by tests
except RuntimeError as e:
if not self._loop.is_closed():
raise e

Check warning on line 112 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L112

Added line #L112 was not covered by tests


class AnyioBackend(_AsyncBackend):
_send_stream: anyio.streams.memory.MemoryObjectSendStream[QueueItem]
_receive_stream: anyio.streams.memory.MemoryObjectReceiveStream[QueueItem]

def __init__(self) -> None:
super().__init__("anyio")
import anyio

Check warning on line 121 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L120-L121

Added lines #L120 - L121 were not covered by tests

self._send_stream, self._receive_stream = anyio.create_memory_object_stream(

Check warning on line 123 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L123

Added line #L123 was not covered by tests
max_buffer_size=inf
)

def _put(self, item: QueueItem) -> None:
self._send_stream.send_nowait(item)

Check warning on line 128 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L128

Added line #L128 was not covered by tests

async def _get(self) -> QueueItem:
return await self._receive_stream.receive()

Check warning on line 131 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L131

Added line #L131 was not covered by tests

async def run(self) -> None:
if self.running:
return

Check warning on line 135 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L134-L135

Added lines #L134 - L135 were not covered by tests

self._running = True
async with self._receive_stream:
async for item in self._receive_stream:
await self.call_back(item)

Check warning on line 140 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L137-L140

Added lines #L137 - L140 were not covered by tests


class TrioBackend(_AsyncBackend):
_send_channel: trio._channel.MemorySendChannel[QueueItem]
_receive_channel: trio.abc.ReceiveChannel[QueueItem]

def __init__(self) -> None:
super().__init__("trio")
import trio

Check warning on line 149 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L148-L149

Added lines #L148 - L149 were not covered by tests

self._send_channel, self._receive_channel = trio.open_memory_channel(

Check warning on line 151 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L151

Added line #L151 was not covered by tests
max_buffer_size=inf
)

def _put(self, item: tuple) -> None:
self._send_channel.send_nowait(item)

Check warning on line 156 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L156

Added line #L156 was not covered by tests

async def _get(self) -> tuple:
return await self._receive_channel.receive()

Check warning on line 159 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L159

Added line #L159 was not covered by tests

async def run(self) -> None:
if self.running:
return

Check warning on line 163 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L162-L163

Added lines #L162 - L163 were not covered by tests

self._running = True
async for item in self._receive_channel:
await self.call_back(item)

Check warning on line 167 in src/psygnal/_async.py

View check run for this annotation

Codecov / codecov/patch

src/psygnal/_async.py#L165-L167

Added lines #L165 - L167 were not covered by tests
81 changes: 76 additions & 5 deletions src/psygnal/_weak_callback.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import inspect
import sys
import weakref
from functools import partial
Expand All @@ -16,12 +17,15 @@
)
from warnings import warn

from ._async import get_async_backend
from ._mypyc import mypyc_attr

if TYPE_CHECKING:
import toolz
from typing_extensions import TypeAlias, TypeGuard # py310

from ._async import _AsyncBackend

RefErrorChoice: TypeAlias = Literal["raise", "warn", "ignore"]

__all__ = ["WeakCallback", "weak_callback"]
Expand Down Expand Up @@ -124,14 +128,40 @@ def _on_delete(weak_cb):
kwargs = cb.keywords
cb = cb.func

is_coro = inspect.iscoroutinefunction(cb)
if is_coro:
if (backend := get_async_backend()) is None:
raise RuntimeError("No async backend set: call `set_async_backend()`")
if not backend.running:
raise RuntimeError(
"Async backend not running (launch `get_async_backend().run()` "
"in a background task)"
)

if isinstance(cb, FunctionType):
return (
StrongFunction(cb, max_args, args, kwargs, priority=priority)
if strong_func
else WeakFunction(
# NOTE: I know it looks like this should be easy to express in much shorter
# syntax ... but mypyc will likely fail at runtime.
# Make sure to test compiled version if you change this.
if strong_func:
if is_coro:
return StrongCoroutineFunction(
cb, max_args, args, kwargs, priority=priority
)
return StrongFunction(cb, max_args, args, kwargs, priority=priority)
else:
if is_coro:
return WeakCoroutineFunction(
cb,
max_args,
args,
kwargs,
finalize,
on_ref_error=on_ref_error,
priority=priority,
)
return WeakFunction(
cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority
)
)

if isinstance(cb, MethodType):
if getattr(cb, "__name__", None) == "__setitem__":
Expand All @@ -145,6 +175,11 @@ def _on_delete(weak_cb):
return WeakSetitem(
obj, key, max_args, finalize, on_ref_error, priority=priority
)

if is_coro:
return WeakCoroutineMethod(
cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority
)
return WeakMethod(
cb, max_args, args, kwargs, finalize, on_ref_error, priority=priority
)
Expand Down Expand Up @@ -335,6 +370,8 @@ def _cb(_: weakref.ReferenceType) -> None:
class StrongFunction(WeakCallback):
"""Wrapper around a strong function reference."""

_f: Callable

def __init__(
self,
obj: Callable,
Expand Down Expand Up @@ -580,3 +617,37 @@ def cb(self, args: tuple[Any, ...] = ()) -> None:
def dereference(self) -> partial | None:
obj = self._obj_ref()
return None if obj is None else partial(obj.__setitem__, self._itemkey)


# --------------------------- Coroutines ---------------------------


class WeakCoroutineFunction(WeakFunction):
def cb(self, args: tuple[Any, ...] = ()) -> None:
if self._f() is None:
raise ReferenceError("weakly-referenced object no longer exists")
if self._max_args is not None:
args = args[: self._max_args]

cast("_AsyncBackend", get_async_backend())._put((self, args))


class StrongCoroutineFunction(StrongFunction):
"""Wrapper around a strong coroutine function reference."""

def cb(self, args: tuple[Any, ...] = ()) -> None:
if self._max_args is not None:
args = args[: self._max_args]

cast("_AsyncBackend", get_async_backend())._put((self, args))


class WeakCoroutineMethod(WeakMethod):
def cb(self, args: tuple[Any, ...] = ()) -> None:
if self._obj_ref() is None or self._func_ref() is None:
raise ReferenceError("weakly-referenced object no longer exists")

if self._max_args is not None:
args = args[: self._max_args]

cast("_AsyncBackend", get_async_backend())._put((self, args))
Loading
Loading