Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: send data by chunk in websocket #3988

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
16 changes: 11 additions & 5 deletions packages/components/src/recycle-tree/tree/TreeNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,15 @@ const { Path } = path;
* @param items 插入的数组
*/
export function spliceArray(arr: number[], start: number, deleteCount = 0, items?: number[] | null) {
const a = arr.slice(0);
a.splice(start, deleteCount, ...(items || []));
return a;
// 如果没有修改操作,直接返回原数组
if (deleteCount === 0 && (!items || items.length === 0)) {
return arr;
}

// 直接使用 slice + concat 避免 spread operator
const before = arr.slice(0, start);
const after = arr.slice(start + deleteCount);
return before.concat(items || []).concat(after);
}

export enum BranchOperatorStatus {
Expand Down Expand Up @@ -568,10 +574,10 @@ export class CompositeTreeNode extends TreeNode implements ICompositeTreeNode {
}

/**
* 确保此“目录”的子级已加载(不影响“展开”状态)
* 确保此"目录"的子级已加载(不影响"展开"状态)
* 如果子级已经加载,则返回的Promise将立即解决
* 否则,将发出重新加载请求并返回Promise
* 一旦返回的Promise.resolve,CompositeTreeNode#children 便可以访问到对于节点
* 一旦返回的Promise.resolve,"CompositeTreeNode#children" 便可以访问到对于节点
*/
public async ensureLoaded(token?: CancellationToken) {
if (this._children) {
Expand Down
7 changes: 4 additions & 3 deletions packages/connection/__test__/browser/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { furySerializer } from '@opensumi/ide-connection';
import { WSWebSocketConnection, furySerializer } from '@opensumi/ide-connection';
import { ReconnectingWebSocketConnection } from '@opensumi/ide-connection/lib/common/connection/drivers/reconnecting-websocket';
import { sleep } from '@opensumi/ide-core-common';
import { Server, WebSocket } from '@opensumi/mock-socket';
Expand All @@ -21,10 +21,11 @@ describe('connection browser', () => {
let data2Received = false;

mockServer.on('connection', (socket) => {
socket.on('message', (msg) => {
const connection = new WSWebSocketConnection(socket as any);
connection.onMessage((msg) => {
const msgObj = furySerializer.deserialize(msg as Uint8Array);
if (msgObj.kind === 'open') {
socket.send(
connection.send(
furySerializer.serialize({
id: msgObj.id,
kind: 'server-ready',
Expand Down
88 changes: 42 additions & 46 deletions packages/connection/__test__/common/frame-decoder.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ console.timeEnd('createPayload');
// 1m
const pressure = 1024 * 1024;

const purePackets = [p1k, p64k, p128k, p5m, p10m].map((v) => [LengthFieldBasedFrameDecoder.construct(v), v] as const);
const purePackets = [p1k, p64k, p128k, p5m, p10m].map(
(v) => [LengthFieldBasedFrameDecoder.construct(v).dump(), v] as const,
);

const size = purePackets.reduce((acc, v) => acc + v[0].byteLength, 0);

Expand All @@ -48,7 +50,7 @@ purePackets.forEach((v) => {
});

const mixedPackets = [p1m, p5m].map((v) => {
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump();
const newPacket = createPayload(1024 + sumiPacket.byteLength);
newPacket.set(sumiPacket, 1024);
return [newPacket, v] as const;
Expand All @@ -59,7 +61,7 @@ const packets = [...purePackets, ...mixedPackets];
describe('frame decoder', () => {
it('can create frame', () => {
const content = new Uint8Array([1, 2, 3]);
const packet = LengthFieldBasedFrameDecoder.construct(content);
const packet = LengthFieldBasedFrameDecoder.construct(content).dump();
const reader = BinaryReader({});

reader.reset(packet);
Expand All @@ -69,67 +71,61 @@ describe('frame decoder', () => {
});

packets.forEach(([packet, expected]) => {
it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, (done) => {
it(`can decode stream: ${round(packet.byteLength / 1024 / 1024, 2)}m`, async () => {
const decoder = new LengthFieldBasedFrameDecoder();

decoder.onData((data) => {
fastExpectBufferEqual(data, expected);
decoder.dispose();
done();
});
const result = await new Promise<Uint8Array>((resolve) => {
decoder.onData((data) => resolve(data));

console.log('write chunk', packet.byteLength);
// write chunk by ${pressure} bytes
for (let i = 0; i < packet.byteLength; i += pressure) {
decoder.push(packet.subarray(i, i + pressure));
logMemoryUsage();
}
// Push the full packet - the decoder will handle chunking internally
decoder.push(packet);
});

logMemoryUsage();
fastExpectBufferEqual(result, expected);
decoder.dispose();
});
});

it('can decode a stream payload contains multiple frames', (done) => {
it('can decode a stream payload contains multiple frames', async () => {
const decoder = new LengthFieldBasedFrameDecoder();
const expectCount = purePackets.length;
let count = 0;
decoder.onData((data) => {
const expected = purePackets[count][1];
fastExpectBufferEqual(data, expected);

count++;
if (count === expectCount) {
decoder.dispose();
done();
}
const receivedData: Uint8Array[] = [];
let resolved = false;

const dataPromise = new Promise<void>((resolve) => {
decoder.onData((data) => {
receivedData.push(data);
if (receivedData.length === purePackets.length && !resolved) {
resolved = true;
resolve();
}
});
});

console.log('write chunk', bigPayload.byteLength);
// write chunk by ${pressure} bytes
for (let i = 0; i < bigPayload.byteLength; i += pressure) {
decoder.push(bigPayload.subarray(i, i + pressure));
logMemoryUsage();
}
// Push the full payload - the decoder will handle chunking internally
decoder.push(bigPayload);

await dataPromise;

// Verify all packets were received in order
receivedData.forEach((data, index) => {
fastExpectBufferEqual(data, purePackets[index][1]);
});

logMemoryUsage();
decoder.dispose();
});

it('can decode a stream it has no valid length info', (done) => {
it('can decode a stream it has no valid length info', async () => {
const v = createPayload(1024);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v);
const sumiPacket = LengthFieldBasedFrameDecoder.construct(v).dump();

const decoder = new LengthFieldBasedFrameDecoder();
decoder.onData((data) => {
fastExpectBufferEqual(data, v);
done();
const result = await new Promise<Uint8Array>((resolve) => {
decoder.onData((data) => resolve(data));
decoder.push(sumiPacket);
});

console.log('write chunk', sumiPacket.byteLength);
// use pressure = 2 to simulate the header and payload are separated
const pressure = 2;
for (let i = 0; i < sumiPacket.byteLength; i += pressure) {
decoder.push(sumiPacket.subarray(i, i + pressure));
}
fastExpectBufferEqual(result, v);
decoder.dispose();
});
});

Expand Down
14 changes: 8 additions & 6 deletions packages/connection/__test__/node/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { SumiConnection } from '@opensumi/ide-connection/src/common/rpc/connecti
import { Deferred } from '@opensumi/ide-core-common';

import { RPCService } from '../../src';
import { RPCServiceCenter, initRPCService } from '../../src/common';
import { ChannelMessage, RPCServiceCenter, initRPCService } from '../../src/common';
import { CommonChannelPathHandler } from '../../src/common/server-handler';
import { WSChannel } from '../../src/common/ws-channel';
import { CommonChannelHandler, WebSocketServerRoute } from '../../src/node';
Expand Down Expand Up @@ -62,14 +62,16 @@ describe('connection', () => {
});
const clientId = 'TEST_CLIENT';
const wsConnection = new WSWebSocketConnection(connection);
const wrappedConnection = wrapSerializer(wsConnection, furySerializer);

const channel = new WSChannel(wrapSerializer(wsConnection, furySerializer), {
id: 'TEST_CHANNEL_ID',
});
connection.on('message', (msg: Uint8Array) => {
const msgObj = furySerializer.deserialize(msg);
if (msgObj.kind === 'server-ready') {
if (msgObj.id === 'TEST_CHANNEL_ID') {
channel.dispatch(msgObj);

wrappedConnection.onMessage((msg: ChannelMessage) => {
if (msg.kind === 'server-ready') {
if (msg.id === 'TEST_CHANNEL_ID') {
channel.dispatch(msg);
}
}
});
Expand Down
40 changes: 40 additions & 0 deletions packages/connection/src/common/buffers/buffers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

export const emptyBuffer = new Uint8Array(0);
export const buffer4Capacity = new Uint8Array(4);

export function copy(
source: Uint8Array,
Expand Down Expand Up @@ -72,6 +73,39 @@ export class Buffers {
return target;
}

slice4(start: number) {
let end = start + 4;
const buffers = this.buffers;

if (end > this.size) {
end = this.size;
}

if (start >= end) {
return emptyBuffer;
}

let startBytes = 0;
let si = 0;
for (; si < buffers.length && startBytes + buffers[si].length <= start; si++) {
startBytes += buffers[si].length;
}

const target = buffer4Capacity;

let ti = 0;
for (let ii = si; ti < end - start && ii < buffers.length; ii++) {
const len = buffers[ii].length;

const _start = ti === 0 ? start - startBytes : 0;
const _end = ti + len >= end - start ? Math.min(_start + (end - start) - ti, len) : len;
copy(buffers[ii], target, ti, _start, _end);
ti += _end - _start;
}

return target;
}

pos(i: number): { buf: number; offset: number } {
if (i < 0 || i >= this.size) {
throw new Error(`out of range, ${i} not in [0, ${this.size})`);
Expand Down Expand Up @@ -268,6 +302,12 @@ export class Cursor {
return buffers;
}

read4() {
const buffers = this.buffers.slice4(this.offset);
this.skip(4);
return buffers;
}

skip(n: number) {
let count = 0;
while (this.chunkIndex < this.buffers.buffers.length) {
Expand Down
Loading
Loading