Skip to content

Commit

Permalink
tests and types
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Sep 23, 2024
1 parent fa8bafd commit a72e5bf
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 86 deletions.
31 changes: 9 additions & 22 deletions src/attorney.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,10 @@ module.exports = {
validateQueueArgs,
checkWorkArgs,
checkFetchArgs,
warnClockSkew,
assertPostgresObjectName,
assertQueueName
}

const WARNINGS = {
CLOCK_SKEW: {
message: 'Timekeeper detected clock skew between this instance and the database server. This will not affect scheduling operations, but this warning is shown any time the skew exceeds 60 seconds.',
code: 'pg-boss-w01'
}
}

function validateQueueArgs (config = {}) {
assert(!('deadLetter' in config) || config.deadLetter === null || (typeof config.deadLetter === 'string'), 'deadLetter must be a string')
assert(!('deadLetter' in config) || config.deadLetter === null || /[\w-]/.test(config.deadLetter), 'deadLetter can only contain alphanumeric characters, underscores, or hyphens')
Expand Down Expand Up @@ -133,6 +125,7 @@ function getConfig (value) {
applySchemaConfig(config)
applyMaintenanceConfig(config)
applyScheduleConfig(config)
validateWarningConfig(config)

return config
}
Expand All @@ -145,6 +138,14 @@ function applySchemaConfig (config) {
config.schema = config.schema || DEFAULT_SCHEMA
}

function validateWarningConfig (config) {
assert(!('warningLargeQueueSize' in config) || config.warningLargeQueueSize >= 1,
'configuration assert: warningLargeQueueSize must be at least 1')

assert(!('warningSlowQuerySeconds' in config) || config.warningSlowQuerySeconds >= 1,
'configuration assert: warningSlowQuerySeconds must be at least 1')
}

function assertPostgresObjectName (name) {
assert(typeof name === 'string', 'Name must be a string')
assert(name.length <= 50, 'Name cannot exceed 50 characters')
Expand Down Expand Up @@ -232,17 +233,3 @@ function applyScheduleConfig (config) {

config.cronWorkerIntervalSeconds = config.cronWorkerIntervalSeconds || 5
}

function warnClockSkew (message) {
emitWarning(WARNINGS.CLOCK_SKEW, message, { force: true })
}

function emitWarning (warning, message, options = {}) {
const { force } = options

if (force || !warning.warned) {
warning.warned = true
message = `${warning.message} ${message || ''}`
process.emitWarning(message, warning.type, warning.code)
}
}
15 changes: 8 additions & 7 deletions src/boss.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ const plans = require('./plans')

const events = {
error: 'error',
warn: 'warn'
warning: 'warning'
}

const WARNINGS = {
Expand Down Expand Up @@ -32,12 +32,12 @@ class Boss extends EventEmitter {
this.maintain
]

if (this.#config.warningSlowQuerySeconds) {
WARNINGS.SLOW_QUERY.seconds = this.#config.warningSlowQuerySeconds
if (config.warningSlowQuerySeconds) {
WARNINGS.SLOW_QUERY.seconds = config.warningSlowQuerySeconds
}

if (this.#config.warningLargeQueueSize) {
WARNINGS.LARGE_QUEUE.size = this.#config.warningLargeQueueSize
if (config.warningLargeQueueSize) {
WARNINGS.LARGE_QUEUE.size = config.warningLargeQueueSize
}
}

Expand Down Expand Up @@ -65,7 +65,7 @@ class Boss extends EventEmitter {
const elapsed = (ended - started) / 1000

if (elapsed > WARNINGS.SLOW_QUERY.seconds || this.#config.__test__warn_slow_query) {
this.emit(events.warn, { message: WARNINGS.SLOW_QUERY.message, elapsed, sql, values })
this.emit(events.warning, { message: WARNINGS.SLOW_QUERY.message, data: { elapsed, sql, values } })
}

return result
Expand Down Expand Up @@ -114,6 +114,7 @@ class Boss extends EventEmitter {
const names = queues.map(i => i.name)

while (names.length) {
// todo: test
const chunk = names.splice(0, 100)

await this.#monitorActive(table, chunk)
Expand All @@ -130,7 +131,7 @@ class Boss extends EventEmitter {
const warnings = rows.filter(i => i.queuedCount > (i.queueSizeWarning || WARNINGS.LARGE_QUEUE.size))

for (const warning of warnings) {
this.emit(events.warn, { ...warning, message: WARNINGS.LARGE_QUEUE.mesasge })
this.emit(events.warning, { message: WARNINGS.LARGE_QUEUE.mesasge, data: warning })
}

const sql = plans.failJobsByTimeout(this.#config.schema, table, names)
Expand Down
11 changes: 9 additions & 2 deletions src/timekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,14 @@ const QUEUES = {

const EVENTS = {
error: 'error',
schedule: 'schedule'
schedule: 'schedule',
warning: 'warning'
}

const WARNINGS = {
CLOCK_SKEW: {
message: 'Warning: Clock skew between this instance and the database server. This will not break scheduling, but is emitted any time the skew exceeds 60 seconds.'
}
}

class Timekeeper extends EventEmitter {
Expand Down Expand Up @@ -89,7 +96,7 @@ class Timekeeper extends EventEmitter {
const skewSeconds = Math.abs(skew) / 1000

if (skewSeconds >= 60 || this.config.__test__force_clock_skew_warning) {
Attorney.warnClockSkew(`Instance clock is ${skewSeconds}s ${skew > 0 ? 'slower' : 'faster'} than database.`)
this.emit(this.events.warning, { message: WARNINGS.CLOCK_SKEW.message, data: { seconds: skewSeconds, direction: skew > 0 ? 'slower' : 'faster' } })
}
} catch (err) {
this.emit(this.events.error, err)
Expand Down
2 changes: 1 addition & 1 deletion test/expireTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ describe('expire', function () {
// fetch the job but don't complete it
await boss.fetch(queue)

await delay(2000)
await delay(4000)

const job = await boss.getJobById(queue, jobId)

Expand Down
39 changes: 36 additions & 3 deletions test/monitoringTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,53 @@ describe('monitoring', function () {
assert(errorCount > 0)
})

it('slow maintenance should emit warn', async function () {
it('slow maintenance should emit warning', async function () {
const config = {
...this.test.bossConfig,
__test__warn_slow_query: true
__test__warn_slow_query: true,
warningSlowQuerySeconds: 1
}

const boss = this.test.boss = await helper.start(config)
const queue = this.test.bossConfig.schema

let eventCount = 0
boss.on('warn', () => eventCount++)
boss.on('warning', (event) => {
assert(event.message.includes('slow'))
eventCount++
})

await boss.maintain(queue)

assert(eventCount > 0)
})

it('large queue should emit warning', async function () {
const config = {
...this.test.bossConfig,
monitorIntervalSeconds: 1,
warningLargeQueueSize: 1,
superviseIntervalSeconds: 1,
supervise: true
}

const boss = this.test.boss = await helper.start(config)
const queue = this.test.bossConfig.schema

await boss.send(queue)
await boss.send(queue)

let eventCount = 0

boss.on('warning', (event) => {
assert(event.message.includes('queue'))
eventCount++
})

await boss.maintain(queue)

await delay(4000)

assert(eventCount > 0)
})
})
12 changes: 3 additions & 9 deletions test/scheduleTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -164,19 +164,13 @@ describe('schedule', function () {

let warningCount = 0

const warningEvent = 'warning'

const onWarning = (warning) => {
assert(warning.message.includes('clock skew'))
boss.once('warning', (warning) => {
assert(warning.message.includes('Clock skew'))
warningCount++
}

process.on(warningEvent, onWarning)
})

await boss.start()

process.removeListener(warningEvent, onWarning)

assert.strictEqual(warningCount, 1)
})

Expand Down
82 changes: 40 additions & 42 deletions types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ declare namespace PgBoss {
singleton: 'singleton',
stately: 'stately'
}

interface Db {
executeSql(text: string, values: any[]): Promise<{ rows: any[] }>;
}
Expand Down Expand Up @@ -45,6 +46,8 @@ declare namespace PgBoss {
interface MaintenanceOptions {
supervise?: boolean;
migrate?: boolean;
warningSlowQuerySeconds?: number;
warningLargeQueueSize?: number;
}

type ConstructorOptions = DatabaseOptions & SchedulingOptions & MaintenanceOptions
Expand All @@ -59,7 +62,7 @@ declare namespace PgBoss {
}

interface JobOptions {
id?: string,
id?: string;
priority?: number;
startAfter?: number | string | Date;
singletonKey?: string;
Expand All @@ -78,20 +81,21 @@ declare namespace PgBoss {
type QueuePolicy = 'standard' | 'short' | 'singleton' | 'stately'

type Queue = {
name: string,
policy?: QueuePolicy,
partition?: boolean,
deadLetter?: string,
name: string;
policy?: QueuePolicy;
partition?: boolean;
deadLetter?: string;
queueSizeWarning?: number;
} & QueueOptions

type QueueResult = Queue & {
deferredCount: number,
queuedCount: number,
activeCount: number,
completedCount: number,
table: number,
createdOn: Date,
updatedOn: Date
deferredCount: number;
queuedCount: number;
activeCount: number;
completedCount: number;
table: number;
createdOn: Date;
updatedOn: Date;
}

type ScheduleOptions = SendOptions & { tz?: string }
Expand Down Expand Up @@ -153,13 +157,13 @@ declare namespace PgBoss {
createdOn: Date;
completedOn: Date | null;
keepUntil: Date;
policy: QueuePolicy,
deadLetter: string,
output: object
policy: QueuePolicy;
deadLetter: string;
output: object;
}

interface JobInsert<T = object> {
id?: string,
id?: string;
name: string;
data?: T;
priority?: number;
Expand All @@ -170,39 +174,30 @@ declare namespace PgBoss {
singletonKey?: string;
singletonSeconds?: number;
expireInSeconds?: number;
deleteAfterSeconds: number;
keepUntil?: Date | string;
}

interface MonitorState {
all: number;
created: number;
retry: number;
active: number;
completed: number;
cancelled: number;
failed: number;
}

interface Worker {
id: string,
name: string,
options: WorkOptions,
state: 'created' | 'active' | 'stopping' | 'stopped'
count: number,
createdOn: Date,
lastFetchedOn: Date,
lastJobStartedOn: Date,
lastJobEndedOn: Date,
lastJobDuration: number,
lastError: object,
lastErrorOn: Date
id: string;
name: string;
options: WorkOptions;
state: 'created' | 'active' | 'stopping' | 'stopped';
count: number;
createdOn: Date;
lastFetchedOn: Date;
lastJobStartedOn: Date;
lastJobEndedOn: Date;
lastJobDuration: number;
lastError: object;
lastErrorOn: Date;
}

interface StopOptions {
close?: boolean,
graceful?: boolean,
timeout?: number,
wait?: boolean
close?: boolean;
graceful?: boolean;
timeout?: number;
wait?: boolean;
}

interface OffWorkOptions {
Expand Down Expand Up @@ -232,6 +227,9 @@ declare class PgBoss extends EventEmitter {
on(event: "error", handler: (error: Error) => void): this;
off(event: "error", handler: (error: Error) => void): this;

on(event: "warning", handler: (warning: { message: string, data: object }) => void): this;
off(event: "warning", handler: (warning: { message: string, data: object }) => void): this;

on(event: "wip", handler: (data: PgBoss.Worker[]) => void): this;
off(event: "wip", handler: (data: PgBoss.Worker[]) => void): this;

Expand Down

0 comments on commit a72e5bf

Please sign in to comment.