diff --git a/AWSIoTPythonSDK/MQTTLib.py b/AWSIoTPythonSDK/MQTTLib.py index a450a07..c4e9269 100755 --- a/AWSIoTPythonSDK/MQTTLib.py +++ b/AWSIoTPythonSDK/MQTTLib.py @@ -652,7 +652,9 @@ def subscribe(self, topic, QoS, callback): *callback* - Function to be called when a new message for the subscribed topic comes in. Should be in form :code:`customCallback(client, userdata, message)`, where - :code:`message` contains :code:`topic` and :code:`payload`. + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. **Returns** @@ -688,7 +690,9 @@ def subscribeAsync(self, topic, QoS, ackCallback=None, messageCallback=None): *messageCallback* - Function to be called when a new message for the subscribed topic comes in. Should be in form :code:`customCallback(client, userdata, message)`, where - :code:`message` contains :code:`topic` and :code:`payload`. + :code:`message` contains :code:`topic` and :code:`payload`. Note that :code:`client` and :code:`userdata` are + here just to be aligned with the underneath Paho callback function signature. These fields are pending to be + deprecated and should not be depended on. **Returns** diff --git a/AWSIoTPythonSDK/__init__.py b/AWSIoTPythonSDK/__init__.py index f3b924e..377fda1 100755 --- a/AWSIoTPythonSDK/__init__.py +++ b/AWSIoTPythonSDK/__init__.py @@ -1,3 +1,3 @@ -__version__ = "1.3.0" +__version__ = "1.3.1" diff --git a/AWSIoTPythonSDK/core/protocol/internal/clients.py b/AWSIoTPythonSDK/core/protocol/internal/clients.py index 0ba9d91..ae6d764 100644 --- a/AWSIoTPythonSDK/core/protocol/internal/clients.py +++ b/AWSIoTPythonSDK/core/protocol/internal/clients.py @@ -151,8 +151,8 @@ def combined_on_disconnect_callback(mid, data): return combined_on_disconnect_callback def _create_converted_on_message_callback(self): - def converted_on_message_callback(mid, message): - self.on_message(message) + def converted_on_message_callback(mid, data): + self.on_message(data) return converted_on_message_callback # For client online notification @@ -212,14 +212,16 @@ def unregister_internal_event_callbacks(self): def invoke_event_callback(self, mid, data=None): with self._event_callback_map_lock: event_callback = self._event_callback_map.get(mid) - if event_callback: - self._logger.debug("Invoking custom event callback...") - if data is not None: - event_callback(mid, data) - else: - event_callback(mid) - if isinstance(mid, Number): # Do NOT remove callbacks for CONNACK/DISCONNECT/MESSAGE - self._logger.debug("This custom event callback is for pub/sub/unsub, removing it after invocation...") + # For invoking the event callback, we do not need to acquire the lock + if event_callback: + self._logger.debug("Invoking custom event callback...") + if data is not None: + event_callback(mid=mid, data=data) + else: + event_callback(mid=mid) + if isinstance(mid, Number): # Do NOT remove callbacks for CONNACK/DISCONNECT/MESSAGE + self._logger.debug("This custom event callback is for pub/sub/unsub, removing it after invocation...") + with self._event_callback_map_lock: del self._event_callback_map[mid] def remove_event_callback(self, mid): diff --git a/AWSIoTPythonSDK/core/protocol/internal/workers.py b/AWSIoTPythonSDK/core/protocol/internal/workers.py index 656578a..604b201 100644 --- a/AWSIoTPythonSDK/core/protocol/internal/workers.py +++ b/AWSIoTPythonSDK/core/protocol/internal/workers.py @@ -16,6 +16,7 @@ import time import logging from threading import Thread +from threading import Event from AWSIoTPythonSDK.core.protocol.internal.events import EventTypes from AWSIoTPythonSDK.core.protocol.internal.events import FixedEventMids from AWSIoTPythonSDK.core.protocol.internal.clients import ClientStatus @@ -91,6 +92,7 @@ def __init__(self, cv, event_queue, internal_async_client, RequestTypes.SUBSCRIBE : self._handle_offline_subscribe, RequestTypes.UNSUBSCRIBE : self._handle_offline_unsubscribe } + self._stopper = Event() def update_offline_requests_manager(self, offline_requests_manager): self._offline_requests_manager = offline_requests_manager @@ -105,6 +107,7 @@ def is_running(self): return self._is_running def start(self): + self._stopper.clear() self._is_running = True dispatch_events = Thread(target=self._dispatch) dispatch_events.daemon = True @@ -127,6 +130,13 @@ def _clean_up(self): self._internal_async_client.clean_up_event_callbacks() self._logger.debug("Event callbacks cleared") + def wait_until_it_stops(self, timeout_sec): + self._logger.debug("Waiting for event consumer to completely stop") + return self._stopper.wait(timeout=timeout_sec) + + def is_fully_stopped(self): + return self._stopper.is_set() + def _dispatch(self): while self._is_running: with self._cv: @@ -135,6 +145,8 @@ def _dispatch(self): else: while not self._event_queue.empty(): self._dispatch_one() + self._stopper.set() + self._logger.debug("Exiting dispatching loop...") def _dispatch_one(self): mid, event_type, data = self._event_queue.get() diff --git a/AWSIoTPythonSDK/core/protocol/mqtt_core.py b/AWSIoTPythonSDK/core/protocol/mqtt_core.py index a1797ca..05f1e19 100644 --- a/AWSIoTPythonSDK/core/protocol/mqtt_core.py +++ b/AWSIoTPythonSDK/core/protocol/mqtt_core.py @@ -219,6 +219,9 @@ def disconnect(self): if not event.wait(self._connect_disconnect_timeout_sec): self._logger.error("Disconnect timed out") raise disconnectTimeoutException() + if not self._event_consumer.wait_until_it_stops(self._connect_disconnect_timeout_sec): + self._logger.error("Disconnect timed out in waiting for event consumer") + raise disconnectTimeoutException() return True def disconnect_async(self, ack_callback=None): diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 1ddcdf0..a1ddd5a 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -2,6 +2,13 @@ CHANGELOG ========= +1.3.1 +===== +* bugfix:Issue:`#67 `__ +* bugfix:Fixed a dead lock issue when client async API is called within the event callback +* bugfix:Updated README and API documentation to provide clear usage information on sync/async API and callbacks +* improvement:Added a new sample to show API usage within callbacks + 1.3.0 ===== * bugfix:WebSocket handshake response timeout and error escalation diff --git a/README.rst b/README.rst index f6a1faa..0f6d258 100755 --- a/README.rst +++ b/README.rst @@ -348,6 +348,36 @@ API documentation for ``AWSIoTPythonSDK.core.greengrass.discovery.models``, `Greengrass Discovery documentation `__ or `Greengrass overall documentation `__. + +Synchronous APIs and Asynchronous APIs +______________________________________ + +Beginning with Release v1.2.0, SDK provides asynchronous APIs and enforces synchronous API behaviors for MQTT operations, +which includes: +- connect/connectAsync +- disconnect/disconnectAsync +- publish/publishAsync +- subscribe/subscribeAsync +- unsubscribe/unsubscribeAsync + +- Asynchronous APIs +Asynchronous APIs translate the invocation into MQTT packet and forward it to the underneath connection to be sent out. +They return immediately once packets are out for delivery, regardless of whether the corresponding ACKs, if any, have +been received. Users can specify their own callbacks for ACK/message (server side PUBLISH) processing for each +individual request. These callbacks will be sequentially dispatched and invoked upon the arrival of ACK/message (server +side PUBLISH) packets. + +- Synchronous APIs +Synchronous API behaviors are enforced by registering blocking ACK callbacks on top of the asynchronous APIs. +Synchronous APIs wait on their corresponding ACK packets, if there is any, before the invocation returns. For example, +a synchronous QoS1 publish call will wait until it gets its PUBACK back. A synchronous subscribe call will wait until +it gets its SUBACK back. Users can configure operation time out for synchronous APIs to stop the waiting. + +Since callbacks are sequentially dispatched and invoked, calling synchronous APIs within callbacks will deadlock the +user application. If users are inclined to utilize the asynchronous mode and perform MQTT operations +within callbacks, asynchronous APIs should be used. For more details, please check out the provided samples at +``samples/basicPubSub/basicPubSub_APICallInCallback.py`` + .. _Key_Features: Key Features @@ -642,6 +672,37 @@ Source The example is available in ``samples/basicPubSub/``. +BasicPubSub with API invocation in callback +___________ + +This example demonstrates the usage of asynchronous APIs within callbacks. It first connects to AWS IoT and subscribes +to 2 topics with the corresponding message callbacks registered. One message callback contains client asynchronous API +invocation that republishes the received message from to /republish. The other message callback simply +prints out the received message. It then publishes messages to in an infinite loop. For every message received +from , it will be republished to /republish and be printed out as configured in the simple print-out +message callback. +New ack packet ids are printed upon reception of PUBACK and SUBACK through ACK callbacks registered with asynchronous +API calls, indicating that the the client received ACKs for the corresponding asynchronous API calls. + +Instructions +************ + +Run the example like this: + +.. code-block:: python + + # Certificate based mutual authentication + python basicPubSub_APICallInCallback.py -e -r -c -k + # MQTT over WebSocket + python basicPubSub_APICallInCallback.py -e -r -w + # Customize client id and topic + python basicPubSub_APICallInCallback.py -e -r -c -k -id -t + +Source +****** + +The example is available in ``samples/basicPubSub/``. + BasicShadow ___________ diff --git a/samples/basicPubSub/basicPubSub_APICallInCallback.py b/samples/basicPubSub/basicPubSub_APICallInCallback.py new file mode 100644 index 0000000..75581e1 --- /dev/null +++ b/samples/basicPubSub/basicPubSub_APICallInCallback.py @@ -0,0 +1,125 @@ +''' +/* + * Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + ''' + +from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient +import logging +import time +import argparse + + +class CallbackContainer(object): + + def __init__(self, client): + self._client = client + + def messagePrint(self, client, userdata, message): + print("Received a new message: ") + print(message.payload) + print("from topic: ") + print(message.topic) + print("--------------\n\n") + + def messageForward(self, client, userdata, message): + topicRepublish = message.topic + "/republish" + print("Forwarding message from: %s to %s" % (message.topic, topicRepublish)) + print("--------------\n\n") + self._client.publishAsync(topicRepublish, str(message.payload), 1, self.pubackCallback) + + def pubackCallback(self, mid): + print("Received PUBACK packet id: ") + print(mid) + print("++++++++++++++\n\n") + + def subackCallback(self, mid, data): + print("Received SUBACK packet id: ") + print(mid) + print("Granted QoS: ") + print(data) + print("++++++++++++++\n\n") + + +# Read in command-line parameters +parser = argparse.ArgumentParser() +parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint") +parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path") +parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path") +parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path") +parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False, + help="Use MQTT over WebSocket") +parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub", + help="Targeted client id") +parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic") + +args = parser.parse_args() +host = args.host +rootCAPath = args.rootCAPath +certificatePath = args.certificatePath +privateKeyPath = args.privateKeyPath +useWebsocket = args.useWebsocket +clientId = args.clientId +topic = args.topic + +if args.useWebsocket and args.certificatePath and args.privateKeyPath: + parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.") + exit(2) + +if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath): + parser.error("Missing credentials for authentication.") + exit(2) + +# Configure logging +logger = logging.getLogger("AWSIoTPythonSDK.core") +logger.setLevel(logging.DEBUG) +streamHandler = logging.StreamHandler() +formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') +streamHandler.setFormatter(formatter) +logger.addHandler(streamHandler) + +# Init AWSIoTMQTTClient +myAWSIoTMQTTClient = None +if useWebsocket: + myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True) + myAWSIoTMQTTClient.configureEndpoint(host, 443) + myAWSIoTMQTTClient.configureCredentials(rootCAPath) +else: + myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId) + myAWSIoTMQTTClient.configureEndpoint(host, 8883) + myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath) + +# AWSIoTMQTTClient connection configuration +myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20) +myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing +myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz +myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec +myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec + +myCallbackContainer = CallbackContainer(myAWSIoTMQTTClient) + +# Connect and subscribe to AWS IoT +myAWSIoTMQTTClient.connect() + +# Perform synchronous subscribes +myAWSIoTMQTTClient.subscribe(topic, 1, myCallbackContainer.messageForward) +myAWSIoTMQTTClient.subscribe(topic + "/republish", 1, myCallbackContainer.messagePrint) +time.sleep(2) + +# Publish to the same topic in a loop forever +loopCount = 0 +while True: + myAWSIoTMQTTClient.publishAsync(topic, "New Message " + str(loopCount), 1, ackCallback=myCallbackContainer.pubackCallback) + loopCount += 1 + time.sleep(1)