diff --git a/arq/cli.py b/arq/cli.py index 3d3aa300..b07c7657 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -2,6 +2,7 @@ import logging.config import os import sys +from multiprocessing import Process from signal import Signals from typing import TYPE_CHECKING, cast @@ -20,6 +21,7 @@ watch_help = 'Watch a directory and reload the worker upon changes.' verbose_help = 'Enable verbose output.' logdict_help = "Import path for a dictionary in logdict form, to configure Arq's own logging." +workers_help = 'Number of worker processes to spawn' @click.command('arq') @@ -28,9 +30,12 @@ @click.option('--burst/--no-burst', default=None, help=burst_help) @click.option('--check', is_flag=True, help=health_check_help) @click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help) +@click.option('-w', '--workers', type=int, default=1, help=workers_help) @click.option('-v', '--verbose', is_flag=True, help=verbose_help) @click.option('--custom-log-dict', type=str, help=logdict_help) -def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None: +def cli( + *, worker_settings: str, burst: bool, check: bool, watch: str, workers: int, verbose: bool, custom_log_dict: str +) -> None: """ Job queues in python with asyncio and redis. @@ -49,8 +54,15 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: else: kwargs = {} if burst is None else {'burst': burst} if watch: - asyncio.run(watch_reload(watch, worker_settings_)) + coroutine = watch_reload(watch, worker_settings_) + if workers > 1: + for _ in range(workers - 1): + Process(target=asyncio.run, args=(coroutine,)).start() + asyncio.run(coroutine) else: + if workers > 1: + for _ in range(workers - 1): + Process(target=run_worker, args=(worker_settings_,), kwargs=kwargs).start() run_worker(worker_settings_, **kwargs) diff --git a/tests/test_cli.py b/tests/test_cli.py index 8cd98028..bb1abbf4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,5 @@ +import time + import pytest from click.testing import CliRunner @@ -51,7 +53,16 @@ def test_run_watch(mocker, cancel_remaining_task): runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests']) assert result.exit_code == 0 - assert '1 files changes, reloading arq worker...' + assert 'files changed, reloading arq worker...' in result.output + + +@pytest.mark.timeout(10) # may take a while to get to the point we can test +def test_multiple_workers(): + runner = CliRunner() + result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4']) + while 'clients_connected=4' not in result.output: + time.sleep(1) + assert True custom_log_dict = {