From d743bc739c6411982ad994bdacc1fc956711b128 Mon Sep 17 00:00:00 2001
From: Josiah Bryan
Date: Mon, 3 Mar 2025 11:42:51 -0600
Subject: [PATCH 1/3] feat: Added plugins/assemblyai

---
 plugins/assemblyai/CHANGELOG.md       |   7 +
 plugins/assemblyai/README.md          |  16 +++
 plugins/assemblyai/api-extractor.json |  20 +++
 plugins/assemblyai/package.json       |  48 +++++++
 plugins/assemblyai/src/index.ts       |   5 +
 plugins/assemblyai/src/stt.test.ts    |  13 ++
 plugins/assemblyai/src/stt.ts         | 195 ++++++++++++++++++++++++++
 plugins/assemblyai/tsconfig.json      |  16 +++
 plugins/assemblyai/tsup.config.ts     |   7 +
 9 files changed, 327 insertions(+)
 create mode 100644 plugins/assemblyai/CHANGELOG.md
 create mode 100644 plugins/assemblyai/README.md
 create mode 100644 plugins/assemblyai/api-extractor.json
 create mode 100644 plugins/assemblyai/package.json
 create mode 100644 plugins/assemblyai/src/index.ts
 create mode 100644 plugins/assemblyai/src/stt.test.ts
 create mode 100644 plugins/assemblyai/src/stt.ts
 create mode 100644 plugins/assemblyai/tsconfig.json
 create mode 100644 plugins/assemblyai/tsup.config.ts

diff --git a/plugins/assemblyai/CHANGELOG.md b/plugins/assemblyai/CHANGELOG.md
new file mode 100644
index 00000000..4f250fa8
--- /dev/null
+++ b/plugins/assemblyai/CHANGELOG.md
@@ -0,0 +1,7 @@
+# @livekit/agents-plugin-assemblyai
+
+## 0.7.0
+
+### Minor Changes
+
+- add AssemblyAI speech-to-text plugin - ([@josiahbryan](https://github.com/josiahbryan))
\ No newline at end of file
diff --git a/plugins/assemblyai/README.md b/plugins/assemblyai/README.md
new file mode 100644
index 00000000..43696243
--- /dev/null
+++ b/plugins/assemblyai/README.md
@@ -0,0 +1,16 @@
+
+# AssemblyAI plugin for LiveKit Agents
+
+The Agents Framework is designed for building realtime, programmable
+participants that run on servers. Use it to create conversational, multi-modal
+voice agents that can see, hear, and understand.
+
+This package contains the AssemblyAI plugin, which allows for speech recognition.
+Refer to the [documentation](https://docs.livekit.io/agents/overview/) for
+information on how to use it.
+See the [repository](https://github.com/livekit/agents-js) for more information
+about the framework as a whole.
diff --git a/plugins/assemblyai/api-extractor.json b/plugins/assemblyai/api-extractor.json
new file mode 100644
index 00000000..1f75e070
--- /dev/null
+++ b/plugins/assemblyai/api-extractor.json
@@ -0,0 +1,20 @@
+/**
+ * Config file for API Extractor. For more info, please visit: https://api-extractor.com
+ */
+{
+  "$schema": "https://developer.microsoft.com/json-schemas/api-extractor/v7/api-extractor.schema.json",
+
+  /**
+   * Optionally specifies another JSON config file that this file extends from. This provides a way for
+   * standard settings to be shared across multiple projects.
+   *
+   * If the path starts with "./" or "../", the path is resolved relative to the folder of the file that contains
+   * the "extends" field. Otherwise, the first path segment is interpreted as an NPM package name, and will be
+   * resolved using NodeJS require().
+   *
+   * SUPPORTED TOKENS: none
+   * DEFAULT VALUE: ""
+   */
+  "extends": "../../api-extractor-shared.json",
+  "mainEntryPointFilePath": "./dist/index.d.ts"
+}
diff --git a/plugins/assemblyai/package.json b/plugins/assemblyai/package.json
new file mode 100644
index 00000000..9e19bf29
--- /dev/null
+++ b/plugins/assemblyai/package.json
@@ -0,0 +1,48 @@
+{
+  "name": "@livekit/agents-plugin-assemblyai",
+  "version": "0.5.4",
+  "description": "AssemblyAI plugin for LiveKit Agents for Node.js",
+  "main": "dist/index.js",
+  "require": "dist/index.cjs",
+  "types": "dist/index.d.ts",
+  "exports": {
+    ".": {
+      "types": "./dist/index.d.ts",
+      "import": "./dist/index.js",
+      "require": "./dist/index.cjs"
+    }
+  },
+  "author": "LiveKit",
+  "type": "module",
+  "repository": "git@github.com:livekit/agents-js.git",
+  "license": "Apache-2.0",
+  "files": [
+    "dist",
+    "src",
+    "README.md"
+  ],
+  "scripts": {
+    "build": "tsup --onSuccess \"tsc --declaration --emitDeclarationOnly\"",
+    "clean": "rm -rf dist",
+    "clean:build": "pnpm clean && pnpm build",
+    "lint": "eslint -f unix \"src/**/*.{ts,js}\"",
+    "api:check": "api-extractor run --typescript-compiler-folder ../../node_modules/typescript",
+    "api:update": "api-extractor run --local --typescript-compiler-folder ../../node_modules/typescript --verbose"
+  },
+  "devDependencies": {
+    "@livekit/agents": "workspace:^x",
+    "@livekit/agents-plugin-silero": "workspace:^x",
+    "@livekit/agents-plugins-test": "workspace:^x",
+    "@livekit/rtc-node": "^0.13.4",
+    "@microsoft/api-extractor": "^7.35.0",
+    "tsup": "^8.3.5",
+    "typescript": "^5.0.0"
+  },
+  "dependencies": {
+    "assemblyai": "^4.9.0"
+  },
+  "peerDependencies": {
+    "@livekit/agents": "workspace:^x",
+    "@livekit/rtc-node": "^0.13.4"
+  }
+}
diff --git a/plugins/assemblyai/src/index.ts b/plugins/assemblyai/src/index.ts
new file mode 100644
index 00000000..a649b5ef
--- /dev/null
+++ b/plugins/assemblyai/src/index.ts
@@ -0,0 +1,5 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+
+export * from './stt.js';
diff --git a/plugins/assemblyai/src/stt.test.ts b/plugins/assemblyai/src/stt.test.ts
new file mode 100644
index 00000000..897c877b
--- /dev/null
+++ b/plugins/assemblyai/src/stt.test.ts
@@ -0,0 +1,13 @@
+// SPDX-FileCopyrightText: 2024 LiveKit, Inc.
+//
+// SPDX-License-Identifier: Apache-2.0
+import { initializeLogger } from '@livekit/agents';
+import { VAD } from '@livekit/agents-plugin-silero';
+import { stt } from '@livekit/agents-plugins-test';
+import { describe } from 'vitest';
+import { STT } from './stt.js';
+
+describe('AssemblyAI', async () => {
+  initializeLogger({ pretty: false });
+  await stt(new STT(), await VAD.load(), { nonStreaming: false });
+});
diff --git a/plugins/assemblyai/src/stt.ts b/plugins/assemblyai/src/stt.ts
new file mode 100644
index 00000000..a9702ecc
--- /dev/null
+++ b/plugins/assemblyai/src/stt.ts
@@ -0,0 +1,195 @@
+// SPDX-FileCopyrightText: 2024 Josiah Bryan, LLC
+//
+// SPDX-License-Identifier: Apache-2.0
+import { type AudioBuffer, AudioByteStream, AudioEnergyFilter, log, stt } from '@livekit/agents';
+import type { AudioFrame } from '@livekit/rtc-node';
+import { AssemblyAI } from 'assemblyai';
+import type { RealtimeTranscriber } from 'assemblyai';
+
+export interface STTOptions {
+    apiKey?: string;
+    interimResults: boolean;
+    sampleRate: number;
+    keywords: [string, number][];
+    endUtteranceSilenceThreshold?: number;
+}
+
+const defaultSTTOptions: STTOptions = {
+    apiKey: process.env.ASSEMBLY_AI_KEY,
+    interimResults: true,
+    sampleRate: 16000,
+    keywords: [],
+    // NOTE:
+    // The default is 700ms from AssemblyAI.
+    // I use a low default of 200ms here because I also use
+    // the new end-of-utterance model from LiveKit to handle
+    // turn detection in my agent. Which means that even though
+    // this will quickly return a final transcript EVEN THOUGH
+    // USER IS NOT DONE SPEAKING, the EOU model from LiveKit
+    // DOES properly differentiate and doesn't interrupt (magically!)
+    // Ref: https://blog.livekit.io/using-a-transformer-to-improve-end-of-turn-detection/
+    endUtteranceSilenceThreshold: 200,
+};
+
+export class STT extends stt.STT {
+    #opts: STTOptions;
+    #logger = log();
+    label = 'assemblyai.STT';
+
+    constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
+        super({
+            streaming: true,
+            interimResults: opts.interimResults ?? defaultSTTOptions.interimResults,
+        });
+        if (opts.apiKey === undefined && defaultSTTOptions.apiKey === undefined) {
+            throw new Error(
+                'AssemblyAI API key is required, whether as an argument or as $ASSEMBLY_AI_KEY',
+            );
+        }
+
+        this.#opts = { ...defaultSTTOptions, ...opts };
+    }
+
+    // eslint-disable-next-line @typescript-eslint/no-unused-vars
+    async _recognize(_: AudioBuffer): Promise<stt.SpeechEvent> {
+        throw new Error('Recognize is not supported on AssemblyAI STT');
+    }
+
+    stream(): stt.SpeechStream {
+        return new SpeechStream(this, this.#opts);
+    }
+}
+
+export class SpeechStream extends stt.SpeechStream {
+    #opts: STTOptions;
+    #audioEnergyFilter: AudioEnergyFilter;
+    #logger = log();
+    #speaking = false;
+    #client: AssemblyAI;
+    #transcriber?: RealtimeTranscriber;
+    label = 'assemblyai.SpeechStream';
+
+    constructor(stt: STT, opts: STTOptions) {
+        super(stt);
+        this.#opts = opts;
+        this.closed = false;
+        this.#audioEnergyFilter = new AudioEnergyFilter();
+        this.#client = new AssemblyAI({
+            apiKey: this.#opts.apiKey || '',
+        });
+
+        this.#run();
+    }
+
+    async #run() {
+        try {
+            // Create the realtime transcriber with parameters that AssemblyAI supports
+            this.#transcriber = this.#client.realtime.transcriber({
+                sampleRate: this.#opts.sampleRate,
+                wordBoost: this.#opts.keywords.map((k) => k[0]),
+                endUtteranceSilenceThreshold: this.#opts.endUtteranceSilenceThreshold,
+            });
+
+            // Set up event handlers
+            this.#transcriber.on('open', (data) => {
+                this.#logger.debug(
+                    `AssemblyAI session opened: ${data.sessionId}, expires at: ${data.expiresAt}`,
+                );
+            });
+
+            this.#transcriber.on('close', (code, reason) => {
+                this.#logger.debug(`AssemblyAI session closed: ${code}, reason: ${reason}`);
+                if (!this.closed) {
+                    // Try to reconnect if not intentionally closed
+                    this.#run();
+                }
+            });
+
+            this.#transcriber.on('error', (error) => {
+                this.#logger.error(`AssemblyAI error: ${error.message}`);
+            });
+
+            this.#transcriber.on('transcript', (transcript) => {
+                if (this.closed) return;
+
+                if (!transcript.text || transcript.text.trim() === '') {
+                    return;
+                }
+
+                // If we haven't started speaking yet, emit a start of speech event
+                if (!this.#speaking) {
+                    this.#speaking = true;
+                    this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+                }
+
+                // Handle partial and final transcripts
+                if (transcript.message_type === 'PartialTranscript') {
+                    this.queue.put({
+                        type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
+                        alternatives: [assemblyTranscriptToSpeechData(transcript)],
+                    });
+                } else if (transcript.message_type === 'FinalTranscript') {
+                    this.queue.put({
+                        type: stt.SpeechEventType.FINAL_TRANSCRIPT,
+                        alternatives: [assemblyTranscriptToSpeechData(transcript)],
+                    });
+                }
+            });
+
+            // Connect to the AssemblyAI service
+            await this.#transcriber.connect();
+
+            // Process audio data from the input stream
+            const sendTask = async () => {
+                const samples100Ms = Math.floor(this.#opts.sampleRate / 10);
+                const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples100Ms);
+
+                for await (const data of this.input) {
+                    if (this.closed) break;
+
+                    let frames: AudioFrame[];
+                    if (data === SpeechStream.FLUSH_SENTINEL) {
+                        frames = stream.flush();
+                    } else if (data.sampleRate === this.#opts.sampleRate) {
+                        frames = stream.write(data.data.buffer);
+                    } else {
+                        throw new Error(`Sample rate or channel count of frame does not match`);
+                    }
+
+                    for await (const frame of frames) {
+                        if (this.#audioEnergyFilter.pushFrame(frame)) {
+                            // Send audio data to AssemblyAI
+                            this.#transcriber?.sendAudio(new Uint8Array(frame.data.buffer));
+                        }
+                    }
+                }
+
+                // Close the connection when done
+                if (this.#transcriber) {
+                    await this.#transcriber.close();
+                }
+            };
+
+            // Start processing audio
+            await sendTask();
+        } catch (error) {
+            this.#logger.error(`Error in AssemblyAI STT: ${error}`);
+
+            // Try to reconnect after a delay if not intentionally closed
+            if (!this.closed) {
+                setTimeout(() => this.#run(), 5000);
+            }
+        }
+    }
+}
+
+// Helper function to convert AssemblyAI transcript to SpeechData
+const assemblyTranscriptToSpeechData = (transcript: any): stt.SpeechData => {
+    return {
+        language: 'en-US',
+        startTime: transcript.audio_start || 0,
+        endTime: transcript.audio_end || 0,
+        confidence: transcript.confidence || 1.0,
+        text: transcript.text || '',
+    };
+};
diff --git a/plugins/assemblyai/tsconfig.json b/plugins/assemblyai/tsconfig.json
new file mode 100644
index 00000000..0ec37836
--- /dev/null
+++ b/plugins/assemblyai/tsconfig.json
@@ -0,0 +1,16 @@
+{
+  "extends": "../../tsconfig.json",
+  "include": ["./src"],
+  "compilerOptions": {
+    // match output dir to input dir. e.g. dist/index instead of dist/src/index
+    "rootDir": "./src",
+    "declarationDir": "./dist",
+    "outDir": "./dist"
+  },
+  "typedocOptions": {
+    "name": "plugins/agents-plugin-assemblyai",
+    "entryPointStrategy": "resolve",
+    "readme": "none",
+    "entryPoints": ["src/index.ts"]
+  }
+}
diff --git a/plugins/assemblyai/tsup.config.ts b/plugins/assemblyai/tsup.config.ts
new file mode 100644
index 00000000..8ca20961
--- /dev/null
+++ b/plugins/assemblyai/tsup.config.ts
@@ -0,0 +1,7 @@
+import { defineConfig } from 'tsup';
+
+import defaults from '../../tsup.config';
+
+export default defineConfig({
+  ...defaults,
+});

From d08895d2033d639c7f23705dbc50bdf55304475d Mon Sep 17 00:00:00 2001
From: Josiah Bryan
Date: Wed, 5 Mar 2025 15:42:29 -0600
Subject: [PATCH 2/3] chore: Reformat and fixes per PR review

---
 plugins/assemblyai/CHANGELOG.md |   7 -
 plugins/assemblyai/package.json |   2 +-
 plugins/assemblyai/src/stt.ts   | 346 ++++++++++++++++++++++----------
 pnpm-lock.yaml                  |  63 +++++-
 turbo.json                      |   1 +
 5 files changed, 234 insertions(+), 185 deletions(-)
 delete mode 100644 plugins/assemblyai/CHANGELOG.md

diff --git a/plugins/assemblyai/CHANGELOG.md b/plugins/assemblyai/CHANGELOG.md
deleted file mode 100644
index 4f250fa8..00000000
--- a/plugins/assemblyai/CHANGELOG.md
+++ /dev/null
@@ -1,7 +0,0 @@
-# @livekit/agents-plugin-assemblyai
-
-## 0.7.0
-
-### Minor Changes
-
-- add AssemblyAI speech-to-text plugin - ([@josiahbryan](https://github.com/josiahbryan))
\ No newline at end of file
diff --git a/plugins/assemblyai/package.json b/plugins/assemblyai/package.json
index 9e19bf29..804cc4ae 100644
--- a/plugins/assemblyai/package.json
+++ b/plugins/assemblyai/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@livekit/agents-plugin-assemblyai",
-  "version": "0.5.4",
+  "version": "0.0.0",
   "description": "AssemblyAI plugin for LiveKit Agents for Node.js",
   "main": "dist/index.js",
   "require": "dist/index.cjs",
diff --git a/plugins/assemblyai/src/stt.ts b/plugins/assemblyai/src/stt.ts
index a9702ecc..c5608cda 100644
--- a/plugins/assemblyai/src/stt.ts
+++ b/plugins/assemblyai/src/stt.ts
@@ -1,3 +1,4 @@
+/* eslint-disable @typescript-eslint/no-explicit-any */
 // SPDX-FileCopyrightText: 2024 Josiah Bryan, LLC
 //
 // SPDX-License-Identifier: Apache-2.0
@@ -7,189 +8,190 @@ import { AssemblyAI } from 'assemblyai';
 import type { RealtimeTranscriber } from 'assemblyai';
 
 export interface STTOptions {
-    apiKey?: string;
-    interimResults: boolean;
-    sampleRate: number;
-    keywords: [string, number][];
-    endUtteranceSilenceThreshold?: number;
+  apiKey?: string;
+  interimResults: boolean;
+  sampleRate: number;
+  keywords: [string, number][];
+  endUtteranceSilenceThreshold?: number;
 }
 
 const defaultSTTOptions: STTOptions = {
-    apiKey: process.env.ASSEMBLY_AI_KEY,
-    interimResults: true,
-    sampleRate: 16000,
-    keywords: [],
-    // NOTE:
-    // The default is 700ms from AssemblyAI.
-    // I use a low default of 200ms here because I also use
-    // the new end-of-utterance model from LiveKit to handle
-    // turn detection in my agent. Which means that even though
-    // this will quickly return a final transcript EVEN THOUGH
-    // USER IS NOT DONE SPEAKING, the EOU model from LiveKit
-    // DOES properly differentiate and doesn't interrupt (magically!)
-    // Ref: https://blog.livekit.io/using-a-transformer-to-improve-end-of-turn-detection/
-    endUtteranceSilenceThreshold: 200,
+  apiKey: process.env.ASSEMBLY_AI_KEY,
+  interimResults: true,
+  sampleRate: 16000,
+  keywords: [],
+  // NOTE:
+  // The default is 700ms from AssemblyAI.
+  // I use a low default of 200ms here because I also use
+  // the new end-of-utterance model from LiveKit to handle
+  // turn detection in my agent. Which means that even though
+  // this will quickly return a final transcript EVEN THOUGH
+  // USER IS NOT DONE SPEAKING, the EOU model from LiveKit
+  // DOES properly differentiate and doesn't interrupt (magically!)
+  // Ref: https://blog.livekit.io/using-a-transformer-to-improve-end-of-turn-detection/
+  endUtteranceSilenceThreshold: 200,
 };
 
 export class STT extends stt.STT {
-    #opts: STTOptions;
-    #logger = log();
-    label = 'assemblyai.STT';
-
-    constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
-        super({
-            streaming: true,
-            interimResults: opts.interimResults ?? defaultSTTOptions.interimResults,
-        });
-        if (opts.apiKey === undefined && defaultSTTOptions.apiKey === undefined) {
-            throw new Error(
-                'AssemblyAI API key is required, whether as an argument or as $ASSEMBLY_AI_KEY',
-            );
-        }
-
-        this.#opts = { ...defaultSTTOptions, ...opts };
-    }
-
-    // eslint-disable-next-line @typescript-eslint/no-unused-vars
-    async _recognize(_: AudioBuffer): Promise<stt.SpeechEvent> {
-        throw new Error('Recognize is not supported on AssemblyAI STT');
-    }
-
-    stream(): stt.SpeechStream {
-        return new SpeechStream(this, this.#opts);
-    }
+  #opts: STTOptions;
+  #logger = log();
+  label = 'assemblyai.STT';
+
+  constructor(opts: Partial<STTOptions> = defaultSTTOptions) {
+    super({
+      streaming: true,
+      interimResults: opts.interimResults ?? defaultSTTOptions.interimResults,
+    });
+    if (opts.apiKey === undefined && defaultSTTOptions.apiKey === undefined) {
+      throw new Error(
+        'AssemblyAI API key is required, whether as an argument or as $ASSEMBLY_AI_KEY',
+      );
+    }
+
+    this.#opts = { ...defaultSTTOptions, ...opts };
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-unused-vars
+  async _recognize(_: AudioBuffer): Promise<stt.SpeechEvent> {
+    throw new Error('Recognize is not supported on AssemblyAI STT');
+  }
+
+  stream(): stt.SpeechStream {
+    return new SpeechStream(this, this.#opts);
+  }
 }
 
 export class SpeechStream extends stt.SpeechStream {
-    #opts: STTOptions;
-    #audioEnergyFilter: AudioEnergyFilter;
-    #logger = log();
-    #speaking = false;
-    #client: AssemblyAI;
-    #transcriber?: RealtimeTranscriber;
-    label = 'assemblyai.SpeechStream';
-
-    constructor(stt: STT, opts: STTOptions) {
-        super(stt);
-        this.#opts = opts;
-        this.closed = false;
-        this.#audioEnergyFilter = new AudioEnergyFilter();
-        this.#client = new AssemblyAI({
-            apiKey: this.#opts.apiKey || '',
-        });
-
-        this.#run();
-    }
-
-    async #run() {
-        try {
-            // Create the realtime transcriber with parameters that AssemblyAI supports
-            this.#transcriber = this.#client.realtime.transcriber({
-                sampleRate: this.#opts.sampleRate,
-                wordBoost: this.#opts.keywords.map((k) => k[0]),
-                endUtteranceSilenceThreshold: this.#opts.endUtteranceSilenceThreshold,
-            });
-
-            // Set up event handlers
-            this.#transcriber.on('open', (data) => {
-                this.#logger.debug(
-                    `AssemblyAI session opened: ${data.sessionId}, expires at: ${data.expiresAt}`,
-                );
-            });
-
-            this.#transcriber.on('close', (code, reason) => {
-                this.#logger.debug(`AssemblyAI session closed: ${code}, reason: ${reason}`);
-                if (!this.closed) {
-                    // Try to reconnect if not intentionally closed
-                    this.#run();
-                }
-            });
-
-            this.#transcriber.on('error', (error) => {
-                this.#logger.error(`AssemblyAI error: ${error.message}`);
-            });
-
-            this.#transcriber.on('transcript', (transcript) => {
-                if (this.closed) return;
-
-                if (!transcript.text || transcript.text.trim() === '') {
-                    return;
-                }
-
-                // If we haven't started speaking yet, emit a start of speech event
-                if (!this.#speaking) {
-                    this.#speaking = true;
-                    this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
-                }
-
-                // Handle partial and final transcripts
-                if (transcript.message_type === 'PartialTranscript') {
-                    this.queue.put({
-                        type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
-                        alternatives: [assemblyTranscriptToSpeechData(transcript)],
-                    });
-                } else if (transcript.message_type === 'FinalTranscript') {
-                    this.queue.put({
-                        type: stt.SpeechEventType.FINAL_TRANSCRIPT,
-                        alternatives: [assemblyTranscriptToSpeechData(transcript)],
-                    });
-                }
-            });
-
-            // Connect to the AssemblyAI service
-            await this.#transcriber.connect();
-
-            // Process audio data from the input stream
-            const sendTask = async () => {
-                const samples100Ms = Math.floor(this.#opts.sampleRate / 10);
-                const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples100Ms);
-
-                for await (const data of this.input) {
-                    if (this.closed) break;
-
-                    let frames: AudioFrame[];
-                    if (data === SpeechStream.FLUSH_SENTINEL) {
-                        frames = stream.flush();
-                    } else if (data.sampleRate === this.#opts.sampleRate) {
-                        frames = stream.write(data.data.buffer);
-                    } else {
-                        throw new Error(`Sample rate or channel count of frame does not match`);
-                    }
-
-                    for await (const frame of frames) {
-                        if (this.#audioEnergyFilter.pushFrame(frame)) {
-                            // Send audio data to AssemblyAI
-                            this.#transcriber?.sendAudio(new Uint8Array(frame.data.buffer));
-                        }
-                    }
-                }
-
-                // Close the connection when done
-                if (this.#transcriber) {
-                    await this.#transcriber.close();
-                }
-            };
-
-            // Start processing audio
-            await sendTask();
-        } catch (error) {
-            this.#logger.error(`Error in AssemblyAI STT: ${error}`);
-
-            // Try to reconnect after a delay if not intentionally closed
-            if (!this.closed) {
-                setTimeout(() => this.#run(), 5000);
-            }
-        }
-    }
+  #opts: STTOptions;
+  #audioEnergyFilter: AudioEnergyFilter;
+  #logger = log();
+  #speaking = false;
+  #client: AssemblyAI;
+  #transcriber?: RealtimeTranscriber;
+  label = 'assemblyai.SpeechStream';
+
+  constructor(stt: STT, opts: STTOptions) {
+    super(stt);
+    this.#opts = opts;
+    this.closed = false;
+    this.#audioEnergyFilter = new AudioEnergyFilter();
+    this.#client = new AssemblyAI({
+      // Defaults to the apiKey in defaultSTTOptions, which pulls in process.env.ASSEMBLY_AI_KEY,
+      apiKey: this.#opts.apiKey || '',
+    });
+
+    this.#run();
+  }
+
+  async #run() {
+    try {
+      // Create the realtime transcriber with parameters that AssemblyAI supports
+      this.#transcriber = this.#client.realtime.transcriber({
+        sampleRate: this.#opts.sampleRate,
+        wordBoost: this.#opts.keywords.map((k) => k[0]),
+        endUtteranceSilenceThreshold: this.#opts.endUtteranceSilenceThreshold,
+      });
+
+      // Set up event handlers
+      this.#transcriber.on('open', (data) => {
+        this.#logger
+          .child({ sessionId: data.sessionId, expiresAt: data.expiresAt })
+          .debug(`AssemblyAI session opened`);
+      });
+
+      this.#transcriber.on('close', (code, reason) => {
+        this.#logger.child({ code, reason }).debug(`AssemblyAI session closed`);
+        if (!this.closed) {
+          // Try to reconnect if not intentionally closed
+          this.#run();
+        }
+      });
+
+      this.#transcriber.on('error', (error) => {
+        this.#logger.child({ error: error.message }).error(`AssemblyAI error`);
+      });
+
+      this.#transcriber.on('transcript', (transcript) => {
+        if (this.closed) return;
+
+        if (!transcript.text || transcript.text.trim() === '') {
+          return;
+        }
+
+        // If we haven't started speaking yet, emit a start of speech event
+        if (!this.#speaking) {
+          this.#speaking = true;
+          this.queue.put({ type: stt.SpeechEventType.START_OF_SPEECH });
+        }
+
+        // Handle partial and final transcripts
+        if (transcript.message_type === 'PartialTranscript') {
+          this.queue.put({
+            type: stt.SpeechEventType.INTERIM_TRANSCRIPT,
+            alternatives: [assemblyTranscriptToSpeechData(transcript)],
+          });
+        } else if (transcript.message_type === 'FinalTranscript') {
+          this.queue.put({
+            type: stt.SpeechEventType.FINAL_TRANSCRIPT,
+            alternatives: [assemblyTranscriptToSpeechData(transcript)],
+          });
+        }
+      });
+
+      // Connect to the AssemblyAI service
+      await this.#transcriber.connect();
+
+      // Process audio data from the input stream
+      const sendTask = async () => {
+        const samples100Ms = Math.floor(this.#opts.sampleRate / 10);
+        const stream = new AudioByteStream(this.#opts.sampleRate, 1, samples100Ms);
+
+        for await (const data of this.input) {
+          if (this.closed) break;
+
+          let frames: AudioFrame[];
+          if (data === SpeechStream.FLUSH_SENTINEL) {
+            frames = stream.flush();
+          } else if (data.sampleRate === this.#opts.sampleRate) {
+            frames = stream.write(data.data.buffer);
+          } else {
+            throw new Error(`Sample rate or channel count of frame does not match`);
+          }
+
+          for await (const frame of frames) {
+            if (this.#audioEnergyFilter.pushFrame(frame)) {
+              // Send audio data to AssemblyAI
+              this.#transcriber?.sendAudio(new Uint8Array(frame.data.buffer));
+            }
+          }
+        }
+
+        // Close the connection when done
+        if (this.#transcriber) {
+          await this.#transcriber.close();
+        }
+      };
+
+      // Start processing audio
+      await sendTask();
+    } catch (error: any) {
+      this.#logger.child({ error: error.message }).error(`Error in AssemblyAI STT`);
+
+      // Try to reconnect after a delay if not intentionally closed
+      if (!this.closed) {
+        setTimeout(() => this.#run(), 5000);
+      }
+    }
+  }
 }
 
 // Helper function to convert AssemblyAI transcript to SpeechData
 const assemblyTranscriptToSpeechData = (transcript: any): stt.SpeechData => {
-    return {
-        language: 'en-US',
-        startTime: transcript.audio_start || 0,
-        endTime: transcript.audio_end || 0,
-        confidence: transcript.confidence || 1.0,
-        text: transcript.text || '',
-    };
+  return {
+    language: 'en-US',
+    startTime: transcript.audio_start || 0,
+    endTime: transcript.audio_end || 0,
+    confidence: transcript.confidence || 1.0,
+    text: transcript.text || '',
+  };
 };
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index b151df08..71b7b5d9 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -37,7 +37,7 @@ importers:
         version: 8.10.0(eslint@8.57.0)
       eslint-config-standard:
         specifier: ^17.1.0
-        version: 17.1.0(eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0))(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0)
+        version: 17.1.0(eslint-plugin-import@2.29.1)(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0)
       eslint-config-turbo:
         specifier: ^1.12.2
         version: 1.13.3(eslint@8.57.0)
@@ -164,6 +164,34 @@ importers:
         specifier: ^5.0.0
         version: 5.4.5
 
+  plugins/assemblyai:
+    dependencies:
+      assemblyai:
+        specifier: ^4.9.0
+        version: 4.9.0
+    devDependencies:
+      '@livekit/agents':
+        specifier: workspace:^x
+        version: link:../../agents
+      '@livekit/agents-plugin-silero':
+        specifier: workspace:^x
+        version: link:../silero
+      '@livekit/agents-plugins-test':
+        specifier: workspace:^x
+        version: link:../test
+      '@livekit/rtc-node':
+        specifier: ^0.13.4
+        version: 0.13.4
+      '@microsoft/api-extractor':
+        specifier: ^7.35.0
+        version: 7.43.7(@types/node@22.5.5)
+      tsup:
+        specifier: ^8.3.5
+        version: 8.3.5(@microsoft/api-extractor@7.43.7(@types/node@22.5.5))(postcss@8.4.38)(tsx@4.19.2)(typescript@5.4.5)
+      typescript:
+        specifier: ^5.0.0
+        version: 5.4.5
+
   plugins/cartesia:
     dependencies:
       ws:
@@ -1678,6 +1706,10 @@
     resolution: {integrity: sha512-3CYzex9M9FGQjCGMGyi6/31c8GJbgb0qGyrx5HWxPd0aCwh4cB2YjMb2Xf9UuoogrMrlO9cTqnB5rI5GHZTcUA==}
     engines: {node: '>=0.10.0'}
 
+  assemblyai@4.9.0:
+    resolution: {integrity: sha512-YUvkVIdMKvMLNQ07zWNma9YWvdSoGZy9RuJ/fB5uceAgDnTL2uo8sAlytkt52hD8DJ61xmZkfO9jkjXl3sZTiw==}
+    engines: {node: '>=18'}
+
   assertion-error@1.1.0:
     resolution: {integrity: sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==}
 
@@ -4039,6 +4071,18 @@
     utf-8-validate:
       optional: true
 
+  ws@8.18.1:
+    resolution: {integrity: sha512-RKW2aJZMXeMxVpnZ6bck+RswznaxmzdULiBr6KY7XkTnW8uvt0iT9H5DkHUChXrc+uurzwa0rVI16n/Xzjdz1w==}
+    engines: {node: '>=10.0.0'}
+    peerDependencies:
+      bufferutil: ^4.0.1
+      utf-8-validate: '>=5.0.2'
+    peerDependenciesMeta:
+      bufferutil:
+        optional: true
+      utf-8-validate:
+        optional: true
+
   y18n@4.0.3:
     resolution: {integrity: sha512-JKhqTOwSrqNA1NY5lSztJ1GrBiUodLMmIZuLiDaMRJ+itFd+ABVE8XBjOvIWL+rSqNDC74LCSFmlb/U4UZ4hJQ==}
 
@@ -5313,6 +5357,13 @@
   arrify@1.0.1: {}
 
+  assemblyai@4.9.0:
+    dependencies:
+      ws: 8.18.1
+    transitivePeerDependencies:
+      - bufferutil
+      - utf-8-validate
+
   assertion-error@1.1.0: {}
 
   ast-types-flow@0.0.8: {}
 
@@ -5836,7 +5887,7 @@
     dependencies:
       eslint: 8.57.0
 
-  eslint-config-standard@17.1.0(eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint@8.57.0))(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0):
+  eslint-config-standard@17.1.0(eslint-plugin-import@2.29.1)(eslint-plugin-n@16.6.2(eslint@8.57.0))(eslint-plugin-promise@6.1.1(eslint@8.57.0))(eslint@8.57.0):
     dependencies:
       eslint: 8.57.0
       eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0)
@@ -5861,7 +5912,7 @@
       debug: 4.3.4
       enhanced-resolve: 5.16.1
       eslint: 8.57.0
-      eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0)
+      eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0)
       eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0)
       fast-glob: 3.3.2
       get-tsconfig: 4.7.5
@@ -5873,7 +5924,7 @@ transitivePeerDependencies:
       - eslint-import-resolver-webpack
       - supports-color
 
-  eslint-module-utils@2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0):
+  eslint-module-utils@2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0):
     dependencies:
       debug: 3.2.7
     optionalDependencies:
@@ -5901,7 +5952,7 @@
       doctrine: 2.1.0
       eslint: 8.57.0
       eslint-import-resolver-node: 0.3.9
-      eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1)(eslint@8.57.0))(eslint@8.57.0)
+      eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.57.0)(typescript@5.4.5))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1)(eslint@8.57.0)
       hasown: 2.0.2
       is-core-module: 2.13.1
       is-glob: 4.0.3
@@ -7943,6 +7994,8 @@
   ws@8.17.0: {}
 
+  ws@8.18.1: {}
+
   y18n@4.0.3: {}
 
   y18n@5.0.8: {}
 
diff --git a/turbo.json b/turbo.json
index ffe72487..e9380574 100644
--- a/turbo.json
+++ b/turbo.json
@@ -1,6 +1,7 @@
 {
   "$schema": "https://turborepo.org/schema.json",
   "globalEnv": [
+    "ASSEMBLY_AI_KEY",
     "AZURE_API_KEY",
     "AZURE_OPENAI_API_KEY",
     "AZURE_OPENAI_DEPLOYMENT",

From 82e9eee137afbe99a9e768e007725406ddb57f21 Mon Sep 17 00:00:00 2001
From: Josiah Bryan
Date: Tue, 11 Mar 2025 08:24:34 -0500
Subject: [PATCH 3/3] fix: PR feedback

---
 plugins/assemblyai/src/stt.ts | 24 ++++++++----------------
 1 file changed, 8 insertions(+), 16 deletions(-)

diff --git a/plugins/assemblyai/src/stt.ts b/plugins/assemblyai/src/stt.ts
index c5608cda..94f40b29 100644
--- a/plugins/assemblyai/src/stt.ts
+++ b/plugins/assemblyai/src/stt.ts
@@ -20,16 +20,8 @@ const defaultSTTOptions: STTOptions = {
   interimResults: true,
   sampleRate: 16000,
   keywords: [],
-  // NOTE:
-  // The default is 700ms from AssemblyAI.
-  // I use a low default of 200ms here because I also use
-  // the new end-of-utterance model from LiveKit to handle
-  // turn detection in my agent. Which means that even though
-  // this will quickly return a final transcript EVEN THOUGH
-  // USER IS NOT DONE SPEAKING, the EOU model from LiveKit
-  // DOES properly differentiate and doesn't interrupt (magically!)
-  // Ref: https://blog.livekit.io/using-a-transformer-to-improve-end-of-turn-detection/
-  endUtteranceSilenceThreshold: 200,
+  // AssemblyAI default is 700ms
+  endUtteranceSilenceThreshold: 700,
 };
 
 export class STT extends stt.STT {
@@ -96,11 +88,11 @@ export class SpeechStream extends stt.SpeechStream {
       this.#transcriber.on('open', (data) => {
         this.#logger
           .child({ sessionId: data.sessionId, expiresAt: data.expiresAt })
-          .debug(`AssemblyAI session opened`);
+          .debug('AssemblyAI session opened');
       });
 
       this.#transcriber.on('close', (code, reason) => {
-        this.#logger.child({ code, reason }).debug(`AssemblyAI session closed`);
+        this.#logger.child({ code, reason }).debug('AssemblyAI session closed');
         if (!this.closed) {
           // Try to reconnect if not intentionally closed
           this.#run();
@@ -108,7 +100,7 @@ export class SpeechStream extends stt.SpeechStream {
       });
 
       this.#transcriber.on('error', (error) => {
-        this.#logger.child({ error: error.message }).error(`AssemblyAI error`);
+        this.#logger.child({ error: error.message }).error('AssemblyAI error');
       });
 
       this.#transcriber.on('transcript', (transcript) => {
@@ -155,7 +147,7 @@ export class SpeechStream extends stt.SpeechStream {
           } else if (data.sampleRate === this.#opts.sampleRate) {
             frames = stream.write(data.data.buffer);
           } else {
-            throw new Error(`Sample rate or channel count of frame does not match`);
+            throw new Error('Sample rate or channel count of frame does not match');
           }
 
           for await (const frame of frames) {
@@ -175,7 +167,7 @@ export class SpeechStream extends stt.SpeechStream {
       // Start processing audio
       await sendTask();
     } catch (error: any) {
-      this.#logger.child({ error: error.message }).error(`Error in AssemblyAI STT`);
+      this.#logger.child({ error: error.message }).error('Error in AssemblyAI STT');
 
       // Try to reconnect after a delay if not intentionally closed
       if (!this.closed) {
@@ -191,7 +183,7 @@ const assemblyTranscriptToSpeechData = (transcript: any): stt.SpeechData => {
     language: 'en-US',
     startTime: transcript.audio_start || 0,
     endTime: transcript.audio_end || 0,
-    confidence: transcript.confidence || 1.0,
+    confidence: transcript.confidence ?? 1.0,
     text: transcript.text || '',
   };
 };
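
For reviewers who want to try the plugin outside the test suite, here is a minimal usage sketch. It is not part of the patches above: it assumes the agents-js `SpeechStream` contract (`pushFrame()` plus async iteration of `SpeechEvent`s) and some source of 16 kHz mono `AudioFrame`s; `transcribe` and `audioFrames` are illustrative names, not APIs shipped by this plugin.

```ts
// Usage sketch (not part of the patch series). Assumes the agents-js
// SpeechStream API (pushFrame + async iteration of SpeechEvents);
// `audioFrames` is a hypothetical 16 kHz mono audio source.
import { stt } from '@livekit/agents';
import type { AudioFrame } from '@livekit/rtc-node';
import { STT } from '@livekit/agents-plugin-assemblyai';

async function transcribe(audioFrames: AsyncIterable<AudioFrame>) {
  // apiKey falls back to $ASSEMBLY_AI_KEY when omitted
  const assemblyStt = new STT({ sampleRate: 16000 });
  const stream = assemblyStt.stream();

  // Feed audio into the stream in the background
  (async () => {
    for await (const frame of audioFrames) {
      stream.pushFrame(frame);
    }
  })();

  // Consume interim and final transcripts as they arrive
  for await (const event of stream) {
    if (event.type === stt.SpeechEventType.FINAL_TRANSCRIPT) {
      console.log(event.alternatives?.[0]?.text);
    }
  }
}
```

In a real agent you would normally hand `new STT()` to the framework's voice pipeline (much as `stt.test.ts` above passes it to the shared plugin test harness together with a silero VAD) rather than driving the stream by hand.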