Skip to content

Commit 488edce

Browse files
committed
perf: using halfvec
1 parent 867e8ac commit 488edce

File tree

3 files changed

+127
-8
lines changed

3 files changed

+127
-8
lines changed

packages/service/common/vectorStore/pg/class.ts

+6-6
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,23 @@ export class PgVectorCtrl {
2121
CREATE EXTENSION IF NOT EXISTS vector;
2222
CREATE TABLE IF NOT EXISTS ${DatasetVectorTableName} (
2323
id BIGSERIAL PRIMARY KEY,
24-
vector VECTOR(1536) NOT NULL,
24+
halfvector HALFVEC(1536) NOT NULL,
2525
team_id VARCHAR(50) NOT NULL,
2626
dataset_id VARCHAR(50) NOT NULL,
2727
collection_id VARCHAR(50) NOT NULL,
2828
createtime TIMESTAMP DEFAULT CURRENT_TIMESTAMP
2929
);
3030
`);
3131

32-
await PgClient.query(
33-
`CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${DatasetVectorTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 128);`
34-
);
3532
await PgClient.query(
3633
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${DatasetVectorTableName} USING btree(team_id, dataset_id, collection_id);`
3734
);
3835
await PgClient.query(
3936
`CREATE INDEX CONCURRENTLY IF NOT EXISTS create_time_index ON ${DatasetVectorTableName} USING btree(createtime);`
4037
);
38+
await PgClient.query(
39+
`CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${DatasetVectorTableName} USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);`
40+
);
4141

