Skip to content

chore: Add a2a agent executor #1563

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 47 additions & 11 deletions src/google/adk/a2a/converters/event_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

from __future__ import annotations

import datetime
from datetime import datetime
from datetime import timezone
import logging
from typing import Any
from typing import Dict
Expand All @@ -35,6 +36,7 @@

from ...agents.invocation_context import InvocationContext
from ...events.event import Event
from ...flows.llm_flows.functions import REQUEST_EUC_FUNCTION_CALL_NAME
from ...utils.feature_decorator import working_in_progress
from .part_converter import A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
from .part_converter import A2A_DATA_PART_METADATA_TYPE_KEY
Expand Down Expand Up @@ -224,7 +226,7 @@ def _process_long_running_tool(a2a_part, event: Event) -> None:
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and a2a_part.root.metadata.get("id") in event.long_running_tool_ids
and a2a_part.root.data.get("id") in event.long_running_tool_ids
):
a2a_part.root.metadata[_get_adk_metadata_key("is_long_running")] = True

Expand Down Expand Up @@ -287,24 +289,34 @@ def _create_error_status_event(
"""
error_message = getattr(event, "error_message", None) or DEFAULT_ERROR_MESSAGE

# Get context metadata and add error code
event_metadata = _get_context_metadata(event, invocation_context)
if event.error_code:
event_metadata[_get_adk_metadata_key("error_code")] = str(event.error_code)

return TaskStatusUpdateEvent(
taskId=str(uuid.uuid4()),
contextId=invocation_context.session.id,
final=False,
metadata=_get_context_metadata(event, invocation_context),
metadata=event_metadata,
status=TaskStatus(
state=TaskState.failed,
message=Message(
messageId=str(uuid.uuid4()),
role=Role.agent,
parts=[TextPart(text=error_message)],
metadata={
_get_adk_metadata_key("error_code"): str(event.error_code)
}
if event.error_code
else {},
),
timestamp=datetime.datetime.now().isoformat(),
timestamp=datetime.now(timezone.utc).isoformat(),
),
)


def _create_running_status_event(
def _create_status_update_event(
message: Message, invocation_context: InvocationContext, event: Event
) -> TaskStatusUpdateEvent:
"""Creates a TaskStatusUpdateEvent for running scenarios.
Expand All @@ -317,15 +329,39 @@ def _create_running_status_event(
Returns:
A TaskStatusUpdateEvent with RUNNING state.
"""
status = TaskStatus(
state=TaskState.working,
message=message,
timestamp=datetime.now(timezone.utc).isoformat(),
)

if any(
part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and part.root.metadata.get(_get_adk_metadata_key("is_long_running"))
is True
and part.root.data.get("name") == REQUEST_EUC_FUNCTION_CALL_NAME
for part in message.parts
):
status.state = TaskState.auth_required
elif any(
part.root.metadata.get(
_get_adk_metadata_key(A2A_DATA_PART_METADATA_TYPE_KEY)
)
== A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL
and part.root.metadata.get(_get_adk_metadata_key("is_long_running"))
is True
for part in message.parts
):
status.state = TaskState.input_required

return TaskStatusUpdateEvent(
taskId=str(uuid.uuid4()),
contextId=invocation_context.session.id,
final=False,
status=TaskStatus(
state=TaskState.working,
message=message,
timestamp=datetime.datetime.now().isoformat(),
),
status=status,
metadata=_get_context_metadata(event, invocation_context),
)

Expand Down Expand Up @@ -370,7 +406,7 @@ def convert_event_to_a2a_events(
# Handle regular message content
message = convert_event_to_a2a_status_message(event, invocation_context)
if message:
running_event = _create_running_status_event(
running_event = _create_status_update_event(
message, invocation_context, event
)
a2a_events.append(running_event)
Expand Down
52 changes: 48 additions & 4 deletions src/google/adk/a2a/converters/part_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

from __future__ import annotations

import base64
import json
import logging
import sys
Expand Down Expand Up @@ -45,6 +46,8 @@
A2A_DATA_PART_METADATA_TYPE_KEY = 'type'
A2A_DATA_PART_METADATA_TYPE_FUNCTION_CALL = 'function_call'
A2A_DATA_PART_METADATA_TYPE_FUNCTION_RESPONSE = 'function_response'
A2A_DATA_PART_METADATA_TYPE_CODE_EXECUTION_RESULT = 'code_execution_result'
A2A_DATA_PART_METADATA_TYPE_EXECUTABLE_CODE = 'executable_code'


@working_in_progress
Expand All @@ -67,7 +70,8 @@ def convert_a2a_part_to_genai_part(
elif isinstance(part.file, a2a_types.FileWithBytes):
return genai_types.Part(
inline_data=genai_types.Blob(
data=part.file.bytes.encode('utf-8'), mime_type=part.file.mimeType
data=base64.b64decode(part.file.bytes),
mime_type=part.file.mimeType,
)
)
else:
Expand Down Expand Up @@ -118,8 +122,12 @@ def convert_genai_part_to_a2a_part(
part: genai_types.Part,
) -> Optional[a2a_types.Part]:
"""Convert a Google GenAI Part to an A2A Part."""

if part.text:
return a2a_types.TextPart(text=part.text)
a2a_part = a2a_types.TextPart(text=part.text)
if part.thought is not None:
a2a_part.metadata = {_get_adk_metadata_key('thought'): part.thought}
return a2a_part

if part.file_data:
return a2a_types.FilePart(
Expand All @@ -130,14 +138,22 @@ def convert_genai_part_to_a2a_part(
)

if part.inline_data:
return a2a_types.Part(
a2a_part = a2a_types.Part(
root=a2a_types.FilePart(
file=a2a_types.FileWithBytes(
bytes=part.inline_data.data,
bytes=base64.b64encode(part.inline_data.data).decode('utf-8'),
mimeType=part.inline_data.mime_type,
)
)
)
if part.video_metadata:
a2a_part.metadata = {
_get_adk_metadata_key(
'video_metadata'
): part.video_metadata.model_dump(by_alias=True, exclude_none=True)
}

return a2a_part

# Conver the funcall and function reponse to A2A DataPart.
# This is mainly for converting human in the loop and auth request and
Expand Down Expand Up @@ -172,6 +188,34 @@ def convert_genai_part_to_a2a_part(
)
)

if part.code_execution_result:
return a2a_types.Part(
root=a2a_types.DataPart(
data=part.code_execution_result.model_dump(
by_alias=True, exclude_none=True
),
metadata={
A2A_DATA_PART_METADATA_TYPE_KEY: (
A2A_DATA_PART_METADATA_TYPE_CODE_EXECUTION_RESULT
)
},
)
)

if part.executable_code:
return a2a_types.Part(
root=a2a_types.DataPart(
data=part.executable_code.model_dump(
by_alias=True, exclude_none=True
),
metadata={
A2A_DATA_PART_METADATA_TYPE_KEY: (
A2A_DATA_PART_METADATA_TYPE_EXECUTABLE_CODE
)
},
)
)

logger.warning(
'Cannot convert unsupported part for Google GenAI part: %s',
part,
Expand Down
23 changes: 19 additions & 4 deletions src/google/adk/a2a/converters/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ def _to_a2a_context_id(app_name: str, user_id: str, session_id: str) -> str:

Returns:
The A2A context id.

Raises:
ValueError: If any of the input parameters are empty or None.
"""
return [ADK_CONTEXT_ID_PREFIX, app_name, user_id, session_id].join("$")
if not all([app_name, user_id, session_id]):
raise ValueError(
"All parameters (app_name, user_id, session_id) must be non-empty"
)
return "$".join([ADK_CONTEXT_ID_PREFIX, app_name, user_id, session_id])


def _from_a2a_context_id(context_id: str) -> tuple[str, str, str]:
Expand All @@ -64,8 +71,16 @@ def _from_a2a_context_id(context_id: str) -> tuple[str, str, str]:
if not context_id:
return None, None, None

prefix, app_name, user_id, session_id = context_id.split("$")
if prefix == "ADK" and app_name and user_id and session_id:
return app_name, user_id, session_id
try:
parts = context_id.split("$")
if len(parts) != 4:
return None, None, None

prefix, app_name, user_id, session_id = parts
if prefix == ADK_CONTEXT_ID_PREFIX and app_name and user_id and session_id:
return app_name, user_id, session_id
except ValueError:
# Handle any split errors gracefully
pass

return None, None, None
13 changes: 13 additions & 0 deletions src/google/adk/a2a/executor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Loading