Limiting concurrent jobs #98
Replies: 4 comments 2 replies
-
I've been thinking about something like this myself. My concern is that this needs to happen before the job gets dispatched to the consumers, as they will just start clogging up due to exhaustion of file descriptors.
-
Cool, looks like I was wrong. I've run the script below until I got to 1.5e6 tasks, so my previous statement was wrong 😄

```python
import asyncio

import asyncpg

from pgqueuer.db import AsyncpgDriver
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager
from pgqueuer.queries import Queries


async def main() -> None:
    qm_conn = await asyncpg.connect()
    driver = AsyncpgDriver(qm_conn)
    qm = QueueManager(driver)

    @qm.entrypoint("fetch")
    async def fetch(job: Job) -> None:
        # Never finish, so dequeued tasks accumulate in the consumer.
        await asyncio.sleep(float("inf"))

    enq_conn = await asyncpg.connect()
    q = Queries(AsyncpgDriver(enq_conn))
    N = 100_000

    async def enqueue() -> None:
        while True:
            await q.enqueue(
                ["fetch"] * N,
                [None] * N,
                [0] * N,
            )
            await asyncio.sleep(0.1)
            print(len(asyncio.all_tasks()))

    await asyncio.gather(enqueue(), qm.run(batch_size=N // 2))


asyncio.run(main())
```
-
Yeah, asyncio can handle quite a lot of idle tasks, so just having them pile up in the consumers is not a huge problem in itself. For me the issue is that (a) the tasks are actually CPU bound (although via a subprocess), and (b) the lack of back-pressure means that job distribution between consumers is inefficient, and bringing extra consumers up will not help because the tasks have already been dequeued.
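For context, a rough sketch of the kind of handler I mean, assuming a hypothetical `crunch` executable for the CPU-bound work and the `Job.payload` bytes field:

```python
import asyncio

from pgqueuer.models import Job


# Illustrative only: the event loop stays responsive while a hypothetical
# "crunch" executable burns CPU in a child process, so thousands of such
# tasks look "idle" to asyncio even though each one still holds a dequeued
# job, a subprocess, and its file descriptors.
async def fetch(job: Job) -> None:
    proc = await asyncio.create_subprocess_exec(
        "crunch",
        stdin=asyncio.subprocess.PIPE,
    )
    await proc.communicate(input=job.payload or b"")
```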
-
Resolved by #103
-
For my use case, jobs spawn CPU-intensive subprocesses, so I need to keep the maximum number of concurrent jobs on each consumer limited.
Currently, I don't think there's a way to impose a hard limit on the number of concurrent tasks that can be started for an entrypoint. The batch size parameter can be used to limit the number of tasks started in a single poll of the database, but if tasks take longer than the `dequeue_timeout` this is not a hard limit. Likewise, the `requests_per_second` rate limiting can only impose soft limits and can be difficult to tune if jobs vary widely in how long they take to execute.

It is possible in user code to use an `asyncio.Semaphore` within an entrypoint to manage resource utilization (see the sketch below), but this does not impose any back-pressure on dequeuing more jobs from the database. This can lead to situations where one consumer has dequeued many jobs that are waiting for the semaphore while another consumer is idle.
I would like to propose adding an additional `concurrency_limit` argument to `QueueManager.entrypoint`. This would impose a hard limit on the number of concurrent tasks for that entrypoint on each consumer and provide back-pressure, so jobs beyond the limit stay in the queue rather than being dequeued and parked behind a semaphore. I have a proof-of-concept implementation of this idea at colingavin/pgqueuer@604f6b9
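For illustration, usage might look like the sketch below; the `concurrency_limit=4` value is arbitrary, and the signature is only what the proof-of-concept proposes, not an existing pgqueuer API:

```python
from pgqueuer.models import Job
from pgqueuer.qm import QueueManager


def register(qm: QueueManager) -> None:
    # Proposed API: cap this entrypoint at four concurrent jobs on this
    # consumer and leave additional jobs in the queue (providing
    # back-pressure) until a slot frees up.
    @qm.entrypoint("fetch", concurrency_limit=4)
    async def fetch(job: Job) -> None:
        ...
```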