Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.1.0 #123

Merged
merged 31 commits into from
Oct 23, 2024
Merged

2.1.0 #123

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
24dec57
mark tests with mqtt
maurerle Oct 21, 2024
348ce11
Merge pull request #110 from maurerle/mark_mqtt
rcschrg Oct 21, 2024
9637581
remove overloaded property functions in RoleContext
maurerle Oct 21, 2024
d8d9735
Merge pull request #113 from maurerle/role_addr
rcschrg Oct 21, 2024
59a5e31
use AgentAddress in create_acl for receiver_addr and sender_addr
maurerle Oct 21, 2024
a599f28
Merge pull request #114 from maurerle/acl_msg
rcschrg Oct 21, 2024
bc11657
add test which sets suspendable False
maurerle Oct 21, 2024
153d13f
the context of the role context was not initialized. This should dele…
maurerle Oct 21, 2024
cdcd26a
Added topology feature. Changed to modern build tool.
rcschrg Oct 21, 2024
01887d3
Merge pull request #116 from maurerle/set_suspendable_false
rcschrg Oct 21, 2024
f7deccf
Merge branch 'development' into feature-topologies
rcschrg Oct 21, 2024
90bfb2a
Merge pull request #115 from maurerle/fix_role_context
rcschrg Oct 21, 2024
4f137ba
Merge remote-tracking branch 'origin/development' into feature-topolo…
rcschrg Oct 21, 2024
eade80a
Removing context properties in role_context.
rcschrg Oct 21, 2024
5163448
Update readme
rcschrg Oct 21, 2024
85aaa0d
Add codecoverage to codecov.
rcschrg Oct 21, 2024
9f3557f
Adding coveragerc.
rcschrg Oct 21, 2024
f0f7703
Adding coveragerc.
rcschrg Oct 21, 2024
eba1303
Adding coveragerc.
rcschrg Oct 21, 2024
72bac62
Merge pull request #118 from OFFIS-DAI/feature-coverage
rcschrg Oct 21, 2024
af31423
Add ruff and coverage to pyproject.toml
rcschrg Oct 22, 2024
c27cd30
Added docs and some additional API for topology feature.
rcschrg Oct 22, 2024
af6e2f9
Ruff liniting.
rcschrg Oct 22, 2024
500c4d7
Merge pull request #117 from OFFIS-DAI/feature-topologies
rcschrg Oct 22, 2024
f08a76f
if subscription for "handle_message" exists, do not execute generic h…
maurerle Oct 22, 2024
3a91044
add default param for datacontainer get function
maurerle Oct 22, 2024
4e3119f
Moving inbox to on_start.
rcschrg Oct 22, 2024
551426b
Merge pull request #121 from maurerle/add_default_get
rcschrg Oct 22, 2024
97c31cf
Merge pull request #120 from maurerle/double_subscription
rcschrg Oct 22, 2024
b41e76a
Fixing test.
rcschrg Oct 22, 2024
daa701b
Merge pull request #122 from OFFIS-DAI/sync-registering
rcschrg Oct 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/publish-mango.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ jobs:
pip install virtualenv
virtualenv venv
source venv/bin/activate
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
sudo apt update
sudo apt install --assume-yes mosquitto
sudo service mosquitto start
Expand All @@ -33,7 +32,8 @@ jobs:
- name: Build package
run: |
source venv/bin/activate
python setup.py sdist bdist_wheel
python -m pip install build
python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@release/v1
with:
Expand Down
13 changes: 7 additions & 6 deletions .github/workflows/test-mango.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ jobs:
source venv/bin/activate
pip3 install -U sphinx
pip3 install -r docs/requirements.txt
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
brew install mosquitto
brew services start mosquitto
pip3 install pytest coverage ruff
Expand Down Expand Up @@ -76,8 +75,7 @@ jobs:
source venv/bin/activate
pip3 install -U sphinx
pip3 install -r docs/requirements.txt
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
sudo apt update
sudo apt install --assume-yes mosquitto
sudo service mosquitto start
Expand All @@ -95,5 +93,8 @@ jobs:
- name: Test+Coverage
run: |
source venv/bin/activate
coverage run -m pytest
coverage report
pytest --cov --cov-report=xml
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
author = "mango team"

# The full version, including alpha/beta/rc tags
version = release = "2.0.0"
version = release = "2.1.0"


# -- General configuration ---------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Features
message exchange
role-api
scheduling
topology
codecs
api_ref/index
migration
Expand Down
81 changes: 81 additions & 0 deletions docs/source/topology.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
================
Topologies
================

