Skip to content

Commit

Permalink
add workers arg to CLI
Browse files Browse the repository at this point in the history
fix test

add workers arg to CLI

fix test

try again

add sleep
  • Loading branch information
Graeme22 committed Jan 28, 2025
1 parent 7a911f3 commit 958b168
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
16 changes: 14 additions & 2 deletions arq/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import logging.config
import os
import sys
from multiprocessing import Process
from signal import Signals
from typing import TYPE_CHECKING, cast

Expand All @@ -20,6 +21,7 @@
watch_help = 'Watch a directory and reload the worker upon changes.'
verbose_help = 'Enable verbose output.'
logdict_help = "Import path for a dictionary in logdict form, to configure Arq's own logging."
workers_help = 'Number of worker processes to spawn'


@click.command('arq')
Expand All @@ -28,9 +30,12 @@
@click.option('--burst/--no-burst', default=None, help=burst_help)
@click.option('--check', is_flag=True, help=health_check_help)
@click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help)
@click.option('-w', '--workers', type=int, default=1, help=workers_help)
@click.option('-v', '--verbose', is_flag=True, help=verbose_help)
@click.option('--custom-log-dict', type=str, help=logdict_help)
def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None:
def cli(
*, worker_settings: str, burst: bool, check: bool, watch: str, workers: int, verbose: bool, custom_log_dict: str
) -> None:
"""
Job queues in python with asyncio and redis.
Expand All @@ -49,8 +54,15 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose:
else:
kwargs = {} if burst is None else {'burst': burst}
if watch:
asyncio.run(watch_reload(watch, worker_settings_))
coroutine = watch_reload(watch, worker_settings_)
if workers > 1:
for _ in range(workers - 1):
Process(target=asyncio.run, args=(coroutine,)).start()
asyncio.run(coroutine)
else:
if workers > 1:
for _ in range(workers - 1):
Process(target=run_worker, args=(worker_settings_,), kwargs=kwargs).start()
run_worker(worker_settings_, **kwargs)


Expand Down
13 changes: 12 additions & 1 deletion tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

import pytest
from click.testing import CliRunner

Expand Down Expand Up @@ -51,7 +53,16 @@ def test_run_watch(mocker, cancel_remaining_task):
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests'])
assert result.exit_code == 0
assert '1 files changes, reloading arq worker...'
assert 'files changed, reloading arq worker...' in result.output


@pytest.mark.timeout(10) # may take a while to get to the point we can test
def test_multiple_workers():
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4'])
while 'clients_connected=4' not in result.output:
time.sleep(1)
assert True


custom_log_dict = {
Expand Down

0 comments on commit 958b168

Please sign in to comment.