Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full in-memory database support #52

Merged
merged 11 commits into from
Nov 23, 2024
80 changes: 47 additions & 33 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import type {
Transaction,
DeleteMessage,
DatabasePath,
ExportMessage,
} from './types.js';
import { SQLocalProcessor } from './processor.js';
import { sqlTag } from './lib/sql-tag.js';
Expand All @@ -32,11 +33,12 @@ import { getQueryKey } from './lib/get-query-key.js';
import { normalizeSql } from './lib/normalize-sql.js';
import { parseDatabasePath } from './lib/parse-database-path.js';
import { mutationLock } from './lib/mutation-lock.js';
import { normalizeDatabaseFile } from './lib/normalize-database-file.js';

export class SQLocal {
protected config: ClientConfig;
protected clientKey: QueryKey;
protected processor?: SQLocalProcessor | Worker;
protected processor: SQLocalProcessor | Worker;
protected isDestroyed: boolean = false;
protected bypassMutationLock: boolean = false;
protected userCallbacks = new Map<string, CallbackUserFunction['func']>();
Expand All @@ -48,7 +50,7 @@ export class SQLocal {
]
>();

protected proxy?: WorkerProxy;
protected proxy: WorkerProxy;
protected reinitChannel: BroadcastChannel;

constructor(databasePath: DatabasePath);
Expand All @@ -66,15 +68,22 @@ export class SQLocal {
`_sqlocal_reinit_(${clientConfig.databasePath})`
);

if (typeof globalThis.Worker !== 'undefined') {
if (
typeof globalThis.Worker !== 'undefined' &&
processorConfig.databasePath !== ':memory:'
) {
this.processor = new Worker(new URL('./worker', import.meta.url), {
type: 'module',
});
this.processor.addEventListener('message', this.processMessageEvent);
this.proxy = coincident(this.processor) as WorkerProxy;
} else {
this.processor = new SQLocalProcessor(true);
this.processor.onmessage = (message) => this.processMessageEvent(message);
this.proxy = globalThis as WorkerProxy;
}

this.processor?.postMessage({
this.processor.postMessage({
type: 'config',
config: processorConfig,
} satisfies ConfigMessage);
Expand All @@ -89,8 +98,9 @@ export class SQLocal {
switch (message.type) {
case 'success':
case 'data':
case 'error':
case 'buffer':
case 'info':
case 'error':
if (message.queryKey && queries.has(message.queryKey)) {
const [resolve, reject] = queries.get(message.queryKey)!;
if (message.type === 'error') {
Expand Down Expand Up @@ -124,8 +134,9 @@ export class SQLocal {
| BatchMessage
| TransactionMessage
| FunctionMessage
| ImportMessage
| GetInfoMessage
| ImportMessage
| ExportMessage
| DeleteMessage
| DestroyMessage
>
Expand All @@ -137,12 +148,6 @@ export class SQLocal {
message.type === 'delete',
this.config,
async () => {
if (!this.processor) {
throw new Error(
'This SQLocal client is not connected to a database. This is likely due to the client being initialized in a server-side environment.'
);
}

if (this.isDestroyed === true) {
throw new Error(
'This SQLocal client has been destroyed. You will need to initialize a new client in order to make further queries.'
Expand Down Expand Up @@ -171,6 +176,7 @@ export class SQLocal {
| TransactionMessage
| FunctionMessage
| GetInfoMessage
| ExportMessage
| DeleteMessage
| DestroyMessage);
break;
Expand Down Expand Up @@ -369,7 +375,7 @@ export class SQLocal {
functionType: 'scalar',
});

if (this.proxy && this.proxy !== globalThis) {
if (this.proxy !== globalThis) {
this.proxy[key] = func;
}
};
Expand All @@ -385,19 +391,33 @@ export class SQLocal {
};

getDatabaseFile = async (): Promise<File> => {
const { directories, fileName, getDirectoryHandle } = parseDatabasePath(
this.config.databasePath
);
const tempFileName = `backup-${Date.now()}--${fileName}`;
const tempFilePath = `${directories.join('/')}/${tempFileName}`;

await this.exec('VACUUM INTO ?', [tempFilePath]);
let fileName, fileBuffer;
const { storageType } = await this.getDatabaseInfo();

if (storageType === 'opfs') {
const path = parseDatabasePath(this.config.databasePath);
const { directories, getDirectoryHandle } = path;
fileName = path.fileName;
const tempFileName = `backup-${Date.now()}--${fileName}`;
const tempFilePath = `${directories.join('/')}/${tempFileName}`;

await this.exec('VACUUM INTO ?', [tempFilePath]);

const dirHandle = await getDirectoryHandle();
const fileHandle = await dirHandle.getFileHandle(tempFileName);
const file = await fileHandle.getFile();
fileBuffer = await file.arrayBuffer();
await dirHandle.removeEntry(tempFileName);
} else {
const message = await this.createQuery({ type: 'export' });

const dirHandle = await getDirectoryHandle();
const fileHandle = await dirHandle.getFileHandle(tempFileName);
const file = await fileHandle.getFile();
const fileBuffer = await file.arrayBuffer();
await dirHandle.removeEntry(tempFileName);
if (message.type === 'buffer') {
fileName = 'database.sqlite3';
fileBuffer = message.buffer;
} else {
throw new Error('The database failed to export.');
}
}

return new File([fileBuffer], fileName, {
type: 'application/x-sqlite3',
Expand All @@ -413,16 +433,10 @@ export class SQLocal {
| ReadableStream<Uint8Array>,
beforeUnlock?: () => void | Promise<void>
): Promise<void> => {
let database: ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>;

if (databaseFile instanceof Blob) {
database = databaseFile.stream();
} else {
database = databaseFile;
}

await mutationLock('exclusive', false, this.config, async () => {
try {
const database = await normalizeDatabaseFile(databaseFile);

await this.createQuery({
type: 'import',
database,
Expand Down
74 changes: 74 additions & 0 deletions src/lib/normalize-database-file.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
type DatabaseFileInput =
| File
| Blob
| ArrayBuffer
| Uint8Array
| ReadableStream<Uint8Array>;

export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo: 'callback'
): Promise<ArrayBuffer | Uint8Array | (() => Promise<Uint8Array | undefined>)>;
export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo: 'buffer'
): Promise<ArrayBuffer | Uint8Array>;
export function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo?: undefined
): Promise<ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>>;
export async function normalizeDatabaseFile(
dbFile: DatabaseFileInput,
convertStreamTo?: 'callback' | 'buffer'
): Promise<
| ArrayBuffer
| Uint8Array
| ReadableStream<Uint8Array>
| (() => Promise<Uint8Array | undefined>)
> {
let bufferOrStream: ArrayBuffer | Uint8Array | ReadableStream<Uint8Array>;

if (dbFile instanceof Blob) {
bufferOrStream = dbFile.stream();
} else {
bufferOrStream = dbFile;
}

if (bufferOrStream instanceof ReadableStream && convertStreamTo) {
const stream = bufferOrStream;
const reader = stream.getReader();

switch (convertStreamTo) {
case 'callback':
return async () => {
const chunk = await reader.read();
return chunk.value;
};

case 'buffer':
const chunks: Uint8Array[] = [];
let streamDone = false;

while (!streamDone) {
const chunk = await reader.read();
if (chunk.value) chunks.push(chunk.value);
streamDone = chunk.done;
}

const arrayLength = chunks.reduce((length, chunk) => {
return length + chunk.length;
}, 0);
const buffer = new Uint8Array(arrayLength);
let offset = 0;

chunks.forEach((chunk) => {
buffer.set(chunk, offset);
offset += chunk.length;
});

return buffer;
}
} else {
return bufferOrStream;
}
}
Loading
Loading