diff --git a/package.json b/package.json index b958b67..19aac65 100644 --- a/package.json +++ b/package.json @@ -1,13 +1,18 @@ { "name": "@interval/sdk", - "version": "0.34.0", + "version": "0.35.0-dev", "homepage": "https://interval.com", "repository": { "type": "git", "url": "github:interval/interval-node" }, "bugs": "https://github.com/interval/interval-node/issues", - "keywords": ["internal tool", "app", "ui", "ui builder"], + "keywords": [ + "internal tool", + "app", + "ui", + "ui builder" + ], "license": "MIT", "engines": { "node": ">=12.17.0" @@ -24,6 +29,7 @@ "dependencies": { "cross-fetch": "^3.1.5", "evt": "^2.4.10", + "node-datachannel": "^0.4.0", "superjson": "^1.9.1", "uuid": "^9.0.0", "ws": "^8.4.1", diff --git a/src/classes/DataChannelConnection.ts b/src/classes/DataChannelConnection.ts new file mode 100644 index 0000000..e6d9b09 --- /dev/null +++ b/src/classes/DataChannelConnection.ts @@ -0,0 +1,58 @@ +import { PeerConnection, IceServer } from 'node-datachannel' +import type { DuplexRPCClient } from './DuplexRPCClient' +import { + HostSchema, + PeerConnectionInitializer, + ClientSchema, + clientSchema, +} from '../internalRpcSchema' +import ISocket, { DataChannelSocket } from './ISocket' + +export default class DataChannelConnection { + peer: PeerConnection + ds?: DataChannelSocket + rpc?: DuplexRPCClient + + constructor({ + id, + iceServers, + send, + rpcConstructor, + }: { + id: string + iceServers: (string | IceServer)[] + send: PeerConnectionInitializer + rpcConstructor: (props: { + communicator: ISocket + canCall: ClientSchema + }) => DuplexRPCClient + }) { + this.peer = new PeerConnection(id, { + iceServers, + }) + // For some reason these cause segfaults? + // this.peer.onStateChange(state => { + // console.log('State:', state) + // }) + // this.peer.onGatheringStateChange(state => { + // console.log('GatheringState:', state) + // }) + this.peer.onLocalDescription((description, type) => { + send({ id, type, description }) + }) + this.peer.onLocalCandidate((candidate, mid) => { + send({ id, type: 'candidate', candidate, mid }) + }) + this.peer.onDataChannel(dc => { + const ds = new DataChannelSocket(dc) + this.ds = ds + + const communicator = new ISocket(ds) + + this.rpc = rpcConstructor({ + communicator, + canCall: clientSchema, + }) + }) + } +} diff --git a/src/classes/DuplexRPCClient.ts b/src/classes/DuplexRPCClient.ts index 0d4d612..11f0a25 100644 --- a/src/classes/DuplexRPCClient.ts +++ b/src/classes/DuplexRPCClient.ts @@ -1,6 +1,7 @@ import { z, ZodError } from 'zod' import type { DuplexMessage } from '../internalRpcSchema' import { DUPLEX_MESSAGE_SCHEMA } from '../internalRpcSchema' +import IntervalError from './IntervalError' import ISocket from './ISocket' let count = 0 @@ -9,7 +10,7 @@ function generateId() { return count.toString() } -interface MethodDef { +export interface MethodDef { [key: string]: { inputs: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion returns: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion @@ -18,31 +19,6 @@ interface MethodDef { type OnReplyFn = (anyObject: any) => void -function packageResponse({ - id, - methodName, - data, -}: Omit) { - const preparedResponseText: DuplexMessage = { - id: id, - kind: 'RESPONSE', - methodName: methodName, - data, - } - return JSON.stringify(preparedResponseText) -} - -function packageCall({ id, methodName, data }: Omit) { - const callerData: DuplexMessage = { - id, - kind: 'CALL', - data, - methodName: methodName as string, // ?? - } - - return JSON.stringify(callerData) -} - export type DuplexRPCHandlers = { [Property in keyof ResponderSchema]: ( inputs: z.infer @@ -59,6 +35,18 @@ interface CreateDuplexRPCClientProps< handlers: DuplexRPCHandlers } +function getSizeBytes(str: string): number { + if (typeof Blob !== 'undefined') { + return new Blob([str]).size + } else if (typeof Buffer !== 'undefined') { + return Buffer.from(str).byteLength + } else { + throw new IntervalError( + 'Unsupported runtime, must have either Buffer or Blob global' + ) + } +} + /** * Responsible for making RPC calls to another DuplexRPCClient. * Can send messages from CallerSchema and respond to messages @@ -82,6 +70,7 @@ export class DuplexRPCClient< ) => Promise> } pendingCalls = new Map() + messageChunks = new Map() constructor({ communicator, @@ -96,13 +85,78 @@ export class DuplexRPCClient< this.handlers = handlers } + private packageResponse({ + id, + methodName, + data, + }: Omit) { + const preparedResponseText: DuplexMessage = { + id: id, + kind: 'RESPONSE', + methodName: methodName, + data, + } + return JSON.stringify(preparedResponseText) + } + + private packageCall({ + id, + methodName, + data, + }: Omit): string | string[] { + const callerData: DuplexMessage = { + id, + kind: 'CALL', + data, + methodName: methodName as string, // ?? + } + + const totalData = JSON.stringify(callerData) + const totalSize = getSizeBytes(totalData) + const maxMessageSize = this.communicator.maxMessageSize + if (maxMessageSize === undefined || totalSize < maxMessageSize) { + return totalData + } + + // console.debug('Chunking!') + // console.debug('Max size:', maxMessageSize) + + let chunkStart = 0 + const chunks: string[] = [] + + const MESSAGE_OVERHEAD_SIZE = 4096 // magic number from experimentation + while (chunkStart < totalData.length) { + const chunkEnd = chunkStart + maxMessageSize - MESSAGE_OVERHEAD_SIZE + chunks.push(totalData.substring(chunkStart, chunkEnd)) + chunkStart = chunkEnd + } + + const totalChunks = chunks.length + return chunks.map((data, chunk) => { + const chunkData: DuplexMessage = { + id, + kind: 'CALL_CHUNK', + totalChunks, + chunk, + data, + } + + const chunkString = JSON.stringify(chunkData) + + // console.debug('Data size:', getSizeBytes(data)) + // console.debug('Chunk size:', getSizeBytes(chunkString)) + + return chunkString + }) + } + public setCommunicator(newCommunicator: ISocket): void { this.communicator.onMessage.detach() this.communicator = newCommunicator this.communicator.onMessage.attach(this.onmessage.bind(this)) } - private handleReceivedResponse(parsed: DuplexMessage) { + private handleReceivedResponse(parsed: DuplexMessage & { kind: 'RESPONSE' }) { const onReplyFn = this.pendingCalls.get(parsed.id) if (!onReplyFn) return @@ -110,7 +164,7 @@ export class DuplexRPCClient< this.pendingCalls.delete(parsed.id) } - private async handleReceivedCall(parsed: DuplexMessage) { + private async handleReceivedCall(parsed: DuplexMessage & { kind: 'CALL' }) { type MethodKeys = keyof typeof this.canRespondTo const methodName = parsed.methodName as MethodKeys @@ -127,7 +181,7 @@ export class DuplexRPCClient< const returnValue = await handler(inputs) - const preparedResponseText = packageResponse({ + const preparedResponseText = this.packageResponse({ id: parsed.id, methodName: methodName as string, //?? data: returnValue, @@ -145,7 +199,32 @@ export class DuplexRPCClient< private async onmessage(data: unknown) { const txt = data as string try { - const inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt)) + let inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt)) + + if (inputParsed.kind === 'CALL_CHUNK') { + let chunks = this.messageChunks.get(inputParsed.id) + if (!chunks) { + chunks = Array(inputParsed.totalChunks) + this.messageChunks.set(inputParsed.id, chunks) + } + chunks[inputParsed.chunk] = inputParsed.data + let complete = true + for (let i = 0; i < inputParsed.totalChunks; i++) { + complete = complete && !!chunks[i] + } + if (complete) { + const combinedData = chunks.join('') + try { + inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(combinedData)) + } catch (err) { + console.error( + '[DuplexRPCClient] Failed reconstructing chunked call:', + err + ) + throw err + } + } + } if (inputParsed.kind === 'CALL') { try { @@ -164,9 +243,7 @@ export class DuplexRPCClient< } console.error(err) } - } - - if (inputParsed.kind === 'RESPONSE') { + } else if (inputParsed.kind === 'RESPONSE') { try { this.handleReceivedResponse(inputParsed) } catch (err) { @@ -191,13 +268,13 @@ export class DuplexRPCClient< } } - public send( + public async send( methodName: MethodName, inputs: z.input ) { const id = generateId() - const msg = packageCall({ + const msg = this.packageCall({ id, data: inputs, methodName: methodName as string, // ?? @@ -216,9 +293,19 @@ export class DuplexRPCClient< } }) - this.communicator.send(msg).catch(err => { - reject(err) - }) + if (Array.isArray(msg)) { + Promise.all( + msg.map(chunk => { + this.communicator.send(chunk) + }) + ).catch(err => { + reject(err) + }) + } else { + this.communicator.send(msg).catch(err => { + reject(err) + }) + } }) } } diff --git a/src/classes/IOClient.ts b/src/classes/IOClient.ts index e7bd397..834d3de 100644 --- a/src/classes/IOClient.ts +++ b/src/classes/IOClient.ts @@ -193,6 +193,8 @@ export class IOClient { }) ) + validationErrorMessage = undefined + if (validities.some(v => !v)) { render() return diff --git a/src/classes/ISocket.ts b/src/classes/ISocket.ts index 922f4c1..8b9f246 100644 --- a/src/classes/ISocket.ts +++ b/src/classes/ISocket.ts @@ -1,4 +1,5 @@ import type { WebSocket as NodeWebSocket } from 'ws' +import type { DataChannel } from 'node-datachannel' import { Evt } from 'evt' import { v4 } from 'uuid' import { z } from 'zod' @@ -25,6 +26,115 @@ interface ISocketConfig { id?: string // manually specifying ids is helpful for debugging } +export class DataChannelSocket { + dc: DataChannel | RTCDataChannel + #readyState: string = 'connecting' + + constructor(dc: DataChannel | RTCDataChannel) { + this.dc = dc + } + + public static OPEN = 'open' as const + + get readyState(): string { + if ('readyState' in this.dc) { + return this.dc.readyState + } + + return this.#readyState + } + + get OPEN() { + return DataChannelSocket.OPEN + } + + send(message: string) { + if ('sendMessage' in this.dc) { + // node + this.dc.sendMessage(message) + } else { + // web + this.dc.send(message) + } + } + + get maxMessageSize(): number { + // This is unreliable, is the max size the other end supports but not necessarily what the originating end supports. + // Also it's not always implemented in browsers yet, for some reason. + // + // if ('maxMessageSize' in this.dc) { + // return this.dc.maxMessageSize() + // } + + // Firefox can support up to 1GB, but this is the safe lower-bound assumption for compatibility + // https://developer.mozilla.org/en-US/docs/Web/API/WebRTC_API/Using_data_channels#concerns_with_large_messages + return 16_000 + } + + close(code?: number, reason?: string) { + // TODO: Do something with codes? + this.#readyState = 'closing' + this.dc.close() + } + + set onopen(cb: () => void) { + if ('onOpen' in this.dc) { + // node + this.dc.onOpen(cb) + } else { + // web + this.dc.onopen = cb + } + } + + set onclose(cb: () => void) { + const handleClose = () => { + this.#readyState = 'closed' + cb() + } + if ('onClosed' in this.dc) { + // node + this.dc.onClosed(handleClose) + } else { + // web + this.dc.onclose = handleClose + } + } + + set onerror(cb: (ev: ErrorEvent | Event) => void) { + if ('onError' in this.dc) { + // node + this.dc.onError((err: string) => { + // ?? + cb( + new ErrorEvent('ErrorEvent', { + message: err, + }) + ) + }) + } else { + // web + this.dc.onerror = cb + } + } + + set onmessage(cb: (evt: MessageEvent) => void) { + if ('onMessage' in this.dc) { + // node + this.dc.onMessage((msg: string | Buffer) => { + cb( + new MessageEvent('MessageEvent', { + data: msg, + }) + ) + }) + } else { + // web + this.dc.onmessage = cb + } + } +} + /** * A relatively thin wrapper around an underlying WebSocket connection. Can be thought of as a TCP layer on top of WebSockets, * ISockets send and expect `ACK` messages following receipt of a `MESSAGE` message containing the transmitted data. @@ -42,7 +152,7 @@ interface ISocketConfig { * rejecting the `ping` Promise. */ export default class ISocket { - private ws: WebSocket | NodeWebSocket + private ws: WebSocket | NodeWebSocket | DataChannelSocket private connectTimeout: number private sendTimeout: number private pingTimeout: number @@ -58,6 +168,14 @@ export default class ISocket { private pendingMessages = new Map() + get maxMessageSize(): number | undefined { + if ('maxMessageSize' in this.ws) { + return this.ws.maxMessageSize + } + + return undefined + } + /** Client **/ /** * Establishes an ISocket connection to the connected WebSocket @@ -132,7 +250,10 @@ export default class ISocket { return this.ws.close(code, reason) } - constructor(ws: WebSocket | NodeWebSocket, config?: ISocketConfig) { + constructor( + ws: WebSocket | NodeWebSocket | DataChannelSocket, + config?: ISocketConfig + ) { // this works but on("error") does not. No idea why ¯\_(ツ)_/¯ // will emit "closed" regardless // this.ws.addEventListener('error', e => { @@ -167,8 +288,8 @@ export default class ISocket { this.onOpen.post() } - this.ws.onclose = (ev: CloseEvent) => { - this.onClose.post([ev.code, ev.reason]) + this.ws.onclose = (ev?: CloseEvent) => { + this.onClose.post([ev?.code ?? 0, ev?.reason ?? 'Unknown']) } this.ws.onerror = (ev: ErrorEvent | Event) => { @@ -199,9 +320,11 @@ export default class ISocket { if (meta.data === 'authenticated') { this.isAuthenticated = true this.onAuthenticated.post() - return + } else if (meta.data === 'ping') { + // do nothing + } else { + this.onMessage.post(meta.data) } - this.onMessage.post(meta.data) } } @@ -232,13 +355,6 @@ export default class ISocket { async ping() { if (this.isClosed) throw new NotConnectedError() - if (!('ping' in this.ws)) { - // Not supported in web client WebSocket - throw new Error( - 'ping not supported in this underlying websocket connection' - ) - } - const ws = this.ws return new Promise((resolve, reject) => { const pongTimeout = setTimeout( @@ -256,11 +372,16 @@ export default class ISocket { resolve() }, }) - ws.ping(id, undefined, err => { - if (err) { - reject(err) - } - }) + + if ('ping' in ws) { + ws.ping(id, undefined, err => { + if (err) { + reject(err) + } + }) + } else { + ws.send(JSON.stringify({ type: 'MESSAGE', id, data: 'ping' })) + } }) } } diff --git a/src/classes/IntervalClient.ts b/src/classes/IntervalClient.ts index 4b47bce..dcde058 100644 --- a/src/classes/IntervalClient.ts +++ b/src/classes/IntervalClient.ts @@ -5,7 +5,12 @@ import fetch from 'node-fetch' import * as superjson from 'superjson' import { JSONValue } from 'superjson/dist/types' import ISocket, { TimeoutError } from './ISocket' -import { DuplexRPCClient, DuplexRPCHandlers } from './DuplexRPCClient' +import type { DescriptionType, IceServer } from 'node-datachannel' +import { + DuplexRPCClient, + DuplexRPCHandlers, + MethodDef, +} from './DuplexRPCClient' import IOError from './IOError' import Logger from './Logger' import { @@ -18,12 +23,17 @@ import { ActionDefinition, PageDefinition, HostSchema, + ClientSchema, + WSServerSchema, + PeerCandidate, } from '../internalRpcSchema' import { ActionResultSchema, IOFunctionReturnType, IO_RESPONSE, LegacyLinkProps, + requiresServer, + T_IO_METHOD_NAMES, T_IO_RENDER_INPUT, T_IO_RESPONSE, } from '../ioSchema' @@ -41,7 +51,8 @@ import type { IntervalRouteDefinitions, IntervalPageHandler, } from '../types' -import TransactionLoadingState from '../classes/TransactionLoadingState' +import type DataChannelConnection from './DataChannelConnection' +import TransactionLoadingState from './TransactionLoadingState' import { Interval, InternalConfig, IntervalError } from '..' import Page from './Page' import { @@ -119,6 +130,7 @@ export default class IntervalClient { } | undefined environment: ActionEnvironment | undefined + #forcePeerMessages = false constructor(interval: Interval, config: InternalConfig) { this.#interval = interval @@ -260,7 +272,13 @@ export default class IntervalClient { string, [(output?: any) => void, (err?: any) => void] >() + #ws: ISocket | undefined = undefined + #dccMap = new Map() + #pendingCandidatesMap = new Map() + #peerIdMap = new Map() + #peerIdToTransactionIdsMap = new Map>() + #serverRpc: | DuplexRPCClient | undefined = undefined @@ -312,7 +330,9 @@ export default class IntervalClient { private async initializeConnection() { await this.#createSocketConnection() - this.#createRPCClient() + this.#serverRpc = this.#createRPCClient({ + canCall: wsServerSchema, + }) } async respondToRequest(requestId: string) { @@ -325,7 +345,10 @@ export default class IntervalClient { } if (!this.#serverRpc) { - this.#createRPCClient(requestId) + this.#serverRpc = this.#createRPCClient({ + requestId, + canCall: wsServerSchema, + }) } const result = new Promise((resolve, reject) => { @@ -351,6 +374,12 @@ export default class IntervalClient { this.#ws = undefined } + for (const dcc of this.#dccMap.values()) { + dcc.ds?.close() + dcc.peer.close() + } + this.#dccMap.clear() + this.#isConnected = false } @@ -416,10 +445,16 @@ export default class IntervalClient { /** * Resends pending IO calls upon reconnection. */ - async #resendPendingIOCalls() { + async #resendPendingIOCalls(resendToTransactionIds?: string[]) { if (!this.#isConnected) return - const toResend = new Map(this.#pendingIOCalls) + const toResend = resendToTransactionIds + ? new Map( + resendToTransactionIds + .map(id => [id, this.#pendingIOCalls.get(id)]) + .filter(([, state]) => !!state) as [string, string][] + ) + : new Map(this.#pendingIOCalls) while (toResend.size > 0) { await Promise.allSettled( @@ -471,10 +506,16 @@ export default class IntervalClient { /** * Resends pending transaction loading states upon reconnection. */ - async #resendTransactionLoadingStates() { + async #resendTransactionLoadingStates(resendToTransactionIds?: string[]) { if (!this.#isConnected) return - const toResend = new Map(this.#transactionLoadingStates) + const toResend = resendToTransactionIds + ? new Map( + resendToTransactionIds + .map(id => [id, this.#transactionLoadingStates.get(id)]) + .filter(([, state]) => !!state) as [string, LoadingState][] + ) + : new Map(this.#transactionLoadingStates) while (toResend.size > 0) { await Promise.allSettled( @@ -638,13 +679,147 @@ export default class IntervalClient { #createRPCHandlers(requestId?: string): DuplexRPCHandlers { const intervalClient = this return { + INITIALIZE_PEER_CONNECTION: async inputs => { + if (typeof window !== 'undefined') return + + try { + this.#logger.debug('INITIALIZE_PEER_CONNECTION:', inputs) + switch (inputs.type) { + case 'offer': { + const { default: DataChannelConnection } = await import( + './DataChannelConnection' + ) + + const iceConfig = await this.#interval.fetchIceConfig() + + const dcc = new DataChannelConnection({ + id: inputs.id, + // TypeScript an enum and not a string, though they're equivalent + iceServers: iceConfig.iceServers as IceServer[], + send: inputs => + this.#send('INITIALIZE_PEER_CONNECTION', inputs).catch( + err => { + this.#logger.debug( + 'Failed sending initialize peer connection', + err + ) + } + ), + rpcConstructor: props => { + const rpc = this.#createRPCClient(props) + rpc.communicator.onOpen.attach(() => { + const set = this.#peerIdToTransactionIdsMap.get(inputs.id) + if (set) { + this.#resendTransactionLoadingStates( + Array.from(set.values()) + ) + this.#resendPendingIOCalls(Array.from(set.values())) + } + }) + return rpc + }, + }) + this.#dccMap.set(inputs.id, dcc) + dcc.peer.setRemoteDescription( + inputs.description, + inputs.type as DescriptionType + ) + dcc.peer.onStateChange(state => { + this.#logger.debug('Peer state change:', state) + if ( + state === 'failed' || + state === 'closed' || + state === 'disconnected' + ) { + this.#dccMap.delete(inputs.id) + } + }) + + const pendingCandidates = this.#pendingCandidatesMap.get( + inputs.id + ) + if (pendingCandidates) { + for (const candidate of pendingCandidates) { + dcc.peer.addRemoteCandidate( + candidate.candidate, + candidate.mid + ) + } + this.#pendingCandidatesMap.delete(inputs.id) + } + + break + } + case 'answer': { + const dcc = this.#dccMap.get(inputs.id) + + if (dcc) { + dcc.peer.setRemoteDescription( + inputs.description, + inputs.type as DescriptionType + ) + } else { + this.#logger.warn( + 'INITIALIZE_PEER_CONNECTION:', + 'DCC not found for inputs', + inputs + ) + } + break + } + case 'candidate': { + const dcc = this.#dccMap.get(inputs.id) + + if (dcc) { + dcc.peer.addRemoteCandidate(inputs.candidate, inputs.mid) + } else { + let pendingCandidates = this.#pendingCandidatesMap.get( + inputs.id + ) + if (!pendingCandidates) { + pendingCandidates = [] + this.#pendingCandidatesMap.set(inputs.id, pendingCandidates) + } + pendingCandidates.push(inputs) + } + break + } + } + } catch (err) { + this.#logger.error('Failed initializing peer connection', err) + } + }, START_TRANSACTION: async inputs => { if (!intervalClient.organization) { intervalClient.#log.error('No organization defined') return } - const { action, transactionId } = inputs + const { action, transactionId, clientId } = inputs + const prevClientId = this.#peerIdMap.get(transactionId) + + if (clientId && clientId !== prevClientId) { + this.#peerIdMap.set(transactionId, clientId) + let set = this.#peerIdToTransactionIdsMap.get(clientId) + if (!set) { + set = new Set() + this.#peerIdToTransactionIdsMap.set(clientId, set) + } + set.add(transactionId) + } + + if (this.#ioResponseHandlers.has(transactionId)) { + this.#logger.debug('Transaction already started, not starting again') + + if (clientId !== prevClientId) { + // only resend if is a new peer connection + this.#resendPendingIOCalls([transactionId]) + this.#resendTransactionLoadingStates([transactionId]) + } + + return + } + const actionHandler = intervalClient.#actionHandlers.get(action.slug) intervalClient.#log.debug(actionHandler) @@ -666,10 +841,25 @@ export default class IntervalClient { toRender: ioCall, }) } else { - await intervalClient.#send('SEND_IO_CALL', { - transactionId, - ioCall, - }) + let attemptPeerSend = true + for (const renderMethod of ioRenderInstruction.toRender) { + const methodName = renderMethod.methodName as T_IO_METHOD_NAMES + if (requiresServer(methodName)) { + attemptPeerSend = false + break + } + } + + await intervalClient.#send( + 'SEND_IO_CALL', + { + transactionId, + ioCall, + }, + { + attemptPeerSend, + } + ) } intervalClient.#transactionLoadingStates.delete(transactionId) @@ -700,7 +890,7 @@ export default class IntervalClient { const ctx: ActionCtx = { user: inputs.user, - // TODO: Remove intervalClient when all active SDKs support superjson + // TODO: Remove this when all active SDKs support superjson params: deserializeDates(params), environment: inputs.environment, organization: intervalClient.organization, @@ -780,6 +970,7 @@ export default class IntervalClient { } else { await intervalClient.#send('MARK_TRANSACTION_COMPLETE', { transactionId, + resultStatus: res.status, result: JSON.stringify(res), }) } @@ -839,6 +1030,14 @@ export default class IntervalClient { for (const key of client.inlineActionKeys.values()) { intervalClient.#actionHandlers.delete(key) } + + const peerId = this.#peerIdMap.get(transactionId) + if (peerId) { + this.#peerIdMap.delete(transactionId) + this.#peerIdToTransactionIdsMap + .get(peerId) + ?.delete(transactionId) + } }) } @@ -882,9 +1081,13 @@ export default class IntervalClient { return { type: 'ERROR' as const } } - const { pageKey } = inputs + const { pageKey, clientId } = inputs const pageHandler = this.#pageHandlers.get(inputs.page.slug) + if (clientId) { + this.#peerIdMap.set(pageKey, clientId) + } + if (!pageHandler) { this.#log.debug('No app handler called', inputs.page.slug) return { type: 'ERROR' as const } @@ -1169,6 +1372,7 @@ export default class IntervalClient { this.#pageIOClients.delete(inputs.pageKey) } + this.#peerIdMap.delete(inputs.pageKey) this.#ioResponseHandlers.delete(inputs.pageKey) }, } @@ -1178,19 +1382,25 @@ export default class IntervalClient { * Creates the DuplexRPCClient responsible for sending * messages to Interval. */ - #createRPCClient(requestId?: string) { - if (!this.#ws) { - throw new Error('ISocket not initialized') + #createRPCClient({ + communicator = this.#ws, + requestId, + canCall, + }: { + communicator?: ISocket + requestId?: string + canCall: CallerSchema + }) { + if (!communicator) { + throw new Error('Communicator not initialized') } - const serverRpc = new DuplexRPCClient({ - communicator: this.#ws, - canCall: wsServerSchema, + return new DuplexRPCClient({ + communicator, + canCall, canRespondTo: hostSchema, handlers: this.#createRPCHandlers(requestId), }) - - this.#serverRpc = serverRpc } /** @@ -1251,6 +1461,7 @@ export default class IntervalClient { this.organization = response.organization this.environment = response.environment + this.#forcePeerMessages = response.forcePeerMessages ?? false if (isInitialInitialization) { this.#log.prod( @@ -1263,15 +1474,218 @@ export default class IntervalClient { return response } - async #send( + async #sendToClientPeer( + rpc: NonNullable, methodName: MethodName, - inputs: z.input + inputs: z.input + ) { + const NUM_P2P_TRIES = 3 + for (let i = 0; i <= NUM_P2P_TRIES; i++) { + try { + return await rpc.send(methodName, inputs) + } catch (err) { + if (err instanceof TimeoutError) { + this.#log.debug( + `Peer RPC call timed out, retrying in ${Math.round( + this.#retryIntervalMs / 1000 + )}s...` + ) + this.#log.debug(err) + sleep(this.#retryIntervalMs) + } else { + throw err + } + } + } + + throw new TimeoutError() + } + + /** + * @returns undefined if was unsent, null if was sent but should send via server anyway, + * and true/false if was sent but should not send. This is obviously pretty bad code, should be fixed. + */ + async #attemptPeerSend( + methodName: MethodName, + serverInputs: z.input + ): Promise< + z.input | undefined | null + > { + const hostInstanceId = this.#ws?.id + let dcc: DataChannelConnection | undefined + const sessionKey = serverInputs + ? 'transactionId' in serverInputs + ? serverInputs.transactionId + : 'pageKey' in serverInputs + ? serverInputs.pageKey + : undefined + : undefined + + if (sessionKey) { + const key = this.#peerIdMap.get(sessionKey) + if (key) { + dcc = this.#dccMap.get(key) + } + } + + if (hostInstanceId && dcc?.rpc) { + this.#logger.debug( + 'Sending with peer connection', + methodName, + serverInputs + ) + + switch (methodName) { + case 'SEND_LOG': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_LOG']['inputs'] + > + + await this.#sendToClientPeer(dcc.rpc, 'LOG', { + ...inputs, + index: inputs.index as number, + timestamp: inputs.timestamp as number, + }) + + // send to backend too + return null + } + case 'SEND_PAGE': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_PAGE']['inputs'] + > + return await this.#sendToClientPeer(dcc.rpc, 'RENDER_PAGE', { + ...inputs, + hostInstanceId, + }) + } + case 'SEND_IO_CALL': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_IO_CALL']['inputs'] + > + const response = await this.#sendToClientPeer(dcc.rpc, 'RENDER', { + transactionId: inputs.transactionId, + toRender: inputs.ioCall, + }) + + if (this.#forcePeerMessages) { + return response + } else { + // send to backend too + return null + } + } + case 'MARK_TRANSACTION_COMPLETE': { + const inputs = serverInputs as z.input< + WSServerSchema['MARK_TRANSACTION_COMPLETE']['inputs'] + > + await this.#sendToClientPeer(dcc.rpc, 'TRANSACTION_COMPLETED', { + transactionId: inputs.transactionId, + resultStatus: inputs.resultStatus ?? 'SUCCESS', + result: inputs.result, + }) + + // send to backend too + return null + } + case 'SEND_REDIRECT': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_REDIRECT']['inputs'] + > + + if ('url' in inputs) { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + url: inputs.url, + }) + } else if ('route' in inputs) { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + route: inputs.route, + params: inputs.params, + }) + } else { + await this.#sendToClientPeer(dcc.rpc, 'REDIRECT', { + transactionId: inputs.transactionId, + route: inputs.action, + params: inputs.params, + }) + } + + // send to backend too + return null + } + case 'SEND_LOADING_CALL': { + const inputs = serverInputs as z.input< + WSServerSchema['SEND_LOADING_CALL']['inputs'] + > + const response = await this.#sendToClientPeer( + dcc.rpc, + 'LOADING_STATE', + { + ...inputs, + } + ) + + if (this.#forcePeerMessages) { + return response + } else { + // send to backend too + return null + } + } + default: + this.#logger.debug( + 'Unsupported peer method', + methodName, + 'sending via server' + ) + } + } else { + if ( + this.#forcePeerMessages && + ['SEND_LOADING_CALL', 'SEND_IO_CALL', 'SEND_PAGE'].includes(methodName) + ) + this.#logger.debug( + 'No peer connection established, skipping', + methodName, + serverInputs + ) + } + + return undefined + } + + async #send( + methodName: MethodName, + inputs: z.input, + { attemptPeerSend = true }: { attemptPeerSend?: boolean } = {} ) { if (!this.#serverRpc) throw new IntervalError('serverRpc not initialized') + let skipClientCall = false + + try { + if (attemptPeerSend) { + const peerResponse = await this.#attemptPeerSend(methodName, inputs) + + if (peerResponse != null) { + return peerResponse + } else if (peerResponse === null) { + skipClientCall = true + } + } + } catch (err) { + this.#logger.debug('Error from peer RPC', err) + } + while (true) { try { - return await this.#serverRpc.send(methodName, inputs) + this.#logger.debug('Sending via server', methodName, inputs) + return await this.#serverRpc.send(methodName, { + ...inputs, + skipClientCall, + }) } catch (err) { if (err instanceof TimeoutError) { this.#log.debug( diff --git a/src/examples/app/index.ts b/src/examples/app/index.ts index bfe8be3..6a82f06 100644 --- a/src/examples/app/index.ts +++ b/src/examples/app/index.ts @@ -156,9 +156,11 @@ const users = new Page({ }) const interval = new Interval({ - apiKey: 'alex_dev_Bku6kYZlyhyvkCO36W5HnpwtXACI1khse8SnZ9PuwsmqdRfe', logLevel: 'debug', + apiKey: 'alex_dev_Bku6kYZlyhyvkCO36W5HnpwtXACI1khse8SnZ9PuwsmqdRfe', endpoint: 'ws://localhost:3000/websocket', + // apiKey: 'jacob_dev_VZo5ilzcng5iTwuUakQ1z7U88rMKfmIQU1Xc7mnYRRNrFf7U', + // endpoint: 'wss://staging.interval.com/websocket', routes: { hello_app, users, diff --git a/src/examples/basic/index.ts b/src/examples/basic/index.ts index 1364dfa..6ac0e09 100644 --- a/src/examples/basic/index.ts +++ b/src/examples/basic/index.ts @@ -1,4 +1,4 @@ -import { T_IO_METHOD, T_IO_PROPS } from './../../ioSchema' +import { T_IO_PROPS } from './../../ioSchema' import Interval, { IOError, io, ctx, Page } from '../../index' import IntervalClient from '../../classes/IntervalClient' import { @@ -61,8 +61,33 @@ const actionLinks: IntervalActionHandler = async () => { const prod = new Interval({ apiKey: 'live_N47qd1BrOMApNPmVd0BiDZQRLkocfdJKzvt8W6JT5ICemrAN', endpoint: 'ws://localhost:3000/websocket', + logLevel: 'debug', routes: { + backgroundable: { + backgroundable: true, + handler: async () => { + const first = await io.input.text('First input') + const second = await io.input.text('Second input') + + return { first, second } + }, + }, actionLinks, + helloCurrentUser: { + name: 'Hello, current user!', + description: '👋', + handler: async () => { + console.log(ctx.params) + + let heading = `Hello, ${ctx.user.firstName} ${ctx.user.lastName}` + + if (ctx.params.message) { + heading += ` (Message: ${ctx.params.message})` + } + + return heading + }, + }, redirectWithoutWarningTest: async () => { const text = await io.input.text('Edit text before navigating', { defaultValue: 'Backspace me', @@ -594,6 +619,12 @@ const interval = new Interval({ return { num1, num2, sum: num1 + num2 } }, + logs: async (_, ctx) => { + for (let i = 0; i < 10; i++) { + await ctx.log('Log number', i) + await sleep(500) + } + }, logTest: async (io, ctx) => { ctx.log(new Date().toUTCString()) const name = await io.input.text('Your name') diff --git a/src/index.ts b/src/index.ts index 3fbf325..3ad6355 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,7 +4,13 @@ import Routes from './classes/Routes' import IOError from './classes/IOError' import Logger from './classes/Logger' import Page from './classes/Page' -import { NOTIFY, ClientSchema, HostSchema } from './internalRpcSchema' +import { + NOTIFY, + ClientSchema, + HostSchema, + ICE_CONFIG, + IceConfig, +} from './internalRpcSchema' import { DuplexRPCHandlers } from './classes/DuplexRPCClient' import { SerializableRecord } from './ioSchema' import type { @@ -224,6 +230,19 @@ export default class Interval { return this.#client } + async fetchIceConfig(): Promise { + const response = await fetch(`${this.#httpEndpoint}/api/ice-config`, { + method: 'GET', + headers: { + Authorization: `Bearer ${this.#apiKey}`, + }, + }).then(r => r.json()) + + const parsed = ICE_CONFIG.parse(response) + + return parsed + } + async notify(config: NotifyConfig): Promise { if ( !config.transactionId && diff --git a/src/internalRpcSchema.ts b/src/internalRpcSchema.ts index 8c053af..afcb03b 100644 --- a/src/internalRpcSchema.ts +++ b/src/internalRpcSchema.ts @@ -6,12 +6,27 @@ import { serializableRecord, } from './ioSchema' -export const DUPLEX_MESSAGE_SCHEMA = z.object({ - id: z.string(), - methodName: z.string(), - data: z.any(), - kind: z.enum(['CALL', 'RESPONSE']), -}) +export const DUPLEX_MESSAGE_SCHEMA = z.discriminatedUnion('kind', [ + z.object({ + id: z.string(), + kind: z.literal('CALL'), + methodName: z.string(), + data: z.any(), + }), + z.object({ + id: z.string(), + kind: z.literal('RESPONSE'), + methodName: z.string(), + data: z.any(), + }), + z.object({ + id: z.string(), + kind: z.literal('CALL_CHUNK'), + chunk: z.number(), + totalChunks: z.number(), + data: z.string(), + }), +]) export type DuplexMessage = z.infer @@ -79,6 +94,24 @@ export const PAGE_DEFINITION = z.object({ export type PageDefinition = z.infer +export const ICE_SERVER = z.object({ + url: z.string(), + urls: z.string(), + hostname: z.string(), + port: z.number(), + relayType: z.enum(['TurnUdp', 'TurnTcp', 'TurnTls']).optional(), + username: z.string().optional(), + credential: z.string().optional(), +}) + +export type IceServer = z.infer + +export const ICE_CONFIG = z.object({ + iceServers: z.array(ICE_SERVER), +}) + +export type IceConfig = z.infer + export const ENQUEUE_ACTION = { inputs: z.object({ slug: z.string(), @@ -172,6 +205,44 @@ export const DECLARE_HOST = { ]), } +const PEER_CANDIDATE = z.object({ + type: z.literal('candidate'), + id: z.string(), + candidate: z.string(), + mid: z.string(), +}) + +export type PeerCandidate = z.infer + +const INITIALIZE_PEER_CONNECTION = { + inputs: z.discriminatedUnion('type', [ + z.object({ + type: z.literal('offer'), + id: z.string(), + description: z.string(), + }), + z.object({ + type: z.literal('answer'), + id: z.string(), + description: z.string(), + }), + PEER_CANDIDATE, + z.object({ + type: z.literal('unspec'), + id: z.string(), + }), + z.object({ + type: z.literal('pranswer'), + id: z.string(), + }), + z.object({ + type: z.literal('rollback'), + id: z.string(), + }), + ]), + returns: z.void(), +} + export const wsServerSchema = { CONNECT_TO_TRANSACTION_AS_CLIENT: { inputs: z.object({ @@ -217,6 +288,7 @@ export const wsServerSchema = { inputs: z.object({ transactionId: z.string(), ioCall: z.string(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -234,6 +306,7 @@ export const wsServerSchema = { z.object({ transactionId: z.string(), label: z.string().optional(), + skipClientCall: z.boolean().optional(), }) ), returns: z.boolean(), @@ -244,6 +317,7 @@ export const wsServerSchema = { data: z.string(), index: z.number().optional(), timestamp: z.number().optional(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -269,6 +343,7 @@ export const wsServerSchema = { inputs: z.intersection( z.object({ transactionId: z.string(), + skipClientCall: z.boolean().optional(), }), legacyLinkSchema ), @@ -277,7 +352,11 @@ export const wsServerSchema = { MARK_TRANSACTION_COMPLETE: { inputs: z.object({ transactionId: z.string(), + resultStatus: z + .enum(['SUCCESS', 'FAILURE', 'CANCELED', 'REDIRECTED']) + .optional(), result: z.string().optional(), + skipClientCall: z.boolean().optional(), }), returns: z.boolean(), }, @@ -285,6 +364,7 @@ export const wsServerSchema = { inputs: z.undefined(), returns: z.boolean(), }, + INITIALIZE_PEER_CONNECTION, INITIALIZE_HOST: { inputs: z.intersection( z.object({ @@ -316,6 +396,7 @@ export const wsServerSchema = { slug: z.string(), }), dashboardUrl: z.string(), + forcePeerMessages: z.boolean().optional(), sdkAlert: SDK_ALERT.nullish(), warnings: z.array(z.string()), }), @@ -415,6 +496,7 @@ export const clientSchema = { ), returns: z.boolean(), }, + INITIALIZE_PEER_CONNECTION, } export type ClientSchema = typeof clientSchema @@ -423,6 +505,7 @@ export const hostSchema = { OPEN_PAGE: { inputs: z.object({ pageKey: z.string(), + clientId: z.string().optional(), page: z.object({ slug: z.string(), }), @@ -455,6 +538,7 @@ export const hostSchema = { START_TRANSACTION: { inputs: z.object({ transactionId: z.string(), + clientId: z.string().optional(), // Actually slug, for backward compatibility // TODO: Remove breaking release, superfluous with slug below actionName: z.string(), @@ -480,6 +564,11 @@ export const hostSchema = { }), returns: z.void().nullable(), }, + INITIALIZE_PEER_CONNECTION, } export type HostSchema = typeof hostSchema + +export type PeerConnectionInitializer = ( + inputs: z.infer +) => Promise> diff --git a/src/ioSchema.ts b/src/ioSchema.ts index 601bb3f..d986584 100644 --- a/src/ioSchema.ts +++ b/src/ioSchema.ts @@ -334,7 +334,13 @@ export type DateTimeObject = z.infer * will resolve immediately when awaited. */ export function resolvesImmediately(methodName: T_IO_METHOD_NAMES): boolean { - return 'immediate' in ioSchema[methodName] + const schema = ioSchema[methodName] + return schema && 'immediate' in schema && schema.immediate +} + +export function requiresServer(methodName: T_IO_METHOD_NAMES): boolean { + const schema = ioSchema[methodName] + return schema && 'requiresServer' in schema && schema.requiresServer } export const metaItemSchema = z.object({ @@ -439,13 +445,13 @@ const DISPLAY_SCHEMA = { }), state: z.null(), returns: z.null(), - immediate: z.literal(true), + immediate: true, }, DISPLAY_PROGRESS_INDETERMINATE: { props: z.object({}), state: z.null(), returns: z.null(), - immediate: z.literal(true), + immediate: true, }, DISPLAY_PROGRESS_THROUGH_LIST: { props: z.object({ @@ -638,6 +644,7 @@ const INPUT_SCHEMA = { state: z.null(), returns: z.boolean(), exclusive: z.literal(true), + requiresServer: true, }, SELECT_TABLE: { props: z.object({