From ae43ce5daf2408c37914d79f08815f4aeae4d91d Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 6 Dec 2022 14:23:50 -0600 Subject: [PATCH 1/9] Add test suite for 0.34.0 SDK, bump working version to -dev --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index b958b67..f15fd62 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@interval/sdk", - "version": "0.34.0", + "version": "0.35.0-dev", "homepage": "https://interval.com", "repository": { "type": "git", From b3fff10624d81956f7714c5f8aaf32a357b800f4 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 1 Nov 2022 13:32:27 -0500 Subject: [PATCH 2/9] Add peer connections to hosts via WebRTC datachannels, try to use first If a peer connection isn't established or if sending via the peer connection fails, fall back to websocket messages. This isn't final, some bookkeeping that we do in wss is probably being missed by exclusively sending messages through peers, but it's a start! --- package.json | 8 +- src/classes/DataChannelConnection.ts | 59 ++++ src/classes/DuplexRPCClient.ts | 2 +- src/classes/IOClient.ts | 2 + src/classes/ISocket.ts | 108 ++++++- src/classes/IntervalClient.ts | 414 +++++++++++++++++++++++++-- src/examples/app/index.ts | 4 +- src/examples/basic/index.ts | 32 ++- src/internalRpcSchema.ts | 51 ++++ src/ioSchema.ts | 13 +- 10 files changed, 656 insertions(+), 37 deletions(-) create mode 100644 src/classes/DataChannelConnection.ts diff --git a/package.json b/package.json index f15fd62..19aac65 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,12 @@ "url": "github:interval/interval-node" }, "bugs": "https://github.com/interval/interval-node/issues", - "keywords": ["internal tool", "app", "ui", "ui builder"], + "keywords": [ + "internal tool", + "app", + "ui", + "ui builder" + ], "license": "MIT", "engines": { "node": ">=12.17.0" @@ -24,6 +29,7 @@ "dependencies": { "cross-fetch": "^3.1.5", "evt": "^2.4.10", + "node-datachannel": "^0.4.0", "superjson": "^1.9.1", "uuid": "^9.0.0", "ws": "^8.4.1", diff --git a/src/classes/DataChannelConnection.ts b/src/classes/DataChannelConnection.ts new file mode 100644 index 0000000..b5d53ae --- /dev/null +++ b/src/classes/DataChannelConnection.ts @@ -0,0 +1,59 @@ +import { PeerConnection } from 'node-datachannel' +import type { DuplexRPCClient } from './DuplexRPCClient' +import { + HostSchema, + PeerConnectionInitializer, + ClientSchema, + clientSchema, +} from '../internalRpcSchema' +import ISocket, { DataChannelSocket } from './ISocket' + +export default class DataChannelConnection { + peer: PeerConnection + ds?: DataChannelSocket + rpc?: DuplexRPCClient + + constructor({ + id, + send, + rpcConstructor, + }: { + id: string + send: PeerConnectionInitializer + rpcConstructor: (props: { + communicator: ISocket + canCall: ClientSchema + }) => DuplexRPCClient + }) { + this.peer = new PeerConnection(id, { + iceServers: [ + 'stun:stun.l.google.com:19302', + 'stun:global.stun.twilio.com:3478', + ], + }) + // For some reason these cause segfaults? + // this.peer.onStateChange(state => { + // console.log('State:', state) + // }) + // this.peer.onGatheringStateChange(state => { + // console.log('GatheringState:', state) + // }) + this.peer.onLocalDescription((description, type) => { + send({ id, type, description }) + }) + this.peer.onLocalCandidate((candidate, mid) => { + send({ id, type: 'candidate', candidate, mid }) + }) + this.peer.onDataChannel(dc => { + const ds = new DataChannelSocket(dc) + this.ds = ds + + const communicator = new ISocket(ds) + + this.rpc = rpcConstructor({ + communicator, + canCall: clientSchema, + }) + }) + } +} diff --git a/src/classes/DuplexRPCClient.ts b/src/classes/DuplexRPCClient.ts index 0d4d612..114535c 100644 --- a/src/classes/DuplexRPCClient.ts +++ b/src/classes/DuplexRPCClient.ts @@ -9,7 +9,7 @@ function generateId() { return count.toString() } -interface MethodDef { +export interface MethodDef { [key: string]: { inputs: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion returns: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion diff --git a/src/classes/IOClient.ts b/src/classes/IOClient.ts index e7bd397..834d3de 100644 --- a/src/classes/IOClient.ts +++ b/src/classes/IOClient.ts @@ -193,6 +193,8 @@ export class IOClient { }) ) + validationErrorMessage = undefined + if (validities.some(v => !v)) { render() return diff --git a/src/classes/ISocket.ts b/src/classes/ISocket.ts index 922f4c1..a519f37 100644 --- a/src/classes/ISocket.ts +++ b/src/classes/ISocket.ts @@ -1,4 +1,5 @@ import type { WebSocket as NodeWebSocket } from 'ws' +import type { DataChannel } from 'node-datachannel' import { Evt } from 'evt' import { v4 } from 'uuid' import { z } from 'zod' @@ -25,6 +26,102 @@ interface ISocketConfig { id?: string // manually specifying ids is helpful for debugging } +export class DataChannelSocket { + dc: DataChannel | RTCDataChannel + #readyState: string = 'connecting' + + constructor(dc: DataChannel | RTCDataChannel) { + this.dc = dc + } + + public static OPEN = 'open' as const + + get readyState(): string { + if ('readyState' in this.dc) { + return this.dc.readyState + } + + return this.#readyState + } + + get OPEN() { + return DataChannelSocket.OPEN + } + + send(message: string) { + if ('sendMessage' in this.dc) { + // node + this.dc.sendMessage(message) + } else { + // web + this.dc.send(message) + } + } + + close(code?: number, reason?: string) { + // TODO: Do something with codes? + this.#readyState = 'closing' + this.dc.close() + } + + set onopen(cb: () => void) { + if ('onOpen' in this.dc) { + // node + this.dc.onOpen(cb) + } else { + // web + this.dc.onopen = cb + } + } + + set onclose(cb: () => void) { + const handleClose = () => { + this.#readyState = 'closed' + cb() + } + if ('onClosed' in this.dc) { + // node + this.dc.onClosed(handleClose) + } else { + // web + this.dc.onclose = handleClose + } + } + + set onerror(cb: (ev: ErrorEvent | Event) => void) { + if ('onError' in this.dc) { + // node + this.dc.onError((err: string) => { + // ?? + cb( + new ErrorEvent('ErrorEvent', { + message: err, + }) + ) + }) + } else { + // web + this.dc.onerror = cb + } + } + + set onmessage(cb: (evt: MessageEvent) => void) { + if ('onMessage' in this.dc) { + // node + this.dc.onMessage((msg: string | Buffer) => { + cb( + new MessageEvent('MessageEvent', { + data: msg, + }) + ) + }) + } else { + // web + this.dc.onmessage = cb + } + } +} + /** * A relatively thin wrapper around an underlying WebSocket connection. Can be thought of as a TCP layer on top of WebSockets, * ISockets send and expect `ACK` messages following receipt of a `MESSAGE` message containing the transmitted data. @@ -42,7 +139,7 @@ interface ISocketConfig { * rejecting the `ping` Promise. */ export default class ISocket { - private ws: WebSocket | NodeWebSocket + private ws: WebSocket | NodeWebSocket | DataChannelSocket private connectTimeout: number private sendTimeout: number private pingTimeout: number @@ -132,7 +229,10 @@ export default class ISocket { return this.ws.close(code, reason) } - constructor(ws: WebSocket | NodeWebSocket, config?: ISocketConfig) { + constructor( + ws: WebSocket | NodeWebSocket | DataChannelSocket, + config?: ISocketConfig + ) { // this works but on("error") does not. No idea why ¯\_(ツ)_/¯ // will emit "closed" regardless // this.ws.addEventListener('error', e => { @@ -167,8 +267,8 @@ export default class ISocket { this.onOpen.post() } - this.ws.onclose = (ev: CloseEvent) => { - this.onClose.post([ev.code, ev.reason]) + this.ws.onclose = (ev?: CloseEvent) => { + this.onClose.post([ev?.code ?? 0, ev?.reason ?? 'idk']) } this.ws.onerror = (ev: ErrorEvent | Event) => { diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index 9484686..afde785 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -5,7 +5,12 @@ import fetch from 'node-fetch' import * as superjson from 'superjson' import { JSONValue } from 'superjson/dist/types' import ISocket, { TimeoutError } from './ISocket' -import { DuplexRPCClient, DuplexRPCHandlers } from './DuplexRPCClient' +import type { DescriptionType } from 'node-datachannel' +import { + DuplexRPCClient, + DuplexRPCHandlers, + MethodDef, +} from './DuplexRPCClient' import IOError from './IOError' import Logger from './Logger' import { @@ -18,12 +23,16 @@ import { ActionDefinition, PageDefinition, HostSchema, + ClientSchema, + WSServerSchema, } from '../internalRpcSchema' import { ActionResultSchema, IOFunctionReturnType, IO_RESPONSE, LegacyLinkProps, + requiresServer, + T_IO_METHOD_NAMES, T_IO_RENDER_INPUT, T_IO_RESPONSE, } from '../ioSchema' @@ -41,7 +50,8 @@ import type { IntervalRouteDefinitions, IntervalPageHandler, } from '../types' -import TransactionLoadingState from '../classes/TransactionLoadingState' +import type DataChannelConnection from './DataChannelConnection' +import TransactionLoadingState from './TransactionLoadingState' import { Interval, InternalConfig, IntervalError } from '..' import Page from './Page' import { @@ -260,7 +270,12 @@ export default class IntervalClient { string, [(output?: any) => void, (err?: any) => void] >() + #ws: ISocket | undefined = undefined + #dccMap = new Map() + #peerIdMap = new Map() + #peerIdToTransactionIdsMap = new Map>() + #serverRpc: | DuplexRPCClient | undefined = undefined @@ -312,7 +327,9 @@ export default class IntervalClient { private async initializeConnection() { await this.#createSocketConnection() - this.#createRPCClient() + this.#serverRpc = this.#createRPCClient({ + canCall: wsServerSchema, + }) } async respondToRequest(requestId: string) { @@ -325,7 +342,10 @@ export default class IntervalClient { } if (!this.#serverRpc) { - this.#createRPCClient(requestId) + this.#serverRpc = this.#createRPCClient({ + requestId, + canCall: wsServerSchema, + }) } const result = new Promise((resolve, reject) => { @@ -351,6 +371,12 @@ export default class IntervalClient { this.#ws = undefined } + for (const dcc of this.#dccMap.values()) { + dcc.ds?.close() + dcc.peer.close() + } + this.#dccMap.clear() + this.#isConnected = false } @@ -416,10 +442,16 @@ export default class IntervalClient { /** * Resends pending IO calls upon reconnection. */ - async #resendPendingIOCalls() { + async #resendPendingIOCalls(resendToTransactionIds?: string[]) { if (!this.#isConnected) return - const toResend = new Map(this.#pendingIOCalls) + const toResend = resendToTransactionIds + ? new Map( + resendToTransactionIds + .map(id => [id, this.#pendingIOCalls.get(id)]) + .filter(([, state]) => !!state) as [string, string][] + ) + : new Map(this.#pendingIOCalls) while (toResend.size > 0) { await Promise.allSettled( @@ -471,10 +503,16 @@ export default class IntervalClient { /** * Resends pending transaction loading states upon reconnection. */ - async #resendTransactionLoadingStates() { + async #resendTransactionLoadingStates(resendToTransactionIds?: string[]) { if (!this.#isConnected) return - const toResend = new Map(this.#transactionLoadingStates) + const toResend = resendToTransactionIds + ? new Map( + resendToTransactionIds + .map(id => [id, this.#transactionLoadingStates.get(id)]) + .filter(([, state]) => !!state) as [string, LoadingState][] + ) + : new Map(this.#transactionLoadingStates) while (toResend.size > 0) { await Promise.allSettled( @@ -638,13 +676,117 @@ export default class IntervalClient { #createRPCHandlers(requestId?: string): DuplexRPCHandlers { const intervalClient = this return { + INITIALIZE_PEER_CONNECTION: async inputs => { + if (typeof window !== 'undefined') return + + const { default: DataChannelConnection } = await import( + './DataChannelConnection' + ) + + this.#logger.debug('INITIALIZE_PEER_CONNECTION:', inputs) + switch (inputs.type) { + case 'offer': { + const dcc = new DataChannelConnection({ + id: inputs.id, + send: inputs => + this.#send('INITIALIZE_PEER_CONNECTION', inputs).catch(err => { + this.#logger.debug( + 'Failed sending initialize peer connection', + err + ) + }), + rpcConstructor: props => { + const rpc = this.#createRPCClient(props) + rpc.communicator.onOpen.attach(() => { + const set = this.#peerIdToTransactionIdsMap.get(inputs.id) + if (set) { + this.#resendTransactionLoadingStates( + Array.from(set.values()) + ) + this.#resendPendingIOCalls(Array.from(set.values())) + } + }) + return rpc + }, + }) + this.#dccMap.set(inputs.id, dcc) + dcc.peer.setRemoteDescription( + inputs.description, + inputs.type as DescriptionType + ) + dcc.peer.onStateChange(state => { + this.#logger.debug('Peer state change:', state) + if (state === 'failed' || state === 'closed') { + this.#dccMap.delete(inputs.id) + } + }) + + break + } + case 'answer': { + const dcc = this.#dccMap.get(inputs.id) + + if (dcc) { + dcc.peer.setRemoteDescription( + inputs.description, + inputs.type as DescriptionType + ) + } else { + this.#logger.warn( + 'INITIALIZE_PEER_CONNECTION:', + 'DCC not found for id', + inputs.id + ) + } + break + } + case 'candidate': { + const dcc = this.#dccMap.get(inputs.id) + + if (dcc) { + dcc.peer.addRemoteCandidate(inputs.candidate, inputs.mid) + } else { + this.#logger.warn( + 'INITIALIZE_PEER_CONNECTION', + 'DCC not found for id', + inputs.id + ) + } + break + } + } + }, START_TRANSACTION: async inputs => { if (!intervalClient.organization) { intervalClient.#log.error('No organization defined') return } - const { action, transactionId } = inputs + const { action, transactionId, clientId } = inputs + const prevClientId = this.#peerIdMap.get(transactionId) + + if (clientId && clientId !== prevClientId) { + this.#peerIdMap.set(transactionId, clientId) + let set = this.#peerIdToTransactionIdsMap.get(clientId) + if (!set) { + set = new Set() + this.#peerIdToTransactionIdsMap.set(clientId, set) + } + set.add(transactionId) + } + + if (this.#ioResponseHandlers.has(transactionId)) { + this.#logger.debug('Transaction already started, not starting again') + + if (clientId !== prevClientId) { + // only resend if is a new peer connection + this.#resendPendingIOCalls([transactionId]) + this.#resendTransactionLoadingStates([transactionId]) + } + + return + } + const actionHandler = intervalClient.#actionHandlers.get(action.slug) intervalClient.#log.debug(actionHandler) @@ -666,10 +808,25 @@ export default class IntervalClient { toRender: ioCall, }) } else { - await intervalClient.#send('SEND_IO_CALL', { - transactionId, - ioCall, - }) + let attemptPeerSend = true + for (const renderMethod of ioRenderInstruction.toRender) { + const methodName = renderMethod.methodName as T_IO_METHOD_NAMES + if (requiresServer(methodName)) { + attemptPeerSend = false + break + } + } + + await intervalClient.#send( + 'SEND_IO_CALL', + { + transactionId, + ioCall, + }, + { + attemptPeerSend, + } + ) } intervalClient.#transactionLoadingStates.delete(transactionId) @@ -700,7 +857,7 @@ export default class IntervalClient { const ctx: ActionCtx = { user: inputs.user, - // TODO: Remove intervalClient when all active SDKs support superjson + // TODO: Remove this when all active SDKs support superjson params: deserializeDates(params), environment: inputs.environment, organization: intervalClient.organization, @@ -780,6 +937,7 @@ export default class IntervalClient { } else { await intervalClient.#send('MARK_TRANSACTION_COMPLETE', { transactionId, + resultStatus: res.status, result: JSON.stringify(res), }) } @@ -839,6 +997,14 @@ export default class IntervalClient { for (const key of client.inlineActionKeys.values()) { intervalClient.#actionHandlers.delete(key) } + + const peerId = this.#peerIdMap.get(transactionId) + if (peerId) { + this.#peerIdMap.delete(transactionId) + this.#peerIdToTransactionIdsMap + .get(peerId) + ?.delete(transactionId) + } }) } @@ -882,9 +1048,13 @@ export default class IntervalClient { return { type: 'ERROR' as const } } - const { pageKey } = inputs + const { pageKey, clientId } = inputs const pageHandler = this.#pageHandlers.get(inputs.page.slug) + if (clientId) { + this.#peerIdMap.set(pageKey, clientId) + } + if (!pageHandler) { this.#log.debug('No app handler called', inputs.page.slug) return { type: 'ERROR' as const } @@ -1215,6 +1385,7 @@ export default class IntervalClient { this.#pageIOClients.delete(inputs.pageKey) } + this.#peerIdMap.delete(inputs.pageKey) this.#ioResponseHandlers.delete(inputs.pageKey) }, } @@ -1224,19 +1395,25 @@ export default class IntervalClient { * Creates the DuplexRPCClient responsible for sending * messages to Interval. */ - #createRPCClient(requestId?: string) { - if (!this.#ws) { - throw new Error('ISocket not initialized') + #createRPCClient({ + communicator = this.#ws, + requestId, + canCall, + }: { + communicator?: ISocket + requestId?: string + canCall: CallerSchema + }) { + if (!communicator) { + throw new Error('Communicator not initialized') } - const serverRpc = new DuplexRPCClient({ - communicator: this.#ws, - canCall: wsServerSchema, + return new DuplexRPCClient({ + communicator, + canCall, canRespondTo: hostSchema, handlers: this.#createRPCHandlers(requestId), }) - - this.#serverRpc = serverRpc } /** @@ -1309,15 +1486,200 @@ export default class IntervalClient { return response } - async #send( + async #sendToClientPeer( + rpc: NonNullable, + methodName: MethodName, + inputs: z.input + ) { + const NUM_P2P_TRIES = 3 + for (let i = 0; i <= NUM_P2P_TRIES; i++) { + try { + return await rpc.send(methodName, inputs) + } catch (err) { + if (err instanceof TimeoutError) { + this.#log.debug( + `Peer RPC call timed out, retrying in ${Math.round( + this.#retryIntervalMs / 1000 + )}s...` + ) + this.#log.debug(err) + sleep(this.#retryIntervalMs) + } else { + throw err + } + } + } + + throw new TimeoutError() + } + + /** + * @returns undefined if was unsent, null if was sent but should send via server anyway, + * and true/false if was sent but should not send. This is obviously pretty bad code, should be fixed. + */ + async #attemptPeerSend( + methodName: MethodName, + serverInputs: z.input + ): Promise< + z.input | undefined | null + > { + const hostInstanceId = this.#ws?.id + let rpc: DataChannelConnection['rpc'] + const sessionKey = serverInputs + ? 'transactionId' in serverInputs + ? serverInputs.transactionId + : 'pageKey' in serverInputs + ? serverInputs.pageKey + : undefined + : undefined + + if (sessionKey) { + const key = this.#peerIdMap.get(sessionKey) + if (key) { + rpc = this.#dccMap.get(key)?.rpc + } + } + + if (hostInstanceId && rpc) { + this.#logger.debug( + 'Sending with peer connection', + methodName, + serverInputs + ) + + switch (methodName) { + case 'SEND_LOG': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_LOG']['inputs'] + > + + await this.#sendToClientPeer(rpc, 'LOG', { + ...inputs, + index: inputs.index as number, + timestamp: inputs.timestamp as number, + }) + // send to backend too + return null + } + case 'SEND_PAGE': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_PAGE']['inputs'] + > + return await this.#sendToClientPeer(rpc, 'RENDER_PAGE', { + ...inputs, + hostInstanceId, + }) + } + case 'SEND_IO_CALL': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_IO_CALL']['inputs'] + > + await this.#sendToClientPeer(rpc, 'RENDER', { + transactionId: inputs.transactionId, + toRender: inputs.ioCall, + }) + // send to backend too + return null + } + case 'MARK_TRANSACTION_COMPLETE': { + const inputs = serverInputs as z.input< + WSServerSchema['MARK_TRANSACTION_COMPLETE']['inputs'] + > + await this.#sendToClientPeer(rpc, 'TRANSACTION_COMPLETED', { + transactionId: inputs.transactionId, + resultStatus: inputs.resultStatus ?? 'SUCCESS', + result: inputs.result, + }) + + return null + } + case 'SEND_REDIRECT': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_REDIRECT']['inputs'] + > + + if ('url' in inputs) { + await this.#sendToClientPeer(rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + url: inputs.url, + }) + } else if ('route' in inputs) { + await this.#sendToClientPeer(rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + route: inputs.route, + params: inputs.params, + }) + } else { + await this.#sendToClientPeer(rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + route: inputs.action, + params: inputs.params, + }) + } + + // send to backend too + return null + } + case 'SEND_LOADING_CALL': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_LOADING_CALL']['inputs'] + > + await this.#sendToClientPeer(rpc, 'LOADING_STATE', { + ...inputs, + }) + // send to backend too + return null + } + default: + this.#logger.debug( + 'Unsupported peer method', + methodName, + 'sending via server' + ) + } + } else { + this.#logger.debug( + 'No peer connection established, skipping', + methodName, + serverInputs + ) + } + + return undefined + } + + async #send( methodName: MethodName, - inputs: z.input + inputs: z.input, + { attemptPeerSend = true }: { attemptPeerSend?: boolean } = {} ) { if (!this.#serverRpc) throw new IntervalError('serverRpc not initialized') + let skipClientCall = false + + console.log('#send ??') + + try { + if (attemptPeerSend) { + const peerResponse = await this.#attemptPeerSend(methodName, inputs) + + if (peerResponse != null) { + return peerResponse + } else if (peerResponse === null) { + skipClientCall = true + } + } + } catch (err) { + this.#logger.debug('Error from peer RPC', err) + } + while (true) { try { - return await this.#serverRpc.send(methodName, inputs) + this.#logger.debug('Sending via server', methodName, inputs) + return await this.#serverRpc.send(methodName, { + ...inputs, + skipClientCall, + }) } catch (err) { if (err instanceof TimeoutError) { this.#log.debug( diff --git a/src/examples/app/index.ts b/src/examples/app/index.ts index 1eb4b84..58b5393 100644 --- a/src/examples/app/index.ts +++ b/src/examples/app/index.ts @@ -190,9 +190,11 @@ const users = new Page({ }) const interval = new Interval({ - apiKey: 'alex_dev_Bku6kYZlyhyvkCO36W5HnpwtXACI1khse8SnZ9PuwsmqdRfe', logLevel: 'debug', + apiKey: 'alex_dev_Bku6kYZlyhyvkCO36W5HnpwtXACI1khse8SnZ9PuwsmqdRfe', endpoint: 'ws://localhost:3000/websocket', + // apiKey: 'jacob_dev_VZo5ilzcng5iTwuUakQ1z7U88rMKfmIQU1Xc7mnYRRNrFf7U', + // endpoint: 'wss://staging.interval.com/websocket', routes: { hello_app, users, diff --git a/src/examples/basic/index.ts b/src/examples/basic/index.ts index 76171a1..6ef2bd9 100644 --- a/src/examples/basic/index.ts +++ b/src/examples/basic/index.ts @@ -1,4 +1,4 @@ -import { T_IO_METHOD, T_IO_PROPS } from './../../ioSchema' +import { T_IO_PROPS } from './../../ioSchema' import Interval, { IOError, io, ctx } from '../../index' import IntervalClient from '../../classes/IntervalClient' import { @@ -62,8 +62,33 @@ const actionLinks: IntervalActionHandler = async () => { const prod = new Interval({ apiKey: 'live_N47qd1BrOMApNPmVd0BiDZQRLkocfdJKzvt8W6JT5ICemrAN', endpoint: 'ws://localhost:3000/websocket', + logLevel: 'debug', routes: { + backgroundable: { + backgroundable: true, + handler: async () => { + const first = await io.input.text('First input') + const second = await io.input.text('Second input') + + return { first, second } + }, + }, actionLinks, + helloCurrentUser: { + name: 'Hello, current user!', + description: '👋', + handler: async () => { + console.log(ctx.params) + + let heading = `Hello, ${ctx.user.firstName} ${ctx.user.lastName}` + + if (ctx.params.message) { + heading += ` (Message: ${ctx.params.message})` + } + + return heading + }, + }, redirectWithoutWarningTest: async () => { const text = await io.input.text('Edit text before navigating', { defaultValue: 'Backspace me', @@ -595,6 +620,11 @@ const interval = new Interval({ return { num1, num2, sum: num1 + num2 } }, + logs: async (_, ctx) => { + for (let i = 0; i < 10; i++) { + ctx.log('Log number', i) + } + }, logTest: async (io, ctx) => { ctx.log(new Date().toUTCString()) const name = await io.input.text('Your name') diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index 8c053af..50705be 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -172,6 +172,40 @@ export const DECLARE_HOST = { ]), } +const INITIALIZE_PEER_CONNECTION = { + inputs: z.discriminatedUnion('type', [ + z.object({ + type: z.literal('offer'), + id: z.string(), + description: z.string(), + }), + z.object({ + type: z.literal('answer'), + id: z.string(), + description: z.string(), + }), + z.object({ + type: z.literal('candidate'), + id: z.string(), + candidate: z.string(), + mid: z.string(), + }), + z.object({ + type: z.literal('unspec'), + id: z.string(), + }), + z.object({ + type: z.literal('pranswer'), + id: z.string(), + }), + z.object({ + type: z.literal('rollback'), + id: z.string(), + }), + ]), + returns: z.void(), +} + export const wsServerSchema = { CONNECT_TO_TRANSACTION_AS_CLIENT: { inputs: z.object({ @@ -217,6 +251,7 @@ export const wsServerSchema = { inputs: z.object({ transactionId: z.string(), ioCall: z.string(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -234,6 +269,7 @@ export const wsServerSchema = { z.object({ transactionId: z.string(), label: z.string().optional(), + skipClientCall: z.boolean().optional(), }) ), returns: z.boolean(), @@ -244,6 +280,7 @@ export const wsServerSchema = { data: z.string(), index: z.number().optional(), timestamp: z.number().optional(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -269,6 +306,7 @@ export const wsServerSchema = { inputs: z.intersection( z.object({ transactionId: z.string(), + skipClientCall: z.boolean().optional(), }), legacyLinkSchema ), @@ -277,7 +315,11 @@ export const wsServerSchema = { MARK_TRANSACTION_COMPLETE: { inputs: z.object({ transactionId: z.string(), + resultStatus: z + .enum(['SUCCESS', 'FAILURE', 'CANCELED', 'REDIRECTED']) + .optional(), result: z.string().optional(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -285,6 +327,7 @@ export const wsServerSchema = { inputs: z.undefined(), returns: z.boolean(), }, + INITIALIZE_PEER_CONNECTION, INITIALIZE_HOST: { inputs: z.intersection( z.object({ @@ -415,6 +458,7 @@ export const clientSchema = { ), returns: z.boolean(), }, + INITIALIZE_PEER_CONNECTION, } export type ClientSchema = typeof clientSchema @@ -423,6 +467,7 @@ export const hostSchema = { OPEN_PAGE: { inputs: z.object({ pageKey: z.string(), + clientId: z.string().optional(), page: z.object({ slug: z.string(), }), @@ -455,6 +500,7 @@ export const hostSchema = { START_TRANSACTION: { inputs: z.object({ transactionId: z.string(), + clientId: z.string().optional(), // Actually slug, for backward compatibility // TODO: Remove breaking release, superfluous with slug below actionName: z.string(), @@ -480,6 +526,11 @@ export const hostSchema = { }), returns: z.void().nullable(), }, + INITIALIZE_PEER_CONNECTION, } export type HostSchema = typeof hostSchema + +export type PeerConnectionInitializer = ( + inputs: z.infer +) => Promise> diff --git a/src/ioSchema.ts b/src/ioSchema.ts index 601bb3f..d986584 100644 --- a/src/ioSchema.ts +++ b/src/ioSchema.ts @@ -334,7 +334,13 @@ export type DateTimeObject = z.infer * will resolve immediately when awaited. */ export function resolvesImmediately(methodName: T_IO_METHOD_NAMES): boolean { - return 'immediate' in ioSchema[methodName] + const schema = ioSchema[methodName] + return schema && 'immediate' in schema && schema.immediate +} + +export function requiresServer(methodName: T_IO_METHOD_NAMES): boolean { + const schema = ioSchema[methodName] + return schema && 'requiresServer' in schema && schema.requiresServer } export const metaItemSchema = z.object({ @@ -439,13 +445,13 @@ const DISPLAY_SCHEMA = { }), state: z.null(), returns: z.null(), - immediate: z.literal(true), + immediate: true, }, DISPLAY_PROGRESS_INDETERMINATE: { props: z.object({}), state: z.null(), returns: z.null(), - immediate: z.literal(true), + immediate: true, }, DISPLAY_PROGRESS_THROUGH_LIST: { props: z.object({ @@ -638,6 +644,7 @@ const INPUT_SCHEMA = { state: z.null(), returns: z.boolean(), exclusive: z.literal(true), + requiresServer: true, }, SELECT_TABLE: { props: z.object({ From 18a9fd23a4f45c6f78763e60b3ef807bfdfe2d58 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Tue, 29 Nov 2022 15:29:42 -0600 Subject: [PATCH 3/9] Add chunking support for large datachannel messages This should potentially not be necessary in the future, some implementations already support virtually limitless messages, but many browsers don't yet (nor does node-datachannel, I think). https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Using_data_channels#concerns_with_large_messages --- src/classes/DuplexRPCClient.ts | 161 +++++++++++++++++++++++++-------- src/classes/ISocket.ts | 21 +++++ src/internalRpcSchema.ts | 27 ++++-- 3 files changed, 166 insertions(+), 43 deletions(-) diff --git a/src/classes/DuplexRPCClient.ts b/src/classes/DuplexRPCClient.ts index 114535c..11f0a25 100644 --- a/src/classes/DuplexRPCClient.ts +++ b/src/classes/DuplexRPCClient.ts @@ -1,6 +1,7 @@ import { z, ZodError } from 'zod' import type { DuplexMessage } from '../internalRpcSchema' import { DUPLEX_MESSAGE_SCHEMA } from '../internalRpcSchema' +import IntervalError from './IntervalError' import ISocket from './ISocket' let count = 0 @@ -18,31 +19,6 @@ export interface MethodDef { type OnReplyFn = (anyObject: any) => void -function packageResponse({ - id, - methodName, - data, -}: Omit) { - const preparedResponseText: DuplexMessage = { - id: id, - kind: 'RESPONSE', - methodName: methodName, - data, - } - return JSON.stringify(preparedResponseText) -} - -function packageCall({ id, methodName, data }: Omit) { - const callerData: DuplexMessage = { - id, - kind: 'CALL', - data, - methodName: methodName as string, // ?? - } - - return JSON.stringify(callerData) -} - export type DuplexRPCHandlers = { [Property in keyof ResponderSchema]: ( inputs: z.infer @@ -59,6 +35,18 @@ interface CreateDuplexRPCClientProps< handlers: DuplexRPCHandlers } +function getSizeBytes(str: string): number { + if (typeof Blob !== 'undefined') { + return new Blob([str]).size + } else if (typeof Buffer !== 'undefined') { + return Buffer.from(str).byteLength + } else { + throw new IntervalError( + 'Unsupported runtime, must have either Buffer or Blob global' + ) + } +} + /** * Responsible for making RPC calls to another DuplexRPCClient. * Can send messages from CallerSchema and respond to messages @@ -82,6 +70,7 @@ export class DuplexRPCClient< ) => Promise> } pendingCalls = new Map() + messageChunks = new Map() constructor({ communicator, @@ -96,13 +85,78 @@ export class DuplexRPCClient< this.handlers = handlers } + private packageResponse({ + id, + methodName, + data, + }: Omit) { + const preparedResponseText: DuplexMessage = { + id: id, + kind: 'RESPONSE', + methodName: methodName, + data, + } + return JSON.stringify(preparedResponseText) + } + + private packageCall({ + id, + methodName, + data, + }: Omit): string | string[] { + const callerData: DuplexMessage = { + id, + kind: 'CALL', + data, + methodName: methodName as string, // ?? + } + + const totalData = JSON.stringify(callerData) + const totalSize = getSizeBytes(totalData) + const maxMessageSize = this.communicator.maxMessageSize + if (maxMessageSize === undefined || totalSize < maxMessageSize) { + return totalData + } + + // console.debug('Chunking!') + // console.debug('Max size:', maxMessageSize) + + let chunkStart = 0 + const chunks: string[] = [] + + const MESSAGE_OVERHEAD_SIZE = 4096 // magic number from experimentation + while (chunkStart < totalData.length) { + const chunkEnd = chunkStart + maxMessageSize - MESSAGE_OVERHEAD_SIZE + chunks.push(totalData.substring(chunkStart, chunkEnd)) + chunkStart = chunkEnd + } + + const totalChunks = chunks.length + return chunks.map((data, chunk) => { + const chunkData: DuplexMessage = { + id, + kind: 'CALL_CHUNK', + totalChunks, + chunk, + data, + } + + const chunkString = JSON.stringify(chunkData) + + // console.debug('Data size:', getSizeBytes(data)) + // console.debug('Chunk size:', getSizeBytes(chunkString)) + + return chunkString + }) + } + public setCommunicator(newCommunicator: ISocket): void { this.communicator.onMessage.detach() this.communicator = newCommunicator this.communicator.onMessage.attach(this.onmessage.bind(this)) } - private handleReceivedResponse(parsed: DuplexMessage) { + private handleReceivedResponse(parsed: DuplexMessage & { kind: 'RESPONSE' }) { const onReplyFn = this.pendingCalls.get(parsed.id) if (!onReplyFn) return @@ -110,7 +164,7 @@ export class DuplexRPCClient< this.pendingCalls.delete(parsed.id) } - private async handleReceivedCall(parsed: DuplexMessage) { + private async handleReceivedCall(parsed: DuplexMessage & { kind: 'CALL' }) { type MethodKeys = keyof typeof this.canRespondTo const methodName = parsed.methodName as MethodKeys @@ -127,7 +181,7 @@ export class DuplexRPCClient< const returnValue = await handler(inputs) - const preparedResponseText = packageResponse({ + const preparedResponseText = this.packageResponse({ id: parsed.id, methodName: methodName as string, //?? data: returnValue, @@ -145,7 +199,32 @@ export class DuplexRPCClient< private async onmessage(data: unknown) { const txt = data as string try { - const inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt)) + let inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt)) + + if (inputParsed.kind === 'CALL_CHUNK') { + let chunks = this.messageChunks.get(inputParsed.id) + if (!chunks) { + chunks = Array(inputParsed.totalChunks) + this.messageChunks.set(inputParsed.id, chunks) + } + chunks[inputParsed.chunk] = inputParsed.data + let complete = true + for (let i = 0; i < inputParsed.totalChunks; i++) { + complete = complete && !!chunks[i] + } + if (complete) { + const combinedData = chunks.join('') + try { + inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(combinedData)) + } catch (err) { + console.error( + '[DuplexRPCClient] Failed reconstructing chunked call:', + err + ) + throw err + } + } + } if (inputParsed.kind === 'CALL') { try { @@ -164,9 +243,7 @@ export class DuplexRPCClient< } console.error(err) } - } - - if (inputParsed.kind === 'RESPONSE') { + } else if (inputParsed.kind === 'RESPONSE') { try { this.handleReceivedResponse(inputParsed) } catch (err) { @@ -191,13 +268,13 @@ export class DuplexRPCClient< } } - public send( + public async send( methodName: MethodName, inputs: z.input ) { const id = generateId() - const msg = packageCall({ + const msg = this.packageCall({ id, data: inputs, methodName: methodName as string, // ?? @@ -216,9 +293,19 @@ export class DuplexRPCClient< } }) - this.communicator.send(msg).catch(err => { - reject(err) - }) + if (Array.isArray(msg)) { + Promise.all( + msg.map(chunk => { + this.communicator.send(chunk) + }) + ).catch(err => { + reject(err) + }) + } else { + this.communicator.send(msg).catch(err => { + reject(err) + }) + } }) } } diff --git a/src/classes/ISocket.ts b/src/classes/ISocket.ts index a519f37..faf773e 100644 --- a/src/classes/ISocket.ts +++ b/src/classes/ISocket.ts @@ -58,6 +58,19 @@ export class DataChannelSocket { } } + get maxMessageSize(): number { + // This is unreliable, is the max size the other end supports but not necessarily what the originating end supports. + // Also it's not always implemented in browsers yet, for some reason. + // + // if ('maxMessageSize' in this.dc) { + // return this.dc.maxMessageSize() + // } + + // Firefox can support up to 1GB, but this is the safe lower-bound assumption for compatibility + // https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Using_data_channels#concerns_with_large_messages + return 16_000 + } + close(code?: number, reason?: string) { // TODO: Do something with codes? this.#readyState = 'closing' @@ -155,6 +168,14 @@ export default class ISocket { private pendingMessages = new Map() + get maxMessageSize(): number | undefined { + if ('maxMessageSize' in this.ws) { + return this.ws.maxMessageSize + } + + return undefined + } + /** Client **/ /** * Establishes an ISocket connection to the connected WebSocket diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index 50705be..654809f 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -6,12 +6,27 @@ import { serializableRecord, } from './ioSchema' -export const DUPLEX_MESSAGE_SCHEMA = z.object({ - id: z.string(), - methodName: z.string(), - data: z.any(), - kind: z.enum(['CALL', 'RESPONSE']), -}) +export const DUPLEX_MESSAGE_SCHEMA = z.discriminatedUnion('kind', [ + z.object({ + id: z.string(), + kind: z.literal('CALL'), + methodName: z.string(), + data: z.any(), + }), + z.object({ + id: z.string(), + kind: z.literal('RESPONSE'), + methodName: z.string(), + data: z.any(), + }), + z.object({ + id: z.string(), + kind: z.literal('CALL_CHUNK'), + chunk: z.number(), + totalChunks: z.number(), + data: z.string(), + }), +]) export type DuplexMessage = z.infer From c1ad1136e508e58f9997545d113434a0fbefd47e Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Wed, 30 Nov 2022 10:50:59 -0600 Subject: [PATCH 4/9] Add flag to force peer messages, don't fall back to WSS messages Only for a few transactional messages: - `SEND_IO_CALL` - `SEND_LOADING_STATE` - `SEND_PAGE` The rest are required for bookkeeping, and shouldn't have much protected info in them anyway (aside from logs, which can be changed to use peer-only too). --- src/classes/IntervalClient.ts | 66 +++++++++++++++++++++++------------ src/internalRpcSchema.ts | 1 + 2 files changed, 44 insertions(+), 23 deletions(-) diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index afde785..b3180d7 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -129,6 +129,7 @@ export default class IntervalClient { } | undefined environment: ActionEnvironment | undefined + #forcePeerMessages = false constructor(interval: Interval, config: InternalConfig) { this.#interval = interval @@ -1474,6 +1475,7 @@ export default class IntervalClient { this.organization = response.organization this.environment = response.environment + this.#forcePeerMessages = response.forcePeerMessages ?? false if (isInitialInitialization) { this.#log.prod( @@ -1524,7 +1526,7 @@ export default class IntervalClient { z.input | undefined | null > { const hostInstanceId = this.#ws?.id - let rpc: DataChannelConnection['rpc'] + let dcc: DataChannelConnection | undefined const sessionKey = serverInputs ? 'transactionId' in serverInputs ? serverInputs.transactionId @@ -1536,11 +1538,11 @@ export default class IntervalClient { if (sessionKey) { const key = this.#peerIdMap.get(sessionKey) if (key) { - rpc = this.#dccMap.get(key)?.rpc + dcc = this.#dccMap.get(key) } } - if (hostInstanceId && rpc) { + if (hostInstanceId && dcc?.rpc) { this.#logger.debug( 'Sending with peer connection', methodName, @@ -1553,11 +1555,12 @@ export default class IntervalClient { WSServerSchema['SEND_LOG']['inputs'] > - await this.#sendToClientPeer(rpc, 'LOG', { + await this.#sendToClientPeer(dcc.rpc, 'LOG', { ...inputs, index: inputs.index as number, timestamp: inputs.timestamp as number, }) + // send to backend too return null } @@ -1565,7 +1568,7 @@ export default class IntervalClient { const inputs = serverInputs as z.input< WSServerSchema['SEND_PAGE']['inputs'] > - return await this.#sendToClientPeer(rpc, 'RENDER_PAGE', { + return await this.#sendToClientPeer(dcc.rpc, 'RENDER_PAGE', { ...inputs, hostInstanceId, }) @@ -1574,23 +1577,29 @@ export default class IntervalClient { const inputs = serverInputs as z.input< WSServerSchema['SEND_IO_CALL']['inputs'] > - await this.#sendToClientPeer(rpc, 'RENDER', { + const response = await this.#sendToClientPeer(dcc.rpc, 'RENDER', { transactionId: inputs.transactionId, toRender: inputs.ioCall, }) - // send to backend too - return null + + if (this.#forcePeerMessages) { + return response + } else { + // send to backend too + return null + } } case 'MARK_TRANSACTION_COMPLETE': { const inputs = serverInputs as z.input< WSServerSchema['MARK_TRANSACTION_COMPLETE']['inputs'] > - await this.#sendToClientPeer(rpc, 'TRANSACTION_COMPLETED', { + await this.#sendToClientPeer(dcc.rpc, 'TRANSACTION_COMPLETED', { transactionId: inputs.transactionId, resultStatus: inputs.resultStatus ?? 'SUCCESS', result: inputs.result, }) + // send to backend too return null } case 'SEND_REDIRECT': { @@ -1599,18 +1608,18 @@ export default class IntervalClient { > if ('url' in inputs) { - await this.#sendToClientPeer(rpc, 'REDIRECT', { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { transactionId: inputs.transactionId, url: inputs.url, }) } else if ('route' in inputs) { - await this.#sendToClientPeer(rpc, 'REDIRECT', { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { transactionId: inputs.transactionId, route: inputs.route, params: inputs.params, }) } else { - await this.#sendToClientPeer(rpc, 'REDIRECT', { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { transactionId: inputs.transactionId, route: inputs.action, params: inputs.params, @@ -1624,11 +1633,20 @@ export default class IntervalClient { const inputs = serverInputs as z.input< WSServerSchema['SEND_LOADING_CALL']['inputs'] > - await this.#sendToClientPeer(rpc, 'LOADING_STATE', { - ...inputs, - }) - // send to backend too - return null + const response = await this.#sendToClientPeer( + dcc.rpc, + 'LOADING_STATE', + { + ...inputs, + } + ) + + if (this.#forcePeerMessages) { + return response + } else { + // send to backend too + return null + } } default: this.#logger.debug( @@ -1638,11 +1656,15 @@ export default class IntervalClient { ) } } else { - this.#logger.debug( - 'No peer connection established, skipping', - methodName, - serverInputs + if ( + this.#forcePeerMessages && + ['SEND_LOADING_CALL', 'SEND_IO_CALL', 'SEND_PAGE'].includes(methodName) ) + this.#logger.debug( + 'No peer connection established, skipping', + methodName, + serverInputs + ) } return undefined @@ -1657,8 +1679,6 @@ export default class IntervalClient { let skipClientCall = false - console.log('#send ??') - try { if (attemptPeerSend) { const peerResponse = await this.#attemptPeerSend(methodName, inputs) diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index 654809f..c327ba9 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -374,6 +374,7 @@ export const wsServerSchema = { slug: z.string(), }), dashboardUrl: z.string(), + forcePeerMessages: z.boolean().optional(), sdkAlert: SDK_ALERT.nullish(), warnings: z.array(z.string()), }), From 10c5b4c71fb580b3bf9d67fb18180f0937afc32e Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Wed, 30 Nov 2022 13:06:35 -0600 Subject: [PATCH 5/9] Add dynamic ICE configuration with turn servers from Twilio Fallback to the public default stun servers otherwise. The fallbacks are still hardcoded, it didn't seem worth it to set up a dynamic system for them given that the Twilio ones should be used most of the time. --- src/classes/DataChannelConnection.ts | 9 +- src/classes/IntervalClient.ts | 144 ++++++++++++++------------- src/index.ts | 21 +++- src/internalRpcSchema.ts | 18 ++++ 4 files changed, 119 insertions(+), 73 deletions(-) diff --git a/src/classes/DataChannelConnection.ts b/src/classes/DataChannelConnection.ts index b5d53ae..e6d9b09 100644 --- a/src/classes/DataChannelConnection.ts +++ b/src/classes/DataChannelConnection.ts @@ -1,4 +1,4 @@ -import { PeerConnection } from 'node-datachannel' +import { PeerConnection, IceServer } from 'node-datachannel' import type { DuplexRPCClient } from './DuplexRPCClient' import { HostSchema, @@ -15,10 +15,12 @@ export default class DataChannelConnection { constructor({ id, + iceServers, send, rpcConstructor, }: { id: string + iceServers: (string | IceServer)[] send: PeerConnectionInitializer rpcConstructor: (props: { communicator: ISocket @@ -26,10 +28,7 @@ export default class DataChannelConnection { }) => DuplexRPCClient }) { this.peer = new PeerConnection(id, { - iceServers: [ - 'stun:stun.l.google.com:19302', - 'stun:global.stun.twilio.com:3478', - ], + iceServers, }) // For some reason these cause segfaults? // this.peer.onStateChange(state => { diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index b3180d7..49c9ceb 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -5,7 +5,7 @@ import fetch from 'node-fetch' import * as superjson from 'superjson' import { JSONValue } from 'superjson/dist/types' import ISocket, { TimeoutError } from './ISocket' -import type { DescriptionType } from 'node-datachannel' +import type { DescriptionType, IceServer } from 'node-datachannel' import { DuplexRPCClient, DuplexRPCHandlers, @@ -680,81 +680,91 @@ export default class IntervalClient { INITIALIZE_PEER_CONNECTION: async inputs => { if (typeof window !== 'undefined') return - const { default: DataChannelConnection } = await import( - './DataChannelConnection' - ) - - this.#logger.debug('INITIALIZE_PEER_CONNECTION:', inputs) - switch (inputs.type) { - case 'offer': { - const dcc = new DataChannelConnection({ - id: inputs.id, - send: inputs => - this.#send('INITIALIZE_PEER_CONNECTION', inputs).catch(err => { - this.#logger.debug( - 'Failed sending initialize peer connection', - err - ) - }), - rpcConstructor: props => { - const rpc = this.#createRPCClient(props) - rpc.communicator.onOpen.attach(() => { - const set = this.#peerIdToTransactionIdsMap.get(inputs.id) - if (set) { - this.#resendTransactionLoadingStates( - Array.from(set.values()) - ) - this.#resendPendingIOCalls(Array.from(set.values())) - } - }) - return rpc - }, - }) - this.#dccMap.set(inputs.id, dcc) - dcc.peer.setRemoteDescription( - inputs.description, - inputs.type as DescriptionType - ) - dcc.peer.onStateChange(state => { - this.#logger.debug('Peer state change:', state) - if (state === 'failed' || state === 'closed') { - this.#dccMap.delete(inputs.id) - } - }) - - break - } - case 'answer': { - const dcc = this.#dccMap.get(inputs.id) + try { + this.#logger.debug('INITIALIZE_PEER_CONNECTION:', inputs) + switch (inputs.type) { + case 'offer': { + const { default: DataChannelConnection } = await import( + './DataChannelConnection' + ) - if (dcc) { + const iceConfig = await this.#interval.fetchIceConfig() + + const dcc = new DataChannelConnection({ + id: inputs.id, + // TypeScript an enum and not a string, though they're equivalent + iceServers: iceConfig.iceServers as IceServer[], + send: inputs => + this.#send('INITIALIZE_PEER_CONNECTION', inputs).catch( + err => { + this.#logger.debug( + 'Failed sending initialize peer connection', + err + ) + } + ), + rpcConstructor: props => { + const rpc = this.#createRPCClient(props) + rpc.communicator.onOpen.attach(() => { + const set = this.#peerIdToTransactionIdsMap.get(inputs.id) + if (set) { + this.#resendTransactionLoadingStates( + Array.from(set.values()) + ) + this.#resendPendingIOCalls(Array.from(set.values())) + } + }) + return rpc + }, + }) + this.#dccMap.set(inputs.id, dcc) dcc.peer.setRemoteDescription( inputs.description, inputs.type as DescriptionType ) - } else { - this.#logger.warn( - 'INITIALIZE_PEER_CONNECTION:', - 'DCC not found for id', - inputs.id - ) + dcc.peer.onStateChange(state => { + this.#logger.debug('Peer state change:', state) + if (state === 'failed' || state === 'closed') { + this.#dccMap.delete(inputs.id) + } + }) + + break } - break - } - case 'candidate': { - const dcc = this.#dccMap.get(inputs.id) + case 'answer': { + const dcc = this.#dccMap.get(inputs.id) - if (dcc) { - dcc.peer.addRemoteCandidate(inputs.candidate, inputs.mid) - } else { - this.#logger.warn( - 'INITIALIZE_PEER_CONNECTION', - 'DCC not found for id', - inputs.id - ) + if (dcc) { + dcc.peer.setRemoteDescription( + inputs.description, + inputs.type as DescriptionType + ) + } else { + this.#logger.warn( + 'INITIALIZE_PEER_CONNECTION:', + 'DCC not found for id', + inputs.id + ) + } + break + } + case 'candidate': { + const dcc = this.#dccMap.get(inputs.id) + + if (dcc) { + dcc.peer.addRemoteCandidate(inputs.candidate, inputs.mid) + } else { + this.#logger.warn( + 'INITIALIZE_PEER_CONNECTION', + 'DCC not found for id', + inputs.id + ) + } + break } - break } + } catch (err) { + this.#logger.error('Failed initializing peer connection', err) } }, START_TRANSACTION: async inputs => { diff --git a/src/index.ts b/src/index.ts index 5c518b8..062d57f 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,13 @@ import Routes from './classes/Routes' import IOError from './classes/IOError' import Logger from './classes/Logger' import Page from './classes/Page' -import { NOTIFY, ClientSchema, HostSchema } from './internalRpcSchema' +import { + NOTIFY, + ClientSchema, + HostSchema, + ICE_CONFIG, + IceConfig, +} from './internalRpcSchema' import { DuplexRPCHandlers } from './classes/DuplexRPCClient' import { SerializableRecord } from './ioSchema' import type { @@ -203,6 +209,19 @@ export default class Interval { return this.#client } + async fetchIceConfig(): Promise { + const response = await fetch(`${this.#httpEndpoint}/api/ice-config`, { + method: 'GET', + headers: { + Authorization: `Bearer ${this.#apiKey}`, + }, + }).then(r => r.json()) + + const parsed = ICE_CONFIG.parse(response) + + return parsed + } + async notify(config: NotifyConfig): Promise { if ( !config.transactionId && diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index c327ba9..11b8f64 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -94,6 +94,24 @@ export const PAGE_DEFINITION = z.object({ export type PageDefinition = z.infer +export const ICE_SERVER = z.object({ + url: z.string(), + urls: z.string(), + hostname: z.string(), + port: z.number(), + relayType: z.enum(['TurnUdp', 'TurnTcp', 'TurnTls']).optional(), + username: z.string().optional(), + credential: z.string().optional(), +}) + +export type IceServer = z.infer + +export const ICE_CONFIG = z.object({ + iceServers: z.array(ICE_SERVER), +}) + +export type IceConfig = z.infer + export const ENQUEUE_ACTION = { inputs: z.object({ slug: z.string(), From ba71279d06ce72de21bdc73a88c683c70aee82b7 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Thu, 1 Dec 2022 10:31:02 -0600 Subject: [PATCH 6/9] Add pings to peer connection, attempt to disconnect on failure Disconnecting does not quite work properly yet, new connections are made on demand but old connections aren't being cleaned up because handlers aren't being called as expected yet. --- src/classes/ISocket.ts | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/src/classes/ISocket.ts b/src/classes/ISocket.ts index faf773e..8b9f246 100644 --- a/src/classes/ISocket.ts +++ b/src/classes/ISocket.ts @@ -289,7 +289,7 @@ export default class ISocket { } this.ws.onclose = (ev?: CloseEvent) => { - this.onClose.post([ev?.code ?? 0, ev?.reason ?? 'idk']) + this.onClose.post([ev?.code ?? 0, ev?.reason ?? 'Unknown']) } this.ws.onerror = (ev: ErrorEvent | Event) => { @@ -320,9 +320,11 @@ export default class ISocket { if (meta.data === 'authenticated') { this.isAuthenticated = true this.onAuthenticated.post() - return + } else if (meta.data === 'ping') { + // do nothing + } else { + this.onMessage.post(meta.data) } - this.onMessage.post(meta.data) } } @@ -353,13 +355,6 @@ export default class ISocket { async ping() { if (this.isClosed) throw new NotConnectedError() - if (!('ping' in this.ws)) { - // Not supported in web client WebSocket - throw new Error( - 'ping not supported in this underlying websocket connection' - ) - } - const ws = this.ws return new Promise((resolve, reject) => { const pongTimeout = setTimeout( @@ -377,11 +372,16 @@ export default class ISocket { resolve() }, }) - ws.ping(id, undefined, err => { - if (err) { - reject(err) - } - }) + + if ('ping' in ws) { + ws.ping(id, undefined, err => { + if (err) { + reject(err) + } + }) + } else { + ws.send(JSON.stringify({ type: 'MESSAGE', id, data: 'ping' })) + } }) } } From 1e3cbc2eefe8d857a51162c9b6a10289b58b5268 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Thu, 1 Dec 2022 12:44:08 -0600 Subject: [PATCH 7/9] Save peer candidates that are sent before connection initialization --- src/classes/IntervalClient.ts | 28 +++++++++++++++++++++++----- src/internalRpcSchema.ts | 16 ++++++++++------ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index 49c9ceb..55043f3 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -25,6 +25,7 @@ import { HostSchema, ClientSchema, WSServerSchema, + PeerCandidate, } from '../internalRpcSchema' import { ActionResultSchema, @@ -274,6 +275,7 @@ export default class IntervalClient { #ws: ISocket | undefined = undefined #dccMap = new Map() + #pendingCandidatesMap = new Map() #peerIdMap = new Map() #peerIdToTransactionIdsMap = new Map>() @@ -729,6 +731,19 @@ export default class IntervalClient { } }) + const pendingCandidates = this.#pendingCandidatesMap.get( + inputs.id + ) + if (pendingCandidates) { + for (const candidate of pendingCandidates) { + dcc.peer.addRemoteCandidate( + candidate.candidate, + candidate.mid + ) + } + this.#pendingCandidatesMap.delete(inputs.id) + } + break } case 'answer': { @@ -742,8 +757,8 @@ export default class IntervalClient { } else { this.#logger.warn( 'INITIALIZE_PEER_CONNECTION:', - 'DCC not found for id', - inputs.id + 'DCC not found for inputs', + inputs ) } break @@ -754,11 +769,14 @@ export default class IntervalClient { if (dcc) { dcc.peer.addRemoteCandidate(inputs.candidate, inputs.mid) } else { - this.#logger.warn( - 'INITIALIZE_PEER_CONNECTION', - 'DCC not found for id', + let pendingCandidates = this.#pendingCandidatesMap.get( inputs.id ) + if (!pendingCandidates) { + pendingCandidates = [] + this.#pendingCandidatesMap.set(inputs.id, pendingCandidates) + } + pendingCandidates.push(inputs) } break } diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index 11b8f64..afcb03b 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -205,6 +205,15 @@ export const DECLARE_HOST = { ]), } +const PEER_CANDIDATE = z.object({ + type: z.literal('candidate'), + id: z.string(), + candidate: z.string(), + mid: z.string(), +}) + +export type PeerCandidate = z.infer + const INITIALIZE_PEER_CONNECTION = { inputs: z.discriminatedUnion('type', [ z.object({ @@ -217,12 +226,7 @@ const INITIALIZE_PEER_CONNECTION = { id: z.string(), description: z.string(), }), - z.object({ - type: z.literal('candidate'), - id: z.string(), - candidate: z.string(), - mid: z.string(), - }), + PEER_CANDIDATE, z.object({ type: z.literal('unspec'), id: z.string(), From 5b3e7815e924686465599e120d49219b4edbae08 Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Wed, 7 Dec 2022 12:04:18 -0600 Subject: [PATCH 8/9] Listen to iceconnectionstatechange to clean up client connection map The `close` event in RTCDataChannel is just not being called for some reason. It does not seem to be due to our code from what I can tell, we are calling RTCDataChannel.close() but the event is not being fired. Will want to investigate further before publicly releasing, but it is behind a flag for now. --- src/classes/IntervalClient.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index 55043f3..181aaba 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -726,7 +726,11 @@ export default class IntervalClient { ) dcc.peer.onStateChange(state => { this.#logger.debug('Peer state change:', state) - if (state === 'failed' || state === 'closed') { + if ( + state === 'failed' || + state === 'closed' || + state === 'disconnected' + ) { this.#dccMap.delete(inputs.id) } }) From 042b59496065f39f1b2d128ef294ac329c7c975c Mon Sep 17 00:00:00 2001 From: Jacob Mischka Date: Wed, 7 Dec 2022 13:21:35 -0600 Subject: [PATCH 9/9] Fix some flaky tests for forced peer mode --- src/examples/basic/index.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/examples/basic/index.ts b/src/examples/basic/index.ts index 6ef2bd9..701f5d1 100644 --- a/src/examples/basic/index.ts +++ b/src/examples/basic/index.ts @@ -622,7 +622,8 @@ const interval = new Interval({ }, logs: async (_, ctx) => { for (let i = 0; i < 10; i++) { - ctx.log('Log number', i) + await ctx.log('Log number', i) + await sleep(500) } }, logTest: async (io, ctx) => {