Work assignment #379
Replies: 11 comments
-
Thanks! You can specify the queue as a string in those functions.
-
Sorry, maybe I was too vague. What I mean is: is there any way to implement consumer strategies for a group of workers, like consistent hashing or round robin based on a message field? I read #290, but the use case is different. I need to distribute a queue's work among workers much like RabbitMQ's consistent-hash exchange. That is, I want to parallelize the work of a queue, but certain messages of the queue must not be processed concurrently, so only one worker in the group should process a given group of messages at any time, logically "sharding" the queue, but at read time.
-
A further example: suppose I have a queue with messages of several types. It is not a priority queue, and the work is bounded by the number of workers, meaning that if the workers grow to 6, then each one processes one type of message, and so on.
-
Thanks for the examples. Could you simplify this by creating a separate queue per type? "A" would be a queue, for example, and then you would run as many workers on "A" as desired. You can use the wildcard patterns if needed, but I would start with simple isolated queues first.
-
Thanks for the reply @timgit. I'll treat adding queues as an inexpensive operation then, much like in Event Sourcing. Yet I still have a doubt about how to distribute multiple queues among workers: if the queues are 1:1 with users, new queues will be created constantly, so how could I assign them to a worker? Is there a way to read the queues that have messages, and then apply a consistent-hashing algorithm to distribute them among a number of workers?
-
Maybe I'm talking in riddles, so here is my concrete use case. Users need to perform some stats analysis that is split into multiple small analyses. Each small analysis can run concurrently across users, but must run sequentially for any single user. It does not matter if the responses are out of order, or even duplicated; the only thing I have to ensure is that no two analyses run at the same time for the same user. A common pattern to solve this is to have a number of workers run the analyses concurrently, but to always assign the same messages to the same worker. If the assignment is done with consistent hashing based on a field of the message (like the user-id), it ensures that no two workers are ever processing the same user's messages. I'm sure there might be a good way to do this with pg-boss, but I'm not sure how. Maybe consistent hashing is not the answer, and there's another way to respect the constraint with another method?
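For what it's worth, the sequencing constraint alone (ignoring distribution across processes) can be sketched in plain Node by chaining promises per key. `enqueuePerUser` is an illustrative helper, not pg-boss API, and it only serializes within a single process:

```javascript
// Sketch: serialize tasks per user inside one process by chaining promises
// on a per-user key. Tasks for different users still run concurrently.
const chains = new Map();

function enqueuePerUser(userId, task) {
  const prev = chains.get(userId) || Promise.resolve();
  // Run the next task whether the previous one resolved or rejected.
  const next = prev.then(task, task);
  chains.set(userId, next);
  return next;
}
```

Across multiple worker processes this in-memory map is not enough, which is why some form of key-based routing or locking at the queue level comes up.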
-
I'll read a bit more into how other queue products implement this for my own research and see what I learn in terms of overlap with pg-boss. While I'm doing that, as a thought exercise, could you map this design onto how it would work in a public cloud service such as AWS SQS? pg-boss has a similar architecture, and this may give you much better search engine hits as well.
-
Thank you so much for looking into this. I think the way to view it on SQS would be message groups; it is quite a match.
-
Also from https://aws.amazon.com/sqs/faqs/
-
I agree, message groups actually sound like exactly what you're looking for: all other messages in the group become unavailable to all other workers once at least one message from the group is fetched. I'm thinking that in order to pull this type of thing off in pg-boss, we'd need to start tracking in-flight unique keys that should be excluded from job fetching while a job with that key is in the active state. Once the job is completed, that key becomes available for fetching from any worker. This doesn't permanently assign a specific worker to a key like
-
Sorry for the late response; somehow I wrote it on mobile and it was never sent. Yes, that is one way to comply with my constraint 👍 I had wondered if it was possible to do this in a queue consumer acting as a router or something, but I fail to see how, because I cannot scan and pick messages at will. Maybe I'm wrong about that? If not, then it would need to be implemented in the library itself.
-
Hi, amazing lib!
I have a silly question, but I couldn't find the answer in the docs or issues.
Can other assignment strategies be implemented?
I would like to divide a queue among a pool of workers with consistent hashing, because I need sequential consistency per user. But I see that workers just pull jobs; is there a way to specify which jobs to pull? I see I can use some wildcards for queues. Does that mean the intent is to create multiple queues instead of distributing the load of a single queue across multiple workers?