diff --git a/.vscode/settings.json b/.vscode/settings.json
index cfe90978..a7a288d8 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -11,7 +11,7 @@
 	],
 	"[typescript]": {
 		"editor.codeActionsOnSave": {
-			"source.organizeImports": true
+			"source.organizeImports": "explicit"
 		}
 	},
 	"editor.rulers": [
diff --git a/src/lib/db.test.ts b/src/lib/db.test.ts
index adcb2f5b..88c603bb 100644
--- a/src/lib/db.test.ts
+++ b/src/lib/db.test.ts
@@ -1132,7 +1132,7 @@ describe("lib/db", () => {
 		});

 		it("..., but only above the minimum size", async () => {
-			await retry(5, async () => {
+			await retry(1, async () => {
 				db = new JsonlDB(testFilenameFull, {
 					autoCompress: {
 						sizeFactor: 4,
@@ -1752,7 +1752,6 @@ describe("lib/db", () => {
 			await testFS.create({
 				[testFilename]: ``,
 			});
-			console.log(testFilenameFull);

 			db = new JsonlDB(testFilenameFull, {
 				enableTimestamps: true,
@@ -1776,7 +1775,7 @@ describe("lib/db", () => {
 				`,
 			);
 		});
-	}, 120000);
+	});

 	it("should be parsed from the db file", async () => {
 		await retry(3, async () => {
diff --git a/src/lib/db.ts b/src/lib/db.ts
index be8c50c6..366584f2 100644
--- a/src/lib/db.ts
+++ b/src/lib/db.ts
@@ -8,6 +8,7 @@ import { composeObject } from "alcalzone-shared/objects";
 import * as fs from "fs-extra";
 import * as path from "path";
 import * as readline from "readline";
+import { Signal } from "./signal";

 export interface JsonlDBOptions {
 	/**
@@ -137,13 +138,17 @@ type LazyEntry = (
 };

 type PersistenceTask =
+	// The DB is stopped, perform cleanup
 	| { type: "stop" }
-	| { type: "none" }
+	// Used only in the persistence thread and indicates that a write should be performed
+	| { type: "write" }
+	// The DB should be dumped to a file
 	| {
 			type: "dump";
 			filename: string;
 			done: DeferredPromise<void>;
 	  }
+	// The DB should be compressed
 	| {
 			type: "compress";
 			done: DeferredPromise<void>;
@@ -299,14 +304,57 @@ export class JsonlDB {
 	}
 	private _changesSinceLastCompress: number = 0;

+	private _compressBySizeThreshold: number = Number.POSITIVE_INFINITY;
+	// Signals that the conditions to compress the DB by size are fulfilled
+	private _compressBySizeNeeded = new Signal();
+	// Signals that the minimum number of changes to automatically compress was exceeded
+	private _compressIntervalMinChangesExceeded = false;
+	// Signals that the next change may immediately trigger a write to disk
+	private _writeIntervalElapsed = false;
+	// Signals that the journal has exceeded the maximum buffered commands
+	// or that the journal contains entries that may be written to disk immediately
+	private _journalFlushable = new Signal();
+
+	private updateCompressBySizeThreshold(): void {
+		if (!this.options.autoCompress) return;
+		if (!this.options.autoCompress.sizeFactor) return;
+		const {
+			sizeFactor = Number.POSITIVE_INFINITY,
+			sizeFactorMinimumSize = 0,
+		} = this.options.autoCompress;
+
+		this._compressBySizeThreshold = Math.max(
+			sizeFactorMinimumSize,
+			sizeFactor * this.size,
+		);
+	}
+
+	private triggerJournalFlush(): void {
+		// Trigger a flush...
+		if (
+			// ... immediately if writing isn't throttled
+			!this.options.throttleFS?.intervalMs ||
+			// ... immediately if the timer elapsed
+			this._writeIntervalElapsed ||
+			// ... or the maximum buffered commands were exceeded
+			this.exceededMaxBufferedCommands()
+		) {
+			this._journalFlushable.set();
+		}
+	}

 	private _isOpen: boolean = false;
 	public get isOpen(): boolean {
 		return this._isOpen;
 	}

+	// Resolves when the persistence thread ends
 	private _persistencePromise: Promise<void> | undefined;
+	// An array of tasks to be handled by the persistence thread
 	private _persistenceTasks: PersistenceTask[] = [];
+	// Indicates to the persistence thread that there is a pending task
+	private _persistenceTaskSignal = new Signal();
+
 	private _journal: LazyEntry[] = [];
 	private _fd: number | undefined;

@@ -314,6 +362,11 @@ export class JsonlDB {
 		return this._journal.splice(0, this._journal.length);
 	}

+	private pushPersistenceTask(task: PersistenceTask): void {
+		this._persistenceTasks.push(task);
+		this._persistenceTaskSignal.set();
+	}
+
 	private _openPromise: DeferredPromise<void> | undefined;
 	// /** Opens the database file or creates it if it doesn't exist */
 	public async open(): Promise<void> {
@@ -422,6 +475,8 @@ export class JsonlDB {
 			this._fd = undefined;
 		}

+		this.updateCompressBySizeThreshold();
+
 		// Start background persistence thread
 		this._persistencePromise = this.persistenceThread();
 		await this._openPromise;
@@ -575,6 +630,9 @@ export class JsonlDB {
 			// Something was deleted
 			this._journal.push(this.makeLazyDelete(key));
 			this._timestamps.delete(key);
+
+			this.updateCompressBySizeThreshold();
+			this.triggerJournalFlush();
 		}
 		return ret;
 	}
@@ -585,7 +643,7 @@
 		}
 		this._db.set(key, value);
 		if (this.options.enableTimestamps) {
-			// If the timestamp should updated, use the current time, otherwise try to preserve the old one
+			// If the timestamp should be updated, use the current time, otherwise try to preserve the old one
 			let ts: number | undefined;
 			if (updateTimestamp) {
 				ts = Date.now();
@@ -597,6 +655,10 @@
 		} else {
 			this._journal.push(this.makeLazyWrite(key, value));
 		}
+
+		this.updateCompressBySizeThreshold();
+		this.triggerJournalFlush();
+
 		return this;
 	}

@@ -626,6 +688,9 @@
 			this._db.set(key, value);
 			this._journal.push(this.makeLazyWrite(key, value));
 		}
+
+		this.updateCompressBySizeThreshold();
+		this.triggerJournalFlush();
 	}

 	public async exportJson(
@@ -752,7 +817,7 @@
 		if (!this._isOpen) return;

 		const done = createDeferredPromise();
-		this._persistenceTasks.push({
+		this.pushPersistenceTask({
 			type: "dump",
 			filename: targetFilename,
 			done,
@@ -766,18 +831,22 @@
 		}
 	}

-	private needToCompressBySize(): boolean {
-		const {
-			sizeFactor = Number.POSITIVE_INFINITY,
-			sizeFactorMinimumSize = 0,
-		} = this.options.autoCompress ?? {};
-		if (
-			this._uncompressedSize >= sizeFactorMinimumSize &&
-			this._uncompressedSize >= sizeFactor * this.size
-		) {
-			return true;
+	private exceededMaxBufferedCommands(): boolean {
+		const maxBufferedCommands =
+			this.options.throttleFS?.maxBufferedCommands;
+		if (maxBufferedCommands == undefined) {
+			return false;
+		} else {
+			return (
+				this._journal.length > 0 &&
+				this._journal.length > maxBufferedCommands
+			);
 		}
-		return false;
+	}
+
+	private needToCompressBySize(): boolean {
+		if (!this._isOpen) return false;
+		return this._uncompressedSize >= this._compressBySizeThreshold;
 	}

 	private needToCompressByTime(lastCompress: number): boolean {
@@ -793,50 +862,113 @@
 	}

 	private async persistenceThread(): Promise<void> {
+		const compressInterval = this.options.autoCompress?.intervalMs;
+		const throttleInterval = this.options.throttleFS?.intervalMs ?? 0;
+
 		// Keep track of the write accesses and compression attempts
 		let lastWrite = Date.now();
 		let lastCompress = Date.now();
-		const throttleInterval = this.options.throttleFS?.intervalMs ?? 0;
-		const maxBufferedCommands =
-			this.options.throttleFS?.maxBufferedCommands ??
-			Number.POSITIVE_INFINITY;

 		// Open the file for appending and reading
 		this._fd = await fs.open(this.filename, "a+");
 		this._openPromise?.resolve();

-		const sleepDuration = 20; // ms
-
 		while (true) {
+			const now = Date.now();
+
+			// Figure out how long the timeouts should be.
+			// > 0 means wait, 0 means do it now, undefined means don't do it
+			let compressByTimeSleepDuration: number | undefined;
+			if (compressInterval) {
+				const nextCompress = lastCompress + compressInterval;
+				if (nextCompress > now) {
+					compressByTimeSleepDuration = nextCompress - now;
+				} else if (this._compressIntervalMinChangesExceeded) {
+					// Compress now
+					compressByTimeSleepDuration = 0;
+				}
+			}
+
+			let throttledWriteSleepDuration: number | undefined;
+			if (throttleInterval) {
+				const nextWrite = lastWrite + throttleInterval;
+				if (nextWrite > now) {
+					throttledWriteSleepDuration = nextWrite - now;
+				} else if (this._journal.length > 0) {
+					// Write now
+					throttledWriteSleepDuration = 0;
+				} else {
+					// Indicate to the outside that the next journal entry
+					// should cause a write/trigger
+					this._writeIntervalElapsed = true;
+				}
+			} else if (this._journal.length > 0) {
+				// Not throttled, write immediately
+				throttledWriteSleepDuration = 0;
+			}
+
+			// Figure out what to do
+			type Input = "flush journal" | "write" | "compress" | "task";
+			// We do this in two steps, as we only want to react to a single action
+			const input = (await Promise.race(
+				[
+					// The journal has exceeded the maximum buffered commands
+					// and needs to be written to disk
+					this._journalFlushable.then(() => "flush journal" as const),
+
+					// The timer to flush the journal to disk has elapsed
+					throttledWriteSleepDuration != undefined &&
+						wait(throttledWriteSleepDuration, true).then(
+							() => "write" as const,
+						),
+
+					// The timer to compress by time has elapsed
+					compressByTimeSleepDuration != undefined &&
+						wait(compressByTimeSleepDuration, true).then(() => {
+							if (this._compressIntervalMinChangesExceeded) {
+								return "compress" as const;
+							}
+						}),
+
+					this._compressBySizeNeeded.then(() => "compress" as const),
+
+					// A task was received from the outside
+					this._persistenceTaskSignal.then(() => "task" as const),
+				].filter((p) => !!p),
+			)) as Input;
+
 			let task: PersistenceTask | undefined;
-			if (
-				this.needToCompressBySize() ||
-				this.needToCompressByTime(lastCompress)
-			) {
+			if (input === "flush journal") {
+				task = { type: "write" };
+			} else if (input === "write") {
+				task = { type: "write" };
+			} else if (input === "compress") {
 				// Need to compress
-				task = { type: "compress", done: createDeferredPromise() };
+				task = {
+					type: "compress",
+					done: createDeferredPromise(),
+				};
 				// but catch errors!
 				// eslint-disable-next-line @typescript-eslint/no-empty-function
 				task.done.catch(() => {});
-			} else {
-				// Take the first tasks of from the task queue
-				task = this._persistenceTasks.shift() ?? { type: "none" };
+			} else if (input === "task") {
+				task = this._persistenceTasks.shift();
+				// Reset the signal when there are no more tasks
+				if (!task) this._persistenceTaskSignal.reset();
 			}

+			if (!task) continue;
+
 			let isStopCmd = false;
 			switch (task.type) {
 				case "stop":
 					isStopCmd = true;
 				// fall through
-				case "none": {
+				case "write": {
 					// Write to disk if necessary
-					const shouldWrite =
-						this._journal.length > 0 &&
-						(isStopCmd ||
-							Date.now() - lastWrite > throttleInterval ||
-							this._journal.length > maxBufferedCommands);
+					// Only write if there are actually entries to write
+					const shouldWrite = this._journal.length > 0;

 					if (shouldWrite) {
 						// Drain the journal
@@ -852,6 +984,16 @@
 						await fs.close(this._fd);
 						this._fd = undefined;
 						return;
+					} else {
+						// Since we wrote something, the uncompressed size may have changed
+						if (this.needToCompressBySize()) {
+							this._compressBySizeNeeded.set();
+						}
+						// Also we might have surpassed the change threshold to trigger a compression
+						this._compressIntervalMinChangesExceeded =
+							this._changesSinceLastCompress >=
+							(this.options.autoCompress?.intervalMinChanges ??
+								1);
 					}
 					break;
 				}
@@ -877,8 +1019,6 @@
 					break;
 				}
 			}
-
-			await wait(sleepDuration);
 		}
 	}

@@ -930,6 +1070,10 @@
 		await fs.appendFile(fd, serialized);
 		await fs.fsync(fd);

+		// Reset the signals related to writing the journal
+		this._journalFlushable.reset();
+		this._writeIntervalElapsed = false;
+
 		return fd;
 	}

@@ -948,6 +1092,12 @@
 		// 2. Create a dump, draining the journal to avoid duplicate writes
 		await this.dumpInternal(this.dumpFilename, true);

+		// We're done writing, so update the statistics now
+		this._uncompressedSize = this._db.size;
+		this.updateCompressBySizeThreshold();
+		this._changesSinceLastCompress = 0;
+		this._compressIntervalMinChangesExceeded = false;
+
 		// 3. Ensure there are no pending rename operations or file creations
 		await fsyncDir(path.dirname(this.filename));

@@ -964,15 +1114,13 @@
 		// 6. open the main DB file again in append mode
 		this._fd = await fs.open(this.filename, "a+");

-		// Remember the new statistics
-		this._uncompressedSize = this._db.size;
-		this._changesSinceLastCompress = 0;
+		// Reset the signals related to compressing the DB
+		this._compressBySizeNeeded.reset();
 	}

 	/** Compresses the db by dumping it and overwriting the aof file. */
 	public async compress(): Promise<void> {
 		if (!this._isOpen) return;
-
 		await this.compressInternal();
 	}

@@ -986,7 +1134,7 @@
 		if (task) return task.done;

 		const done = createDeferredPromise();
-		this._persistenceTasks.push({
+		this.pushPersistenceTask({
 			type: "compress",
 			done,
 		});
@@ -1010,13 +1158,14 @@
 		}

 		// Stop persistence thread and wait for it to finish
-		this._persistenceTasks.push({ type: "stop" });
+		this.pushPersistenceTask({ type: "stop" });
 		await this._persistencePromise;

 		// Reset all variables
 		this._db.clear();
 		this._changesSinceLastCompress = 0;
 		this._uncompressedSize = Number.NaN;
+		this.updateCompressBySizeThreshold();

 		// Free the lock
 		try {
diff --git a/src/lib/signal.test.ts b/src/lib/signal.test.ts
new file mode 100644
index 00000000..cc3d5f6c
--- /dev/null
+++ b/src/lib/signal.test.ts
@@ -0,0 +1,47 @@
+import { describe, expect, it } from "vitest";
+import { Signal } from "./signal";
+
+describe("signal", () => {
+	it("can be awaited and resolves immediately when set", async () => {
+		const signal = new Signal();
+		signal.set();
+		await signal;
+	});
+
+	it("can be awaited and resolves when set", async () => {
+		const signal = new Signal();
+		setTimeout(() => signal.set(), 100);
+		await signal;
+	});
+
+	it("does not resolve when awaited after being reset", async () => {
+		const signal = new Signal();
+		signal.set();
+		signal.reset();
+
+		const result = await Promise.race([
+			signal.then(() => "resolved"),
+			new Promise((resolve) => setTimeout(resolve, 100)).then(
+				() => "timeout",
+			),
+		]);
+
+		expect(result).toBe("timeout");
+	});
+
+	it("can be re-awaited multiple times in sequence", async () => {
+		const signal = new Signal();
+
+		setTimeout(() => signal.set(), 100);
+		await signal;
+		signal.reset();
+
+		setTimeout(() => signal.set(), 100);
+		await signal;
+		signal.reset();
+
+		setTimeout(() => signal.set(), 100);
+		await signal;
+		signal.reset();
+	});
+});
diff --git a/src/lib/signal.ts b/src/lib/signal.ts
new file mode 100644
index 00000000..dff6ae36
--- /dev/null
+++ b/src/lib/signal.ts
@@ -0,0 +1,37 @@
+import {
+	createDeferredPromise,
+	type DeferredPromise,
+} from "alcalzone-shared/deferred-promise";
+
+/** Can be used to asynchronously signal a single listener */
+export class Signal {
+	private _listener: DeferredPromise<void> | undefined;
+
+	private _status: boolean = false;
+	public get isSet(): boolean {
+		return this._status;
+	}
+
+	public set(): void {
+		if (this._status) return;
+		this._status = true;
+		if (this._listener) {
+			this._listener.resolve(undefined);
+		}
+	}
+
+	public reset(): void {
+		this._status = false;
+		this._listener = undefined;
+	}
+
+	public then<T>(onfulfilled: () => T | PromiseLike<T>): Promise<T> {
+		if (this._status) {
+			return Promise.resolve(onfulfilled());
+		} else {
+			const p = createDeferredPromise();
+			this._listener = p;
+			return p.then(onfulfilled);
+		}
+	}
+}
diff --git a/test/cpu.ts b/test/cpu.ts
new file mode 100644
index 00000000..fbf8593d
--- /dev/null
+++ b/test/cpu.ts
@@ -0,0 +1,25 @@
+import { JsonlDB } from "../src";
+
+async function main() {
+	const testDB: JsonlDB = new JsonlDB("test.jsonl", {
+		autoCompress: {
+			sizeFactor: 2,
+			sizeFactorMinimumSize: 5000,
+		},
+		ignoreReadErrors: true,
+		throttleFS: {
+			intervalMs: 60000,
+			maxBufferedCommands: 100,
+		},
+	});
+
+	await testDB.open();
+	for (let i = 0; i < 100; i++) {
+		testDB.set(`benchmark.0.test${i}`, i);
+	}
+
+	await new Promise((resolve) => setTimeout(resolve, 20000));
+
+	await testDB.close();
+}
+void main();
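Usage sketch (illustration only, not part of the patch): how the `Signal` primitive introduced in `src/lib/signal.ts` is meant to be consumed — awaited as a one-shot thenable and re-armed with `reset()`. The `demo` function and the relative import path are assumptions for this example.

```ts
// Sketch only — exercises the Signal API defined in src/lib/signal.ts above.
// The import path assumes a consumer living next to test/cpu.ts.
import { Signal } from "../src/lib/signal";

async function demo(): Promise<void> {
	const signal = new Signal();

	// Awaiting an unset signal suspends until set() is called elsewhere
	setTimeout(() => signal.set(), 100);
	await signal;

	// Awaiting an already-set signal resolves immediately
	await signal;

	// reset() re-arms the signal so it can be awaited again
	signal.reset();
	setTimeout(() => signal.set(), 100);
	await signal;
}

void demo();
```

This is the pattern `persistenceThread()` builds on: it races `_journalFlushable`, `_compressBySizeNeeded`, and `_persistenceTaskSignal` against its write/compress timers, reacts to exactly one input per iteration, and resets the corresponding signal once the work has been done.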