-
Notifications
You must be signed in to change notification settings - Fork 443
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flexible IO proposal #1161
base: main
Are you sure you want to change the base?
Flexible IO proposal #1161
Conversation
|
proposal.md
Outdated
class PipelineIO(ABC): | ||
|
||
def before_stt_node(self, source: AsyncIterator[rtc.AudioFrame]) -> AsyncIterator[rtc.AudioFrame]: | ||
return source | ||
|
||
def after_stt_node(self, source: AsyncIterator[SpeechEvent]) -> AsyncIterator[SpeechEvent]: | ||
return source | ||
|
||
def before_llm_node(self, chat_ctx: ChatContext) -> AsyncIterator[ChatChunk] | None: | ||
return None | ||
|
||
def after_llm_node(self, source: AsyncIterator[ChatChunk]) -> AsyncIterator[ChatChunk]: | ||
return source | ||
|
||
def before_tts_node(self, source: AsyncIterator[str]) -> AsyncIterator[rtc.AudioFrame] | None: | ||
return source | ||
|
||
def after_tts_node(self, source: AsyncIterator[rtc.AudioFrame]) -> AsyncIterator[rtc.AudioFrame]: | ||
return source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not my wheelhouse, but is this in any way more convenient or idiomatic than if each pipeline stage managed its own pre/post transform callbacks? E.G.:
def passthrough_audio(source: AsyncIterator[rtc.AudioFrame]) -> AsyncIterator[rtc.AudioFrame]:
return source
def filter_swearwords(source: AsyncIterator[SpeechEvent]) -> AsyncIterator[SpeechEvent]:
return source
agent = PipelineAgent(
stt=STT(pre=passthrough_audio, post=filter_swearwords)
...
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The scope is different, ideally, we can add new parameters like speech_id
inside each step like before_tts_node, ...
class TextOutput(Protocol): | ||
async def write(self, text: str) -> None: ... | ||
|
||
def flush(self) -> None: ... |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how should frontend applications reason about "turns" with these two output types? is that what "flush" means? UI will likely want to render each complete "message" in a chat bubble, for instance. maybe having a unique id somewhere could help?
STT -> LLM -> TTS | ||
|
||
```python | ||
AudioInput = AsyncIterator[rtc.AudioFrame | rtc.AudioFrameEvent] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure the scope of what you're working on but where would text input/image/file "chat" input fit in?
def clear_queue(self) -> None: ... | ||
|
||
|
||
class TextOutput(Protocol): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In addition to text and audio output that are essentially "talking" or "chat", many applications "output" other structured things either by returning images or through function calls (i.e. JSON output). do we have any thoughts about whether it would make sense to provide an affordance for that in pipelineagent?
return source | ||
|
||
def before_tts_node(self, source: AsyncIterator[str] | str) -> AsyncIterator[rtc.AudioFrame] | None: | ||
return source |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this method return AsyncIterator[str] | str
?
|
||
class PipelineIO(ABC): | ||
|
||
def before_stt_node(self, source: AsyncIterator[rtc.AudioFrame]) -> AsyncIterator[rtc.AudioFrame]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't these methods all be async?
return source | ||
|
||
def before_llm_node(self, chat_ctx: ChatContext) -> AsyncIterator[ChatChunk] | None: | ||
return None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why is this one different than the others? it feels odd that it doesn't have the same return type as input type, and the default implementation returns None which implies its actually very semantically different than the other methods which are all open-ended hooks to transform data in the pipeline or add logging or other side effects.
def flush(self) -> None: ... | ||
|
||
|
||
class PipelineIO(ABC): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this name feels a little odd, given that in addition to PipelineIO
we also have PipelineOutput
(and maybe some forthcoming Input
protocol too), but the "IO" in PipelineIO
is not related to the pipeline's input nor output itself... might be better as PipelineHooks
No description provided.