Skip to content

Commit

Permalink
update default db close config on stop
Browse files Browse the repository at this point in the history
  • Loading branch information
timgit committed Aug 11, 2024
1 parent 46c2601 commit 2ae28f1
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 25 deletions.
18 changes: 7 additions & 11 deletions docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ CREATE TABLE pgboss.job (

# Events

Each instance of pg-boss is an EventEmitter. You can run multiple instances of pg-boss for a variety of use cases including distribution and load balancing. Each instance has the freedom to process to whichever jobs you need. Because of this diversity, the job activity of one instance could be drastically different from another.

> For example, if you were to process to `error` in instance A, it will not receive an `error` event from instance B.
The pg-boss class inherits from EventEmitter.

## `error`
The error event is raised from any errors that may occur during internal job fetching, monitoring and archiving activities. While not required, adding a listener to the error event is strongly encouraged:
The error event is raised from any errors that may occur during internal processing, such as scheduling and maintenance. While not required, adding a listener to the error event is strongly encouraged:

> If an EventEmitter does not have at least one listener registered for the 'error' event, and an 'error' event is emitted, the error is thrown, a stack trace is printed, and the Node.js process exits.
>
Expand All @@ -172,8 +170,6 @@ Ideally, code similar to the following example would be used after creating your
boss.on('error', error => logger.error(error));
```

> **Note: Since error events are only raised during internal housekeeping activities, they are not raised for direct API calls.**
## `monitor-states`

The `monitor-states` event is conditionally raised based on the `monitorStateInterval` configuration setting and only emitted from `start()`. If passed during instance creation, it will provide a count of jobs in each state per interval. This could be useful for logging or even determining if the job system is handling its load.
Expand Down Expand Up @@ -425,13 +421,13 @@ If the required database objects do not exist in the specified database, **`star

> While this is most likely a welcome feature, be aware of this during upgrades since this could delay the promise resolution by however long the migration script takes to run against your data. For example, if you happened to have millions of jobs in the job table just hanging around for archiving and the next version of the schema had a couple of new indexes, it may take a few seconds before `start()` resolves. Most migrations are very quick, however, and are designed with performance in mind.
Additionally, all schema operations, both first-time provisioning and migrations, are nested within advisory locks to prevent race conditions during `start()`. Internally, these locks are created using `pg_advisory_xact_lock()` which auto-unlock at the end of the transaction and don't require a persistent session or the need to issue an unlock. This should make it compatible with most connection poolers, such as pgBouncer in transactional pooling mode.
Additionally, all schema operations, both first-time provisioning and migrations, are nested within advisory locks to prevent race conditions during `start()`. Internally, these locks are created using `pg_advisory_xact_lock()` which auto-unlock at the end of the transaction and don't require a persistent session or the need to issue an unlock.

One example of how this is useful would be including `start()` inside the bootstrapping of a pod in a ReplicaSet in Kubernetes. Being able to scale up your job processing using a container orchestration tool like k8s is becoming more and more popular, and pg-boss can be dropped into this system with no additional logic, fear, or special configuration.
One example of how this is useful would be including `start()` inside the bootstrapping of a pod in a ReplicaSet in Kubernetes. Being able to scale up your job processing using a container orchestration tool like k8s is becoming more and more popular, and pg-boss can be dropped into this system without any special startup handling.

## `stop(options)`

All job monitoring will be stopped and all workers on this instance will be removed. Basically, it's the opposite of `start()`. Even though `start()` may create new database objects during initialization, `stop()` will never remove anything from the database.
Stops all background processing, such as maintenance and scheduling, as well as all polling workers started with `work()`.

By default, calling `stop()` without any arguments will gracefully wait for all workers to finish processing active jobs before resolving. This behaviour can be configured using the options argument. If monitoring for the end of the stop is needed, add a listener to the `stopped` event.

Expand All @@ -446,8 +442,8 @@ By default, calling `stop()` without any arguments will gracefully wait for all

Default: `true`. If `true`, the PgBoss instance will wait for any workers that are currently processing jobs to finish, up to the specified timeout. During this period, new jobs will not be processed, but active jobs will be allowed to finish.

* `destroy`, bool
Default: `false`. If `true` and the database connection is managed by pg-boss, it will destroy the connection pool.
* `close`, bool
Default: `true`. If `true` and the database connection is managed by pg-boss, it will close the connection pool. Use `false` if needed to continue allowing operations such as `send()` and `fetch()`.

* `timeout`, int

Expand Down
4 changes: 2 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ class PgBoss extends EventEmitter {
return
}

let { destroy = false, graceful = true, timeout = 30000, wait = true } = options
let { close = true, graceful = true, timeout = 30000, wait = true } = options

timeout = Math.max(timeout, 1000)

Expand All @@ -168,7 +168,7 @@ class PgBoss extends EventEmitter {

await this.#manager.failWip()

if (this.#db.isOurs && this.#db.opened && destroy) {
if (this.#db.isOurs && this.#db.opened && close) {
await this.#db.close()
}

Expand Down
2 changes: 1 addition & 1 deletion test/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async function afterEach () {
const { boss, state } = this.currentTest

if (boss) {
await boss.stop({ destroy: true, timeout: 2000 })
await boss.stop({ timeout: 2000 })
}

if (state === 'passed') {
Expand Down
24 changes: 16 additions & 8 deletions test/opsTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,31 @@ describe('ops', function () {

it('should force stop', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig })
await boss.stop({ graceful: false, wait: false })
await boss.stop({ graceful: false, wait: true })
})

it('should destroy the connection pool', async function () {
it('should close the connection pool', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig })
await boss.stop({ destroy: true, graceful: false, wait: false })
await boss.stop({ graceful: false, wait: true })

assert(boss.getDb().pool.totalCount === 0)
})

it('should destroy the connection pool gracefully', async function () {
it('should close the connection pool gracefully', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig })
await boss.stop({ destroy: true, wait: false })
await new Promise((resolve) => {
boss.on('stopped', () => resolve())
})
await boss.stop({ wait: true })

assert(boss.getDb().pool.totalCount === 0)
})

it('should not close the connection pool after stop with close option', async function () {
const boss = this.test.boss = await helper.start({ ...this.test.bossConfig })
const queue = this.test.bossConfig.schema
await boss.stop({ close: false, wait: true })

const jobId = await boss.send(queue)
const [job] = await boss.fetch(queue)

assert.strictEqual(jobId, job.id)
})
})
2 changes: 1 addition & 1 deletion test/readme.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ async function readme () {
})

await delay(2000)
await boss.stop({ destroy: true })
await boss.stop()
}

readme()
2 changes: 1 addition & 1 deletion test/scheduleTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ describe('schedule', function () {

await boss.unschedule(queue)

await boss.stop({ destroy: true, graceful: false, wait: false })
await boss.stop({ graceful: false })

const db = await helper.getDb()
await db.executeSql(plans.clearStorage(this.test.bossConfig.schema))
Expand Down
2 changes: 1 addition & 1 deletion types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ declare namespace PgBoss {
}

interface StopOptions {
destroy?: boolean,
close?: boolean,
graceful?: boolean,
timeout?: number,
wait?: boolean
Expand Down

0 comments on commit 2ae28f1

Please sign in to comment.