Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to split the tasks into different modules without PrometheusMiddleware crashing the worker #397

Open
waza-ari opened this issue Jan 4, 2025 · 0 comments

Comments

@waza-ari
Copy link

waza-ari commented Jan 4, 2025

Hello team,

in my project there's a relatively large amount of tasks, therefore I decided to split them into separate files. This works reasonably well, as soon as I start using the PrometheusMiddleware as well, it results in an error though, presumably because of the broker being imported in the tasks.

Folder layout

The project (simplified) is structured like this:

.
├── README.md
├── poetry.lock
├── pyproject.toml
└── src
    ├── backend
    ├── cli
    └── worker
       ├── __init__.py
       ├── broker.py
       ├── dependencies
        │   ├── database.py
        │   ├── [...]
        │   └── nats.py
       ├── invoicing
        │   ├── __init__.py
        │   ├── common.py
        │   ├── task_cron_generate_briefing_invoices.py
        │   ├── [... more tasks]
        │   └── task_upload_invoices_to_ev.py
       ├── notification
        │   ├── __init__.py
        │   ├── [... more tasks]
        │   └── task_handle_event.py
       └── users
            ├── __init__.py
            └── [... more tasks]

The broker.py files contains the main taskiq stuff:

import os

from taskiq import AsyncBroker, InMemoryBroker, PrometheusMiddleware, TaskiqEvents, TaskiqScheduler, TaskiqState
from taskiq.schedule_sources import LabelScheduleSource
from taskiq_nats import NatsBroker

from backend.core import sessionmanager, settings

from .middleware import LoggingMiddleware, worker_job_id

# Read environment variables
env = os.environ.get("ENVIRONMENT")

# Create a new NatsBroker instance with the NATS URL and queue name from the settings
broker: AsyncBroker = NatsBroker([settings.WORKER_NATS_URL], queue=settings.WORKER_NATS_QUEUE).with_middlewares(
    LoggingMiddleware(), PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000)
)
if env and env == "pytest":
    broker = InMemoryBroker()

# Create a new TaskiqScheduler instance which is needed to schedule tasks
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)


# Startup and shutdown events are used to initialize and close the database session
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
async def startup(state: TaskiqState) -> None:
    sessionmanager.init(worker_job_id.get)


@broker.on_event(TaskiqEvents.WORKER_SHUTDOWN)
async def shutdown(state: TaskiqState) -> None:
    sessionmanager.close()

To define a task, I then need to import the broker defined in the previous file, super stupid example:

from ..broker import broker

@broker.task(task_name="my_example_task")
async def my_example_task() -> None:
    pass

When running this without the PrometheusMiddleware, it works absolutely fine:

$ poetry run taskiq worker --fs-discover --tasks-pattern **/task_*.py --log-level DEBUG worker.broker:broker

