feature request: worker's job filter #409

Open
Eomm opened this issue Jul 10, 2023 · 8 comments

Comments

@Eomm

Eomm commented Jul 10, 2023

To build a QoS system, it would be great if the work() method supported a filter option.

This new parameter lets the user write to a single queue/table while 2 different workers process the same queue at different paces:

  • eg: queue icecream
  • worker1 consumes icecream jobs with taste: cioco and processes them with newJobCheckInterval: 200
  • worker2 consumes icecream jobs with taste: bubble and processes them with newJobCheckInterval: 2000

❗️ It is up to the user to write filters that together cover all the jobs; otherwise a job that no filter matches, such as one with taste: lemon, will be archived automatically.

Example:

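const PgBoss = require('pg-boss');

// Placeholder queue name and handler so the example is self-contained.
const queueName = 'icecream';

async function executeJob (job) {
  console.log({ jobId: job.id, data: job.data });
}

(async function () {
  try {
    await buildConsumer();
  } catch (error) {
    console.log({ globalErr: error.message });
  }
})();

async function buildConsumer () {
  const boss = new PgBoss({
    user: 'postgres',
    password: 'postgres',
    noScheduling: true,
  });

  // Log background errors and start pg-boss before registering workers.
  boss.on('error', (error) => console.error(error));
  await boss.start();

  await boss.work(
    queueName,
    {
      teamSize: 2,
      newJobCheckInterval: 1000,
      filter: { // 🚀 proposed option, not an existing pg-boss API
        jobFilter: `data ->> 'body' = $1`,
        jobParams: ['body'],
      }
    },
    executeJob
  );

  console.log('Waiting for jobs');
}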

This should generate a query like this in the fetch function:

function fetchNextJob (schema) {

    WITH nextJob as (
      SELECT id
      FROM pgboss.job j
      WHERE state < 'active'
        AND name LIKE $1
        AND startAfter < now()
+        AND data ->> 'body' = $3
      ORDER BY priority desc, createdOn, id
      LIMIT $2
      FOR UPDATE SKIP LOCKED
    )
    UPDATE pgboss.job j SET
      state = 'active',
      startedOn = now(),
      retryCount = CASE WHEN state = 'retry' THEN retryCount + 1 ELSE retryCount END
    FROM nextJob
    WHERE j.id = nextJob.id
    RETURNING j.id, name, data, EXTRACT(epoch FROM expireIn) as expire_in_seconds

Note:

  • this query should also reach the executeSql function when the user supplies a custom db
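
A minimal sketch of how the filter could be merged into the fetch query, assuming the base query already uses $1 (queue name) and $2 (batch size) as in the snippet above. The helper names shiftPlaceholders and buildFilterClause are hypothetical and not part of pg-boss; the idea is only to renumber the user's placeholders so that $1 in jobFilter becomes $3 in the final statement.

```javascript
// Hypothetical helpers, not part of pg-boss.
// Shifts $1, $2, ... in a user-supplied filter so they come after the
// placeholders the base query already uses.
// Caveat: this naive regex would also rewrite "$1" inside a quoted SQL string.
function shiftPlaceholders (jobFilter, offset) {
  return jobFilter.replace(/\$(\d+)/g, (_, n) => `$${Number(n) + offset}`);
}

// Returns the extra WHERE fragment plus the full list of query values.
function buildFilterClause (filter, baseValues) {
  if (!filter) {
    return { clause: '', values: baseValues };
  }
  const shifted = shiftPlaceholders(filter.jobFilter, baseValues.length);
  return {
    clause: `AND (${shifted})`,
    values: [...baseValues, ...filter.jobParams]
  };
}

const { clause, values } = buildFilterClause(
  { jobFilter: "data ->> 'body' = $1", jobParams: ['body'] },
  ['icecream', 2] // assumed base values: $1 = queue name, $2 = batch size
);

console.log(clause); // AND (data ->> 'body' = $3)
console.log(values); // [ 'icecream', 2, 'body' ]
```

Wrapping the filter in parentheses keeps a user-supplied OR from changing the meaning of the surrounding AND conditions.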

What do you think?

@timgit
Owner

timgit commented Aug 5, 2023

You can already do this using wildcards. Set all workers to a wildcard by default, such as icecream.*; then you can replace the wildcard with a specific flavor.

@Eomm
Author

Eomm commented Aug 7, 2023

Sorry, I don't get it.
Does the producer that sends the message have to know the consumer's queue in this case?

My goal is to have a dumb producer that does not know how many consumers the backend has.

@timgit
Owner

timgit commented Aug 7, 2023

Queue patterns use the * character to match 0 or more characters. For example, a job from queue status-report-12345 would be fetched with pattern status-report-* or even stat*5.

For example, a producer would use the flavor as part of the queue name, such as icecream.vanilla and icecream.chocolate. A consumer using work('icecream.*') would get both flavors, but another consumer using work('icecream.vanilla') would not get chocolate.
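
The matching rule described above can be illustrated with a small sketch. This is only an illustration in JavaScript: the helper names are hypothetical, and pg-boss itself did the matching in SQL (the fetch query quoted earlier in this thread uses name LIKE $1).

```javascript
// Hypothetical helpers that mimic the "* matches 0 or more characters" rule.
function patternToRegExp (pattern) {
  const escaped = pattern
    .split('*')
    // Escape regex metacharacters in the literal parts (e.g. the "." in "icecream.").
    .map(part => part.replace(/[.+?^${}()|[\]\\]/g, '\\$&'))
    .join('.*');
  return new RegExp(`^${escaped}$`);
}

function matchesPattern (queueName, pattern) {
  return patternToRegExp(pattern).test(queueName);
}

console.log(matchesPattern('status-report-12345', 'status-report-*')); // true
console.log(matchesPattern('status-report-12345', 'stat*5')); // true
console.log(matchesPattern('icecream.chocolate', 'icecream.*')); // true
console.log(matchesPattern('icecream.chocolate', 'icecream.vanilla')); // false
```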

@Eomm
Author

Eomm commented Aug 8, 2023

The proposed solution assumes that I have control over the producer, which is not the case 😞

@Eomm
Author

Eomm commented Aug 8, 2023

Would you be willing to accept a PR for such a feature?

@timgit
Owner

timgit commented Aug 9, 2023

Yes, sounds good

@nickreese

nickreese commented Sep 19, 2023

This would be useful. I didn't see a test case for queue layouts like work('icecream.*.toppings.none').

@timgit
Copy link
Owner

timgit commented Aug 21, 2024

Since wildcards have been dropped in v10, my original response is no longer valid. The most feasible option seems to be some sort of additional opt-in filter like originally proposed. The primary issue in using the data payload is fetch performance, since the data column is not indexed. I don't think indexing data is a good idea globally, since the payload could be large, and this would impact write performance as well.

However, this doesn't mean it's a deal-breaker. Now that queues are isolated via partitioning, this opens up the possibility of a diversity of indexing strategies, since indexes in queue A wouldn't have to match indexes in queue B. One way to pull this off would be to add a new option in createQueue().
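
As a rough sketch of what a per-queue index could look like, the hypothetical helper below builds a PostgreSQL expression index on a single key of the data payload. Nothing here reflects pg-boss internals: the helper name, the partition table name icecream_partition, and the idea of indexing one key are all assumptions for illustration.

```javascript
// Hypothetical sketch, not a pg-boss API. The table name passed in below is
// invented; real partition names are decided by pg-boss.
function buildDataKeyIndexSql (schema, table, jsonKey) {
  // Only allow simple lowercase identifiers, since the values are
  // interpolated directly into the SQL text.
  const ident = /^[a-z_][a-z0-9_]*$/;
  for (const name of [schema, table, jsonKey]) {
    if (!ident.test(name)) {
      throw new Error(`Unsupported identifier: ${name}`);
    }
  }
  const indexName = `${table}_data_${jsonKey}_idx`;
  // An expression index needs its own pair of parentheses around the expression.
  return `CREATE INDEX IF NOT EXISTS ${indexName} ON ${schema}.${table} ((data ->> '${jsonKey}'))`;
}

console.log(buildDataKeyIndexSql('pgboss', 'icecream_partition', 'taste'));
// CREATE INDEX IF NOT EXISTS icecream_partition_data_taste_idx ON pgboss.icecream_partition ((data ->> 'taste'))
```

Indexing one extracted key rather than the whole data column keeps the index small, which speaks to the concern about large payloads and write performance.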
