Skip to content

Commit

Permalink
Fix socket/stream mutual destruction, remove setTimeout
Browse files Browse the repository at this point in the history
  • Loading branch information
boronine committed Nov 3, 2024
1 parent fb6cc45 commit 25d39b9
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 62 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# h2tunnel

![NPM Version](https://img.shields.io/npm/v/h2tunnel)
![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/boronine/h2tunnel/node.js.yml)
[![NPM Version](https://img.shields.io/npm/v/h2tunnel)](https://www.npmjs.com/package/h2tunnel)
[![GitHub Actions Workflow Status](https://img.shields.io/github/actions/workflow/status/boronine/h2tunnel/node.js.yml)](https://github.com/boronine/h2tunnel/actions/workflows/node.js.yml)

A CLI tool and Node.js library for a popular "tunneling" workflow, similar to the proprietary [ngrok](https://ngrok.com/)
or the openssh-based `ssh -L` solution. All in [less than 500 LOC](https://github.com/boronine/h2tunnel/blob/main/src/h2tunnel.ts)
Expand Down
117 changes: 90 additions & 27 deletions src/h2tunnel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {
TunnelServer,
} from "./h2tunnel.js";
import net from "node:net";
import * as http2 from "node:http2";
import { strictEqual } from "node:assert";

// localhost HTTP1 server "python3 -m http.server"
const LOCAL_PORT = 14000;
Expand All @@ -20,6 +22,9 @@ const TUNNEL_PORT = 14005;
// remote HTTPS server that is piped through the tunnel to localhost
const MUX_PORT = 14006;

// Reduce this to make tests faster
const TIME_MULTIPLIER = 0.1;

const CLIENT_KEY = `-----BEGIN PRIVATE KEY-----
MIG2AgEAMBAGByqGSM49AgEGBSuBBAAiBIGeMIGbAgEBBDCDzcLnOqzvCrnUyd4P
1QcIG/Xi/VPpA5dVIwPVkutr9y/wZo3aJsYUX5xExQMsEeihZANiAAQfSPquV3P/
Expand Down Expand Up @@ -65,13 +70,13 @@ const clientOptions: ClientOptions = {
cert: CLIENT_CRT,
localHttpPort: LOCAL_PORT,
demuxListenPort: DEMUX_PORT,
tunnelRestartTimeout: 500,
tunnelRestartTimeout: 500 * TIME_MULTIPLIER,
};

type Conn = { clientSocket: net.Socket; originSocket: net.Socket };

async function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
return new Promise((resolve) => setTimeout(resolve, ms * TIME_MULTIPLIER));
}

async function createBadTlsServer(port: number): Promise<() => Promise<void>> {
Expand Down Expand Up @@ -294,9 +299,9 @@ async function testConn(
if (term === "FIN") {
await t.test(
`clean termination by ${by} FIN`,
{ plan: 12 },
{ plan: 12, timeout: 1000 },
(t: TestContext) =>
new Promise<void>((resolve) => {
new Promise<void>((resolve, reject) => {
let i = 0;
const done = () => i === 2 && resolve();
t.assert.strictEqual(socket2.readyState, "open");
Expand Down Expand Up @@ -332,7 +337,7 @@ async function testConn(
} else if (term == "RST") {
await t.test(
`clean reset by ${by} RST`,
{ plan: 8 },
{ plan: 8, timeout: 1000 },
(t: TestContext) =>
new Promise<void>((resolve) => {
let i = 0;
Expand Down Expand Up @@ -360,37 +365,43 @@ async function testConn(
}
}

await test("basic connection and termination", async (t) => {
await test.only("basic connection and termination", async (t) => {
const net = new NetworkEmulator(LOCAL_PORT, PROXY_TEST_PORT);
const server = new TunnelServer(serverOptions);
const client = new TunnelClient(clientOptions);
server.start();
client.start();
await server.waitUntilListening();
await client.waitUntilConnected();
await server.waitUntilConnected();
console.log(0, client.state);
await net.startAndWaitUntilReady();
for (const term of ["FIN", "RST"] satisfies ("FIN" | "RST")[]) {
for (const by of ["client", "server"] satisfies ("client" | "server")[]) {
for (const numBytes of [1, 4]) {
for (const proxyPort of [LOCAL_PORT, PROXY_TEST_PORT, PROXY_PORT]) {
const echoServer = new EchoServer(LOCAL_PORT, proxyPort);
await echoServer.startAndWaitUntilReady();
const strict = proxyPort !== PROXY_PORT;
// Test single
await testConn(t, echoServer, numBytes, term, by, 0, strict);
// Test double simultaneous
await Promise.all([
testConn(t, echoServer, numBytes, term, by, 0, strict),
testConn(t, echoServer, numBytes, term, by, 0, strict),
]);
// Test triple delayed
await Promise.all([
testConn(t, echoServer, numBytes, term, by, 0, strict),
testConn(t, echoServer, numBytes, term, by, 10, strict),
testConn(t, echoServer, numBytes, term, by, 100, strict),
]);
await echoServer.stopAndWaitUntilClosed();
}
for (const proxyPort of [LOCAL_PORT, PROXY_TEST_PORT, PROXY_PORT]) {
await t.test(
`clean termination by ${by} ${term} on ${proxyPort}`,
async (t) => {
const echoServer = new EchoServer(LOCAL_PORT, proxyPort);
await echoServer.startAndWaitUntilReady();
const strict = proxyPort !== PROXY_PORT;
// Test single
await testConn(t, echoServer, 1, term, by, 0, strict);
await testConn(t, echoServer, 4, term, by, 0, strict);
// Test double simultaneous
await Promise.all([
testConn(t, echoServer, 3, term, by, 0, strict),
testConn(t, echoServer, 3, term, by, 0, strict),
]);
// Test triple delayed
await Promise.all([
testConn(t, echoServer, 4, term, by, 0, strict),
testConn(t, echoServer, 4, term, by, 10, strict),
testConn(t, echoServer, 4, term, by, 100, strict),
]);
await echoServer.stopAndWaitUntilClosed();
},
);
}
}
}
Expand All @@ -400,7 +411,7 @@ await test("basic connection and termination", async (t) => {
await server.stop();
});

await test.only("happy-path", async (t) => {
await test("happy-path", async (t) => {
const echo = new EchoServer(LOCAL_PORT, PROXY_PORT);
await echo.startAndWaitUntilReady();

Expand Down Expand Up @@ -516,3 +527,55 @@ await test("garbage-to-server", async (t: TestContext) => {
await server.stop();
await echoServer.stopAndWaitUntilClosed();
});

// await test("node-http-session-reset", async (t: TestContext) => {
// const net = new NetworkEmulator(LOCAL_PORT, PROXY_TEST_PORT);
// await net.startAndWaitUntilReady();
// const server = http2.createServer();
//
// const serverSessionEvents: any[] = [];
// const clientSessionEvents: any[] = [];
//
// function instrumentStream(stream: http2.Http2Stream, eventArr: any[]) {
// stream.on("data", (chunk) => {
// stream.write(chunk);
// });
// stream.on("end", () => eventArr.push("end"));
// stream.on("close", () => {
// eventArr.push(["close", stream.aborted]);
// });
// stream.on("aborted", () => eventArr.push("aborted"));
// stream.on("error", () => eventArr.push("error"));
// }
//
// server.on("session", (session) => {
// session.on("stream", (stream) => {
// instrumentStream(stream, serverSessionEvents);
// });
// });
// await new Promise<void>((resolve) => server.listen(LOCAL_PORT, resolve));
//
// const session = http2.connect(`http://localhost:${PROXY_TEST_PORT}`);
// const stream = session.request();
//
// instrumentStream(stream, clientSessionEvents);
//
// await sleep(10);
// stream.end();
// // stream.close(http2.constants.NGHTTP2_CANCEL);
// // net.incomingSocket!.resetAndDestroy();
// // net.outgoingSocket!.resetAndDestroy();
// // stream.destroy();
// await sleep(10);
//
// console.log(2);
// console.log(serverSessionEvents);
// console.log(clientSessionEvents);
// t.assert.deepEqual(serverSessionEvents, []);
// console.log(3);
// session.destroy();
// console.log(4);
// await new Promise<void>((resolve) => server.close(() => resolve()));
// console.log(5);
// await net.stopAndWaitUntilClosed();
// });
68 changes: 35 additions & 33 deletions src/h2tunnel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,6 @@ export abstract class AbstractTunnel<

linkSocketsIfNecessary() {
if (this.tunnelSocket && !this.tunnelSocket.closed && this.muxSocket) {
this.tunnelSocket.on("data", (data) =>
this.log({ tunnelSocketBytes: data.length }),
);
this.muxSocket.on("data", (data) =>
this.log({ muxSocketBytes: data.length }),
);
this.tunnelSocket.pipe(this.muxSocket);
this.muxSocket.pipe(this.tunnelSocket);
this.log({ linked: true });
Expand Down Expand Up @@ -147,39 +141,47 @@ export abstract class AbstractTunnel<

addDemuxSocket(socket: net.Socket, stream: http2.Http2Stream): void {
this.log({ demuxSocket: "added", streamId: stream.id });
socket.on("data", (chunk) => stream.write(chunk));
stream.on("data", (chunk) => socket.write(chunk));
socket.on("error", (err) => {
this.log({ demuxSocket: "error", err });
if (!stream.closed) {
stream.close(http2.constants.NGHTTP2_CANCEL);
}
socket.on("data", (chunk) => {
this.log({ streamDataWrite: chunk.length, streamId: stream.id });
stream.write(chunk);
});
socket.on("end", () => {
this.log({ demuxSocket: "end" });
stream.end();
stream.on("data", (chunk) => {
this.log({ streamDataRead: chunk.length, streamId: stream.id });
socket.write(chunk);
});
stream.on("aborted", () => {
this.log({ demuxStream: "aborted" });
});
stream.on("error", (err) => {
this.log({ demuxStream: "error", err });
// Prevent error being logged, we are handling it during the "close" event
socket.on("error", () => {});
socket.on("close", () => {
this.log({
demuxSocket: "close",
streamId: stream.id,
streamError: stream.errored,
socketError: socket.errored,
});
if (!stream.destroyed) {
if (socket.errored) {
stream.close(http2.constants.NGHTTP2_INTERNAL_ERROR);
} else {
stream.destroy();
}
}
});
stream.on("end", () => {
this.log({ demuxStream: "end" });
// This is a hack to workaround Node.js behavior where "end"/"close" event is emitted before "aborted"/"error"
setTimeout(() => {
if (stream.aborted) {
this.log({
demuxStream: "actually-aborted",
socketAlreadyDestroyed: socket.destroyed,
});
// Prevent error being logged, we are handling it during the "close" event
stream.on("error", () => {});
stream.on("close", () => {
this.log({
demuxStream: "close",
streamId: stream.id,
streamError: stream.errored,
socketError: socket.errored,
});
if (!socket.destroyed) {
if (stream.errored) {
socket.resetAndDestroy();
} else {
this.log({ demuxStream: "actually-ended" });
socket.end();
socket.destroy();
}
}, 1);
}
});
}

Expand Down

0 comments on commit 25d39b9

Please sign in to comment.