Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Sep 23, 2024
1 parent a72e5bf commit 246cdc1
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 88 deletions.
50 changes: 18 additions & 32 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@
- [`getSchedules()`](#getschedules)
- [`deleteJob(name, id, options)`](#deletejobname-id-options)
- [`deleteJob(name, [ids], options)`](#deletejobname-ids-options)
- [`deleteQueuedJobs(name)`](#deletequeuedjobsname)
- [`deleteStoredJobs(name)`](#deletestoredjobsname)
- [`deleteAllJobs(name)`](#deletealljobsname)
- [`cancel(name, id, options)`](#cancelname-id-options)
- [`cancel(name, [ids], options)`](#cancelname-ids-options)
- [`resume(name, id, options)`](#resumename-id-options)
Expand All @@ -54,13 +57,10 @@
- [`getJobById(name, id, options)`](#getjobbyidname-id-options)
- [`createQueue(name, Queue)`](#createqueuename-queue)
- [`updateQueue(name, options)`](#updatequeuename-options)
- [`dropQueuedJobs(name)`](#dropqueuedjobsname)
- [`dropStoredJobs(name)`](#dropstoredjobsname)
- [`dropAllJobs(name)`](#dropalljobsname)
- [`deleteQueue(name)`](#deletequeuename)
- [`getQueues()`](#getqueues)
- [`getQueue(name)`](#getqueuename)
- [`getQueueSize(name, options)`](#getqueuesizename-options)
- [`getQueueStats(name)`](#getqueuestatsname)
- [`isInstalled()`](#isinstalled)
- [`schemaVersion()`](#schemaversion)

Expand Down Expand Up @@ -777,6 +777,18 @@ Deletes a job by id.

Deletes a set of jobs by id.

## `deleteQueuedJobs(name)`

Deletes all queued jobs in a queue.

## `deleteStoredJobs(name)`

Deletes all jobs in completed, failed, and cancelled state in a queue.

## `deleteAllJobs(name)`

Deletes all jobs in a queue, including active jobs.

## `cancel(name, id, options)`

Cancels a pending or active job.
Expand Down Expand Up @@ -875,18 +887,6 @@ When a job fails after all retries, if the queue has a `deadLetter` property, th
Updates options on an existing queue. The policy can be changed, but understand this won't impact existing jobs in flight and will only apply the new policy on new incoming jobs.
## `dropQueuedJobs(name)`
Deletes all queued jobs in a queue.
## `dropStoredJobs(name)`
Deletes all jobs in completed, failed, and cancelled state in a queue.
## `dropAllJobs(name)`
Deletes all jobs in a queue, including active jobs.
## `deleteQueue(name)`
Deletes a queue and all jobs.
Expand All @@ -899,23 +899,9 @@ Returns all queues
Returns a queue by name
## `getQueueSize(name, options)`
Returns the number of pending jobs in a queue by name.
`options`: Optional, object.
## `getQueueStats(name)`
| Prop | Type | Description | Default |
| - | - | - | - |
|`before`| string | count jobs in states before this state | states.active |
As an example, the following options object include active jobs along with created and retry.
```js
{
before: states.completed
}
```
Returns the number of jobs in various states in a queue. The result matches the results from getQueue(), but ignores the cached data and forces the stats to be retrieved immediately.
## `isInstalled()`
Expand Down
5 changes: 5 additions & 0 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ function checkSendArgs (args) {
? options.startAfter
: null

validateRetryConfig(options)
validateExpirationConfig(options)
validateRetentionConfig(options)
validateDeletionConfig(options)

return { name, data, options }
}

Expand Down
30 changes: 16 additions & 14 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ class Manager extends EventEmitter {
this.createQueue,
this.updateQueue,
this.deleteQueue,
this.getQueueSize,
this.getQueueStats,
this.getQueue,
this.getQueues,
this.dropQueuedJobs,
this.dropStoredJobs,
this.dropAllJobs,
this.deleteQueuedJobs,
this.deleteStoredJobs,
this.deleteAllJobs,
this.deleteJob,
this.getJobById
]
Expand Down Expand Up @@ -338,6 +338,7 @@ class Manager extends EventEmitter {
singletonSeconds,
singletonNextSlot,
expireInSeconds,
deleteAfterSeconds,
keepUntil,
retryLimit,
retryDelay,
Expand All @@ -354,6 +355,7 @@ class Manager extends EventEmitter {
singletonSeconds,
singletonOffset,
expireInSeconds,
deleteAfterSeconds,
keepUntil,
retryLimit,
retryDelay,
Expand Down Expand Up @@ -593,43 +595,43 @@ class Manager extends EventEmitter {
} catch {}
}

async dropQueuedJobs (name) {
async deleteQueuedJobs (name) {
Attorney.assertQueueName(name)
const { table } = await this.getQueueCache(name)
const sql = plans.dropQueuedJobs(this.config.schema, table)
const sql = plans.deleteQueuedJobs(this.config.schema, table)
await this.db.executeSql(sql, [name])
}

async dropStoredJobs (name) {
async deleteStoredJobs (name) {
Attorney.assertQueueName(name)
const { table } = await this.getQueueCache(name)
const sql = plans.dropStoredJobs(this.config.schema, table)
const sql = plans.deleteStoredJobs(this.config.schema, table)
await this.db.executeSql(sql, [name])
}

async dropAllJobs (name) {
async deleteAllJobs (name) {
Attorney.assertQueueName(name)
const { table, partition } = await this.getQueueCache(name)

if (partition) {
const sql = plans.dropAllJobs(this.config.schema, table)
const sql = plans.deleteAllJobs(this.config.schema, table)
await this.db.executeSql(sql, [name])
} else {
const sql = plans.truncateTable(this.config.schema, table)
await this.db.executeSql(sql)
}
}

async getQueueSize (name, options) {
async getQueueStats (name) {
Attorney.assertQueueName(name)

const { table } = await this.getQueueCache(name)

const sql = plans.getQueueSize(this.config.schema, table, options?.before)
const sql = plans.getQueueStats(this.config.schema, table, [name])

const result = await this.db.executeSql(sql, [name])
const { rows } = await this.db.executeSql(sql)

return result ? parseFloat(result.rows[0].count) : null
return rows.at(0) || null
}

async getJobById (name, id, options = {}) {
Expand Down
53 changes: 25 additions & 28 deletions src/plans.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ module.exports = {
cancelJobs,
resumeJobs,
deleteJobsById,
dropAllJobs,
dropQueuedJobs,
dropStoredJobs,
deleteAllJobs,
deleteQueuedJobs,
deleteStoredJobs,
truncateTable,
failJobsById,
failJobsByTimeout,
Expand All @@ -49,7 +49,7 @@ module.exports = {
createQueue,
deleteQueue,
getQueues,
getQueueSize,
getQueueStats,
trySetQueueMonitorTime,
trySetQueueDeletionTime,
trySetCronTime,
Expand All @@ -63,8 +63,6 @@ module.exports = {
DEFAULT_SCHEMA
}

const assert = require('node:assert')

const COMMON_JOB_TABLE = 'job_common'

const FIFTEEN_MINUTES = 60 * 15
Expand Down Expand Up @@ -495,27 +493,22 @@ function deleteJobsById (schema, table) {
`
}

function dropQueuedJobs (schema, table) {
function deleteQueuedJobs (schema, table) {
return `DELETE from ${schema}.${table} WHERE name = $1 and state < '${JOB_STATES.active}'`
}

function dropStoredJobs (schema, table) {
function deleteStoredJobs (schema, table) {
return `DELETE from ${schema}.${table} WHERE name = $1 and state > '${JOB_STATES.active}'`
}

function truncateTable (schema, table) {
return `TRUNCATE ${schema}.${table}`
}

function dropAllJobs (schema, table) {
function deleteAllJobs (schema, table) {
return `DELETE from ${schema}.${table} WHERE name = $1`
}

function getQueueSize (schema, table, before = JOB_STATES.active) {
assert(before in JOB_STATES, `${before} is not a valid state`)
return `SELECT count(*) as count FROM ${schema}.${table} WHERE name = $1 AND state < '${before}'`
}

function getSchedules (schema) {
return `SELECT * FROM ${schema}.schedule`
}
Expand Down Expand Up @@ -889,24 +882,28 @@ function deletion (schema, table) {
return locked(schema, sql, table + 'deletion')
}

function cacheQueueStats (schema, table, queues) {
const sql = `
WITH stats AS (
SELECT
function getQueueStats (schema, table, queues) {
return `
SELECT
name,
count(*) FILTER (WHERE start_after > now()) as deferred_count,
count(*) FILTER (WHERE state < '${JOB_STATES.active}') as queued_count,
count(*) FILTER (WHERE state = '${JOB_STATES.active}') as active_count,
count(*) as total_count
(count(*) FILTER (WHERE start_after > now()))::int as "deferredCount",
(count(*) FILTER (WHERE state < '${JOB_STATES.active}'))::int as "queuedCount",
(count(*) FILTER (WHERE state = '${JOB_STATES.active}'))::int as "activeCount",
count(*)::int as "totalCount"
FROM ${schema}.${table}
WHERE name IN (${getQueueInClause(queues)})
GROUP BY 1
)
GROUP BY 1
`
}

function cacheQueueStats (schema, table, queues) {
const sql = `
WITH stats AS (${getQueueStats(schema, table, queues)})
UPDATE ${schema}.queue SET
deferred_count = stats.deferred_count,
queued_count = stats.queued_count,
active_count = stats.active_count,
total_count = stats.total_count
deferred_count = "deferredCount",
queued_count = "queuedCount",
active_count = "activeCount",
total_count = "totalCount"
FROM stats
WHERE queue.name = stats.name
`
Expand Down
22 changes: 22 additions & 0 deletions test/deleteTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,28 @@ const { delay } = require('../src/tools')

describe('delete', async function () {
it('should delete a completed job via maintenance', async function () {
const config = {
...this.test.bossConfig,
maintenanceIntervalSeconds: 1
}

const boss = this.test.boss = await helper.start(config)
const queue = this.test.bossConfig.schema

const jobId = await boss.send(queue, null, { deleteAfterSeconds: 1 })
await boss.fetch(queue)
await boss.complete(queue, jobId)

await delay(1000)

await boss.maintain(queue)

const job = await boss.getJobById(queue, jobId)

assert(!job)
})

it('should delete a completed job via maintenance - cascade config from queue', async function () {
const config = {
...this.test.bossConfig,
maintenanceIntervalSeconds: 1,
Expand Down
4 changes: 2 additions & 2 deletions test/insertTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ describe('insert', function () {

await boss.insert(queue, input)

const count = await boss.getQueueSize(queue)
const { queuedCount } = await boss.getQueueStats(queue)

assert.strictEqual(count, 3)
assert.strictEqual(queuedCount, 3)
})

it('should create jobs from an array with all properties', async function () {
Expand Down
14 changes: 7 additions & 7 deletions test/queueTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ describe('queues', function () {

await boss.createQueue(queue)
await boss.send(queue)
await boss.dropAllJobs(queue)
await boss.deleteAllJobs(queue)
await boss.deleteQueue(queue)
})

Expand All @@ -113,11 +113,11 @@ describe('queues', function () {
await boss.createQueue(queue, { partition: true })
await boss.send(queue)

await boss.dropAllJobs(queue)
await boss.deleteAllJobs(queue)
await boss.deleteQueue(queue)

const count = await boss.getQueueSize(queue2)
assert(count)
const { queuedCount } = await boss.getQueueStats(queue2)
assert(queuedCount)
})

it('should truncate a partitioned queue', async function () {
Expand All @@ -126,7 +126,7 @@ describe('queues', function () {

await boss.createQueue(queue, { partition: true })
await boss.send(queue)
await boss.dropAllJobs(queue)
await boss.deleteAllJobs(queue)
await boss.deleteQueue(queue)
})

Expand All @@ -153,7 +153,7 @@ describe('queues', function () {

assert.strictEqual(await getCount(), 1)

await boss.dropQueuedJobs(queue)
await boss.deleteQueuedJobs(queue)

assert.strictEqual(await getCount(), 0)
})
Expand All @@ -180,7 +180,7 @@ describe('queues', function () {

assert.strictEqual(await getCount(), 2)

await boss.dropStoredJobs(queue)
await boss.deleteStoredJobs(queue)

assert.strictEqual(await getCount(), 0)
})
Expand Down
Loading

0 comments on commit 246cdc1

Please sign in to comment.