From dfe35e1d7bbdbffb87171bc18407000cefd7c65f Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 11 Aug 2022 00:01:16 +0400 Subject: [PATCH 1/6] Version bumped to 0.0.3. Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c71cb8e..9bf24b7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.0.2" +version = "0.0.3" description = "Asynchronous task queue with async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "] From 86eba83aadf7470ae886376abe0ca3e4525a66a8 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 12 Aug 2022 01:58:56 +0400 Subject: [PATCH 2/6] Updated classifiers. Signed-off-by: Pavel Kirilin --- pyproject.toml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9bf24b7..a598af0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,9 +16,10 @@ classifiers = [ "Programming Language :: Python :: 3.8", "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", - "Topic :: Utilities", - "Topic :: System :: Networking", "Operating System :: OS Independent", + "Intended Audience :: Developers", + "Topic :: System :: Networking", + "Development Status :: 3 - Alpha", ] homepage = "https://github.com/taskiq-python/taskiq" keywords = ["taskiq", "tasks", "distributed", "async"] From 3d455878f6ba36187455bc60e1afc3fe71ad748d Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 12 Aug 2022 02:01:31 +0400 Subject: [PATCH 3/6] Version bumped to 0.0.4. Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index a598af0..a4eb7d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "taskiq" -version = "0.0.3" +version = "0.0.4" description = "Asynchronous task queue with async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "] From ed8826eccbc1774f700b77d974e7af06ef920276 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 12 Aug 2022 12:53:35 +0400 Subject: [PATCH 4/6] Added reload parameter. (#29) Signed-off-by: Pavel Kirilin --- poetry.lock | 51 ++++++++++++++++++++++++++++++++- pyproject.toml | 2 ++ taskiq/cli/args.py | 14 +++++++++ taskiq/cli/watcher.py | 41 +++++++++++++++++++++++++++ taskiq/cli/worker.py | 66 +++++++++++++++++++++++++++++++++++++++---- 5 files changed, 167 insertions(+), 7 deletions(-) create mode 100644 taskiq/cli/watcher.py diff --git a/poetry.lock b/poetry.lock index a9baa38..78df9cd 100644 --- a/poetry.lock +++ b/poetry.lock @@ -364,6 +364,14 @@ python-versions = ">=3.6" [package.dependencies] smmap = ">=3.0.1,<6" +[[package]] +name = "gitignore-parser" +version = "0.0.8" +description = "A spec-compliant gitignore parser for Python 3.5+" +category = "main" +optional = false +python-versions = "*" + [[package]] name = "gitpython" version = "3.1.27" @@ -798,6 +806,17 @@ platformdirs = ">=2,<3" docs = ["proselint (>=0.10.2)", "sphinx (>=3)", "sphinx-argparse (>=0.2.5)", "sphinx-rtd-theme (>=0.4.3)", "towncrier (>=21.3)"] testing = ["coverage (>=4)", "coverage-enable-subprocess (>=1)", "flaky (>=3)", "packaging (>=20.0)", "pytest (>=4)", "pytest-env (>=0.6.2)", "pytest-freezegun (>=0.4.1)", "pytest-mock (>=2)", "pytest-randomly (>=1)", "pytest-timeout (>=1)"] +[[package]] +name = "watchdog" +version = "2.1.9" +description = "Filesystem events monitoring" +category = "main" +optional = false +python-versions = ">=3.6" + +[package.extras] +watchmedo = ["PyYAML (>=3.10)"] + [[package]] name = "wemake-python-styleguide" version = "0.16.1" @@ -859,7 +878,7 @@ zmq = ["pyzmq"] [metadata] lock-version = "1.1" python-versions = "^3.7" -content-hash = "83f7d256e642da9a70ea6c5862da9f5b1a01b94e3197572cfb8ddb7ff4bd79d5" +content-hash = "05d77d5422fbc72146b71980f56348c48ee125ea1acfc31ef2bc44ad1e3d97e0" [metadata.files] astor = [ @@ -1103,6 +1122,9 @@ gitdb = [ {file = "gitdb-4.0.9-py3-none-any.whl", hash = "sha256:8033ad4e853066ba6ca92050b9df2f89301b8fc8bf7e9324d412a63f8bf1a8fd"}, {file = "gitdb-4.0.9.tar.gz", hash = "sha256:bac2fd45c0a1c9cf619e63a90d62bdc63892ef92387424b855792a6cabe789aa"}, ] +gitignore-parser = [ + {file = "gitignore_parser-0.0.8.tar.gz", hash = "sha256:b4e6e63d971810cbd2168bc02deddba719c07da655eaf56336d4bb0233ecef85"}, +] gitpython = [ {file = "GitPython-3.1.27-py3-none-any.whl", hash = "sha256:5b68b000463593e05ff2b261acff0ff0972df8ab1b70d3cdbd41b546c8b8fc3d"}, {file = "GitPython-3.1.27.tar.gz", hash = "sha256:1c885ce809e8ba2d88a29befeb385fcea06338d3640712b59ca623c220bb5704"}, @@ -1434,6 +1456,33 @@ virtualenv = [ {file = "virtualenv-20.16.2-py2.py3-none-any.whl", hash = "sha256:635b272a8e2f77cb051946f46c60a54ace3cb5e25568228bd6b57fc70eca9ff3"}, {file = "virtualenv-20.16.2.tar.gz", hash = "sha256:0ef5be6d07181946891f5abc8047fda8bc2f0b4b9bf222c64e6e8963baee76db"}, ] +watchdog = [ + {file = "watchdog-2.1.9-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:a735a990a1095f75ca4f36ea2ef2752c99e6ee997c46b0de507ba40a09bf7330"}, + {file = "watchdog-2.1.9-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:6b17d302850c8d412784d9246cfe8d7e3af6bcd45f958abb2d08a6f8bedf695d"}, + {file = "watchdog-2.1.9-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ee3e38a6cc050a8830089f79cbec8a3878ec2fe5160cdb2dc8ccb6def8552658"}, + {file = "watchdog-2.1.9-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:64a27aed691408a6abd83394b38503e8176f69031ca25d64131d8d640a307591"}, + {file = "watchdog-2.1.9-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:195fc70c6e41237362ba720e9aaf394f8178bfc7fa68207f112d108edef1af33"}, + {file = "watchdog-2.1.9-cp38-cp38-macosx_10_9_universal2.whl", hash = "sha256:bfc4d351e6348d6ec51df007432e6fe80adb53fd41183716017026af03427846"}, + {file = "watchdog-2.1.9-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:8250546a98388cbc00c3ee3cc5cf96799b5a595270dfcfa855491a64b86ef8c3"}, + {file = "watchdog-2.1.9-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:117ffc6ec261639a0209a3252546b12800670d4bf5f84fbd355957a0595fe654"}, + {file = "watchdog-2.1.9-cp39-cp39-macosx_10_9_universal2.whl", hash = "sha256:97f9752208f5154e9e7b76acc8c4f5a58801b338de2af14e7e181ee3b28a5d39"}, + {file = "watchdog-2.1.9-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:247dcf1df956daa24828bfea5a138d0e7a7c98b1a47cf1fa5b0c3c16241fcbb7"}, + {file = "watchdog-2.1.9-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:226b3c6c468ce72051a4c15a4cc2ef317c32590d82ba0b330403cafd98a62cfd"}, + {file = "watchdog-2.1.9-pp37-pypy37_pp73-macosx_10_9_x86_64.whl", hash = "sha256:d9820fe47c20c13e3c9dd544d3706a2a26c02b2b43c993b62fcd8011bcc0adb3"}, + {file = "watchdog-2.1.9-pp38-pypy38_pp73-macosx_10_9_x86_64.whl", hash = "sha256:70af927aa1613ded6a68089a9262a009fbdf819f46d09c1a908d4b36e1ba2b2d"}, + {file = "watchdog-2.1.9-pp39-pypy39_pp73-macosx_10_9_x86_64.whl", hash = "sha256:ed80a1628cee19f5cfc6bb74e173f1b4189eb532e705e2a13e3250312a62e0c9"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_aarch64.whl", hash = "sha256:9f05a5f7c12452f6a27203f76779ae3f46fa30f1dd833037ea8cbc2887c60213"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_armv7l.whl", hash = "sha256:255bb5758f7e89b1a13c05a5bceccec2219f8995a3a4c4d6968fe1de6a3b2892"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_i686.whl", hash = "sha256:d3dda00aca282b26194bdd0adec21e4c21e916956d972369359ba63ade616153"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_ppc64.whl", hash = "sha256:186f6c55abc5e03872ae14c2f294a153ec7292f807af99f57611acc8caa75306"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_ppc64le.whl", hash = "sha256:083171652584e1b8829581f965b9b7723ca5f9a2cd7e20271edf264cfd7c1412"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_s390x.whl", hash = "sha256:b530ae007a5f5d50b7fbba96634c7ee21abec70dc3e7f0233339c81943848dc1"}, + {file = "watchdog-2.1.9-py3-none-manylinux2014_x86_64.whl", hash = "sha256:4f4e1c4aa54fb86316a62a87b3378c025e228178d55481d30d857c6c438897d6"}, + {file = "watchdog-2.1.9-py3-none-win32.whl", hash = "sha256:5952135968519e2447a01875a6f5fc8c03190b24d14ee52b0f4b1682259520b1"}, + {file = "watchdog-2.1.9-py3-none-win_amd64.whl", hash = "sha256:7a833211f49143c3d336729b0020ffd1274078e94b0ae42e22f596999f50279c"}, + {file = "watchdog-2.1.9-py3-none-win_ia64.whl", hash = "sha256:ad576a565260d8f99d97f2e64b0f97a48228317095908568a9d5c786c829d428"}, + {file = "watchdog-2.1.9.tar.gz", hash = "sha256:43ce20ebb36a51f21fa376f76d1d4692452b2527ccd601950d69ed36b9e21609"}, +] wemake-python-styleguide = [ {file = "wemake-python-styleguide-0.16.1.tar.gz", hash = "sha256:4fcd78dd55732679b5fc8bc37fd7e04bbaa5cdc1b1a829ad265e8f6b0d853cf6"}, {file = "wemake_python_styleguide-0.16.1-py3-none-any.whl", hash = "sha256:202c22ecfee1f5caf0555048602cd52f2435cd57903e6b0cd46b5aaa3f652140"}, diff --git a/pyproject.toml b/pyproject.toml index a4eb7d4..ac9db21 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ typing-extensions = ">=3.10.0.0" pydantic = "^1.6.2" pyzmq = { version = "^23.2.0", optional = true } uvloop = { version = "^0.16.0", optional = true } +watchdog = "^2.1.9" +gitignore-parser = "^0.0.8" [tool.poetry.dev-dependencies] pytest = "^7.1.2" diff --git a/taskiq/cli/args.py b/taskiq/cli/args.py index 6fd9f29..4e11561 100644 --- a/taskiq/cli/args.py +++ b/taskiq/cli/args.py @@ -28,6 +28,8 @@ class TaskiqArgs: max_threadpool_threads: int no_parse: bool shutdown_timeout: float + reload: bool + no_gitignore: bool @classmethod def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213 @@ -113,6 +115,18 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP default=5, help="Maximum amount of time for graceful broker's shutdown is seconds.", ) + parser.add_argument( + "--reload", + "-r", + action="store_true", + help="Reload workers if file is changed.", + ) + parser.add_argument( + "--do-not-use-gitignore", + action="store_true", + dest="no_gitignore", + help="Do not use gitignore to check for updated files.", + ) if args is None: namespace = parser.parse_args(args) diff --git a/taskiq/cli/watcher.py b/taskiq/cli/watcher.py new file mode 100644 index 0000000..2fc7d5e --- /dev/null +++ b/taskiq/cli/watcher.py @@ -0,0 +1,41 @@ +from pathlib import Path +from typing import Callable + +from gitignore_parser import parse_gitignore +from watchdog.events import FileSystemEvent + + +class FileWatcher: + """Filewatcher that watchs for filesystem changes.""" + + def __init__( + self, + callback: Callable[[], None], + use_gitignore: bool = True, + ) -> None: + self.callback = callback + self.gitignore = None + gpath = Path("./.gitignore") + if use_gitignore and gpath.exists(): + self.gitignore = parse_gitignore(gpath) + + def dispatch(self, event: FileSystemEvent) -> None: + """ + React to event. + + This function checks wether we need to + react to event and calls callback if we do. + + :param event: incoming fs event. + """ + if event.is_directory: + return + if event.event_type == "closed": + return + if ".pytest_cache" in event.src_path: + return + if "__pycache__" in event.src_path: + return + if self.gitignore and self.gitignore(event.src_path): + return + self.callback() diff --git a/taskiq/cli/worker.py b/taskiq/cli/worker.py index 0d50622..05acdb3 100644 --- a/taskiq/cli/worker.py +++ b/taskiq/cli/worker.py @@ -7,12 +7,16 @@ from logging import basicConfig, getLevelName, getLogger from multiprocessing import Process from pathlib import Path +from queue import Queue from time import sleep from typing import Any, Generator, List +from watchdog.observers import Observer + from taskiq.abc.broker import AsyncBroker from taskiq.cli.args import TaskiqArgs from taskiq.cli.async_task_runner import async_listen_messages +from taskiq.cli.watcher import FileWatcher try: import uvloop # noqa: WPS433 @@ -25,6 +29,8 @@ restart_workers = True worker_processes: List[Process] = [] +observer = Observer() +reload_queue: "Queue[int]" = Queue(-1) def signal_handler(_signal: int, _frame: Any) -> None: @@ -45,9 +51,33 @@ def signal_handler(_signal: int, _frame: Any) -> None: # This is how we kill children, # by sending SIGINT to child processes. if process.pid is None: - process.kill() - else: + continue + try: os.kill(process.pid, signal.SIGINT) + except ProcessLookupError: + continue + process.join() + if observer.is_alive(): + observer.stop() + observer.join() + + +def schedule_workers_reload() -> None: + """ + Function to schedule workers to restart. + + This function adds worker ids to the queue. + + This queue is later read in watcher loop. + """ + global worker_processes # noqa: WPS420 + global reload_queue # noqa: WPS420 + + logger.info("Reloading workers") + for worker_id, _ in enumerate(worker_processes): + reload_queue.put(worker_id) + logger.info("Worker %s scheduled to reload", worker_id) + reload_queue.join() @contextmanager @@ -212,13 +242,16 @@ def interrupt_handler(_signum: int, _frame: Any) -> None: loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout)) -def watch_workers_restarts(args: TaskiqArgs) -> None: +def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213 """ Infinate loop for main process. This loop restarts worker processes if they exit with error returncodes. + Also it reads process ids from reload_queue + and reloads workers if they were scheduled to reload. + :param args: cli arguements. """ global worker_processes # noqa: WPS420 @@ -228,6 +261,18 @@ def watch_workers_restarts(args: TaskiqArgs) -> None: # List of processes to remove. sleep(1) process_to_remove = [] + while not reload_queue.empty(): + process_id = reload_queue.get() + worker_processes[process_id].terminate() + worker_processes[process_id].join() + worker_processes[process_id] = Process( + target=start_listen, + kwargs={"args": args}, + name=f"worker-{process_id}", + ) + worker_processes[process_id].start() + reload_queue.task_done() + for worker_id, worker in enumerate(worker_processes): if worker.is_alive(): continue @@ -241,14 +286,13 @@ def watch_workers_restarts(args: TaskiqArgs) -> None: worker_processes[worker_id].start() else: logger.info("Worker-%s terminated.", worker_id) - worker.join() process_to_remove.append(worker) for dead_process in process_to_remove: worker_processes.remove(dead_process) -def run_worker(args: TaskiqArgs) -> None: +def run_worker(args: TaskiqArgs) -> None: # noqa: WPS213 """ This function starts worker processes. @@ -279,7 +323,17 @@ def run_worker(args: TaskiqArgs) -> None: ) worker_processes.append(work_proc) + if args.reload: + observer.schedule( + FileWatcher( + callback=schedule_workers_reload, + use_gitignore=not args.no_gitignore, + ), + path=".", + recursive=True, + ) + observer.start() signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) - watch_workers_restarts(args=args) + watcher_loop(args=args) From 9a18d42d20a852499a2ed96e088632bd2cc8d446 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 12 Aug 2022 15:01:17 +0400 Subject: [PATCH 5/6] Fixed reload atomicity. (#30) At some point the reload process could get invalid index error. Signed-off-by: Pavel Kirilin --- taskiq/cli/worker.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/taskiq/cli/worker.py b/taskiq/cli/worker.py index 05acdb3..31cfe0f 100644 --- a/taskiq/cli/worker.py +++ b/taskiq/cli/worker.py @@ -30,7 +30,7 @@ restart_workers = True worker_processes: List[Process] = [] observer = Observer() -reload_queue: "Queue[int]" = Queue(-1) +reload_queue: "Queue[bool]" = Queue(-1) def signal_handler(_signal: int, _frame: Any) -> None: @@ -73,10 +73,8 @@ def schedule_workers_reload() -> None: global worker_processes # noqa: WPS420 global reload_queue # noqa: WPS420 - logger.info("Reloading workers") - for worker_id, _ in enumerate(worker_processes): - reload_queue.put(worker_id) - logger.info("Worker %s scheduled to reload", worker_id) + reload_queue.put(True) + logger.info("Scheduled workers reload.") reload_queue.join() @@ -261,17 +259,20 @@ def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213 # List of processes to remove. sleep(1) process_to_remove = [] - while not reload_queue.empty(): - process_id = reload_queue.get() - worker_processes[process_id].terminate() - worker_processes[process_id].join() - worker_processes[process_id] = Process( - target=start_listen, - kwargs={"args": args}, - name=f"worker-{process_id}", - ) - worker_processes[process_id].start() - reload_queue.task_done() + if not reload_queue.empty(): + while not reload_queue.empty(): + reload_queue.get() + reload_queue.task_done() + + for worker_id, worker in enumerate(worker_processes): + worker.terminate() + worker.join() + worker_processes[worker_id] = Process( + target=start_listen, + kwargs={"args": args}, + name=f"worker-{worker_id}", + ) + worker_processes[worker_id].start() for worker_id, worker in enumerate(worker_processes): if worker.is_alive(): From c81f43de9b9998bcdd0284802eb07ac602e5731e Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Fri, 12 Aug 2022 15:01:35 +0400 Subject: [PATCH 6/6] Description updated. (#31) Signed-off-by: Pavel Kirilin --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index ac9db21..c7e42e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "taskiq" version = "0.0.4" -description = "Asynchronous task queue with async support" +description = "Distributed task queue with full async support" authors = ["Pavel Kirilin "] maintainers = ["Pavel Kirilin "] readme = "README.md"