Skip to content

Commit

Permalink
Merge branch 'main' into public-pages-api
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobmischka committed Dec 8, 2022
2 parents 55039c6 + 042b594 commit b0d84a1
Show file tree
Hide file tree
Showing 11 changed files with 932 additions and 96 deletions.
10 changes: 8 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
{
"name": "@interval/sdk",
"version": "0.34.0",
"version": "0.35.0-dev",
"homepage": "https://interval.com",
"repository": {
"type": "git",
"url": "github:interval/interval-node"
},
"bugs": "https://github.com/interval/interval-node/issues",
"keywords": ["internal tool", "app", "ui", "ui builder"],
"keywords": [
"internal tool",
"app",
"ui",
"ui builder"
],
"license": "MIT",
"engines": {
"node": ">=12.17.0"
Expand All @@ -24,6 +29,7 @@
"dependencies": {
"cross-fetch": "^3.1.5",
"evt": "^2.4.10",
"node-datachannel": "^0.4.0",
"superjson": "^1.9.1",
"uuid": "^9.0.0",
"ws": "^8.4.1",
Expand Down
58 changes: 58 additions & 0 deletions src/classes/DataChannelConnection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { PeerConnection, IceServer } from 'node-datachannel'
import type { DuplexRPCClient } from './DuplexRPCClient'
import {
HostSchema,
PeerConnectionInitializer,
ClientSchema,
clientSchema,
} from '../internalRpcSchema'
import ISocket, { DataChannelSocket } from './ISocket'

export default class DataChannelConnection {
peer: PeerConnection
ds?: DataChannelSocket
rpc?: DuplexRPCClient<ClientSchema, HostSchema>

constructor({
id,
iceServers,
send,
rpcConstructor,
}: {
id: string
iceServers: (string | IceServer)[]
send: PeerConnectionInitializer
rpcConstructor: (props: {
communicator: ISocket
canCall: ClientSchema
}) => DuplexRPCClient<ClientSchema, HostSchema>
}) {
this.peer = new PeerConnection(id, {
iceServers,
})
// For some reason these cause segfaults?
// this.peer.onStateChange(state => {
// console.log('State:', state)
// })
// this.peer.onGatheringStateChange(state => {
// console.log('GatheringState:', state)
// })
this.peer.onLocalDescription((description, type) => {
send({ id, type, description })
})
this.peer.onLocalCandidate((candidate, mid) => {
send({ id, type: 'candidate', candidate, mid })
})
this.peer.onDataChannel(dc => {
const ds = new DataChannelSocket(dc)
this.ds = ds

const communicator = new ISocket(ds)

this.rpc = rpcConstructor({
communicator,
canCall: clientSchema,
})
})
}
}
163 changes: 125 additions & 38 deletions src/classes/DuplexRPCClient.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { z, ZodError } from 'zod'
import type { DuplexMessage } from '../internalRpcSchema'
import { DUPLEX_MESSAGE_SCHEMA } from '../internalRpcSchema'
import IntervalError from './IntervalError'
import ISocket from './ISocket'

let count = 0
Expand All @@ -9,7 +10,7 @@ function generateId() {
return count.toString()
}

interface MethodDef {
export interface MethodDef {
[key: string]: {
inputs: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any, any>
returns: z.ZodFirstPartySchemaTypes | z.ZodDiscriminatedUnion<any, any, any>
Expand All @@ -18,31 +19,6 @@ interface MethodDef {

type OnReplyFn = (anyObject: any) => void

function packageResponse({
id,
methodName,
data,
}: Omit<DuplexMessage, 'kind'>) {
const preparedResponseText: DuplexMessage = {
id: id,
kind: 'RESPONSE',
methodName: methodName,
data,
}
return JSON.stringify(preparedResponseText)
}

function packageCall({ id, methodName, data }: Omit<DuplexMessage, 'kind'>) {
const callerData: DuplexMessage = {
id,
kind: 'CALL',
data,
methodName: methodName as string, // ??
}

return JSON.stringify(callerData)
}

export type DuplexRPCHandlers<ResponderSchema extends MethodDef> = {
[Property in keyof ResponderSchema]: (
inputs: z.infer<ResponderSchema[Property]['inputs']>
Expand All @@ -59,6 +35,18 @@ interface CreateDuplexRPCClientProps<
handlers: DuplexRPCHandlers<ResponderSchema>
}

function getSizeBytes(str: string): number {
if (typeof Blob !== 'undefined') {
return new Blob([str]).size
} else if (typeof Buffer !== 'undefined') {
return Buffer.from(str).byteLength
} else {
throw new IntervalError(
'Unsupported runtime, must have either Buffer or Blob global'
)
}
}

/**
* Responsible for making RPC calls to another DuplexRPCClient.
* Can send messages from CallerSchema and respond to messages
Expand All @@ -82,6 +70,7 @@ export class DuplexRPCClient<
) => Promise<z.infer<ResponderSchema[Property]['returns']>>
}
pendingCalls = new Map<string, OnReplyFn>()
messageChunks = new Map<string, string[]>()

constructor({
communicator,
Expand All @@ -96,21 +85,86 @@ export class DuplexRPCClient<
this.handlers = handlers
}

private packageResponse({
id,
methodName,
data,
}: Omit<DuplexMessage & { kind: 'RESPONSE' }, 'kind'>) {
const preparedResponseText: DuplexMessage = {
id: id,
kind: 'RESPONSE',
methodName: methodName,
data,
}
return JSON.stringify(preparedResponseText)
}

private packageCall({
id,
methodName,
data,
}: Omit<DuplexMessage & { kind: 'CALL' }, 'kind'>): string | string[] {
const callerData: DuplexMessage = {
id,
kind: 'CALL',
data,
methodName: methodName as string, // ??
}

const totalData = JSON.stringify(callerData)
const totalSize = getSizeBytes(totalData)
const maxMessageSize = this.communicator.maxMessageSize
if (maxMessageSize === undefined || totalSize < maxMessageSize) {
return totalData
}

// console.debug('Chunking!')
// console.debug('Max size:', maxMessageSize)

let chunkStart = 0
const chunks: string[] = []

const MESSAGE_OVERHEAD_SIZE = 4096 // magic number from experimentation
while (chunkStart < totalData.length) {
const chunkEnd = chunkStart + maxMessageSize - MESSAGE_OVERHEAD_SIZE
chunks.push(totalData.substring(chunkStart, chunkEnd))
chunkStart = chunkEnd
}

const totalChunks = chunks.length
return chunks.map((data, chunk) => {
const chunkData: DuplexMessage = {
id,
kind: 'CALL_CHUNK',
totalChunks,
chunk,
data,
}

const chunkString = JSON.stringify(chunkData)

// console.debug('Data size:', getSizeBytes(data))
// console.debug('Chunk size:', getSizeBytes(chunkString))

return chunkString
})
}

public setCommunicator(newCommunicator: ISocket): void {
this.communicator.onMessage.detach()
this.communicator = newCommunicator
this.communicator.onMessage.attach(this.onmessage.bind(this))
}

private handleReceivedResponse(parsed: DuplexMessage) {
private handleReceivedResponse(parsed: DuplexMessage & { kind: 'RESPONSE' }) {
const onReplyFn = this.pendingCalls.get(parsed.id)
if (!onReplyFn) return

onReplyFn(parsed.data)
this.pendingCalls.delete(parsed.id)
}

private async handleReceivedCall(parsed: DuplexMessage) {
private async handleReceivedCall(parsed: DuplexMessage & { kind: 'CALL' }) {
type MethodKeys = keyof typeof this.canRespondTo

const methodName = parsed.methodName as MethodKeys
Expand All @@ -127,7 +181,7 @@ export class DuplexRPCClient<

const returnValue = await handler(inputs)

const preparedResponseText = packageResponse({
const preparedResponseText = this.packageResponse({
id: parsed.id,
methodName: methodName as string, //??
data: returnValue,
Expand All @@ -145,7 +199,32 @@ export class DuplexRPCClient<
private async onmessage(data: unknown) {
const txt = data as string
try {
const inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt))
let inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(txt))

if (inputParsed.kind === 'CALL_CHUNK') {
let chunks = this.messageChunks.get(inputParsed.id)
if (!chunks) {
chunks = Array(inputParsed.totalChunks)
this.messageChunks.set(inputParsed.id, chunks)
}
chunks[inputParsed.chunk] = inputParsed.data
let complete = true
for (let i = 0; i < inputParsed.totalChunks; i++) {
complete = complete && !!chunks[i]
}
if (complete) {
const combinedData = chunks.join('')
try {
inputParsed = DUPLEX_MESSAGE_SCHEMA.parse(JSON.parse(combinedData))
} catch (err) {
console.error(
'[DuplexRPCClient] Failed reconstructing chunked call:',
err
)
throw err
}
}
}

if (inputParsed.kind === 'CALL') {
try {
Expand All @@ -164,9 +243,7 @@ export class DuplexRPCClient<
}
console.error(err)
}
}

if (inputParsed.kind === 'RESPONSE') {
} else if (inputParsed.kind === 'RESPONSE') {
try {
this.handleReceivedResponse(inputParsed)
} catch (err) {
Expand All @@ -191,13 +268,13 @@ export class DuplexRPCClient<
}
}

public send<MethodName extends keyof CallerSchema>(
public async send<MethodName extends keyof CallerSchema>(
methodName: MethodName,
inputs: z.input<CallerSchema[MethodName]['inputs']>
) {
const id = generateId()

const msg = packageCall({
const msg = this.packageCall({
id,
data: inputs,
methodName: methodName as string, // ??
Expand All @@ -216,9 +293,19 @@ export class DuplexRPCClient<
}
})

this.communicator.send(msg).catch(err => {
reject(err)
})
if (Array.isArray(msg)) {
Promise.all(
msg.map(chunk => {
this.communicator.send(chunk)
})
).catch(err => {
reject(err)
})
} else {
this.communicator.send(msg).catch(err => {
reject(err)
})
}
})
}
}
2 changes: 2 additions & 0 deletions src/classes/IOClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ export class IOClient {
})
)

validationErrorMessage = undefined

if (validities.some(v => !v)) {
render()
return
Expand Down
Loading

0 comments on commit b0d84a1

Please sign in to comment.