Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added graceful reload on SIGHUP. #330

Merged
merged 2 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/guide/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,15 @@ To enable this option simply pass the `--reload` or `-r` option to worker taskiq
Also this option supports `.gitignore` files. If you have such file in your directory, it won't reload worker
when you modify ignored files. To disable this functionality pass `--do-not-use-gitignore` option.

### Graceful reload

To perform graceful reload, send `SIGHUP` signal to the main worker process. This action will reload all workers with new code. It's useful for deployment that requires zero downtime, but don't use orchestration tools like Kubernetes.

```bash
taskiq worker my_module:broker
kill -HUP <main pid>
```

### Other parameters

* `--no-configure-logging` - disables default logging configuration for workers.
Expand Down
14 changes: 10 additions & 4 deletions taskiq/cli/worker/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ def schedule_workers_reload(

def get_signal_handler(
action_queue: "Queue[ProcessActionBase]",
action_to_send: ProcessActionBase,
) -> Callable[[int, Any], None]:
"""
Generate signal handler for main process.
Expand All @@ -126,6 +127,7 @@ def get_signal_handler(
the action queue.

:param action_queue: event queue.
:param action_to_send: action that will be sent to the queue on signal.
:returns: actual signal handler.
"""

Expand All @@ -134,7 +136,7 @@ def _signal_handler(signum: int, _frame: Any) -> None:
raise KeyboardInterrupt

logger.debug(f"Got signal {signum}.")
action_queue.put(ShutdownAction())
action_queue.put(action_to_send)
logger.warning("Workers are scheduled for shutdown.")

return _signal_handler
Expand Down Expand Up @@ -169,9 +171,13 @@ def __init__(
recursive=True,
)

signal_handler = get_signal_handler(self.action_queue)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
shutdown_handler = get_signal_handler(self.action_queue, ShutdownAction())
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)
signal.signal(
signal.SIGHUP,
get_signal_handler(self.action_queue, ReloadAllAction()),
)

self.workers: List[Process] = []

Expand Down
2 changes: 2 additions & 0 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import signal
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import set_start_method
Expand Down Expand Up @@ -172,6 +173,7 @@ def run_worker(args: WorkerArgs) -> Optional[int]:
)
logging.getLogger("taskiq").setLevel(level=logging.getLevelName(args.log_level))
logging.getLogger("watchdog.observers.inotify_buffer").setLevel(level=logging.INFO)
logger.info("Pid of a main process: %s", str(os.getpid()))
logger.info("Starting %s worker processes.", args.workers)

observer = None
Expand Down
Loading