Skip to content

Commit

Permalink
Release of version 1.4.7
Browse files Browse the repository at this point in the history
  • Loading branch information
liuszeng committed May 13, 2019
1 parent f0aa2ce commit fec1848
Show file tree
Hide file tree
Showing 8 changed files with 229 additions and 8 deletions.
66 changes: 62 additions & 4 deletions AWSIoTPythonSDK/MQTTLib.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,33 @@ def configureUsernamePassword(self, username, password=None):
"""
self._mqtt_core.configure_username_password(username, password)

def configureSocketFactory(self, socket_factory):
"""
**Description**
Configure a socket factory to custom configure a different socket type for
mqtt connection. Creating a custom socket allows for configuration of a proxy
**Syntax**
.. code:: python
# Configure socket factory
custom_args = {"arg1": "val1", "arg2": "val2"}
socket_factory = lambda: custom.create_connection((host, port), **custom_args)
myAWSIoTMQTTClient.configureSocketFactory(socket_factory)
**Parameters**
*socket_factory* - Anonymous function which creates a custom socket to spec.
**Returns**
None
"""
self._mqtt_core.configure_socket_factory(socket_factory)

def enableMetricsCollection(self):
"""
**Description**
Expand Down Expand Up @@ -1134,6 +1161,33 @@ def configureUsernamePassword(self, username, password=None):
"""
self._AWSIoTMQTTClient.configureUsernamePassword(username, password)

def configureSocketFactory(self, socket_factory):
"""
**Description**
Configure a socket factory to custom configure a different socket type for
mqtt connection. Creating a custom socket allows for configuration of a proxy
**Syntax**
.. code:: python
# Configure socket factory
custom_args = {"arg1": "val1", "arg2": "val2"}
socket_factory = lambda: custom.create_connection((host, port), **custom_args)
myAWSIoTMQTTClient.configureSocketFactory(socket_factory)
**Parameters**
*socket_factory* - Anonymous function which creates a custom socket to spec.
**Returns**
None
"""
self._AWSIoTMQTTClient.configureSocketFactory(socket_factory)

def enableMetricsCollection(self):
"""
**Description**
Expand Down Expand Up @@ -1613,7 +1667,7 @@ def sendJobsQuery(self, jobExecTopicType, jobId=None):
payload = self._thingJobManager.serializeClientTokenPayload()
return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS)

def sendJobsStartNext(self, statusDetails=None):
def sendJobsStartNext(self, statusDetails=None, stepTimeoutInMinutes=None):
"""
**Description**
Expand All @@ -1631,16 +1685,18 @@ def sendJobsStartNext(self, statusDetails=None):
*statusDetails* - Dictionary containing the key value pairs to use for the status details of the job execution
*stepTimeoutInMinutes - Specifies the amount of time this device has to finish execution of this job.
**Returns**
True if the publish request has been sent to paho. False if the request did not reach paho.
"""
topic = self._thingJobManager.getJobTopic(jobExecutionTopicType.JOB_START_NEXT_TOPIC, jobExecutionTopicReplyType.JOB_REQUEST_TYPE)
payload = self._thingJobManager.serializeStartNextPendingJobExecutionPayload(statusDetails)
payload = self._thingJobManager.serializeStartNextPendingJobExecutionPayload(statusDetails, stepTimeoutInMinutes)
return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS)

def sendJobsUpdate(self, jobId, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False):
def sendJobsUpdate(self, jobId, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False, stepTimeoutInMinutes=None):
"""
**Description**
Expand Down Expand Up @@ -1678,13 +1734,15 @@ def sendJobsUpdate(self, jobId, status, statusDetails=None, expectedVersion=0, e
*includeJobDocument* - When included and set to True, the response contains the JobDocument. The default is False.
*stepTimeoutInMinutes - Specifies the amount of time this device has to finish execution of this job.
**Returns**
True if the publish request has been sent to paho. False if the request did not reach paho.
"""
topic = self._thingJobManager.getJobTopic(jobExecutionTopicType.JOB_UPDATE_TOPIC, jobExecutionTopicReplyType.JOB_REQUEST_TYPE, jobId)
payload = self._thingJobManager.serializeJobExecutionUpdatePayload(status, statusDetails, expectedVersion, executionNumber, includeJobExecutionState, includeJobDocument)
payload = self._thingJobManager.serializeJobExecutionUpdatePayload(status, statusDetails, expectedVersion, executionNumber, includeJobExecutionState, includeJobDocument, stepTimeoutInMinutes)
return self._AWSIoTMQTTClient.publish(topic, payload, self._QoS)

def sendJobsDescribe(self, jobId, executionNumber=0, includeJobDocument=True):
Expand Down
2 changes: 1 addition & 1 deletion AWSIoTPythonSDK/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = "1.4.6"
__version__ = "1.4.7"


9 changes: 7 additions & 2 deletions AWSIoTPythonSDK/core/jobs/thingJobManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
_INCLUDE_JOB_EXECUTION_STATE_KEY = 'includeJobExecutionState'
_INCLUDE_JOB_DOCUMENT_KEY = 'includeJobDocument'
_CLIENT_TOKEN_KEY = 'clientToken'
_STEP_TIMEOUT_IN_MINUTES_KEY = 'stepTimeoutInMinutes'

