From 46af3b3dfef95bb7126d762bfe9267cf36e68cf3 Mon Sep 17 00:00:00 2001 From: James Riehl Date: Wed, 26 Mar 2025 17:31:48 +0000 Subject: [PATCH] feat: handle content lists in chat proto --- .../finance/finance-q&a-agent/chat_proto.py | 52 ++++++++++++++----- .../chat_proto.py | 38 ++++++++++---- .../claude.ai-agent/chat_proto.py | 23 +++++--- .../google-gemini-agent/chat_proto.py | 23 +++++--- .../knowledge-base/openai-agent/chat_proto.py | 23 +++++--- .../flights-retriever-agent/chat_proto.py | 38 +++++++++----- 6 files changed, 143 insertions(+), 54 deletions(-) diff --git a/6-deployed-agents/finance/finance-q&a-agent/chat_proto.py b/6-deployed-agents/finance/finance-q&a-agent/chat_proto.py index 133a81d..cd58ed5 100644 --- a/6-deployed-agents/finance/finance-q&a-agent/chat_proto.py +++ b/6-deployed-agents/finance/finance-q&a-agent/chat_proto.py @@ -122,28 +122,54 @@ def create_text_chat(text: str) -> ChatMessage: ) +def create_end_session_chat() -> ChatMessage: + return ChatMessage( + timestamp=datetime.utcnow(), + msg_id=uuid4(), + content=[EndSessionContent(type="end-session")], + ) + + chat_proto = Protocol(name="AgentChatProtcol", version="0.2.1") @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id + ), ) - try: - response = get_completion(prompt=msg.content[0].text) - result = response.strip() - except Exception as exc: - ctx.logger.warning(exc) - await ctx.send(sender, create_text_chat(str(exc))) - return - - await ctx.send(sender, create_text_chat(result)) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) + await ctx.send( + sender, + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id + ), + ) + try: + response = get_completion(prompt=item.text) + result = response.strip() + except Exception as exc: + ctx.logger.warning(exc) + await ctx.send(sender, create_text_chat(str(exc))) + return + + await ctx.send(sender, create_text_chat(result)) + await ctx.send(sender, create_end_session_chat()) + else: + ctx.logger.info(f"Got unexpected content from {sender}") @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) diff --git a/6-deployed-agents/geo/google-api-geolocation-agent/chat_proto.py b/6-deployed-agents/geo/google-api-geolocation-agent/chat_proto.py index 564dfcc..f762a85 100644 --- a/6-deployed-agents/geo/google-api-geolocation-agent/chat_proto.py +++ b/6-deployed-agents/geo/google-api-geolocation-agent/chat_proto.py @@ -126,6 +126,7 @@ def create_text_chat(text: str) -> ChatMessage: content=[TextContent(type="text", text=text)], ) + def create_end_session_chat() -> ChatMessage: return ChatMessage( timestamp=datetime.utcnow(), @@ -152,24 +153,34 @@ class StructuredOutputResponse(Model): @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), - ) - - await ctx.send( - AI_AGENT_ADDRESS, - StructuredOutputPrompt( - prompt=msg.content[0].text, output_schema=GeolocationRequest.schema() + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id ), ) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) + await ctx.send( + AI_AGENT_ADDRESS, + StructuredOutputPrompt( + prompt=item.text, output_schema=GeolocationRequest.schema() + ), + ) + else: + ctx.logger.info(f"Got unexpected content from {sender}") @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) @struct_output_client_proto.on_message(StructuredOutputResponse) @@ -178,6 +189,11 @@ async def handle_structured_output_response( ): prompt = GeolocationRequest.parse_obj(msg.output) session_sender = ctx.storage.get(str(ctx.session)) + if session_sender is None: + ctx.logger.error( + "Discarding message because no session sender found in storage" + ) + return try: coordinates = await find_coordinates(prompt.address) @@ -200,4 +216,4 @@ async def handle_structured_output_response( f"Longitude: {coordinates['longitude']}\n" ) await ctx.send(session_sender, chat_message) - await ctx.send(session_sender, create_end_session_chat()) \ No newline at end of file + await ctx.send(session_sender, create_end_session_chat()) diff --git a/6-deployed-agents/knowledge-base/claude.ai-agent/chat_proto.py b/6-deployed-agents/knowledge-base/claude.ai-agent/chat_proto.py index 3536f3e..be506d8 100644 --- a/6-deployed-agents/knowledge-base/claude.ai-agent/chat_proto.py +++ b/6-deployed-agents/knowledge-base/claude.ai-agent/chat_proto.py @@ -131,18 +131,29 @@ def create_text_chat(text: str) -> ChatMessage: @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id + ), ) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) - completion = get_completion(prompt=msg.content[0].text) + completion = get_completion(prompt=msg.content[0].text) - await ctx.send(sender, create_text_chat(completion)) + await ctx.send(sender, create_text_chat(completion)) + else: + ctx.logger.info(f"Got unexpected content from {sender}") @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) diff --git a/6-deployed-agents/knowledge-base/google-gemini-agent/chat_proto.py b/6-deployed-agents/knowledge-base/google-gemini-agent/chat_proto.py index 3536f3e..be506d8 100644 --- a/6-deployed-agents/knowledge-base/google-gemini-agent/chat_proto.py +++ b/6-deployed-agents/knowledge-base/google-gemini-agent/chat_proto.py @@ -131,18 +131,29 @@ def create_text_chat(text: str) -> ChatMessage: @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id + ), ) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) - completion = get_completion(prompt=msg.content[0].text) + completion = get_completion(prompt=msg.content[0].text) - await ctx.send(sender, create_text_chat(completion)) + await ctx.send(sender, create_text_chat(completion)) + else: + ctx.logger.info(f"Got unexpected content from {sender}") @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) diff --git a/6-deployed-agents/knowledge-base/openai-agent/chat_proto.py b/6-deployed-agents/knowledge-base/openai-agent/chat_proto.py index 5f3284b..ca032f7 100644 --- a/6-deployed-agents/knowledge-base/openai-agent/chat_proto.py +++ b/6-deployed-agents/knowledge-base/openai-agent/chat_proto.py @@ -131,18 +131,29 @@ def create_text_chat(text: str) -> ChatMessage: @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id + ), ) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) - completion = get_completion(context="", prompt=msg.content[0].text) + completion = get_completion(context="", prompt=msg.content[0].text) - await ctx.send(sender, create_text_chat(completion)) + await ctx.send(sender, create_text_chat(completion)) + else: + ctx.logger.info(f"Got unexpected content from {sender}") @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) diff --git a/6-deployed-agents/travel/flights-retriever-agent/chat_proto.py b/6-deployed-agents/travel/flights-retriever-agent/chat_proto.py index 507a55e..de2e6c1 100644 --- a/6-deployed-agents/travel/flights-retriever-agent/chat_proto.py +++ b/6-deployed-agents/travel/flights-retriever-agent/chat_proto.py @@ -131,6 +131,7 @@ def create_text_chat(text: str) -> ChatMessage: content=[TextContent(type="text", text=text)], ) + def create_end_session_chat() -> ChatMessage: return ChatMessage( timestamp=datetime.utcnow(), @@ -157,24 +158,33 @@ class StructuredOutputResponse(Model): @chat_proto.on_message(ChatMessage) async def handle_message(ctx: Context, sender: str, msg: ChatMessage): - ctx.logger.info(f"Got a message from {sender}: {msg.content[0].text}") - ctx.storage.set(str(ctx.session), sender) await ctx.send( sender, - ChatAcknowledgement(timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id), - ) - - await ctx.send( - AI_AGENT_ADDRESS, - StructuredOutputPrompt( - prompt=msg.content[0].text, output_schema=FlightsSearchRequest.schema() + ChatAcknowledgement( + timestamp=datetime.utcnow(), acknowledged_msg_id=msg.msg_id ), ) + for item in msg.content: + if isinstance(item, StartSessionContent): + ctx.logger.info(f"Got a start session message from {sender}") + continue + elif isinstance(item, TextContent): + ctx.logger.info(f"Got a message from {sender}: {item.text}") + ctx.storage.set(str(ctx.session), sender) + + await ctx.send( + AI_AGENT_ADDRESS, + StructuredOutputPrompt( + prompt=item.text, output_schema=FlightsSearchRequest.schema() + ), + ) @chat_proto.on_message(ChatAcknowledgement) async def handle_ack(ctx: Context, sender: str, msg: ChatAcknowledgement): - ctx.logger.info(f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}") + ctx.logger.info( + f"Got an acknowledgement from {sender} for {msg.acknowledged_msg_id}" + ) @struct_output_client_proto.on_message(StructuredOutputResponse) @@ -189,7 +199,10 @@ async def handle_structured_output_response( logger=ctx.logger, request=prompt, storage=ctx.storage ) if flights_raw is None: - await ctx.send(session_sender, create_text_chat("Error while connecting to the external API.")) + await ctx.send( + session_sender, + create_text_chat("Error while connecting to the external API."), + ) return flights: list[Flight] = [] @@ -217,6 +230,7 @@ async def handle_structured_output_response( chat_message = create_text_chat("\n\n".join(flight_messages)) + ctx.logger.debug(f"Sending response: {chat_message}") await ctx.send(session_sender, chat_message) await ctx.send(session_sender, create_end_session_chat()) except Exception as err: @@ -227,4 +241,4 @@ async def handle_structured_output_response( "Sorry, I couldn't process your request. Please try again later." ), ) - return \ No newline at end of file + return