diff --git a/src/boss.js b/src/boss.js index 55e67e8..36b9e78 100644 --- a/src/boss.js +++ b/src/boss.js @@ -6,6 +6,11 @@ const events = { warn: 'warn' } +const WARNINGS = { + SLOW_QUERY: { seconds: 30, message: 'Warning: slow query. Your queues and/or database server should be reviewed' }, + LARGE_QUEUE: { size: 10_000, mesasge: 'Warning: large queue. Your queue should be reviewed' } +} + class Boss extends EventEmitter { #stopped #maintaining @@ -26,6 +31,14 @@ class Boss extends EventEmitter { this.functions = [ this.maintain ] + + if(this.#config.warningSlowQuerySeconds) { + WARNINGS.SLOW_QUERY.seconds = this.#config.warningSlowQuerySeconds + } + + if(this.#config.warningLargeQueueSize) { + WARNINGS.LARGE_QUEUE.size = this.#config.warningLargeQueueSize + } } async start () { @@ -51,9 +64,8 @@ class Boss extends EventEmitter { const elapsed = (ended - started) / 1000 - if (elapsed > 30 || this.#config.__test__warn_slow_query) { - const message = 'Warning: slow query. Your queues and/or database server should be reviewed' - this.emit(events.warn, { message, elapsed, sql, values }) + if (elapsed > WARNINGS.SLOW_QUERY.seconds || this.#config.__test__warn_slow_query) { + this.emit(events.warn, { message: WARNINGS.SLOW_QUERY.message, elapsed, sql, values }) } return result @@ -98,19 +110,29 @@ class Boss extends EventEmitter { }, {}) for (const queueGroup of Object.values(queueGroups)) { - await this.#monitorActive(queueGroup) - await this.#dropCompleted(queueGroup) + const { table, queues } = queueGroup + const names = queues.map(i => i.name) + + while (names.length) { + const chunk = names.splice(0, 100) + + await this.#monitorActive(table, chunk) + await this.#dropCompleted(table, chunk) + } } } - async #monitorActive (queueGroup) { - const { table, queues } = queueGroup - const names = queues.map(i => i.name) - + async #monitorActive (table, names) { const command = plans.trySetQueueMonitorTime(this.#config.schema, names, this.#config.monitorIntervalSeconds) const { rows } = await this.#executeSql(command) if (rows.length) { + const warnings = rows.filter(i => i.queuedCount > WARNINGS.LARGE_QUEUE.size) + + for (const warning of warnings) { + this.emit(events.warn, { ...warning, message: WARNINGS.LARGE_QUEUE.mesasge }) + } + const sql = plans.failJobsByTimeout(this.#config.schema, table, names) await this.#executeSql(sql) @@ -119,10 +141,7 @@ class Boss extends EventEmitter { } } - async #dropCompleted (queueGroup) { - const { table, queues } = queueGroup - const names = queues.map(i => i.name) - + async #dropCompleted (table, names) { const command = plans.trySetQueueDeletionTime(this.#config.schema, names, this.#config.maintenanceIntervalSeconds) const { rows } = await this.#executeSql(command) diff --git a/src/plans.js b/src/plans.js index 25fffdf..774f8c0 100644 --- a/src/plans.js +++ b/src/plans.js @@ -422,7 +422,12 @@ function trySetQueueTimestamp (schema, queues, column, seconds) { SET ${column} = now() WHERE name IN(${getQueueInClause(queues)}) AND EXTRACT( EPOCH FROM (now() - COALESCE(${column}, now() - interval '1 week') ) ) > ${seconds} - RETURNING true + RETURNING + name, + deferred_count as "deferredCount", + queued_count as "queuedCount", + active_count as "activeCount", + total_count as "totalCount" ` } @@ -896,7 +901,7 @@ function cacheQueueStats (schema, table, queues) { active_count = stats.active_count, total_count = stats.total_count FROM stats - WHERE queue.name = stats.name + WHERE queue.name = stats.name ` return locked(schema, sql, 'queue-stats')