Skip to content

Commit

Permalink
Merge pull request #159 from ahilloffis/bugfix_race_con_agent_subscribe
Browse files Browse the repository at this point in the history
Fix: Fixed race condition in mqtt container for extra subscriptions
  • Loading branch information
rcschrg authored Feb 10, 2025
2 parents 0f38af0 + 4b085a8 commit c2c80b1
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions mango/container/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def __init__(
# dict mapping additionally subscribed topics to a set of aids
self.additional_subscriptions: dict[str, set[str]] = {}
# Future for pending sub requests
self.pending_sub_request: None | asyncio.Future = None
self.pending_sub_request: dict[int, asyncio.Future] = {}

async def start(self):
self._loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -264,8 +264,11 @@ def on_discon(client, userdata, disconnect_flags, reason_code, properties):

self.mqtt_client.on_disconnect = on_discon

def process_sub_request(mid):
self.pending_sub_request[mid].set_result(0)

def on_sub(client, userdata, mid, reason_code_list, properties):
self._loop.call_soon_threadsafe(self.pending_sub_request.set_result, 0)
self._loop.call_soon_threadsafe(process_sub_request, mid)

self.mqtt_client.on_subscribe = on_sub

Expand Down Expand Up @@ -433,14 +436,17 @@ async def subscribe_for_agent(self, *, aid: str, topic: str, qos: int = 2) -> bo
return True

self.additional_subscriptions[topic] = {aid}
self.pending_sub_request = asyncio.Future()
result, _ = self.mqtt_client.subscribe(topic, qos=qos)
future = asyncio.Future()
result, mid = self.mqtt_client.subscribe(topic, qos=qos)

if result != paho.MQTT_ERR_SUCCESS:
self.pending_sub_request.set_result(False)
future.set_result(False)
return False

await self.pending_sub_request
self.pending_sub_request[mid] = future

await self.pending_sub_request[mid]
del self.pending_sub_request[mid]
return True

def deregister(self, aid):
Expand Down

0 comments on commit c2c80b1

Please sign in to comment.