Skip to content
This repository has been archived by the owner on Jun 7, 2023. It is now read-only.

Commit

Permalink
Move stream import into node adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
blakeembrey committed Jun 2, 2022
1 parent 9d89036 commit 21c71ae
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 59 deletions.
2 changes: 1 addition & 1 deletion examples/test/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/test/src/_404.tsx
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import React from "react";
import type { ServerSideContext } from "@borderless/site/server";

export default function NotFound() {
Expand Down
15 changes: 13 additions & 2 deletions src/adapters/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type {
ServerResponse,
} from "node:http";
import { URLSearchParams } from "node:url";
import { PassThrough } from "node:stream";
import getRawBody from "raw-body";
import { Request, Headers, Server, StreamOptions } from "../server.js";

Expand Down Expand Up @@ -31,13 +32,23 @@ export function createHandler<C>(
): Handler {
return function handler(req, res, next) {
server(new NodeRequest(req), getContext(req, res))
.then((response) => {
.then(async (response) => {
res.statusCode = response.status;
for (const [key, value] of response.headers) {
res.setHeader(key, value);
}
if (typeof response.body === "object") {
response.body.nodeStream(options).then((x) => x.pipe(res), next);
const { prefix, suffix, stream } = await response.body.nodeStream(
options
);
const proxy = new PassThrough({
flush(cb) {
return cb(null, suffix());
},
});
proxy.write(prefix());
stream.pipe(proxy);
proxy.pipe(res, { end: false });
} else {
res.end(response.body);
}
Expand Down
50 changes: 42 additions & 8 deletions src/adapters/worker.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {
Request as SiteRequest,
Headers as SiteHeaders,
FormParams as SiteFormParams,
Server,
StreamOptions,
} from "../server.js";
Expand All @@ -24,16 +25,30 @@ export function createHandler<C>(
options?: StreamOptions
): Handler {
return async function handler(req) {
const url = new URL(req.url);

const res = await server(new WorkerRequest(req), getContext(req));
const body =
typeof res.body === "object" ? await res.body.readableStream() : res.body;

return new Response(body, {
const init = {
status: res.status,
headers: Array.from(res.headers.entries()),
});
};

if (typeof res.body === "object") {
const { prefix, suffix, stream } = await res.body.readableStream(options);
const { readable, writable } = new TransformStream();

const write = (text: Uint8Array) => {
const writer = writable.getWriter();
return writer.write(text).then(() => writer.releaseLock());
};

write(prefix())
.then(() => stream.pipeTo(writable, { preventClose: true }))
.then(() => write(suffix()).then(() => writable.close()));

return new Response(readable, init);
}

return new Response(res.body, init);
};
}

Expand All @@ -53,6 +68,25 @@ class WorkerHeaders implements SiteHeaders {
}
}

class WorkerForm implements SiteFormParams {
constructor(private formData: FormData) {}

get(name: string) {
const value = this.formData.get(name);
if (typeof value === "string") return value;
return null;
}

getAll(name: string) {
const values = this.formData.getAll(name);
return values.filter((value): value is string => typeof value === "string");
}

has(name: string) {
return this.formData.has(name);
}
}

class WorkerRequest implements SiteRequest {
pathname: string;
search: URLSearchParams;
Expand All @@ -70,8 +104,8 @@ class WorkerRequest implements SiteRequest {
return this.req.arrayBuffer();
}

form() {
return this.req.text().then((value) => new URLSearchParams(value));
async form() {
return new WorkerForm(await this.req.formData());
}

json() {
Expand Down
55 changes: 7 additions & 48 deletions src/server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import * as ReactDOM from "react-dom/server";
import { zip, map } from "iterative";
import { createRouter } from "@borderless/router";
import { FilledContext, HelmetProvider } from "react-helmet-async";
import { PassThrough } from "stream";
import { PageData, PageDataContext } from "./shared.js";
import type { AppProps } from "./app.js";
import type { HeadOptions, TailOptions } from "./document.js";
Expand Down Expand Up @@ -74,13 +73,6 @@ export type GetServerSideProps<P, C> = (
context: ServerSideContext<C>
) => ServerSideProps<P> | undefined | null;

/**
* Generic node.js stream support in the API.
*/
export interface NodeStream {
pipe<Writable extends NodeJS.WritableStream>(destination: Writable): Writable;
}

/**
* Stream rendering options.
*/
Expand All @@ -95,7 +87,7 @@ export interface StreamOptions {
/**
* Raw node.js stream result.
*/
export interface RawNodeStream {
export interface NodeStream {
prefix: () => string;
suffix: () => string;
stream: ReactDOM.PipeableStream;
Expand All @@ -104,7 +96,7 @@ export interface RawNodeStream {
/**
* Raw web stream result.
*/
export interface RawReadableStream {
export interface ReadableStream {
prefix: () => Uint8Array;
suffix: () => Uint8Array;
stream: ReactDOM.ReactDOMServerReadableStream;
Expand All @@ -114,9 +106,7 @@ export interface RawReadableStream {
* Supported body interfaces.
*/
export interface Body {
rawNodeStream(options?: StreamOptions): Promise<RawNodeStream>;
nodeStream(options?: StreamOptions): Promise<NodeStream>;
rawReadableStream(options?: StreamOptions): Promise<RawReadableStream>;
readableStream(options?: StreamOptions): Promise<ReadableStream>;
}

Expand Down Expand Up @@ -462,7 +452,7 @@ function has<T>(value: T | null | undefined, message: string): T {
class ReactBody<C> implements Body {
constructor(private page: JSX.Element, private context: RenderContext<C>) {}

private getApp() {
private render() {
const { hydrate, pageData, helmetContext, scripts } = this.context;

const app = (
Expand Down Expand Up @@ -512,11 +502,9 @@ class ReactBody<C> implements Body {
return this.context.renderTail({ tail });
}

async rawReadableStream(
options: StreamOptions = {}
): Promise<RawReadableStream> {
async readableStream(options: StreamOptions = {}): Promise<ReadableStream> {
const { signal, onError, waitForAllReady, head = "", tail = "" } = options;
const { app, renderOptions } = this.getApp();
const { app, renderOptions } = this.render();
const encoder = new TextEncoder();

const stream = await ReactDOM.renderToReadableStream(app, {
Expand All @@ -534,24 +522,7 @@ class ReactBody<C> implements Body {
};
}

async readableStream(options: StreamOptions = {}): Promise<ReadableStream> {
const { prefix, suffix, stream } = await this.rawReadableStream(options);
const { readable, writable } = new TransformStream();

const write = (text: Uint8Array) => {
const writer = writable.getWriter();
return writer.write(text).then(() => writer.releaseLock());
};

write(prefix())
.then(() => stream.pipeTo(writable, { preventClose: true }))
.then(() => write(suffix()).then(() => writable.close()))
.catch(options.onError);

return readable;
}

rawNodeStream(options: StreamOptions = {}): Promise<RawNodeStream> {
nodeStream(options: StreamOptions = {}): Promise<NodeStream> {
const { signal, onError, waitForAllReady, head = "", tail = "" } = options;

return new Promise((resolve, reject) => {
Expand All @@ -574,7 +545,7 @@ class ReactBody<C> implements Body {
return reject(err);
};

const { app, renderOptions } = this.getApp();
const { app, renderOptions } = this.render();
const stream = ReactDOM.renderToPipeableStream(app, {
onAllReady,
onError,
Expand All @@ -587,18 +558,6 @@ class ReactBody<C> implements Body {
signal?.addEventListener("abort", onAbort);
});
}

async nodeStream(options: StreamOptions = {}): Promise<NodeStream> {
const { prefix, suffix, stream } = await this.rawNodeStream(options);
const proxy = new PassThrough({
flush(cb) {
return cb(null, suffix());
},
});
proxy.write(prefix());
stream.pipe(proxy);
return proxy;
}
}

/**
Expand Down

0 comments on commit 21c71ae

Please sign in to comment.