-
-
Notifications
You must be signed in to change notification settings - Fork 181
/
Copy pathtest_cli.py
90 lines (66 loc) · 2.82 KB
/
test_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
import pytest
from click.testing import CliRunner
from arq import logs
from arq.cli import cli
from arq.connections import RedisSettings
async def foobar(ctx):
return 42
class WorkerSettings:
burst = True
functions = [foobar]
@pytest.fixture(scope='module', autouse=True)
def setup_worker_connection(test_redis_host: str, test_redis_port: int):
WorkerSettings.redis_settings = RedisSettings(host=test_redis_host, port=test_redis_port)
def test_help():
runner = CliRunner()
result = runner.invoke(cli, ['--help'])
assert result.exit_code == 0
assert result.output.startswith('Usage: arq [OPTIONS] WORKER_SETTINGS\n')
def test_run(cancel_remaining_task, mocker, loop):
mocker.patch('asyncio.get_event_loop', lambda: loop)
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings'])
assert result.exit_code == 0
assert 'Starting worker for 1 functions: foobar' in result.output
def test_check(loop):
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--check'])
assert result.exit_code == 1
assert 'Health check failed: no health check sentinel value found' in result.output
async def mock_awatch():
yield [1]
def test_run_watch(mocker, cancel_remaining_task):
mocker.patch('watchfiles.awatch', return_value=mock_awatch())
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests'])
assert result.exit_code == 0
assert 'files changed, reloading arq worker...' in result.output
def test_multiple_workers(mocker, loop):
mocker.patch('asyncio.get_event_loop', lambda: loop)
runner = CliRunner()
result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4'])
assert 'clients_connected=4' in result.output
custom_log_dict = {
'version': 1,
'handlers': {'custom': {'level': 'ERROR', 'class': 'logging.StreamHandler', 'formatter': 'custom'}},
'formatters': {'custom': {'format': '%(asctime)s: %(message)s', 'datefmt': '%H:%M:%S'}},
'loggers': {'arq': {'handlers': ['custom'], 'level': 'ERROR'}},
}
@pytest.mark.parametrize(
'cli_argument,log_dict_to_use',
[
(None, logs.default_log_config(verbose=False)),
('--custom-log-dict=tests.test_cli.custom_log_dict', custom_log_dict),
],
)
def test_custom_log_dict(mocker, loop, cli_argument, log_dict_to_use):
mocker.patch('asyncio.get_event_loop', lambda: loop)
mock_dictconfig = mocker.MagicMock()
mocker.patch('logging.config.dictConfig', mock_dictconfig)
arq_arguments = ['tests.test_cli.WorkerSettings']
if cli_argument is not None:
arq_arguments.append(cli_argument)
runner = CliRunner()
result = runner.invoke(cli, arq_arguments)
assert result.exit_code == 0
mock_dictconfig.assert_called_with(log_dict_to_use)