Skip to content

Commit

Permalink
change executor repr, restrict running in the main thread, remove poe…
Browse files Browse the repository at this point in the history
…try.lock
  • Loading branch information
Denis Kazakov committed Oct 3, 2024
1 parent 6ba5af6 commit 66d0c85
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 181 deletions.
34 changes: 3 additions & 31 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,36 +51,12 @@ coverage.xml
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal

# Flask stuff:
instance/
.webassets-cache

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# Jupyter Notebook
.ipynb_checkpoints

# IPython
profile_default/
ipython_config.py

# pyenv
# .python-version

Expand All @@ -94,13 +70,6 @@ ipython_config.py
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/

# Celery stuff
celerybeat-schedule
celerybeat.pid

# SageMath parsed files
*.sage.py

# Environments
.env
.venv
Expand Down Expand Up @@ -130,3 +99,6 @@ dmypy.json

# IDE
.idea

# Poetry
poetry.lock
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ def sync_get_user(user_id):
...

with ThreadPoolExecutor() as ex:
future = ex.submit(sync_get_user)
future = ex.submit(sync_get_user, user_id)
result = future.result()
```

With this:
```python
from aiofutures import AsyncExecutor

def async_get_user(user_id):
async def async_get_user(user_id):
...

with AsyncExecutor() as ex:
future = ex.submit(async_get_user)
future = ex.submit(async_get_user, user_id)
result = future.result()
```

**The former spawns a lot of threads and experiences all cons of GIL, the latter
spawns the only one async thread (check out [notes](#Notes))**
spawns the only one async thread (check out [notes](#Notes))**.

## Installation

Expand Down Expand Up @@ -107,7 +107,7 @@ future = executor.submit(io_bound_task)
print(future.result())
```

NOTE: You can use sync_to_async within tasks running in the executor only.
NOTE: You can use `sync_to_async` within tasks running in the executor only.

### UVLoop

Expand All @@ -127,7 +127,7 @@ executor = AsyncExecutor()
```

### Notes
- Take into account that asyncio still ([CPython3.11](https://github.com/python/cpython/blob/4664a7cf689946f0c9854cadee7c6aa9c276a8cf/Lib/asyncio/base_events.py#L867))
- Take into account that asyncio still ([CPython3.13](https://github.com/python/cpython/blob/v3.13.0rc3/Lib/asyncio/base_events.py#L935))
resolves DNS in threads, not asynchronously
- Any blocking function will block the whole AsyncExecutor

Expand Down
6 changes: 5 additions & 1 deletion aiofutures/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,9 @@
from concurrent.futures import CancelledError, TimeoutError # noqa: F401


class InvalidStateError(Exception):
class AsyncExecutorError(Exception):
pass


class InvalidStateError(AsyncExecutorError):
pass
16 changes: 9 additions & 7 deletions aiofutures/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,28 @@
"""
import asyncio
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from itertools import count
from threading import Thread, get_ident, main_thread
from typing import Awaitable, Callable, Optional, Set

from .exceptions import InvalidStateError
from .exceptions import AsyncExecutorError, InvalidStateError


class AsyncExecutor(Executor):
"""The executor that runs coroutines in a different thread."""
_loop: asyncio.AbstractEventLoop
_counter = count()

def __init__(self, executor: Optional[ThreadPoolExecutor] = None) -> None:
self._loop = asyncio.new_event_loop()
self._name = f'{self.__class__.__name__}-{next(self._counter)}'
self._stopped = False
self._thread_executor = executor
self._thread = Thread(target=self._run, name=self.__class__.__name__, daemon=True)
self._thread = Thread(target=self._run, name=self._name, daemon=True)
self._thread.start()

def __repr__(self) -> str:
cls = self.__class__.__name__
return f'<{cls} stopped={self._stopped} tasks={len(self.tasks)}>'
return f'<{self._name} stopped={self._stopped} tasks={len(self.tasks)}>'

@property
def tasks(self) -> Set[asyncio.Task]:
Expand All @@ -69,7 +71,7 @@ def submit(self, task: Callable[..., Awaitable], *args, **kwargs) -> Future: #
:param kwargs: kwargs to pass to a task
"""
if self._stopped:
raise InvalidStateError(f'The task was submitted after AsyncExecutor shutdown: {task}')
raise InvalidStateError(f'The task has been submitted after AsyncExecutor shutdown: {task}')
return self._submit(task, *args, **kwargs)

def sync_to_async(self, task: Callable, *args) -> asyncio.Future:
Expand All @@ -79,7 +81,7 @@ def sync_to_async(self, task: Callable, *args) -> asyncio.Future:
:param args: args to pass to a task
"""
if self._stopped:
raise InvalidStateError(f'The task was submitted after AsyncExecutor shutdown: {task}')
raise InvalidStateError(f'The task has been submitted after AsyncExecutor shutdown: {task}')
return self._loop.run_in_executor(self._thread_executor, task, *args)

def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None:
Expand Down Expand Up @@ -110,7 +112,7 @@ def cancel_futures(self) -> None:

def _run(self) -> None:
if get_ident() == main_thread().ident:
raise RuntimeError('AsyncExecutor has been tried to start in the main thread')
raise AsyncExecutorError('Async worker has been tried to start in the main thread')

if self._thread_executor:
self._loop.set_default_executor(self._thread_executor)
Expand Down
Loading

0 comments on commit 66d0c85

Please sign in to comment.