4242
addLog.info('init pg successful');
4343
} catch (error) {
@@ -51,7 +51,7 @@ export class PgVectorCtrl {
5151
const { rowCount, rows } = await PgClient.insert(DatasetVectorTableName, {
5252
values: [
5353
[
54-
{ key: 'vector', value: `[${vector}]` },
54+
{ key: 'halfvector', value: `[${vector}]` },
5555
{ key: 'team_id', value: String(teamId) },
5656
{ key: 'dataset_id', value: String(datasetId) },
5757
{ key: 'collection_id', value: String(collectionId) }
@@ -169,7 +169,7 @@ export class PgVectorCtrl {
169169
SET LOCAL hnsw.ef_search = ${global.systemEnv?.pgHNSWEfSearch || 100};
170170
SET LOCAL hnsw.iterative_scan = relaxed_order;
171171
WITH relaxed_results AS MATERIALIZED (
172-
select id, collection_id, vector <#> '[${vector}]' AS score
172+
select id, collection_id, halfvector <#> '[${vector}]' AS score
173173
from ${DatasetVectorTableName}
174174
where team_id='${teamId}'
175175
AND dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})

packages/service/common/vectorStore/pg/index.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,13 @@ class PgClass {
169169
const pg = await connectPg();
170170
return pg.query<{ id: string }>(sql);
171171
}
172-
async query<T extends QueryResultRow = any>(sql: string) {
172+
async query<T extends QueryResultRow = any>(sql: string, warning = true) {
173173
const pg = await connectPg();
174174
const start = Date.now();
175175
return pg.query<T>(sql).then((res) => {
176176
const time = Date.now() - start;
177177

178-
if (time > 300) {
178+
if (warning && time > 300) {
179179
addLog.warn(`pg query time: ${time}ms, sql: ${sql}`);
180180
}
181181

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import type { NextApiRequest, NextApiResponse } from 'next';
2+
import { jsonRes } from '@fastgpt/service/common/response';
3+
import { connectToDatabase } from '@/service/mongo';
4+
import { authCert } from '@fastgpt/service/support/permission/auth/common';
5+
import { PgClient } from '@fastgpt/service/common/vectorStore/pg';
6+
import { DatasetVectorTableName } from '@fastgpt/service/common/vectorStore/constants';
7+
8+
async function setHalfvec() {
9+
let totalRowsUpdated = 0;
10+
let lastLoggedTime = Date.now();
11+
let lastLoggedRows = 0;
12+
13+
const logUpdateSpeed = () => {
14+
const currentTime = Date.now();
15+
const timeElapsed = (currentTime - lastLoggedTime) / 1000; // seconds
16+
const rowsUpdated = totalRowsUpdated - lastLoggedRows;
17+
const speed = rowsUpdated / timeElapsed; // rows per second
18+
console.log(`Update speed: ${speed.toFixed(2)} rows/s`);
19+
lastLoggedTime = currentTime;
20+
lastLoggedRows = totalRowsUpdated;
21+
};
22+
23+
const asyncUpdate = async () => {
24+
while (true) {
25+
const rowsUpdated = await PgClient.query(
26+
`BEGIN;
27+
SET LOCAL synchronous_commit = off;
28+
UPDATE ${DatasetVectorTableName}
29+
SET halfvector = vector
30+
WHERE ctid = ANY(ARRAY(
31+
SELECT ctid FROM ${DatasetVectorTableName} WHERE halfvector IS NULL LIMIT 200
32+
FOR NO KEY UPDATE SKIP LOCKED
33+
))`,
34+
false
35+
);
36+
if (rowsUpdated?.rowCount) {
37+
totalRowsUpdated += rowsUpdated.rowCount;
38+
console.log(`Rows updated: ${rowsUpdated.rowCount}`);
39+
} else {
40+
console.log('No more rows to update');
41+
break;
42+
}
43+
}
44+
};
45+
46+
const worker = async () => {
47+
let retry = 0;
48+
while (retry < 3) {
49+
try {
50+
await asyncUpdate();
51+
break;
52+
} catch (error: any) {
53+
console.error('Error updating halfvector:', error?.message);
54+
retry++;
55+
}
56+
}
57+
};
58+
59+
const maxConcurrency = Number(process.env.DB_MAX_LINK || 20);
60+
const telemetryInterval = setInterval(logUpdateSpeed, 10000);
61+
62+
try {
63+
await Promise.all(Array.from({ length: maxConcurrency }, () => worker()));
64+
} finally {
65+
clearInterval(telemetryInterval);
66+
}
67+
68+
console.log('halfvector column updated');
69+
}
70+
71+
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
72+
try {
73+
await connectToDatabase();
74+
await authCert({ req, authRoot: true });
75+
76+
// pg add column halfvector
77+
const columnExists = await PgClient.query(`
78+
SELECT column_name
79+
FROM information_schema.columns
80+
WHERE table_name='${DatasetVectorTableName}';
81+
`);
82+
83+
if (columnExists.rows.findIndex((item) => item.column_name === 'halfvector') === -1) {
84+
await PgClient.query(
85+
`ALTER TABLE ${DatasetVectorTableName} ADD COLUMN halfvector halfvec(1536);`
86+
);
87+
console.log('halfvector column added');
88+
}
89+
90+
if (columnExists.rows.findIndex((item) => item.column_name === 'vector') !== -1) {
91+
await setHalfvec();
92+
}
93+
94+
// set halfvector NOT NULL
95+
await PgClient.query(
96+
`BEGIN;
97+
ALTER TABLE ${DatasetVectorTableName} ALTER COLUMN halfvector SET NOT NULL;
98+
DROP INDEX IF EXISTS vector_index;
99+
ALTER TABLE ${DatasetVectorTableName} DROP COLUMN IF EXISTS vector;
100+
COMMIT;
101+
`
102+
);
103+
console.log('halfvector column set not null');
104+
105+
// VACUUM
106+
PgClient.query(`VACUUM ${DatasetVectorTableName};`, true);
107+
108+
jsonRes(res, {
109+
message: 'success'
110+
});
111+
} catch (error) {
112+
console.log(error);
113+
114+
jsonRes(res, {
115+
code: 500,
116+
error
117+
});
118+
}
119+
}

0 commit comments

Comments
 (0)