Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Sep 22, 2024
1 parent e79d5e8 commit 1e5ae4c
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 125 deletions.
7 changes: 5 additions & 2 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,13 @@ CREATE TABLE pgboss.job (
started_on timestamp with time zone,
singleton_key text,
singleton_on timestamp without time zone,
expire_in interval not null default interval '15 minutes',
expire_seconds int not null default interval '15 minutes',
created_on timestamp with time zone not null default now(),
completed_on timestamp with time zone,
keep_until timestamp with time zone NOT NULL default now() + interval '14 days',
output jsonb,
policy text,
dead_letter text,
CONSTRAINT job_pkey PRIMARY KEY (name, id)
) PARTITION BY LIST (name)
```
Expand Down Expand Up @@ -573,11 +574,13 @@ Returns an array of jobs from a queue
startedOn: Date;
singletonKey: string | null;
singletonOn: Date | null;
expireIn: PostgresInterval;
expireInSeconds: number;
deleteAfterSeconds: number;
createdOn: Date;
completedOn: Date | null;
keepUntil: Date;
policy: string,
deadLetter: string,
output: object
}
```
Expand Down
64 changes: 47 additions & 17 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ const EventEmitter = require('node:events')
const plans = require('./plans')

const events = {
error: 'error'
error: 'error',
warn: 'warn'
}

class Boss extends EventEmitter {
Expand Down Expand Up @@ -41,6 +42,23 @@ class Boss extends EventEmitter {
}
}

async #executeSql (sql, values) {
const started = Date.now()

const result = await this.#db.executeSql(sql, values)

const ended = Date.now()

const elapsed = (ended - started) / 1000

if (elapsed > 30 || this.#config.__test__warn_slow_query) {
const message = 'Warning: slow query. Your queues and/or database server should be reviewed'
this.emit(events.warn, { message, elapsed, sql, values })
}

return result
}

async #onSupervise () {
try {
if (this.#stopped) return
Expand Down Expand Up @@ -72,33 +90,45 @@ class Boss extends EventEmitter {
queues = [value]
}

for (const queue of queues) {
// todo: group queries by table
await this.#monitorActive(queue)
await this.#dropCompleted(queue)
const queueGroups = queues.reduce((acc, q) => {
const { table } = q
acc[table] = acc[table] || { table, queues: [] }
acc[table].queues.push(q)
return acc
}, {})

for (const queueGroup of Object.values(queueGroups)) {
await this.#monitorActive(queueGroup)
await this.#dropCompleted(queueGroup)
}
}

async #monitorActive (queue) {
const command = plans.trySetQueueMonitorTime(this.#config.schema, queue.name, this.#config.monitorIntervalSeconds)
const { rows } = await this.#db.executeSql(command)
async #monitorActive (queueGroup) {
const { table, queues } = queueGroup
const names = queues.map(i => i.name)

const command = plans.trySetQueueMonitorTime(this.#config.schema, names, this.#config.monitorIntervalSeconds)
const { rows } = await this.#executeSql(command)

if (rows.length) {
const sql = plans.failJobsByTimeout(this.#config.schema, queue)
await this.#db.executeSql(sql)
const sql = plans.failJobsByTimeout(this.#config.schema, table, names)
await this.#executeSql(sql)

const cacheStatsSql = plans.cacheQueueStats(this.#config.schema, queue)
await this.#db.executeSql(cacheStatsSql)
const cacheStatsSql = plans.cacheQueueStats(this.#config.schema, table, names)
await this.#executeSql(cacheStatsSql)
}
}

async #dropCompleted (queue) {
const command = plans.trySetQueueDeletionTime(this.#config.schema, queue.name, this.#config.maintenanceIntervalSeconds)
const { rows } = await this.#db.executeSql(command)
async #dropCompleted (queueGroup) {
const { table, queues } = queueGroup
const names = queues.map(i => i.name)

const command = plans.trySetQueueDeletionTime(this.#config.schema, names, this.#config.maintenanceIntervalSeconds)
const { rows } = await this.#executeSql(command)

if (rows.length) {
const sql = plans.deletion(this.#config.schema, queue.table, queue.deleteAfterSeconds)
await this.#db.executeSql(sql)
const sql = plans.deletion(this.#config.schema, table)
await this.#executeSql(sql)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ class Manager extends EventEmitter {
Attorney.assertQueueName(name)
const db = this.assertDb(options)
const ids = this.mapCompletionIdArg(id, 'fail')
const queue = await this.getQueueCache(name)
const sql = plans.failJobsById(this.config.schema, queue)
const { table } = await this.getQueueCache(name)
const sql = plans.failJobsById(this.config.schema, table)
const result = await db.executeSql(sql, [name, ids, this.mapCompletionDataArg(data)])
return this.mapCommandResponse(ids, result)
}
Expand Down
Loading

0 comments on commit 1e5ae4c

Please sign in to comment.