Replies: 2 comments
-
This is the existing demo without the new websockets feature. I looked into nitro websocket and I think it's easy to implement as long as we have access to the websocket object since it's required in the applyWSSHandler function when creating a WS server in tRPC. |
Beta Was this translation helpful? Give feedback.
0 replies
-
I'm trying to utilize nitro's native websocket-compatible (crossws) implementation, and currently crossws can't actively close connections. import {
callTRPCProcedure,
getErrorShape,
getTRPCErrorFromUnknown,
transformTRPCResponse,
TRPCError,
} from '@trpc/server';
import { resolveHTTPResponse } from '@trpc/server/http';
import { isObservable } from '@trpc/server/observable';
import { parseTRPCMessage } from '@trpc/server/rpc';
import { Peer } from 'crossws';
import { createError, defineEventHandler, getRequestURL, isMethod, readBody } from 'h3';
import type { AnyTRPCRouter, inferRouterContext, inferRouterError, TRPCProcedureType } from '@trpc/server';
import type { ResponseMeta } from '@trpc/server/http';
import type { Unsubscribable } from '@trpc/server/observable';
import type { JSONRPC2, TRPCClientOutgoingMessage, TRPCResponse, TRPCResponseMessage } from '@trpc/server/rpc';
import type { EventHandlerObject, H3Event } from 'h3';
type MaybePromise<T> = T | Promise<T>;
export type CreateContextFn<TRouter extends AnyTRPCRouter> = (
event: H3Event,
) => MaybePromise<inferRouterContext<TRouter>>;
export interface ResponseMetaFnPayload<TRouter extends AnyTRPCRouter> {
data: TRPCResponse<unknown, inferRouterError<TRouter>>[];
ctx?: inferRouterContext<TRouter>;
paths?: string[];
type: TRPCProcedureType | 'unknown';
errors: TRPCError[];
}
export type ResponseMetaFn<TRouter extends AnyTRPCRouter> = (opts: ResponseMetaFnPayload<TRouter>) => ResponseMeta;
export interface OnErrorPayload<TRouter extends AnyTRPCRouter> {
error: TRPCError;
type: TRPCProcedureType | 'unknown';
path: string | undefined;
req: H3Event['node']['req'];
input: unknown;
ctx: undefined | inferRouterContext<TRouter>;
}
export type OnErrorFn<TRouter extends AnyTRPCRouter> = (opts: OnErrorPayload<TRouter>) => void;
export declare type WithTrpcPeer = Peer & {
clientSubscriptions: Map<number | string, Unsubscribable>;
trpcCtx: inferRouterContext<AnyTRPCRouter>;
};
export interface ResolveHTTPRequestOptions<TRouter extends AnyTRPCRouter> {
router: TRouter;
createContext?: CreateContextFn<TRouter>;
responseMeta?: ResponseMetaFn<TRouter>;
onError?: OnErrorFn<TRouter>;
enableWebsockets?: boolean;
}
function getPath(event: H3Event): string | null {
const { params } = event.context;
if (typeof params?.trpc === 'string') {
return params.trpc;
}
if (params?.trpc && Array.isArray(params.trpc)) {
return (params.trpc as string[]).join('/');
}
return null;
}
function buildWebsocketHooks<TRouter extends AnyTRPCRouter>(
opts: Omit<ResolveHTTPRequestOptions<TRouter>, 'enableWebsockets'>,
): NonNullable<EventHandlerObject['websocket']> {
const { createContext, router } = opts;
const { transformer } = router._def._config;
function respond(peer: WithTrpcPeer, untransformedJSON: TRPCResponseMessage) {
peer.send(JSON.stringify(transformTRPCResponse(router._def._config, untransformedJSON)));
}
function stopSubscription(
peer: WithTrpcPeer,
subscription: Unsubscribable,
{ id, jsonrpc }: JSONRPC2.BaseEnvelope & { id: JSONRPC2.RequestId },
) {
subscription.unsubscribe();
respond(peer, {
id,
jsonrpc,
result: {
type: 'stopped',
},
});
}
function close(peer: WithTrpcPeer) {
const { clientSubscriptions } = peer as WithTrpcPeer;
for (const sub of clientSubscriptions.values()) {
sub.unsubscribe();
}
clientSubscriptions.clear();
}
async function handleRequest(peer: WithTrpcPeer, msg: TRPCClientOutgoingMessage) {
const { clientSubscriptions, trpcCtx } = peer;
const { id, jsonrpc } = msg;
/* istanbul ignore next -- @preserve */
if (id === null) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: '`id` is required',
});
}
if (msg.method === 'subscription.stop') {
const sub = clientSubscriptions.get(id);
if (sub) {
stopSubscription(peer, sub, { id, jsonrpc });
}
clientSubscriptions.delete(id);
return;
}
const { path, input } = msg.params;
const type = msg.method;
try {
const result = await callTRPCProcedure({
procedures: router._def.procedures,
path,
getRawInput: async () => input,
ctx: trpcCtx,
type,
});
if (type === 'subscription') {
if (!isObservable(result)) {
throw new TRPCError({
message: `Subscription ${path} did not return an observable`,
code: 'INTERNAL_SERVER_ERROR',
});
}
} else {
// send the value as data if the method is not a subscription
respond(peer, {
id,
jsonrpc,
result: {
type: 'data',
data: result,
},
});
return;
}
const sub = result.subscribe({
next(data) {
respond(peer, {
id,
jsonrpc,
result: {
type: 'data',
data,
},
});
},
error(err) {
const error = getTRPCErrorFromUnknown(err);
opts.onError?.({ error, path, type, ctx: trpcCtx, req: peer as never, input });
respond(peer, {
id,
jsonrpc,
error: getErrorShape({
config: router._def._config,
error,
type,
path,
input,
ctx: trpcCtx,
}),
});
},
complete() {
respond(peer, {
id,
jsonrpc,
result: {
type: 'stopped',
},
});
},
});
/* istanbul ignore next -- @preserve */
if (peer.readyState !== 1) {
// if the client got disconnected whilst initializing the subscription
// no need to send stopped message if the client is disconnected
sub.unsubscribe();
return;
}
/* istanbul ignore next -- @preserve */
if (clientSubscriptions.has(id)) {
// duplicate request ids for client
stopSubscription(peer, sub, { id, jsonrpc });
throw new TRPCError({
message: `Duplicate id ${id}`,
code: 'BAD_REQUEST',
});
}
clientSubscriptions.set(id, sub);
respond(peer, {
id,
jsonrpc,
result: {
type: 'started',
},
});
} catch (cause) {
const error = getTRPCErrorFromUnknown(cause);
opts.onError?.({ error, path, type, ctx: trpcCtx, req: peer as never, input });
respond(peer, {
id,
jsonrpc,
error: getErrorShape({
config: router._def._config,
error,
type,
path,
input,
ctx: trpcCtx,
}),
});
}
return;
}
return {
async open(connection) {
const peer = connection as WithTrpcPeer;
peer.clientSubscriptions = new Map();
let ctx: inferRouterContext<TRouter> | undefined = undefined;
try {
ctx = await createContext?.({ peer } as never);
peer.trpcCtx = ctx!;
} catch (cause) {
const error = getTRPCErrorFromUnknown(cause);
opts.onError?.({
error,
path: undefined,
type: 'unknown',
ctx,
req: peer as never,
input: undefined,
});
respond(peer, {
id: null,
error: getErrorShape({
config: router._def._config,
error,
type: 'unknown',
path: undefined,
input: undefined,
ctx,
}),
});
// close in next tick
(global.setImmediate ?? global.setTimeout)(() => {
// !! not exits function peer.close();
close(peer);
});
}
},
async message(connection, message) {
const peer = connection as WithTrpcPeer;
try {
const raw: unknown = JSON.parse(message.text());
const messages: unknown[] = Array.isArray(raw) ? raw : [raw];
const promises = messages
.map((raw) => parseTRPCMessage(raw, transformer))
.map((rpcMessage) => {
return handleRequest(peer, rpcMessage);
});
await Promise.all(promises);
} catch (cause) {
const error = new TRPCError({
code: 'PARSE_ERROR',
cause,
});
respond(peer, {
id: null,
error: getErrorShape({
config: router._def._config,
error,
type: 'unknown',
path: undefined,
input: undefined,
ctx: undefined,
}),
});
}
},
async close(peer, event) {
close(peer as WithTrpcPeer);
},
async error(peer, error) {
const { clientSubscriptions, trpcCtx } = peer as WithTrpcPeer;
opts.onError?.({
ctx: trpcCtx,
error: getTRPCErrorFromUnknown(error),
input: undefined,
path: undefined,
type: 'unknown',
req: peer as never,
});
},
};
}
export function createNuxtApiHandler<TRouter extends AnyTRPCRouter>({
router,
createContext,
responseMeta,
onError,
enableWebsockets = false,
}: ResolveHTTPRequestOptions<TRouter>) {
return defineEventHandler({
handler: async (event) => {
const { req, res } = event.node;
const $url = getRequestURL(event);
const path = getPath(event);
if (path === null) {
const error = getErrorShape({
config: router._def._config,
error: new TRPCError({
message: 'Query "trpc" not found - is the file named `[trpc]`.ts or `[...trpc].ts`?',
code: 'INTERNAL_SERVER_ERROR',
}),
type: 'unknown',
ctx: undefined,
path: undefined,
input: undefined,
});
throw createError({
statusCode: 500,
statusMessage: JSON.stringify(error),
});
}
const httpResponse = await resolveHTTPResponse({
allowBatching: true,
router,
req: {
method: req.method!,
headers: req.headers,
body: isMethod(event, 'GET') ? null : await readBody(event),
query: $url.searchParams,
},
path,
createContext: async () => await createContext?.(event),
responseMeta,
onError: (o) => {
onError?.({
...o,
req,
});
},
});
const { status, headers, body } = httpResponse;
res.statusCode = status;
headers &&
Object.keys(headers).forEach((key) => {
res.setHeader(key, headers[key]!);
});
return body;
},
websocket: enableWebsockets
? buildWebsocketHooks({
router,
createContext,
responseMeta,
onError,
})
: undefined,
});
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
Just saw a tweet about how nuxt currently supports websockets! I'm not sure if there is a good standard way of hooking up to it with trpc, but maybe we could get some guidance/support on this, thanks!
Related docs: https://nitro.unjs.io/guide/websocket
Beta Was this translation helpful? Give feedback.
All reactions