Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: replace socket.io with trpc #3259

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion companion/lib/Registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { ImportExportController } from './ImportExport/Controller.js'
import { ServiceOscSender } from './Service/OscSender.js'
import type { ControlCommonEvents } from './Controls/ControlDependencies.js'
import type { PackageJson } from 'type-fest'
import { createTrpcRouter } from './UI/TRPC.js'

const pkgInfoStr = await fs.readFile(new URL('../package.json', import.meta.url))
const pkgInfo: PackageJson = JSON.parse(pkgInfoStr.toString())
Expand Down Expand Up @@ -279,7 +280,6 @@ export class Registry {

this.ui.io.on('clientConnect', (client) => {
LogController.clientConnect(client)
this.ui.clientConnect(client)
this.#data.clientConnect(client)
this.page.clientConnect(client)
this.controls.clientConnect(client)
Expand Down Expand Up @@ -313,6 +313,9 @@ export class Registry {
await this.instance.initInstances(extraModulePath)

// Instances are loaded, start up http
const router = createTrpcRouter(this)
this.ui.express.bindTrpcRouter(router)
this.ui.io.bindTrpcRouter(router)
this.rebindHttp(bindIp, bindPort)

// Startup has completed, run triggers
Expand Down Expand Up @@ -362,6 +365,8 @@ export class Registry {
Promise.resolve().then(async () => {
this.#logger.info('somewhere, the system wants to exit. kthxbai')

this.ui.close()

// Save the db to disk
this.db.close()
this.#data.cache.close()
Expand Down
11 changes: 4 additions & 7 deletions companion/lib/UI/Controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { AppInfo } from '../Registry.js'
import { UIExpress } from './Express.js'
import { ClientSocket, UIHandler } from './Handler.js'
import { UIHandler } from './Handler.js'
import { UIServer } from './Server.js'
import { UIUpdate } from './Update.js'
import type express from 'express'
Expand All @@ -15,13 +15,10 @@ export class UIController {
this.express = new UIExpress(internalApiRouter)
this.server = new UIServer(this.express.app)
this.io = new UIHandler(appInfo, this.server)
this.update = new UIUpdate(appInfo, this.io)
this.update = new UIUpdate(appInfo)
}

/**
* Setup a new socket client's events
*/
clientConnect(client: ClientSocket): void {
this.update.clientConnect(client)
close() {
this.io.close()
}
}
58 changes: 58 additions & 0 deletions companion/lib/UI/Express.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import fs from 'fs'
// @ts-ignore
import serveZip from 'express-serve-zip'
import { fileURLToPath } from 'url'
import * as trpcExpress from '@trpc/server/adapters/express'
import { AppRouter, createTrpcContext } from './TRPC.js'

/**
* Create a zip serve app
Expand Down Expand Up @@ -63,6 +65,7 @@ export class UIExpress {
#apiRouter = Express.Router()
#legacyApiRouter = Express.Router()
#connectionApiRouter = Express.Router()
#trpcRouter = Express.Router()

constructor(internalApiRouter: Express.Router) {
this.app.use(cors())
Expand Down Expand Up @@ -92,6 +95,9 @@ export class UIExpress {
// Use the router #apiRouter to add API routes dynamically, this router can be redefined at runtime with setter
this.app.use('/api', (r, s, n) => this.#apiRouter(r, s, n))

// Use the router #apiRouter to add API routes dynamically, this router can be redefined at runtime with setter
this.app.use('/trpc', (r, s, n) => this.#trpcRouter(r, s, n))

// Use the router #legacyApiRouter to add API routes dynamically, this router can be redefined at runtime with setter
this.app.use((r, s, n) => this.#legacyApiRouter(r, s, n))

Expand Down Expand Up @@ -146,4 +152,56 @@ export class UIExpress {
set connectionApiRouter(router: Express.Router) {
this.#connectionApiRouter = router
}

#boundTrpcRouter = false
bindTrpcRouter(trpcRouter: AppRouter) {
if (this.#boundTrpcRouter) throw new Error('tRPC router already bound')
this.#boundTrpcRouter = true

this.#trpcRouter.use(
trpcExpress.createExpressMiddleware({
router: trpcRouter,
createContext: createTrpcContext,
})
)

// const wss = new WebSocketServer({
// noServer: true,
// })

// // TODO - this shouldnt be here like this..
// const handler = applyWSSHandler({
// wss,
// router: trpcRouter,
// createTrpcContext,
// // Enable heartbeat messages to keep connection open (disabled by default)
// keepAlive: {
// enabled: true,
// // server ping message interval in milliseconds
// pingMs: 30000,
// // connection is terminated if pong message is not received in this many milliseconds
// pongWaitMs: 5000,
// },
// })

// this.app.on("upgrade", (request, socket, head) => {
// wss.handleUpgrade(request, socket, head, (websocket) => {
// wss.emit("connection", websocket, request);
// });
// });

// wss.on('connection', (ws) => {
// console.log(`➕➕ Connection (${wss.clients.size})`)
// ws.once('close', () => {
// console.log(`➖➖ Connection (${wss.clients.size})`)
// })
// })
// console.log('✅ WebSocket Server listening on ws://localhost:3001')

// process.on('SIGTERM', () => {
// console.log('SIGTERM')
// handler.broadcastReconnectNotification()
// wss.close()
// })
}
}
63 changes: 56 additions & 7 deletions companion/lib/UI/Handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import type { AppInfo } from '../Registry.js'
import type { Server as HttpServer } from 'http'
import type { Server as HttpsServer } from 'https'
import { EventEmitter } from 'events'
import { applyWSSHandler } from '@trpc/server/adapters/ws'
import { WebSocketServer } from 'ws'
import { AppRouter, createTrpcContext } from './TRPC.js'

type IOListenEvents = import('@companion-app/shared/SocketIO.js').ClientToBackendEventsListenMap
type IOEmitEvents = import('@companion-app/shared/SocketIO.js').BackendToClientEventsMap
Expand Down Expand Up @@ -127,6 +130,14 @@ export class UIHandler extends EventEmitter<UIHandlerEvents> {
readonly #httpIO: IOServerType
#httpsIO: IOServerType | undefined

#http: HttpServer

#wss = new WebSocketServer({
noServer: true,
path: '/trpc',
})
#broadcastDisconnect?: () => void

constructor(appInfo: AppInfo, http: HttpServer) {
super()

Expand All @@ -143,6 +154,7 @@ export class UIHandler extends EventEmitter<UIHandlerEvents> {
},
}

this.#http = http
this.#httpIO = new SocketIOServer(http, this.#socketIOOptions)

this.#httpIO.on('connect', this.#clientConnect.bind(this))
Expand All @@ -156,13 +168,6 @@ export class UIHandler extends EventEmitter<UIHandlerEvents> {

const client = new ClientSocket(rawClient, this.#logger)

client.onPromise('app-version-info', () => {
return {
appVersion: this.#appInfo.appVersion,
appBuild: this.#appInfo.appBuild,
}
})

this.emit('clientConnect', client)

client.on('disconnect', () => {
Expand Down Expand Up @@ -219,4 +224,48 @@ export class UIHandler extends EventEmitter<UIHandlerEvents> {
this.#httpsIO.on('connect', this.#clientConnect.bind(this))
}
}

#boundTrpcRouter = false
bindTrpcRouter(trpcRouter: AppRouter) {
if (this.#boundTrpcRouter) throw new Error('tRPC router already bound')
this.#boundTrpcRouter = true

// TODO - this shouldnt be here like this..
const handler = applyWSSHandler({
wss: this.#wss,
router: trpcRouter,
createTrpcContext,
// Enable heartbeat messages to keep connection open (disabled by default)
keepAlive: {
enabled: true,
// server ping message interval in milliseconds
pingMs: 30000,
// connection is terminated if pong message is not received in this many milliseconds
pongWaitMs: 5000,
},
})

this.#broadcastDisconnect = handler.broadcastReconnectNotification

this.#http.on('upgrade', (request, socket, head) => {
// TODO - is this guard needed?
if (request.url === '/trpc') {
this.#wss.handleUpgrade(request, socket, head, (websocket) => {
this.#wss.emit('connection', websocket, request)
})
}
})

this.#wss.on('connection', (ws) => {
console.log(`➕➕ Connection (${this.#wss.clients.size})`)
ws.once('close', () => {
console.log(`➖➖ Connection (${this.#wss.clients.size})`)
})
})
}

close(): void {
this.#broadcastDisconnect?.()
this.#wss.close()
}
}
51 changes: 51 additions & 0 deletions companion/lib/UI/TRPC.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { initTRPC } from '@trpc/server'
import type { Registry } from '../Registry.js'
import type * as trpcExpress from '@trpc/server/adapters/express'
import { EventEmitter, on } from 'events'

// created for each request
export const createTrpcContext = ({} /* req, res */ : trpcExpress.CreateExpressContextOptions) => ({}) // no context
type Context = Awaited<ReturnType<typeof createTrpcContext>>

/**
* Initialization of tRPC backend
* Should be done only once per backend!
*/
const t = initTRPC.context<Context>().create()

/**
* Export reusable router and procedure helpers
* that can be used throughout the router
*/
export const router = t.router
export const publicProcedure = t.procedure
export const protectedProcedure = t.procedure

/**
* Create the root TRPC router
* @param registry
* @returns
*/
export function createTrpcRouter(registry: Registry) {
return router({
// ...

userList: publicProcedure.query(async () => {
return [1, 2, 3]
}),

appInfo: registry.ui.update.createTrpcRouter(),
})
}

// Export type router type signature,
// NOT the router itself.
export type AppRouter = ReturnType<typeof createTrpcRouter>

export function toIterable<T extends Record<string, any[]>, TKey extends string & keyof T>(
ee: EventEmitter<T>,
key: TKey,
signal: AbortSignal | undefined
): NodeJS.AsyncIterator<T[TKey]> {
return on(ee as any, key, { signal }) as NodeJS.AsyncIterator<T[TKey]>
}
49 changes: 32 additions & 17 deletions companion/lib/UI/Update.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,30 @@

import LogController from '../Log/Controller.js'
import type { AppInfo } from '../Registry.js'
import type { UIHandler } from './Handler.js'
import type { ClientSocket } from './Handler.js'
import type { AppUpdateInfo } from '@companion-app/shared/Model/Common.js'
import { compileUpdatePayload } from './UpdatePayload.js'
import { publicProcedure, router, toIterable } from './TRPC.js'
import { EventEmitter } from 'events'

interface UpdateEvents {
info: [info: AppUpdateInfo]
}

export class UIUpdate {
readonly #logger = LogController.createLogger('UI/Update')

readonly #appInfo: AppInfo
readonly #ioController: UIHandler

readonly #updateEvents = new EventEmitter<UpdateEvents>()

/**
* Latest update information
*/
#latestUpdateData: AppUpdateInfo | null = null

constructor(appInfo: AppInfo, ioController: UIHandler) {
constructor(appInfo: AppInfo) {
this.#logger.silly('loading update')
this.#appInfo = appInfo
this.#ioController = ioController
}

startCycle() {
Expand All @@ -51,17 +55,6 @@ export class UIUpdate {
)
}

/**
* Setup a new socket client's events
*/
clientConnect(client: ClientSocket): void {
client.on('app-update-info', () => {
if (this.#latestUpdateData) {
client.emit('app-update-info', this.#latestUpdateData)
}
})
}

/**
* Perform the update request
*/
Expand All @@ -78,10 +71,32 @@ export class UIUpdate {
this.#logger.debug(`fresh update data received ${JSON.stringify(body)}`)
this.#latestUpdateData = body as AppUpdateInfo

this.#ioController.emitToAll('app-update-info', this.#latestUpdateData)
this.#updateEvents.emit('info', this.#latestUpdateData)
})
.catch((e) => {
this.#logger.verbose('update server said something unexpected!', e)
})
}

createTrpcRouter() {
const self = this
return router({
version: publicProcedure.query(() => {
return {
appVersion: this.#appInfo.appVersion,
appBuild: this.#appInfo.appBuild,
}
}),

updateInfo: publicProcedure.subscription(async function* (opts) {
const changes = toIterable(self.#updateEvents, 'info', opts.signal)

if (self.#latestUpdateData) yield self.#latestUpdateData

for await (const [data] of changes) {
yield data
}
}),
})
}
}
1 change: 1 addition & 0 deletions companion/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"@loupedeck/node": "^1.2.0",
"@napi-rs/canvas": "^0.1.66",
"@sentry/node": "^8.54.0",
"@trpc/server": "^11.0.0-rc.730",
"archiver": "^7.0.1",
"better-sqlite3": "^11.8.1",
"bufferutil": "^4.0.9",
Expand Down
Loading
Loading