From 24dec575847e8dbf063354741023b827c07fc611 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Mon, 21 Oct 2024 14:03:22 +0200 Subject: [PATCH 01/19] mark tests with mqtt * makes it possible to run all other tests using `pytest -m "not mqtt"` without having a mqtt broker running --- tests/integration_tests/test_single_container_termination.py | 4 ++++ tests/unit_tests/express/test_api.py | 1 + 2 files changed, 5 insertions(+) diff --git a/tests/integration_tests/test_single_container_termination.py b/tests/integration_tests/test_single_container_termination.py index a3c6bf4..9293e30 100644 --- a/tests/integration_tests/test_single_container_termination.py +++ b/tests/integration_tests/test_single_container_termination.py @@ -176,6 +176,7 @@ async def test_distribute_ping_pong_tcp(): @pytest.mark.asyncio +@pytest.mark.mqtt async def test_distribute_ping_pong_mqtt(): await distribute_ping_pong_test("mqtt") @@ -186,6 +187,7 @@ async def test_distribute_ping_pong_ts_tcp(): @pytest.mark.asyncio +@pytest.mark.mqtt async def test_distribute_ping_pong_ts_mqtt(): await distribute_ping_pong_test_timestamp("mqtt") @@ -296,6 +298,7 @@ async def test_distribute_time_tcp(): @pytest.mark.asyncio +@pytest.mark.mqtt async def test_distribute_time_mqtt(): await distribute_time_test_case("mqtt") @@ -306,5 +309,6 @@ async def test_send_current_time_tcp(): @pytest.mark.asyncio +@pytest.mark.mqtt async def test_send_current_time_mqtt(): await send_current_time_test_case("mqtt") diff --git a/tests/unit_tests/express/test_api.py b/tests/unit_tests/express/test_api.py index 144357f..019a772 100644 --- a/tests/unit_tests/express/test_api.py +++ b/tests/unit_tests/express/test_api.py @@ -119,6 +119,7 @@ async def test_run_api_style_agent_with_aid(): @pytest.mark.asyncio +@pytest.mark.mqtt async def test_run_api_style_agent_with_aid_mqtt(): # GIVEN run_agent = MyAgent() From 9637581fe37af748081ecd7a516dde37432af97d Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Mon, 21 Oct 2024 15:56:15 +0200 Subject: [PATCH 02/19] remove overloaded property functions in RoleContext closes #111 --- mango/agent/role.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/mango/agent/role.py b/mango/agent/role.py index 6c2698a..1380845 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -362,14 +362,6 @@ def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable) """ self._role_handler.subscribe_event(role, event_type, handler_method) - @property - def addr(self): - return self._agent_context.addr - - @property - def aid(self): - return self._aid - def deactivate(self, role) -> None: self._role_handler.deactivate(role) From 59a5e310f3200ed76fe16aebd95a14191b3b2e39 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Mon, 21 Oct 2024 16:05:01 +0200 Subject: [PATCH 03/19] use AgentAddress in create_acl for receiver_addr and sender_addr --- mango/messages/message.py | 50 ++++++++++--------- tests/unit_tests/core/test_agent.py | 4 +- tests/unit_tests/core/test_container.py | 22 ++++---- .../test_external_scheduling_container.py | 10 ++-- 4 files changed, 45 insertions(+), 41 deletions(-) diff --git a/mango/messages/message.py b/mango/messages/message.py index 6dbfec6..3400cf3 100644 --- a/mango/messages/message.py +++ b/mango/messages/message.py @@ -14,8 +14,9 @@ from enum import Enum from typing import Any -from ..messages.acl_message_pb2 import ACLMessage as ACLProto -from ..messages.mango_message_pb2 import MangoMessage as MangoMsg +from ..agent.core import AgentAddress +from .acl_message_pb2 import ACLMessage as ACLProto +from .mango_message_pb2 import MangoMessage as MangoMsg class Message(ABC): @@ -245,41 +246,42 @@ class Performatives(Enum): def create_acl( content, - receiver_addr: str | tuple[str, int], - sender_addr: str | tuple[str, int], - receiver_id: None | str = None, + receiver_addr: AgentAddress, + sender_addr: AgentAddress, acl_metadata: None | dict[str, Any] = None, is_anonymous_acl=False, ): acl_metadata = {} if acl_metadata is None else acl_metadata.copy() # analyse and complete acl_metadata - if "receiver_addr" not in acl_metadata.keys(): - acl_metadata["receiver_addr"] = receiver_addr - elif acl_metadata["receiver_addr"] != receiver_addr: + if ( + "receiver_addr" in acl_metadata.keys() + and acl_metadata["receiver_addr"] != receiver_addr.protocol_addr + ): warnings.warn( - f"The argument receiver_addr ({receiver_addr}) is not equal to " + f"The argument receiver_addr ({receiver_addr.protocol_addr}) is not equal to " f"acl_metadata['receiver_addr'] ({acl_metadata['receiver_addr']}). \ For consistency, the value in acl_metadata['receiver_addr'] " f"was overwritten with receiver_addr.", UserWarning, ) - acl_metadata["receiver_addr"] = receiver_addr - if receiver_id: - if "receiver_id" not in acl_metadata.keys(): - acl_metadata["receiver_id"] = receiver_id - elif acl_metadata["receiver_id"] != receiver_id: - warnings.warn( - f"The argument receiver_id ({receiver_id}) is not equal to " - f"acl_metadata['receiver_id'] ({acl_metadata['receiver_id']}). \ - For consistency, the value in acl_metadata['receiver_id'] " - f"was overwritten with receiver_id.", - UserWarning, - ) - acl_metadata["receiver_id"] = receiver_id + if ( + "receiver_id" in acl_metadata.keys() + and acl_metadata["receiver_id"] != receiver_addr.aid + ): + warnings.warn( + f"The argument receiver_id ({receiver_addr.aid}) is not equal to " + f"acl_metadata['receiver_id'] ({acl_metadata['receiver_id']}). \ + For consistency, the value in acl_metadata['receiver_id'] " + f"was overwritten with receiver_id.", + UserWarning, + ) + acl_metadata["receiver_addr"] = receiver_addr.protocol_addr + acl_metadata["receiver_id"] = receiver_addr.aid + # add sender_addr if not defined and not anonymous if not is_anonymous_acl: - if "sender_addr" not in acl_metadata.keys() and sender_addr is not None: - acl_metadata["sender_addr"] = sender_addr + acl_metadata["sender_addr"] = sender_addr.protocol_addr + acl_metadata["sender_id"] = sender_addr.aid message = ACLMessage() message.content = content diff --git a/tests/unit_tests/core/test_agent.py b/tests/unit_tests/core/test_agent.py index 42edfbc..8acfb65 100644 --- a/tests/unit_tests/core/test_agent.py +++ b/tests/unit_tests/core/test_agent.py @@ -62,7 +62,7 @@ async def test_send_acl_message(): async with activate(c) as c: await agent.send_message( - create_acl("", receiver_addr=agent2.addr, sender_addr=c.addr), + create_acl("", receiver_addr=agent2.addr, sender_addr=agent.addr), receiver_addr=agent2.addr, ) msg = await agent2.inbox.get() @@ -96,7 +96,7 @@ async def test_schedule_acl_message(): async with activate(c) as c: await agent.schedule_instant_message( - create_acl("", receiver_addr=agent2.addr, sender_addr=c.addr), + create_acl("", receiver_addr=agent2.addr, sender_addr=agent.addr), receiver_addr=agent2.addr, ) diff --git a/tests/unit_tests/core/test_container.py b/tests/unit_tests/core/test_container.py index 3464718..45c8df7 100644 --- a/tests/unit_tests/core/test_container.py +++ b/tests/unit_tests/core/test_container.py @@ -1,7 +1,7 @@ import pytest from mango import activate, create_acl, create_tcp_container -from mango.agent.core import Agent +from mango.agent.core import Agent, AgentAddress class LooksLikeAgent: @@ -153,10 +153,9 @@ async def test_create_acl_no_modify(): common_acl_q = {} actual_acl_message = create_acl( "", - receiver_addr="", - receiver_id="", + receiver_addr=AgentAddress("", ""), acl_metadata=common_acl_q, - sender_addr=c.addr, + sender_addr=AgentAddress(c.addr, ""), ) assert "reeiver_addr" not in common_acl_q @@ -170,7 +169,10 @@ async def test_create_acl_no_modify(): async def test_create_acl_anon(): c = create_tcp_container(addr=("127.0.0.1", 5555)) actual_acl_message = create_acl( - "", receiver_addr="", receiver_id="", is_anonymous_acl=True, sender_addr=c.addr + "", + receiver_addr=AgentAddress("", ""), + is_anonymous_acl=True, + sender_addr=AgentAddress(c.addr, ""), ) assert actual_acl_message.sender_addr is None @@ -181,7 +183,10 @@ async def test_create_acl_anon(): async def test_create_acl_not_anon(): c = create_tcp_container(addr=("127.0.0.1", 5555)) actual_acl_message = create_acl( - "", receiver_addr="", receiver_id="", is_anonymous_acl=False, sender_addr=c.addr + "", + receiver_addr=AgentAddress("", ""), + is_anonymous_acl=False, + sender_addr=AgentAddress(c.addr, ""), ) assert actual_acl_message.sender_addr is not None @@ -227,10 +232,9 @@ async def test_create_acl_diff_receiver(): with pytest.warns(UserWarning) as record: actual_acl_message = create_acl( "", - receiver_addr="A", - receiver_id="A", + receiver_addr=AgentAddress("A", "A"), acl_metadata={"receiver_id": "B", "receiver_addr": "B"}, - sender_addr=c.addr, + sender_addr=AgentAddress(c.addr, ""), is_anonymous_acl=False, ) diff --git a/tests/unit_tests/core/test_external_scheduling_container.py b/tests/unit_tests/core/test_external_scheduling_container.py index bdb2e5b..2efe276 100644 --- a/tests/unit_tests/core/test_external_scheduling_container.py +++ b/tests/unit_tests/core/test_external_scheduling_container.py @@ -146,9 +146,8 @@ async def test_step_with_cond_task(): # create and send message in next step message = create_acl( content="", - receiver_addr=external_scheduling_container.addr, - receiver_id=agent_1.aid, - sender_addr=external_scheduling_container.addr, + receiver_addr=AgentAddress(external_scheduling_container.addr, agent_1.aid), + sender_addr=AgentAddress(external_scheduling_container.addr, agent_1.aid), ) encoded_msg = external_scheduling_container.codec.encode(message) print("created message") @@ -204,9 +203,8 @@ async def test_send_internal_messages(): async with activate(external_scheduling_container) as c: message = create_acl( content="", - receiver_addr=external_scheduling_container.addr, - receiver_id=agent_1.aid, - sender_addr=external_scheduling_container.addr, + receiver_addr=AgentAddress(external_scheduling_container.addr, agent_1.aid), + sender_addr=AgentAddress(external_scheduling_container.addr, agent_1.aid), ) encoded_msg = external_scheduling_container.codec.encode(message) return_values = await external_scheduling_container.step( From bc11657200775630a4a8e26b14de1600b8d3ebe5 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Mon, 21 Oct 2024 18:09:26 +0200 Subject: [PATCH 04/19] add test which sets suspendable False --- mango/agent/core.py | 2 +- tests/unit_tests/express/test_api.py | 10 ++++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/mango/agent/core.py b/mango/agent/core.py index fe55db8..4cfc2eb 100644 --- a/mango/agent/core.py +++ b/mango/agent/core.py @@ -373,7 +373,7 @@ def suspendable_tasks(self): @suspendable_tasks.setter def suspendable_tasks(self, value: bool): - self.self.scheduler.suspendable = value + self.scheduler.suspendable = value def on_register(self): """ diff --git a/tests/unit_tests/express/test_api.py b/tests/unit_tests/express/test_api.py index 019a772..ae18894 100644 --- a/tests/unit_tests/express/test_api.py +++ b/tests/unit_tests/express/test_api.py @@ -139,3 +139,13 @@ async def test_run_api_style_agent_with_aid_mqtt(): assert run_agent2.test_counter == 1 assert run_agent2.aid == "my_custom_aid" assert run_agent.aid == "agent0" + + +@pytest.mark.asyncio +async def test_deactivate_suspendable(): + container = create_tcp_container("127.0.0.1:5555") + ping_pong_agent = agent_composed_of(PingPongRole(), register_in=container) + ping_pong_agent.suspendable_tasks = False + + async with activate(container) as c: + assert ping_pong_agent.scheduler.suspendable is False From 153d13fbc8f44f83a669adc341a9202e64113410 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Mon, 21 Oct 2024 18:05:29 +0200 Subject: [PATCH 05/19] the context of the role context was not initialized. This should delegate to the _agent_context A setter is generally not needed for this, except for the initialization where it is set to None --- mango/agent/role.py | 8 ++++++++ tests/unit_tests/express/test_api.py | 11 +++++++++++ 2 files changed, 19 insertions(+) diff --git a/mango/agent/role.py b/mango/agent/role.py index 1380845..16279ab 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -266,6 +266,14 @@ def data(self): """ return self._get_container() + @property + def context(self) -> float: + return self._agent_context + + @context.setter + def context(self, value: bool): + pass + @property def current_timestamp(self) -> float: return self._agent_context.current_timestamp diff --git a/tests/unit_tests/express/test_api.py b/tests/unit_tests/express/test_api.py index 019a772..ea8dda4 100644 --- a/tests/unit_tests/express/test_api.py +++ b/tests/unit_tests/express/test_api.py @@ -41,12 +41,23 @@ async def test_activate_pingpong(): await c.send_message( "Ping", ping_pong_agent.addr, sender_id=ping_pong_agent_two.aid ) + assert ping_pong_agent.roles[0].context.addr is not None while ping_pong_agent.roles[0].counter < 5: await asyncio.sleep(0.01) assert ping_pong_agent.roles[0].counter == 5 +@pytest.mark.asyncio +async def test_activate_rolecontext(): + container = create_tcp_container("127.0.0.1:5555") + ping_pong_agent = agent_composed_of(PingPongRole(), register_in=container) + + async with activate(container) as c: + assert ping_pong_agent.roles[0].context.addr is not None + assert ping_pong_agent.roles[0].context.context is not None + + class MyAgent(Agent): test_counter: int = 0 From cdcd26ae3bfd423e04635df9c230cde0c97e733e Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 18:27:54 +0200 Subject: [PATCH 06/19] Added topology feature. Changed to modern build tool. --- .github/workflows/publish-mango.yml | 6 +- .github/workflows/test-mango.yml | 6 +- mango/__init__.py | 1 + mango/agent/core.py | 39 +++++++ mango/agent/role.py | 17 +-- mango/express/topology.py | 74 ++++++++++++ pyproject.toml | 55 +++++++++ requirements.txt | 27 ----- setup.cfg | 6 - setup.py | 133 ---------------------- tests/unit_tests/express/__init__.py | 0 tests/unit_tests/express/test_topology.py | 34 ++++++ tests/unit_tests/role_agent_test.py | 26 +++++ 13 files changed, 239 insertions(+), 185 deletions(-) create mode 100644 mango/express/topology.py create mode 100644 pyproject.toml delete mode 100644 requirements.txt delete mode 100644 setup.cfg delete mode 100644 setup.py create mode 100644 tests/unit_tests/express/__init__.py create mode 100644 tests/unit_tests/express/test_topology.py diff --git a/.github/workflows/publish-mango.yml b/.github/workflows/publish-mango.yml index c4b5b38..ffc332a 100644 --- a/.github/workflows/publish-mango.yml +++ b/.github/workflows/publish-mango.yml @@ -18,8 +18,7 @@ jobs: pip install virtualenv virtualenv venv source venv/bin/activate - pip3 install -r requirements.txt - pip3 install -e . + pip3 install -e .[test] sudo apt update sudo apt install --assume-yes mosquitto sudo service mosquitto start @@ -33,7 +32,8 @@ jobs: - name: Build package run: | source venv/bin/activate - python setup.py sdist bdist_wheel + python -m pip install build + python -m build - name: Publish package uses: pypa/gh-action-pypi-publish@release/v1 with: diff --git a/.github/workflows/test-mango.yml b/.github/workflows/test-mango.yml index 463c27c..8bcb687 100644 --- a/.github/workflows/test-mango.yml +++ b/.github/workflows/test-mango.yml @@ -34,8 +34,7 @@ jobs: source venv/bin/activate pip3 install -U sphinx pip3 install -r docs/requirements.txt - pip3 install -r requirements.txt - pip3 install -e . + pip3 install -e .[test] brew install mosquitto brew services start mosquitto pip3 install pytest coverage ruff @@ -76,8 +75,7 @@ jobs: source venv/bin/activate pip3 install -U sphinx pip3 install -r docs/requirements.txt - pip3 install -r requirements.txt - pip3 install -e . + pip3 install -e .[test] sudo apt update sudo apt install --assume-yes mosquitto sudo service mosquitto start diff --git a/mango/__init__.py b/mango/__init__.py index 1880879..730c873 100644 --- a/mango/__init__.py +++ b/mango/__init__.py @@ -23,3 +23,4 @@ PROTOBUF, SerializationError, ) +from .express.topology import Topology, create_topology, complete_topology, per_node diff --git a/mango/agent/core.py b/mango/agent/core.py index fe55db8..dda7d20 100644 --- a/mango/agent/core.py +++ b/mango/agent/core.py @@ -9,6 +9,7 @@ import logging from abc import ABC from dataclasses import dataclass +from enum import Enum from typing import Any from ..util.clock import Clock @@ -23,6 +24,21 @@ class AgentAddress: aid: str +class State(Enum): + NORMAL = 0 # normal neighbor + INACTIVE = ( + 1 # neighbor link exists but link is not active (could be activated/used) + ) + BROKEN = 2 # neighbor link exists but link is not usable (can not be activated) + + +class TopologyService: + state_to_neighbors: dict[State, list] = dict() + + def neighbors(self, state: State = State.NORMAL): + return [f() for f in self.state_to_neighbors.get(state, [])] + + class AgentContext: def __init__(self, container) -> None: self._container = container @@ -69,6 +85,7 @@ def __init__(self) -> None: self.context: AgentContext = None self.scheduler: Scheduler = None self._aid = None + self._services = {} def on_start(self): """Called when container started in which the agent is contained""" @@ -344,6 +361,28 @@ async def tasks_complete(self, timeout=1): """ await self.scheduler.tasks_complete(timeout=timeout) + def service_of_type(self, type: type, default: Any = None) -> Any: + """Return the service of the type ``type`` or set the default as service and return it. + + :param type: the type of the service + :type type: type + :param default: the default if applicable + :type default: Any (optional) + :return: the service + :rtype: Any + """ + if type not in self._services: + self._services[type] = default + return self._services[type] + + def neighbors(self, state: State = State.NORMAL) -> list[AgentAddress]: + """Return the neighbors of the agent (controlled by the topology api). + + :return: the list of agent addresses filtered by state + :rtype: list[AgentAddress] + """ + return self.service_of_type(TopologyService).neighbors(state) + class Agent(ABC, AgentDelegates): """Base class for all agents.""" diff --git a/mango/agent/role.py b/mango/agent/role.py index 6c2698a..74f92fe 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -67,7 +67,7 @@ class Role: class RoleHandler: """Contains all roles and their models. Implements the communication between roles.""" - def __init__(self, agent_context, scheduler): + def __init__(self, scheduler): self._role_models = {} self._roles = [] self._role_to_active = {} @@ -75,7 +75,6 @@ def __init__(self, agent_context, scheduler): self._message_subs = [] self._send_msg_subs = {} self._role_event_type_to_handler = {} - self._agent_context = agent_context self._scheduler = scheduler self._data = DataContainer() @@ -252,7 +251,6 @@ def __init__( inbox, ): super().__init__() - self._agent_context = None self._role_handler = role_handler self._aid = aid self._inbox = inbox @@ -268,7 +266,7 @@ def data(self): @property def current_timestamp(self) -> float: - return self._agent_context.current_timestamp + return self.context.current_timestamp def _get_container(self): return self._role_handler._data @@ -334,7 +332,7 @@ async def send_message( **kwargs, ): self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs) - return await self._agent_context.send_message( + return await self.context.send_message( content=content, receiver_addr=receiver_addr, sender_id=self.aid, @@ -362,10 +360,6 @@ def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable) """ self._role_handler.subscribe_event(role, event_type, handler_method) - @property - def addr(self): - return self._agent_context.addr - @property def aid(self): return self._aid @@ -396,7 +390,7 @@ def __init__(self): Using the generated aid-style ("agentX") is not allowed. """ super().__init__() - self._role_handler = RoleHandler(None, None) + self._role_handler = RoleHandler(None) self._role_context = RoleContext(self._role_handler, self.aid, self.inbox) def on_start(self): @@ -406,8 +400,7 @@ def on_ready(self): self._role_context.on_ready() def on_register(self): - self._role_context._agent_context = self.context - self._role_handler._agent_context = self.context + self._role_context.context = self.context self._role_context.scheduler = self.scheduler self._role_handler._scheduler = self.scheduler self._role_context._aid = self.aid diff --git a/mango/express/topology.py b/mango/express/topology.py new file mode 100644 index 0000000..ff5c73a --- /dev/null +++ b/mango/express/topology.py @@ -0,0 +1,74 @@ +from contextlib import contextmanager + +import networkx as nx + +from mango.agent.core import Agent, AgentAddress, State, TopologyService + +AGENT_NODE_KEY = "node" +STATE_EDGE_KEY = "state" + + +class AgentNode: + def __init__(self) -> None: + self.agents: list[Agent] = [] + + def add(self, agent: Agent): + self.agents.append(agent) + + +class Topology: + def __init__(self, graph) -> None: + self.graph = graph + + for node in self.graph.nodes: + self.graph.nodes[node][AGENT_NODE_KEY] = AgentNode() + for edge in self.graph.edges: + self.graph.edges[edge][STATE_EDGE_KEY] = State.NORMAL + + def set_edge_state(self, node_id_from: int, node_id_to: int, state: State): + self.graph[node_id_from, node_id_to] = state + + def inject(self): + # 2nd pass, build the neighborhoods and add it to agents + for node in self.graph.nodes: + agent_node = self.graph.nodes[node][AGENT_NODE_KEY] + state_to_neighbors: dict[State, list[AgentAddress]] = dict() + + for neighbor in self.graph.neighbors(node): + state = self.graph.edges[node, neighbor][STATE_EDGE_KEY] + neighbor_addresses = state_to_neighbors.setdefault(state, []) + neighbor_addresses.extend( + [ + lambda: agent.addr + for agent in self.graph.nodes[neighbor][AGENT_NODE_KEY].agents + ] + ) + for agent in agent_node.agents: + topology_service = agent.service_of_type( + TopologyService, TopologyService() + ) + topology_service.state_to_neighbors = state_to_neighbors + + +def complete_topology(number_of_nodes: int) -> Topology: + """ + Create a fully-connected topology. + """ + graph = nx.complete_graph(number_of_nodes) + return Topology(graph) + + +@contextmanager +def create_topology(directed: bool = False): + topology = Topology(nx.DiGraph() if directed else nx.Graph()) + try: + yield topology + finally: + topology.inject() + + +def per_node(topology: Topology): + for node in topology.graph.nodes: + agent_node = topology.graph.nodes[node][AGENT_NODE_KEY] + yield agent_node + topology.inject() diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..4313722 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,55 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "mango-agents" +version = "2.1.0" +authors = [ + { name="mango Team", email="mango@offis.de" }, +] +description = "Modular Python Agent Framework" +readme = "readme.md" +requires-python = ">=3.10" +classifiers = [ + "License :: OSI Approved :: MIT License", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Programming Language :: Python :: Implementation :: CPython", + "Programming Language :: Python :: Implementation :: PyPy", +] +license = {file="LICENSE"} +dependencies = [ + "paho-mqtt>=2.1.0", + "python-dateutil>=2.9.0", + "dill>=0.3.8", + "protobuf>=5.27.2", + "networkx>=3.4.1" +] + +[project.optional-dependencies] +fastjson = [ + "msgspec>=0.18.6" +] +test = [ + "pytest", + "pytest-cov", + "pytest-asyncio", + "pre-commit" +] + +[project.urls] +Homepage = "https://mango-agents.readthedocs.io" +Repository = "https://github.com/OFFIS-DAI/mango" +Issues = "https://github.com/OFFIS-DAI/mango/issues" + + +[tool.pytest.ini_options] +asyncio_default_fixture_loop_scope = "function" +markers = [ + "mqtt", +] diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 48881a9..0000000 --- a/requirements.txt +++ /dev/null @@ -1,27 +0,0 @@ -appdirs==1.4.4 -argcomplete==1.12.0 -atomicwrites==1.4.0 -attrs==20.2.0 -colorama==0.4.3 -colorlog==4.2.1 -decorator==4.4.2 -distlib==0.3.1 -filelock==3.0.12 -iniconfig==1.0.1 -more-itertools==8.5.0 -nox==2020.8.22 -packaging==24.1 -paho-mqtt==2.1.0 -pika==1.1.0 -pluggy>=0.13.1 -protobuf==5.27.2 -pyparsing==2.4.7 -pytest==8.2.2 -pytest-asyncio==0.14.0 -pyzmq==26.0.3 -toml==0.10.1 -virtualenv==20.0.31 -python-dateutil==2.9.0 -dill==0.3.6 -apipkg>=3.0.2 -six>=1.16.0 diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 6279a4d..0000000 --- a/setup.cfg +++ /dev/null @@ -1,6 +0,0 @@ -[tool:pytest] - -markers = - mqtt - -asyncio_default_fixture_loop_scope = function diff --git a/setup.py b/setup.py deleted file mode 100644 index c7c7616..0000000 --- a/setup.py +++ /dev/null @@ -1,133 +0,0 @@ -#!/usr/bin/env python -# from https://github.com/navdeep-G/setup.py -# Note: To use the 'upload' functionality of this file, you must: -# $ pipenv install twine --dev - -import os -import sys -from shutil import rmtree - -from setuptools import Command, find_packages, setup - -# Package meta-data. -NAME = "mango-agents" -DESCRIPTION = "Modular Python Agent Framework" -URL = "https://github.com/OFFIS-DAI/mango" -EMAIL = "mango@offis.de" -AUTHOR = "mango Team" -REQUIRES_PYTHON = ">=3.10.0" -VERSION = "2.0.3" - -# What packages are required for this module to be executed? -REQUIRED = [ - "paho-mqtt>=2.1.0", - "python-dateutil>=2.9.0", - "dill>=0.3.8", - "protobuf>=5.27.2", -] - -# What packages are optional? -EXTRAS = {"fastjson": ["msgspec>=0.18.6"]} - - -# The rest you shouldn't have to touch too much :) -# ------------------------------------------------ -# Except, perhaps the License and Trove Classifiers! -# If you do change the License, remember to change the Trove Classifier for that! - -here = os.path.abspath(os.path.dirname(__file__)) - -# Import the README and use it as the long-description. -# Note: this will only work if 'README.md' is present in your MANIFEST.in file! -try: - with open(os.path.join(here, "readme.md"), encoding="utf-8") as f: - long_description = "\n" + f.read() -except FileNotFoundError: - long_description = DESCRIPTION - -# Load the package's __version__.py module as a dictionary. -about = {} -if not VERSION: - project_slug = NAME.lower().replace("-", "_").replace(" ", "_") - with open(os.path.join(here, project_slug, "__version__.py")) as f: - exec(f.read(), about) -else: - about["__version__"] = VERSION - - -class UploadCommand(Command): - """Support setup.py upload.""" - - description = "Build and publish the package." - user_options = [] - - @staticmethod - def status(s): - """Prints things in bold.""" - print(f"\033[1m{s}\033[0m") - - def initialize_options(self): - pass - - def finalize_options(self): - pass - - def run(self): - try: - self.status("Removing previous builds…") - rmtree(os.path.join(here, "dist")) - except OSError: - pass - - self.status("Building Source and Wheel (universal) distribution…") - os.system(f"{sys.executable} setup.py sdist bdist_wheel --universal") - - self.status("Uploading the package to PyPI via Twine…") - os.system("twine upload dist/*") - - self.status("Pushing git tags…") - os.system("git tag v{}".format(about["__version__"])) - os.system("git push --tags") - - sys.exit() - - -# Where the magic happens: -setup( - name=NAME, - version=about["__version__"], - description=DESCRIPTION, - long_description=long_description, - long_description_content_type="text/markdown", - author=AUTHOR, - author_email=EMAIL, - python_requires=REQUIRES_PYTHON, - # url=URL, - packages=find_packages(exclude=["tests", "*.tests", "*.tests.*", "tests.*"]), - # If your package is a single module, use this instead of 'packages': - # py_modules=['mypackage'], - # entry_points={ - # 'console_scripts': ['mycli=mymodule:cli'], - # }, - install_requires=REQUIRED, - extras_require=EXTRAS, - include_package_data=True, - license="MIT", - classifiers=[ - # Trove classifiers - # Full list: https://pypi.python.org/pypi?%3Aaction=list_classifiers - "License :: OSI Approved :: MIT License", - "Programming Language :: Python", - "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.10", - "Programming Language :: Python :: 3.11", - "Programming Language :: Python :: 3.12", - "Programming Language :: Python :: 3.13", - "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", - ], - # $ setup.py publish support. - cmdclass={ - "upload": UploadCommand, - }, -) diff --git a/tests/unit_tests/express/__init__.py b/tests/unit_tests/express/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit_tests/express/test_topology.py b/tests/unit_tests/express/test_topology.py new file mode 100644 index 0000000..8514f50 --- /dev/null +++ b/tests/unit_tests/express/test_topology.py @@ -0,0 +1,34 @@ +import asyncio +from typing import Any + +import pytest + +from mango import Agent, complete_topology, per_node, run_with_tcp + + +class TopAgent(Agent): + counter: int = 0 + + def handle_message(self, content, meta: dict[str, Any]): + self.counter += 1 + + +@pytest.mark.asyncio +async def test_run_api_style_agent(): + # GIVEN + agents = [] + topology = complete_topology(3) + for node in per_node(topology): + agent = TopAgent() + agents.append(agent) + node.add(agent) + + # WHEN + async with run_with_tcp(1, *agents): + for neighbor in agents[0].neighbors(): + await agents[0].send_message("hello neighbors", neighbor) + await asyncio.sleep(0.1) + + # THEN + assert agents[1].counter == 1 + assert agents[2].counter == 1 diff --git a/tests/unit_tests/role_agent_test.py b/tests/unit_tests/role_agent_test.py index 3fde9bc..99d5ee2 100644 --- a/tests/unit_tests/role_agent_test.py +++ b/tests/unit_tests/role_agent_test.py @@ -114,6 +114,9 @@ def setup(self): assert self.context is not None self.setup_called = True + def handle_message(self, content: Any, meta: Dict): + self.messages = 1 + @pytest.mark.asyncio @pytest.mark.parametrize( @@ -220,3 +223,26 @@ async def test_role_add_remove_context(): agent.remove_role(role) assert len(agent.roles) == 0 + + +@pytest.mark.asyncio +async def test_role_addr(): + c = create_tcp_container(addr=("127.0.0.1", 5555)) + agent = c.register(RoleAgent()) + role = SampleRole() + agent.add_role(role) + + assert role.context.addr == agent.addr + + +@pytest.mark.asyncio +async def test_role_register_after_agent_register(): + c = create_tcp_container(addr=("127.0.0.1", 5555)) + agent = c.register(RoleAgent()) + role = SampleRole() + agent.add_role(role) + + async with activate(c): + await role.context.send_message("", role.context.addr) + + assert role.messages == 1 From eade80ac24a30c4b5969d941be605151f53ef5d7 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 18:56:47 +0200 Subject: [PATCH 07/19] Removing context properties in role_context. --- mango/agent/role.py | 8 -------- tests/unit_tests/role/role_test.py | 10 +++++----- 2 files changed, 5 insertions(+), 13 deletions(-) diff --git a/mango/agent/role.py b/mango/agent/role.py index 0a481b6..e13e88a 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -264,14 +264,6 @@ def data(self): """ return self._get_container() - @property - def context(self) -> float: - return self._agent_context - - @context.setter - def context(self, value: bool): - pass - @property def current_timestamp(self) -> float: return self.context.current_timestamp diff --git a/tests/unit_tests/role/role_test.py b/tests/unit_tests/role/role_test.py index 5c41977..ea8b3f2 100644 --- a/tests/unit_tests/role/role_test.py +++ b/tests/unit_tests/role/role_test.py @@ -21,7 +21,7 @@ def on_change_model(self, model): def test_subscription(): # GIVEN - role_handler = RoleHandler(None, None) + role_handler = RoleHandler(None) ex_role = SubRole() ex_role2 = SubRole() role_model = role_handler.get_or_create_model(RoleModel) @@ -40,7 +40,7 @@ def test_subscription(): def test_subscription_deactivated(): # GIVEN - role_handler = RoleHandler(None, Scheduler()) + role_handler = RoleHandler(Scheduler()) ex_role = SubRole() ex_role2 = SubRole() role_model = role_handler.get_or_create_model(RoleModel) @@ -60,7 +60,7 @@ def test_subscription_deactivated(): def test_no_subscription_update(): # GIVEN - role_handler = RoleHandler(None, None) + role_handler = RoleHandler(None) ex_role = SubRole() role_model = role_handler.get_or_create_model(RoleModel) @@ -74,7 +74,7 @@ def test_no_subscription_update(): def test_append_message_subs(): # GIVEN - role_handler = RoleHandler(None, None) + role_handler = RoleHandler(None) test_role = SubRole() # WHEN @@ -124,7 +124,7 @@ def setup(self) -> None: def test_emit_event(): # GIVEN - role_handler = RoleHandler(None, None) + role_handler = RoleHandler(None) context = RoleContext(role_handler, None, None) ex_role = SubRole() ex_role2 = RoleHandlingEvents() From 5163448704aa9cf653933539a95eccc7e3374fcf Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 19:53:03 +0200 Subject: [PATCH 08/19] Update readme --- readme.md | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/readme.md b/readme.md index 781231d..6d6bb24 100644 --- a/readme.md +++ b/readme.md @@ -30,15 +30,9 @@ A detailed documentation for this project can be found at [mango-agents.readthed ## Installation -*mango* requires Python >= 3.8 and runs on Linux, OSX and Windows. - -For installation of mango you should use +For installation of mango you may use [virtualenv](https://virtualenv.pypa.io/en/latest/#) which can create isolated Python environments for different projects. -It is also recommended to install -[virtualenvwrapper](https://virtualenvwrapper.readthedocs.io/en/latest/index.html) -which makes it easier to manage different virtual environments. - Once you have created a virtual environment you can just run [pip](https://pip.pypa.io/en/stable/) to install it: $ pip install mango-agents From 85aaa0d5da5c4db2cea52a10f5c14333aa0a3282 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 20:13:00 +0200 Subject: [PATCH 09/19] Add codecoverage to codecov. --- .github/workflows/test-mango.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test-mango.yml b/.github/workflows/test-mango.yml index 8bcb687..a7d1eea 100644 --- a/.github/workflows/test-mango.yml +++ b/.github/workflows/test-mango.yml @@ -93,5 +93,8 @@ jobs: - name: Test+Coverage run: | source venv/bin/activate - coverage run -m pytest - coverage report + pytest --cov --cov-report=xml + - uses: codecov/codecov-action@v4 + with: + token: ${{ secrets.CODECOV_TOKEN }} + fail_ci_if_error: false From 9f3557f897eaa240be9d9fc5ed5cc5a1106f6ac2 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 20:21:21 +0200 Subject: [PATCH 10/19] Adding coveragerc. --- .coveragerc | 2 ++ readme.md | 2 ++ 2 files changed, 4 insertions(+) create mode 100644 .coveragerc diff --git a/.coveragerc b/.coveragerc new file mode 100644 index 0000000..e41cdd7 --- /dev/null +++ b/.coveragerc @@ -0,0 +1,2 @@ +[run] +omit = *test* diff --git a/readme.md b/readme.md index 6d6bb24..5d91e1c 100644 --- a/readme.md +++ b/readme.md @@ -11,6 +11,8 @@ ![lifecycle](https://img.shields.io/badge/lifecycle-maturing-blue.svg) [![MIT License](https://img.shields.io/badge/license-MIT-green.svg)](https://github.com/OFFIS-DAI/mango/blob/development/LICENSE) [![Test mango-python](https://github.com/OFFIS-DAI/mango/actions/workflows/test-mango.yml/badge.svg)](https://github.com/OFFIS-DAI/mango/actions/workflows/test-mango.yml) +[![codecov](https://codecov.io/gh/OFFIS-DAI/mango/graph/badge.svg?token=6KVKBICGYG)](https://codecov.io/gh/OFFIS-DAI/mango) + mango (**m**odul**a**r pytho**n** a**g**ent framew**o**rk) is a python library for multi-agent systems (MAS). From f0f7703c38101b4112bc70f38c029d058bc16993 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 20:24:51 +0200 Subject: [PATCH 11/19] Adding coveragerc. --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index e41cdd7..d978679 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,2 @@ [run] -omit = *test* +omit = *tests* From eba1303413c1b3be408bdb47c4ab77c4d5827638 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Mon, 21 Oct 2024 20:31:11 +0200 Subject: [PATCH 12/19] Adding coveragerc. --- .coveragerc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.coveragerc b/.coveragerc index d978679..c712d25 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,2 @@ [run] -omit = *tests* +omit = tests/* From af314231e29fca2d656de1a8000ab1aed95f3ade Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Tue, 22 Oct 2024 10:53:59 +0200 Subject: [PATCH 13/19] Add ruff and coverage to pyproject.toml --- .coveragerc | 2 -- pyproject.toml | 28 ++++++++++++++++++++++++++++ ruff.toml | 23 ----------------------- 3 files changed, 28 insertions(+), 25 deletions(-) delete mode 100644 .coveragerc delete mode 100644 ruff.toml diff --git a/.coveragerc b/.coveragerc deleted file mode 100644 index c712d25..0000000 --- a/.coveragerc +++ /dev/null @@ -1,2 +0,0 @@ -[run] -omit = tests/* diff --git a/pyproject.toml b/pyproject.toml index 4313722..9817121 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,3 +53,31 @@ asyncio_default_fixture_loop_scope = "function" markers = [ "mqtt", ] + +[tool.coverage.run] +omit = ["tests/*"] + +[tool.ruff] +target-version = "py38" + +[tool.ruff.lint] +# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. +# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or +# McCabe complexity (`C901`) by default. +select = ["E", "F", "G", "I", "UP", "AIR", "PIE", "PLR1714", "PLW2901", "TRY201"] +ignore = ["E501"] + +[tool.ruff.lint.per-file-ignores] +"__init__.py" = [ + "I001", # allow unsorted imports in __init__.py + "F401", # allow unused imports in __init__.py +] +"examples/*" = [ + "ARG", # allow unused arguments + "F841", # allow unused local variables +] +"tests/*" = [ + "ARG", # allow unused arguments for pytest fixtures + "E741", # allow reused variables + "F841", # allow unused local variables +] diff --git a/ruff.toml b/ruff.toml deleted file mode 100644 index a1a8efc..0000000 --- a/ruff.toml +++ /dev/null @@ -1,23 +0,0 @@ -target-version = "py38" - -[lint] -# Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. -# Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or -# McCabe complexity (`C901`) by default. -select = ["E", "F", "G", "I", "UP", "AIR", "PIE", "PLR1714", "PLW2901", "TRY201"] -ignore = ["E501"] - -[lint.per-file-ignores] -"__init__.py" = [ - "I001", # allow unsorted imports in __init__.py - "F401", # allow unused imports in __init__.py -] -"examples/*" = [ - "ARG", # allow unused arguments - "F841", # allow unused local variables -] -"tests/*" = [ - "ARG", # allow unused arguments for pytest fixtures - "E741", # allow reused variables - "F841", # allow unused local variables -] From c27cd301abb8f08b503caabbfebe8376f9d55c53 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Tue, 22 Oct 2024 14:13:26 +0200 Subject: [PATCH 14/19] Added docs and some additional API for topology feature. --- docs/source/conf.py | 2 +- docs/source/index.rst | 1 + docs/source/topology.rst | 81 +++++++++++++++++++++++ mango/__init__.py | 8 ++- mango/express/topology.py | 73 ++++++++++++++++++-- pyproject.toml | 3 - tests/unit_tests/express/test_topology.py | 26 +++++++- 7 files changed, 182 insertions(+), 12 deletions(-) create mode 100644 docs/source/topology.rst diff --git a/docs/source/conf.py b/docs/source/conf.py index 1a04453..ac2440a 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -11,7 +11,7 @@ author = "mango team" # The full version, including alpha/beta/rc tags -version = release = "2.0.0" +version = release = "2.1.0" # -- General configuration --------------------------------------------------- diff --git a/docs/source/index.rst b/docs/source/index.rst index 185b386..d77c094 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -30,6 +30,7 @@ Features message exchange role-api scheduling + topology codecs api_ref/index migration diff --git a/docs/source/topology.rst b/docs/source/topology.rst new file mode 100644 index 0000000..f7152c8 --- /dev/null +++ b/docs/source/topology.rst @@ -0,0 +1,81 @@ +================ +Topologies +================ + +The framework provides out of the box support for creating and distributing topologies to the agents. +This can be done using either :meth:`mango.create_topology` or :meth:`mango.per_node`. With the first method +a topology has to be created from scratch, and for every node you need to assign the agents. + +.. testcode:: + + import asyncio + from typing import Any + + from mango import Agent, run_with_tcp, create_topology + + class TopAgent(Agent): + counter: int = 0 + + def handle_message(self, content, meta: dict[str, Any]): + self.counter += 1 + + async def start_example(): + agents = [TopAgent(), TopAgent(), TopAgent()] + with create_topology() as topology: + id_1 = topology.add_node(agents[0]) + id_2 = topology.add_node(agents[1]) + id_3 = topology.add_node(agents[2]) + topology.add_edge(id_1, id_2) + topology.add_edge(id_1, id_3) + + async with run_with_tcp(1, *agents): + for neighbor in agents[0].neighbors(): + await agents[0].send_message("hello neighbors", neighbor) + await asyncio.sleep(0.1) + + print(agents[1].counter) + print(agents[2].counter) + + asyncio.run(start_example()) + +.. testoutput:: + + 1 + 1 + +The other method would be to use :meth:`mango.per_node`. Here you want to create a topology beforehand, this can be done using :meth:`mango.custom_topology` by providing +a networkx Graph. + +.. testcode:: + + import asyncio + from typing import Any + + from mango import Agent, run_with_tcp, per_node, complete_topology + + class TopAgent(Agent): + counter: int = 0 + + def handle_message(self, content, meta: dict[str, Any]): + self.counter += 1 + + async def start_example(): + topology = complete_topology(3) + for node in per_node(topology): + agent = TopAgent() + node.add(agent) + + async with run_with_tcp(1, *topology.agents): + for neighbor in topology.agents[0].neighbors(): + await topology.agents[0].send_message("hello neighbors", neighbor) + await asyncio.sleep(0.1) + + print(topology.agents[1].counter) + print(topology.agents[2].counter) + + asyncio.run(start_example()) + +.. testoutput:: + + 1 + 1 diff --git a/mango/__init__.py b/mango/__init__.py index 730c873..77d41a6 100644 --- a/mango/__init__.py +++ b/mango/__init__.py @@ -23,4 +23,10 @@ PROTOBUF, SerializationError, ) -from .express.topology import Topology, create_topology, complete_topology, per_node +from .express.topology import ( + Topology, + create_topology, + complete_topology, + per_node, + custom_topology, +) diff --git a/mango/express/topology.py b/mango/express/topology.py index ff5c73a..a935604 100644 --- a/mango/express/topology.py +++ b/mango/express/topology.py @@ -8,9 +8,13 @@ STATE_EDGE_KEY = "state" +def _flatten_list(list): + return [x for xs in list for x in xs] + + class AgentNode: - def __init__(self) -> None: - self.agents: list[Agent] = [] + def __init__(self, agents: list = None) -> None: + self.agents: list[Agent] = [] if agents is None else agents def add(self, agent: Agent): self.agents.append(agent) @@ -18,7 +22,7 @@ def add(self, agent: Agent): class Topology: def __init__(self, graph) -> None: - self.graph = graph + self.graph: nx.Graph = graph for node in self.graph.nodes: self.graph.nodes[node][AGENT_NODE_KEY] = AgentNode() @@ -26,7 +30,27 @@ def __init__(self, graph) -> None: self.graph.edges[edge][STATE_EDGE_KEY] = State.NORMAL def set_edge_state(self, node_id_from: int, node_id_to: int, state: State): - self.graph[node_id_from, node_id_to] = state + self.graph.edges[node_id_from, node_id_to] = state + + def add_node(self, *agents: Agent): + id = len(self.graph) + self.graph.add_node(id, node=AgentNode(agents)) + return id + + def add_edge(self, node_from, node_to, state: State = State.NORMAL): + self.graph.add_edge(node_from, node_to, state=state) + + @property + def agents(self) -> list[Agent]: + """Return all agents controlled by the topology after + the neighborhood were injected. + + :return: the list of agents + :rtype: list[Agent] + """ + return _flatten_list( + [self.graph.nodes[node][AGENT_NODE_KEY].agents for node in self.graph.nodes] + ) def inject(self): # 2nd pass, build the neighborhoods and add it to agents @@ -58,8 +82,33 @@ def complete_topology(number_of_nodes: int) -> Topology: return Topology(graph) +def custom_topology(graph: nx.Graph) -> Topology: + """ + Create an already created custom topology base on a nx Graph. + """ + return Topology(graph) + + @contextmanager def create_topology(directed: bool = False): + """Create a topology, which will automatically inject the neighbors to + the participating agents. Example: + + .. code-block:: python + + agents = [TopAgent(), TopAgent(), TopAgent()] + with create_topology() as topology: + id_1 = topology.add_node(agents[0]) + id_2 = topology.add_node(agents[1]) + id_3 = topology.add_node(agents[2]) + topology.add_edge(id_1, id_2) + topology.add_edge(id_1, id_3) + + :param directed: _description_, defaults to False + :type directed: bool, optional + :yield: _description_ + :rtype: _type_ + """ topology = Topology(nx.DiGraph() if directed else nx.Graph()) try: yield topology @@ -68,6 +117,22 @@ def create_topology(directed: bool = False): def per_node(topology: Topology): + """Assign agents per node of the already created topology. This method + shall be used as iterator in a for in construct. The iterator will return + nodes, which can be used to add (with node.add()) agents to the node. + + .. code-block:: python + + topology = complete_topology(3) + for node in per_node(topology): + node.add(TopAgent()) + + + :param topology: the topology + :type topology: Topology + :yield: AgentNode + :rtype: _type_ + """ for node in topology.graph.nodes: agent_node = topology.graph.nodes[node][AGENT_NODE_KEY] yield agent_node diff --git a/pyproject.toml b/pyproject.toml index 9817121..5e8f75a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,9 +57,6 @@ markers = [ [tool.coverage.run] omit = ["tests/*"] -[tool.ruff] -target-version = "py38" - [tool.ruff.lint] # Enable Pyflakes (`F`) and a subset of the pycodestyle (`E`) codes by default. # Unlike Flake8, Ruff doesn't enable pycodestyle warnings (`W`) or diff --git a/tests/unit_tests/express/test_topology.py b/tests/unit_tests/express/test_topology.py index 8514f50..2206e68 100644 --- a/tests/unit_tests/express/test_topology.py +++ b/tests/unit_tests/express/test_topology.py @@ -3,7 +3,7 @@ import pytest -from mango import Agent, complete_topology, per_node, run_with_tcp +from mango import Agent, complete_topology, create_topology, per_node, run_with_tcp class TopAgent(Agent): @@ -16,13 +16,33 @@ def handle_message(self, content, meta: dict[str, Any]): @pytest.mark.asyncio async def test_run_api_style_agent(): # GIVEN - agents = [] topology = complete_topology(3) for node in per_node(topology): agent = TopAgent() - agents.append(agent) node.add(agent) + # WHEN + async with run_with_tcp(1, *topology.agents): + for neighbor in topology.agents[0].neighbors(): + await topology.agents[0].send_message("hello neighbors", neighbor) + await asyncio.sleep(0.1) + + # THEN + assert topology.agents[1].counter == 1 + assert topology.agents[2].counter == 1 + + +@pytest.mark.asyncio +async def test_run_api_style_custom_topology(): + # GIVEN + agents = [TopAgent(), TopAgent(), TopAgent()] + with create_topology() as topology: + id_1 = topology.add_node(agents[0]) + id_2 = topology.add_node(agents[1]) + id_3 = topology.add_node(agents[2]) + topology.add_edge(id_1, id_2) + topology.add_edge(id_1, id_3) + # WHEN async with run_with_tcp(1, *agents): for neighbor in agents[0].neighbors(): From af6e2f944023011c980bcd461b898b7aee51097c Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Tue, 22 Oct 2024 14:20:43 +0200 Subject: [PATCH 15/19] Ruff liniting. --- mango/agent/role.py | 3 ++- mango/container/tcp.py | 4 ++-- mango/messages/codecs.py | 4 ++-- mango/util/distributed_clock.py | 4 ++-- mango/util/scheduling.py | 10 +++++----- .../core/test_external_scheduling_container.py | 8 ++++---- tests/unit_tests/role_agent_test.py | 12 ++++++------ 7 files changed, 23 insertions(+), 22 deletions(-) diff --git a/mango/agent/role.py b/mango/agent/role.py index e13e88a..9fbe153 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -34,7 +34,8 @@ import asyncio from abc import ABC -from typing import Any, Callable +from collections.abc import Callable +from typing import Any from mango.agent.core import Agent, AgentAddress, AgentDelegates diff --git a/mango/container/tcp.py b/mango/container/tcp.py index 3e616b5..0e36071 100644 --- a/mango/container/tcp.py +++ b/mango/container/tcp.py @@ -220,7 +220,7 @@ async def send_message( protocol_addr = receiver_addr.protocol_addr if isinstance(protocol_addr, str) and ":" in protocol_addr: protocol_addr = protocol_addr.split(":") - elif isinstance(protocol_addr, (tuple, list)) and len(protocol_addr) == 2: + elif isinstance(protocol_addr, tuple | list) and len(protocol_addr) == 2: protocol_addr = tuple(protocol_addr) else: logger.warning("Address for sending message is not valid;%s", protocol_addr) @@ -257,7 +257,7 @@ async def _send_external_message(self, addr, message, meta) -> bool: :param message: The message :return: """ - if addr is None or not isinstance(addr, (tuple, list)) or len(addr) != 2: + if addr is None or not isinstance(addr, tuple | list) or len(addr) != 2: logger.warning( "Sending external message not successful, invalid address; %s", addr, diff --git a/mango/messages/codecs.py b/mango/messages/codecs.py index c5ae228..f1af688 100644 --- a/mango/messages/codecs.py +++ b/mango/messages/codecs.py @@ -244,14 +244,14 @@ def _acl_to_proto(self, acl_message): msg.reply_by = acl_message.reply_by if acl_message.reply_by else "" msg.in_reply_to = acl_message.in_reply_to if acl_message.in_reply_to else "" - if isinstance(acl_message.sender_addr, (tuple, list)): + if isinstance(acl_message.sender_addr, tuple | list): msg.sender_addr = ( f"{acl_message.sender_addr[0]}:{acl_message.sender_addr[1]}" ) elif acl_message.sender_addr: msg.sender_addr = acl_message.sender_addr - if isinstance(acl_message.receiver_addr, (tuple, list)): + if isinstance(acl_message.receiver_addr, tuple | list): msg.receiver_addr = ( f"{acl_message.receiver_addr[0]}:{acl_message.receiver_addr[1]}" ) diff --git a/mango/util/distributed_clock.py b/mango/util/distributed_clock.py index b5dd345..b088265 100644 --- a/mango/util/distributed_clock.py +++ b/mango/util/distributed_clock.py @@ -28,7 +28,7 @@ def handle_message(self, content: float, meta): sender = sender_addr(meta) logger.debug("clockmanager: %s from %s", content, sender) if content: - assert isinstance(content, (int, float)), f"{content} was {type(content)}" + assert isinstance(content, int | float), f"{content} was {type(content)}" self.schedules.append(content) if not self.futures[sender].done(): @@ -177,5 +177,5 @@ def respond(fut: asyncio.Future = None): t.add_done_callback(respond) else: - assert isinstance(content, (int, float)), f"{content} was {type(content)}" + assert isinstance(content, int | float), f"{content} was {type(content)}" self.scheduler.clock.set_time(content) diff --git a/mango/util/scheduling.py b/mango/util/scheduling.py index b207d20..8ec62fd 100644 --- a/mango/util/scheduling.py +++ b/mango/util/scheduling.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from multiprocessing import Manager from multiprocessing.synchronize import Event as MultiprocessingEvent -from typing import Any, List, Tuple +from typing import Any from dateutil.rrule import rrule @@ -453,12 +453,12 @@ def __init__( observable=True, ): # List of Tuples with asyncio.Future, ScheduledTask, Suspendable coro, Source - self._scheduled_tasks: List[ - Tuple[ScheduledTask, asyncio.Future, Suspendable, Any] + self._scheduled_tasks: list[ + tuple[ScheduledTask, asyncio.Future, Suspendable, Any] ] = [] self.clock = clock if clock is not None else AsyncioClock() - self._scheduled_process_tasks: List[ - Tuple[ScheduledProcessTask, Future, ScheduledProcessControl, Any] + self._scheduled_process_tasks: list[ + tuple[ScheduledProcessTask, Future, ScheduledProcessControl, Any] ] = [] self._manager = None self._num_process_parallel = num_process_parallel diff --git a/tests/unit_tests/core/test_external_scheduling_container.py b/tests/unit_tests/core/test_external_scheduling_container.py index 2efe276..9301d8c 100644 --- a/tests/unit_tests/core/test_external_scheduling_container.py +++ b/tests/unit_tests/core/test_external_scheduling_container.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any, Dict +from typing import Any import pytest @@ -76,7 +76,7 @@ async def send_ping(self): ) self.current_ping += 1 - def handle_message(self, content, meta: Dict[str, Any]): + def handle_message(self, content, meta: dict[str, Any]): self.schedule_instant_task(self.sleep_and_answer(content, meta)) async def sleep_and_answer(self, content, meta): @@ -113,7 +113,7 @@ def on_register(self): async def print_cond_task_finished(self): pass - def handle_message(self, content, meta: Dict[str, Any]): + def handle_message(self, content, meta: dict[str, Any]): self.received_msg = True @@ -180,7 +180,7 @@ def __init__(self, final_number=3): self.no_received_msg = 0 self.final_no = final_number - def handle_message(self, content, meta: Dict[str, Any]): + def handle_message(self, content, meta: dict[str, Any]): self.no_received_msg += 1 # pretend to be really busy i = 0 diff --git a/tests/unit_tests/role_agent_test.py b/tests/unit_tests/role_agent_test.py index 99d5ee2..f740810 100644 --- a/tests/unit_tests/role_agent_test.py +++ b/tests/unit_tests/role_agent_test.py @@ -1,6 +1,6 @@ import asyncio import datetime -from typing import Any, Dict +from typing import Any import pytest @@ -15,10 +15,10 @@ def setup(self): self, self.react_handle_message, self.is_applicable ) - def react_handle_message(self, content, meta: Dict[str, Any]) -> None: + def react_handle_message(self, content, meta: dict[str, Any]) -> None: pass - def is_applicable(self, content, meta: Dict[str, Any]) -> bool: + def is_applicable(self, content, meta: dict[str, Any]) -> bool: return True @@ -27,7 +27,7 @@ def __init__(self): super().__init__() self.sending_tasks = [] - def react_handle_message(self, content, meta: Dict[str, Any]): + def react_handle_message(self, content, meta: dict[str, Any]): assert "sender_addr" in meta.keys() and "sender_id" in meta.keys() # send back pong, providing your own details @@ -47,7 +47,7 @@ def __init__(self, target, expect_no_answer=False): self.target = target self._expect_no_answer = expect_no_answer - def react_handle_message(self, content, meta: Dict[str, Any]): + def react_handle_message(self, content, meta: dict[str, Any]): assert "sender_addr" in meta.keys() and "sender_id" in meta.keys() sender = sender_addr(meta) assert sender in self.open_ping_requests.keys() @@ -114,7 +114,7 @@ def setup(self): assert self.context is not None self.setup_called = True - def handle_message(self, content: Any, meta: Dict): + def handle_message(self, content: Any, meta: dict): self.messages = 1 From f08a76f6eb854a0780557b496c2d79cc8554c20f Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Tue, 22 Oct 2024 10:19:08 +0200 Subject: [PATCH 16/19] if subscription for "handle_message" exists, do not execute generic handle_message function for RoleHandlers --- mango/agent/role.py | 9 +++++++-- tests/unit_tests/role_agent_test.py | 22 +++++++++++++++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/mango/agent/role.py b/mango/agent/role.py index 9fbe153..984a519 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -185,11 +185,16 @@ def handle_message(self, content, meta: dict[str, Any]): :param content: content :param meta: meta """ - for role in self.roles: - role.handle_message(content, meta) + handle_message_found = False for role, message_condition, method, _ in self._message_subs: + # do not execute handle_message twice if role has subscription as well + if method.__name__ == "handle_message": + handle_message_found = True if self._is_role_active(role) and message_condition(content, meta): method(content, meta) + if not handle_message_found: + for role in self.roles: + role.handle_message(content, meta) def _notify_send_message_subs(self, content, receiver_addr: AgentAddress, **kwargs): for role in self._send_msg_subs: diff --git a/tests/unit_tests/role_agent_test.py b/tests/unit_tests/role_agent_test.py index f740810..47bbee1 100644 --- a/tests/unit_tests/role_agent_test.py +++ b/tests/unit_tests/role_agent_test.py @@ -109,13 +109,20 @@ def setup(self): class SampleRole(Role): def __init__(self): self.setup_called = False + self.messages = 0 def setup(self): assert self.context is not None self.setup_called = True def handle_message(self, content: Any, meta: dict): - self.messages = 1 + self.messages += 1 + + +class SampleSubRole(SampleRole): + def setup(self): + super().setup() + self.context.subscribe_message(self, self.handle_message, lambda c, m: True) @pytest.mark.asyncio @@ -246,3 +253,16 @@ async def test_role_register_after_agent_register(): await role.context.send_message("", role.context.addr) assert role.messages == 1 + + +@pytest.mark.asyncio +async def test_role_with_message_handler(): + c = create_tcp_container(addr=("127.0.0.1", 5555)) + agent = c.register(RoleAgent()) + role = SampleSubRole() + agent.add_role(role) + + async with activate(c): + await role.context.send_message("", role.context.addr) + + assert role.messages == 1 From 3a9104402e23142cd3fde2a538143475e844a3f8 Mon Sep 17 00:00:00 2001 From: Florian Maurer Date: Tue, 22 Oct 2024 14:50:52 +0200 Subject: [PATCH 17/19] add default param for datacontainer get function the usage pattern looks like one should be able to do this which would come in handy --- mango/agent/role.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mango/agent/role.py b/mango/agent/role.py index 9fbe153..d572e60 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -50,11 +50,11 @@ def __setitem__(self, key, newvalue): def __contains__(self, key): return hasattr(self, key) - def get(self, key): + def get(self, key, default=None): if key in self: return self[key] else: - return None + return default def update(self, data: dict): for k, v in data.items(): From 4e3119fde1ca6e1767c9ad0a00fcc5e80f660671 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Tue, 22 Oct 2024 15:54:34 +0200 Subject: [PATCH 18/19] Moving inbox to on_start. --- mango/agent/core.py | 10 +++++++--- mango/container/core.py | 2 +- mango/container/mp.py | 3 +++ pyproject.toml | 2 +- tests/unit_tests/core/test_agent.py | 19 ++++++++++++++++++- tests/unit_tests/core/test_container.py | 8 ++++---- 6 files changed, 34 insertions(+), 10 deletions(-) diff --git a/mango/agent/core.py b/mango/agent/core.py index 396eae7..4768f16 100644 --- a/mango/agent/core.py +++ b/mango/agent/core.py @@ -420,9 +420,6 @@ def on_register(self): """ def _do_register(self, container, aid): - self._check_inbox_task = asyncio.create_task(self._check_inbox()) - self._check_inbox_task.add_done_callback(self._raise_exceptions) - self._stopped = asyncio.Future() self._aid = aid self.context = AgentContext(container) self.scheduler = Scheduler( @@ -430,6 +427,13 @@ def _do_register(self, container, aid): ) self.on_register() + def _do_start(self): + self._check_inbox_task = asyncio.create_task(self._check_inbox()) + self._check_inbox_task.add_done_callback(self._raise_exceptions) + self._stopped = asyncio.Future() + + self.on_start() + def _raise_exceptions(self, fut: asyncio.Future): """ Inline function used as a callback to raise exceptions diff --git a/mango/container/core.py b/mango/container/core.py index 6eeffef..aea994e 100644 --- a/mango/container/core.py +++ b/mango/container/core.py @@ -298,7 +298,7 @@ async def start(self): """Start the container. It totally depends on the implementation for what is actually happening.""" for agent in self._agents.values(): - agent.on_start() + agent._do_start() def on_ready(self): for agent in self._agents.values(): diff --git a/mango/container/mp.py b/mango/container/mp.py index 35ea462..d460216 100644 --- a/mango/container/mp.py +++ b/mango/container/mp.py @@ -123,6 +123,9 @@ async def start_agent_loop(): agent_creator(container) process_initialized_event.set() + for agent in container._agents.values(): + agent._do_start() + container.running = True while not terminate_event.is_set(): await asyncio.sleep(WAIT_STEP) await container.shutdown() diff --git a/pyproject.toml b/pyproject.toml index 5e8f75a..074914e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -27,7 +27,7 @@ dependencies = [ "paho-mqtt>=2.1.0", "python-dateutil>=2.9.0", "dill>=0.3.8", - "protobuf>=5.27.2", + "protobuf==5.27.2", "networkx>=3.4.1" ] diff --git a/tests/unit_tests/core/test_agent.py b/tests/unit_tests/core/test_agent.py index 8acfb65..fc80f26 100644 --- a/tests/unit_tests/core/test_agent.py +++ b/tests/unit_tests/core/test_agent.py @@ -33,7 +33,6 @@ async def increase_counter(): # THEN assert len(l) == 2 - await c.shutdown() @pytest.mark.asyncio @@ -102,3 +101,21 @@ async def test_schedule_acl_message(): # THEN assert agent2.test_counter == 1 + + +def test_sync_setup_agent(): + # this test is not async and therefore does not provide a running event loop + c = create_tcp_container(addr=("127.0.0.1", 5555)) + # registration without async context should not raise "no running event loop" error + agent = c.register(MyAgent()) + agent2 = c.register(MyAgent()) + + async def run_this(c): + async with activate(c) as c: + await agent.schedule_instant_message( + create_acl("", receiver_addr=agent2.addr, sender_addr=agent.addr), + receiver_addr=agent2.addr, + ) # THEN + assert agent2.test_counter == 1 + + asyncio.run(run_this(c)) diff --git a/tests/unit_tests/core/test_container.py b/tests/unit_tests/core/test_container.py index 45c8df7..fe01139 100644 --- a/tests/unit_tests/core/test_container.py +++ b/tests/unit_tests/core/test_container.py @@ -208,8 +208,8 @@ async def test_send_message_no_copy(): agent1 = c.register(ExampleAgent()) message_to_send = Data() - await c.send_message(message_to_send, receiver_addr=agent1.addr) - await c.shutdown() + async with activate(c): + await c.send_message(message_to_send, receiver_addr=agent1.addr) assert agent1.content is message_to_send @@ -220,8 +220,8 @@ async def test_send_message_copy(): agent1 = c.register(ExampleAgent()) message_to_send = Data() - await c.send_message(message_to_send, receiver_addr=agent1.addr) - await c.shutdown() + async with activate(c): + await c.send_message(message_to_send, receiver_addr=agent1.addr) assert agent1.content is not message_to_send From b41e76a549a90e7f12789e168e6f5649cef9df87 Mon Sep 17 00:00:00 2001 From: Rico Schrage Date: Tue, 22 Oct 2024 16:03:20 +0200 Subject: [PATCH 19/19] Fixing test. --- tests/unit_tests/core/test_external_scheduling_container.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/core/test_external_scheduling_container.py b/tests/unit_tests/core/test_external_scheduling_container.py index 9301d8c..9e74fbe 100644 --- a/tests/unit_tests/core/test_external_scheduling_container.py +++ b/tests/unit_tests/core/test_external_scheduling_container.py @@ -66,7 +66,7 @@ def __init__(self): self.current_ping = 0 self.tasks = [] - def on_register(self): + def on_ready(self): self.tasks.append(self.schedule_periodic_task(self.send_ping, delay=10)) async def send_ping(self): @@ -216,9 +216,9 @@ async def test_send_internal_messages(): @pytest.mark.asyncio async def test_step_with_replying_agent(): external_scheduling_container = create_ec_container(addr="external_eid_1") + reply_agent = external_scheduling_container.register(ReplyAgent()) async with activate(external_scheduling_container) as c: - reply_agent = external_scheduling_container.register(ReplyAgent()) new_acl_msg = ACLMessage() new_acl_msg.content = "hello you" new_acl_msg.receiver_addr = "external_eid_1"