-
-
Notifications
You must be signed in to change notification settings - Fork 177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Future plan for Arq #437
Comments
Great to hear all of that. I would very much like One additional thing might be to allow some kind of locking for the jobs. I could image this to work like the |
That's very easily done in redis, I guess no reason for us not to add a utility. |
I love this part! That's the main reason I had gone for Celery over RQ and others (many years ago, long before FastAPI). That required weird tricks in how to define the Celery code. Having something that takes this into account first-class sounds amazing. It would be great if it could also support regular |
I support this statement heavily. We have a synchronous codebase at work for a current project requirement using Fastapi. We had to go with rq for the queuing solution (Celery was a bit too heavy for our use case), which works great but I would have loved to go for |
I would like to see automatic documentation with AsyncAPI. |
Thank you very much for working on It would be great to have an out of the box priorities for the jobs. Currently, we're using this solution. We can't adapt the approach with different queues because we need to have the same limited number of concurrent jobs and sometimes execute some urgent jobs ahead of the others |
I think you can run stream consumers on different streams with different priorities, so that should be possible. |
One thing I’d love to see is first party support for a UI to see job status, worker conditions, ability to start or stop enqueued jobs, search jobs, filter jobs by workers, delete jobs, etc. Maybe FastUI can be used to build a robust solution for that. A lot of the time, using Celery-like tools is a pain because there’s no well supported first-party UI for monitoring and taking interactive actions. If |
Thank you @samuelcolvin for the renewed efforts on this project 🙏 As @birkjernstrom mentioned on Twitter, we use Arq as our worker backend for Polar. One of the aspect I would love to see is a middleware-like feature. Most of the time, we have logic/behaviors to execute before/after a job is run (logging, context management...). In the current version, we have access to Currently, we workaround this by implementing decorators we apply on our worker tasks: https://github.com/polarsource/polar/blob/98f4696e95755e93019b0c657c6b08dff64ea02a/server/polar/worker.py#L150-L189 A middleware approach would be very neat and allow us to wrap any kind of logic without having to touch the tasks themselves, similar to what we can do with web frameworks like Starlette. |
Yup, totally agree - middleware makes lots of sense and should be fairly easy to implement compared to some of the other stuff here. |
@rednafi I agree on dashboards, they're something we're thinking about lots at Pydantic. |
Really thanks for the renewed interest and roadmap! :) My Wishlist:
|
Agreed, I think that's covered above - I think we can even do that for jobs where the worker code is not in the search path of the queuing logic.
Makes sense, I think that's usable.
But how would you define the arguments? I guess we could either define them via a python file, or as JSON in the command line? Maybe you could create a separate issue to discus this?
Yup, seems pretty easy - I guess we allow you to define separate |
I'm really, really excited about this! Thanks for all the effort throughout the years @samuelcolvin, and to everyone helping out in issues or with PRs. I think you're pretty spot on with these ideas, in addition to what @frankie567 writes about the middleware. I'd also like to add that while we have queues today, the queue name is not used for e.g. storage of results, causing confusing behavior. I'm also very much in favor of OpenTelemetry + a dashboard/API (#297), though I don't know if a dashboard nessescarily needs to be shipped with arq. As for the API - it looks really clean. 👏 |
Thank you Colvin for trying to boost strap this project, it is really a great project. I would love to see it have a documentation. Async worker ought not to be feature rich like celery, we need library that is simple and does the work effectively and efficiently. |
Few more:
|
Would be awesome if you could first skim through the backlog of approved / tested PRs waiting for merge @samuelcolvin. There are some really simple but high-value ones like #378 that would probably help a lot of people. |
Done. I'm working through PRs now. |
|
Job uniqueness in arq is opt-in, i.e. you opt-in by crafting deterministic job ids. In my team we heavily rely on job uniqueness (in a huge majority of cases we do not want concurrent runs of the same worker function with the same arguments) - to the extent that we wanted it to be opt-out. We accomplish this by generating default job_ids, something along these lines: @worker_app.register(
job_id="foo:{a}" # template string, may refer to arguments passed to the worker function
)
async def foo(ctx: FunctionContext, a: str, b: int) -> int:
... I think this is simple and generally useful. I'm not sure if this is the right thread for this sort of feature requests, so sorry if that is not appropriate here.
I wholeheartedly agree. I've once made an attempt to make a generic dashboard for arq and I found it problematic that pickle is used as the default serializer, because unpickling is reliable only in an environment where the same version of |
@samuelcolvin Many thanks for creating arq and for leading the project over the years. I am glad to see the wide-ranging uses arq finds and your plan for new features and refinements. However, I do hope that arq remains relatively light-weight and easy to get started with. I used it for the first time during an internship, when I was short on time. The documentation for arq is concise and thorough, and it has few moving parts. I hope that we can, broadly, maintain these qualities. |
I have an offhand suggestion. In cPython 3.13, we will get a Python API for "Multiple Interpreters" as per PEP 734. This will allow us to launch interpreters in separate OS threads, in a single OS process. The interpreters don't share a GIL, so that gives us multi-core parallelism. It should be possible for arq to use this facility to launch multiple workers, each in a separate interpreter OS thread, in a single process. Worker would then have the same multi-core parallelism that they otherwise have in separate processes. The advantage would be memory saving. For example, in an ML application, if each worker needs to pre-precess the same dataset and keep it resident in memory, that could be a common global object between workers on the same machine.
Another offhand suggestion. For the testing backend, we could use PEP 734 to launch a worker and job-submitter in the same process in separate interpreters, then use the inter-interpreter sharing facilities of PEP 734 for a rudimentary in-memory backend. |
First of all, thanks for the project and dedication! It's taken for granted quite often, but it takes a lot of time to maintain OS projects. One suggestion would be custom exception handlers, similar to what FastAPI offers. We're exclusively using structlog for JSON based log output, and custom error handlers would help in:
|
We should also try to make health checks to be k8s-friendly. The health check for the worker should probably only check whether the worker can connect to the backend and query the task queue, and not be bound to tasks being executed. Today, a health check is recorded only if:
This setup fails if:
We found this behavior this week, where our setup is:
|
@samuelcolvin
What are the plans for the release? |
Oh 🤦 , I'll do the release today. So sorry, all I can say is, it's been a busy few weeks. |
Having just discovered this very good project! Thank you! In my scenario, I would prioritize the following two needs: Are they in the refactoring plan? Maybe I can try to contribute some pr without interfering refactoring |
How does this compare to something like FastStream (formerly Propan) ? |
They are different tools for different purposes and applications.
|
Very excited that you want to improve this library. For asynchronous python, such simple task execution libraries are not enough (and you can't deploy giants like Temporal in any project) |
Inspired by this project, I developed brq for my own needs based on redis stream(a simple task queue library that can be used with a redis cluster without the |
But now we are talking about
These things are makes the ARQ the same thing with FastStream, so why we should make it? If we are talking about the tool for asynchronous services - we already have the one. If we are talking about tasks framework - arq is a good choice. Should we mixed these cases? |
Quick update on this: As some of you might have seen, I've moved arq to a new organization In particular @aaazzam and the Prefect team are big users of arq and will be contributing a lot to the improvements I outlined above. More to come... |
That is a great decision, Colvin.
We pray that arq shall succeed.
…On Fri, 19 Jul 2024, 04:54 Samuel Colvin, ***@***.***> wrote:
*Quick update on this:*
As some of you might have seen, I've moved arq to a new organization
python-arq to make it easier and more attractive for those using arq to
collaborate on making it better.
In particular @aaazzam <https://github.com/aaazzam> and the Prefect
<https://www.prefect.io/> team are big users of arq and will be
contributing a lot to the improvements I outlined above.
More to come...
—
Reply to this email directly, view it on GitHub
<#437 (comment)>,
or unsubscribe
<https://github.com/notifications/unsubscribe-auth/AN5K7K2DUFZBBJIFR2UIPODZNCEYTAVCNFSM6AAAAABE2OS3ZWVHI2DSMVQWIX3LMV43OSLTON2WKQ3PNVWWK3TUHMZDEMZYGA3DKOJRHA>
.
You are receiving this because you commented.Message ID:
***@***.***>
|
HI @samuelcolvin , I'm late to the party here, but very glad to see reneved effort on ARQ. We've decided to use ARQ over a year ago when I realised that running even fairly modest LLM pipelines in sync workers is a lost cause. Here are some of our learnings and hacks that could be incorporated into ARQ. I'd be happy to contribute some time on the side this year. Priority of CRON jobsWe shot our foot once as follows:
The system works nicely until... max_jobs is hit. At that point the cron job would no longer be admitted and so the quota would not be restored, resulting in a deadlock that was quite nasty to debug. This kind of scenario could be avoided with either of:
We got the named queues variety to work, but later decided on a different approach. I still have the code, tho. Dependency injectionUsing FastAPI we ended up with a bunch of
We're currently using this solution and I have the code. It's ugly in places (I hacked |
Hi there 👋 At my company (Back Market), we're looking to get out of rabbitmq, and therefore more or less celery altogether. OpenTelemetryWe're currently running with datadog for our observability needs, which maintains an integration with celery and rabbitmq. DataDog has support for OpenTelemetry though, and we absolutely need observability on these, most importantly to properly autoscale our workers and monitor our eventual consistency. We're also relying more and more on distirbuted tracing for troubleshooting. Moving repo to PydanticYours and the Pydantic name are strong markers of quality! it certainly would help to more explicitly showcase they belong in the same universe as Arq. Avoid PollingWhile there's always some polling happening, the default 500ms of the current implementation can become an issue, especially when they compound accross services. I personally don't think that's much of a deal, but I can see the polling become an issue as the number of workers grow. Separate the backendespecially regarding an in-memory backend, this would allow us to shift left on some tests (or... just have them, because celery doesn't make it easy). However, using testcontainers to spin up a redis instance isn't that much of a hassle either. Conversly, these are the items I feel wouldn't really make a difference in our adoption Type SafetyWhile we run a completely typed stack, we also wrap a bunch our adopted tools in our base library for our services. so in the end we can implement that typing there. DAGwe already use Airflow for more advanced workflows worker improvementsAs stated above, we'd probably wrap it one way or another, so this doesn't really make a difference for us. What's the best approach to start contributing towards the goals outlined here? 🚀 |
A note for people interested in Arq mainly because of its "pessimistic execution" property:
It looks like Celery will get something similar in the next release: celery/celery#9213 |
@samuelcolvin I thought |
Redis Cluster support would also be a priority for our use cases. We are running ARQ at a pretty big scale at this point, millions of execution per day, and currently the way to scale ARQ for us will be to create multiple redis instances. |
I hate to be "that guy", but what's the status of this project? I'm in a position where I can choose Celery or some alternative, where I would really prefer not to go with Celery. My main issue with the current task-queueing ecosystem is a lack of safety. I would love arq to be based on pydantic using its validation as a primitive form of versioning tasks, which ensures that producers/workers have a contract. I'm envisioning a sort of FastAPI-based design. |
Do you know about new Pydantic support in Celery? https://github.com/celery/celery/blob/main/examples/pydantic/tasks.py |
I wasn't aware of this, thanks for pointing it out! |
Celery is a monstrous library. Have you seen taskiq? taskiq supports pydantic out of the box because everything is serialized to json via pydantic by default. |
Arq was the first real open source project I ever created, back in 2016. That was long before Pydantic, FastAPI, ParamSpec, or even Redis Streams.
I remember a sense of incredulity that I couldn't find an async variant of rq (which I was helping to maintain at the time), surely I wasn't the only person wanting to queue jobs in async code? Apparently at the time I was.
Fast forward eight years, and I'm definitely not the only person trying to queue jobs in an async world.
Hence my incredulity has only grown - there's still no ubiquitous queuing library for async Python, and despite neglect, Arq still seems to work well for lots of people, I've used it in every role I've had since, and for the most part it just works.
That said, Arq needs some love, and since we're now using it at Pydantic, I think we should have the resources to provide that love later this year. This is a rough plan of what I propose to do.
Feedback very welcome from all, but especially @JonasKs and @pydantic-maintainers (who apparently I can't tag here :-()
In summary I want to significantly refactor the internals, and update the API in a backwards compatible way.
1.
ParamSpec
and type safety 🚧The most important change we should make is to make Arq typesafe using
ParamSpec
andConcatenate
, I have a partially working demonstration of how this will work below.We'll provide the same or similar method for enqueuing a job via it's name for cases where the worker code is not accessible where jobs are enqueued.
We should be able to do this while still supporting the current API to make migration as easy as possible.
2. Redis Streams 🚀
The second most important change is to adopt Redis streams which read like they were designed for exactly this application, they mean we can effectively guarantee only one execution while still being resilient to unexpected shutdown (Jobs shutdown during execution will be rerun later).
This should be possible without breaking the current API at all.
3. Avoid sorted set for immediate jobs 🚀
Current Arq is slower than it should be because it uses a sorted set to queue all jobs, the sorted set provides two things:
The idea would be to only use the sorted set for jobs scheduled to run in the future, then use the logic demonstrated by SAQ to take jobs off the sorted set when they're ready to be run and add them to the stream.
Jobs which are enqueued without a delay can be added to the stream immediately, which should significantly improve performance for this very common case.
This should be possible without breaking the current API at all.
4. Avoid polling 🇵🇱
Mostly for latency reasons it would be nice to avoid polling, the idea would be:
XREADGROUP
withBLOCK
on streams mean we'll no longer need to poll for the next jobs in the worker5. OpenTelemetry 🔭
Observability is close to our hearts at Pydantic, so it would be nice to have optional support for OpenTelemetry, or perhaps just hook points to implement your own observability.
This should be possible without breaking the current API at all.
6. DAG - Task Dependency Graph 📈
The idea is to allow one or more jobs to be triggered by one or more previous jobs.
See the
then()
andstart_with()
methods in the partial implementation below.This should be possible without breaking the current API at all.
7. CLI, settings and
worker.run
improvements 🏃We can mostly just copy
uvicorn
, we should also remove the very uglyWorkerSettings
and configure the worker via simple function.We should also fix reload logic to use
watchfiles
.This can be done such that existing code still works, with or without deprecation warnings.
8. Separate the backend↔️
We should separate the Redis logic to make it easier to provide alternative backends, an in memory backend for testing would be especially useful for unit testing applications.
This can be done such that existing code still works, with or without deprecation warnings.
9. Better documentation 📖
Documentation should move to mkdocs material and mkdocstrings, and be improved significantly.
10. Moving repo to Pydantic 🏢
To provide the resources for this work, we should move Arq to the Pydantic organization, and the docs to
arq.pydantic.dev
or similar.Have I missed anything?
API Sketch
Here's a sketch of how I see the new type-safe API working, together with a partial implementation:
Example Usage:
Partial Implementation
The text was updated successfully, but these errors were encountered: