Skip to content

Commit ac3a73c

Browse files
committed
perf: parallelize update task
1 parent 2ce6ab2 commit ac3a73c

File tree

2 files changed

+101
-32
lines changed

2 files changed

+101
-32
lines changed

packages/service/common/vectorStore/pg/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,13 @@ class PgClass {
169169
const pg = await connectPg();
170170
return pg.query<{ id: string }>(sql);
171171
}
172-
async query<T extends QueryResultRow = any>(sql: string) {
172+
async query<T extends QueryResultRow = any>(sql: string, warning = true) {
173173
const pg = await connectPg();
174174
const start = Date.now();
175175
return pg.query<T>(sql).then((res) => {
176176
const time = Date.now() - start;
177177

178-
if (time > 300) {
178+
if (warning && time > 300) {
179179
addLog.warn(`pg query time: ${time}ms, sql: ${sql}`);
180180
}
181181

projects/app/src/pages/api/admin/inithalfvec.ts

+99-30
Original file line numberDiff line numberDiff line change
@@ -26,37 +26,99 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
2626
console.log('halfvector column added');
2727
}
2828

29-
let rowsUpdated;
30-
let retryCount = 0;
31-
do {
32-
try {
33-
rowsUpdated = await PgClient.query(`
34-
WITH updated AS (
35-
UPDATE ${DatasetVectorTableName}
36-
SET halfvector = vector::halfvec(1536)
37-
WHERE id IN (
38-
SELECT id
39-
FROM ${DatasetVectorTableName}
40-
WHERE halfvector IS NULL
41-
LIMIT 1000
42-
)
43-
RETURNING 1
44-
)
45-
SELECT count(*) FROM updated;
46-
`);
47-
console.log('rowsUpdated:', rowsUpdated.rows[0].count);
48-
} catch (error) {
49-
console.error('Error updating halfvector:', error);
50-
retryCount++;
29+
const maxIdResult = await PgClient.query(
30+
`SELECT MAX(id) as max_id FROM ${DatasetVectorTableName}`
31+
);
32+
const maxId: number = maxIdResult.rows[0].max_id;
33+
34+
if (!maxId) {
35+
console.warn('No data in the table: empty max_id');
36+
jsonRes(res, { code: 500, error: 'No data in the table: empty max_id' });
37+
return;
38+
}
39+
40+
const batchSize = 25;
41+
const numBatches = Math.ceil(maxId / batchSize);
42+
43+
const tasks: (() => Promise<void>)[] = [];
44+
let totalRowsUpdated = 0;
45+
let lastLoggedTime = Date.now();
46+
let lastLoggedRows = 0;
47+
48+
const logUpdateSpeed = () => {
49+
const currentTime = Date.now();
50+
const timeElapsed = (currentTime - lastLoggedTime) / 1000; // seconds
51+
const rowsUpdated = totalRowsUpdated - lastLoggedRows;
52+
const speed = rowsUpdated / timeElapsed; // rows per second
53+
console.log(`Update speed: ${speed.toFixed(2)} rows/s`);
54+
lastLoggedTime = currentTime;
55+
lastLoggedRows = totalRowsUpdated;
56+
};
57+
58+
for (let i = 0; i < numBatches; i++) {
59+
const startId = i * batchSize;
60+
const endId = startId + batchSize;
61+
62+
const asyncUpdate = async () => {
63+
let retryCount = 0;
64+
do {
65+
try {
66+
const rowsUpdated = await PgClient.query(
67+
`
68+
UPDATE ${DatasetVectorTableName}
69+
SET halfvector = vector::halfvec(1536)
70+
WHERE id >= ${startId} AND id < ${endId} AND halfvector IS NULL;
71+
`,
72+
false
73+
);
74+
if (rowsUpdated?.rowCount) {
75+
totalRowsUpdated += rowsUpdated.rowCount;
76+
console.log(`Batch ${i + 1} - rowsUpdated: ${rowsUpdated.rowCount}`);
77+
}
78+
break;
79+
} catch (error) {
80+
console.error(`Error updating halfvector in batch ${i + 1}:`, error);
81+
retryCount++;
82+
}
83+
} while (retryCount < 3);
84+
85+
if (retryCount >= 3) {
86+
console.error(`Failed to update halfvector in batch ${i + 1} after 3 retries`);
87+
Promise.reject(new Error('Failed to update halfvector in batch'));
88+
}
89+
};
90+
91+
tasks.push(asyncUpdate);
92+
}
93+
94+
// randomize task list
95+
tasks.sort(() => Math.random() - 0.5);
96+
97+
let currentIdx = 0;
98+
const executor = async () => {
99+
console.log(`Executing tasks from: ${currentIdx}`);
100+
let idx: number;
101+
while ((idx = currentIdx++) < tasks.length) {
102+
try {
103+
await tasks[idx]();
104+
} catch (error) {
105+
console.error(`Error updating halfvector in task ${idx}`, error);
106+
}
51107
}
52-
} while (retryCount < 3 && rowsUpdated!.rows[0].count > 0);
53-
54-
if (retryCount >= 3) {
55-
console.error('Failed to update halfvector after 3 retries');
56-
return jsonRes(res, {
57-
code: 500,
58-
error: 'Failed to update halfvector after 3 retries'
59-
});
108+
};
109+
110+
const maxConcurrency = 20;
111+
const promises = [];
112+
for (let i = 0; i < maxConcurrency; ++i) {
113+
promises.push(executor());
114+
}
115+
116+
const telemetryInterval = setInterval(logUpdateSpeed, 5000);
117+
118+
try {
119+
await Promise.all(promises);
120+
} finally {
121+
clearInterval(telemetryInterval);
60122
}
61123

62124
console.log('halfvector column updated');
@@ -70,6 +132,13 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
70132
COMMIT;
71133
`
72134
);
135+
console.log('halfvector column set not null');
136+
137+
// 创建索引以提升查询性能
138+
await PgClient.query(`
139+
CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${DatasetVectorTableName} USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);
140+
`);
141+
console.log('halfvector index created');
73142

74143
// 后台释放空间,避免使用 VACUUM FULL 导致锁表。
75144
await PgClient.query(`VACUUM ${DatasetVectorTableName};`);

0 commit comments

Comments
 (0)