Skip to content

Commit

Permalink
feat(standalone-sqlite): retry saveBundle when SQLITE_BUSY
Browse files Browse the repository at this point in the history
  • Loading branch information
karlprieb committed Feb 26, 2025
1 parent 3d1de8b commit e79beac
Showing 1 changed file with 62 additions and 27 deletions.
89 changes: 62 additions & 27 deletions src/database/standalone-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2757,13 +2757,7 @@ export class StandaloneSqliteDatabase
})
.on('message', async (result) => {
if (result && result.stack) {
const { message, stack, workerMethod, workerArgs } = result;
const error = new DetailedError(message, {
stack,
workerMethod,
workerArgs,
});
job.reject(error);
job.reject(DetailedError.fromJSON(result));
} else {
job.resolve(result);
}
Expand Down Expand Up @@ -2855,7 +2849,10 @@ export class StandaloneSqliteDatabase
role: WorkerRoleName,
method: WorkerMethodName,
args: any,
retryIfFailed = false,
): Promise<any> {
const MAX_RETRIES = 3;

metrics.sqliteInFlightOps.inc({
worker: workerName,
role,
Expand All @@ -2865,41 +2862,79 @@ export class StandaloneSqliteDatabase
role,
method,
});
const ret = new Promise((resolve, reject) => {
this.workQueues[workerName][role].push({
resolve,
reject,
message: {
method,
args,
},
});
this.drainQueue();
});

const executeWithRetry = async (retryCount = 0): Promise<any> => {
try {
return await new Promise((resolve, reject) => {
this.workQueues[workerName][role].push({
resolve,
reject,
message: {
method,
args,
},
});
this.drainQueue();
});
} catch (error: any) {
if (retryIfFailed && error.code === 'SQLITE_BUSY') {
this.log.warn('SQLITE_BUSY error detected, retrying...', {
worker: workerName,
role,
method,
error: error.message,
stack: error.stack,
retryCount,
maxRetries: MAX_RETRIES,
});

if (retryCount < MAX_RETRIES) {
return executeWithRetry(retryCount + 1);
} else {
this.log.error('Max retries reached for SQLITE_BUSY error', {
worker: workerName,
role,
method,
error: error.message,
stack: error.stack,
maxRetries: MAX_RETRIES,
});
}
}

throw error;
}
};

const ret = executeWithRetry();

ret.finally(() => {
metrics.sqliteInFlightOps.dec({
worker: workerName,
role,
});
end();
});

return ret;
}

queueRead(
pool: WorkerPoolName,
method: WorkerMethodName,
args: any,
retryIfFailed = false,
): Promise<any> {
return this.queueWork(pool, 'read', method, args);
return this.queueWork(pool, 'read', method, args, retryIfFailed);
}

queueWrite(
pool: WorkerPoolName,
method: WorkerMethodName,
args: any,
retryIfFailed = false,
): Promise<any> {
return this.queueWork(pool, 'write', method, args);
return this.queueWork(pool, 'write', method, args, retryIfFailed);
}

getMaxHeight(): Promise<number> {
Expand Down Expand Up @@ -2967,7 +3002,7 @@ export class StandaloneSqliteDatabase
}

saveBundle(bundle: BundleRecord): Promise<BundleSaveResult> {
return this.queueWrite('bundles', 'saveBundle', [bundle]);
return this.queueWrite('bundles', 'saveBundle', [bundle], true);
}

async flushStableDataItems(
Expand Down Expand Up @@ -3526,21 +3561,21 @@ if (!isMainThread) {

const error = new DetailedError('Error in StandaloneSqlite worker', {
stack: e.stack,
code: e.code,
workerMethod: method,
workerArgs: args,
});

log.error(error.message, {
message: error.message,
stack: error.stack,
workerMethod: method,
workerArgs: args,
code: e.code,
});

errorCount++;
parentPort?.postMessage({
message: error.message,
stack: error.stack,
workerMethod: method,
workerArgs: args,
});
parentPort?.postMessage(error.toJSON());
}
});
}

0 comments on commit e79beac

Please sign in to comment.