Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: maxRetryDelay option to limit retryBackoff #531

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions docs/api/jobs.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Creates a new job and returns the job id.
* **priority**, int

optional priority. Higher numbers have, um, higher priority

* **id**, uuid

optional id. If not set, a uuid will automatically created
Expand All @@ -41,6 +41,10 @@ Available in constructor as a default, or overridden in send.

Default: false. Enables exponential backoff retries based on retryDelay instead of a fixed delay. Sets initial retryDelay to 1 if not set.

* **maxRetryDelay**, int

Default: no limit. Maximum delay between retries of failed jobs, in seconds. Only used when retryBackoff is true.

**Expiration options**

* **expireInSeconds**, number
Expand Down Expand Up @@ -84,7 +88,7 @@ Available in constructor as a default, or overridden in send.
**Connection options**

* **db**, object

Instead of using pg-boss's default adapter, you can use your own, as long as it implements the following interface (the same as the pg module).

```ts
Expand Down Expand Up @@ -284,7 +288,7 @@ await Promise.allSettled(jobs.map(async job => {

Deletes a job by id.

> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy.
> Job deletion is offered if desired for a "fetch then delete" workflow similar to SQS. This is not the default behavior for workers so "everything just works" by default, including job throttling and debouncing, which requires jobs to exist to enforce a unique constraint. For example, if you are debouncing a queue to "only allow 1 job per hour", deleting jobs after processing would re-open that time slot, breaking your throttling policy.

### `deleteJob(name, [ids], options)`

Expand All @@ -298,7 +302,7 @@ Cancels a pending or active job.

Cancels a set of pending or active jobs.

When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt.
When passing an array of ids, it's possible that the operation may partially succeed based on the state of individual jobs requested. Consider this a best-effort attempt.

### `resume(name, id, options)`

Expand Down
3 changes: 3 additions & 0 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -276,10 +276,13 @@ function applyRetryConfig (config, defaults) {
assert(!('retryDelay' in config) || (Number.isInteger(config.retryDelay) && config.retryDelay >= 0), 'retryDelay must be an integer >= 0')
assert(!('retryLimit' in config) || (Number.isInteger(config.retryLimit) && config.retryLimit >= 0), 'retryLimit must be an integer >= 0')
assert(!('retryBackoff' in config) || (config.retryBackoff === true || config.retryBackoff === false), 'retryBackoff must be either true or false')
assert(!('maxRetryDelay' in config) || config.maxRetryDelay === undefined || config.retryBackoff === true, 'maxRetryDelay can only be set if retryBackoff is true')
assert(!('maxRetryDelay' in config) || config.maxRetryDelay === undefined || (Number.isInteger(config.maxRetryDelay) && config.maxRetryDelay >= 0), 'maxRetryDelay must be an integer >= 0')

config.retryDelayDefault = defaults?.retryDelay
config.retryLimitDefault = defaults?.retryLimit
config.retryBackoffDefault = defaults?.retryBackoff
config.maxRetryDelayDefault = defaults?.maxRetryDelay
}

function applyPollingInterval (config, defaults) {
Expand Down
31 changes: 20 additions & 11 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ class Manager extends EventEmitter {
retryDelay,
retryDelayDefault,
retryBackoff,
retryBackoffDefault
retryBackoffDefault,
maxRetryDelay,
maxRetryDelayDefault
} = options

const values = [
Expand All @@ -369,7 +371,9 @@ class Manager extends EventEmitter {
retryDelay, // 16
retryDelayDefault, // 17
retryBackoff, // 18
retryBackoffDefault // 19
retryBackoffDefault, // 19
maxRetryDelay, // 20
maxRetryDelayDefault // 21
]

const db = wrapper || this.db
Expand Down Expand Up @@ -405,7 +409,8 @@ class Manager extends EventEmitter {
this.config.keepUntil, // 3
this.config.retryLimit, // 4
this.config.retryDelay, // 5
this.config.retryBackoff // 6
this.config.retryBackoff, // 6
this.config.maxRetryDelay // 7
]

const { rows } = await db.executeSql(this.insertJobsCommand, params)
Expand Down Expand Up @@ -529,6 +534,7 @@ class Manager extends EventEmitter {
retryLimit,
retryDelay,
retryBackoff,
maxRetryDelay,
expireInSeconds,
retentionMinutes,
deadLetter
Expand All @@ -544,6 +550,7 @@ class Manager extends EventEmitter {
retryLimit,
retryDelay,
retryBackoff,
maxRetryDelay,
expireInSeconds,
retentionMinutes,
deadLetter
Expand All @@ -568,20 +575,22 @@ class Manager extends EventEmitter {
retryLimit,
retryDelay,
retryBackoff,
maxRetryDelay,
expireInSeconds,
retentionMinutes,
deadLetter
} = Attorney.checkQueueArgs(name, options)

const params = [
name,
policy,
retryLimit,
retryDelay,
retryBackoff,
expireInSeconds,
retentionMinutes,
deadLetter
name, // 1
policy, // 2
retryLimit, // 3
retryDelay, // 4
retryBackoff, // 5
maxRetryDelay, // 6
expireInSeconds, // 7
retentionMinutes, // 8
deadLetter // 9
]

await this.db.executeSql(this.updateQueueCommand, params)
Expand Down
155 changes: 154 additions & 1 deletion src/migrationStore.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,159 @@ function migrate (value, version, migrations) {

function getAll (schema) {
return [
{
release: '10.2.0',
version: 25,
previous: 24,
install: [
`ALTER TABLE ${schema}.queue
ADD COLUMN IF NOT EXISTS max_retry_delay INTEGER`
],
uninstall: [
`ALTER TABLE ${schema}.queue
DROP COLUMN IF EXISTS max_retry_delay`
]
},
{
release: '10.2.0',
version: 25,
previous: 24,
install: [
`ALTER TABLE ${schema}.job
ADD COLUMN IF NOT EXISTS max_retry_delay INTEGER`
],
uninstall: [
`ALTER TABLE ${schema}.job
DROP COLUMN IF EXISTS max_retry_delay`
]
},
{
release: '10.2.0',
version: 25,
previous: 24,
install: [
`
CREATE OR REPLACE FUNCTION ${schema}.create_queue(queue_name text, options json)
RETURNS VOID AS
$$
DECLARE
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
queue_created_on timestamptz;
BEGIN

