diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml
index 31869b86ccc2..703c07494251 100644
--- a/.github/workflows/checks.yml
+++ b/.github/workflows/checks.yml
@@ -190,6 +190,7 @@ jobs:
run: |
source ${{ github.workspace }}/python/.venv/bin/activate
poe gen-proto
+ poe gen-test-proto
working-directory: ./python
- name: Check if there are uncommited changes
id: changes
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 411a8da3f3c6..fadb8091a0d6 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -24,10 +24,62 @@ This project has adopted the [Microsoft Open Source Code of Conduct](https://ope
For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or
contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments.
-## Roadmaps
+## Running CI checks locally
-To see what we are working on and what we plan to work on, please check our
-[Roadmap Issues](https://aka.ms/autogen-roadmap).
+It is important to use `uv` when running CI checks locally as it ensures that the correct dependencies and versions are used.
+
+Please follow the instructions [here](./python/README.md#setup) to get set up.
+
+For common tasks that are helpful during development and run in CI, see [here](./python/README.md#common-tasks).
+
+## Roadmap
+
+We use GitHub issues and milestones to track our roadmap. You can view the upcoming milestones [here]([Roadmap Issues](https://aka.ms/autogen-roadmap).
+
+## Versioning
+
+The set of `autogen-*` packages are generally all versioned together. When a change is made to one package, all packages are updated to the same version. This is to ensure that all packages are in sync with each other.
+
+We will update verion numbers according to the following rules:
+
+- Increase minor version (0.X.0) upon breaking changes
+- Increase patch version (0.0.X) upon new features or bug fixes
+
+## Release process
+
+1. Create a PR that updates the version numbers across the codebase ([example](https://github.com/microsoft/autogen/pull/4359))
+ 2. The docs CI will fail for the PR, but this is expected and will be resolved in the next step
+2. After merging the PR, create and push a tag that corresponds to the new verion. For example, for `0.4.0.dev7`:
+ - `git tag 0.4.0.dev7 && git push origin 0.4.0.dev7`
+3. Restart the docs CI by finding the failed [job corresponding to the `push` event](https://github.com/microsoft/autogen/actions/workflows/docs.yml) and restarting all jobs
+4. Run [this](https://github.com/microsoft/autogen/actions/workflows/single-python-package.yml) workflow for each of the packages that need to be released and get an approval for the release for it to run
+
+## Triage process
+
+To help ensure the health of the project and community the AutoGen committers have a weekly triage process to ensure that all issues and pull requests are reviewed and addressed in a timely manner. The following documents the responsibilites while on triage duty:
+
+- Issues
+ - Review all new issues - these will be tagged with [`needs-triage`](https://github.com/microsoft/autogen/issues?q=is%3Aissue%20state%3Aopen%20label%3Aneeds-triage).
+ - Apply appropriate labels:
+ - One of `proj-*` labels based on the project the issue is related to
+ - `documentation`: related to documentation
+ - `x-lang`: related to cross language functionality
+ - `dotnet`: related to .NET
+ - Add the issue to a relevant milestone if necessary
+ - If you can resolve the issue or reply to the OP please do.
+ - If you cannot resolve the issue, assign it to the appropriate person.
+ - If awaiting a reply add the tag `awaiting-op-response` (this will be auto removed when the OP replies).
+ - Bonus: there is a backlog of old issues that need to be reviewed - if you have time, review these as well and close or refresh as many as you can.
+- PRs
+ - The UX on GH flags all recently updated PRs. Draft PRs can be ignored, otherwise review all recently updated PRs.
+ - If a PR is ready for review and you can provide one please go ahead. If you cant, please assign someone. You can quickly spin up a codespace with the PR to test it out.
+ - If a PR is needing a reply from the op, please tag it `awaiting-op-response`.
+ - If a PR is approved and passes CI, its ready to merge, please do so.
+ - If it looks like there is a possibly transient CI failure, re-run failed jobs.
+- Discussions
+ - Look for recently updated discussions and reply as needed or find someone on the team to reply.
+- Security
+ - Look through any securty alerts and file issues or dismiss as needed.
## Becoming a Reviewer
diff --git a/README.md b/README.md
index 253a6b0ea073..2cb9b266d835 100644
--- a/README.md
+++ b/README.md
@@ -3,7 +3,9 @@
-[data:image/s3,"s3://crabby-images/11831/118313f8085965b0f93a11e7a016a75ff532f042" alt="Twitter"](https://twitter.com/pyautogen)
+[data:image/s3,"s3://crabby-images/11831/118313f8085965b0f93a11e7a016a75ff532f042" alt="Twitter"](https://twitter.com/pyautogen) [data:image/s3,"s3://crabby-images/3f1b1/3f1b1c63a3de0533c7ed62451b53ffa9cdbe2fde" alt="GitHub Discussions"](https://github.com/microsoft/autogen/discussions) [data:image/s3,"s3://crabby-images/50eeb/50eeb6dfc2af08f236a9251046667035d2fd0bd8" alt="0.2 Docs"](https://microsoft.github.io/autogen/0.2/) [data:image/s3,"s3://crabby-images/8919a/8919ab59fa967e721e6e92067da8ab922a54f76b" alt="0.4 Docs"](https://microsoft.github.io/autogen/dev/)
+[data:image/s3,"s3://crabby-images/76e81/76e81642a4273cff14f15710c749160975e86589" alt="PyPi autogen-core"](https://pypi.org/project/autogen-core/0.4.0.dev7/) [data:image/s3,"s3://crabby-images/7eaab/7eaaba0beaaf08c01a7190f2c5b74753cddc0ccb" alt="PyPi autogen-agentchat"](https://pypi.org/project/autogen-agentchat/0.4.0.dev7/) [data:image/s3,"s3://crabby-images/4a48b/4a48baeebadb95db237c7032f25a5cbc15ae2294" alt="PyPi autogen-ext"](https://pypi.org/project/autogen-ext/0.4.0.dev7/)
+
@@ -13,6 +15,7 @@
> - (11/14/24) ⚠️ In response to a number of asks to clarify and distinguish between official AutoGen and its forks that created confusion, we issued a [clarification statement](https://github.com/microsoft/autogen/discussions/4217).
> - (10/13/24) Interested in the standard AutoGen as a prior user? Find it at the actively-maintained *AutoGen* [0.2 branch](https://github.com/microsoft/autogen/tree/0.2) and `autogen-agentchat~=0.2` PyPi package.
> - (10/02/24) [AutoGen 0.4](https://microsoft.github.io/autogen/dev) is a from-the-ground-up rewrite of AutoGen. Learn more about the history, goals and future at [this blog post](https://microsoft.github.io/autogen/blog). We’re excited to work with the community to gather feedback, refine, and improve the project before we officially release 0.4. This is a big change, so AutoGen 0.2 is still available, maintained, and developed in the [0.2 branch](https://github.com/microsoft/autogen/tree/0.2).
+> - *[Join us for Community Office Hours](https://github.com/microsoft/autogen/discussions/4059)* We will host a weekly open discussion to answer questions, talk about Roadmap, etc.
AutoGen is an open-source framework for building AI agent systems.
It simplifies the creation of event-driven, distributed, scalable, and resilient agentic applications.
diff --git a/docs/design/04 - Agent and Topic ID Specs.md b/docs/design/04 - Agent and Topic ID Specs.md
index ee872ab2ac1e..b0e0e0e94e60 100644
--- a/docs/design/04 - Agent and Topic ID Specs.md
+++ b/docs/design/04 - Agent and Topic ID Specs.md
@@ -34,7 +34,7 @@ This document describes the structure, constraints, and behavior of Agent IDs an
- Type: `string`
- Description: Topic type is usually defined by application code to mark the type of messages the topic is for.
-- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), or underscores (\_). A valid identifier cannot start with a number, or contain any spaces.
+- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), ':', '=', or underscores (\_). A valid identifier cannot start with a number, or contain any spaces.
- Examples:
- `GitHub_Issues`
diff --git a/docs/design/readme.md b/docs/design/readme.md
new file mode 100644
index 000000000000..6a8221027f75
--- /dev/null
+++ b/docs/design/readme.md
@@ -0,0 +1,3 @@
+# Docs
+
+You can find the project documentation [here](https://microsoft.github.io/autogen/dev/).
diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
index 8920fa0b80e0..59bfd8837129 100644
--- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
+++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs
@@ -15,27 +15,40 @@ namespace Microsoft.AutoGen.Agents;
public abstract class AgentBase : IAgentBase, IHandle, IHandle
{
public static readonly ActivitySource s_source = new("AutoGen.Agent");
- public AgentId AgentId => _context.AgentId;
+ public AgentId AgentId => _runtime.AgentId;
private readonly object _lock = new();
private readonly Dictionary> _pendingRequests = [];
private readonly Channel _mailbox = Channel.CreateUnbounded();
- private readonly IAgentRuntime _context;
+ private readonly IAgentRuntime _runtime;
public string Route { get; set; } = "base";
protected internal ILogger _logger;
- public IAgentRuntime Context => _context;
+ public IAgentRuntime Context => _runtime;
protected readonly EventTypes EventTypes;
protected AgentBase(
- IAgentRuntime context,
+ IAgentRuntime runtime,
EventTypes eventTypes,
ILogger? logger = null)
{
- _context = context;
- context.AgentInstance = this;
+ _runtime = runtime;
+ runtime.AgentInstance = this;
this.EventTypes = eventTypes;
_logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger();
+ var subscriptionRequest = new AddSubscriptionRequest
+ {
+ RequestId = Guid.NewGuid().ToString(),
+ Subscription = new Subscription
+ {
+ TypeSubscription = new TypeSubscription
+ {
+ AgentType = this.AgentId.Type,
+ TopicType = this.AgentId.Type + "/" + this.AgentId.Key
+ }
+ }
+ };
+ _runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).AsTask().Wait();
Completion = Start();
}
internal Task Completion { get; }
@@ -131,19 +144,19 @@ public List Subscribe(string topic)
}
}
};
- _context.SendMessageAsync(message).AsTask().Wait();
+ _runtime.SendMessageAsync(message).AsTask().Wait();
return new List { topic };
}
public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default)
{
- await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false);
+ await _runtime.StoreAsync(state, cancellationToken).ConfigureAwait(false);
return;
}
public async Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new()
{
- var agentState = await _context.ReadAsync(agentId, cancellationToken).ConfigureAwait(false);
- return agentState.FromAgentState();
+ var agentstate = await _runtime.ReadAsync(agentId, cancellationToken).ConfigureAwait(false);
+ return agentstate.FromAgentState();
}
private void OnResponseCore(RpcResponse response)
{
@@ -171,7 +184,7 @@ private async Task OnRequestCoreAsync(RpcRequest request, CancellationToken canc
{
response = new RpcResponse { Error = ex.Message };
}
- await _context.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false);
+ await _runtime.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false);
}
protected async Task RequestAsync(AgentId target, string method, Dictionary parameters)
@@ -195,7 +208,7 @@ protected async Task RequestAsync(AgentId target, string method, Di
activity?.SetTag("peer.service", target.ToString());
var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _context.Update(request, activity);
+ _runtime.Update(request, activity);
await this.InvokeWithActivityAsync(
static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource) state, CancellationToken ct) =>
{
@@ -206,7 +219,7 @@ static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource
{
- await state.Agent._context.PublishEventAsync(state.Event, ct).ConfigureAwait(false);
+ await state.Agent._runtime.PublishEventAsync(state.Event).ConfigureAwait(false);
},
(this, item),
activity,
diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs
index a69da96fb3d4..f9a5050534c8 100644
--- a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs
+++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs
@@ -24,6 +24,8 @@ public class AgentWorker :
private readonly CancellationTokenSource _shutdownCts;
private readonly IServiceProvider _serviceProvider;
private readonly IEnumerable> _configuredAgentTypes;
+ private readonly ConcurrentDictionary _subscriptionsByAgentType = new();
+ private readonly ConcurrentDictionary> _subscriptionsByTopic = new();
private readonly DistributedContextPropagator _distributedContextPropagator;
private readonly CancellationTokenSource _shutdownCancellationToken = new();
private Task? _mailboxTask;
@@ -96,11 +98,7 @@ public async Task RunMessagePump()
if (message == null) { continue; }
switch (message)
{
- case Message.MessageOneofCase.AddSubscriptionResponse:
- break;
- case Message.MessageOneofCase.RegisterAgentTypeResponse:
- break;
- case Message msg:
+ case Message msg when msg.CloudEvent != null:
var item = msg.CloudEvent;
@@ -110,6 +108,13 @@ public async Task RunMessagePump()
agentToInvoke.ReceiveMessage(msg);
}
break;
+ case Message msg when msg.AddSubscriptionRequest != null:
+ await AddSubscriptionRequestAsync(msg.AddSubscriptionRequest).ConfigureAwait(true);
+ break;
+ case Message msg when msg.AddSubscriptionResponse != null:
+ break;
+ case Message msg when msg.RegisterAgentTypeResponse != null:
+ break;
default:
throw new InvalidOperationException($"Unexpected message '{message}'.");
}
@@ -123,6 +128,23 @@ public async Task RunMessagePump()
}
}
}
+ private async ValueTask AddSubscriptionRequestAsync(AddSubscriptionRequest subscription)
+ {
+ var topic = subscription.Subscription.TypeSubscription.TopicType;
+ var agentType = subscription.Subscription.TypeSubscription.AgentType;
+ _subscriptionsByAgentType[agentType] = subscription.Subscription;
+ _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType);
+ Message response = new()
+ {
+ AddSubscriptionResponse = new()
+ {
+ RequestId = subscription.RequestId,
+ Error = "",
+ Success = true
+ }
+ };
+ await _mailbox.Writer.WriteAsync(response).ConfigureAwait(false);
+ }
public async Task StartAsync(CancellationToken cancellationToken)
{
diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs
index 45477c8eb5a6..ab24a0e15fe5 100644
--- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs
+++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs
@@ -153,6 +153,22 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection,
Success = true
}
};
+ // add a default subscription for the agent type
+ //TODO: we should consider having constraints on the namespace or at least migrate all our examples to use well typed namesspaces like com.microsoft.autogen/hello/HelloAgents etc
+ var subscriptionRequest = new AddSubscriptionRequest
+ {
+ RequestId = Guid.NewGuid().ToString(),
+ Subscription = new Subscription
+ {
+ TypeSubscription = new TypeSubscription
+ {
+ AgentType = msg.Type,
+ TopicType = msg.Type
+ }
+ }
+ };
+ await AddSubscriptionAsync(connection, subscriptionRequest).ConfigureAwait(true);
+
await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false);
}
private async ValueTask DispatchEventAsync(CloudEvent evt)
diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs
index e58fdb00f0a0..7e272ce6bed9 100644
--- a/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs
+++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs
@@ -23,6 +23,10 @@ public class AgentBaseTests(InMemoryAgentRuntimeFixture fixture)
public async Task ItInvokeRightHandlerTestAsync()
{
var mockContext = new Mock();
+ mockContext.SetupGet(x => x.AgentId).Returns(new AgentId("test", "test"));
+ // mock SendMessageAsync
+ mockContext.Setup(x => x.SendMessageAsync(It.IsAny(), It.IsAny()))
+ .Returns(new ValueTask());
var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger(new LoggerFactory()));
await agent.HandleObject("hello world");
diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto
index 7b0b5245dd3e..61b00333cd24 100644
--- a/protos/agent_worker.proto
+++ b/protos/agent_worker.proto
@@ -63,9 +63,15 @@ message TypeSubscription {
string agent_type = 2;
}
+message TypePrefixSubscription {
+ string topic_type_prefix = 1;
+ string agent_type = 2;
+}
+
message Subscription {
oneof subscription {
TypeSubscription typeSubscription = 1;
+ TypePrefixSubscription typePrefixSubscription = 2;
}
}
diff --git a/python/README.md b/python/README.md
index 5b012ff60bf9..69085b4c8dd2 100644
--- a/python/README.md
+++ b/python/README.md
@@ -1,8 +1,11 @@
# AutoGen Python packages
-See [`autogen-core`](./packages/autogen-core/) package for main functionality.
+[data:image/s3,"s3://crabby-images/8919a/8919ab59fa967e721e6e92067da8ab922a54f76b" alt="0.4 Docs"](https://microsoft.github.io/autogen/dev/)
+[data:image/s3,"s3://crabby-images/76e81/76e81642a4273cff14f15710c749160975e86589" alt="PyPi autogen-core"](https://pypi.org/project/autogen-core/0.4.0.dev7/) [data:image/s3,"s3://crabby-images/7eaab/7eaaba0beaaf08c01a7190f2c5b74753cddc0ccb" alt="PyPi autogen-agentchat"](https://pypi.org/project/autogen-agentchat/0.4.0.dev7/) [data:image/s3,"s3://crabby-images/4a48b/4a48baeebadb95db237c7032f25a5cbc15ae2294" alt="PyPi autogen-ext"](https://pypi.org/project/autogen-ext/0.4.0.dev7/)
+This directory works as a single `uv` workspace containing all project packages. See [`packages`](./packages/) to discover all project packages.
+
## Development
**TL;DR**, run all checks with:
diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py
index cb1eff8d6f6e..0870a6c2f3b0 100644
--- a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py
+++ b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py
@@ -82,13 +82,13 @@ def _handoff_tool() -> str:
class AssistantAgent(BaseChatAgent):
"""An agent that provides assistance with tool use.
- It responds with a StopMessage when 'terminate' is detected in the response.
-
Args:
name (str): The name of the agent.
model_client (ChatCompletionClient): The model client to use for inference.
tools (List[Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]] | None, optional): The tools to register with the agent.
- handoffs (List[Handoff | str] | None, optional): The handoff configurations for the agent, allowing it to transfer to other agents by responding with a HandoffMessage.
+ handoffs (List[Handoff | str] | None, optional): The handoff configurations for the agent,
+ allowing it to transfer to other agents by responding with a :class:`HandoffMessage`.
+ The transfer is only executed when the team is in :class:`~autogen_agentchat.teams.Swarm`.
If a handoff is a string, it should represent the target agent's name.
description (str, optional): The description of the agent.
system_message (str, optional): The system message for the model.
diff --git a/python/packages/autogen-core/pyproject.toml b/python/packages/autogen-core/pyproject.toml
index 064928590b0f..8727f5ee0c25 100644
--- a/python/packages/autogen-core/pyproject.toml
+++ b/python/packages/autogen-core/pyproject.toml
@@ -81,7 +81,7 @@ dev-dependencies = [
[tool.ruff]
extend = "../../pyproject.toml"
-exclude = ["build", "dist", "src/autogen_core/application/protos"]
+exclude = ["build", "dist", "src/autogen_core/application/protos", "tests/protos"]
include = ["src/**", "samples/*.py", "docs/**/*.ipynb", "tests/**"]
[tool.ruff.lint.per-file-ignores]
@@ -91,7 +91,7 @@ include = ["src/**", "samples/*.py", "docs/**/*.ipynb", "tests/**"]
[tool.pyright]
extends = "../../pyproject.toml"
include = ["src", "tests", "samples"]
-exclude = ["src/autogen_core/application/protos"]
+exclude = ["src/autogen_core/application/protos", "tests/protos"]
reportDeprecated = false
[tool.pytest.ini_options]
@@ -111,7 +111,7 @@ include = "../../shared_tasks.toml"
test = "pytest -n auto"
mypy.default_item_type = "cmd"
mypy.sequence = [
- "mypy --config-file ../../pyproject.toml --exclude src/autogen_core/application/protos src tests",
+ "mypy --config-file ../../pyproject.toml --exclude src/autogen_core/application/protos --exclude tests/protos src tests",
"nbqa mypy docs/src --config-file ../../pyproject.toml",
]
diff --git a/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py
index 52d24c64d0cb..3d81f15eb330 100644
--- a/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py
+++ b/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py
@@ -4,6 +4,7 @@
import inspect
import logging
import threading
+import uuid
import warnings
from asyncio import CancelledError, Future, Task
from collections.abc import Sequence
@@ -53,6 +54,7 @@ class PublishMessageEnvelope:
sender: AgentId | None
topic_id: TopicId
metadata: EnvelopeMetadata | None = None
+ message_id: str
@dataclass(kw_only=True)
@@ -256,6 +258,7 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
+ message_id: str | None = None,
) -> None:
with self._tracer_helper.trace_block(
"create",
@@ -268,6 +271,9 @@ async def publish_message(
content = message.__dict__ if hasattr(message, "__dict__") else message
logger.info(f"Publishing message of type {type(message).__name__} to all subscribers: {content}")
+ if message_id is None:
+ message_id = str(uuid.uuid4())
+
# event_logger.info(
# MessageEvent(
# payload=message,
@@ -285,6 +291,7 @@ async def publish_message(
sender=sender,
topic_id=topic_id,
metadata=get_telemetry_envelope_metadata(),
+ message_id=message_id,
)
)
@@ -327,6 +334,8 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None:
topic_id=None,
is_rpc=True,
cancellation_token=message_envelope.cancellation_token,
+ # Will be fixed when send API removed
+ message_id="NOT_DEFINED_TODO_FIX",
)
with MessageHandlerContext.populate_context(recipient_agent.id):
response = await recipient_agent.on_message(
@@ -385,6 +394,7 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No
topic_id=message_envelope.topic_id,
is_rpc=False,
cancellation_token=message_envelope.cancellation_token,
+ message_id=message_envelope.message_id,
)
agent = await self._get_agent(agent_id)
diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py
index 2c405710876a..0e5fb933a08e 100644
--- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py
+++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py
@@ -3,6 +3,7 @@
import json
import logging
import signal
+import uuid
import warnings
from asyncio import Future, Task
from collections import defaultdict
@@ -47,7 +48,7 @@
)
from ..base._serialization import MessageSerializer, SerializationRegistry
from ..base._type_helpers import ChannelArgumentType
-from ..components import TypeSubscription
+from ..components import TypePrefixSubscription, TypeSubscription
from ._helpers import SubscriptionManager, get_impl
from ._utils import GRPC_IMPORT_ERROR_STR
from .protos import agent_worker_pb2, agent_worker_pb2_grpc
@@ -371,11 +372,17 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
+ message_id: str | None = None,
) -> None:
if not self._running:
raise ValueError("Runtime must be running when publishing message.")
if self._host_connection is None:
raise RuntimeError("Host connection is not set.")
+ if message_id is None:
+ message_id = str(uuid.uuid4())
+
+ # TODO: consume message_id
+
message_type = self._serialization_registry.type_name(message)
with self._trace_helper.trace_block(
"create", topic_id, parent=None, extraAttributes={"message_type": message_type}
@@ -447,6 +454,7 @@ async def _process_request(self, request: agent_worker_pb2.RpcRequest) -> None:
topic_id=None,
is_rpc=True,
cancellation_token=CancellationToken(),
+ message_id=request.request_id,
)
# Call the receiving agent.
@@ -530,11 +538,13 @@ async def _process_event(self, event: agent_worker_pb2.Event) -> None:
for agent_id in recipients:
if agent_id == sender:
continue
+ # TODO: consume message_id
message_context = MessageContext(
sender=sender,
topic_id=topic_id,
is_rpc=False,
cancellation_token=CancellationToken(),
+ message_id="NOT_DEFINED_TODO_FIX",
)
agent = await self._get_agent(agent_id)
with MessageHandlerContext.populate_context(agent.id):
@@ -705,27 +715,44 @@ async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = A
async def add_subscription(self, subscription: Subscription) -> None:
if self._host_connection is None:
raise RuntimeError("Host connection is not set.")
- if not isinstance(subscription, TypeSubscription):
- raise ValueError("Only TypeSubscription is supported.")
- # Add to local subscription manager.
- await self._subscription_manager.add_subscription(subscription)
# Create a future for the subscription response.
future = asyncio.get_event_loop().create_future()
request_id = await self._get_new_request_id()
+
+ match subscription:
+ case TypeSubscription(topic_type=topic_type, agent_type=agent_type):
+ message = agent_worker_pb2.Message(
+ addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
+ request_id=request_id,
+ subscription=agent_worker_pb2.Subscription(
+ typeSubscription=agent_worker_pb2.TypeSubscription(
+ topic_type=topic_type, agent_type=agent_type
+ )
+ ),
+ )
+ )
+ case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type):
+ message = agent_worker_pb2.Message(
+ addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
+ request_id=request_id,
+ subscription=agent_worker_pb2.Subscription(
+ typePrefixSubscription=agent_worker_pb2.TypePrefixSubscription(
+ topic_type_prefix=topic_type_prefix, agent_type=agent_type
+ )
+ ),
+ )
+ )
+ case _:
+ raise ValueError("Unsupported subscription type.")
+
+ # Add the future to the pending requests.
self._pending_requests[request_id] = future
+ # Add to local subscription manager.
+ await self._subscription_manager.add_subscription(subscription)
+
# Send the subscription to the host.
- message = agent_worker_pb2.Message(
- addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest(
- request_id=request_id,
- subscription=agent_worker_pb2.Subscription(
- typeSubscription=agent_worker_pb2.TypeSubscription(
- topic_type=subscription.topic_type, agent_type=subscription.agent_type
- )
- ),
- )
- )
await self._host_connection.send(message)
# Wait for the subscription response.
diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py
index 3da50c56f048..7c597bd07a8f 100644
--- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py
+++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py
@@ -4,7 +4,9 @@
from asyncio import Future, Task
from typing import Any, Dict, Set
-from ..base import TopicId
+from autogen_core.base._type_prefix_subscription import TypePrefixSubscription
+
+from ..base import Subscription, TopicId
from ..components import TypeSubscription
from ._helpers import SubscriptionManager
from ._utils import GRPC_IMPORT_ERROR_STR
@@ -221,34 +223,46 @@ async def _process_add_subscription_request(
self, add_subscription_req: agent_worker_pb2.AddSubscriptionRequest, client_id: int
) -> None:
oneofcase = add_subscription_req.subscription.WhichOneof("subscription")
+ subscription: Subscription | None = None
match oneofcase:
case "typeSubscription":
type_subscription_msg: agent_worker_pb2.TypeSubscription = (
add_subscription_req.subscription.typeSubscription
)
- type_subscription = TypeSubscription(
+ subscription = TypeSubscription(
topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type
)
- try:
- await self._subscription_manager.add_subscription(type_subscription)
- subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set())
- subscription_ids.add(type_subscription.id)
- success = True
- error = None
- except ValueError as e:
- success = False
- error = str(e)
- # Send a response back to the client.
- await self._send_queues[client_id].put(
- agent_worker_pb2.Message(
- addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse(
- request_id=add_subscription_req.request_id, success=success, error=error
- )
- )
+
+ case "typePrefixSubscription":
+ type_prefix_subscription_msg: agent_worker_pb2.TypePrefixSubscription = (
+ add_subscription_req.subscription.typePrefixSubscription
+ )
+ subscription = TypePrefixSubscription(
+ topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix,
+ agent_type=type_prefix_subscription_msg.agent_type,
)
case None:
logger.warning("Received empty subscription message")
+ if subscription is not None:
+ try:
+ await self._subscription_manager.add_subscription(subscription)
+ subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set())
+ subscription_ids.add(subscription.id)
+ success = True
+ error = None
+ except ValueError as e:
+ success = False
+ error = str(e)
+ # Send a response back to the client.
+ await self._send_queues[client_id].put(
+ agent_worker_pb2.Message(
+ addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse(
+ request_id=add_subscription_req.request_id, success=success, error=error
+ )
+ )
+ )
+
async def GetState( # type: ignore
self,
request: agent_worker_pb2.AgentId,
diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py
index 0637e866c4de..8f143d770aef 100644
--- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py
+++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py
@@ -16,7 +16,7 @@
from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3')
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"G\n\x16TypePrefixSubscription\x12\x19\n\x11topic_type_prefix\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"\x96\x01\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x12@\n\x16typePrefixSubscription\x18\x02 \x01(\x0b\x32\x1e.agents.TypePrefixSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3')
_globals = globals()
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
@@ -54,20 +54,22 @@
_globals['_REGISTERAGENTTYPERESPONSE']._serialized_end=1067
_globals['_TYPESUBSCRIPTION']._serialized_start=1069
_globals['_TYPESUBSCRIPTION']._serialized_end=1127
- _globals['_SUBSCRIPTION']._serialized_start=1129
- _globals['_SUBSCRIPTION']._serialized_end=1213
- _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1215
- _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303
- _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305
- _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397
- _globals['_AGENTSTATE']._serialized_start=1400
- _globals['_AGENTSTATE']._serialized_end=1557
- _globals['_GETSTATERESPONSE']._serialized_start=1559
- _globals['_GETSTATERESPONSE']._serialized_end=1665
- _globals['_SAVESTATERESPONSE']._serialized_start=1667
- _globals['_SAVESTATERESPONSE']._serialized_end=1733
- _globals['_MESSAGE']._serialized_start=1736
- _globals['_MESSAGE']._serialized_end=2190
- _globals['_AGENTRPC']._serialized_start=2193
- _globals['_AGENTRPC']._serialized_end=2371
+ _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_start=1129
+ _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_end=1200
+ _globals['_SUBSCRIPTION']._serialized_start=1203
+ _globals['_SUBSCRIPTION']._serialized_end=1353
+ _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1355
+ _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1443
+ _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1445
+ _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1537
+ _globals['_AGENTSTATE']._serialized_start=1540
+ _globals['_AGENTSTATE']._serialized_end=1697
+ _globals['_GETSTATERESPONSE']._serialized_start=1699
+ _globals['_GETSTATERESPONSE']._serialized_end=1805
+ _globals['_SAVESTATERESPONSE']._serialized_start=1807
+ _globals['_SAVESTATERESPONSE']._serialized_end=1873
+ _globals['_MESSAGE']._serialized_start=1876
+ _globals['_MESSAGE']._serialized_end=2330
+ _globals['_AGENTRPC']._serialized_start=2333
+ _globals['_AGENTRPC']._serialized_end=2511
# @@protoc_insertion_point(module_scope)
diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi
index 522124ab8891..728bfafcc81a 100644
--- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi
+++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi
@@ -273,21 +273,43 @@ class TypeSubscription(google.protobuf.message.Message):
global___TypeSubscription = TypeSubscription
+@typing.final
+class TypePrefixSubscription(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ TOPIC_TYPE_PREFIX_FIELD_NUMBER: builtins.int
+ AGENT_TYPE_FIELD_NUMBER: builtins.int
+ topic_type_prefix: builtins.str
+ agent_type: builtins.str
+ def __init__(
+ self,
+ *,
+ topic_type_prefix: builtins.str = ...,
+ agent_type: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing.Literal["agent_type", b"agent_type", "topic_type_prefix", b"topic_type_prefix"]) -> None: ...
+
+global___TypePrefixSubscription = TypePrefixSubscription
+
@typing.final
class Subscription(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
TYPESUBSCRIPTION_FIELD_NUMBER: builtins.int
+ TYPEPREFIXSUBSCRIPTION_FIELD_NUMBER: builtins.int
@property
def typeSubscription(self) -> global___TypeSubscription: ...
+ @property
+ def typePrefixSubscription(self) -> global___TypePrefixSubscription: ...
def __init__(
self,
*,
typeSubscription: global___TypeSubscription | None = ...,
+ typePrefixSubscription: global___TypePrefixSubscription | None = ...,
) -> None: ...
- def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ...
- def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> None: ...
- def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription"] | None: ...
+ def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ...
+ def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> None: ...
+ def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription", "typePrefixSubscription"] | None: ...
global___Subscription = Subscription
diff --git a/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py b/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py
index a8e7f0096324..27c37ad9f349 100644
--- a/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py
+++ b/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py
@@ -55,6 +55,7 @@ async def publish_message(
*,
sender: AgentId | None = None,
cancellation_token: CancellationToken | None = None,
+ message_id: str | None = None,
) -> None:
"""Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender.
@@ -64,7 +65,8 @@ async def publish_message(
message (Any): The message to publish.
topic (TopicId): The topic to publish the message to.
sender (AgentId | None, optional): The agent which sent the message. Defaults to None.
- cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None.
+ cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None.
+ message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID.
Raises:
UndeliverableException: If the message cannot be delivered.
diff --git a/python/packages/autogen-core/src/autogen_core/base/_base_agent.py b/python/packages/autogen-core/src/autogen_core/base/_base_agent.py
index 5d8e94225b3a..70481705ca6e 100644
--- a/python/packages/autogen-core/src/autogen_core/base/_base_agent.py
+++ b/python/packages/autogen-core/src/autogen_core/base/_base_agent.py
@@ -20,6 +20,7 @@
from ._subscription import Subscription, UnboundSubscription
from ._subscription_context import SubscriptionInstantiationContext
from ._topic import TopicId
+from ._type_prefix_subscription import TypePrefixSubscription
T = TypeVar("T", bound=Agent)
@@ -149,6 +150,7 @@ async def register(
factory: Callable[[], Self | Awaitable[Self]],
*,
skip_class_subscriptions: bool = False,
+ skip_direct_message_subscription: bool = False,
) -> AgentType:
agent_type = AgentType(type)
agent_type = await runtime.register_factory(type=agent_type, agent_factory=factory, expected_class=cls)
@@ -166,6 +168,16 @@ async def register(
for subscription in subscriptions:
await runtime.add_subscription(subscription)
+ if not skip_direct_message_subscription:
+ # Additionally adds a special prefix subscription for this agent to receive direct messages
+ await runtime.add_subscription(
+ TypePrefixSubscription(
+ # The prefix MUST include ":" to avoid collisions with other agents
+ topic_type_prefix=agent_type.type + ":",
+ agent_type=agent_type.type,
+ )
+ )
+
# TODO: deduplication
for _message_type, serializer in cls._handles_types():
runtime.add_message_serializer(serializer)
diff --git a/python/packages/autogen-core/src/autogen_core/base/_message_context.py b/python/packages/autogen-core/src/autogen_core/base/_message_context.py
index 0a2c2973bc01..c5c00559ed0e 100644
--- a/python/packages/autogen-core/src/autogen_core/base/_message_context.py
+++ b/python/packages/autogen-core/src/autogen_core/base/_message_context.py
@@ -11,3 +11,4 @@ class MessageContext:
topic_id: TopicId | None
is_rpc: bool
cancellation_token: CancellationToken
+ message_id: str
diff --git a/python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py b/python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py
new file mode 100644
index 000000000000..f00119165090
--- /dev/null
+++ b/python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py
@@ -0,0 +1,65 @@
+import uuid
+
+from ._agent_id import AgentId
+from ._subscription import Subscription
+from ._topic import TopicId
+from .exceptions import CantHandleException
+
+
+class TypePrefixSubscription(Subscription):
+ """This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key.
+
+ This subscription causes each source to have its own agent instance.
+
+ Example:
+
+ .. code-block:: python
+
+ from autogen_core.components import TypePrefixSubscription
+
+ subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1")
+
+ In this case:
+
+ - A topic_id with type `t1` and source `s1` will be handled by an agent of type `a1` with key `s1`
+ - A topic_id with type `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`.
+ - A topic_id with type `t1SUFFIX` and source `s2` will be handled by an agent of type `a1` with key `s2`.
+
+ Args:
+ topic_type_prefix (str): Topic type prefix to match against
+ agent_type (str): Agent type to handle this subscription
+ """
+
+ def __init__(self, topic_type_prefix: str, agent_type: str):
+ self._topic_type_prefix = topic_type_prefix
+ self._agent_type = agent_type
+ self._id = str(uuid.uuid4())
+
+ @property
+ def id(self) -> str:
+ return self._id
+
+ @property
+ def topic_type_prefix(self) -> str:
+ return self._topic_type_prefix
+
+ @property
+ def agent_type(self) -> str:
+ return self._agent_type
+
+ def is_match(self, topic_id: TopicId) -> bool:
+ return topic_id.type.startswith(self._topic_type_prefix)
+
+ def map_to_agent(self, topic_id: TopicId) -> AgentId:
+ if not self.is_match(topic_id):
+ raise CantHandleException("TopicId does not match the subscription")
+
+ return AgentId(type=self._agent_type, key=topic_id.source)
+
+ def __eq__(self, other: object) -> bool:
+ if not isinstance(other, TypePrefixSubscription):
+ return False
+
+ return self.id == other.id or (
+ self.agent_type == other.agent_type and self.topic_type_prefix == other.topic_type_prefix
+ )
diff --git a/python/packages/autogen-core/src/autogen_core/components/__init__.py b/python/packages/autogen-core/src/autogen_core/components/__init__.py
index 9ad8bdb35a19..75bb5eabcbe8 100644
--- a/python/packages/autogen-core/src/autogen_core/components/__init__.py
+++ b/python/packages/autogen-core/src/autogen_core/components/__init__.py
@@ -2,6 +2,7 @@
The :mod:`autogen_core.components` module provides building blocks for creating single agents
"""
+from ..base._type_prefix_subscription import TypePrefixSubscription
from ._closure_agent import ClosureAgent
from ._default_subscription import DefaultSubscription, default_subscription, type_subscription
from ._default_topic import DefaultTopicId
@@ -24,4 +25,5 @@
"DefaultTopicId",
"default_subscription",
"type_subscription",
+ "TypePrefixSubscription",
]
diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py
index 92709a457aec..94def76595d5 100644
--- a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py
+++ b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py
@@ -1,7 +1,6 @@
import uuid
-from typing import TypeVar
-from ..base import AgentId, BaseAgent, Subscription, TopicId
+from ..base import AgentId, Subscription, TopicId
from ..base.exceptions import CantHandleException
@@ -59,6 +58,3 @@ def __eq__(self, other: object) -> bool:
return False
return self.id == other.id or (self.agent_type == other.agent_type and self.topic_type == other.topic_type)
-
-
-BaseAgentType = TypeVar("BaseAgentType", bound="BaseAgent")
diff --git a/python/packages/autogen-core/tests/protos/serialization_test.proto b/python/packages/autogen-core/tests/protos/serialization_test.proto
new file mode 100644
index 000000000000..611100ccde12
--- /dev/null
+++ b/python/packages/autogen-core/tests/protos/serialization_test.proto
@@ -0,0 +1,11 @@
+syntax = "proto3";
+
+package agents;
+
+message ProtoMessage {
+ string message = 1;
+}
+message NestingProtoMessage {
+ string message = 1;
+ ProtoMessage nested = 2;
+}
\ No newline at end of file
diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2.py b/python/packages/autogen-core/tests/protos/serialization_test_pb2.py
new file mode 100644
index 000000000000..ebc4bfee7018
--- /dev/null
+++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2.py
@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler. DO NOT EDIT!
+# source: serialization_test.proto
+# Protobuf Python Version: 4.25.1
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18serialization_test.proto\x12\x06\x61gents\"\x1f\n\x0cProtoMessage\x12\x0f\n\x07message\x18\x01 \x01(\t\"L\n\x13NestingProtoMessage\x12\x0f\n\x07message\x18\x01 \x01(\t\x12$\n\x06nested\x18\x02 \x01(\x0b\x32\x14.agents.ProtoMessageb\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'serialization_test_pb2', _globals)
+if _descriptor._USE_C_DESCRIPTORS == False:
+ DESCRIPTOR._options = None
+ _globals['_PROTOMESSAGE']._serialized_start=36
+ _globals['_PROTOMESSAGE']._serialized_end=67
+ _globals['_NESTINGPROTOMESSAGE']._serialized_start=69
+ _globals['_NESTINGPROTOMESSAGE']._serialized_end=145
+# @@protoc_insertion_point(module_scope)
diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi b/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi
new file mode 100644
index 000000000000..b8a284663f6e
--- /dev/null
+++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi
@@ -0,0 +1,46 @@
+"""
+@generated by mypy-protobuf. Do not edit manually!
+isort:skip_file
+"""
+
+import builtins
+import google.protobuf.descriptor
+import google.protobuf.message
+import typing
+
+DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
+
+@typing.final
+class ProtoMessage(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ MESSAGE_FIELD_NUMBER: builtins.int
+ message: builtins.str
+ def __init__(
+ self,
+ *,
+ message: builtins.str = ...,
+ ) -> None: ...
+ def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ...
+
+global___ProtoMessage = ProtoMessage
+
+@typing.final
+class NestingProtoMessage(google.protobuf.message.Message):
+ DESCRIPTOR: google.protobuf.descriptor.Descriptor
+
+ MESSAGE_FIELD_NUMBER: builtins.int
+ NESTED_FIELD_NUMBER: builtins.int
+ message: builtins.str
+ @property
+ def nested(self) -> global___ProtoMessage: ...
+ def __init__(
+ self,
+ *,
+ message: builtins.str = ...,
+ nested: global___ProtoMessage | None = ...,
+ ) -> None: ...
+ def HasField(self, field_name: typing.Literal["nested", b"nested"]) -> builtins.bool: ...
+ def ClearField(self, field_name: typing.Literal["message", b"message", "nested", b"nested"]) -> None: ...
+
+global___NestingProtoMessage = NestingProtoMessage
diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py
new file mode 100644
index 000000000000..2daafffebfc8
--- /dev/null
+++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py
@@ -0,0 +1,4 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
+import grpc
+
diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi
new file mode 100644
index 000000000000..a6a9cff9dfd4
--- /dev/null
+++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi
@@ -0,0 +1,17 @@
+"""
+@generated by mypy-protobuf. Do not edit manually!
+isort:skip_file
+"""
+
+import abc
+import collections.abc
+import grpc
+import grpc.aio
+import typing
+
+_T = typing.TypeVar("_T")
+
+class _MaybeAsyncIterator(collections.abc.AsyncIterator[_T], collections.abc.Iterator[_T], metaclass=abc.ABCMeta): ...
+
+class _ServicerContext(grpc.ServicerContext, grpc.aio.ServicerContext): # type: ignore[misc, type-arg]
+ ...
diff --git a/python/packages/autogen-core/tests/test_serialization.py b/python/packages/autogen-core/tests/test_serialization.py
index 3f3b0174c8b0..6b5568411f6f 100644
--- a/python/packages/autogen-core/tests/test_serialization.py
+++ b/python/packages/autogen-core/tests/test_serialization.py
@@ -11,6 +11,7 @@
from autogen_core.base._serialization import DataclassJsonMessageSerializer, PydanticJsonMessageSerializer
from autogen_core.components import Image
from PIL import Image as PILImage
+from protos.serialization_test_pb2 import NestingProtoMessage, ProtoMessage
from pydantic import BaseModel
@@ -83,6 +84,36 @@ def test_nesting_dataclass_dataclass() -> None:
serde.add_serializer(try_get_known_serializers_for_type(NestingDataclassMessage))
+def test_proto() -> None:
+ serde = SerializationRegistry()
+ serde.add_serializer(try_get_known_serializers_for_type(ProtoMessage))
+
+ message = ProtoMessage(message="hello")
+ name = serde.type_name(message)
+ # TODO: should be PROTO_DATA_CONTENT_TYPE
+ data = serde.serialize(message, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE)
+ assert name == "ProtoMessage"
+ # TODO: assert data == stuff
+ deserialized = serde.deserialize(data, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE)
+ assert deserialized == message
+
+
+def test_nested_proto() -> None:
+ serde = SerializationRegistry()
+ serde.add_serializer(try_get_known_serializers_for_type(NestingProtoMessage))
+
+ message = NestingProtoMessage(message="hello", nested=ProtoMessage(message="world"))
+ name = serde.type_name(message)
+
+ # TODO: should be PROTO_DATA_CONTENT_TYPE
+ data = serde.serialize(message, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE)
+
+ # TODO: assert data == stuff
+
+ deserialized = serde.deserialize(data, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE)
+ assert deserialized == message
+
+
@dataclass
class DataclassNestedUnionSyntaxOldMessage:
message: Union[str, int]
diff --git a/python/packages/autogen-core/tests/test_worker_runtime.py b/python/packages/autogen-core/tests/test_worker_runtime.py
index d58233c3ac82..26c95dc01860 100644
--- a/python/packages/autogen-core/tests/test_worker_runtime.py
+++ b/python/packages/autogen-core/tests/test_worker_runtime.py
@@ -360,7 +360,7 @@ async def get_subscribed_recipients() -> List[AgentId]:
)
subscriptions1 = get_current_subscriptions()
- assert len(subscriptions1) == 1
+ assert len(subscriptions1) == 2
recipients1 = await get_subscribed_recipients()
assert AgentId(type="worker1", key="default") in recipients1
@@ -388,7 +388,7 @@ async def get_subscribed_recipients() -> List[AgentId]:
)
subscriptions3 = get_current_subscriptions()
- assert len(subscriptions3) == 1
+ assert len(subscriptions3) == 2
assert first_subscription_id not in [x.id for x in subscriptions3]
recipients3 = await get_subscribed_recipients()
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 3b099db535e8..e9b9753cfca9 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -79,3 +79,5 @@ test = "python run_task_in_pkgs_if_exist.py test"
check = ["fmt", "lint", "pyright", "mypy", "test"]
gen-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/src/autogen_core/application/protos --grpc_python_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_grpc_out=./packages/autogen-core/src/autogen_core/application/protos --proto_path ../protos/ agent_worker.proto --proto_path ../protos/ cloudevent.proto"
+
+gen-test-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/tests/protos --grpc_python_out=./packages/autogen-core/tests/protos --mypy_out=./packages/autogen-core/tests/protos --mypy_grpc_out=./packages/autogen-core/tests/protos --proto_path ./packages/autogen-core/tests/protos serialization_test.proto"
\ No newline at end of file