The framework provides out of the box support for creating and distributing topologies to the agents.
This can be done using either :meth:`mango.create_topology` or :meth:`mango.per_node`. With the first method
a topology has to be created from scratch, and for every node you need to assign the agents.

.. testcode::

import asyncio
from typing import Any

from mango import Agent, run_with_tcp, create_topology

class TopAgent(Agent):
counter: int = 0

def handle_message(self, content, meta: dict[str, Any]):
self.counter += 1

async def start_example():
agents = [TopAgent(), TopAgent(), TopAgent()]
with create_topology() as topology:
id_1 = topology.add_node(agents[0])
id_2 = topology.add_node(agents[1])
id_3 = topology.add_node(agents[2])
topology.add_edge(id_1, id_2)
topology.add_edge(id_1, id_3)

async with run_with_tcp(1, *agents):
for neighbor in agents[0].neighbors():
await agents[0].send_message("hello neighbors", neighbor)
await asyncio.sleep(0.1)
print(agents[1].counter)
print(agents[2].counter)

asyncio.run(start_example())

.. testoutput::

1
1

The other method would be to use :meth:`mango.per_node`. Here you want to create a topology beforehand, this can be done using :meth:`mango.custom_topology` by providing
a networkx Graph.

.. testcode::

import asyncio
from typing import Any

from mango import Agent, run_with_tcp, per_node, complete_topology

class TopAgent(Agent):
counter: int = 0

def handle_message(self, content, meta: dict[str, Any]):
self.counter += 1

async def start_example():
topology = complete_topology(3)
for node in per_node(topology):
agent = TopAgent()
node.add(agent)

async with run_with_tcp(1, *topology.agents):
for neighbor in topology.agents[0].neighbors():
await topology.agents[0].send_message("hello neighbors", neighbor)
await asyncio.sleep(0.1)
print(topology.agents[1].counter)
print(topology.agents[2].counter)

asyncio.run(start_example())

.. testoutput::

1
1
7 changes: 7 additions & 0 deletions mango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@
PROTOBUF,
SerializationError,
)
from .express.topology import (
Topology,
create_topology,
complete_topology,
per_node,
custom_topology,
)
51 changes: 47 additions & 4 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
from abc import ABC
from dataclasses import dataclass
from enum import Enum
from typing import Any

from ..util.clock import Clock
Expand All @@ -23,6 +24,21 @@ class AgentAddress:
aid: str


class State(Enum):
NORMAL = 0 # normal neighbor
INACTIVE = (
1 # neighbor link exists but link is not active (could be activated/used)
)
BROKEN = 2 # neighbor link exists but link is not usable (can not be activated)


class TopologyService:
state_to_neighbors: dict[State, list] = dict()

def neighbors(self, state: State = State.NORMAL):
return [f() for f in self.state_to_neighbors.get(state, [])]


class AgentContext:
def __init__(self, container) -> None:
self._container = container
Expand Down Expand Up @@ -69,6 +85,7 @@ def __init__(self) -> None:
self.context: AgentContext = None
self.scheduler: Scheduler = None
self._aid = None
self._services = {}

def on_start(self):
"""Called when container started in which the agent is contained"""
Expand Down Expand Up @@ -344,6 +361,28 @@ async def tasks_complete(self, timeout=1):
"""
await self.scheduler.tasks_complete(timeout=timeout)

def service_of_type(self, type: type, default: Any = None) -> Any:
"""Return the service of the type ``type`` or set the default as service and return it.

:param type: the type of the service
:type type: type
:param default: the default if applicable
:type default: Any (optional)
:return: the service
:rtype: Any
"""
if type not in self._services:
self._services[type] = default
return self._services[type]

def neighbors(self, state: State = State.NORMAL) -> list[AgentAddress]:
"""Return the neighbors of the agent (controlled by the topology api).

:return: the list of agent addresses filtered by state
:rtype: list[AgentAddress]
"""
return self.service_of_type(TopologyService).neighbors(state)


class Agent(ABC, AgentDelegates):
"""Base class for all agents."""
Expand Down Expand Up @@ -373,24 +412,28 @@ def suspendable_tasks(self):

@suspendable_tasks.setter
def suspendable_tasks(self, value: bool):
self.self.scheduler.suspendable = value
self.scheduler.suspendable = value

def on_register(self):
"""
Hook-in to define behavior of the agent directly after it got registered by a container
"""

def _do_register(self, container, aid):
self._check_inbox_task = asyncio.create_task(self._check_inbox())
self._check_inbox_task.add_done_callback(self._raise_exceptions)
self._stopped = asyncio.Future()
self._aid = aid
self.context = AgentContext(container)
self.scheduler = Scheduler(
suspendable=True, observable=True, clock=container.clock
)
self.on_register()

def _do_start(self):
self._check_inbox_task = asyncio.create_task(self._check_inbox())
self._check_inbox_task.add_done_callback(self._raise_exceptions)
self._stopped = asyncio.Future()

self.on_start()

def _raise_exceptions(self, fut: asyncio.Future):
"""
Inline function used as a callback to raise exceptions
Expand Down
37 changes: 16 additions & 21 deletions mango/agent/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

import asyncio
from abc import ABC
from typing import Any, Callable
from collections.abc import Callable
from typing import Any

from mango.agent.core import Agent, AgentAddress, AgentDelegates

Expand All @@ -49,11 +50,11 @@ def __setitem__(self, key, newvalue):
def __contains__(self, key):
return hasattr(self, key)

def get(self, key):
def get(self, key, default=None):
if key in self:
return self[key]
else:
return None
return default

def update(self, data: dict):
for k, v in data.items():
Expand All @@ -67,15 +68,14 @@ class Role:
class RoleHandler:
"""Contains all roles and their models. Implements the communication between roles."""

def __init__(self, agent_context, scheduler):
def __init__(self, scheduler):
self._role_models = {}
self._roles = []
self._role_to_active = {}
self._role_model_type_to_subs = {}
self._message_subs = []
self._send_msg_subs = {}
self._role_event_type_to_handler = {}
self._agent_context = agent_context
self._scheduler = scheduler
self._data = DataContainer()

Expand Down Expand Up @@ -185,11 +185,16 @@ def handle_message(self, content, meta: dict[str, Any]):
:param content: content
:param meta: meta
"""
for role in self.roles:
role.handle_message(content, meta)
handle_message_found = False
for role, message_condition, method, _ in self._message_subs:
# do not execute handle_message twice if role has subscription as well
if method.__name__ == "handle_message":
handle_message_found = True
if self._is_role_active(role) and message_condition(content, meta):
method(content, meta)
if not handle_message_found:
for role in self.roles:
role.handle_message(content, meta)

def _notify_send_message_subs(self, content, receiver_addr: AgentAddress, **kwargs):
for role in self._send_msg_subs:
Expand Down Expand Up @@ -252,7 +257,6 @@ def __init__(
inbox,
):
super().__init__()
self._agent_context = None
self._role_handler = role_handler
self._aid = aid
self._inbox = inbox
Expand All @@ -268,7 +272,7 @@ def data(self):

@property
def current_timestamp(self) -> float:
return self._agent_context.current_timestamp
return self.context.current_timestamp

def _get_container(self):
return self._role_handler._data
Expand Down Expand Up @@ -334,7 +338,7 @@ async def send_message(
**kwargs,
):
self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs)
return await self._agent_context.send_message(
return await self.context.send_message(
content=content,
receiver_addr=receiver_addr,
sender_id=self.aid,
Expand Down Expand Up @@ -362,14 +366,6 @@ def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable)
"""
self._role_handler.subscribe_event(role, event_type, handler_method)

@property
def addr(self):
return self._agent_context.addr

@property
def aid(self):
return self._aid

def deactivate(self, role) -> None:
self._role_handler.deactivate(role)

Expand All @@ -396,7 +392,7 @@ def __init__(self):
Using the generated aid-style ("agentX") is not allowed.
"""
super().__init__()
self._role_handler = RoleHandler(None, None)
self._role_handler = RoleHandler(None)
self._role_context = RoleContext(self._role_handler, self.aid, self.inbox)

def on_start(self):
Expand All @@ -406,8 +402,7 @@ def on_ready(self):
self._role_context.on_ready()

def on_register(self):
self._role_context._agent_context = self.context
self._role_handler._agent_context = self.context
self._role_context.context = self.context
self._role_context.scheduler = self.scheduler
self._role_handler._scheduler = self.scheduler
self._role_context._aid = self.aid
Expand Down
2 changes: 1 addition & 1 deletion mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def start(self):

"""Start the container. It totally depends on the implementation for what is actually happening."""
for agent in self._agents.values():
agent.on_start()
agent._do_start()

def on_ready(self):
for agent in self._agents.values():
Expand Down
3 changes: 3 additions & 0 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ async def start_agent_loop():
agent_creator(container)
process_initialized_event.set()

for agent in container._agents.values():
agent._do_start()
container.running = True
while not terminate_event.is_set():
await asyncio.sleep(WAIT_STEP)
await container.shutdown()
Expand Down
Loading