-
Notifications
You must be signed in to change notification settings - Fork 400
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: send data by chunk in websocket #3988
base: main
Are you sure you want to change the base?
Conversation
Walkthrough此次更改涉及多个文件,主要集中在 Changes
Suggested reviewers
Warning There were issues while running some tools. Please review the errors and either fix the tool’s configuration or disable the tool if it’s a critical failure. 🔧 ESLint
yarn install v1.22.22 ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
🚅 Previously deployed to Railway in the core project. Environment has been deleted. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
Outside diff range, codebase verification and nitpick comments (7)
packages/connection/src/common/constants.ts (1)
3-6
: 代码变更看起来不错!新增的
chunkSize
常量与 PR 的目标一致,用于实现 WebSocket 连接的分块数据传输。8MB 的大小选择合理。建议为注释添加英文翻译,以便于国际协作。例如:
/** * 分片大小, 8MB + * Chunk size, 8MB */ export const chunkSize = 8 * 1024 * 1024;
packages/connection/src/common/connection/drivers/ws-websocket.ts (1)
11-11
: 新增的decoder
成员看起来不错,但可以考虑添加注释。新增的
decoder
成员用于处理传入的消息,这是一个很好的改进。建议为
decoder
成员添加简短的注释,解释其用途和重要性。例如:// 用于解码和处理传入的 WebSocket 消息 protected decoder = new LengthFieldBasedFrameDecoder();packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
20-30
: 更新构造函数和 send 方法构造函数中新增的事件监听器和
send
方法的重写都很好地实现了分块数据传输的目标。这些更改与 PR 的目标一致,有助于提高大文件传输的性能。然而,我建议在
send
方法中添加一个注释,解释为什么要使用分块传输,以及chunkSize
的值是多少。这将有助于其他开发者理解这个实现的目的。建议在
send
方法开始处添加如下注释:/** * 发送数据,使用分块传输以避免大文件传输时的线程阻塞。 * 每个块的大小为 ${chunkSize} 字节。 */
87-103
: 新增 dataHandler 方法和更新 dispose 方法新增的
dataHandler
方法很好地处理了不同类型的传入数据,包括 Blob、ArrayBuffer 和 Buffer。这种实现提高了代码的健壮性。dispose
方法的更新确保了正确的资源清理。然而,我建议在
dataHandler
方法中添加错误处理,以防在数据处理过程中出现异常。建议在
dataHandler
方法中添加错误处理:private dataHandler = (e: MessageEvent) => { // ... 现有代码 ... buffer.then((v) => this.decoder.push(new Uint8Array(v, 0, v.byteLength))) .catch((error) => { console.error('处理传入消息时出错:', error); // 可以在这里添加额外的错误处理逻辑 }); };packages/connection/src/common/connection/drivers/frame-decoder.ts (2)
67-69
: 数据发送逻辑的更新这个更改与新的监听器管理方法保持一致,直接调用
_onDataListener
而不是发出事件。建议添加一个空值检查,以增加代码的健壮性:
if (this._onDataListener && typeof this._onDataListener === 'function') { this._onDataListener(binary); }这样可以避免在
_onDataListener
不是函数时可能出现的运行时错误。
166-166
: dispose 方法的更新这个更改与新的监听器管理方法保持一致,但是使用
undefined
而不是null
与onData
方法中的做法不一致。为了保持一致性,建议将这行改为:
this._onDataListener = null;这样可以确保在整个类中使用相同的方式来表示监听器被移除。
packages/connection/__test__/common/frame-decoder.test.ts (1)
Line range hint
38-121
: 总结:数据包构造方法的系统性更新整个文件中,所有数据包构造的地方都一致地添加了
.dump()
方法的调用。这种系统性的更改可能反映了LengthFieldBasedFrameDecoder
类实现的变化。建议:
- 确保这些更改与
LengthFieldBasedFrameDecoder
类的最新实现保持同步。- 考虑在测试文件的开头添加一个注释,解释为什么需要使用
.dump()
方法,以及它对测试结果的影响。- 如果
.dump()
方法改变了数据包的结构或内容,可能需要更新其他依赖于这些测试的部分。
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); | ||
const packet = handle.get(); | ||
for (let i = 0; i < packet.byteLength; i += chunkSize) { | ||
this.socket.send(packet.subarray(i, i + chunkSize)); | ||
} | ||
|
||
handle.dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send
方法的改进很好,但可以考虑进一步优化。
新的 send
方法使用 LengthFieldBasedFrameDecoder
构造数据包并分块发送,这是一个很好的改进,可以更好地处理大型消息。
为了进一步优化性能,您可以考虑以下建议:
- 使用
ArrayBuffer
或SharedArrayBuffer
来减少内存复制。 - 考虑使用
WebSocket.bufferedAmount
来控制发送速率,避免缓冲区溢出。
示例实现:
send(data: Uint8Array): void {
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
const packet = handle.get();
const sendChunk = (start: number) => {
while (start < packet.byteLength && this.socket.bufferedAmount < 1024 * 1024) { // 1MB buffer threshold
const end = Math.min(start + chunkSize, packet.byteLength);
this.socket.send(packet.subarray(start, end));
start = end;
}
if (start < packet.byteLength) {
setTimeout(() => sendChunk(start), 0);
} else {
handle.dispose();
}
};
sendChunk(0);
}
这个实现使用了递归的方式来控制发送速率,避免一次性将所有数据推入缓冲区。
private _onDataListener: MaybeNull<(data: Uint8Array) => void>; | ||
onData(listener: (data: Uint8Array) => void) { | ||
this._onDataListener = listener; | ||
return { | ||
dispose: () => { | ||
this._onDataListener = null; | ||
}, | ||
}; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
监听器管理方式的重大变更
这个变更将之前基于 Emitter
的多监听器方法改为了单一监听器方法。虽然这简化了事件处理机制,但也限制了类只能同时处理一个监听器。
考虑以下几点:
- 这种改变可能会影响依赖多个监听器的现有代码。
- 单一监听器模式可能会在某些使用场景下造成限制。
建议考虑以下改进:
- 如果确实需要多个监听器,可以考虑使用数组来存储多个监听器函数。
- 添加清晰的文档注释,说明这个类现在只支持单一监听器,以防止误用。
- 考虑添加一个
removeListener
方法,使 API 更加完整和直观。
private _onDataListeners: Array<(data: Uint8Array) => void> = [];
onData(listener: (data: Uint8Array) => void) {
this._onDataListeners.push(listener);
return {
dispose: () => {
const index = this._onDataListeners.indexOf(listener);
if (index > -1) {
this._onDataListeners.splice(index, 1);
}
},
};
}
removeListener(listener: (data: Uint8Array) => void) {
const index = this._onDataListeners.indexOf(listener);
if (index > -1) {
this._onDataListeners.splice(index, 1);
}
}
这样的实现既保持了简单性,又提供了更大的灵活性。
9d104a8
to
a8ae167
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 5
♻️ Duplicate comments (2)
packages/connection/src/common/connection/drivers/ws-websocket.ts (1)
27-29
:⚠️ Potential issue建议在消息处理添加错误处理机制
在构造函数中,当调用
this.decoder.push(data)
时,建议添加错误处理,以防止在数据解码过程中出现未捕获的异常导致程序崩溃。您可以考虑如下修改:
this.socket.on('message', (data: Buffer) => { try { this.decoder.push(data); } catch (error) { console.error('Error decoding message:', error); // 根据需要添加进一步的错误处理逻辑 } });packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
27-38
: 🛠️ Refactor suggestion将多监听器改为单一监听器可能限制功能
当前的修改将事件监听机制从支持多个监听器更改为仅支持单个监听器,这可能限制了代码的可扩展性,影响到需要添加多个监听器的需求。
建议考虑继续支持多个监听器,您可以使用数组来保存监听器列表,并在接收到数据时遍历调用。
例如:
private _onDataListeners: Array<(data: Uint8Array) => void> = []; onData(listener: (data: Uint8Array) => void) { this._onDataListeners.push(listener); return { dispose: () => { const index = this._onDataListeners.indexOf(listener); if (index > -1) { this._onDataListeners.splice(index, 1); } }, }; } protected async processBuffers(): Promise<void> { // ... existing code ... if (this._onDataListeners.length > 0) { for (const listener of this._onDataListeners) { try { await Promise.resolve().then(() => listener(binary)); } catch (error) { console.error('[Frame Decoder] Error in data listener:', error); } } } }
🧹 Nitpick comments (6)
packages/connection/src/common/connection/drivers/ws-websocket.ts (1)
19-19
: 建议将MAX_QUEUE_SIZE
设为可配置项当前
MAX_QUEUE_SIZE
被设置为固定值100,建议将其设为可配置参数,以便根据实际需求调整队列大小,增强系统的灵活性。packages/components/src/recycle-tree/tree/TreeNode.ts (1)
1388-1434
: 优化了hardReloadChildren
方法,提升了大数据量时的性能通过多项优化措施,如预先分配数组大小、采用动态批处理、缓存频繁访问的属性和方法、使用
for
循环替代for...of
,以及合并监听器更新等,显著提高了处理大量节点时的性能。然而,
hardReloadChildren
方法较长且复杂,建议将部分逻辑提取到辅助函数中,以提高代码的可读性和可维护性。packages/connection/__test__/browser/index.test.ts (1)
28-34
: 建议优化错误处理在发送响应消息时,建议添加错误处理逻辑。
建议修改如下:
- connection.send( + connection.send( furySerializer.serialize({ id: msgObj.id, kind: 'server-ready', traceId: '', }), - ); + ).catch((error) => { + console.error('Failed to send server-ready message:', error); + });packages/connection/__test__/common/frame-decoder.test.ts (2)
38-40
: 测试数据构造方法改进使用
dump()
方法处理数据包的实现很好,但建议添加对异常情况的测试。建议添加以下测试场景:
- 处理超大数据包时的内存使用情况
- 数据包损坏的情况
- 网络延迟模拟
52-57
: 建议优化测试用例结构当前的混合数据包测试实现较好,但可以进一步提升可维护性。
建议将测试数据的创建逻辑抽取为独立的辅助函数:
+function createMixedPacket(payload: Uint8Array, headerSize: number = 1024) { + const sumiPacket = LengthFieldBasedFrameDecoder.construct(payload).dump(); + const newPacket = createPayload(headerSize + sumiPacket.byteLength); + newPacket.set(sumiPacket, headerSize); + return [newPacket, payload] as const; +} + const mixedPackets = [p1m, p5m].map((v) => { - const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump(); - const newPacket = createPayload(1024 + sumiPacket.byteLength); - newPacket.set(sumiPacket, 1024); - return [newPacket, v] as const; + return createMixedPacket(v); });packages/connection/src/common/buffers/buffers.ts (1)
76-107
: 建议重构以减少代码重复
slice4
方法与slice
方法有大量重复代码。建议重构以提高可维护性。建议使用以下实现:
slice4(start: number) { - let end = start + 4; - const buffers = this.buffers; - - if (end > this.size) { - end = this.size; - } - - if (start >= end) { - return emptyBuffer; - } - - let startBytes = 0; - let si = 0; - for (; si < buffers.length && startBytes + buffers[si].length <= start; si++) { - startBytes += buffers[si].length; - } - - const target = buffer4Capacity; - - let ti = 0; - for (let ii = si; ti < end - start && ii < buffers.length; ii++) { - const len = buffers[ii].length; - - const _start = ti === 0 ? start - startBytes : 0; - const _end = ti + len >= end - start ? Math.min(_start + (end - start) - ti, len) : len; - copy(buffers[ii], target, ti, _start, _end); - ti += _end - _start; - } - - return target; + const end = start + 4; + if (start >= end || start >= this.size) { + return emptyBuffer; + } + const slice = this.slice(start, end); + if (slice === emptyBuffer) { + return emptyBuffer; + } + copy(slice, buffer4Capacity, 0, 0, slice.length); + return buffer4Capacity; }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
packages/components/src/recycle-tree/tree/TreeNode.ts
(3 hunks)packages/connection/__test__/browser/index.test.ts
(2 hunks)packages/connection/__test__/common/frame-decoder.test.ts
(4 hunks)packages/connection/src/common/buffers/buffers.ts
(3 hunks)packages/connection/src/common/connection/drivers/frame-decoder.ts
(5 hunks)packages/connection/src/common/connection/drivers/reconnecting-websocket.ts
(3 hunks)packages/connection/src/common/connection/drivers/stream.ts
(1 hunks)packages/connection/src/common/connection/drivers/ws-websocket.ts
(2 hunks)packages/connection/src/common/constants.ts
(1 hunks)packages/connection/src/node/common-channel-handler.ts
(1 hunks)packages/core-browser/__tests__/bootstrap/connection.test.ts
(2 hunks)
🔇 Additional comments (9)
packages/connection/src/common/connection/drivers/ws-websocket.ts (1)
114-121
: 在 dispose 方法中拒绝所有待发送消息是一个很好的实践通过在连接关闭时拒绝所有待发送的消息,可以防止出现未处理的 Promise,保持应用程序状态的一致性。
packages/components/src/recycle-tree/tree/TreeNode.ts (2)
42-50
: 优化了spliceArray
函数,提高了性能新的实现通过在没有修改操作时直接返回原数组,并使用
slice
和concat
方法替代展开运算符,减少了不必要的操作,提高了函数的性能。
1381-1381
: 在异常处理中添加了return false
;在
catch
块中添加了return false;
,确保在发生异常时方法返回正确的状态,符合方法的返回类型,提高了代码的健壮性。packages/connection/src/common/constants.ts (1)
3-6
: 新增了chunkSize
常量添加了用于数据传输分片处理的常量
chunkSize
,大小为 1 * 1024 * 1024(1MB),为后续的 WebSocket 数据分片发送提供了基础。packages/core-browser/__tests__/bootstrap/connection.test.ts (1)
3-3
: 使用sleep
函数优化了延迟处理引入并使用了
sleep
函数,替代了原先基于setTimeout
的 Promise 实现,简化了代码,提高了可读性和可维护性。Also applies to: 38-40
packages/connection/src/node/common-channel-handler.ts (1)
45-45
: 代码优化得当!直接在
receiveConnection
方法中创建WSWebSocketConnection
实例,避免了不必要的中间变量,使代码更简洁。packages/connection/__test__/browser/index.test.ts (1)
24-25
: 测试用例实现合理!使用
WSWebSocketConnection
替代直接的 WebSocket 操作,更好地测试了新的分块传输功能。packages/connection/src/common/buffers/buffers.ts (2)
305-309
: 实现正确且符合设计模式
read4
方法的实现简洁明了,并且与现有的read
方法保持一致的设计模式。
8-8
: 注意:重用缓冲区可能导致并发问题
buffer4Capacity
作为静态缓冲区被重用可能在并发场景下导致数据竞争。建议添加相关文档说明此限制。建议添加以下注释说明:
+/** + * 警告:此缓冲区在多个调用之间共享和重用。 + * 在并发场景下使用时需要确保正确的同步。 + */ export const buffer4Capacity = new Uint8Array(4);✅ Verification successful
建议为 buffer4Capacity 添加警告注释
在文件
packages/connection/src/common/buffers/buffers.ts
的第 8 行中,声明的静态缓冲区buffer4Capacity
被设计为重复使用以避免重复分配。但需要注意的是,在并发使用场景下,该缓冲区的重用可能会引发数据竞争问题。建议在该常量前添加具体的警告注释,说明该缓冲区在并发环境下需要额外注意并做好同步保护,例如:+/** + * 警告:此缓冲区在多个调用之间共享和重用。 + * 在并发场景下使用时需要确保正确的同步,以避免数据竞争。 + */ export const buffer4Capacity = new Uint8Array(4);这样能更好地提醒开发者注意潜在的并发问题。
} catch (error) { | ||
console.error('[ReconnectingWebSocket] Error sending data:', error); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在发送数据出错时未正确处理 Promise
在 processSendQueue
方法中,如果在发送数据时发生错误,您记录了错误信息,但未对对应的 Promise 进行拒绝操作。这可能导致调用 send()
方法的 Promise 永远不会得到解决或拒绝,导致内存泄漏或未处理的状态。
建议在 catch
块中调用 reject(error)
,以确保 Promise 被正确拒绝。
例如:
} catch (error) {
console.error('[ReconnectingWebSocket] Error sending data:', error);
reject(error);
}
send(data: Uint8Array): Promise<void> { | ||
return new Promise((resolve) => { | ||
this.sendQueue.push({ data, resolve }); | ||
this.processSendQueue(); | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
send 方法未提供 reject 回调以处理发送失败
在 send
方法中,您在创建 Promise 时仅提供了 resolve
回调,但未提供 reject
回调。因此,在发送过程中发生错误时,无法正确地拒绝 Promise,导致调用方无法得知发送失败。
建议在创建 Promise 时同时提供 reject
回调,并在 processSendQueue
中适当处理。
例如:
return new Promise((resolve, reject) => {
this.sendQueue.push({ data, resolve, reject });
this.processSendQueue();
});
然后在 processSendQueue
的 catch
块中调用 reject(error)
。
this.socket.removeEventListener('message', this.arrayBufferHandler); | ||
this.sendQueue = []; | ||
this.sending = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在 dispose 方法中未拒绝待发送的消息
在 dispose
方法中,您清空了 sendQueue
但没有拒绝其中的 Promise,这可能导致调用方的 Promise 永远不会得到解决,从而出现内存泄漏或未处理的状态。
建议在清空 sendQueue
之前,遍历队列并对每个待发送的消息进行 reject
,以确保所有的 Promise 都能正确处理。
例如:
while (this.sendQueue.length > 0) {
const { reject } = this.sendQueue.shift()!;
reject(new Error('Connection disposed'));
}
push(chunk: Uint8Array): void { | ||
// 如果新数据太大,只接收部分 | ||
if (chunk.byteLength > LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE) { | ||
console.warn('[Frame Decoder] Chunk too large, truncating'); | ||
chunk = chunk.slice(0, LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在收到超过 MAX_FRAME_SIZE 的数据块时,应考虑拒绝而非截断
在 push
方法中,如果接收到的数据块大小超过了 MAX_FRAME_SIZE
,当前的处理方式是截取前面的部分。这可能导致数据不完整,出现不可预知的问题。
建议在这种情况下抛出错误或拒绝处理该数据块,以保证数据的完整性和传输的可靠性。
例如:
if (chunk.byteLength > LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE) {
console.warn('[Frame Decoder] Chunk too large, rejecting');
throw new Error('Chunk size exceeds maximum frame size');
}
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); | ||
this.writable.write(handle.get(), () => { | ||
// TODO: logger error | ||
}); | ||
handle.dispose(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
内存管理需要进一步优化
当前实现在写入操作失败时可能会导致内存泄漏。建议将 handle 的处理放在 try-finally 块中。
建议按如下方式修改代码:
- const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
- this.writable.write(handle.get(), () => {
- // TODO: logger error
- });
- handle.dispose();
+ const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn();
+ try {
+ this.writable.write(handle.get(), (error) => {
+ if (error) {
+ // TODO: logger error
+ console.error('Failed to write data:', error);
+ }
+ });
+ } finally {
+ handle.dispose();
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); | |
this.writable.write(handle.get(), () => { | |
// TODO: logger error | |
}); | |
handle.dispose(); | |
const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); | |
try { | |
this.writable.write(handle.get(), (error) => { | |
if (error) { | |
// TODO: logger error | |
console.error('Failed to write data:', error); | |
} | |
}); | |
} finally { | |
handle.dispose(); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (2)
30-58
: 建议优化分块发送逻辑当前实现中,每个分块都创建新的 Promise,这可能导致不必要的开销。考虑以下优化建议:
- 使用批量处理减少 Promise 创建次数
- 添加可配置的延迟以控制发送速率
建议重构为:
private async processSendQueue() { if (this.sending) { return; } this.sending = true; while (this.sendQueue.length > 0) { const { data, resolve, reject } = this.sendQueue[0]; try { const handle = LengthFieldBasedFrameDecoder.construct(data).dumpAndOwn(); const packet = handle.get(); + const chunks = []; for (let i = 0; i < packet.byteLength; i += chunkSize) { - await new Promise<void>((resolve) => { - const chunk = packet.subarray(i, Math.min(i + chunkSize, packet.byteLength)); - this.socket.send(chunk); - resolve(); - }); + chunks.push(packet.subarray(i, Math.min(i + chunkSize, packet.byteLength))); } + + for (const chunk of chunks) { + this.socket.send(chunk); + await new Promise(resolve => setTimeout(resolve, 0)); // 控制发送速率 + } handle.dispose(); resolve(); } catch (error) {
60-65
: 建议添加方法文档
send
方法的实现看起来不错,但建议添加 JSDoc 文档说明以下内容:
- 方法的异步特性
- 可能的错误类型
- 分块发送的大小限制
建议添加如下文档:
+/** + * 异步发送数据,支持大文件分块传输 + * @param data 要发送的数据 + * @returns Promise 在数据完全发送后 resolve,发送出错时 reject + * @throws Error 当 WebSocket 连接关闭或发送失败时 + */ send(data: Uint8Array): Promise<void> {packages/connection/src/common/connection/drivers/frame-decoder.ts (2)
92-126
: 建议优化帧处理性能当前实现中的异步处理可能导致不必要的延迟。建议:
- 优化内存使用,避免频繁的数据拷贝
- 考虑使用
TypedArray
的视图而不是切片建议添加性能监控:
protected async readFrame(): Promise<boolean> { + const startTime = performance.now(); try { const found = this.readLengthField(); if (!found) { return true; } const start = this.cursor.offset; const end = start + this.contentLength; if (end > this.buffers.byteLength) { return true; } const binary = this.buffers.slice(start, end); // 立即清理已处理的数据 this.buffers.splice(0, end); this.reset(); if (this._onDataListener) { try { await Promise.resolve().then(() => this._onDataListener?.(binary)); } catch (error) { console.error('[Frame Decoder] Error in data listener:', error); } } + const duration = performance.now() - startTime; + if (duration > 100) { // 记录处理时间超过 100ms 的帧 + console.warn('[Frame Decoder] Slow frame processing:', { + size: binary.byteLength, + duration + }); + } return false; } catch (error) {
212-225
: 建议完善静态方法文档
construct
静态方法的实现看起来不错,但建议添加更详细的文档说明其用途和注意事项。建议添加如下文档:
+/** + * 构造一个新的帧 + * @param content 要编码的内容 + * @returns 返回一个新的 writer 实例,调用者负责调用 dispose 释放资源 + * @throws Error 当编码失败时抛出错误 + * @example + * const writer = LengthFieldBasedFrameDecoder.construct(data); + * try { + * const frame = writer.dumpAndOwn(); + * // 使用 frame + * } finally { + * writer.dispose(); + * } + */ static construct(content: Uint8Array) {
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
packages/connection/src/common/connection/drivers/frame-decoder.ts
(5 hunks)packages/connection/src/common/connection/drivers/reconnecting-websocket.ts
(3 hunks)packages/connection/src/common/connection/drivers/stream.ts
(2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- packages/connection/src/common/connection/drivers/stream.ts
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: unittest (ubuntu-latest, 18.x, jsdom)
- GitHub Check: unittest (ubuntu-latest, 18.x, node)
- GitHub Check: build (ubuntu-latest, 20.x)
- GitHub Check: unittest (macos-latest, 18.x, jsdom)
- GitHub Check: build (macos-latest, 20.x)
- GitHub Check: unittest (macos-latest, 18.x, node)
- GitHub Check: build-windows
- GitHub Check: ubuntu-latest, Node.js 20.x
🔇 Additional comments (3)
packages/connection/src/common/connection/drivers/reconnecting-websocket.ts (1)
20-28
: 构造函数可见性变更提醒将构造函数改为
protected
限制了类的实例化方式,现在只能通过static forURL
方法创建实例。这增强了封装性,但可能影响现有代码的扩展性。请确认此更改不会影响现有的继承类。
packages/connection/src/common/connection/drivers/frame-decoder.ts (2)
27-38
: 监听器实现限制提醒当前实现将多监听器模式改为单一监听器模式,这可能在某些场景下造成限制。建议考虑以下几点:
- 是否有需要支持多个监听器的场景
- 是否需要提供移除监听器的方法
请确认这个更改不会影响现有的使用场景。
54-69
: 🛠️ Refactor suggestion建议改进大数据包处理策略
当前实现在收到超大数据包时直接截断,这可能导致数据不完整。建议:
- 考虑拒绝而不是截断超大数据包
- 添加数据包大小的日志记录
建议修改为:
push(chunk: Uint8Array): void { - // 如果新数据太大,只接收部分 if (chunk.byteLength > LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE) { - console.warn('[Frame Decoder] Chunk too large, truncating'); - chunk = chunk.slice(0, LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE); + console.error('[Frame Decoder] Chunk size exceeds limit:', { + size: chunk.byteLength, + limit: LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE + }); + throw new Error('Chunk size exceeds maximum frame size'); }Likely invalid or redundant comment.
private arrayBufferHandler = (e: MessageEvent<ArrayBuffer>) => { | ||
const buffer: ArrayBuffer = e.data; | ||
this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength)); | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
建议添加错误处理
arrayBufferHandler
方法在处理消息时缺少错误处理机制,可能导致解码错误时程序崩溃。
建议添加 try-catch 块:
private arrayBufferHandler = (e: MessageEvent<ArrayBuffer>) => {
- const buffer: ArrayBuffer = e.data;
- this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength));
+ try {
+ const buffer: ArrayBuffer = e.data;
+ this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength));
+ } catch (error) {
+ console.error('[ReconnectingWebSocket] Error handling ArrayBuffer message:', error);
+ }
};
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
private arrayBufferHandler = (e: MessageEvent<ArrayBuffer>) => { | |
const buffer: ArrayBuffer = e.data; | |
this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength)); | |
}; | |
private arrayBufferHandler = (e: MessageEvent<ArrayBuffer>) => { | |
try { | |
const buffer: ArrayBuffer = e.data; | |
this.decoder.push(new Uint8Array(buffer, 0, buffer.byteLength)); | |
} catch (error) { | |
console.error('[ReconnectingWebSocket] Error handling ArrayBuffer message:', error); | |
} | |
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (2)
packages/connection/src/common/connection/drivers/frame-decoder.ts (2)
13-20
: 常量定义优化建议新增的常量定义提高了代码的可维护性,但建议考虑以下几点:
MAX_FRAME_SIZE
的大小(1MB)可能需要根据实际使用场景进行调整MAX_ITERATIONS
的值(50)应该添加注释说明选择该值的原因建议添加以下注释:
- private static readonly MAX_FRAME_SIZE = 1 * 1024 * 1024; // 1MB + private static readonly MAX_FRAME_SIZE = 1 * 1024 * 1024; // 1MB - 根据WebSocket最佳实践设置的单帧大小限制 - private static readonly MAX_ITERATIONS = 50; + private static readonly MAX_ITERATIONS = 50; // 单次处理最大迭代次数,防止过度占用CPUAlso applies to: 27-29
54-69
: 异步处理优化值得肯定代码通过以下方式优化了性能:
- 通过
processingPromise
确保同时只有一个处理过程- 每处理10个包就让出执行权,防止长时间占用主线程
- 设置了最大迭代次数,避免死循环
建议考虑添加可配置的处理间隔时间,例如:
+ private static readonly YIELD_INTERVAL = 10; // 每处理多少个包让出执行权 // ... - if (iterations % 10 === 0) { + if (iterations % LengthFieldBasedFrameDecoder.YIELD_INTERVAL === 0) {Also applies to: 71-90
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
packages/connection/src/common/connection/drivers/frame-decoder.ts
(5 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: unittest (ubuntu-latest, 18.x, jsdom)
- GitHub Check: unittest (ubuntu-latest, 18.x, node)
- GitHub Check: unittest (macos-latest, 18.x, jsdom)
- GitHub Check: unittest (macos-latest, 18.x, node)
- GitHub Check: ubuntu-latest, Node.js 20.x
- GitHub Check: build (ubuntu-latest, 20.x)
- GitHub Check: build (macos-latest, 20.x)
- GitHub Check: build-windows
🔇 Additional comments (1)
packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
30-38
: 监听器管理方式的重大变更这个变更将之前基于
Emitter
的多监听器方法改为了单一监听器方法。虽然这简化了事件处理机制,但也限制了类只能同时处理一个监听器。
protected async readFrame(): Promise<boolean> { | ||
try { | ||
const found = this.readLengthField(); | ||
if (!found) { | ||
return true; | ||
} | ||
|
||
if (found) { | ||
const start = this.cursor.offset; | ||
const end = start + this.contentLength; | ||
|
||
if (end > this.buffers.byteLength) { | ||
return true; | ||
} | ||
|
||
const binary = this.buffers.slice(start, end); | ||
|
||
this.dataEmitter.fire(binary); | ||
// 立即清理已处理的数据 | ||
this.buffers.splice(0, end); | ||
this.reset(); | ||
|
||
if (this.buffers.byteLength > end) { | ||
this.contentLength = -1; | ||
this.state = 0; | ||
this.cursor.moveTo(end); | ||
// has more data, continue to parse | ||
return false; | ||
if (this._onDataListener) { | ||
try { | ||
await Promise.resolve().then(() => this._onDataListener?.(binary)); | ||
} catch (error) { | ||
console.error('[Frame Decoder] Error in data listener:', error); | ||
} | ||
} | ||
|
||
// delete used buffers | ||
this.buffers.splice(0, end); | ||
return false; | ||
} catch (error) { | ||
console.error('[Frame Decoder] Error processing frame:', error); | ||
this.reset(); | ||
return true; | ||
} | ||
|
||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
错误处理机制的改进建议
当前的错误处理有以下几点需要注意:
- 对于超大数据包选择截断而不是拒绝,可能导致数据不完整
- 错误日志中缺少详细信息,不利于问题排查
建议进行以下改进:
- console.warn('[Frame Decoder] Chunk too large, truncating');
- chunk = chunk.slice(0, LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE);
+ const error = new Error(`Chunk size ${chunk.byteLength} exceeds maximum frame size ${LengthFieldBasedFrameDecoder.MAX_FRAME_SIZE}`);
+ console.warn('[Frame Decoder] Rejecting large chunk:', error);
+ throw error;
- console.error('[Frame Decoder] Error in data listener:', error);
+ console.error('[Frame Decoder] Error in data listener:', {
+ error,
+ contentLength: this.contentLength,
+ bufferSize: this.buffers.byteLength
+ });
Also applies to: 212-225
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (6)
packages/connection/src/common/connection/drivers/frame-decoder.ts (3)
53-82
: 处理队列的实现值得肯定,建议补充日志当前实现通过 Promise 队列和迭代限制很好地解决了处理阻塞问题。建议添加以下改进:
if (iterations % 10 === 0) { + console.debug('[Frame Decoder] Yielding after processing 10 frames'); await new Promise((resolve) => setTimeout(resolve, 0)); }
85-119
: 建议优化内存管理当前实现在处理大量数据时可能会占用较多内存。建议添加以下优化:
const binary = this.buffers.slice(start, end); +// 设置一个合理的缓冲区大小限制 +const MAX_BUFFER_SIZE = 50 * 1024 * 1024; // 50MB +if (this.buffers.byteLength > MAX_BUFFER_SIZE) { + console.warn('[Frame Decoder] Buffer size exceeds limit, clearing'); + this.buffers.clear(); + this.reset(); +}
205-218
: 建议添加输入验证当前实现没有对输入内容进行验证。建议添加:
static construct(content: Uint8Array) { + if (!content || content.byteLength === 0) { + throw new Error('Content cannot be empty'); + } const writer = BinaryWriter({});packages/connection/__test__/common/frame-decoder.test.ts (3)
38-40
: 建议添加更多边界测试用例当前测试用例覆盖了基本场景,建议添加以下边界情况:
- 空数据包
- 最大帧大小的数据包
- 无效的帧长度
74-86
: 建议添加超时处理当前异步测试可能在数据未到达时无限等待。建议添加:
- const result = await new Promise<Uint8Array>((resolve) => { + const result = await new Promise<Uint8Array>((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Timeout waiting for data')); + }, 5000); decoder.onData((data) => { + clearTimeout(timeout); resolve(data) });
89-115
: 建议增强测试断言当前测试只验证了数据内容,建议添加以下断言:
- 验证接收顺序
- 验证接收时间间隔
- 验证内存使用情况
receivedData.forEach((data, index) => { fastExpectBufferEqual(data, purePackets[index][1]); + // 验证接收时间间隔不超过100ms + if (index > 0) { + expect(receivedTimes[index] - receivedTimes[index - 1]).toBeLessThan(100); + } });
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/connection/__test__/common/frame-decoder.test.ts
(4 hunks)packages/connection/src/common/connection/drivers/frame-decoder.ts
(6 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (8)
- GitHub Check: unittest (ubuntu-latest, 18.x, jsdom)
- GitHub Check: unittest (ubuntu-latest, 18.x, node)
- GitHub Check: build (ubuntu-latest, 20.x)
- GitHub Check: build-windows
- GitHub Check: unittest (macos-latest, 18.x, jsdom)
- GitHub Check: unittest (macos-latest, 18.x, node)
- GitHub Check: build (macos-latest, 20.x)
- GitHub Check: ubuntu-latest, Node.js 20.x
🔇 Additional comments (1)
packages/connection/src/common/connection/drivers/frame-decoder.ts (1)
29-37
: 监听器管理方式需要重新考虑当前实现将多监听器模式改为单一监听器模式,这可能会限制类的使用场景。
建议:
- 考虑使用数组存储多个监听器
- 添加文档说明当前限制
- 实现
removeListener
方法使 API 更完整
if (this.contentLength === -1) { | ||
if (this.cursor.offset + this.lengthFieldLength > bufferLength) { | ||
if (this.cursor.offset + lengthFieldLength > bufferLength) { | ||
// Not enough data yet, wait for more data | ||
return false; | ||
} | ||
|
||
// read the content length | ||
const buf = this.cursor.read(this.lengthFieldLength); | ||
const buf = this.cursor.read4(); | ||
// fury writer use little endian | ||
this.contentLength = readUInt32LE(buf, 0); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
建议添加最大帧大小验证
当前实现没有对帧大小进行限制,可能导致内存问题。建议添加:
this.contentLength = readUInt32LE(buf, 0);
+const MAX_FRAME_SIZE = 10 * 1024 * 1024; // 10MB
+if (this.contentLength > MAX_FRAME_SIZE) {
+ throw new Error(`Frame size ${this.contentLength} exceeds maximum size ${MAX_FRAME_SIZE}`);
+}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if (this.contentLength === -1) { | |
if (this.cursor.offset + this.lengthFieldLength > bufferLength) { | |
if (this.cursor.offset + lengthFieldLength > bufferLength) { | |
// Not enough data yet, wait for more data | |
return false; | |
} | |
// read the content length | |
const buf = this.cursor.read(this.lengthFieldLength); | |
const buf = this.cursor.read4(); | |
// fury writer use little endian | |
this.contentLength = readUInt32LE(buf, 0); | |
} | |
if (this.contentLength === -1) { | |
if (this.cursor.offset + lengthFieldLength > bufferLength) { | |
// Not enough data yet, wait for more data | |
return false; | |
} | |
// read the content length | |
const buf = this.cursor.read4(); | |
// fury writer use little endian | |
this.contentLength = readUInt32LE(buf, 0); | |
const MAX_FRAME_SIZE = 10 * 1024 * 1024; // 10MB | |
if (this.contentLength > MAX_FRAME_SIZE) { | |
throw new Error(`Frame size ${this.contentLength} exceeds maximum size ${MAX_FRAME_SIZE}`); | |
} | |
} |
…dispatching in WSChannel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
packages/connection/src/common/fury-extends/one-of.ts (1)
71-73
: 增强了错误处理机制添加了默认分支来处理无效的索引值,这样可以及时发现序列化过程中的异常情况。
建议进一步改进错误消息,提供更多上下文信息:
- throw new Error('unknown index: ' + idx); + throw new Error(`无效的序列化索引: ${idx},有效范围为 0-${cap - 1}`);packages/connection/__test__/node/index.test.ts (1)
65-66
: 消息处理逻辑重构重构后的消息处理逻辑更加清晰和类型安全:
- 使用了
wrappedConnection
进行序列化处理- 通过
onMessage
方法统一处理消息- 使用了类型化的消息处理机制
建议添加错误处理机制:
wrappedConnection.onMessage((msg: ChannelMessage) => { + try { if (msg.kind === 'server-ready') { if (msg.id === 'TEST_CHANNEL_ID') { channel.dispatch(msg); } } + } catch (error) { + console.error('消息处理失败:', error); + // 可以在这里添加适当的错误处理逻辑 + } });Also applies to: 71-77
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
packages/connection/__test__/node/index.test.ts
(2 hunks)packages/connection/src/common/fury-extends/one-of.ts
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (6)
- GitHub Check: unittest (ubuntu-latest, 18.x, jsdom)
- GitHub Check: build-windows
- GitHub Check: unittest (ubuntu-latest, 18.x, node)
- GitHub Check: unittest (macos-latest, 18.x, jsdom)
- GitHub Check: unittest (macos-latest, 18.x, node)
- GitHub Check: ubuntu-latest, Node.js 20.x
🔇 Additional comments (1)
packages/connection/__test__/node/index.test.ts (1)
11-11
: 类型导入优化通过导入
ChannelMessage
类型,增强了消息处理的类型安全性。
Types
Background or solution
一次性发送几十M的文件会导致线程卡顿,大文件分 chunk 发送
Changelog
Summary by CodeRabbit
新功能
LengthFieldBasedFrameDecoder
实例以改进 WebSocket 消息的处理。chunkSize
,设定为 1MB,用于数据处理和传输。sendQueue
机制以增强 WebSocket 消息发送的可靠性。slice4
方法以优化缓冲区的切片处理。功能增强
Buffers
和Cursor
类的功能,提供更高效的缓冲区处理方法。CommonChannelHandler
的 WebSocket 连接处理。StreamConnection
类的错误处理和资源管理。修复