#The type of job topic.
class jobExecutionTopicType(object):
Expand Down Expand Up @@ -112,7 +113,7 @@ def getJobTopic(self, srcJobExecTopicType, srcJobExecTopicReplyType=jobExecution
else:
return '{0}{1}/jobs/{2}{3}'.format(_BASE_THINGS_TOPIC, self._thingName, srcJobExecTopicType[_JOB_OPERATION_INDEX], srcJobExecTopicReplyType[_JOB_SUFFIX_INDEX])

def serializeJobExecutionUpdatePayload(self, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False):
def serializeJobExecutionUpdatePayload(self, status, statusDetails=None, expectedVersion=0, executionNumber=0, includeJobExecutionState=False, includeJobDocument=False, stepTimeoutInMinutes=None):
executionStatus = _getExecutionStatus(status)
if executionStatus is None:
return None
Expand All @@ -129,6 +130,8 @@ def serializeJobExecutionUpdatePayload(self, status, statusDetails=None, expecte
payload[_INCLUDE_JOB_DOCUMENT_KEY] = True
if self._clientToken is not None:
payload[_CLIENT_TOKEN_KEY] = self._clientToken
if stepTimeoutInMinutes is not None:
payload[_STEP_TIMEOUT_IN_MINUTES_KEY] = stepTimeoutInMinutes
return json.dumps(payload)

def serializeDescribeJobExecutionPayload(self, executionNumber=0, includeJobDocument=True):
Expand All @@ -139,12 +142,14 @@ def serializeDescribeJobExecutionPayload(self, executionNumber=0, includeJobDocu
payload[_CLIENT_TOKEN_KEY] = self._clientToken
return json.dumps(payload)

def serializeStartNextPendingJobExecutionPayload(self, statusDetails=None):
def serializeStartNextPendingJobExecutionPayload(self, statusDetails=None, stepTimeoutInMinutes=None):
payload = {}
if self._clientToken is not None:
payload[_CLIENT_TOKEN_KEY] = self._clientToken
if statusDetails is not None:
payload[_STATUS_DETAILS_KEY] = statusDetails
if stepTimeoutInMinutes is not None:
payload[_STEP_TIMEOUT_IN_MINUTES_KEY] = stepTimeoutInMinutes
return json.dumps(payload)

def serializeClientTokenPayload(self):
Expand Down
3 changes: 3 additions & 0 deletions AWSIoTPythonSDK/core/protocol/internal/clients.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def clear_last_will(self):
def set_username_password(self, username, password=None):
self._paho_client.username_pw_set(username, password)

def set_socket_factory(self, socket_factory):
self._paho_client.socket_factory_set(socket_factory)

def configure_reconnect_back_off(self, base_reconnect_quiet_sec, max_reconnect_quiet_sec, stable_connection_sec):
self._paho_client.setBackoffTiming(base_reconnect_quiet_sec, max_reconnect_quiet_sec, stable_connection_sec)

Expand Down
4 changes: 4 additions & 0 deletions AWSIoTPythonSDK/core/protocol/mqtt_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ def configure_username_password(self, username, password=None):
self._username = username
self._password = password

def configure_socket_factory(self, socket_factory):
self._logger.info("Configuring socket factory...")
self._internal_async_client.set_socket_factory(socket_factory)

def enable_metrics_collection(self):
self._enable_metrics_collection = True

Expand Down
13 changes: 12 additions & 1 deletion AWSIoTPythonSDK/core/protocol/paho/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ def __init__(self, client_id="", clean_session=True, userdata=None, protocol=MQT
self._host = ""
self._port = 1883
self._bind_address = ""
self._socket_factory = None
self._in_callback = False
self._strict_protocol = False
self._callback_mutex = threading.Lock()
Expand Down Expand Up @@ -780,7 +781,9 @@ def reconnect(self):
self._messages_reconnect_reset()

try:
if (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
if self._socket_factory:
sock = self._socket_factory()
elif (sys.version_info[0] == 2 and sys.version_info[1] < 7) or (sys.version_info[0] == 3 and sys.version_info[1] < 2):
sock = socket.create_connection((self._host, self._port))
else:
sock = socket.create_connection((self._host, self._port), source_address=(self._bind_address, 0))
Expand Down Expand Up @@ -1014,6 +1017,14 @@ def username_pw_set(self, username, password=None):
self._username = username.encode('utf-8')
self._password = password

def socket_factory_set(self, socket_factory):
"""Set a socket factory to custom configure a different socket type for
mqtt connection.
Must be called before connect() to have any effect.
socket_factory: create_connection function which creates a socket to user's specification
"""
self._socket_factory = socket_factory

def disconnect(self):
"""Disconnect a connected client from the broker."""
self._state_mutex.acquire()
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
CHANGELOG
=========

1.4.7
=====
* improvement: Added connection establishment control through client socket factory option

1.4.6
=====
* bugfix: Use non-deprecated ssl API to specify ALPN when doing Greengrass discovery
Expand Down
136 changes: 136 additions & 0 deletions samples/basicPubSub/basicPubSubProxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
'''
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
'''

from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
import logging
import time
import argparse
import json

AllowedActions = ['both', 'publish', 'subscribe']

# Custom MQTT message callback
def customCallback(client, userdata, message):
print("Received a new message: ")
print(message.payload)
print("from topic: ")
print(message.topic)
print("--------------\n\n")


# Read in command-line parameters
parser = argparse.ArgumentParser()
parser.add_argument("-e", "--endpoint", action="store", required=True, dest="host", help="Your AWS IoT custom endpoint")
parser.add_argument("-r", "--rootCA", action="store", required=True, dest="rootCAPath", help="Root CA file path")
parser.add_argument("-c", "--cert", action="store", dest="certificatePath", help="Certificate file path")
parser.add_argument("-k", "--key", action="store", dest="privateKeyPath", help="Private key file path")
parser.add_argument("-p", "--port", action="store", dest="port", type=int, help="Port number override")
parser.add_argument("-w", "--websocket", action="store_true", dest="useWebsocket", default=False,
help="Use MQTT over WebSocket")
parser.add_argument("-id", "--clientId", action="store", dest="clientId", default="basicPubSub",
help="Targeted client id")
parser.add_argument("-t", "--topic", action="store", dest="topic", default="sdk/test/Python", help="Targeted topic")
parser.add_argument("-m", "--mode", action="store", dest="mode", default="both",
help="Operation modes: %s"%str(AllowedActions))
parser.add_argument("-M", "--message", action="store", dest="message", default="Hello World!",
help="Message to publish")

args = parser.parse_args()
host = args.host
rootCAPath = args.rootCAPath
certificatePath = args.certificatePath
privateKeyPath = args.privateKeyPath
port = args.port
useWebsocket = args.useWebsocket
clientId = args.clientId
topic = args.topic

if args.mode not in AllowedActions:
parser.error("Unknown --mode option %s. Must be one of %s" % (args.mode, str(AllowedActions)))
exit(2)

if args.useWebsocket and args.certificatePath and args.privateKeyPath:
parser.error("X.509 cert authentication and WebSocket are mutual exclusive. Please pick one.")
exit(2)

if not args.useWebsocket and (not args.certificatePath or not args.privateKeyPath):
parser.error("Missing credentials for authentication.")
exit(2)

# Port defaults
if args.useWebsocket and not args.port: # When no port override for WebSocket, default to 443
port = 443
if not args.useWebsocket and not args.port: # When no port override for non-WebSocket, default to 8883
port = 8883

# Configure logging
logger = logging.getLogger("AWSIoTPythonSDK.core")
logger.setLevel(logging.DEBUG)
streamHandler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
streamHandler.setFormatter(formatter)
logger.addHandler(streamHandler)

# Init AWSIoTMQTTClient
myAWSIoTMQTTClient = None
if useWebsocket:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId, useWebsocket=True)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath)
else:
myAWSIoTMQTTClient = AWSIoTMQTTClient(clientId)
myAWSIoTMQTTClient.configureEndpoint(host, port)
myAWSIoTMQTTClient.configureCredentials(rootCAPath, privateKeyPath, certificatePath)

# AWSIoTMQTTClient connection configuration
myAWSIoTMQTTClient.configureAutoReconnectBackoffTime(1, 32, 20)
myAWSIoTMQTTClient.configureOfflinePublishQueueing(-1) # Infinite offline Publish queueing
myAWSIoTMQTTClient.configureDrainingFrequency(2) # Draining: 2 Hz
myAWSIoTMQTTClient.configureConnectDisconnectTimeout(10) # 10 sec
myAWSIoTMQTTClient.configureMQTTOperationTimeout(5) # 5 sec

# AWSIoTMQTTClient socket configuration
# import pysocks to help us build a socket that supports a proxy configuration
import socks

# set proxy arguments (for SOCKS5 proxy: proxy_type=2, for HTTP proxy: proxy_type=3)
proxy_config = {"proxy_addr":<proxy_addr>, "proxy_port":<proxy_port>, "proxy_type":<proxy_type>}

# create anonymous function to handle socket creation
socket_factory = lambda: socks.create_connection((host, port), **proxy_config)
myAWSIoTMQTTClient.configureSocketFactory(socket_factory)

# Connect and subscribe to AWS IoT
myAWSIoTMQTTClient.connect()
if args.mode == 'both' or args.mode == 'subscribe':
myAWSIoTMQTTClient.subscribe(topic, 1, customCallback)
time.sleep(2)

# Publish to the same topic in a loop forever
loopCount = 0
while True:
if args.mode == 'both' or args.mode == 'publish':
message = {}
message['message'] = args.message
message['sequence'] = loopCount
messageJson = json.dumps(message)
myAWSIoTMQTTClient.publish(topic, messageJson, 1)
if args.mode == 'publish':
print('Published topic %s: %s\n' % (topic, messageJson))
loopCount += 1
time.sleep(1)

0 comments on commit fec1848

Please sign in to comment.