Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve mqtt QoS and return types #143

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/source/installation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ We recommend Mosquitto__. On Debian/Ubuntu it can be installed as follows:
$ sudo apt-get install mosquitto
Note that mosquitto requires to have a config since v2.x for it to be accessible from outside your machine.
It may as well be desired to set the option `set_tcp_nodelay=true` in the `mosquitto.conf` to improve round-trip time.
Using the default QoS setting of 0 is recommended.

__ https://mosquitto.org/


Expand Down
4 changes: 2 additions & 2 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ async def send_message(
receiver_addr: AgentAddress,
sender_id: None | str = None,
**kwargs,
):
) -> bool:
"""
See container.send_message(...)
"""
Expand Down Expand Up @@ -114,7 +114,7 @@ async def send_message(
content,
receiver_addr: AgentAddress,
**kwargs,
):
) -> bool:
"""
See container.send_message(...)
"""
Expand Down
2 changes: 1 addition & 1 deletion mango/agent/role.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async def send_message(
content,
receiver_addr: AgentAddress,
**kwargs,
):
) -> bool:
self._role_handler._notify_send_message_subs(content, receiver_addr, **kwargs)
return await self.context.send_message(
content=content,
Expand Down
9 changes: 4 additions & 5 deletions mango/container/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,10 @@ def _send_internal_message(

# first delegate to process manager to possibly reroute the message
if receiver_id not in self._agents:
(
result,
inbox_overwrite,
) = self._container_process_manager.pre_hook_send_internal_message(
message, receiver_id, priority, default_meta
result, inbox_overwrite = (
self._container_process_manager.pre_hook_send_internal_message(
message, receiver_id, priority, default_meta
)
)
if result is not None:
return result
Expand Down
5 changes: 2 additions & 3 deletions mango/container/external_coupling.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ async def send_message(
message=message, receiver_id=receiver_id, default_meta=meta
)
self._new_internal_message = True
return success
else:
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
success = await self._send_external_message(receiver_addr, message)

return success
return await self._send_external_message(receiver_addr, message)

async def _send_external_message(self, addr, message) -> bool:
"""
Expand Down
6 changes: 3 additions & 3 deletions mango/container/mp.py
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def create_agent_process(self, agent_creator, container, mirror_container_creato

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
"""Hook in before an internal message is sent. Capable of preventing the default
send_internal_message call.
Therefore this method is able to reroute messages without side effects.
Expand Down Expand Up @@ -340,7 +340,7 @@ async def _send_to_message_pipe(

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
self._out_queue.put_nowait((message, receiver_id, priority, default_meta))
return True, None

Expand Down Expand Up @@ -436,7 +436,7 @@ async def _handle_process_message(self, pipe: AioDuplex):

def pre_hook_send_internal_message(
self, message, receiver_id, priority, default_meta
):
) -> tuple[bool, str]:
target_inbox = None
if self._active:
target_inbox = self._find_sp_queue(receiver_id)
Expand Down
26 changes: 17 additions & 9 deletions mango/container/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ def on_sub(client, userdata, mid, reason_code_list, properties):
mqtt_messenger.on_subscribe = on_sub

# subscribe topic
result, _ = mqtt_messenger.subscribe(self.inbox_topic, 2)
# set maximum QoS to 2
result, _ = mqtt_messenger.subscribe(self.inbox_topic, qos=2)
if result != paho.MQTT_ERR_SUCCESS:
# subscription to inbox topic was not successful
mqtt_messenger.disconnect()
Expand Down Expand Up @@ -348,7 +349,7 @@ async def send_message(
receiver_addr: AgentAddress,
sender_id: None | str = None,
**kwargs,
):
) -> bool:
"""
The container sends the message of one of its own agents to a specific topic.

Expand Down Expand Up @@ -393,28 +394,35 @@ async def send_message(
message = content
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
self._send_external_message(
topic=receiver_addr.protocol_addr, message=message
return self._send_external_message(
topic=receiver_addr.protocol_addr,
message=message,
qos=actual_mqtt_kwargs.get("qos", 0),
retain=actual_mqtt_kwargs.get("retain", False),
)
return True

def _send_external_message(self, *, topic: str, message):
def _send_external_message(
self, *, topic: str, message, qos: int = 0, retain: bool = False
) -> bool:
"""

:param topic: MQTT topic
:param message: The ACL message
:param qos: The quality of service param passed to mqtt publish
:param retain: The retain param passed to mqtt publish
:return:
"""
encoded_message = self.codec.encode(message)
logger.debug("Sending message;%s;%s", message, topic)
self.mqtt_client.publish(topic, encoded_message)
msg_info = self.mqtt_client.publish(topic, encoded_message, qos, retain)
return msg_info.is_published()

async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 0) -> bool:
async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 2) -> bool:
"""

:param aid: aid of the corresponding agent
:param topic: topic to subscribe (wildcards are allowed)
:param qos: The quality of service for the subscription
:param qos: The maximum quality of service this subscription supports
:return: A boolean signaling if subscription was true or not
"""
if aid not in self._agents.keys():
Expand Down
6 changes: 2 additions & 4 deletions mango/container/tcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,20 +239,18 @@ async def send_message(
if protocol_addr == self.addr:
# internal message
meta["network_protocol"] = "tcp"
success = self._send_internal_message(
return self._send_internal_message(
content, receiver_addr.aid, default_meta=meta
)
else:
message = content
# if the user does not provide a splittable content, we create the default one
if not hasattr(content, "split_content_and_meta"):
message = MangoMessage(content, meta)
success = await self._send_external_message(
return await self._send_external_message(
receiver_addr.protocol_addr, message, meta
)

return success

async def _send_external_message(self, addr, message, meta) -> bool:
"""
Sends *message* to another container at *addr*
Expand Down