Skip to content

Commit

Permalink
Papayne/program-routine-runner (#316)
Browse files Browse the repository at this point in the history
Adds a program routine runner to the skill library.
  • Loading branch information
payneio authored Feb 10, 2025
1 parent 2c2f710 commit d09f84e
Show file tree
Hide file tree
Showing 35 changed files with 1,229 additions and 587 deletions.
4 changes: 4 additions & 0 deletions assistants/skill-assistant/assistant/skill_assistant.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from skill_library.types import Metadata

from assistant.skill_event_mapper import SkillEventMapper
from assistant.workbench_helpers import WorkbenchMessageProvider

from .assistant_registry import AssistantRegistry
from .config import AssistantConfigModel
Expand Down Expand Up @@ -206,15 +207,18 @@ async def get_or_register_assistant(
assistant_metadata_drive_root = Path(".data") / assistant_id / ".assistant"
assistant_drive = Drive(DriveConfig(root=assistant_drive_root))
language_model = openai_client.create_client(config.service_config)
message_provider = WorkbenchMessageProvider(assistant_id, conversation_context)
chat_driver_config = ChatDriverConfig(
openai_client=language_model,
model=config.chat_driver_config.openai_model,
instructions=config.chat_driver_config.instructions,
message_provider=message_provider,
)

assistant = Assistant(
assistant_id=conversation_context.id,
name="Assistant",
message_history_provider=message_provider.get_history,
chat_driver_config=chat_driver_config,
drive_root=assistant_drive_root,
metadata_drive_root=assistant_metadata_drive_root,
Expand Down
47 changes: 47 additions & 0 deletions assistants/skill-assistant/assistant/workbench_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from openai.types.chat import ChatCompletionMessageParam, ChatCompletionUserMessageParam
from openai_client.chat_driver import MessageHistoryProviderProtocol
from semantic_workbench_api_model.workbench_model import (
ConversationMessageList,
MessageType,
NewConversationMessage,
)
from semantic_workbench_assistant.assistant_app import (
ConversationContext,
)


class WorkbenchMessageProvider(MessageHistoryProviderProtocol):
"""
This class is used to use the workbench for messages.
"""

def __init__(self, session_id: str, conversation_context: ConversationContext) -> None:
self.session_id = session_id
self.conversation_context = conversation_context

async def get(self) -> list[ChatCompletionMessageParam]:
message_list: ConversationMessageList = await self.conversation_context.get_messages()
return [
ChatCompletionUserMessageParam(
role="user",
content=message.content,
)
for message in message_list.messages
if message.message_type == MessageType.chat
]

async def append(self, message: ChatCompletionMessageParam) -> None:
if "content" in message:
await self.conversation_context.send_messages(
NewConversationMessage(
content=str(message["content"]),
message_type=MessageType.chat,
)
)

async def get_history(self) -> ConversationMessageList:
return await self.conversation_context.get_messages()

async def get_history_json(self) -> str:
message_list: ConversationMessageList = await self.conversation_context.get_messages()
return message_list.model_dump_json()
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from typing import Any, Iterable

from openai.types.chat import ChatCompletionMessageParam, ChatCompletionMessageToolCallParam

from openai_client.messages import (
MessageFormatter,
create_assistant_message,
Expand Down Expand Up @@ -39,17 +39,17 @@ def set(self, messages: list[ChatCompletionMessageParam], vars: dict[str, Any])
def delete_all(self) -> None:
self.messages = []

def append_system_message(self, content: str, var: dict[str, Any] | None = None) -> None:
asyncio.run(self.append(create_system_message(content, var, self.formatter)))
async def append_system_message(self, content: str, var: dict[str, Any] | None = None) -> None:
await self.append(create_system_message(content, var, self.formatter))

def append_user_message(self, content: str, var: dict[str, Any] | None = None) -> None:
asyncio.run(self.append(create_user_message(content, var, self.formatter)))
async def append_user_message(self, content: str, var: dict[str, Any] | None = None) -> None:
await self.append(create_user_message(content, var, self.formatter))

def append_assistant_message(
async def append_assistant_message(
self,
content: str,
refusal: str | None = None,
tool_calls: Iterable[ChatCompletionMessageToolCallParam] | None = None,
var: dict[str, Any] | None = None,
) -> None:
asyncio.run(self.append(create_assistant_message(content, refusal, tool_calls, var, self.formatter)))
await self.append(create_assistant_message(content, refusal, tool_calls, var, self.formatter))
31 changes: 27 additions & 4 deletions libraries/python/skills/skill-library/skill_library/assistant.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncio
from os import PathLike
from typing import Any, AsyncIterator, Callable, Optional
from typing import Any, AsyncIterator, Awaitable, Callable, Optional
from uuid import uuid4

from assistant_drive import Drive, DriveConfig, IfDriveFileExistsBehavior
Expand All @@ -16,6 +16,7 @@
)
from openai_client.completion import TEXT_RESPONSE_FORMAT
from openai_client.messages import format_with_liquid
from semantic_workbench_api_model.workbench_model import ConversationMessageList

from skill_library.routine_stack import RoutineStack

Expand All @@ -32,6 +33,7 @@ def __init__(
name,
assistant_id: str | None,
chat_driver_config: ChatDriverConfig,
message_history_provider: Callable[[], Awaitable[ConversationMessageList]],
drive_root: PathLike | None = None,
metadata_drive_root: PathLike | None = None,
skills: list[SkillDefinition] | None = None,
Expand Down Expand Up @@ -79,6 +81,8 @@ def __init__(
)
)

self.message_history_provider = message_history_provider

# Configure the assistant chat interface.
self.chat_driver = self._register_chat_driver(chat_driver_config)

Expand All @@ -87,13 +91,14 @@ def __init__(
self.startup_args = startup_args
self.startup_kwargs = startup_kwargs

async def clear(self) -> None:
async def clear(self, include_drives: bool = True) -> None:
"""Clears the assistant's routine stack and event queue."""
await self.routine_stack.clear()
while not self._event_queue.empty():
self._event_queue.get_nowait()
self.metadrive.delete_drive()
self.drive.delete_drive()
if include_drives:
self.metadrive.delete_drive()
self.drive.delete_drive()

######################################
# Lifecycle and event handling
Expand Down Expand Up @@ -156,6 +161,7 @@ def create_run_context(self) -> RunContext:
return RunContext(
session_id=self.assistant_id,
assistant_drive=self.drive,
conversation_history=self.message_history_provider,
emit=self._emit,
run_action=self.run_action,
run_routine=self.run_routine,
Expand All @@ -181,6 +187,12 @@ async def put_message(self, message: str, metadata: Optional[Metadata] = None) -
currently running, we send the message to the routine. Otherwise, we
send the message to the chat driver.
"""
# If the message is a command, send it on to the chat driver.
if message.startswith("/"):
response = await self.chat_driver.respond(message, metadata=metadata)
self._emit(response)
return

# If a routine is running, send the message to the routine.
if await self.routine_stack.peek():
await self.step_active_routine(message)
Expand Down Expand Up @@ -212,7 +224,12 @@ def _register_chat_driver(self, chat_driver_config: ChatDriverConfig) -> ChatDri
)
)
chat_functions = ChatFunctions(self)

# Make all the chat functions available as commands.
config.commands = chat_functions.list_functions()

# Make only certain functions available as chat tool functions (able to
# be run by the assistant).
config.functions = [chat_functions.list_actions, chat_functions.list_routines]
return ChatDriver(config)

Expand Down Expand Up @@ -287,6 +304,11 @@ class ChatFunctions:
def __init__(self, assistant: Assistant) -> None:
self.assistant = assistant

async def clear_stack(self) -> str:
"""Clears the assistant's routine stack and event queue."""
await self.assistant.clear(include_drives=False)
return "Assistant stack cleared."

def list_routines(self) -> str:
"""Lists all the routines available in the assistant."""
if not self.assistant.skill_registry:
Expand Down Expand Up @@ -341,4 +363,5 @@ def list_functions(self) -> list[Callable]:
self.run_routine,
self.list_actions,
self.run_action,
self.clear_stack,
]
Original file line number Diff line number Diff line change
@@ -1,10 +1,66 @@
from typing import Any
import inspect
from textwrap import dedent
from typing import Any, Callable

from ..utilities import find_template_vars
from .routine import Routine


class ProgramRoutine(Routine):
"""
A routine that is defined by a Python program. The program will be executed
by the assistant. When run, the program routine will be interpreted and
executed up until the first non-builtin function call. The program will the
pause and the external function will be called using the run_action method.
The result of the external function call will be passed back to the
interpreter and the program will resume execution.
## Writing a Program Routine
The program must have a `main` function that will be called to start the
program.
You can define the program with a string, or use the `get_function_source`
function below to convert a function (the `main` function) into a string.
You will have to mock any external functions that are called in the program
so that the program can be linted. To make this simple, you can use the
`ExternalFunctionMock` class below to mock the functions. The program will
call the real functions when it is executed.
See the [Program Routine
Runner](../routine_runners/program_routine_runner/program_routine_runner.py)
to see how the program is executed.
## Loop Variables
Each time the program is run, it is executed from the beginning. This makes
the interpreter simpler since we don't have to track our position in the
program and all variables scopes to that position to resume. To make this
work, the interpreter keeps a cache of previously run function calls and
will use the cached result if the function is called with the same arguments
again.
This means one important way these programs differ from normal Python
programs is that they any external function calls inside loops need to
include the loop variable in the arguments to the function. Otherwise, the
function will be called once and the result will be cached and used for all
iterations of the loop, which is probably not what you want.
## Program source variables
The programs can also use mustache variables replacement to insert values
into the program before it is executed. The mustache variables should be
defined in the arg set that is passed to the program runner. The end result
is that any arguments you pass into the `run_action` method will be
available in the program as mustache variables. This allows you to run the
same program routine with different arguments. For example, the when working
with an assistant, if you use the command
`/run_routine("my_program_routine", {"name": "John"})`, the program will
have access to the variable `name` with the value `"John"` and will replace
`{{name}}` with `"John"` in the program source code before executing it.
"""

def __init__(
self,
name: str,
Expand All @@ -23,9 +79,33 @@ def validate(self, arg_set: dict[str, Any]) -> None:
"""
Validate the routine with the given arguments.
"""
# TODO: implement this.
pass
# All of the routines template variables should be defined in the arg set.
template_vars = find_template_vars(self.program)
missing_vars = [var for var in template_vars if var not in arg_set]
if missing_vars:
raise ValueError(f"Missing variables in arg set: {missing_vars}")

def __str__(self) -> str:
template_vars = find_template_vars(self.program)
return f"{self.name}(vars: {template_vars}): {self.description}"


def get_function_source(func: Callable) -> str:
"""Get the source code of a function."""
return dedent(inspect.getsource(func))


class ExternalFunctionMock:
"""
Program Routines store the source code for a routine that you would like
executed. One way to provide that source code is to write a `main` function
and then use `get_function_source` to get the source code of that function.
This class is used to mock the functions that are called in the `main`
function so it passes linting. Simply define any function that doesn't lint
(which will be executed for real by the program_runner) and assign it a
mock. This is good enough for being able to generate the source code for the
routine.
"""

def __getattr__(self, _: str) -> Any:
return lambda *args, **kwargs: None
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from .action_list_routine_runner import ActionListRoutineRunner
from .instruction_routine_runner import InstructionRoutineRunner
from .program.program_routine_runner import ProgramRoutineRunner
from .program_routine_runner.program_routine_runner import ProgramRoutineRunner
from .state_machine_routine_runner import StateMachineRoutineRunner

RunnerTypes = Union[ActionListRoutineRunner, InstructionRoutineRunner, ProgramRoutineRunner, StateMachineRoutineRunner]
Expand Down
Empty file.
Loading

0 comments on commit d09f84e

Please sign in to comment.