Skip to content

Commit

Permalink
fix: resolve sse parsing when retry invalid and dara divided
Browse files Browse the repository at this point in the history
  • Loading branch information
yndu13 committed Apr 1, 2024
1 parent 7979eb3 commit 7ccb194
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 34 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
operating-system: [ubuntu-latest, macos-latest]
node-version: [10.x, 12.x, 14.x]
node-version: [10.x, 12.x, 14.x, 16.x, 18.x, 20.x]
name: Node.js ${{ matrix.node-version }} Test on ${{ matrix.operating-system }}

steps:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/node.js.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:

strategy:
matrix:
node-version: [12.x, 14.x, 16.x]
node-version: [12.x, 14.x, 16.x, 18.x, 20.x]
# See supported Node.js release schedule at https://nodejs.org/en/about/releases/

steps:
Expand Down
45 changes: 27 additions & 18 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import { Readable, Writable } from 'stream';
import { IncomingMessage } from 'http';
import { readAsSSE } from 'httpx';
import { Readable } from 'stream';

const DATA_PREFIX = 'data:';
const EVENT_PREFIX = 'event:';
const ID_PREFIX = 'id:';
const RETRY_PREFIX = 'retry:';

function isDigitsOnly(str: string) {
for (let i = 0; i < str.length; i++) {
const c = str.charAt(i);
if (c < '0' || c > '9') {
return false;
}
}
return str.length > 0;
}

