Skip to content

Commit

Permalink
feat: command batching SOFIE-2549 (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
Julusian authored Dec 11, 2023
1 parent 5459211 commit 4986fad
Show file tree
Hide file tree
Showing 12 changed files with 423 additions and 211 deletions.
34 changes: 14 additions & 20 deletions src/__tests__/atem.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { Atem, DEFAULT_PORT } from '../atem'
import { Atem, DEFAULT_MAX_PACKET_SIZE, DEFAULT_PORT } from '../atem'
import { CutCommand } from '../commands'
import { promisify } from 'util'
import { EventEmitter } from 'events'
Expand Down Expand Up @@ -35,13 +35,14 @@ describe('Atem', () => {
disableMultithreaded: true,
log: (conn as any)._log,
port: DEFAULT_PORT,
maxPacketSize: DEFAULT_MAX_PACKET_SIZE,
})
} finally {
await conn.destroy()
}
})
test('constructor test 2', async () => {
const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23 })
const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23, maxPacketSize: 500 })

try {
const socket = (conn as any).socket as AtemSocket
Expand All @@ -55,6 +56,7 @@ describe('Atem', () => {
disableMultithreaded: false,
log: (conn as any)._log,
port: 23,
maxPacketSize: 500,
})
} finally {
await conn.destroy()
Expand Down Expand Up @@ -108,32 +110,28 @@ describe('Atem', () => {
expect(socket).toBeTruthy()

let nextId = 123
Object.defineProperty(socket, 'nextCommandTrackingId', {
Object.defineProperty(socket, 'nextPacketTrackingId', {
get: jest.fn(() => nextId++),
set: jest.fn(),
})
expect(socket.nextCommandTrackingId).toEqual(123)
expect(socket.nextPacketTrackingId).toEqual(123)

socket.sendCommands = jest.fn(() => Promise.resolve(35) as any)
socket.sendCommands = jest.fn(() => Promise.resolve([124]) as any)

const sentQueue = (conn as any)._sentQueue as Record<string, unknown>
expect(Object.keys(sentQueue)).toHaveLength(0)

const cmd = new CutCommand(0)
const res = conn.sendCommand(cmd)
res.catch(() => null) // Dismiss UnhandledPromiseRejection
await setImmediatePromise()
expect(Object.keys(sentQueue)).toHaveLength(1)

expect(socket.sendCommands).toHaveBeenCalledTimes(1)
expect(socket.sendCommands).toHaveBeenCalledWith([
{
rawCommand: cmd,
trackingId: 124,
},
])
expect(socket.sendCommands).toHaveBeenCalledWith([cmd])

// Trigger the ack, and it should switfy resolve
socket.emit('commandsAck', [124])
socket.emit('ackPackets', [124])
expect(Object.keys(sentQueue)).toHaveLength(0)

// Finally, it should now resolve without a timeout
Expand All @@ -152,11 +150,11 @@ describe('Atem', () => {
expect(socket).toBeTruthy()

let nextId = 123
Object.defineProperty(socket, 'nextCommandTrackingId', {
Object.defineProperty(socket, 'nextPacketTrackingId', {
get: jest.fn(() => nextId++),
set: jest.fn(),
})
expect(socket.nextCommandTrackingId).toEqual(123)
expect(socket.nextPacketTrackingId).toEqual(123)

socket.sendCommands = jest.fn(() => Promise.reject(35) as any)

Expand All @@ -165,15 +163,11 @@ describe('Atem', () => {

const cmd = new CutCommand(0)
const res = conn.sendCommand(cmd)
res.catch(() => null) // Dismiss UnhandledPromiseRejection

// Send command should be called
expect(socket.sendCommands).toHaveBeenCalledTimes(1)
expect(socket.sendCommands).toHaveBeenCalledWith([
{
rawCommand: cmd,
trackingId: 124,
},
])
expect(socket.sendCommands).toHaveBeenCalledWith([cmd])

expect(Object.keys(sentQueue)).toHaveLength(0)

Expand Down
10 changes: 5 additions & 5 deletions src/__tests__/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ class AtemSocketChildMock implements AtemSocketChild {
public onDisconnect: () => Promise<void>
public onLog: (message: string) => Promise<void>
public onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>
public onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
public onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>

constructor(
onDisconnect: () => Promise<void>,
onLog: (message: string) => Promise<void>,
onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>,
onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
) {
this.onDisconnect = onDisconnect
this.onLog = onLog
this.onCommandsReceived = onCommandsReceived
this.onCommandsAcknowledged = onCommandsAcknowledged
this.onPacketsAcknowledged = onPacketsAcknowledged
}

public connect = jest.fn(async () => Promise.resolve())
Expand All @@ -43,9 +43,9 @@ class AtemSocketChildMock implements AtemSocketChild {
onDisconnect: () => Promise<void>,
onLog: (message: string) => Promise<void>,
onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>,
onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
) => {
return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onCommandsAcknowledged)
return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onPacketsAcknowledged)
}
)

Expand Down
60 changes: 32 additions & 28 deletions src/atem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export interface AtemOptions {
debugBuffers?: boolean
disableMultithreaded?: boolean
childProcessTimeout?: number
/**
* Maximum size of packets to transmit
*/
maxPacketSize?: number
}

export type AtemEvents = {
Expand All @@ -73,8 +77,7 @@ export type AtemEvents = {
updatedTime: [TimeInfo]
}

interface SentCommand {
command: ISerializableCommand
interface SentPackets {
resolve: () => void
reject: () => void
}
Expand All @@ -86,12 +89,13 @@ export enum AtemConnectionStatus {
}

export const DEFAULT_PORT = 9910
export const DEFAULT_MAX_PACKET_SIZE = 1416 // Matching ATEM software

export class BasicAtem extends EventEmitter<AtemEvents> {
private readonly socket: AtemSocket
protected readonly dataTransferManager: DT.DataTransferManager
private _state: AtemState | undefined
private _sentQueue: { [packetId: string]: SentCommand } = {}
private _sentQueue: { [packetId: string]: SentPackets } = {}
private _status: AtemConnectionStatus

constructor(options?: AtemOptions) {
Expand All @@ -100,19 +104,20 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
this._state = AtemStateUtil.Create()
this._status = AtemConnectionStatus.CLOSED
this.socket = new AtemSocket({
debugBuffers: (options || {}).debugBuffers || false,
address: (options || {}).address || '',
port: (options || {}).port || DEFAULT_PORT,
disableMultithreaded: (options || {}).disableMultithreaded || false,
childProcessTimeout: (options || {}).childProcessTimeout || 600,
debugBuffers: options?.debugBuffers ?? false,
address: options?.address || '',
port: options?.port || DEFAULT_PORT,
disableMultithreaded: options?.disableMultithreaded ?? false,
childProcessTimeout: options?.childProcessTimeout || 600,
maxPacketSize: options?.maxPacketSize ?? DEFAULT_MAX_PACKET_SIZE,
})
this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this))

this.socket.on('commandsReceived', (commands) => {
this.socket.on('receivedCommands', (commands) => {
this.emit('receivedCommands', commands)
this._mutateState(commands)
})
this.socket.on('commandsAck', (trackingIds) => this._resolveCommands(trackingIds))
this.socket.on('ackPackets', (trackingIds) => this._resolveCommands(trackingIds))
this.socket.on('info', (msg) => this.emit('info', msg))
this.socket.on('debug', (msg) => this.emit('debug', msg))
this.socket.on('error', (e) => this.emit('error', e))
Expand Down Expand Up @@ -151,28 +156,27 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
return this.socket.destroy()
}

private sendCommands(commands: ISerializableCommand[]): Array<Promise<void>> {
const commands2 = commands.map((cmd) => ({
rawCommand: cmd,
trackingId: this.socket.nextCommandTrackingId,
}))
public async sendCommands(commands: ISerializableCommand[]): Promise<void> {
const trackingIds = await this.socket.sendCommands(commands)

const sendPromise = this.socket.sendCommands(commands2)
const promises: Promise<void>[] = []

return commands2.map(async (cmd) => {
await sendPromise
return new Promise<void>((resolve, reject) => {
this._sentQueue[cmd.trackingId] = {
command: cmd.rawCommand,
resolve,
reject,
}
})
})
for (const trackingId of trackingIds) {
promises.push(
new Promise<void>((resolve, reject) => {
this._sentQueue[trackingId] = {
resolve,
reject,
}
})
)
}

await Promise.allSettled(promises)
}

public async sendCommand(command: ISerializableCommand): Promise<void> {
return this.sendCommands([command])[0]
return await this.sendCommands([command])
}

private _mutateState(commands: IDeserializedCommand[]): void {
Expand Down Expand Up @@ -262,7 +266,7 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
const sentQueue = this._sentQueue
this._sentQueue = {}

Object.values<SentCommand>(sentQueue).forEach((sent) => sent.reject())
Object.values<SentPackets>(sentQueue).forEach((sent) => sent.reject())
}
}

Expand Down
12 changes: 5 additions & 7 deletions src/dataTransfer/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ function mangleCommand(cmd: any, dir: string): any {
}

function runDataTransferTest(spec: any): DataTransferManager {
const manager = new DataTransferManager((cmds) =>
cmds.map(async (cmd) => {
const manager = new DataTransferManager(async (cmds) => {
for (const rawCmd of cmds) {
const expectedCmd = spec.shift()
const gotCmd = mangleCommand(cmd, 'send')
const gotCmd = mangleCommand(rawCmd, 'send')
expect(gotCmd).toEqual(expectedCmd)

while (spec.length > 0) {
Expand All @@ -49,10 +49,8 @@ function runDataTransferTest(spec: any): DataTransferManager {
if (!nextCmd2) throw new Error(`Failed specToCommandClass ${nextCmd.name}`)
manager.queueHandleCommand(nextCmd2)
}

return Promise.resolve()
})
)
}
})
manager.startCommandSending(true)
return manager
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"properties": {
"name": "Label",
"description": "",
"fileHash": "T�9z�;{\u0012r����-��",
"fileHash": "VKA5etY7exJy9ayH+i3d4Q==",
"transferId": 0
},
"direction": "send"
Expand Down
8 changes: 4 additions & 4 deletions src/dataTransfer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class DataTransferManager {
}

readonly #sendLockCommand = (/*lock: DataTransferLockingQueue,*/ cmd: ISerializableCommand): void => {
Promise.all(this.#rawSendCommands([cmd])).catch((e) => {
this.#rawSendCommands([cmd]).catch((e) => {
debug(`Failed to send lock command: ${e}`)
console.log('Failed to send lock command')
})
Expand All @@ -41,12 +41,12 @@ export class DataTransferManager {
readonly #labelsLock = new DataTransferSimpleQueue(this.#nextTransferId)
readonly #macroLock = new DataTransferSimpleQueue(this.#nextTransferId)

readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Array<Promise<void>>
readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>

private interval?: NodeJS.Timer
private exitUnsubscribe?: () => void

constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Array<Promise<void>>) {
constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>) {
this.#rawSendCommands = rawSendCommands
}

Expand Down Expand Up @@ -75,7 +75,7 @@ export class DataTransferManager {
const commandsToSend = lock.popQueuedCommands(MAX_PACKETS_TO_SEND_PER_TICK) // Take some, it is unlikely that multiple will run at once
if (commandsToSend && commandsToSend.length > 0) {
// debug(`Sending ${commandsToSend.length} commands `)
Promise.all(this.#rawSendCommands(commandsToSend)).catch((e) => {
this.#rawSendCommands(commandsToSend).catch((e) => {
// Failed to send/queue something, so abort it
lock.tryAbortTransfer(new Error(`Command send failed: ${e}`))
})
Expand Down
Loading

0 comments on commit 4986fad

Please sign in to comment.