
Commit d525dae

perf: using halfvec
1 parent 867e8ac

3 files changed: +127 -8 lines
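pgvector's halfvec type stores each dimension as a 16-bit float instead of a 32-bit one, so a 1536-dimension embedding and its HNSW index take roughly half the space, which is the point of this commit. A minimal sketch of how to see the difference (not part of this commit; it assumes the repo's PgClient helper and pgvector >= 0.7, the first release with halfvec, and runs inside any async context):

const literal = `[${Array(1536).fill(0.1).join(',')}]`;
// pg_column_size reports the stored size of a value; the halfvec form should come out at
// roughly half the bytes of the vector form for the same 1536-dimension embedding.
const sizes = await PgClient.query(
  `SELECT pg_column_size('${literal}'::vector(1536))  AS vector_bytes,
          pg_column_size('${literal}'::halfvec(1536)) AS halfvec_bytes;`
);
console.log(sizes.rows[0]);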

packages/service/common/vectorStore/pg/class.ts (+6 -6)
@@ -21,23 +21,23 @@ export class PgVectorCtrl {
         CREATE EXTENSION IF NOT EXISTS vector;
         CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
             id BIGSERIAL PRIMARY KEY,
-            vector VECTOR(1536) NOT NULL,
+            halfvector HALFVEC(1536) NOT NULL,
             team_id VARCHAR(50) NOT NULL,
             dataset_id VARCHAR(50) NOT NULL,
             collection_id VARCHAR(50) NOT NULL,
             createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
         );
       `);

-      await PgClient.query(
-        `CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);`
-      );
       await PgClient.query(
         `CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
       );
       await PgClient.query(
         `CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);`
       );
+      await PgClient.query(
+        `CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${DatasetVectorTableName} USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);`
+      );

       addLog.info('init pg successful');
     } catch (error) {
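The HNSW index is recreated with halfvec_ip_ops so the operator class matches the new column type; with half-precision elements the index itself also shrinks. A quick size check after init (a sketch, not part of this commit; 'halfvector_index' is the index name created above):

const size = await PgClient.query(
  `SELECT pg_size_pretty(pg_relation_size('halfvector_index')) AS index_size;`
);
console.log(size.rows[0].index_size);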
@@ -51,7 +51,7 @@ export class PgVectorCtrl {
     const { rowCount, rows } = await PgClient.insert(DatasetVectorTableName, {
       values: [
         [
-          { key: 'vector', value: `[${vector}]` },
+          { key: 'halfvector', value: `[${vector}]` },
           { key: 'team_id', value: String(teamId) },
           { key: 'dataset_id', value: String(datasetId) },
           { key: 'collection_id', value: String(collectionId) }
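Only the column key changes in the insert path: halfvec accepts the same '[x1, x2, ...]' text literal as vector, with each element rounded to half precision on write. For illustration (a sketch, assuming vector here is the number[] embedding already passed into this method):

const value = `[${vector}]`; // the same serialized embedding works for both vector(1536) and halfvec(1536) columns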
@@ -169,7 +169,7 @@ export class PgVectorCtrl {
         SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
         SET LOCAL hnsw.iterative_scan = relaxed_order;
         WITH relaxed_results AS MATERIALIZED (
-          select id, collection_id, vector <#> '[${vector}]' AS score
+          select id, collection_id, halfvector <#> '[${vector}]' AS score
           from ${DatasetVectorTableName}
           where team_id='${teamId}'
             AND dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})
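The search query likewise only swaps the column: '<#>' is pgvector's negative-inner-product operator (it returns the inner product times -1 so that smaller scores sort as closer matches), and it behaves the same on halfvec. A standalone illustration (not part of this commit):

const demo = await PgClient.query(
  `SELECT '[1,2,3]'::halfvec(3) <#> '[1,2,3]'::halfvec(3) AS score;`
);
console.log(demo.rows[0].score); // -14: the inner product is 14, negated for ordering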

packages/service/common/vectorStore/pg/index.ts (+2 -2)
@@ -169,13 +169,13 @@ class PgClass {
     const pg = await connectPg();
     return pg.query<{ id: string }>(sql);
   }
-  async query<T extends QueryResultRow = any>(sql: string) {
+  async query<T extends QueryResultRow = any>(sql: string, warning = true) {
     const pg = await connectPg();
     const start = Date.now();
     return pg.query<T>(sql).then((res) => {
       const time = Date.now() - start;

-      if (time > 300) {
+      if (warning && time > 300) {
         addLog.warn(`pg query time: ${time}ms, sql: ${sql}`);
       }

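The new warning flag lets callers opt out of the slow-query log line; the migration script below passes false for its batched UPDATE loop so that statements which are expected to be slow do not flood the log. Usage (a sketch):

await PgClient.query(`VACUUM ${DatasetVectorTableName};`, false); // no 'pg query time' warning even past 300ms
await PgClient.query(`SELECT 1;`); // warning behaviour unchanged by default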

New file (+119 -0)

@@ -0,0 +1,119 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@fastgpt/service/common/response';
import { connectToDatabase } from '@/service/mongo';
import { authCert } from '@fastgpt/service/support/permission/auth/common';
import { PgClient } from '@fastgpt/service/common/vectorStore/pg';
import { DatasetVectorTableName } from '@fastgpt/service/common/vectorStore/constants';

async function setHalfvec() {
  let totalRowsUpdated = 0;
  let lastLoggedTime = Date.now();
  let lastLoggedRows = 0;

  const logUpdateSpeed = () => {
    const currentTime = Date.now();
    const timeElapsed = (currentTime - lastLoggedTime) / 1000; // seconds
    const rowsUpdated = totalRowsUpdated - lastLoggedRows;
    const speed = rowsUpdated / timeElapsed; // rows per second
    console.log(`Update speed: ${speed.toFixed(2)} rows/s`);
    lastLoggedTime = currentTime;
    lastLoggedRows = totalRowsUpdated;
  };

  // Back-fill halfvector in batches of 200 rows. Rows are addressed by ctid and locked with
  // SKIP LOCKED so concurrent workers never wait on (or re-process) each other's batches.
  const asyncUpdate = async () => {
    while (true) {
      const rowsUpdated = await PgClient.query(
        `UPDATE ${DatasetVectorTableName}
        SET halfvector = vector
        WHERE ctid = ANY(ARRAY(
          SELECT ctid FROM ${DatasetVectorTableName} WHERE halfvector IS NULL LIMIT 200
          FOR NO KEY UPDATE SKIP LOCKED
        ))`,
        false
      );
      if (rowsUpdated?.rowCount) {
        totalRowsUpdated += rowsUpdated.rowCount;
        console.log(`Rows updated: ${rowsUpdated.rowCount}`);
      } else {
        console.log('No more rows to update');
        break;
      }
    }
  };

  const worker = async () => {
    let retry = 0;
    while (retry < 3) {
      try {
        await asyncUpdate();
        break;
      } catch (error: any) {
        console.error('Error updating halfvector:', error?.message);
        retry++;
      }
    }
  };

  const maxConcurrency = Number(process.env.DB_MAX_LINK || 20);
  const telemetryInterval = setInterval(logUpdateSpeed, 10000);

  try {
    await Promise.all(Array.from({ length: maxConcurrency }, () => worker()));
  } finally {
    clearInterval(telemetryInterval);
  }

  console.log('halfvector column updated');
}

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  try {
    await connectToDatabase();
    await authCert({ req, authRoot: true });

    // Add the halfvector column to pg if it does not exist yet
    const columnExists = await PgClient.query(`
      SELECT column_name
      FROM information_schema.columns
      WHERE table_name='${DatasetVectorTableName}';
    `);

    if (columnExists.rows.findIndex((item) => item.column_name === 'halfvector') === -1) {
      await PgClient.query(`
        BEGIN;
          ALTER TABLE ${DatasetVectorTableName} ADD COLUMN halfvector halfvec(1536);
        COMMIT;
      `);
      console.log('halfvector column added');
    }

    // Back-fill halfvector from the old column if it is still present
    if (columnExists.rows.findIndex((item) => item.column_name === 'vector') !== -1) {
      await setHalfvec();
    }

    // Set the halfvector column to NOT NULL, then drop the old index and the old vector column
    await PgClient.query(
      `BEGIN;
        ALTER TABLE ${DatasetVectorTableName} ALTER COLUMN halfvector SET NOT NULL;
        DROP INDEX IF EXISTS vector_index;
        ALTER TABLE ${DatasetVectorTableName} DROP COLUMN IF EXISTS vector;
      COMMIT;
      `
    );
    console.log('halfvector column set not null');

    // Reclaim space in the background; plain VACUUM is used instead of VACUUM FULL to avoid locking the table.
    PgClient.query(`VACUUM ${DatasetVectorTableName};`, true);

    jsonRes(res, {
      message: 'success'
    });
  } catch (error) {
    console.log(error);

    jsonRes(res, {
      code: 500,
      error
    });
  }
}
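Once the handler has run, a quick check that the migration landed (a sketch, not part of this commit): the old vector column should be gone and halfvector should be NOT NULL.

const check = await PgClient.query(`
  SELECT column_name, is_nullable
  FROM information_schema.columns
  WHERE table_name = '${DatasetVectorTableName}';
`);
console.log(check.rows); // expect a 'halfvector' row with is_nullable = 'NO' and no 'vector' row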
