@@ -26,37 +26,101 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
26
26
console . log ( 'halfvector column added' ) ;
27
27
}
28
28
29
- let rowsUpdated ;
30
- let retryCount = 0 ;
31
- do {
32
- try {
33
- rowsUpdated = await PgClient . query ( `
34
- WITH updated AS (
35
- UPDATE ${ DatasetVectorTableName }
36
- SET halfvector = vector::halfvec(1536)
37
- WHERE id IN (
38
- SELECT id
39
- FROM ${ DatasetVectorTableName }
40
- WHERE halfvector IS NULL
41
- LIMIT 1000
42
- )
43
- RETURNING 1
44
- )
45
- SELECT count(*) FROM updated;
46
- ` ) ;
47
- console . log ( 'rowsUpdated:' , rowsUpdated . rows [ 0 ] . count ) ;
48
- } catch ( error ) {
49
- console . error ( 'Error updating halfvector:' , error ) ;
50
- retryCount ++ ;
29
+ const maxIdResult = await PgClient . query (
30
+ `SELECT MAX(id) as max_id FROM ${ DatasetVectorTableName } `
31
+ ) ;
32
+ const maxId : number = maxIdResult . rows [ 0 ] . max_id ;
33
+
34
+ if ( ! maxId ) {
35
+ console . warn ( 'No data in the table: empty max_id' ) ;
36
+ jsonRes ( res , { code : 500 , error : 'No data in the table: empty max_id' } ) ;
37
+ return ;
38
+ }
39
+
40
// Number of consecutive id values covered by a single UPDATE statement.
const batchSize = 25;
// Batch i covers the half-open id range [i * batchSize, (i + 1) * batchSize).
// floor(maxId / batchSize) + 1 guarantees the final range contains maxId itself.
// Math.ceil(maxId / batchSize) stops one batch short whenever maxId is an exact
// multiple of batchSize, leaving the row with id = maxId un-migrated.
const numBatches = Math.floor(maxId / batchSize) + 1;
42
+
43
+ const tasks : ( ( ) => Promise < void > ) [ ] = [ ] ;
44
+ let totalRowsUpdated = 0 ;
45
+ let lastLoggedTime = Date . now ( ) ;
46
+ let lastLoggedRows = 0 ;
47
+
48
/**
 * Logs the update throughput (rows per second) observed since the previous
 * call, then moves the baseline (timestamp and row counter) to "now".
 */
const logUpdateSpeed = () => {
  const now = Date.now();
  // Date.now() is in milliseconds; convert the window length to seconds.
  const elapsedSeconds = (now - lastLoggedTime) / 1000;
  const rowsPerSecond = (totalRowsUpdated - lastLoggedRows) / elapsedSeconds;
  console.log(`Update speed: ${rowsPerSecond.toFixed(2)} rows/s`);
  // Start the next measurement window from the current totals.
  lastLoggedRows = totalRowsUpdated;
  lastLoggedTime = now;
};
57
+
58
+ for ( let i = 0 ; i < numBatches ; i ++ ) {
59
+ const startId = i * batchSize ;
60
+ const endId = startId + batchSize ;
61
+
62
/**
 * Fills `halfvector` for every row whose id lies in [startId, endId) and whose
 * `halfvector` is still NULL, making up to three attempts.
 *
 * On success the affected row count is added to `totalRowsUpdated`.
 *
 * @throws Error when all three attempts fail. The failure is thrown so the
 *   returned promise rejects and whoever awaits this task can observe it.
 *   (A bare `Promise.reject(...)` statement would only create a detached,
 *   unhandled rejection while this task still resolved successfully.)
 */
const asyncUpdate = async (): Promise<void> => {
  const maxAttempts = 3;

  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // The second argument (`false`) is passed through unchanged; its meaning
      // is defined by PgClient.query, which is not visible in this file section.
      const rowsUpdated = await PgClient.query(
        `
          UPDATE ${DatasetVectorTableName}
          SET halfvector = vector::halfvec(1536)
          WHERE id >= ${startId} AND id < ${endId} AND halfvector IS NULL;
        `,
        false
      );
      // Only count and log batches that actually touched rows.
      if (rowsUpdated?.rowCount) {
        totalRowsUpdated += rowsUpdated.rowCount;
        console.log(`Batch ${i + 1} - rowsUpdated: ${rowsUpdated.rowCount}`);
      }
      return;
    } catch (error) {
      // Log and fall through to the next attempt.
      console.error(`Error updating halfvector in batch ${i + 1}:`, error);
    }
  }

  console.error(`Failed to update halfvector in batch ${i + 1} after 3 retries`);
  throw new Error('Failed to update halfvector in batch');
};
90
+
91
+ tasks . push ( asyncUpdate ) ;
92
+ }
93
+
94
// Randomize the task list in place with a Fisher–Yates shuffle.
// sort(() => Math.random() - 0.5) hands the sort an inconsistent comparator,
// which yields a biased, engine-dependent ordering.
for (let last = tasks.length - 1; last > 0; last--) {
  // Pick a random position in [0, last] and swap it into the tail slot.
  const pick = Math.floor(Math.random() * (last + 1));
  [tasks[last], tasks[pick]] = [tasks[pick], tasks[last]];
}
96
+
97
+ const executor = async ( start : number , end : number ) => {
98
+ console . log ( `Executing tasks from ${ start } to ${ end } ` ) ;
99
+ for ( let i = start ; i < end ; ++ i ) {
100
+ try {
101
+ await tasks [ i ] ( ) ;
102
+ } catch ( error ) {
103
+ console . error ( `Error updating halfvector in task ${ i } ` , error ) ;
104
+ }
51
105
}
52
- } while ( retryCount < 3 && rowsUpdated ! . rows [ 0 ] . count > 0 ) ;
53
-
54
- if ( retryCount >= 3 ) {
55
- console . error ( 'Failed to update halfvector after 3 retries' ) ;
56
- return jsonRes ( res , {
57
- code : 500 ,
58
- error : 'Failed to update halfvector after 3 retries'
59
- } ) ;
106
+ console . log ( `Tasks from ${ start } to ${ end } completed` ) ;
107
+ } ;
108
+
109
+ const maxConcurrency = 20 ;
110
+ const taskSize = Math . ceil ( tasks . length / maxConcurrency ) ;
111
+ const promises = [ ] ;
112
+ for ( let i = 0 ; i < maxConcurrency ; ++ i ) {
113
+ const startId = i * taskSize ;
114
+ const endId = Math . min ( ( i + 1 ) * taskSize , tasks . length ) ;
115
+ promises . push ( executor ( startId , endId ) ) ;
116
+ }
117
+
118
+ const telemetryInterval = setInterval ( logUpdateSpeed , 5000 ) ;
119
+
120
+ try {
121
+ await Promise . all ( promises ) ;
122
+ } finally {
123
+ clearInterval ( telemetryInterval ) ;
60
124
}
61
125
62
126
console . log ( 'halfvector column updated' ) ;
@@ -70,6 +134,13 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
70
134
COMMIT;
71
135
`
72
136
) ;
137
+ console . log ( 'halfvector column set not null' ) ;
138
+
139
+ // 创建索引以提升查询性能
140
+ await PgClient . query ( `
141
+ CREATE INDEX CONCURRENTLY IF NOT EXISTS halfvector_index ON ${ DatasetVectorTableName } USING hnsw (halfvector halfvec_ip_ops) WITH (m = 32, ef_construction = 128);
142
+ ` ) ;
143
+ console . log ( 'halfvector index created' ) ;
73
144
74
145
// 后台释放空间,避免使用 VACUUM FULL 导致锁表。
75
146
await PgClient . query ( `VACUUM ${ DatasetVectorTableName } ;` ) ;
0 commit comments