Skip to content

Commit

Permalink
Merge branch 'release/0.0.4'
Browse files Browse the repository at this point in the history
Signed-off-by: Pavel Kirilin <[email protected]>
  • Loading branch information
s3rius committed Aug 12, 2022
2 parents 739f227 + c81f43d commit 4e956c1
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 11 deletions.
51 changes: 50 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "taskiq"
version = "0.0.3"
description = "Asynchronous task queue with async support"
version = "0.0.4"
description = "Distributed task queue with full async support"
authors = ["Pavel Kirilin <[email protected]>"]
maintainers = ["Pavel Kirilin <[email protected]>"]
readme = "README.md"
Expand All @@ -16,9 +16,10 @@ classifiers = [
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Topic :: Utilities",
"Topic :: System :: Networking",
"Operating System :: OS Independent",
"Intended Audience :: Developers",
"Topic :: System :: Networking",
"Development Status :: 3 - Alpha",
]
homepage = "https://github.com/taskiq-python/taskiq"
keywords = ["taskiq", "tasks", "distributed", "async"]
Expand All @@ -29,6 +30,8 @@ typing-extensions = ">=3.10.0.0"
pydantic = "^1.6.2"
pyzmq = { version = "^23.2.0", optional = true }
uvloop = { version = "^0.16.0", optional = true }
watchdog = "^2.1.9"
gitignore-parser = "^0.0.8"

[tool.poetry.dev-dependencies]
pytest = "^7.1.2"
Expand Down
14 changes: 14 additions & 0 deletions taskiq/cli/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ class TaskiqArgs:
max_threadpool_threads: int
no_parse: bool
shutdown_timeout: float
reload: bool
no_gitignore: bool

@classmethod
def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WPS213
Expand Down Expand Up @@ -113,6 +115,18 @@ def from_cli(cls, args: Optional[List[str]] = None) -> "TaskiqArgs": # noqa: WP
default=5,
help="Maximum amount of time for graceful broker's shutdown is seconds.",
)
parser.add_argument(
"--reload",
"-r",
action="store_true",
help="Reload workers if file is changed.",
)
parser.add_argument(
"--do-not-use-gitignore",
action="store_true",
dest="no_gitignore",
help="Do not use gitignore to check for updated files.",
)

if args is None:
namespace = parser.parse_args(args)
Expand Down
41 changes: 41 additions & 0 deletions taskiq/cli/watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from pathlib import Path
from typing import Callable

from gitignore_parser import parse_gitignore
from watchdog.events import FileSystemEvent


class FileWatcher:
"""Filewatcher that watchs for filesystem changes."""

def __init__(
self,
callback: Callable[[], None],
use_gitignore: bool = True,
) -> None:
self.callback = callback
self.gitignore = None
gpath = Path("./.gitignore")
if use_gitignore and gpath.exists():
self.gitignore = parse_gitignore(gpath)

def dispatch(self, event: FileSystemEvent) -> None:
"""
React to event.
This function checks wether we need to
react to event and calls callback if we do.
:param event: incoming fs event.
"""
if event.is_directory:
return
if event.event_type == "closed":
return
if ".pytest_cache" in event.src_path:
return
if "__pycache__" in event.src_path:
return
if self.gitignore and self.gitignore(event.src_path):
return
self.callback()
67 changes: 61 additions & 6 deletions taskiq/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
from logging import basicConfig, getLevelName, getLogger
from multiprocessing import Process
from pathlib import Path
from queue import Queue
from time import sleep
from typing import Any, Generator, List

from watchdog.observers import Observer

from taskiq.abc.broker import AsyncBroker
from taskiq.cli.args import TaskiqArgs
from taskiq.cli.async_task_runner import async_listen_messages
from taskiq.cli.watcher import FileWatcher

try:
import uvloop # noqa: WPS433
Expand All @@ -25,6 +29,8 @@

restart_workers = True
worker_processes: List[Process] = []
observer = Observer()
reload_queue: "Queue[bool]" = Queue(-1)


def signal_handler(_signal: int, _frame: Any) -> None:
Expand All @@ -45,9 +51,31 @@ def signal_handler(_signal: int, _frame: Any) -> None:
# This is how we kill children,
# by sending SIGINT to child processes.
if process.pid is None:
process.kill()
else:
continue
try:
os.kill(process.pid, signal.SIGINT)
except ProcessLookupError:
continue
process.join()
if observer.is_alive():
observer.stop()
observer.join()


def schedule_workers_reload() -> None:
"""
Function to schedule workers to restart.
This function adds worker ids to the queue.
This queue is later read in watcher loop.
"""
global worker_processes # noqa: WPS420
global reload_queue # noqa: WPS420

reload_queue.put(True)
logger.info("Scheduled workers reload.")
reload_queue.join()


@contextmanager
Expand Down Expand Up @@ -212,13 +240,16 @@ def interrupt_handler(_signum: int, _frame: Any) -> None:
loop.run_until_complete(shutdown_broker(broker, args.shutdown_timeout))


def watch_workers_restarts(args: TaskiqArgs) -> None:
def watcher_loop(args: TaskiqArgs) -> None: # noqa: C901, WPS213
"""
Infinate loop for main process.
This loop restarts worker processes
if they exit with error returncodes.
Also it reads process ids from reload_queue
and reloads workers if they were scheduled to reload.
:param args: cli arguements.
"""
global worker_processes # noqa: WPS420
Expand All @@ -228,6 +259,21 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
# List of processes to remove.
sleep(1)
process_to_remove = []
if not reload_queue.empty():
while not reload_queue.empty():
reload_queue.get()
reload_queue.task_done()

for worker_id, worker in enumerate(worker_processes):
worker.terminate()
worker.join()
worker_processes[worker_id] = Process(
target=start_listen,
kwargs={"args": args},
name=f"worker-{worker_id}",
)
worker_processes[worker_id].start()

for worker_id, worker in enumerate(worker_processes):
if worker.is_alive():
continue
Expand All @@ -241,14 +287,13 @@ def watch_workers_restarts(args: TaskiqArgs) -> None:
worker_processes[worker_id].start()
else:
logger.info("Worker-%s terminated.", worker_id)
worker.join()
process_to_remove.append(worker)

for dead_process in process_to_remove:
worker_processes.remove(dead_process)


def run_worker(args: TaskiqArgs) -> None:
def run_worker(args: TaskiqArgs) -> None: # noqa: WPS213
"""
This function starts worker processes.
Expand Down Expand Up @@ -279,7 +324,17 @@ def run_worker(args: TaskiqArgs) -> None:
)
worker_processes.append(work_proc)

if args.reload:
observer.schedule(
FileWatcher(
callback=schedule_workers_reload,
use_gitignore=not args.no_gitignore,
),
path=".",
recursive=True,
)
observer.start()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

watch_workers_restarts(args=args)
watcher_loop(args=args)

0 comments on commit 4e956c1

Please sign in to comment.