Skip to content

Commit

Permalink
Merge pull request #123 from OFFIS-DAI/development
Browse files Browse the repository at this point in the history
2.1.0
  • Loading branch information
rcschrg authored Oct 23, 2024
2 parents fa22d67 + daa701b commit 34e9497
Show file tree
Hide file tree
Showing 31 changed files with 609 additions and 305 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/publish-mango.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ jobs:
pip install virtualenv
virtualenv venv
source venv/bin/activate
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
sudo apt update
sudo apt install --assume-yes mosquitto
sudo service mosquitto start
Expand All @@ -33,7 +32,8 @@ jobs:
- name: Build package
run: |
source venv/bin/activate
python setup.py sdist bdist_wheel
python -m pip install build
python -m build
- name: Publish package
uses: pypa/gh-action-pypi-publish@release/v1
with:
Expand Down
13 changes: 7 additions & 6 deletions .github/workflows/test-mango.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ jobs:
source venv/bin/activate
pip3 install -U sphinx
pip3 install -r docs/requirements.txt
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
brew install mosquitto
brew services start mosquitto
pip3 install pytest coverage ruff
Expand Down Expand Up @@ -76,8 +75,7 @@ jobs:
source venv/bin/activate
pip3 install -U sphinx
pip3 install -r docs/requirements.txt
pip3 install -r requirements.txt
pip3 install -e .
pip3 install -e .[test]
sudo apt update
sudo apt install --assume-yes mosquitto
sudo service mosquitto start
Expand All @@ -95,5 +93,8 @@ jobs:
- name: Test+Coverage
run: |
source venv/bin/activate
coverage run -m pytest
coverage report
pytest --cov --cov-report=xml
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: false
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
author = "mango team"

# The full version, including alpha/beta/rc tags
version = release = "2.0.0"
version = release = "2.1.0"


# -- General configuration ---------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ Features
message exchange
role-api
scheduling
topology
codecs
api_ref/index
migration
Expand Down
81 changes: 81 additions & 0 deletions docs/source/topology.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
================
Topologies
================

The framework provides out of the box support for creating and distributing topologies to the agents.
This can be done using either :meth:`mango.create_topology` or :meth:`mango.per_node`. With the first method
a topology has to be created from scratch, and for every node you need to assign the agents.

.. testcode::

import asyncio
from typing import Any

from mango import Agent, run_with_tcp, create_topology

class TopAgent(Agent):
counter: int = 0

def handle_message(self, content, meta: dict[str, Any]):
self.counter += 1

async def start_example():
agents = [TopAgent(), TopAgent(), TopAgent()]
with create_topology() as topology:
id_1 = topology.add_node(agents[0])
id_2 = topology.add_node(agents[1])
id_3 = topology.add_node(agents[2])
topology.add_edge(id_1, id_2)
topology.add_edge(id_1, id_3)

async with run_with_tcp(1, *agents):
for neighbor in agents[0].neighbors():
await agents[0].send_message("hello neighbors", neighbor)
await asyncio.sleep(0.1)
print(agents[1].counter)
print(agents[2].counter)

asyncio.run(start_example())

.. testoutput::

1
1

The other method would be to use :meth:`mango.per_node`. Here you want to create a topology beforehand, this can be done using :meth:`mango.custom_topology` by providing
a networkx Graph.

.. testcode::

import asyncio
from typing import Any

from mango import Agent, run_with_tcp, per_node, complete_topology

class TopAgent(Agent):
counter: int = 0

def handle_message(self, content, meta: dict[str, Any]):
self.counter += 1

async def start_example():
topology = complete_topology(3)
for node in per_node(topology):
agent = TopAgent()
node.add(agent)

async with run_with_tcp(1, *topology.agents):
for neighbor in topology.agents[0].neighbors():
await topology.agents[0].send_message("hello neighbors", neighbor)
await asyncio.sleep(0.1)
print(topology.agents[1].counter)
print(topology.agents[2].counter)

asyncio.run(start_example())

.. testoutput::

1
1
7 changes: 7 additions & 0 deletions mango/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@
PROTOBUF,
SerializationError,
)
from .express.topology import (
Topology,
create_topology,
complete_topology,
per_node,
custom_topology,
)
51 changes: 47 additions & 4 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import logging
from abc import ABC
from dataclasses import dataclass
from enum import Enum
from typing import Any

from ..util.clock import Clock
Expand All @@ -23,6 +24,21 @@ class AgentAddress:
aid: str


class State(Enum):
NORMAL = 0 # normal neighbor
INACTIVE = (
1 # neighbor link exists but link is not active (could be activated/used)
)
BROKEN = 2 # neighbor link exists but link is not usable (can not be activated)


class TopologyService:
state_to_neighbors: dict[State, list] = dict()

def neighbors(self, state: State = State.NORMAL):
return [f() for f in self.state_to_neighbors.get(state, [])]


class AgentContext:
def __init__(self, container) -> None:
self._container = container
Expand Down Expand Up @@ -69,6 +85,7 @@ def __init__(self) -> None:
self.context: AgentContext = None
self.scheduler: Scheduler = None
self._aid = None
self._services = {}

