Skip to content

Commit

Permalink
feat: adds runworker command that will restart the worker on code cha…
Browse files Browse the repository at this point in the history
…nges
  • Loading branch information
joaquimds committed Dec 10, 2024
1 parent ef642d7 commit 7b64355
Show file tree
Hide file tree
Showing 5 changed files with 1,261 additions and 1,031 deletions.
164 changes: 80 additions & 84 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,85 +1,81 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Django",
"type": "debugpy",
"python": "${workspaceFolder}/.venv/bin/python",
"request": "launch",
"justMyCode": false,
"program": "${workspaceFolder}/manage.py",
"args": [
"runserver"
],
"django": true,
"presentation": {
"group": "power users",
"order": 1
}
},
{
"name": "NextJS",
"type": "node-terminal",
"command": "cd nextjs && npm install && npm run dev",
"request": "launch",
"presentation": {
"group": "power users",
"order": 1
}
},
{
"name": "Django background worker",
"type": "debugpy",
"python": "${workspaceFolder}/.venv/bin/python",
"request": "launch",
"program": "${workspaceFolder}/manage.py",
"args": [
"procrastinate", "worker"
],
"justMyCode": true,
"django": true,
"presentation": {
"group": "power users",
"order": 1
}
},
{
// Node
"name": "Generate GraphQL types for client",
"type": "node-terminal",
"command": "./bin/graphql_codegen.sh",
"request": "launch",
"presentation": {
"group": "power users",
"order": 1
}
}
],
"compounds": [
{
"name": "Run Mapped!",
"configurations": [
"NextJS",
"Django",
"Django background worker",
"Generate GraphQL types for client"
],
"presentation": {
"group": "0",
"order": 0
}
},
{
"name": "Backend",
"configurations": [
"Django",
"Django background worker",
"Generate GraphQL types for client"
],
"presentation": {
"group": "power users",
"order": 1
}
}
]
}
"version": "0.2.0",
"configurations": [
{
"name": "Django",
"type": "debugpy",
"python": "${workspaceFolder}/.venv/bin/python",
"request": "launch",
"justMyCode": false,
"program": "${workspaceFolder}/manage.py",
"args": ["runserver"],
"django": true,
"presentation": {
"group": "power users",
"order": 1
}
},
{
"name": "NextJS",
"type": "node-terminal",
"command": "cd nextjs && npm install && npm run dev",
"request": "launch",
"presentation": {
"group": "power users",
"order": 1
}
},
{
"name": "Django background worker",
"type": "debugpy",
"python": "${workspaceFolder}/.venv/bin/python",
"request": "launch",
"program": "${workspaceFolder}/manage.py",
"args": ["runworker"],
"justMyCode": true,
"django": true,
"presentation": {
"group": "power users",
"order": 1
}
},
{
// Node
"name": "Generate GraphQL types for client",
"type": "node-terminal",
"command": "./bin/graphql_codegen.sh",
"request": "launch",
"presentation": {
"group": "power users",
"order": 1
}
}
],
"compounds": [
{
"name": "Run Mapped!",
"configurations": [
"NextJS",
"Django",
"Django background worker",
"Generate GraphQL types for client"
],
"presentation": {
"group": "0",
"order": 0
}
},
{
"name": "Backend",
"configurations": [
"Django",
"Django background worker",
"Generate GraphQL types for client"
],
"presentation": {
"group": "power users",
"order": 1
}
}
]
}
101 changes: 101 additions & 0 deletions hub/management/commands/runworker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
from django.core.management.base import BaseCommand

Check failure on line 1 in hub/management/commands/runworker.py

View workflow job for this annotation

GitHub Actions / lint-python

Imports are incorrectly sorted and/or formatted.
from django.conf import settings
from procrastinate import cli
from procrastinate.contrib.django import app, django_connector, healthchecks

import argparse
import subprocess
import asyncio
import sys

from watchdog.events import FileSystemEvent, PatternMatchingEventHandler
from watchdog.observers import Observer
import logging

logger = logging.getLogger(__name__)


class TerminateWorkerEventHandler(PatternMatchingEventHandler):
"""
Terminate any worker_process saved on the event handler when
file changes are detected. This unblocks the main thread,
which is waiting for worker_process to complete before
restarting it.
"""

def __init__(self):
self.worker_process: subprocess.Popen | None = None
super().__init__(patterns=["*.py"])

def on_modified(self, event: FileSystemEvent) -> None:
logger.info(str(event))
if self.worker_process:
self.worker_process.terminate()


class Command(BaseCommand):
"""
Wraps the `procrastinate worker` command in watchdog,
to restart when files change.
Should only be used in development.
E.G. python manage.py runworker
"""

def add_arguments(self, parser):
"""
Taken from procrastinate/contrib/django/management/commands/procrastinate.py
Copies the worker command arguments from procrastinate and returns them, so that
the user can get useful output from `python manage.py runworker --help`.
"""
self._django_options = {a.dest for a in parser._actions}
temp_parser = argparse.ArgumentParser()
subparsers = temp_parser.add_subparsers(dest="command")
cli.configure_worker_parser(subparsers)
worker_parser = subparsers._name_parser_map["worker"]
for action in worker_parser._actions:
if action.dest not in self._django_options:
parser._add_action(action)

def handle(self, *args, **kwargs):
"""
Starts two threads:
1. The main thread: an infinite loop that starts a worker process, waits for it to
complete, then repeats.
2. The event observer thread: terminates the current worker process when file changes
are detected.
When the event observer terminates the worker process, it unblocks the main thread,
allowing it to loop and restart the worker.
The event observer cannot do both, as it can't wait for the worker and listen to
file changes at the same time.
"""
procrastinate_args = []
for arg in sys.argv:
if arg == "runworker":
procrastinate_args.append("procrastinate")
procrastinate_args.append("worker")
else:
procrastinate_args.append(arg)

print("Starting worker as subprocess and listening for file changes...")

event_handler = TerminateWorkerEventHandler()
observer = Observer()
observer.schedule(event_handler, settings.BASE_DIR, recursive=True)
# Starts event listener thread
observer.start()

while True:
worker_process = subprocess.Popen([sys.executable, *procrastinate_args])
# Save the active worker process on the event handler.
event_handler.worker_process = worker_process
# Waiting is necessary for correct process and signal handling.
# When the worker process is terminated by the event handler, this wait()
# completes and the loop restarts.
event_handler.worker_process.wait()
2 changes: 1 addition & 1 deletion local_intelligence_hub/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@
# Silence endless waiting for job log
"procrastinate.worker": {
"handlers": ["truncated"],
"level": "INFO",
"level": "DEBUG",
"propagate": False,
},
"django": {
Expand Down
Loading

0 comments on commit 7b64355

Please sign in to comment.