export class SSEEvent {
data?: string;
id?: string;
event?: string;
retry?: number;

constructor(data: {[key: string]: any} = {}){
constructor(data: { [key: string]: any } = {}) {
this.data = data.data;
this.id = data.id;
this.event = data.event;
Expand All @@ -25,34 +32,34 @@ export class SSEEvent {

function read(readable: Readable): Promise<Buffer> {
return new Promise((resolve, reject) => {
let onData: { (chunk: any): void; (buf: Buffer): void; (chunk: any): void; },
onError: { (err: Error): void; (err: Error): void; (err: Error): void; },
let onData: { (chunk: any): void; (buf: Buffer): void; (chunk: any): void; },
onError: { (err: Error): void; (err: Error): void; (err: Error): void; },
onEnd: { (): void; (): void; (): void; };
const cleanup = function () {
// cleanup
readable.removeListener('error', onError);
readable.removeListener('data', onData);
readable.removeListener('end', onEnd);
};

const bufs: Uint8Array[] | Buffer[] = [];
let size = 0;

onData = function (buf: Buffer) {
bufs.push(buf);
size += buf.length;
};

onError = function (err: Error) {
cleanup();
reject(err);
};

onEnd = function () {
cleanup();
resolve(Buffer.concat(bufs, size));
};

readable.on('error', onError);
readable.on('data', onData);
readable.on('end', onEnd);
Expand All @@ -63,8 +70,8 @@ function read(readable: Readable): Promise<Buffer> {

function readyToRead(readable: Readable) {
return new Promise((resolve, reject) => {
let onReadable: { (): void; (): void; (): void; },
onEnd: { (): void; (): void; (): void; },
let onReadable: { (): void; (): void; (): void; },
onEnd: { (): void; (): void; (): void; },
onError: { (err: Error): void; (err: any): void; (err: Error): void; };
const cleanup = function () {
// cleanup
Expand Down Expand Up @@ -119,7 +126,9 @@ function tryGetEvents(head: string, chunk: string): EventResult {
event.id = line.substring(ID_PREFIX.length).trim();
} else if (line.startsWith(RETRY_PREFIX)) {
const retry = line.substring(RETRY_PREFIX.length).trim();
event.retry = parseInt(retry, 10);
if (isDigitsOnly(retry)) {
event.retry = parseInt(retry, 10);
}
} else if (line.startsWith(':')) {
// ignore the line
}
Expand All @@ -130,7 +139,7 @@ function tryGetEvents(head: string, chunk: string): EventResult {
}

const remain = all.substring(start);
return { events, remain } ;
return { events, remain };
}


Expand All @@ -151,16 +160,16 @@ export default class TeaStream {
}

static async *readAsSSE(stream: Readable): AsyncGenerator<SSEEvent> {
let rest = '';
while (true) {
const ended = await readyToRead(stream);
if (ended) {
return;
}

let rest = '';

let chunk;
while (null !== (chunk = stream.read())) {
const { events, remain } = tryGetEvents(rest, chunk.toString());
const { events, remain } = tryGetEvents(rest, chunk.toString());
rest = remain;
if (events && events.length > 0) {
for (const event of events) {
Expand Down
180 changes: 166 additions & 14 deletions test/stream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,55 @@ const server = http.createServer((req, res) => {
res.writeHead(200, headers);
res.flushHeaders();
let count = 0;
const timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data: ${JSON.stringify({count: count})}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`);
count++;
}, 100);
if (req.url === '/sse') {
const timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data: ${JSON.stringify({ count: count })}\nevent: flow\nid: sse-test\nretry: 3\n:heartbeat\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_with_no_spaces') {
const timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({ count: count })}\nevent:flow\nid:sse-test\nretry:3\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_invalid_retry') {
const timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
res.write(`data:${JSON.stringify({ count: count })}\nevent:flow\nid:sse-test\nretry: abc\n\n`);
count++;
}, 100);
} else if (req.url === '/sse_with_data_divided') {
const timer = setInterval(() => {
if (count >= 5) {
clearInterval(timer);
res.end();
return;
}
if (count === 1) {
res.write('data:{"count":');
count++;
return;
}
if (count === 2) {
res.write(`${count++},"tag":"divided"}\nevent:flow\nid:sse-test\nretry:3\n\n`);
return;
}
res.write(`data:${JSON.stringify({ count: count++ })}\nevent:flow\nid:sse-test\nretry:3\n\n`);
}, 100);
}
});

class MyReadable extends Readable {
Expand All @@ -46,7 +86,7 @@ describe('$dara stream', function () {
before((done) => {
server.listen(8384, done);
});

after(function (done) {
this.timeout(20000);
server.close(done);
Expand All @@ -71,16 +111,55 @@ describe('$dara stream', function () {
});

it('readAsSSE', async function () {
const res = await httpx.request("http://127.0.0.1:8384", {readTimeout: 5000});
const res = await httpx.request("http://127.0.0.1:8384/sse", { readTimeout: 5000 });
assert.strictEqual(res.statusCode, 200);
const events: SSEEvent[] = [];

for await (const event of $dara.Stream.readAsSSE(res)) {

events.push(event);
}
assert.strictEqual(events.length, 5);

assert.deepStrictEqual([new SSEEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});

it('readAsSSE with no spaces', async function () {
const res = await httpx.request("http://127.0.0.1:8384/sse_with_no_spaces", { readTimeout: 5000 });
assert.strictEqual(res.statusCode, 200);
const events: SSEEvent[] = [];

for await (const event of $dara.Stream.readAsSSE(res)) {

events.push(event);
}
assert.strictEqual(events.length, 5);

assert.deepStrictEqual([new SSEEvent({
data: '{"count":0}',
event: 'flow',
Expand Down Expand Up @@ -108,4 +187,77 @@ describe('$dara stream', function () {
retry: 3,
})], events);
});

it('readAsSSE with invalid retry', async function () {
const res = await httpx.request("http://127.0.0.1:8384/sse_invalid_retry", { readTimeout: 5000 });
assert.strictEqual(res.statusCode, 200);
const events: SSEEvent[] = [];

for await (const event of $dara.Stream.readAsSSE(res)) {

events.push(event);
}
assert.strictEqual(events.length, 5);

assert.deepStrictEqual([new SSEEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), new SSEEvent({
data: '{"count":1}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), new SSEEvent({
data: '{"count":2}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), new SSEEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: undefined,
}), new SSEEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: undefined,
})], events);
});

it('readAsSSE with dara divided', async function () {
const res = await httpx.request("http://127.0.0.1:8384/sse_with_data_divided", { readTimeout: 5000 });
assert.strictEqual(res.statusCode, 200);
const events: SSEEvent[] = [];

for await (const event of $dara.Stream.readAsSSE(res)) {

events.push(event);
}
assert.strictEqual(events.length, 4);

assert.deepStrictEqual([new SSEEvent({
data: '{"count":0}',
event: 'flow',
id: 'sse-test',
retry: 3
}), new SSEEvent({
data: '{"count":2,"tag":"divided"}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":3}',
event: 'flow',
id: 'sse-test',
retry: 3,
}), new SSEEvent({
data: '{"count":4}',
event: 'flow',
id: 'sse-test',
retry: 3,
})], events);
});
});

0 comments on commit 7ccb194

Please sign in to comment.