[2025-01-04 16:41:08,068][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 50346
[2025-01-04 16:41:08,068][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-01-04 16:41:08,114][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 50356 
[2025-01-04 16:41:08,117][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 50357

As soon as I add the PrometheusMiddleware back as shown above, the workers crash due to the registry. I also saw #173, which may or may not be related, I'm not entirely sure about that.

poetry run taskiq worker --fs-discover --tasks-pattern **/task_*.py --log-level DEBUG worker.broker:broker
[2025-01-04 16:42:23,400][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 50465
[2025-01-04 16:42:23,400][taskiq.worker][INFO   ][MainProcess] Starting 2 worker processes.
[2025-01-04 16:42:23,410][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 50469 
[2025-01-04 16:42:23,414][taskiq.process-manager][INFO   ][MainProcess] Started process worker-1 with pid 50470 
Process worker-0:
Process worker-1:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/worker/run.py", line 133, in start_listen
    import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/utils.py", line 99, in import_tasks
    import_from_modules(modules)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/utils.py", line 66, in import_from_modules
    import_module(module)
    ~~~~~~~~~~~~~^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/importlib/__init__.py", line 88, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 1026, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/__init__.py", line 3, in <module>
    from .briefings.task_offer_cancelled import briefing_offer_cancelled
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/briefings/task_offer_cancelled.py", line 7, in <module>
    from ..broker import broker
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/worker/run.py", line 133, in start_listen
    import_tasks(args.modules, args.tasks_pattern, args.fs_discover)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/broker.py", line 17, in <module>
    LoggingMiddleware(), PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000)
                         ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/utils.py", line 99, in import_tasks
    import_from_modules(modules)
    ~~~~~~~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/cli/utils.py", line 66, in import_from_modules
    import_module(module)
    ~~~~~~~~~~~~~^^^^^^^^
  File "/opt/homebrew/Cellar/[email protected]/3.13.1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/importlib/__init__.py", line 88, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/middlewares/prometheus_middleware.py", line 53, in __init__
    self.found_errors = Counter(
                        ~~~~~~~^
        "found_errors",
        ^^^^^^^^^^^^^^^
        "Number of found errors",
        ^^^^^^^^^^^^^^^^^^^^^^^^^
        ["task_name"],
        ^^^^^^^^^^^^^^
    )
    ^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/prometheus_client/metrics.py", line 156, in __init__
    registry.register(self)
    ~~~~~~~~~~~~~~~~~^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/prometheus_client/registry.py", line 43, in register
    raise ValueError(
        'Duplicated timeseries in CollectorRegistry: {}'.format(
            duplicates))
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
ValueError: Duplicated timeseries in CollectorRegistry: {'found_errors', 'found_errors_total', 'found_errors_created'}
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1310, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 1026, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/__init__.py", line 3, in <module>
    from .briefings.task_offer_cancelled import briefing_offer_cancelled
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/briefings/task_offer_cancelled.py", line 7, in <module>
    from ..broker import broker
  File "/Users/my_username/Coding/Makerspace/PythonProjects/makerspace-fastapi/src/worker/broker.py", line 17, in <module>
    LoggingMiddleware(), PrometheusMiddleware(server_addr="0.0.0.0", server_port=9000)
                         ~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/taskiq/middlewares/prometheus_middleware.py", line 53, in __init__
    self.found_errors = Counter(
                        ~~~~~~~^
        "found_errors",
        ^^^^^^^^^^^^^^^
        "Number of found errors",
        ^^^^^^^^^^^^^^^^^^^^^^^^^
        ["task_name"],
        ^^^^^^^^^^^^^^
    )
    ^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/prometheus_client/metrics.py", line 156, in __init__
    registry.register(self)
    ~~~~~~~~~~~~~~~~~^^^^^^
  File "/Users/my_username/Library/Caches/pypoetry/virtualenvs/makerspace-fastapi-7V0aiXpE-py3.13/lib/python3.13/site-packages/prometheus_client/registry.py", line 43, in register
    raise ValueError(
        'Duplicated timeseries in CollectorRegistry: {}'.format(
            duplicates))
ValueError: Duplicated timeseries in CollectorRegistry: {'found_errors', 'found_errors_created', 'found_errors_total'}
[2025-01-04 16:42:26,632][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2025-01-04 16:42:26,634][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
^C[2025-01-04 16:42:27,039][taskiq.process-manager][DEBUG  ][MainProcess] Got signal 2.
[2025-01-04 16:42:27,040][taskiq.process-manager][WARNING][MainProcess] Workers are scheduled for shutdown.
task: Signal received: "interrupt"
[2025-01-04 16:42:27,639][root][DEBUG  ][MainProcess] Got event: ReloadOneAction(worker_num=0, is_reload_all=False)
[2025-01-04 16:42:27,641][taskiq.process-manager][INFO   ][MainProcess] Process worker-0 restarted with pid 50478
[2025-01-04 16:42:27,742][root][DEBUG  ][MainProcess] Got event: ReloadOneAction(worker_num=1, is_reload_all=False)
[2025-01-04 16:42:27,744][taskiq.process-manager][INFO   ][MainProcess] Process worker-1 restarted with pid 50479
[2025-01-04 16:42:27,845][root][DEBUG  ][MainProcess] Got event: ShutdownAction()
[2025-01-04 16:42:27,845][taskiq.process-manager][DEBUG  ][MainProcess] Process manager closed, killing workers.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant