Skip to content

Commit

Permalink
fix: hanging Python async workers (#132)
Browse files Browse the repository at this point in the history
* chore: fix hanging python WorkerProcess shutdown

* fix: progress for empty project

* chore: use gunicorn as HTTP process manager

* chore: bump ES to `7.13.4`
  • Loading branch information
ClemDoum authored Dec 19, 2023
1 parent b22284a commit 1ae94ac
Show file tree
Hide file tree
Showing 17 changed files with 1,277 additions and 1,112 deletions.
72 changes: 44 additions & 28 deletions neo4j-app/neo4j_app/app/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import concurrent.futures
import inspect
import logging
import multiprocessing
Expand Down Expand Up @@ -35,7 +36,7 @@
_TASK_MANAGER: Optional[TaskManager] = None
_TEST_DB_FILE: Optional[Path] = None
_TEST_LOCK: Optional[multiprocessing.Lock] = None
_WORKER_POOL: Optional[multiprocessing.Pool] = None
_PROCESS_EXECUTOR: Optional[concurrent.futures.ProcessPoolExecutor] = None


class DependencyInjectionError(RuntimeError):
Expand Down Expand Up @@ -151,17 +152,24 @@ def _lifespan_test_lock() -> multiprocessing.Lock:
return cast(multiprocessing.Lock, _TEST_LOCK)


def worker_pool_enter(**_):
def process_executor_enter(**_):
# pylint: disable=consider-using-with
config = lifespan_config()
global _WORKER_POOL
_WORKER_POOL = multiprocessing.Pool(processes=config.neo4j_app_n_async_workers)
_WORKER_POOL.__enter__() # pylint: disable=unnecessary-dunder-call
global _PROCESS_EXECUTOR
_PROCESS_EXECUTOR = multiprocessing.Pool(processes=config.neo4j_app_n_async_workers)
_PROCESS_EXECUTOR.__enter__() # pylint: disable=unnecessary-dunder-call
process_id = os.getpid()
config = lifespan_config()
n_workers = min(config.neo4j_app_n_async_workers, config.neo4j_app_task_queue_size)
n_workers = max(1, n_workers)
# TODO: let the process choose they ID and set it with the worker process ID,
# this will help debugging
worker_ids = [f"worker-{process_id}-{i}" for i in range(n_workers)]

_PROCESS_EXECUTOR = concurrent.futures.ProcessPoolExecutor( # pylint: disable=unnecessary-dunder-call
max_workers=n_workers,
mp_context=multiprocessing.get_context("spawn"),
).__enter__()
kwargs = dict()
worker_cls = config.to_worker_cls()
if worker_cls.__name__ == "MockWorker":
Expand All @@ -171,24 +179,25 @@ def worker_pool_enter(**_):
for w_id in worker_ids:
kwargs.update({"worker_id": w_id})
logger.info("starting worker %s", w_id)
_WORKER_POOL.apply_async(worker_cls.work_forever_from_config, kwds=kwargs)
_PROCESS_EXECUTOR.submit(worker_cls.work_forever_from_config, **kwargs)

logger.info("worker pool ready !")


def worker_pool_exit(exc_type, exc_value, trace):
def process_executor_exit(exc_type, exc_value, trace):
# pylint: disable=unused-argument
pool = lifespan_worker_pool()
pool.__exit__(exc_type, exc_value, trace)
pool = lifespan_process_executor()
pool.shutdown(wait=False)
logger.debug("async worker pool has shut down !")


def lifespan_worker_pool() -> multiprocessing.Pool:
if _WORKER_POOL is None:
def lifespan_process_executor() -> concurrent.futures.ProcessPoolExecutor:
if _PROCESS_EXECUTOR is None:
raise DependencyInjectionError("worker pool")
return cast(multiprocessing.Pool, _WORKER_POOL)
return cast(concurrent.futures.ProcessPoolExecutor, _PROCESS_EXECUTOR)


def task_task_manager_enter(**_):
def task_manager_enter(**_):
global _TASK_MANAGER
config = lifespan_config()
if config.test:
Expand Down Expand Up @@ -251,18 +260,18 @@ def lifespan_event_publisher() -> EventPublisher:


FASTAPI_LIFESPAN_DEPS = [
(config_enter, None),
(loggers_enter, None),
(neo4j_driver_enter, neo4j_driver_exit),
(create_project_registry_db_enter, None),
(es_client_enter, es_client_exit),
(test_process_manager_enter, test_process_manager_exit),
(test_db_path_enter, test_db_path_exit),
(_test_lock_enter, None),
(task_task_manager_enter, None),
(event_publisher_enter, None),
(worker_pool_enter, worker_pool_exit),
(migrate_app_db_enter, None),
("configuration reading", config_enter, None),
("loggers setup", loggers_enter, None),
("neo4j driver creation", neo4j_driver_enter, neo4j_driver_exit),
("neo4j project registry creation", create_project_registry_db_enter, None),
("ES client creation", es_client_enter, es_client_exit),
(None, test_process_manager_enter, test_process_manager_exit),
(None, test_db_path_enter, test_db_path_exit),
(None, _test_lock_enter, None),
("task manager creation", task_manager_enter, None),
("event publisher creation", event_publisher_enter, None),
("async worker executor creation", process_executor_enter, process_executor_exit),
("neo4j DB migration", migrate_app_db_enter, None),
]


Expand Down Expand Up @@ -293,13 +302,16 @@ async def run_deps(dependencies: List, **kwargs) -> AsyncGenerator[None, None]:
original_ex = None
try:
with _log_and_reraise():
for enter_fn, exit_fn in dependencies:
logger.info("applying dependencies...")
for name, enter_fn, exit_fn in dependencies:
if enter_fn is not None:
if name is not None:
logger.debug("applying: %s", name)
if inspect.iscoroutinefunction(enter_fn):
await enter_fn(**kwargs)
else:
enter_fn(**kwargs)
to_close.append(exit_fn)
to_close.append((name, exit_fn))
yield
except Exception as e: # pylint: disable=broad-exception-caught
original_ex = e
Expand All @@ -308,16 +320,20 @@ async def run_deps(dependencies: List, **kwargs) -> AsyncGenerator[None, None]:
to_raise = []
if original_ex is not None:
to_raise.append(original_ex)
for exit_fn in to_close[::-1]:
logger.info("rolling back dependencies...")
for name, exit_fn in to_close[::-1]:
if exit_fn is None:
continue
try:
if name is not None:
logger.debug("rolling back %s", name)
exc_info = sys.exc_info()
if inspect.iscoroutinefunction(exit_fn):
await exit_fn(*exc_info)
else:
exit_fn(*exc_info)
except Exception as e: # pylint: disable=broad-exception-caught
to_raise.append(e)
logger.debug("rolled back all dependencies !")
if to_raise:
raise RuntimeError(to_raise)
4 changes: 2 additions & 2 deletions neo4j-app/neo4j_app/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
lifespan_es_client,
lifespan_neo4j_driver,
lifespan_task_manager,
lifespan_worker_pool,
lifespan_process_executor,
)
from neo4j_app.app.doc import OTHER_TAG
from neo4j_app.core import AppConfig
Expand All @@ -25,7 +25,7 @@ async def ping() -> str:
await driver.verify_connectivity()
lifespan_es_client()
lifespan_task_manager()
lifespan_worker_pool()
lifespan_process_executor()
except (DriverError, DependencyInjectionError) as e:
raise HTTPException(503, detail="Service Unavailable") from e
return "pong"
Expand Down
4 changes: 1 addition & 3 deletions neo4j-app/neo4j_app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ class AppConfig(LowerCamelCaseModel, IgnoreExtraModel):
neo4j_app_async_app: str = "neo4j_app.tasks.app"
neo4j_app_async_dependencies: Optional[str] = "neo4j_app.tasks.WORKER_LIFESPAN_DEPS"
neo4j_app_host: str = "127.0.0.1"
neo4j_app_gunicorn_workers: int = 1
neo4j_app_log_level: str = "INFO"
neo4j_app_log_in_json: bool = False
neo4j_app_max_records_in_memory: int = int(1e6)
Expand Down Expand Up @@ -153,9 +154,6 @@ def set_config_globally(cls, value: AppConfig):
raise ValueError("Can't set config globally twice")
cls._global = value

