From 6cb2b0425b2e14c5f55e2dee9c3aba739daa1842 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 11 Jun 2024 23:54:24 +0200 Subject: [PATCH 1/2] Added graceful reload on SIGHUP. (#330) --- docs/guide/cli.md | 9 +++++++++ taskiq/cli/worker/process_manager.py | 14 ++++++++++---- taskiq/cli/worker/run.py | 2 ++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/docs/guide/cli.md b/docs/guide/cli.md index fcfd94c..af40aa7 100644 --- a/docs/guide/cli.md +++ b/docs/guide/cli.md @@ -84,6 +84,15 @@ To enable this option simply pass the `--reload` or `-r` option to worker taskiq Also this option supports `.gitignore` files. If you have such file in your directory, it won't reload worker when you modify ignored files. To disable this functionality pass `--do-not-use-gitignore` option. +### Graceful reload + +To perform graceful reload, send `SIGHUP` signal to the main worker process. This action will reload all workers with new code. It's useful for deployment that requires zero downtime, but don't use orchestration tools like Kubernetes. + +```bash +taskiq worker my_module:broker +kill -HUP
+``` + ### Other parameters * `--no-configure-logging` - disables default logging configuration for workers. diff --git a/taskiq/cli/worker/process_manager.py b/taskiq/cli/worker/process_manager.py index e723956..24f0155 100644 --- a/taskiq/cli/worker/process_manager.py +++ b/taskiq/cli/worker/process_manager.py @@ -118,6 +118,7 @@ def schedule_workers_reload( def get_signal_handler( action_queue: "Queue[ProcessActionBase]", + action_to_send: ProcessActionBase, ) -> Callable[[int, Any], None]: """ Generate signal handler for main process. @@ -126,6 +127,7 @@ def get_signal_handler( the action queue. :param action_queue: event queue. + :param action_to_send: action that will be sent to the queue on signal. :returns: actual signal handler. """ @@ -134,7 +136,7 @@ def _signal_handler(signum: int, _frame: Any) -> None: raise KeyboardInterrupt logger.debug(f"Got signal {signum}.") - action_queue.put(ShutdownAction()) + action_queue.put(action_to_send) logger.warning("Workers are scheduled for shutdown.") return _signal_handler @@ -169,9 +171,13 @@ def __init__( recursive=True, ) - signal_handler = get_signal_handler(self.action_queue) - signal.signal(signal.SIGINT, signal_handler) - signal.signal(signal.SIGTERM, signal_handler) + shutdown_handler = get_signal_handler(self.action_queue, ShutdownAction()) + signal.signal(signal.SIGINT, shutdown_handler) + signal.signal(signal.SIGTERM, shutdown_handler) + signal.signal( + signal.SIGHUP, + get_signal_handler(self.action_queue, ReloadAllAction()), + ) self.workers: List[Process] = [] diff --git a/taskiq/cli/worker/run.py b/taskiq/cli/worker/run.py index d67b127..727a02a 100644 --- a/taskiq/cli/worker/run.py +++ b/taskiq/cli/worker/run.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import signal from concurrent.futures import ThreadPoolExecutor from multiprocessing import set_start_method @@ -172,6 +173,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]: ) logging.getLogger("taskiq").setLevel(level=logging.getLevelName(args.log_level)) logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO) + logger.info("Pid of a main process: %s", str(os.getpid())) logger.info("Starting %s worker processes.", args.workers) observer = None From 945748b621d2742bb86be187e2881f4768fefba0 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Tue, 11 Jun 2024 23:55:53 +0200 Subject: [PATCH 2/2] Version bumped to 0.11.4. Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 56fe3f3..73cda53 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.11.3" +version = "0.11.4" description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "]