Skip to content

Commit

Permalink
Merge pull request #143 from OFFIS-DAI/improve_mqtt
Browse files Browse the repository at this point in the history
improve mqtt QoS and return types
  • Loading branch information
rcschrg authored Dec 2, 2024
2 parents 9337527 + 373ad84 commit f89fd83
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 27 deletions.
5 changes: 5 additions & 0 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ We recommend Mosquitto__. On Debian/Ubuntu it can be installed as follows:
$ sudo apt-get install mosquitto
Note that mosquitto requires to have a config since v2.x for it to be accessible from outside your machine.
It may as well be desired to set the option `set_tcp_nodelay=true` in the `mosquitto.conf` to improve round-trip time.
Using the default QoS setting of 0 is recommended.

__ https://mosquitto.org/


Expand Down
4 changes: 2 additions & 2 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def send_message(
receiver_addr: AgentAddress,
sender_id: None | str = None,
**kwargs,
):
) -> bool:
"""
See container.send_message(...)
"""
Expand Down Expand Up @@ -114,7 +114,7 @@ async def send_message(
content,
receiver_addr: AgentAddress,
**kwargs,
):
) -> bool:
"""
See container.send_message(...)
"""
Expand Down
2 changes: 1 addition & 1 deletion mango/agent/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async def send_message(
content,
receiver_addr: AgentAddress,
**kwargs,
):
) -> bool:
self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs)
return await self.context.send_message(
content=content,
Expand Down
9 changes: 4 additions & 5 deletions mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ def _send_internal_message(

# first delegate to process manager to possibly reroute the message
if receiver_id not in self._agents:
(
result,
inbox_overwrite,
) = self._container_process_manager.pre_hook_send_internal_message(
message, receiver_id, priority, default_meta
result, inbox_overwrite = (
self._container_process_manager.pre_hook_send_internal_message(
message, receiver_id, priority, default_meta
)
)
if result is not None:
return result
Expand Down
5 changes: 2 additions & 3 deletions mango/container/external_coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ async def send_message(
message=message, receiver_id=receiver_id, default_meta=meta
)
self._new_internal_message = True
return success
else:
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
success = await self._send_external_message(receiver_addr, message)

return success
return await self._send_external_message(receiver_addr, message)

async def _send_external_message(self, addr, message) -> bool:
"""
Expand Down
6 changes: 3 additions & 3 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
"""Hook in before an internal message is sent. Capable of preventing the default
send_internal_message call.
Therefore this method is able to reroute messages without side effects.
Expand Down Expand Up @@ -340,7 +340,7 @@ async def _send_to_message_pipe(

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
self._out_queue.put_nowait((message, receiver_id, priority, default_meta))
return True, None

Expand Down Expand Up @@ -436,7 +436,7 @@ async def _handle_process_message(self, pipe: AioDuplex):

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
target_inbox = None
if self._active:
target_inbox = self._find_sp_queue(receiver_id)
Expand Down
26 changes: 17 additions & 9 deletions mango/container/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ def on_sub(client, userdata, mid, reason_code_list, properties):
mqtt_messenger.on_subscribe = on_sub

# subscribe topic
result, _ = mqtt_messenger.subscribe(self.inbox_topic, 2)
# set maximum QoS to 2
result, _ = mqtt_messenger.subscribe(self.inbox_topic, qos=2)
if result != paho.MQTT_ERR_SUCCESS:
# subscription to inbox topic was not successful
mqtt_messenger.disconnect()
Expand Down Expand Up @@ -348,7 +349,7 @@ async def send_message(
receiver_addr: AgentAddress,
sender_id: None | str = None,
**kwargs,
):
) -> bool:
"""
The container sends the message of one of its own agents to a specific topic.
Expand Down Expand Up @@ -393,28 +394,35 @@ async def send_message(
message = content
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
self._send_external_message(
topic=receiver_addr.protocol_addr, message=message
return self._send_external_message(
topic=receiver_addr.protocol_addr,
message=message,
qos=actual_mqtt_kwargs.get("qos", 0),
retain=actual_mqtt_kwargs.get("retain", False),
)
return True

def _send_external_message(self, *, topic: str, message):
def _send_external_message(
self, *, topic: str, message, qos: int = 0, retain: bool = False
) -> bool:
"""
:param topic: MQTT topic
:param message: The ACL message
:param qos: The quality of service param passed to mqtt publish
:param retain: The retain param passed to mqtt publish
:return:
"""
encoded_message = self.codec.encode(message)
logger.debug("Sending message;%s;%s", message, topic)
self.mqtt_client.publish(topic, encoded_message)
msg_info = self.mqtt_client.publish(topic, encoded_message, qos, retain)
return msg_info.is_published()

async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 0) -> bool:
async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 2) -> bool:
"""
:param aid: aid of the corresponding agent
:param topic: topic to subscribe (wildcards are allowed)
:param qos: The quality of service for the subscription
:param qos: The maximum quality of service this subscription supports
:return: A boolean signaling if subscription was true or not
"""
if aid not in self._agents.keys():
Expand Down
6 changes: 2 additions & 4 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,20 +239,18 @@ async def send_message(
if protocol_addr == self.addr:
# internal message
meta["network_protocol"] = "tcp"
success = self._send_internal_message(
return self._send_internal_message(
content, receiver_addr.aid, default_meta=meta
)
else:
message = content
# if the user does not provide a splittable content, we create the default one
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
success = await self._send_external_message(
return await self._send_external_message(
receiver_addr.protocol_addr, message, meta
)

return success

async def _send_external_message(self, addr, message, meta) -> bool:
"""
Sends *message* to another container at *addr*
Expand Down

0 comments on commit f89fd83

Please sign in to comment.