diff --git a/docs/requirements.txt b/docs/requirements.txt index b44f58f..f8deb3a 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -2,3 +2,4 @@ pika pyzmq furo>=2024.8.6 +sphinx-copybutton diff --git a/docs/source/conf.py b/docs/source/conf.py index 95f9205..f17448b 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -11,7 +11,7 @@ author = "mango team" # The full version, including alpha/beta/rc tags -version = release = "2.1.1" +version = release = "2.1.3" # -- General configuration --------------------------------------------------- @@ -25,6 +25,8 @@ "sphinx.ext.intersphinx", "sphinx.ext.graphviz", "sphinx.ext.imgmath", + "sphinx.ext.viewcode", + "sphinx_copybutton", ] diff --git a/docs/source/installation.rst b/docs/source/installation.rst index a311dc8..606336a 100644 --- a/docs/source/installation.rst +++ b/docs/source/installation.rst @@ -35,6 +35,11 @@ We recommend Mosquitto__. On Debian/Ubuntu it can be installed as follows: $ sudo apt-get install mosquitto + +Note that mosquitto requires to have a config since v2.x for it to be accessible from outside your machine. +It may as well be desired to set the option `set_tcp_nodelay=true` in the `mosquitto.conf` to improve round-trip time. +Using the default QoS setting of 0 is recommended. + __ https://mosquitto.org/ diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst index a0bb451..da2e678 100644 --- a/docs/source/tutorial.rst +++ b/docs/source/tutorial.rst @@ -6,19 +6,17 @@ Tutorial Introduction *************** -This tutorial gives an overview of the basic functions of mango agents and containers. It consists of four -parts building a scenario of two PV plants, operated by their respective agents being directed by a remote +This tutorial gives an overview of the essential functions of mango agents and containers. It consists of four +parts in which we are building a scenario of two solar power plants, operated by their respective agents being directed by a remote controller. -Subsequent parts either extend the functionality or simplify some concept in the previous part. +Subsequent parts either extend the functionality or simplify some concepts in the previous part. -As a whole, this tutorial covers: - - container and agent creation - - message passing within a container - - message passing between containers - - codecs - - scheduling - - roles +As a whole, this tutorial covers + #. container and agent creation, message passing within a container + #. message passing between containers + #. codecs + #. scheduling and roles. ***************************** @@ -29,10 +27,10 @@ For your first mango tutorial, you will learn the fundamentals of creating mango as making them communicate with each other. This example covers: - - container - - agent creation - - basic message passing - - clean shutdown of containers + - container + - agent creation + - basic message passing + - clean shutdown of containers First, we want to create two simple agents and have the container send a message to one of them. An agent is created by defining a class that inherits from the base Agent class of mango. @@ -76,14 +74,17 @@ pattern: This will run in the asyncio loop. First, we create the container. A tcp container is created via the :meth:`mango.create_tcp_container` function which requires at least -the address of the container as a parameter. Other container types available by using :meth:`mango.create_mqtt_container` and :meth:`mango.create_ec_container`. -For this tutorial we will cover the tcp container. +the address of the container as a parameter. The address consists of the host (IP) and a port number (usually, you can choose an arbitrary number). +Other container types available by using :meth:`mango.create_mqtt_container` and :meth:`mango.create_ec_container`. +For this tutorial, we will cover the tcp container. .. testcode:: from mango import create_tcp_container - PV_CONTAINER_ADDRESS = ("127.0.0.1", 5555) + HOST = "127.0.0.1" + PORT = 5555 + PV_CONTAINER_ADDRESS = (HOST, PORT) pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS) @@ -93,7 +94,8 @@ For this tutorial we will cover the tcp container. ('127.0.0.1', 5555) -Now we can create our agents. Agents always live inside a container and therefore need to be registered to the container. +Now, we can create our agents. Agents must always be registered to the container to enable messaging and task scheduling (will be introduced later). +Registration will assign the agent an ``aid``, which, together with the protocol address (HOST + PORT), is the so-called agent address. .. testcode:: @@ -113,11 +115,10 @@ Now we can create our agents. Agents always live inside a container and therefor AgentAddress(protocol_addr=('127.0.0.1', 5555), aid='agent1') For now, our agents and containers are purely passive entities. First, we need to activate the container to start -the tcp server and its internal asynchronous behavior. In mango this can be done with :meth:`mango.activate` and the `async with` syntax. +the tcp server and its internal asynchronous behavior. In mango, this can be done with :meth:`mango.activate` and the `async with` syntax. Second, we need to send a message from one agent to the other. Messages are passed by the container via the :meth:`mango.Agent.send_message` -function always at least expects some content and a target agent address. To send a message directly to an agent, we also need to provide -its agent id which is set by the container when the agent is created. The address of the container and the aid -is wrapped in the :class:`mango.AgentAddress` class and can be retrieved with :meth:`mango.Agent.addr`. +function. This function always at least expects some content and a target agent address. Note, that The address of the container and the aid +is wrapped as the agent address in the :class:`mango.AgentAddress` class and can be retrieved with :meth:`mango.Agent.addr`. .. testcode:: @@ -151,20 +152,19 @@ is wrapped in the :class:`mango.AgentAddress` class and can be retrieved with :m ********************************* In the previous example, you learned how to create mango agents and containers and how to send basic messages between them. -In this example, you expand upon this. We introduce a controller agent that asks the current feed_in of our PV agents and -subsequently limits the output of both to their minimum. +In this example, you expand upon this. We introduce a controller agent that asks the current feed_in (provided power) of our PV agents and +subsequently limits the output of both to their minimum. This will happen using different containers, which introduces inter-container +communication. This is useful if you need to run your agents in different processes or even on different computing units (i.e., for performance reasons). This example covers: - - message passing between different containers - - basic task scheduling - - setting custom agent ids - - use of metadata - -First, we define our controller Agent. To ensure it can message the pv agents we pass that information -directly to it in the constructor. The control agent will send out messages to each pv agent, await their -replies and act according to that information. To handle this, we also add some control structures to the + - message passing between different containers + - basic task scheduling + - setting custom agent ids + - use of metadata + +First, we define our controller Agent. We pass that information directly to the constructor to ensure it can message the PV agents. The control agent will send out messages to each PV agent, await their +replies, and act according to that information. To handle this, we also add some control structures to the constructor that we will later use to keep track of which agents have already answered our messages. -As an additional feature, we will make it possible to manually set the agent of our agents by. .. testcode:: @@ -188,12 +188,15 @@ As an additional feature, we will make it possible to manually set the agent of [AgentAddress(protocol_addr='protocol_addr', aid='aid')] Next, we set up its :meth:`mango.Agent.handle_message` function. The controller needs to distinguish between two message types: -The replies to feed_in requests and later the acknowledgements that a new maximum feed_in was set by a pv agent. +The replies to feed_in requests and later the acknowledgments that a new maximum feed_in was set by a pv agent. We assign the key `performative` of the metadata of the message to do this. We set the `performative` entry to `inform` -for feed_in replies and to `accept_proposal` for feed_in change acknowledgements. +for feed_in replies and to `accept_proposal` for feed_in change acknowledgements. The task of the performative is here to +mark the content we send, this enables receiving agents to handle it accordingly. .. testcode:: + from mango import Agent, Performatives + class ControllerAgent(Agent): def __init__(self, known_agents): super().__init__() @@ -231,7 +234,7 @@ We do the same for our PV agents. .. testcode:: - from mango import sender_addr + from mango import Agent, Performatives, sender_addr PV_FEED_IN = { "PV Agent 0": 2.0, @@ -280,11 +283,10 @@ We do the same for our PV agents. When a PV agent receives a request from the controller, it immediately answers. Note two important changes to the first example here: First, within our message handling methods we can not ``await send_message`` directly -because ``handle_message`` is not a coroutine. Instead, we pass ``send_message`` as a task to the scheduler to be -executed at once via the ``schedule_instant_task`` method. +because ``handle_message`` is not a coroutine. Instead, we call the :meth:`mango.Agent.schedule_instant_message``, which will schedule a send message coroutine. Second, we set ``meta`` to contain the typing information of our message. -Now both of our agents can handle their respective messages. The last thing to do is make the controller actually +Now, both of our agents can handle their respective messages. The last thing to do is make the controller actually perform its active actions. We do this by implementing a ``run`` function with the following control flow: - send a feed_in request to each known pv agent - wait for all pv agents to answer @@ -417,14 +419,13 @@ messages. If instances of custom classes are exchanged over the network (or generally between different containers), these instances need to be serialized. In mango, objects can be encoded by mango's codecs. To make a new object type -known to a codec it needs to provide a serialization and a deserialization method. The object type together -with these methods is then passed to the codec which in turn is passed to a container. The container will then +known to a codec, a serialization and deserialization method must be provided. The object type and these methods are then passed to the codec, which is then passed to a container. The container will then automatically use these methods when it encounters an object of this type as the content of a message. This example covers: - - message classes - - codec basics - - the json_serializable decorator + - message classes + - codec basics + - the json_serializable decorator We want to use the types of custom message objects as the new mechanism for message typing. We define these as simple data classes. For simple classes like this, we can use the :meth:`mango.json_serializable`` decorator to @@ -481,7 +482,7 @@ Next, we need to create a codec, make our message objects known to it, and pass Any time the content of a message matches one of these types now the corresponding serialize and deserialize functions are called. Of course, you can also create your own serialization and deserialization functions with -more sophisticated behaviours and pass them to the codec. For more details refer to the :doc:`codecs` section of +more sophisticated behaviours and pass them to the codec. For more details, refer to the :doc:`codecs` section of the documentation. With this, the message handling in our agent classes can be simplified: @@ -633,23 +634,23 @@ by a corresponding "pong". Periodic tasks can be handled for you by mango's sche With the introduction of this task, we know have different responsibilities for the agents (e. g. act as PVAgent and reply to ping requests). In order to facilitate structuring an agent with different -responsibilities we can use the role API. +responsibilities, we can use the role API. The idea of using roles is to divide the functionality of an agent by responsibility in a structured way. A role is a python object that can be assigned to a RoleAgent. There are several lifecycle functions each role may implement: - - ``__init__`` - where you do the initial object setup - - :meth:`mango.Role.setup` - which is called when the role is assigned to an agent - - :meth:`mango.Role.on_start` - which is called when the container is started - - :meth:`mango.Role.on_ready` - which is called when are activated + - ``__init__`` - where you do the initial object setup + - :meth:`mango.Role.setup` - which is called when the role is assigned to an agent + - :meth:`mango.Role.on_start` - which is called when the container is started + - :meth:`mango.Role.on_ready` - which is called when are activated This distinction is relevant because not all features exist after construction with ``__init__``. Most of the time you want to implement :meth:`mango.Role.on_ready` for actions like message sending, or scheduling, because only -since this point you can be sure that all relevant container are started and the agent the role belongs to has been registered. +Since this point, you can be sure that all relevant containers have been started and the agent the role belongs to has been registered. However, the setup of the role itself should be done in :meth:`mango.Role.setup`. This example covers: - - role API basics - - scheduling and periodic tasks + - role API basics + - scheduling and periodic tasks The key part of defining roles are their ``__init__``, :meth:`mango.Role.setup`, and :meth:`mango.Role.on_ready` methods. The first is called to create the role object. The second is called when the role is assigned to @@ -662,11 +663,11 @@ The idea of the condition function is to allow to define a condition filtering i Another idea is that sending messages from the role is now done via its context with the method: ``self.context.send_message```. -We first create the `Ping` role, which has to periodically send out its messages. +We first create the `Ping` role, which has to send out its messages periodically. We can use mango's scheduling API to handle this for us via the :meth:`mango.RoleContext.schedule_periodic_task` function. This takes a coroutine to execute and a time interval. Whenever the time interval runs out the coroutine is triggered. With the scheduling API you can -also run tasks at specific times. For a full overview we refer to the documentation. +also run tasks at specific times. For a full overview, we refer to the documentation. .. testcode:: diff --git a/mango/agent/core.py b/mango/agent/core.py index d8ca846..98775eb 100644 --- a/mango/agent/core.py +++ b/mango/agent/core.py @@ -65,7 +65,7 @@ async def send_message( receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs, - ): + ) -> bool: """ See container.send_message(...) """ @@ -114,7 +114,7 @@ async def send_message( content, receiver_addr: AgentAddress, **kwargs, - ): + ) -> bool: """ See container.send_message(...) """ diff --git a/mango/agent/role.py b/mango/agent/role.py index 7401c1b..eea3457 100644 --- a/mango/agent/role.py +++ b/mango/agent/role.py @@ -336,7 +336,7 @@ async def send_message( content, receiver_addr: AgentAddress, **kwargs, - ): + ) -> bool: self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs) return await self.context.send_message( content=content, diff --git a/mango/container/core.py b/mango/container/core.py index 0aeb616..9921675 100644 --- a/mango/container/core.py +++ b/mango/container/core.py @@ -32,6 +32,7 @@ def __init__( clock: Clock, copy_internal_messages=False, mirror_data=None, + mp_method="spawn", **kwargs, ): self.name: str = name @@ -64,7 +65,9 @@ def __init__( self, self._mirror_data ) else: - self._container_process_manager = MainContainerProcessManager(self) + self._container_process_manager = MainContainerProcessManager( + self, mp_method + ) def _all_aids(self): all_aids = list(self._agents.keys()) + self._container_process_manager.aids @@ -178,11 +181,10 @@ def _send_internal_message( # first delegate to process manager to possibly reroute the message if receiver_id not in self._agents: - ( - result, - inbox_overwrite, - ) = self._container_process_manager.pre_hook_send_internal_message( - message, receiver_id, priority, default_meta + result, inbox_overwrite = ( + self._container_process_manager.pre_hook_send_internal_message( + message, receiver_id, priority, default_meta + ) ) if result is not None: return result diff --git a/mango/container/external_coupling.py b/mango/container/external_coupling.py index 48d0abd..133991b 100644 --- a/mango/container/external_coupling.py +++ b/mango/container/external_coupling.py @@ -111,12 +111,11 @@ async def send_message( message=message, receiver_id=receiver_id, default_meta=meta ) self._new_internal_message = True + return success else: if not hasattr(content, "split_content_and_meta"): message = MangoMessage(content, meta) - success = await self._send_external_message(receiver_addr, message) - - return success + return await self._send_external_message(receiver_addr, message) async def _send_external_message(self, addr, message) -> bool: """ diff --git a/mango/container/factory.py b/mango/container/factory.py index d06e234..78fb1a6 100644 --- a/mango/container/factory.py +++ b/mango/container/factory.py @@ -26,6 +26,19 @@ def create_mqtt( copy_internal_messages: bool = False, **kwargs, ): + """ + This method is called to instantiate an MQTT container + + :param broker_addr: The address of the broker this container will connect to. it has to be a tuple of (host, port). + :param client_id: The id of the MQTT Client + :param codec: Defines the codec to use. Defaults to JSON + :param clock: The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock + :param inbox_topic: Default subscription to the a specific MQTT topic + :param copy_internal_messages: Explicitly copy internal messages. Defaults to False + + :return: The instance of a MQTTContainer + """ + if codec is None: codec = JSON() if clock is None: @@ -67,9 +80,11 @@ def create_tcp( """ This method is called to instantiate a tcp container + :param addr: The address to use. it has to be a tuple of (host, port). :param codec: Defines the codec to use. Defaults to JSON :param clock: The clock that the scheduler of the agent should be based on. Defaults to the AsyncioClock - :param addr: the address to use. it has to be a tuple of (host, port). + :param copy_internal_messages: Explicitly copy internal messages. Defaults to False + :param auto_port: Whether you want to let the operating system pick the port. Defaults to False :return: The instance of a TCPContainer """ diff --git a/mango/container/mp.py b/mango/container/mp.py index 0d3c63e..84247c1 100644 --- a/mango/container/mp.py +++ b/mango/container/mp.py @@ -1,6 +1,7 @@ import asyncio import logging import os +import warnings from collections.abc import Callable from dataclasses import dataclass from multiprocessing import get_context @@ -204,7 +205,7 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato def pre_hook_send_internal_message( self, message, receiver_id, priority, default_meta - ): + ) -> tuple[bool, str]: """Hook in before an internal message is sent. Capable of preventing the default send_internal_message call. Therefore this method is able to reroute messages without side effects. @@ -339,7 +340,7 @@ async def _send_to_message_pipe( def pre_hook_send_internal_message( self, message, receiver_id, priority, default_meta - ): + ) -> tuple[bool, str]: self._out_queue.put_nowait((message, receiver_id, priority, default_meta)) return True, None @@ -362,11 +363,12 @@ class MainContainerProcessManager(BaseContainerProcessManager): def __init__( self, container, + mp_method: str = "spawn", ) -> None: self._active = False self._container = container self._mp_enabled = False - self._ctx = get_context("spawn") + self._ctx = get_context(mp_method) self._agent_process_init_list = [] self._started = False @@ -434,7 +436,7 @@ async def _handle_process_message(self, pipe: AioDuplex): def pre_hook_send_internal_message( self, message, receiver_id, priority, default_meta - ): + ) -> tuple[bool, str]: target_inbox = None if self._active: target_inbox = self._find_sp_queue(receiver_id) @@ -510,7 +512,16 @@ def _create_agent_process_bytes( from_pipe_message, to_pipe_message = aioduplex(self._ctx) from_pipe, to_pipe = aioduplex(self._ctx) process_initialized = self._ctx.Event() - with to_pipe.detach() as to_pipe, to_pipe_message.detach() as to_pipe_message: + with ( + warnings.catch_warnings(), + to_pipe.detach() as to_pipe, + to_pipe_message.detach() as to_pipe_message, + ): + warnings.filterwarnings( + "ignore", + message=r".*This process .* is multi-threaded, use of fork\(\) may lead to deadlocks.*", + category=DeprecationWarning, + ) agent_process = self._ctx.Process( target=create_agent_process_environment, args=( diff --git a/mango/container/mqtt.py b/mango/container/mqtt.py index 12bf048..76f6a78 100644 --- a/mango/container/mqtt.py +++ b/mango/container/mqtt.py @@ -208,7 +208,8 @@ def on_sub(client, userdata, mid, reason_code_list, properties): mqtt_messenger.on_subscribe = on_sub # subscribe topic - result, _ = mqtt_messenger.subscribe(self.inbox_topic, 2) + # set maximum QoS to 2 + result, _ = mqtt_messenger.subscribe(self.inbox_topic, qos=2) if result != paho.MQTT_ERR_SUCCESS: # subscription to inbox topic was not successful mqtt_messenger.disconnect() @@ -348,7 +349,7 @@ async def send_message( receiver_addr: AgentAddress, sender_id: None | str = None, **kwargs, - ): + ) -> bool: """ The container sends the message of one of its own agents to a specific topic. @@ -393,28 +394,35 @@ async def send_message( message = content if not hasattr(content, "split_content_and_meta"): message = MangoMessage(content, meta) - self._send_external_message( - topic=receiver_addr.protocol_addr, message=message + return self._send_external_message( + topic=receiver_addr.protocol_addr, + message=message, + qos=actual_mqtt_kwargs.get("qos", 0), + retain=actual_mqtt_kwargs.get("retain", False), ) - return True - def _send_external_message(self, *, topic: str, message): + def _send_external_message( + self, *, topic: str, message, qos: int = 0, retain: bool = False + ) -> bool: """ :param topic: MQTT topic :param message: The ACL message + :param qos: The quality of service param passed to mqtt publish + :param retain: The retain param passed to mqtt publish :return: """ encoded_message = self.codec.encode(message) logger.debug("Sending message;%s;%s", message, topic) - self.mqtt_client.publish(topic, encoded_message) + msg_info = self.mqtt_client.publish(topic, encoded_message, qos, retain) + return msg_info.is_published() - async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 0) -> bool: + async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 2) -> bool: """ :param aid: aid of the corresponding agent :param topic: topic to subscribe (wildcards are allowed) - :param qos: The quality of service for the subscription + :param qos: The maximum quality of service this subscription supports :return: A boolean signaling if subscription was true or not """ if aid not in self._agents.keys(): diff --git a/mango/container/tcp.py b/mango/container/tcp.py index fde0692..2bfdf53 100644 --- a/mango/container/tcp.py +++ b/mango/container/tcp.py @@ -239,7 +239,7 @@ async def send_message( if protocol_addr == self.addr: # internal message meta["network_protocol"] = "tcp" - success = self._send_internal_message( + return self._send_internal_message( content, receiver_addr.aid, default_meta=meta ) else: @@ -247,12 +247,10 @@ async def send_message( # if the user does not provide a splittable content, we create the default one if not hasattr(content, "split_content_and_meta"): message = MangoMessage(content, meta) - success = await self._send_external_message( + return await self._send_external_message( receiver_addr.protocol_addr, message, meta ) - return success - async def _send_external_message(self, addr, message, meta) -> bool: """ Sends *message* to another container at *addr* diff --git a/mango/util/distributed_clock.py b/mango/util/distributed_clock.py index b088265..b47a3be 100644 --- a/mango/util/distributed_clock.py +++ b/mango/util/distributed_clock.py @@ -121,11 +121,11 @@ async def get_next_event(self): if self.schedules: next_event = min(self.schedules) else: - logger.warning("%s: no new events, time stands still", self.aid) + logger.info("%s: no new events, time stands still", self.aid) next_event = self.scheduler.clock.time if next_event < self.scheduler.clock.time: - logger.warning("%s: got old event, time stands still", self.aid) + logger.info("%s: got old event, time stands still", self.aid) next_event = self.scheduler.clock.time logger.debug("next event at %s", next_event) return next_event diff --git a/mango/util/scheduling.py b/mango/util/scheduling.py index 8ec62fd..4810643 100644 --- a/mango/util/scheduling.py +++ b/mango/util/scheduling.py @@ -5,6 +5,7 @@ import asyncio import concurrent.futures import datetime +import logging from abc import abstractmethod from asyncio import Future from dataclasses import dataclass @@ -16,6 +17,20 @@ from mango.util.clock import AsyncioClock, Clock, ExternalClock +logger = logging.getLogger(__name__) + + +def _raise_exceptions(fut: asyncio.Future): + """ + Inline function used as a callback to raise exceptions + :param fut: The Future object of the task + """ + if fut.exception() is not None: + try: + raise fut.exception() + except Exception: + logger.exception("got exception in scheduled event") + @dataclass class ScheduledProcessControl: @@ -507,6 +522,7 @@ def schedule_task(self, task: ScheduledTask, src=None) -> asyncio.Task: coro = task.run() l_task = asyncio.create_task(coro) l_task.add_done_callback(task.on_stop) + l_task.add_done_callback(_raise_exceptions) l_task.add_done_callback(self._remove_task) self._scheduled_tasks.append((task, l_task, coro, src)) return l_task @@ -703,6 +719,7 @@ def schedule_process_task(self, task: ScheduledProcessTask, src=None): ) l_task.add_done_callback(self._remove_process_task) l_task.add_done_callback(task.on_stop) + l_task.add_done_callback(_raise_exceptions) self._scheduled_process_tasks.append( (task, l_task, scheduled_process_control, src) ) diff --git a/pyproject.toml b/pyproject.toml index 507f6b2..f2687e6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "mango-agents" -version = "2.1.2" +version = "2.1.3" authors = [ { name="mango Team", email="mango@offis.de" }, ]