WITH q as (
INSERT INTO ${schema}.queue (
name,
policy,
retry_limit,
retry_delay,
retry_backoff,
max_retry_delay,
expire_seconds,
retention_minutes,
dead_letter,
partition_name
)
VALUES (
queue_name,
options->>'policy',
(options->>'retryLimit')::int,
(options->>'retryDelay')::int,
(options->>'retryBackoff')::bool,
(options->>'maxRetryDelay')::int,
(options->>'expireInSeconds')::int,
(options->>'retentionMinutes')::int,
options->>'deadLetter',
table_name
)
ON CONFLICT DO NOTHING
RETURNING created_on
)
SELECT created_on into queue_created_on from q;

IF queue_created_on IS NULL THEN
RETURN;
END IF;

EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name);

EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON ${schema}.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON ${schema}.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name);
EXECUTE format('CREATE INDEX %1$s_i5 ON ${schema}.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name);

EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name);
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name);
END;
$$
LANGUAGE plpgsql
`
],
uninstall: [
`
CREATE OR REPLACE FUNCTION ${schema}.create_queue(queue_name text, options json)
RETURNS VOID AS
$$
DECLARE
table_name varchar := 'j' || encode(sha224(queue_name::bytea), 'hex');
queue_created_on timestamptz;
BEGIN

WITH q as (
INSERT INTO ${schema}.queue (
name,
policy,
retry_limit,
retry_delay,
retry_backoff,
expire_seconds,
retention_minutes,
dead_letter,
partition_name
)
VALUES (
queue_name,
options->>'policy',
(options->>'retryLimit')::int,
(options->>'retryDelay')::int,
(options->>'retryBackoff')::bool,
(options->>'expireInSeconds')::int,
(options->>'retentionMinutes')::int,
options->>'deadLetter',
table_name
)
ON CONFLICT DO NOTHING
RETURNING created_on
)
SELECT created_on into queue_created_on from q;

IF queue_created_on IS NULL THEN
RETURN;
END IF;

EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name);

EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i1 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''created'' AND policy = ''short''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i2 ON ${schema}.%1$I (name, COALESCE(singleton_key, '''')) WHERE state = ''active'' AND policy = ''singleton''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i3 ON ${schema}.%1$I (name, state, COALESCE(singleton_key, '''')) WHERE state <= ''active'' AND policy = ''stately''', table_name);
EXECUTE format('CREATE UNIQUE INDEX %1$s_i4 ON ${schema}.%1$I (name, singleton_on, COALESCE(singleton_key, '''')) WHERE state <> ''cancelled'' AND singleton_on IS NOT NULL', table_name);
EXECUTE format('CREATE INDEX %1$s_i5 ON ${schema}.%1$I (name, start_after) INCLUDE (priority, created_on, id) WHERE state < ''active''', table_name);

EXECUTE format('ALTER TABLE ${schema}.%I ADD CONSTRAINT cjc CHECK (name=%L)', table_name, queue_name);
EXECUTE format('ALTER TABLE ${schema}.job ATTACH PARTITION ${schema}.%I FOR VALUES IN (%L)', table_name, queue_name);
END;
$$
LANGUAGE plpgsql
`
]
},
{
release: '10.1.5',
version: 24,
Expand Down Expand Up @@ -111,7 +264,7 @@ function getAll (schema) {
END IF;

EXECUTE format('CREATE TABLE ${schema}.%I (LIKE ${schema}.job INCLUDING DEFAULTS)', table_name);

EXECUTE format('ALTER TABLE ${schema}.%1$I ADD PRIMARY KEY (name, id)', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT q_fkey FOREIGN KEY (name) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
EXECUTE format('ALTER TABLE ${schema}.%1$I ADD CONSTRAINT dlq_fkey FOREIGN KEY (dead_letter) REFERENCES ${schema}.queue (name) ON DELETE RESTRICT DEFERRABLE INITIALLY DEFERRED', table_name);
Expand Down
Loading
Loading