Skip to content

Commit

Permalink
adding maintenance batching and warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Sep 22, 2024
1 parent 1e5ae4c commit defa9b4
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
45 changes: 32 additions & 13 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ const events = {
warn: 'warn'
}

const WARNINGS = {
SLOW_QUERY: { seconds: 30, message: 'Warning: slow query. Your queues and/or database server should be reviewed' },
LARGE_QUEUE: { size: 10_000, mesasge: 'Warning: large queue. Your queue should be reviewed' }
}

class Boss extends EventEmitter {
#stopped
#maintaining
Expand All @@ -26,6 +31,14 @@ class Boss extends EventEmitter {
this.functions = [
this.maintain
]

if(this.#config.warningSlowQuerySeconds) {
WARNINGS.SLOW_QUERY.seconds = this.#config.warningSlowQuerySeconds
}

if(this.#config.warningLargeQueueSize) {
WARNINGS.LARGE_QUEUE.size = this.#config.warningLargeQueueSize
}
}

async start () {
Expand All @@ -51,9 +64,8 @@ class Boss extends EventEmitter {

const elapsed = (ended - started) / 1000

if (elapsed > 30 || this.#config.__test__warn_slow_query) {
const message = 'Warning: slow query. Your queues and/or database server should be reviewed'
this.emit(events.warn, { message, elapsed, sql, values })
if (elapsed > WARNINGS.SLOW_QUERY.seconds || this.#config.__test__warn_slow_query) {
this.emit(events.warn, { message: WARNINGS.SLOW_QUERY.message, elapsed, sql, values })
}

return result
Expand Down Expand Up @@ -98,19 +110,29 @@ class Boss extends EventEmitter {
}, {})

for (const queueGroup of Object.values(queueGroups)) {
await this.#monitorActive(queueGroup)
await this.#dropCompleted(queueGroup)
const { table, queues } = queueGroup
const names = queues.map(i => i.name)

while (names.length) {
const chunk = names.splice(0, 100)

await this.#monitorActive(table, chunk)
await this.#dropCompleted(table, chunk)
}
}
}

async #monitorActive (queueGroup) {
const { table, queues } = queueGroup
const names = queues.map(i => i.name)

async #monitorActive (table, names) {
const command = plans.trySetQueueMonitorTime(this.#config.schema, names, this.#config.monitorIntervalSeconds)
const { rows } = await this.#executeSql(command)

if (rows.length) {
const warnings = rows.filter(i => i.queuedCount > WARNINGS.LARGE_QUEUE.size)

for (const warning of warnings) {
this.emit(events.warn, { ...warning, message: WARNINGS.LARGE_QUEUE.mesasge })
}

const sql = plans.failJobsByTimeout(this.#config.schema, table, names)
await this.#executeSql(sql)

Expand All @@ -119,10 +141,7 @@ class Boss extends EventEmitter {
}
}

async #dropCompleted (queueGroup) {
const { table, queues } = queueGroup
const names = queues.map(i => i.name)

async #dropCompleted (table, names) {
const command = plans.trySetQueueDeletionTime(this.#config.schema, names, this.#config.maintenanceIntervalSeconds)
const { rows } = await this.#executeSql(command)

Expand Down
9 changes: 7 additions & 2 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,12 @@ function trySetQueueTimestamp (schema, queues, column, seconds) {
SET ${column} = now()
WHERE name IN(${getQueueInClause(queues)})
AND EXTRACT( EPOCH FROM (now() - COALESCE(${column}, now() - interval '1 week') ) ) > ${seconds}
RETURNING true
RETURNING
name,
deferred_count as "deferredCount",
queued_count as "queuedCount",
active_count as "activeCount",
total_count as "totalCount"
`
}

Expand Down Expand Up @@ -896,7 +901,7 @@ function cacheQueueStats (schema, table, queues) {
active_count = stats.active_count,
total_count = stats.total_count
FROM stats
WHERE queue.name = stats.name
WHERE queue.name = stats.name
`

return locked(schema, sql, 'queue-stats')
Expand Down

0 comments on commit defa9b4

Please sign in to comment.