def to_uvicorn(self) -> UviCornModel:
return UviCornModel(port=self.neo4j_app_port)

@property
def neo4j_uri(self) -> str:
return f"{self.neo4j_uri_scheme}://{self.neo4j_host}:{self.neo4j_port}"
Expand Down
2 changes: 2 additions & 0 deletions neo4j-app/neo4j_app/core/elasticsearch/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,8 @@ async def _fill_import_queue(
res_it = run_with_concurrency(count_tasks, max_concurrency=max_concurrency)
ne_counts = [c[COUNT] async for c in res_it]
ne_counts = sum(ne_counts)
if not ne_counts:
return 0
raw_progress = to_raw_progress(progress, max_progress=ne_counts)
gens = [
self._fill_import_buffer(
Expand Down
4 changes: 0 additions & 4 deletions neo4j-app/neo4j_app/icij_worker/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,6 @@ class RecoverableError(ICIJWorkerError, Exception):
...


class WorkerCancelled(BaseException, ICIJWorkerError):
...


class UnknownTask(ICIJWorkerError, ValueError):
def __init__(self, task_id: str, worker_id: Optional[str] = None):
msg = f'Unknown task "{task_id}"'
Expand Down
37 changes: 16 additions & 21 deletions neo4j-app/neo4j_app/icij_worker/worker/process.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,33 @@
import functools
import signal
import sys
from abc import ABC
from typing import Callable, cast

from neo4j_app.icij_worker.exceptions import WorkerCancelled
from neo4j_app.icij_worker.worker.worker import Worker

_HANDLE_SIGNALS = [
signal.SIGINT,
signal.SIGTERM,
]
if sys.platform == "win32":
_HANDLE_SIGNALS += [signal.CTRL_C_EVENT, signal.CTRL_BREAK_EVENT]


class ProcessWorkerMixin(Worker, ABC):
async def _aenter__(self):
await super()._aenter__()
self._setup_signal_handlers()

def _signal_handler(
self,
signal_name: int,
_,
__, # pylint: disable=invalid-name
*,
graceful: bool,
):
if not self._already_shutdown:
self.error("received %s", signal_name)
self._graceful_shutdown = graceful
raise WorkerCancelled()
def _signal_handler(self, signal_name: signal.Signals, *, graceful: bool):
self.error("received %s", signal_name)
self._graceful_shutdown = graceful
if self._work_forever_task is not None:
self.info("cancelling worker loop...")
self._work_forever_task.cancel()

def _setup_signal_handlers(self):
# Let's always shutdown gracefully for now since when the server shutdown
# it will try to SIGTERM, we want to avoid loosing track of running tasks
for s in ["SIGINT", "SIGTERM", "CTRL_C_EVENT", "CTRL_BREAK_EVENT"]:
try:
signalnum = getattr(signal, s)
except AttributeError:
continue
for s in _HANDLE_SIGNALS:
handler = functools.partial(self._signal_handler, s, graceful=True)
handler = cast(Callable[[int], None], handler)
signal.signal(signalnum, handler)
self._loop.add_signal_handler(s, handler)
Loading

0 comments on commit 1ae94ac

Please sign in to comment.