-
Notifications
You must be signed in to change notification settings - Fork 725
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Roundtrips take at least 40ms or are not in the right order when publishing in subscribe #874
Comments
Interesting: When using the below code, which only sends the first message from the main loop and then lets the on_message and publish method do its ping pong, no deadlock happens, when QoS=0. # python 3.11-3.13
import logging
import random
from multiprocessing.pool import ThreadPool
import paho.mqtt.client as paho
logger = logging.getLogger(__name__)
myQOS = 1
broker = "localhost"
port = 1883
topic = "python/mqtt"
# Generate a Client ID with the publish prefix.
client_id = f"publish-{random.randint(0, 1000)}"
pool = ThreadPool(processes=4)
def connect_mqtt(topic):
def on_connect(client, userdata, flags, rc, pa):
if rc == 0:
logger.info("Connected to MQTT Broker!")
else:
logger.error("Failed to connect, return code %d\n", rc)
def thread_publish(client, topic, payload: int):
number = int(payload) + 1
if number < 100:
info = client.publish(topic, number, qos=myQOS)
logger.info("did send message %s", number)
# wait for publish does deadlock here
#info.wait_for_publish()
def on_message(client, userdata, message: paho.MQTTMessage):
logger.info("got message %s", message.payload)
payload = message.payload
# using a threadpool here did not help either
# pool.apply(thread_publish, args=(client, topic, message.payload))
number = int(payload) + 1
if number < 100:
info = client.publish(topic, number, qos=myQOS)
# wait for publish does deadlock here
#info.wait_for_publish()
logger.info("did send message %s", number)
client = paho.Client(paho.CallbackAPIVersion.VERSION2, client_id)
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker, port)
client.subscribe(topic)
return client
def run():
client = connect_mqtt(topic)
client.loop_start()
#client.subscribe("no topic")
result = client.publish(topic, 1, qos=myQOS)
result.wait_for_publish()
# this helps when QoS is 0 to wait until all is received
import time
time.sleep(1)
client.loop_stop()
if __name__ == "__main__":
logging.basicConfig(format="%(asctime)s;%(levelname)s;%(message)s", level="INFO")
run() |
So I don't know why the code in the issue description blocks and deadlocks when Adding paho.mqtt.python/src/paho/mqtt/client.py Line 1663 in d45de37
shows that the python client waits for the socket io to become available from the MQTT server. Taking a look at the MQTT server, we can see that this does not occur if I am not sure if this is one of the programming errors related around |
Bug Description
I need to respond to a received message by publishing another message.
I do not need QoS, but I do need to have the correct message ordering.
In when using an agent-based tool mango-agents, round trip time of MQTT connections is always at least 40ms - even when using QoS=0
An example of this is shown in OFFIS-DAI/mango#142
I tried reproducing this with plain paho-mqtt and got that far:
This does work with QoS = 1 set, in which case it takes 40ms for each roundtrip.
(I see that QoS=1 itself has a roundtrip, though this does not explain 40ms of waiting).
With QoS=0, the messages are not received in the correct order.
Adding
wait_for_publish
deadlocks the application.Usage of the Threadpool did not help either.
Reproduction
Please provide detailed steps showing how to replicate the issue (it's difficult to fix an issue we cannot replicate).
If errors are output then include the full error (including any stack trace).
Most issues should include a minimal example that
demonstrates the issue (ideally one that can be run without modification, i.e. runnable code using a public broker).
Environment
Logs
For many issues, especially when you cannot provide code to replicate the issue, it's helpful to include logs. Please
consider including:
The text was updated successfully, but these errors were encountered: