From b94abb2a6c10aca3c3f8d894b9c708e3f7edd37b Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Mon, 25 Nov 2024 16:29:51 -0800 Subject: [PATCH 1/8] =?UTF-8?q?add=20default=20subscriptions=20for=20the?= =?UTF-8?q?=20agent=20type=20-=20Implicitly=20created=20sub=E2=80=A6=20(#4?= =?UTF-8?q?324)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add default subscriptions for the agent type - Implicitly created subscription for agent RPC #4321 * add default sub for agenttype+id * fix subscription implementation for in memory runtime --------- Co-authored-by: XiaoYun Zhang --- .../src/Microsoft.AutoGen/Agents/AgentBase.cs | 43 ++++++++++++------- .../Agents/Services/AgentWorker.cs | 32 +++++++++++--- .../Agents/Services/Grpc/GrpcGateway.cs | 16 +++++++ .../AgentBaseTests.cs | 4 ++ 4 files changed, 75 insertions(+), 20 deletions(-) diff --git a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs index 345e6d34c82..01ad856a2d4 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/AgentBase.cs @@ -15,27 +15,40 @@ namespace Microsoft.AutoGen.Agents; public abstract class AgentBase : IAgentBase, IHandle { public static readonly ActivitySource s_source = new("AutoGen.Agent"); - public AgentId AgentId => _context.AgentId; + public AgentId AgentId => _runtime.AgentId; private readonly object _lock = new(); private readonly Dictionary> _pendingRequests = []; private readonly Channel _mailbox = Channel.CreateUnbounded(); - private readonly IAgentRuntime _context; + private readonly IAgentRuntime _runtime; public string Route { get; set; } = "base"; protected internal ILogger _logger; - public IAgentRuntime Context => _context; + public IAgentRuntime Context => _runtime; protected readonly EventTypes EventTypes; protected AgentBase( - IAgentRuntime context, + IAgentRuntime runtime, EventTypes eventTypes, ILogger? logger = null) { - _context = context; - context.AgentInstance = this; + _runtime = runtime; + runtime.AgentInstance = this; this.EventTypes = eventTypes; _logger = logger ?? LoggerFactory.Create(builder => { }).CreateLogger(); + var subscriptionRequest = new AddSubscriptionRequest + { + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = this.AgentId.Type, + TopicType = this.AgentId.Type + "/" + this.AgentId.Key + } + } + }; + _runtime.SendMessageAsync(new Message { AddSubscriptionRequest = subscriptionRequest }).AsTask().Wait(); Completion = Start(); } internal Task Completion { get; } @@ -131,19 +144,19 @@ public List Subscribe(string topic) } } }; - _context.SendMessageAsync(message).AsTask().Wait(); + _runtime.SendMessageAsync(message).AsTask().Wait(); return new List { topic }; } public async Task StoreAsync(AgentState state, CancellationToken cancellationToken = default) { - await _context.StoreAsync(state, cancellationToken).ConfigureAwait(false); + await _runtime.StoreAsync(state, cancellationToken).ConfigureAwait(false); return; } public async Task ReadAsync(AgentId agentId, CancellationToken cancellationToken = default) where T : IMessage, new() { - var agentState = await _context.ReadAsync(agentId, cancellationToken).ConfigureAwait(false); - return agentState.FromAgentState(); + var agentstate = await _runtime.ReadAsync(agentId, cancellationToken).ConfigureAwait(false); + return agentstate.FromAgentState(); } private void OnResponseCore(RpcResponse response) { @@ -171,7 +184,7 @@ private async Task OnRequestCoreAsync(RpcRequest request, CancellationToken canc { response = new RpcResponse { Error = ex.Message }; } - await _context.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false); + await _runtime.SendResponseAsync(request, response, cancellationToken).ConfigureAwait(false); } protected async Task RequestAsync(AgentId target, string method, Dictionary parameters) @@ -195,7 +208,7 @@ protected async Task RequestAsync(AgentId target, string method, Di activity?.SetTag("peer.service", target.ToString()); var completion = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _context.Update(request, activity); + _runtime.Update(request, activity); await this.InvokeWithActivityAsync( static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource) state, CancellationToken ct) => { @@ -206,7 +219,7 @@ static async ((AgentBase Agent, RpcRequest Request, TaskCompletionSource { - await state.Agent._context.PublishEventAsync(state.Event, ct).ConfigureAwait(false); + await state.Agent._runtime.PublishEventAsync(state.Event).ConfigureAwait(false); }, (this, item), activity, diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs index a69da96fb3d..f9a5050534c 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/AgentWorker.cs @@ -24,6 +24,8 @@ public class AgentWorker : private readonly CancellationTokenSource _shutdownCts; private readonly IServiceProvider _serviceProvider; private readonly IEnumerable> _configuredAgentTypes; + private readonly ConcurrentDictionary _subscriptionsByAgentType = new(); + private readonly ConcurrentDictionary> _subscriptionsByTopic = new(); private readonly DistributedContextPropagator _distributedContextPropagator; private readonly CancellationTokenSource _shutdownCancellationToken = new(); private Task? _mailboxTask; @@ -96,11 +98,7 @@ public async Task RunMessagePump() if (message == null) { continue; } switch (message) { - case Message.MessageOneofCase.AddSubscriptionResponse: - break; - case Message.MessageOneofCase.RegisterAgentTypeResponse: - break; - case Message msg: + case Message msg when msg.CloudEvent != null: var item = msg.CloudEvent; @@ -110,6 +108,13 @@ public async Task RunMessagePump() agentToInvoke.ReceiveMessage(msg); } break; + case Message msg when msg.AddSubscriptionRequest != null: + await AddSubscriptionRequestAsync(msg.AddSubscriptionRequest).ConfigureAwait(true); + break; + case Message msg when msg.AddSubscriptionResponse != null: + break; + case Message msg when msg.RegisterAgentTypeResponse != null: + break; default: throw new InvalidOperationException($"Unexpected message '{message}'."); } @@ -123,6 +128,23 @@ public async Task RunMessagePump() } } } + private async ValueTask AddSubscriptionRequestAsync(AddSubscriptionRequest subscription) + { + var topic = subscription.Subscription.TypeSubscription.TopicType; + var agentType = subscription.Subscription.TypeSubscription.AgentType; + _subscriptionsByAgentType[agentType] = subscription.Subscription; + _subscriptionsByTopic.GetOrAdd(topic, _ => []).Add(agentType); + Message response = new() + { + AddSubscriptionResponse = new() + { + RequestId = subscription.RequestId, + Error = "", + Success = true + } + }; + await _mailbox.Writer.WriteAsync(response).ConfigureAwait(false); + } public async Task StartAsync(CancellationToken cancellationToken) { diff --git a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs index 45477c8eb5a..ab24a0e15fe 100644 --- a/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs +++ b/dotnet/src/Microsoft.AutoGen/Agents/Services/Grpc/GrpcGateway.cs @@ -153,6 +153,22 @@ private async ValueTask RegisterAgentTypeAsync(GrpcWorkerConnection connection, Success = true } }; + // add a default subscription for the agent type + //TODO: we should consider having constraints on the namespace or at least migrate all our examples to use well typed namesspaces like com.microsoft.autogen/hello/HelloAgents etc + var subscriptionRequest = new AddSubscriptionRequest + { + RequestId = Guid.NewGuid().ToString(), + Subscription = new Subscription + { + TypeSubscription = new TypeSubscription + { + AgentType = msg.Type, + TopicType = msg.Type + } + } + }; + await AddSubscriptionAsync(connection, subscriptionRequest).ConfigureAwait(true); + await connection.ResponseStream.WriteAsync(response).ConfigureAwait(false); } private async ValueTask DispatchEventAsync(CloudEvent evt) diff --git a/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs index e58fdb00f0a..7e272ce6bed 100644 --- a/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs +++ b/dotnet/test/Microsoft.AutoGen.Agents.Tests/AgentBaseTests.cs @@ -23,6 +23,10 @@ public class AgentBaseTests(InMemoryAgentRuntimeFixture fixture) public async Task ItInvokeRightHandlerTestAsync() { var mockContext = new Mock(); + mockContext.SetupGet(x => x.AgentId).Returns(new AgentId("test", "test")); + // mock SendMessageAsync + mockContext.Setup(x => x.SendMessageAsync(It.IsAny(), It.IsAny())) + .Returns(new ValueTask()); var agent = new TestAgent(mockContext.Object, new EventTypes(TypeRegistry.Empty, [], []), new Logger(new LoggerFactory())); await agent.HandleObject("hello world"); From bcd6e71e7f762c94d6e2c5a48d4e9856aa46da59 Mon Sep 17 00:00:00 2001 From: Eric Zhu Date: Mon, 25 Nov 2024 18:18:13 -0800 Subject: [PATCH 2/8] Fix assistant agent doc (#4365) --- .../src/autogen_agentchat/agents/_assistant_agent.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py index cb1eff8d6f6..0870a6c2f3b 100644 --- a/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py +++ b/python/packages/autogen-agentchat/src/autogen_agentchat/agents/_assistant_agent.py @@ -82,13 +82,13 @@ def _handoff_tool() -> str: class AssistantAgent(BaseChatAgent): """An agent that provides assistance with tool use. - It responds with a StopMessage when 'terminate' is detected in the response. - Args: name (str): The name of the agent. model_client (ChatCompletionClient): The model client to use for inference. tools (List[Tool | Callable[..., Any] | Callable[..., Awaitable[Any]]] | None, optional): The tools to register with the agent. - handoffs (List[Handoff | str] | None, optional): The handoff configurations for the agent, allowing it to transfer to other agents by responding with a HandoffMessage. + handoffs (List[Handoff | str] | None, optional): The handoff configurations for the agent, + allowing it to transfer to other agents by responding with a :class:`HandoffMessage`. + The transfer is only executed when the team is in :class:`~autogen_agentchat.teams.Swarm`. If a handoff is a string, it should represent the target agent's name. description (str, optional): The description of the agent. system_message (str, optional): The system message for the model. From 6a9c14715b04de653b16a2d1376461e710b80179 Mon Sep 17 00:00:00 2001 From: Ryan Sweet Date: Tue, 26 Nov 2024 00:50:14 -0800 Subject: [PATCH 3/8] add office hours link to README.md (#4366) --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 253a6b0ea07..c38d5f2805f 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ > - (11/14/24) ⚠️ In response to a number of asks to clarify and distinguish between official AutoGen and its forks that created confusion, we issued a [clarification statement](https://github.com/microsoft/autogen/discussions/4217). > - (10/13/24) Interested in the standard AutoGen as a prior user? Find it at the actively-maintained *AutoGen* [0.2 branch](https://github.com/microsoft/autogen/tree/0.2) and `autogen-agentchat~=0.2` PyPi package. > - (10/02/24) [AutoGen 0.4](https://microsoft.github.io/autogen/dev) is a from-the-ground-up rewrite of AutoGen. Learn more about the history, goals and future at [this blog post](https://microsoft.github.io/autogen/blog). We’re excited to work with the community to gather feedback, refine, and improve the project before we officially release 0.4. This is a big change, so AutoGen 0.2 is still available, maintained, and developed in the [0.2 branch](https://github.com/microsoft/autogen/tree/0.2). +> - *[Join us for Community Office Hours](https://github.com/microsoft/autogen/discussions/4059)* We will host a weekly open discussion to answer questions, talk about Roadmap, etc. AutoGen is an open-source framework for building AI agent systems. It simplifies the creation of event-driven, distributed, scalable, and resilient agentic applications. From 5aecb56ebdba7cdab1618a32bcd1942858b58749 Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Tue, 26 Nov 2024 11:04:59 -0500 Subject: [PATCH 4/8] Add badges, update contributing, add pointers in various places (#4371) --- CONTRIBUTING.md | 58 ++++++++++++++++++++++++++++++++++++++++--- README.md | 6 +++-- docs/design/readme.md | 3 +++ python/README.md | 5 +++- 4 files changed, 66 insertions(+), 6 deletions(-) create mode 100644 docs/design/readme.md diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 411a8da3f3c..fadb8091a0d 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -24,10 +24,62 @@ This project has adopted the [Microsoft Open Source Code of Conduct](https://ope For more information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeofconduct/faq/) or contact [opencode@microsoft.com](mailto:opencode@microsoft.com) with any additional questions or comments. -## Roadmaps +## Running CI checks locally -To see what we are working on and what we plan to work on, please check our -[Roadmap Issues](https://aka.ms/autogen-roadmap). +It is important to use `uv` when running CI checks locally as it ensures that the correct dependencies and versions are used. + +Please follow the instructions [here](./python/README.md#setup) to get set up. + +For common tasks that are helpful during development and run in CI, see [here](./python/README.md#common-tasks). + +## Roadmap + +We use GitHub issues and milestones to track our roadmap. You can view the upcoming milestones [here]([Roadmap Issues](https://aka.ms/autogen-roadmap). + +## Versioning + +The set of `autogen-*` packages are generally all versioned together. When a change is made to one package, all packages are updated to the same version. This is to ensure that all packages are in sync with each other. + +We will update verion numbers according to the following rules: + +- Increase minor version (0.X.0) upon breaking changes +- Increase patch version (0.0.X) upon new features or bug fixes + +## Release process + +1. Create a PR that updates the version numbers across the codebase ([example](https://github.com/microsoft/autogen/pull/4359)) + 2. The docs CI will fail for the PR, but this is expected and will be resolved in the next step +2. After merging the PR, create and push a tag that corresponds to the new verion. For example, for `0.4.0.dev7`: + - `git tag 0.4.0.dev7 && git push origin 0.4.0.dev7` +3. Restart the docs CI by finding the failed [job corresponding to the `push` event](https://github.com/microsoft/autogen/actions/workflows/docs.yml) and restarting all jobs +4. Run [this](https://github.com/microsoft/autogen/actions/workflows/single-python-package.yml) workflow for each of the packages that need to be released and get an approval for the release for it to run + +## Triage process + +To help ensure the health of the project and community the AutoGen committers have a weekly triage process to ensure that all issues and pull requests are reviewed and addressed in a timely manner. The following documents the responsibilites while on triage duty: + +- Issues + - Review all new issues - these will be tagged with [`needs-triage`](https://github.com/microsoft/autogen/issues?q=is%3Aissue%20state%3Aopen%20label%3Aneeds-triage). + - Apply appropriate labels: + - One of `proj-*` labels based on the project the issue is related to + - `documentation`: related to documentation + - `x-lang`: related to cross language functionality + - `dotnet`: related to .NET + - Add the issue to a relevant milestone if necessary + - If you can resolve the issue or reply to the OP please do. + - If you cannot resolve the issue, assign it to the appropriate person. + - If awaiting a reply add the tag `awaiting-op-response` (this will be auto removed when the OP replies). + - Bonus: there is a backlog of old issues that need to be reviewed - if you have time, review these as well and close or refresh as many as you can. +- PRs + - The UX on GH flags all recently updated PRs. Draft PRs can be ignored, otherwise review all recently updated PRs. + - If a PR is ready for review and you can provide one please go ahead. If you cant, please assign someone. You can quickly spin up a codespace with the PR to test it out. + - If a PR is needing a reply from the op, please tag it `awaiting-op-response`. + - If a PR is approved and passes CI, its ready to merge, please do so. + - If it looks like there is a possibly transient CI failure, re-run failed jobs. +- Discussions + - Look for recently updated discussions and reply as needed or find someone on the team to reply. +- Security + - Look through any securty alerts and file issues or dismiss as needed. ## Becoming a Reviewer diff --git a/README.md b/README.md index c38d5f2805f..2cb9b266d83 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,9 @@
AutoGen Logo -[![Twitter](https://img.shields.io/twitter/url/https/twitter.com/cloudposse.svg?style=social&label=Follow%20%40pyautogen)](https://twitter.com/pyautogen) +[![Twitter](https://img.shields.io/twitter/url/https/twitter.com/cloudposse.svg?style=social&label=Follow%20%40pyautogen)](https://twitter.com/pyautogen) [![GitHub Discussions](https://img.shields.io/badge/Discussions-Q%26A-green?logo=github)](https://github.com/microsoft/autogen/discussions) [![0.2 Docs](https://img.shields.io/badge/Docs-0.2-blue)](https://microsoft.github.io/autogen/0.2/) [![0.4 Docs](https://img.shields.io/badge/Docs-0.4-blue)](https://microsoft.github.io/autogen/dev/) +[![PyPi autogen-core](https://img.shields.io/badge/PyPi-autogen--core-blue?logo=pypi)](https://pypi.org/project/autogen-core/0.4.0.dev7/) [![PyPi autogen-agentchat](https://img.shields.io/badge/PyPi-autogen--agentchat-blue?logo=pypi)](https://pypi.org/project/autogen-agentchat/0.4.0.dev7/) [![PyPi autogen-ext](https://img.shields.io/badge/PyPi-autogen--ext-blue?logo=pypi)](https://pypi.org/project/autogen-ext/0.4.0.dev7/) +
@@ -13,7 +15,7 @@ > - (11/14/24) ⚠️ In response to a number of asks to clarify and distinguish between official AutoGen and its forks that created confusion, we issued a [clarification statement](https://github.com/microsoft/autogen/discussions/4217). > - (10/13/24) Interested in the standard AutoGen as a prior user? Find it at the actively-maintained *AutoGen* [0.2 branch](https://github.com/microsoft/autogen/tree/0.2) and `autogen-agentchat~=0.2` PyPi package. > - (10/02/24) [AutoGen 0.4](https://microsoft.github.io/autogen/dev) is a from-the-ground-up rewrite of AutoGen. Learn more about the history, goals and future at [this blog post](https://microsoft.github.io/autogen/blog). We’re excited to work with the community to gather feedback, refine, and improve the project before we officially release 0.4. This is a big change, so AutoGen 0.2 is still available, maintained, and developed in the [0.2 branch](https://github.com/microsoft/autogen/tree/0.2). -> - *[Join us for Community Office Hours](https://github.com/microsoft/autogen/discussions/4059)* We will host a weekly open discussion to answer questions, talk about Roadmap, etc. +> - *[Join us for Community Office Hours](https://github.com/microsoft/autogen/discussions/4059)* We will host a weekly open discussion to answer questions, talk about Roadmap, etc. AutoGen is an open-source framework for building AI agent systems. It simplifies the creation of event-driven, distributed, scalable, and resilient agentic applications. diff --git a/docs/design/readme.md b/docs/design/readme.md new file mode 100644 index 00000000000..6a8221027f7 --- /dev/null +++ b/docs/design/readme.md @@ -0,0 +1,3 @@ +# Docs + +You can find the project documentation [here](https://microsoft.github.io/autogen/dev/). diff --git a/python/README.md b/python/README.md index 5b012ff60bf..69085b4c8dd 100644 --- a/python/README.md +++ b/python/README.md @@ -1,8 +1,11 @@ # AutoGen Python packages -See [`autogen-core`](./packages/autogen-core/) package for main functionality. +[![0.4 Docs](https://img.shields.io/badge/Docs-0.4-blue)](https://microsoft.github.io/autogen/dev/) +[![PyPi autogen-core](https://img.shields.io/badge/PyPi-autogen--core-blue?logo=pypi)](https://pypi.org/project/autogen-core/0.4.0.dev7/) [![PyPi autogen-agentchat](https://img.shields.io/badge/PyPi-autogen--agentchat-blue?logo=pypi)](https://pypi.org/project/autogen-agentchat/0.4.0.dev7/) [![PyPi autogen-ext](https://img.shields.io/badge/PyPi-autogen--ext-blue?logo=pypi)](https://pypi.org/project/autogen-ext/0.4.0.dev7/) +This directory works as a single `uv` workspace containing all project packages. See [`packages`](./packages/) to discover all project packages. + ## Development **TL;DR**, run all checks with: From 74bcd5e0f6f36508028e59d9ebad8e951d07f8ea Mon Sep 17 00:00:00 2001 From: peterychang <49209570+peterychang@users.noreply.github.com> Date: Tue, 26 Nov 2024 15:37:40 -0500 Subject: [PATCH 5/8] add protobuf serialization test (#4224) * add protobuf serialization test * proto file regeneration --------- Co-authored-by: Ryan Sweet Co-authored-by: Mohammad Mazraeh --- .github/workflows/checks.yml | 1 + python/packages/autogen-core/pyproject.toml | 6 +-- .../tests/protos/serialization_test.proto | 11 +++++ .../tests/protos/serialization_test_pb2.py | 28 +++++++++++ .../tests/protos/serialization_test_pb2.pyi | 46 +++++++++++++++++++ .../protos/serialization_test_pb2_grpc.py | 4 ++ .../protos/serialization_test_pb2_grpc.pyi | 17 +++++++ .../autogen-core/tests/test_serialization.py | 31 +++++++++++++ python/pyproject.toml | 2 + 9 files changed, 143 insertions(+), 3 deletions(-) create mode 100644 python/packages/autogen-core/tests/protos/serialization_test.proto create mode 100644 python/packages/autogen-core/tests/protos/serialization_test_pb2.py create mode 100644 python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi create mode 100644 python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py create mode 100644 python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 31869b86ccc..703c0749425 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -190,6 +190,7 @@ jobs: run: | source ${{ github.workspace }}/python/.venv/bin/activate poe gen-proto + poe gen-test-proto working-directory: ./python - name: Check if there are uncommited changes id: changes diff --git a/python/packages/autogen-core/pyproject.toml b/python/packages/autogen-core/pyproject.toml index 064928590b0..8727f5ee0c2 100644 --- a/python/packages/autogen-core/pyproject.toml +++ b/python/packages/autogen-core/pyproject.toml @@ -81,7 +81,7 @@ dev-dependencies = [ [tool.ruff] extend = "../../pyproject.toml" -exclude = ["build", "dist", "src/autogen_core/application/protos"] +exclude = ["build", "dist", "src/autogen_core/application/protos", "tests/protos"] include = ["src/**", "samples/*.py", "docs/**/*.ipynb", "tests/**"] [tool.ruff.lint.per-file-ignores] @@ -91,7 +91,7 @@ include = ["src/**", "samples/*.py", "docs/**/*.ipynb", "tests/**"] [tool.pyright] extends = "../../pyproject.toml" include = ["src", "tests", "samples"] -exclude = ["src/autogen_core/application/protos"] +exclude = ["src/autogen_core/application/protos", "tests/protos"] reportDeprecated = false [tool.pytest.ini_options] @@ -111,7 +111,7 @@ include = "../../shared_tasks.toml" test = "pytest -n auto" mypy.default_item_type = "cmd" mypy.sequence = [ - "mypy --config-file ../../pyproject.toml --exclude src/autogen_core/application/protos src tests", + "mypy --config-file ../../pyproject.toml --exclude src/autogen_core/application/protos --exclude tests/protos src tests", "nbqa mypy docs/src --config-file ../../pyproject.toml", ] diff --git a/python/packages/autogen-core/tests/protos/serialization_test.proto b/python/packages/autogen-core/tests/protos/serialization_test.proto new file mode 100644 index 00000000000..611100ccde1 --- /dev/null +++ b/python/packages/autogen-core/tests/protos/serialization_test.proto @@ -0,0 +1,11 @@ +syntax = "proto3"; + +package agents; + +message ProtoMessage { + string message = 1; +} +message NestingProtoMessage { + string message = 1; + ProtoMessage nested = 2; +} \ No newline at end of file diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2.py b/python/packages/autogen-core/tests/protos/serialization_test_pb2.py new file mode 100644 index 00000000000..ebc4bfee701 --- /dev/null +++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2.py @@ -0,0 +1,28 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: serialization_test.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x18serialization_test.proto\x12\x06\x61gents\"\x1f\n\x0cProtoMessage\x12\x0f\n\x07message\x18\x01 \x01(\t\"L\n\x13NestingProtoMessage\x12\x0f\n\x07message\x18\x01 \x01(\t\x12$\n\x06nested\x18\x02 \x01(\x0b\x32\x14.agents.ProtoMessageb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'serialization_test_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_PROTOMESSAGE']._serialized_start=36 + _globals['_PROTOMESSAGE']._serialized_end=67 + _globals['_NESTINGPROTOMESSAGE']._serialized_start=69 + _globals['_NESTINGPROTOMESSAGE']._serialized_end=145 +# @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi b/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi new file mode 100644 index 00000000000..b8a284663f6 --- /dev/null +++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2.pyi @@ -0,0 +1,46 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import builtins +import google.protobuf.descriptor +import google.protobuf.message +import typing + +DESCRIPTOR: google.protobuf.descriptor.FileDescriptor + +@typing.final +class ProtoMessage(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + message: builtins.str + def __init__( + self, + *, + message: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["message", b"message"]) -> None: ... + +global___ProtoMessage = ProtoMessage + +@typing.final +class NestingProtoMessage(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + MESSAGE_FIELD_NUMBER: builtins.int + NESTED_FIELD_NUMBER: builtins.int + message: builtins.str + @property + def nested(self) -> global___ProtoMessage: ... + def __init__( + self, + *, + message: builtins.str = ..., + nested: global___ProtoMessage | None = ..., + ) -> None: ... + def HasField(self, field_name: typing.Literal["nested", b"nested"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["message", b"message", "nested", b"nested"]) -> None: ... + +global___NestingProtoMessage = NestingProtoMessage diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py new file mode 100644 index 00000000000..2daafffebfc --- /dev/null +++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.py @@ -0,0 +1,4 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + diff --git a/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi new file mode 100644 index 00000000000..a6a9cff9dfd --- /dev/null +++ b/python/packages/autogen-core/tests/protos/serialization_test_pb2_grpc.pyi @@ -0,0 +1,17 @@ +""" +@generated by mypy-protobuf. Do not edit manually! +isort:skip_file +""" + +import abc +import collections.abc +import grpc +import grpc.aio +import typing + +_T = typing.TypeVar("_T") + +class _MaybeAsyncIterator(collections.abc.AsyncIterator[_T], collections.abc.Iterator[_T], metaclass=abc.ABCMeta): ... + +class _ServicerContext(grpc.ServicerContext, grpc.aio.ServicerContext): # type: ignore[misc, type-arg] + ... diff --git a/python/packages/autogen-core/tests/test_serialization.py b/python/packages/autogen-core/tests/test_serialization.py index 3f3b0174c8b..6b5568411f6 100644 --- a/python/packages/autogen-core/tests/test_serialization.py +++ b/python/packages/autogen-core/tests/test_serialization.py @@ -11,6 +11,7 @@ from autogen_core.base._serialization import DataclassJsonMessageSerializer, PydanticJsonMessageSerializer from autogen_core.components import Image from PIL import Image as PILImage +from protos.serialization_test_pb2 import NestingProtoMessage, ProtoMessage from pydantic import BaseModel @@ -83,6 +84,36 @@ def test_nesting_dataclass_dataclass() -> None: serde.add_serializer(try_get_known_serializers_for_type(NestingDataclassMessage)) +def test_proto() -> None: + serde = SerializationRegistry() + serde.add_serializer(try_get_known_serializers_for_type(ProtoMessage)) + + message = ProtoMessage(message="hello") + name = serde.type_name(message) + # TODO: should be PROTO_DATA_CONTENT_TYPE + data = serde.serialize(message, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE) + assert name == "ProtoMessage" + # TODO: assert data == stuff + deserialized = serde.deserialize(data, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE) + assert deserialized == message + + +def test_nested_proto() -> None: + serde = SerializationRegistry() + serde.add_serializer(try_get_known_serializers_for_type(NestingProtoMessage)) + + message = NestingProtoMessage(message="hello", nested=ProtoMessage(message="world")) + name = serde.type_name(message) + + # TODO: should be PROTO_DATA_CONTENT_TYPE + data = serde.serialize(message, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE) + + # TODO: assert data == stuff + + deserialized = serde.deserialize(data, type_name=name, data_content_type=JSON_DATA_CONTENT_TYPE) + assert deserialized == message + + @dataclass class DataclassNestedUnionSyntaxOldMessage: message: Union[str, int] diff --git a/python/pyproject.toml b/python/pyproject.toml index 3b099db535e..e9b9753cfca 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -79,3 +79,5 @@ test = "python run_task_in_pkgs_if_exist.py test" check = ["fmt", "lint", "pyright", "mypy", "test"] gen-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/src/autogen_core/application/protos --grpc_python_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_out=./packages/autogen-core/src/autogen_core/application/protos --mypy_grpc_out=./packages/autogen-core/src/autogen_core/application/protos --proto_path ../protos/ agent_worker.proto --proto_path ../protos/ cloudevent.proto" + +gen-test-proto = "python -m grpc_tools.protoc --python_out=./packages/autogen-core/tests/protos --grpc_python_out=./packages/autogen-core/tests/protos --mypy_out=./packages/autogen-core/tests/protos --mypy_grpc_out=./packages/autogen-core/tests/protos --proto_path ./packages/autogen-core/tests/protos serialization_test.proto" \ No newline at end of file From 1f07e5bea5e776f53a76fba9ba78258e5cb6aa18 Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Tue, 26 Nov 2024 15:58:26 -0500 Subject: [PATCH 6/8] Add type prefix subscription (#4383) * Add type prefix subscription * update example --------- Co-authored-by: Ryan Sweet --- docs/design/04 - Agent and Topic ID Specs.md | 2 +- protos/agent_worker.proto | 6 ++ .../application/_worker_runtime.py | 47 +++++++++----- .../_worker_runtime_host_servicer.py | 50 +++++++++------ .../application/protos/agent_worker_pb2.py | 36 ++++++----- .../application/protos/agent_worker_pb2.pyi | 28 ++++++++- .../src/autogen_core/components/__init__.py | 2 + .../components/_type_prefix_subscription.py | 63 +++++++++++++++++++ .../components/_type_subscription.py | 6 +- 9 files changed, 181 insertions(+), 59 deletions(-) create mode 100644 python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py diff --git a/docs/design/04 - Agent and Topic ID Specs.md b/docs/design/04 - Agent and Topic ID Specs.md index ee872ab2ac1..b0e0e0e94e6 100644 --- a/docs/design/04 - Agent and Topic ID Specs.md +++ b/docs/design/04 - Agent and Topic ID Specs.md @@ -34,7 +34,7 @@ This document describes the structure, constraints, and behavior of Agent IDs an - Type: `string` - Description: Topic type is usually defined by application code to mark the type of messages the topic is for. -- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), or underscores (\_). A valid identifier cannot start with a number, or contain any spaces. +- Constraints: UTF8 and only contain alphanumeric letters (a-z) and (0-9), ':', '=', or underscores (\_). A valid identifier cannot start with a number, or contain any spaces. - Examples: - `GitHub_Issues` diff --git a/protos/agent_worker.proto b/protos/agent_worker.proto index 7b0b5245dd3..61b00333cd2 100644 --- a/protos/agent_worker.proto +++ b/protos/agent_worker.proto @@ -63,9 +63,15 @@ message TypeSubscription { string agent_type = 2; } +message TypePrefixSubscription { + string topic_type_prefix = 1; + string agent_type = 2; +} + message Subscription { oneof subscription { TypeSubscription typeSubscription = 1; + TypePrefixSubscription typePrefixSubscription = 2; } } diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py index 2c405710876..f8ad4bbd14e 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py @@ -47,7 +47,7 @@ ) from ..base._serialization import MessageSerializer, SerializationRegistry from ..base._type_helpers import ChannelArgumentType -from ..components import TypeSubscription +from ..components import TypePrefixSubscription, TypeSubscription from ._helpers import SubscriptionManager, get_impl from ._utils import GRPC_IMPORT_ERROR_STR from .protos import agent_worker_pb2, agent_worker_pb2_grpc @@ -705,27 +705,44 @@ async def try_get_underlying_agent_instance(self, id: AgentId, type: Type[T] = A async def add_subscription(self, subscription: Subscription) -> None: if self._host_connection is None: raise RuntimeError("Host connection is not set.") - if not isinstance(subscription, TypeSubscription): - raise ValueError("Only TypeSubscription is supported.") - # Add to local subscription manager. - await self._subscription_manager.add_subscription(subscription) # Create a future for the subscription response. future = asyncio.get_event_loop().create_future() request_id = await self._get_new_request_id() + + match subscription: + case TypeSubscription(topic_type=topic_type, agent_type=agent_type): + message = agent_worker_pb2.Message( + addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( + request_id=request_id, + subscription=agent_worker_pb2.Subscription( + typeSubscription=agent_worker_pb2.TypeSubscription( + topic_type=topic_type, agent_type=agent_type + ) + ), + ) + ) + case TypePrefixSubscription(topic_type_prefix=topic_type_prefix, agent_type=agent_type): + message = agent_worker_pb2.Message( + addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( + request_id=request_id, + subscription=agent_worker_pb2.Subscription( + typePrefixSubscription=agent_worker_pb2.TypePrefixSubscription( + topic_type_prefix=topic_type_prefix, agent_type=agent_type + ) + ), + ) + ) + case _: + raise ValueError("Unsupported subscription type.") + + # Add the future to the pending requests. self._pending_requests[request_id] = future + # Add to local subscription manager. + await self._subscription_manager.add_subscription(subscription) + # Send the subscription to the host. - message = agent_worker_pb2.Message( - addSubscriptionRequest=agent_worker_pb2.AddSubscriptionRequest( - request_id=request_id, - subscription=agent_worker_pb2.Subscription( - typeSubscription=agent_worker_pb2.TypeSubscription( - topic_type=subscription.topic_type, agent_type=subscription.agent_type - ) - ), - ) - ) await self._host_connection.send(message) # Wait for the subscription response. diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py index 3da50c56f04..5cd2bf8ea9b 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py @@ -4,7 +4,9 @@ from asyncio import Future, Task from typing import Any, Dict, Set -from ..base import TopicId +from autogen_core.components._type_prefix_subscription import TypePrefixSubscription + +from ..base import Subscription, TopicId from ..components import TypeSubscription from ._helpers import SubscriptionManager from ._utils import GRPC_IMPORT_ERROR_STR @@ -221,34 +223,46 @@ async def _process_add_subscription_request( self, add_subscription_req: agent_worker_pb2.AddSubscriptionRequest, client_id: int ) -> None: oneofcase = add_subscription_req.subscription.WhichOneof("subscription") + subscription: Subscription | None = None match oneofcase: case "typeSubscription": type_subscription_msg: agent_worker_pb2.TypeSubscription = ( add_subscription_req.subscription.typeSubscription ) - type_subscription = TypeSubscription( + subscription = TypeSubscription( topic_type=type_subscription_msg.topic_type, agent_type=type_subscription_msg.agent_type ) - try: - await self._subscription_manager.add_subscription(type_subscription) - subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set()) - subscription_ids.add(type_subscription.id) - success = True - error = None - except ValueError as e: - success = False - error = str(e) - # Send a response back to the client. - await self._send_queues[client_id].put( - agent_worker_pb2.Message( - addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse( - request_id=add_subscription_req.request_id, success=success, error=error - ) - ) + + case "typePrefixSubscription": + type_prefix_subscription_msg: agent_worker_pb2.TypePrefixSubscription = ( + add_subscription_req.subscription.typePrefixSubscription + ) + subscription = TypePrefixSubscription( + topic_type_prefix=type_prefix_subscription_msg.topic_type_prefix, + agent_type=type_prefix_subscription_msg.agent_type, ) case None: logger.warning("Received empty subscription message") + if subscription is not None: + try: + await self._subscription_manager.add_subscription(subscription) + subscription_ids = self._client_id_to_subscription_id_mapping.setdefault(client_id, set()) + subscription_ids.add(subscription.id) + success = True + error = None + except ValueError as e: + success = False + error = str(e) + # Send a response back to the client. + await self._send_queues[client_id].put( + agent_worker_pb2.Message( + addSubscriptionResponse=agent_worker_pb2.AddSubscriptionResponse( + request_id=add_subscription_req.request_id, success=success, error=error + ) + ) + ) + async def GetState( # type: ignore self, request: agent_worker_pb2.AgentId, diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py index 0637e866c4d..8f143d770ae 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.py @@ -16,7 +16,7 @@ from google.protobuf import any_pb2 as google_dot_protobuf_dot_any__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"T\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x12\x61gent_worker.proto\x12\x06\x61gents\x1a\x10\x63loudevent.proto\x1a\x19google/protobuf/any.proto\"\'\n\x07TopicId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0e\n\x06source\x18\x02 \x01(\t\"$\n\x07\x41gentId\x12\x0c\n\x04type\x18\x01 \x01(\t\x12\x0b\n\x03key\x18\x02 \x01(\t\"E\n\x07Payload\x12\x11\n\tdata_type\x18\x01 \x01(\t\x12\x19\n\x11\x64\x61ta_content_type\x18\x02 \x01(\t\x12\x0c\n\x04\x64\x61ta\x18\x03 \x01(\x0c\"\x89\x02\n\nRpcRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12$\n\x06source\x18\x02 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12\x1f\n\x06target\x18\x03 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0e\n\x06method\x18\x04 \x01(\t\x12 \n\x07payload\x18\x05 \x01(\x0b\x32\x0f.agents.Payload\x12\x32\n\x08metadata\x18\x06 \x03(\x0b\x32 .agents.RpcRequest.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"\xb8\x01\n\x0bRpcResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12 \n\x07payload\x18\x02 \x01(\x0b\x32\x0f.agents.Payload\x12\r\n\x05\x65rror\x18\x03 \x01(\t\x12\x33\n\x08metadata\x18\x04 \x03(\x0b\x32!.agents.RpcResponse.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"\xe4\x01\n\x05\x45vent\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x14\n\x0ctopic_source\x18\x02 \x01(\t\x12$\n\x06source\x18\x03 \x01(\x0b\x32\x0f.agents.AgentIdH\x00\x88\x01\x01\x12 \n\x07payload\x18\x04 \x01(\x0b\x32\x0f.agents.Payload\x12-\n\x08metadata\x18\x05 \x03(\x0b\x32\x1b.agents.Event.MetadataEntry\x1a/\n\rMetadataEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\x42\t\n\x07_source\"<\n\x18RegisterAgentTypeRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0c\n\x04type\x18\x02 \x01(\t\"^\n\x19RegisterAgentTypeResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\":\n\x10TypeSubscription\x12\x12\n\ntopic_type\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"G\n\x16TypePrefixSubscription\x12\x19\n\x11topic_type_prefix\x18\x01 \x01(\t\x12\x12\n\nagent_type\x18\x02 \x01(\t\"\x96\x01\n\x0cSubscription\x12\x34\n\x10typeSubscription\x18\x01 \x01(\x0b\x32\x18.agents.TypeSubscriptionH\x00\x12@\n\x16typePrefixSubscription\x18\x02 \x01(\x0b\x32\x1e.agents.TypePrefixSubscriptionH\x00\x42\x0e\n\x0csubscription\"X\n\x16\x41\x64\x64SubscriptionRequest\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12*\n\x0csubscription\x18\x02 \x01(\x0b\x32\x14.agents.Subscription\"\\\n\x17\x41\x64\x64SubscriptionResponse\x12\x12\n\nrequest_id\x18\x01 \x01(\t\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\x9d\x01\n\nAgentState\x12!\n\x08\x61gent_id\x18\x01 \x01(\x0b\x32\x0f.agents.AgentId\x12\x0c\n\x04\x65Tag\x18\x02 \x01(\t\x12\x15\n\x0b\x62inary_data\x18\x03 \x01(\x0cH\x00\x12\x13\n\ttext_data\x18\x04 \x01(\tH\x00\x12*\n\nproto_data\x18\x05 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00\x42\x06\n\x04\x64\x61ta\"j\n\x10GetStateResponse\x12\'\n\x0b\x61gent_state\x18\x01 \x01(\x0b\x32\x12.agents.AgentState\x12\x0f\n\x07success\x18\x02 \x01(\x08\x12\x12\n\x05\x65rror\x18\x03 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"B\n\x11SaveStateResponse\x12\x0f\n\x07success\x18\x01 \x01(\x08\x12\x12\n\x05\x65rror\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x08\n\x06_error\"\xc6\x03\n\x07Message\x12%\n\x07request\x18\x01 \x01(\x0b\x32\x12.agents.RpcRequestH\x00\x12\'\n\x08response\x18\x02 \x01(\x0b\x32\x13.agents.RpcResponseH\x00\x12\x1e\n\x05\x65vent\x18\x03 \x01(\x0b\x32\r.agents.EventH\x00\x12\x44\n\x18registerAgentTypeRequest\x18\x04 \x01(\x0b\x32 .agents.RegisterAgentTypeRequestH\x00\x12\x46\n\x19registerAgentTypeResponse\x18\x05 \x01(\x0b\x32!.agents.RegisterAgentTypeResponseH\x00\x12@\n\x16\x61\x64\x64SubscriptionRequest\x18\x06 \x01(\x0b\x32\x1e.agents.AddSubscriptionRequestH\x00\x12\x42\n\x17\x61\x64\x64SubscriptionResponse\x18\x07 \x01(\x0b\x32\x1f.agents.AddSubscriptionResponseH\x00\x12,\n\ncloudEvent\x18\x08 \x01(\x0b\x32\x16.cloudevent.CloudEventH\x00\x42\t\n\x07message2\xb2\x01\n\x08\x41gentRpc\x12\x33\n\x0bOpenChannel\x12\x0f.agents.Message\x1a\x0f.agents.Message(\x01\x30\x01\x12\x35\n\x08GetState\x12\x0f.agents.AgentId\x1a\x18.agents.GetStateResponse\x12:\n\tSaveState\x12\x12.agents.AgentState\x1a\x19.agents.SaveStateResponseB!\xaa\x02\x1eMicrosoft.AutoGen.Abstractionsb\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -54,20 +54,22 @@ _globals['_REGISTERAGENTTYPERESPONSE']._serialized_end=1067 _globals['_TYPESUBSCRIPTION']._serialized_start=1069 _globals['_TYPESUBSCRIPTION']._serialized_end=1127 - _globals['_SUBSCRIPTION']._serialized_start=1129 - _globals['_SUBSCRIPTION']._serialized_end=1213 - _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1215 - _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1303 - _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1305 - _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1397 - _globals['_AGENTSTATE']._serialized_start=1400 - _globals['_AGENTSTATE']._serialized_end=1557 - _globals['_GETSTATERESPONSE']._serialized_start=1559 - _globals['_GETSTATERESPONSE']._serialized_end=1665 - _globals['_SAVESTATERESPONSE']._serialized_start=1667 - _globals['_SAVESTATERESPONSE']._serialized_end=1733 - _globals['_MESSAGE']._serialized_start=1736 - _globals['_MESSAGE']._serialized_end=2190 - _globals['_AGENTRPC']._serialized_start=2193 - _globals['_AGENTRPC']._serialized_end=2371 + _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_start=1129 + _globals['_TYPEPREFIXSUBSCRIPTION']._serialized_end=1200 + _globals['_SUBSCRIPTION']._serialized_start=1203 + _globals['_SUBSCRIPTION']._serialized_end=1353 + _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_start=1355 + _globals['_ADDSUBSCRIPTIONREQUEST']._serialized_end=1443 + _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_start=1445 + _globals['_ADDSUBSCRIPTIONRESPONSE']._serialized_end=1537 + _globals['_AGENTSTATE']._serialized_start=1540 + _globals['_AGENTSTATE']._serialized_end=1697 + _globals['_GETSTATERESPONSE']._serialized_start=1699 + _globals['_GETSTATERESPONSE']._serialized_end=1805 + _globals['_SAVESTATERESPONSE']._serialized_start=1807 + _globals['_SAVESTATERESPONSE']._serialized_end=1873 + _globals['_MESSAGE']._serialized_start=1876 + _globals['_MESSAGE']._serialized_end=2330 + _globals['_AGENTRPC']._serialized_start=2333 + _globals['_AGENTRPC']._serialized_end=2511 # @@protoc_insertion_point(module_scope) diff --git a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi index 522124ab889..728bfafcc81 100644 --- a/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi +++ b/python/packages/autogen-core/src/autogen_core/application/protos/agent_worker_pb2.pyi @@ -273,21 +273,43 @@ class TypeSubscription(google.protobuf.message.Message): global___TypeSubscription = TypeSubscription +@typing.final +class TypePrefixSubscription(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + TOPIC_TYPE_PREFIX_FIELD_NUMBER: builtins.int + AGENT_TYPE_FIELD_NUMBER: builtins.int + topic_type_prefix: builtins.str + agent_type: builtins.str + def __init__( + self, + *, + topic_type_prefix: builtins.str = ..., + agent_type: builtins.str = ..., + ) -> None: ... + def ClearField(self, field_name: typing.Literal["agent_type", b"agent_type", "topic_type_prefix", b"topic_type_prefix"]) -> None: ... + +global___TypePrefixSubscription = TypePrefixSubscription + @typing.final class Subscription(google.protobuf.message.Message): DESCRIPTOR: google.protobuf.descriptor.Descriptor TYPESUBSCRIPTION_FIELD_NUMBER: builtins.int + TYPEPREFIXSUBSCRIPTION_FIELD_NUMBER: builtins.int @property def typeSubscription(self) -> global___TypeSubscription: ... + @property + def typePrefixSubscription(self) -> global___TypePrefixSubscription: ... def __init__( self, *, typeSubscription: global___TypeSubscription | None = ..., + typePrefixSubscription: global___TypePrefixSubscription | None = ..., ) -> None: ... - def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ... - def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typeSubscription", b"typeSubscription"]) -> None: ... - def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription"] | None: ... + def HasField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> builtins.bool: ... + def ClearField(self, field_name: typing.Literal["subscription", b"subscription", "typePrefixSubscription", b"typePrefixSubscription", "typeSubscription", b"typeSubscription"]) -> None: ... + def WhichOneof(self, oneof_group: typing.Literal["subscription", b"subscription"]) -> typing.Literal["typeSubscription", "typePrefixSubscription"] | None: ... global___Subscription = Subscription diff --git a/python/packages/autogen-core/src/autogen_core/components/__init__.py b/python/packages/autogen-core/src/autogen_core/components/__init__.py index 9ad8bdb35a1..4c4d02f3be2 100644 --- a/python/packages/autogen-core/src/autogen_core/components/__init__.py +++ b/python/packages/autogen-core/src/autogen_core/components/__init__.py @@ -7,6 +7,7 @@ from ._default_topic import DefaultTopicId from ._image import Image from ._routed_agent import RoutedAgent, TypeRoutedAgent, event, message_handler, rpc +from ._type_prefix_subscription import TypePrefixSubscription from ._type_subscription import TypeSubscription from ._types import FunctionCall @@ -24,4 +25,5 @@ "DefaultTopicId", "default_subscription", "type_subscription", + "TypePrefixSubscription", ] diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py b/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py new file mode 100644 index 00000000000..d71b587d141 --- /dev/null +++ b/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py @@ -0,0 +1,63 @@ +import uuid + +from ..base import AgentId, Subscription, TopicId +from ..base.exceptions import CantHandleException + + +class TypePrefixSubscription(Subscription): + """This subscription matches on topics based on a prefix of the type and maps to agents using the source of the topic as the agent key. + + This subscription causes each source to have its own agent instance. + + Example: + + .. code-block:: python + + from autogen_core.components import TypePrefixSubscription + + subscription = TypePrefixSubscription(topic_type_prefix="t1", agent_type="a1") + + In this case: + + - A topic_id with type `t1` and source `s1` will be handled by an agent of type `a1` with key `s1` + - A topic_id with type `t1` and source `s2` will be handled by an agent of type `a1` with key `s2`. + - A topic_id with type `t1SUFFIX` and source `s2` will be handled by an agent of type `a1` with key `s2`. + + Args: + topic_type_prefix (str): Topic type prefix to match against + agent_type (str): Agent type to handle this subscription + """ + + def __init__(self, topic_type_prefix: str, agent_type: str): + self._topic_type_prefix = topic_type_prefix + self._agent_type = agent_type + self._id = str(uuid.uuid4()) + + @property + def id(self) -> str: + return self._id + + @property + def topic_type_prefix(self) -> str: + return self._topic_type_prefix + + @property + def agent_type(self) -> str: + return self._agent_type + + def is_match(self, topic_id: TopicId) -> bool: + return topic_id.type.startswith(self._topic_type_prefix) + + def map_to_agent(self, topic_id: TopicId) -> AgentId: + if not self.is_match(topic_id): + raise CantHandleException("TopicId does not match the subscription") + + return AgentId(type=self._agent_type, key=topic_id.source) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, TypePrefixSubscription): + return False + + return self.id == other.id or ( + self.agent_type == other.agent_type and self.topic_type_prefix == other.topic_type_prefix + ) diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py index 92709a457ae..94def76595d 100644 --- a/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py +++ b/python/packages/autogen-core/src/autogen_core/components/_type_subscription.py @@ -1,7 +1,6 @@ import uuid -from typing import TypeVar -from ..base import AgentId, BaseAgent, Subscription, TopicId +from ..base import AgentId, Subscription, TopicId from ..base.exceptions import CantHandleException @@ -59,6 +58,3 @@ def __eq__(self, other: object) -> bool: return False return self.id == other.id or (self.agent_type == other.agent_type and self.topic_type == other.topic_type) - - -BaseAgentType = TypeVar("BaseAgentType", bound="BaseAgent") From cf80b1bc14c2376f33ae07bf57d4802077284727 Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Tue, 26 Nov 2024 16:09:14 -0500 Subject: [PATCH 7/8] Add request_id parameter (#4384) --- .../application/_single_threaded_agent_runtime.py | 10 ++++++++++ .../src/autogen_core/application/_worker_runtime.py | 10 ++++++++++ .../src/autogen_core/base/_agent_runtime.py | 4 +++- .../src/autogen_core/base/_message_context.py | 1 + 4 files changed, 24 insertions(+), 1 deletion(-) diff --git a/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py index 52d24c64d0c..3d81f15eb33 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/application/_single_threaded_agent_runtime.py @@ -4,6 +4,7 @@ import inspect import logging import threading +import uuid import warnings from asyncio import CancelledError, Future, Task from collections.abc import Sequence @@ -53,6 +54,7 @@ class PublishMessageEnvelope: sender: AgentId | None topic_id: TopicId metadata: EnvelopeMetadata | None = None + message_id: str @dataclass(kw_only=True) @@ -256,6 +258,7 @@ async def publish_message( *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, + message_id: str | None = None, ) -> None: with self._tracer_helper.trace_block( "create", @@ -268,6 +271,9 @@ async def publish_message( content = message.__dict__ if hasattr(message, "__dict__") else message logger.info(f"Publishing message of type {type(message).__name__} to all subscribers: {content}") + if message_id is None: + message_id = str(uuid.uuid4()) + # event_logger.info( # MessageEvent( # payload=message, @@ -285,6 +291,7 @@ async def publish_message( sender=sender, topic_id=topic_id, metadata=get_telemetry_envelope_metadata(), + message_id=message_id, ) ) @@ -327,6 +334,8 @@ async def _process_send(self, message_envelope: SendMessageEnvelope) -> None: topic_id=None, is_rpc=True, cancellation_token=message_envelope.cancellation_token, + # Will be fixed when send API removed + message_id="NOT_DEFINED_TODO_FIX", ) with MessageHandlerContext.populate_context(recipient_agent.id): response = await recipient_agent.on_message( @@ -385,6 +394,7 @@ async def _process_publish(self, message_envelope: PublishMessageEnvelope) -> No topic_id=message_envelope.topic_id, is_rpc=False, cancellation_token=message_envelope.cancellation_token, + message_id=message_envelope.message_id, ) agent = await self._get_agent(agent_id) diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py index f8ad4bbd14e..0e5fb933a08 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime.py @@ -3,6 +3,7 @@ import json import logging import signal +import uuid import warnings from asyncio import Future, Task from collections import defaultdict @@ -371,11 +372,17 @@ async def publish_message( *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, + message_id: str | None = None, ) -> None: if not self._running: raise ValueError("Runtime must be running when publishing message.") if self._host_connection is None: raise RuntimeError("Host connection is not set.") + if message_id is None: + message_id = str(uuid.uuid4()) + + # TODO: consume message_id + message_type = self._serialization_registry.type_name(message) with self._trace_helper.trace_block( "create", topic_id, parent=None, extraAttributes={"message_type": message_type} @@ -447,6 +454,7 @@ async def _process_request(self, request: agent_worker_pb2.RpcRequest) -> None: topic_id=None, is_rpc=True, cancellation_token=CancellationToken(), + message_id=request.request_id, ) # Call the receiving agent. @@ -530,11 +538,13 @@ async def _process_event(self, event: agent_worker_pb2.Event) -> None: for agent_id in recipients: if agent_id == sender: continue + # TODO: consume message_id message_context = MessageContext( sender=sender, topic_id=topic_id, is_rpc=False, cancellation_token=CancellationToken(), + message_id="NOT_DEFINED_TODO_FIX", ) agent = await self._get_agent(agent_id) with MessageHandlerContext.populate_context(agent.id): diff --git a/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py b/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py index a8e7f009632..27c37ad9f34 100644 --- a/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py +++ b/python/packages/autogen-core/src/autogen_core/base/_agent_runtime.py @@ -55,6 +55,7 @@ async def publish_message( *, sender: AgentId | None = None, cancellation_token: CancellationToken | None = None, + message_id: str | None = None, ) -> None: """Publish a message to all agents in the given namespace, or if no namespace is provided, the namespace of the sender. @@ -64,7 +65,8 @@ async def publish_message( message (Any): The message to publish. topic (TopicId): The topic to publish the message to. sender (AgentId | None, optional): The agent which sent the message. Defaults to None. - cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress . Defaults to None. + cancellation_token (CancellationToken | None, optional): Token used to cancel an in progress. Defaults to None. + message_id (str | None, optional): The message id. If None, a new message id will be generated. Defaults to None. This message id must be unique. and is recommended to be a UUID. Raises: UndeliverableException: If the message cannot be delivered. diff --git a/python/packages/autogen-core/src/autogen_core/base/_message_context.py b/python/packages/autogen-core/src/autogen_core/base/_message_context.py index 0a2c2973bc0..c5c00559ed0 100644 --- a/python/packages/autogen-core/src/autogen_core/base/_message_context.py +++ b/python/packages/autogen-core/src/autogen_core/base/_message_context.py @@ -11,3 +11,4 @@ class MessageContext: topic_id: TopicId | None is_rpc: bool cancellation_token: CancellationToken + message_id: str From df183be35a5943607035fb3cb3af1d1687c90deb Mon Sep 17 00:00:00 2001 From: Jack Gerrits Date: Tue, 26 Nov 2024 17:01:25 -0500 Subject: [PATCH 8/8] Add special topic for agent direct messaging (#4385) * Add special topic for agent direct messaging * move to base * update sub counts * Fix tests --- .../application/_worker_runtime_host_servicer.py | 2 +- .../src/autogen_core/base/_base_agent.py | 12 ++++++++++++ .../_type_prefix_subscription.py | 6 ++++-- .../src/autogen_core/components/__init__.py | 2 +- .../autogen-core/tests/test_worker_runtime.py | 4 ++-- 5 files changed, 20 insertions(+), 6 deletions(-) rename python/packages/autogen-core/src/autogen_core/{components => base}/_type_prefix_subscription.py (93%) diff --git a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py index 5cd2bf8ea9b..7c597bd07a8 100644 --- a/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py +++ b/python/packages/autogen-core/src/autogen_core/application/_worker_runtime_host_servicer.py @@ -4,7 +4,7 @@ from asyncio import Future, Task from typing import Any, Dict, Set -from autogen_core.components._type_prefix_subscription import TypePrefixSubscription +from autogen_core.base._type_prefix_subscription import TypePrefixSubscription from ..base import Subscription, TopicId from ..components import TypeSubscription diff --git a/python/packages/autogen-core/src/autogen_core/base/_base_agent.py b/python/packages/autogen-core/src/autogen_core/base/_base_agent.py index 5d8e94225b3..70481705ca6 100644 --- a/python/packages/autogen-core/src/autogen_core/base/_base_agent.py +++ b/python/packages/autogen-core/src/autogen_core/base/_base_agent.py @@ -20,6 +20,7 @@ from ._subscription import Subscription, UnboundSubscription from ._subscription_context import SubscriptionInstantiationContext from ._topic import TopicId +from ._type_prefix_subscription import TypePrefixSubscription T = TypeVar("T", bound=Agent) @@ -149,6 +150,7 @@ async def register( factory: Callable[[], Self | Awaitable[Self]], *, skip_class_subscriptions: bool = False, + skip_direct_message_subscription: bool = False, ) -> AgentType: agent_type = AgentType(type) agent_type = await runtime.register_factory(type=agent_type, agent_factory=factory, expected_class=cls) @@ -166,6 +168,16 @@ async def register( for subscription in subscriptions: await runtime.add_subscription(subscription) + if not skip_direct_message_subscription: + # Additionally adds a special prefix subscription for this agent to receive direct messages + await runtime.add_subscription( + TypePrefixSubscription( + # The prefix MUST include ":" to avoid collisions with other agents + topic_type_prefix=agent_type.type + ":", + agent_type=agent_type.type, + ) + ) + # TODO: deduplication for _message_type, serializer in cls._handles_types(): runtime.add_message_serializer(serializer) diff --git a/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py b/python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py similarity index 93% rename from python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py rename to python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py index d71b587d141..f0011916509 100644 --- a/python/packages/autogen-core/src/autogen_core/components/_type_prefix_subscription.py +++ b/python/packages/autogen-core/src/autogen_core/base/_type_prefix_subscription.py @@ -1,7 +1,9 @@ import uuid -from ..base import AgentId, Subscription, TopicId -from ..base.exceptions import CantHandleException +from ._agent_id import AgentId +from ._subscription import Subscription +from ._topic import TopicId +from .exceptions import CantHandleException class TypePrefixSubscription(Subscription): diff --git a/python/packages/autogen-core/src/autogen_core/components/__init__.py b/python/packages/autogen-core/src/autogen_core/components/__init__.py index 4c4d02f3be2..75bb5eabcbe 100644 --- a/python/packages/autogen-core/src/autogen_core/components/__init__.py +++ b/python/packages/autogen-core/src/autogen_core/components/__init__.py @@ -2,12 +2,12 @@ The :mod:`autogen_core.components` module provides building blocks for creating single agents """ +from ..base._type_prefix_subscription import TypePrefixSubscription from ._closure_agent import ClosureAgent from ._default_subscription import DefaultSubscription, default_subscription, type_subscription from ._default_topic import DefaultTopicId from ._image import Image from ._routed_agent import RoutedAgent, TypeRoutedAgent, event, message_handler, rpc -from ._type_prefix_subscription import TypePrefixSubscription from ._type_subscription import TypeSubscription from ._types import FunctionCall diff --git a/python/packages/autogen-core/tests/test_worker_runtime.py b/python/packages/autogen-core/tests/test_worker_runtime.py index d58233c3ac8..26c95dc0186 100644 --- a/python/packages/autogen-core/tests/test_worker_runtime.py +++ b/python/packages/autogen-core/tests/test_worker_runtime.py @@ -360,7 +360,7 @@ async def get_subscribed_recipients() -> List[AgentId]: ) subscriptions1 = get_current_subscriptions() - assert len(subscriptions1) == 1 + assert len(subscriptions1) == 2 recipients1 = await get_subscribed_recipients() assert AgentId(type="worker1", key="default") in recipients1 @@ -388,7 +388,7 @@ async def get_subscribed_recipients() -> List[AgentId]: ) subscriptions3 = get_current_subscriptions() - assert len(subscriptions3) == 1 + assert len(subscriptions3) == 2 assert first_subscription_id not in [x.id for x in subscriptions3] recipients3 = await get_subscribed_recipients()