From 958b16862e9f021bd3a32f12258678775431d5de Mon Sep 17 00:00:00 2001 From: Graeme Holliday Date: Mon, 27 Jan 2025 20:31:53 -0500 Subject: [PATCH 1/3] add workers arg to CLI fix test add workers arg to CLI fix test try again add sleep --- arq/cli.py | 16 ++++++++++++++-- tests/test_cli.py | 13 ++++++++++++- 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/arq/cli.py b/arq/cli.py index 3d3aa300..b07c7657 100644 --- a/arq/cli.py +++ b/arq/cli.py @@ -2,6 +2,7 @@ import logging.config import os import sys +from multiprocessing import Process from signal import Signals from typing import TYPE_CHECKING, cast @@ -20,6 +21,7 @@ watch_help = 'Watch a directory and reload the worker upon changes.' verbose_help = 'Enable verbose output.' logdict_help = "Import path for a dictionary in logdict form, to configure Arq's own logging." +workers_help = 'Number of worker processes to spawn' @click.command('arq') @@ -28,9 +30,12 @@ @click.option('--burst/--no-burst', default=None, help=burst_help) @click.option('--check', is_flag=True, help=health_check_help) @click.option('--watch', type=click.Path(exists=True, dir_okay=True, file_okay=False), help=watch_help) +@click.option('-w', '--workers', type=int, default=1, help=workers_help) @click.option('-v', '--verbose', is_flag=True, help=verbose_help) @click.option('--custom-log-dict', type=str, help=logdict_help) -def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: bool, custom_log_dict: str) -> None: +def cli( + *, worker_settings: str, burst: bool, check: bool, watch: str, workers: int, verbose: bool, custom_log_dict: str +) -> None: """ Job queues in python with asyncio and redis. @@ -49,8 +54,15 @@ def cli(*, worker_settings: str, burst: bool, check: bool, watch: str, verbose: else: kwargs = {} if burst is None else {'burst': burst} if watch: - asyncio.run(watch_reload(watch, worker_settings_)) + coroutine = watch_reload(watch, worker_settings_) + if workers > 1: + for _ in range(workers - 1): + Process(target=asyncio.run, args=(coroutine,)).start() + asyncio.run(coroutine) else: + if workers > 1: + for _ in range(workers - 1): + Process(target=run_worker, args=(worker_settings_,), kwargs=kwargs).start() run_worker(worker_settings_, **kwargs) diff --git a/tests/test_cli.py b/tests/test_cli.py index 8cd98028..bb1abbf4 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,3 +1,5 @@ +import time + import pytest from click.testing import CliRunner @@ -51,7 +53,16 @@ def test_run_watch(mocker, cancel_remaining_task): runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--watch', 'tests']) assert result.exit_code == 0 - assert '1 files changes, reloading arq worker...' + assert 'files changed, reloading arq worker...' in result.output + + +@pytest.mark.timeout(10) # may take a while to get to the point we can test +def test_multiple_workers(): + runner = CliRunner() + result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4']) + while 'clients_connected=4' not in result.output: + time.sleep(1) + assert True custom_log_dict = { From b5a92aba9561e447da825101b9fd1000026dd149 Mon Sep 17 00:00:00 2001 From: Graeme Holliday Date: Mon, 27 Jan 2025 21:43:41 -0500 Subject: [PATCH 2/3] try to fix test --- tests/test_cli.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/tests/test_cli.py b/tests/test_cli.py index bb1abbf4..64a3ebca 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,5 +1,3 @@ -import time - import pytest from click.testing import CliRunner @@ -56,13 +54,11 @@ def test_run_watch(mocker, cancel_remaining_task): assert 'files changed, reloading arq worker...' in result.output -@pytest.mark.timeout(10) # may take a while to get to the point we can test -def test_multiple_workers(): +def test_multiple_workers(mocker, loop): + mocker.patch('asyncio.get_event_loop', lambda: loop) runner = CliRunner() result = runner.invoke(cli, ['tests.test_cli.WorkerSettings', '--workers', '4']) - while 'clients_connected=4' not in result.output: - time.sleep(1) - assert True + assert 'clients_connected=4' in result.output custom_log_dict = { From 309ba0b0b427330e702b03b3fb614cfcddc627f3 Mon Sep 17 00:00:00 2001 From: Graeme Holliday Date: Fri, 25 Jul 2025 14:20:42 -0500 Subject: [PATCH 3/3] allow later redis version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index cc7b10fc..80321718 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,7 +38,7 @@ classifiers = [ ] requires-python = '>=3.8' dependencies = [ - 'redis[hiredis]>=4.2.0,<6', + 'redis[hiredis]>=4.2.0', 'click>=8.0', ] optional-dependencies = {watch = ['watchfiles>=0.16'] }