def on_start(self):
"""Called when container started in which the agent is contained"""
Expand Down Expand Up @@ -344,6 +361,28 @@ async def tasks_complete(self, timeout=1):
"""
await self.scheduler.tasks_complete(timeout=timeout)

def service_of_type(self, type: type, default: Any = None) -> Any:
"""Return the service of the type ``type`` or set the default as service and return it.
:param type: the type of the service
:type type: type
:param default: the default if applicable
:type default: Any (optional)
:return: the service
:rtype: Any
"""
if type not in self._services:
self._services[type] = default
return self._services[type]

def neighbors(self, state: State = State.NORMAL) -> list[AgentAddress]:
"""Return the neighbors of the agent (controlled by the topology api).
:return: the list of agent addresses filtered by state
:rtype: list[AgentAddress]
"""
return self.service_of_type(TopologyService).neighbors(state)


class Agent(ABC, AgentDelegates):
"""Base class for all agents."""
Expand Down Expand Up @@ -373,24 +412,28 @@ def suspendable_tasks(self):

@suspendable_tasks.setter
def suspendable_tasks(self, value: bool):
self.self.scheduler.suspendable = value
self.scheduler.suspendable = value

def on_register(self):
"""
Hook-in to define behavior of the agent directly after it got registered by a container
"""

def _do_register(self, container, aid):
self._check_inbox_task = asyncio.create_task(self._check_inbox())
self._check_inbox_task.add_done_callback(self._raise_exceptions)
self._stopped = asyncio.Future()
self._aid = aid
self.context = AgentContext(container)
self.scheduler = Scheduler(
suspendable=True, observable=True, clock=container.clock
)
self.on_register()

def _do_start(self):
self._check_inbox_task = asyncio.create_task(self._check_inbox())
self._check_inbox_task.add_done_callback(self._raise_exceptions)
self._stopped = asyncio.Future()

self.on_start()

def _raise_exceptions(self, fut: asyncio.Future):
"""
Inline function used as a callback to raise exceptions
Expand Down
37 changes: 16 additions & 21 deletions mango/agent/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

import asyncio
from abc import ABC
from typing import Any, Callable
from collections.abc import Callable
from typing import Any

from mango.agent.core import Agent, AgentAddress, AgentDelegates

Expand All @@ -49,11 +50,11 @@ def __setitem__(self, key, newvalue):
def __contains__(self, key):
return hasattr(self, key)

def get(self, key):
def get(self, key, default=None):
if key in self:
return self[key]
else:
return None
return default

def update(self, data: dict):
for k, v in data.items():
Expand All @@ -67,15 +68,14 @@ class Role:
class RoleHandler:
"""Contains all roles and their models. Implements the communication between roles."""

def __init__(self, agent_context, scheduler):
def __init__(self, scheduler):
self._role_models = {}
self._roles = []
self._role_to_active = {}
self._role_model_type_to_subs = {}
self._message_subs = []
self._send_msg_subs = {}
self._role_event_type_to_handler = {}
self._agent_context = agent_context
self._scheduler = scheduler
self._data = DataContainer()

Expand Down Expand Up @@ -185,11 +185,16 @@ def handle_message(self, content, meta: dict[str, Any]):
:param content: content
:param meta: meta
"""
for role in self.roles:
role.handle_message(content, meta)
handle_message_found = False
for role, message_condition, method, _ in self._message_subs:
# do not execute handle_message twice if role has subscription as well
if method.__name__ == "handle_message":
handle_message_found = True
if self._is_role_active(role) and message_condition(content, meta):
method(content, meta)
if not handle_message_found:
for role in self.roles:
role.handle_message(content, meta)

def _notify_send_message_subs(self, content, receiver_addr: AgentAddress, **kwargs):
for role in self._send_msg_subs:
Expand Down Expand Up @@ -252,7 +257,6 @@ def __init__(
inbox,
):
super().__init__()
self._agent_context = None
self._role_handler = role_handler
self._aid = aid
self._inbox = inbox
Expand All @@ -268,7 +272,7 @@ def data(self):

@property
def current_timestamp(self) -> float:
return self._agent_context.current_timestamp
return self.context.current_timestamp

def _get_container(self):
return self._role_handler._data
Expand Down Expand Up @@ -334,7 +338,7 @@ async def send_message(
**kwargs,
):
self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs)
return await self._agent_context.send_message(
return await self.context.send_message(
content=content,
receiver_addr=receiver_addr,
sender_id=self.aid,
Expand Down Expand Up @@ -362,14 +366,6 @@ def subscribe_event(self, role: Role, event_type: Any, handler_method: Callable)
"""
self._role_handler.subscribe_event(role, event_type, handler_method)

@property
def addr(self):
return self._agent_context.addr

@property
def aid(self):
return self._aid

def deactivate(self, role) -> None:
self._role_handler.deactivate(role)

Expand All @@ -396,7 +392,7 @@ def __init__(self):
Using the generated aid-style ("agentX") is not allowed.
"""
super().__init__()
self._role_handler = RoleHandler(None, None)
self._role_handler = RoleHandler(None)
self._role_context = RoleContext(self._role_handler, self.aid, self.inbox)

def on_start(self):
Expand All @@ -406,8 +402,7 @@ def on_ready(self):
self._role_context.on_ready()

def on_register(self):
self._role_context._agent_context = self.context
self._role_handler._agent_context = self.context
self._role_context.context = self.context
self._role_context.scheduler = self.scheduler
self._role_handler._scheduler = self.scheduler
self._role_context._aid = self.aid
Expand Down
2 changes: 1 addition & 1 deletion mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ async def start(self):

"""Start the container. It totally depends on the implementation for what is actually happening."""
for agent in self._agents.values():
agent.on_start()
agent._do_start()

def on_ready(self):
for agent in self._agents.values():
Expand Down
3 changes: 3 additions & 0 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ async def start_agent_loop():
agent_creator(container)
process_initialized_event.set()

for agent in container._agents.values():
agent._do_start()
container.running = True
while not terminate_event.is_set():
await asyncio.sleep(WAIT_STEP)
await container.shutdown()
Expand Down
Loading

0 comments on commit 34e9497

Please sign in to comment.