forked from aws/aws-iot-device-sdk-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_mqtt_lib.py
304 lines (239 loc) · 15.7 KB
/
test_mqtt_lib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
from AWSIoTPythonSDK.core.protocol.mqtt_core import MqttCore
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTShadowClient
from AWSIoTPythonSDK.MQTTLib import DROP_NEWEST
try:
from mock import patch
from mock import MagicMock
except:
from unittest.mock import patch
from unittest.mock import MagicMock
PATCH_MODULE_LOCATION = "AWSIoTPythonSDK.MQTTLib."
CLIENT_ID = "DefaultClientId"
SHADOW_CLIENT_ID = "DefaultShadowClientId"
DUMMY_HOST = "dummy.host"
PORT_443 = 443
PORT_8883 = 8883
DEFAULT_KEEPALIVE_SEC = 600
DUMMY_TOPIC = "dummy/topic"
DUMMY_PAYLOAD = "dummy/payload"
DUMMY_QOS = 1
DUMMY_AWS_ACCESS_KEY_ID = "DummyKeyId"
DUMMY_AWS_SECRET_KEY = "SecretKey"
DUMMY_AWS_TOKEN = "Token"
DUMMY_CA_PATH = "path/to/ca"
DUMMY_CERT_PATH = "path/to/cert"
DUMMY_KEY_PATH = "path/to/key"
DUMMY_BASE_RECONNECT_BACKOFF_SEC = 1
DUMMY_MAX_RECONNECT_BACKOFF_SEC = 32
DUMMY_STABLE_CONNECTION_SEC = 16
DUMMY_QUEUE_SIZE = 100
DUMMY_DRAINING_FREQUENCY = 2
DUMMY_TIMEOUT_SEC = 10
DUMMY_USER_NAME = "UserName"
DUMMY_PASSWORD = "Password"
class TestMqttLibShadowClient:
def setup_method(self, test_method):
self._use_mock_mqtt_core()
def _use_mock_mqtt_core(self):
self.mqtt_core_patcher = patch(PATCH_MODULE_LOCATION + "MqttCore", spec=MqttCore)
self.mock_mqtt_core_constructor = self.mqtt_core_patcher.start()
self.mqtt_core_mock = MagicMock()
self.mock_mqtt_core_constructor.return_value = self.mqtt_core_mock
self.iot_mqtt_shadow_client = AWSIoTMQTTShadowClient(SHADOW_CLIENT_ID)
def teardown_method(self, test_method):
self.mqtt_core_patcher.stop()
def test_iot_mqtt_shadow_client_with_provided_mqtt_client(self):
mock_iot_mqtt_client = MagicMock()
iot_mqtt_shadow_client_with_provided_mqtt_client = AWSIoTMQTTShadowClient(SHADOW_CLIENT_ID, awsIoTMQTTClient=mock_iot_mqtt_client)
assert mock_iot_mqtt_client.configureOfflinePublishQueueing.called is False
def test_iot_mqtt_shadow_client_connect_default_keepalive(self):
self.iot_mqtt_shadow_client.connect()
self.mqtt_core_mock.connect.assert_called_once_with(DEFAULT_KEEPALIVE_SEC)
def test_iot_mqtt_shadow_client_auto_enable_when_use_cert_over_443(self):
self.mqtt_core_mock.use_wss.return_value = False
self.iot_mqtt_shadow_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_443)
self.mqtt_core_mock.configure_alpn_protocols.assert_called_once()
def test_iot_mqtt_shadow_client_alpn_auto_disable_when_use_wss(self):
self.mqtt_core_mock.use_wss.return_value = True
self.iot_mqtt_shadow_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_443)
assert self.mqtt_core_mock.configure_alpn_protocols.called is False
def test_iot_mqtt_shadow_client_alpn_auto_disable_when_use_cert_over_8883(self):
self.mqtt_core_mock.use_wss.return_value = False
self.iot_mqtt_shadow_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_8883)
assert self.mqtt_core_mock.configure_alpn_protocols.called is False
def test_iot_mqtt_shadow_client_clear_last_will(self):
self.iot_mqtt_shadow_client.clearLastWill()
self.mqtt_core_mock.clear_last_will.assert_called_once()
def test_iot_mqtt_shadow_client_configure_endpoint(self):
self.iot_mqtt_shadow_client.configureEndpoint(DUMMY_HOST, PORT_8883)
self.mqtt_core_mock.configure_endpoint.assert_called_once()
def test_iot_mqtt_shadow_client_configure_iam_credentials(self):
self.iot_mqtt_shadow_client.configureIAMCredentials(DUMMY_AWS_ACCESS_KEY_ID, DUMMY_AWS_SECRET_KEY, DUMMY_AWS_TOKEN)
self.mqtt_core_mock.configure_iam_credentials.assert_called_once()
def test_iot_mqtt_shadowclient_configure_credentials(self):
self.iot_mqtt_shadow_client.configureCredentials(DUMMY_CA_PATH, DUMMY_KEY_PATH, DUMMY_CERT_PATH)
self.mqtt_core_mock.configure_cert_credentials.assert_called_once()
def test_iot_mqtt_shadow_client_configure_auto_reconnect_backoff(self):
self.iot_mqtt_shadow_client.configureAutoReconnectBackoffTime(DUMMY_BASE_RECONNECT_BACKOFF_SEC,
DUMMY_MAX_RECONNECT_BACKOFF_SEC,
DUMMY_STABLE_CONNECTION_SEC)
self.mqtt_core_mock.configure_reconnect_back_off.assert_called_once_with(DUMMY_BASE_RECONNECT_BACKOFF_SEC,
DUMMY_MAX_RECONNECT_BACKOFF_SEC,
DUMMY_STABLE_CONNECTION_SEC)
def test_iot_mqtt_shadow_client_configure_offline_publish_queueing(self):
# This configurable is done at object initialization. We do not allow customers to configure this.
self.mqtt_core_mock.configure_offline_requests_queue.assert_called_once_with(0, DROP_NEWEST) # Disabled
def test_iot_mqtt_client_configure_draining_frequency(self):
# This configurable is done at object initialization. We do not allow customers to configure this.
# Sine queuing is disabled, draining interval configuration is meaningless.
# "10" is just a placeholder value in the internal implementation.
self.mqtt_core_mock.configure_draining_interval_sec.assert_called_once_with(1/float(10))
def test_iot_mqtt_client_configure_connect_disconnect_timeout(self):
self.iot_mqtt_shadow_client.configureConnectDisconnectTimeout(DUMMY_TIMEOUT_SEC)
self.mqtt_core_mock.configure_connect_disconnect_timeout_sec.assert_called_once_with(DUMMY_TIMEOUT_SEC)
def test_iot_mqtt_client_configure_mqtt_operation_timeout(self):
self.iot_mqtt_shadow_client.configureMQTTOperationTimeout(DUMMY_TIMEOUT_SEC)
self.mqtt_core_mock.configure_operation_timeout_sec.assert_called_once_with(DUMMY_TIMEOUT_SEC)
def test_iot_mqtt_client_configure_user_name_password(self):
self.iot_mqtt_shadow_client.configureUsernamePassword(DUMMY_USER_NAME, DUMMY_PASSWORD)
self.mqtt_core_mock.configure_username_password.assert_called_once_with(DUMMY_USER_NAME, DUMMY_PASSWORD)
def test_iot_mqtt_client_enable_metrics_collection(self):
self.iot_mqtt_shadow_client.enableMetricsCollection()
self.mqtt_core_mock.enable_metrics_collection.assert_called_once()
def test_iot_mqtt_client_disable_metrics_collection(self):
self.iot_mqtt_shadow_client.disableMetricsCollection()
self.mqtt_core_mock.disable_metrics_collection.assert_called_once()
def test_iot_mqtt_client_callback_registration_upon_connect(self):
fake_on_online_callback = MagicMock()
fake_on_offline_callback = MagicMock()
self.iot_mqtt_shadow_client.onOnline = fake_on_online_callback
self.iot_mqtt_shadow_client.onOffline = fake_on_offline_callback
# `onMessage` is used internally by the SDK. We do not expose this callback configurable to the customer
self.iot_mqtt_shadow_client.connect()
assert self.mqtt_core_mock.on_online == fake_on_online_callback
assert self.mqtt_core_mock.on_offline == fake_on_offline_callback
self.mqtt_core_mock.connect.assert_called_once()
def test_iot_mqtt_client_disconnect(self):
self.iot_mqtt_shadow_client.disconnect()
self.mqtt_core_mock.disconnect.assert_called_once()
class TestMqttLibMqttClient:
def setup_method(self, test_method):
self._use_mock_mqtt_core()
def _use_mock_mqtt_core(self):
self.mqtt_core_patcher = patch(PATCH_MODULE_LOCATION + "MqttCore", spec=MqttCore)
self.mock_mqtt_core_constructor = self.mqtt_core_patcher.start()
self.mqtt_core_mock = MagicMock()
self.mock_mqtt_core_constructor.return_value = self.mqtt_core_mock
self.iot_mqtt_client = AWSIoTMQTTClient(CLIENT_ID)
def teardown_method(self, test_method):
self.mqtt_core_patcher.stop()
def test_iot_mqtt_client_connect_default_keepalive(self):
self.iot_mqtt_client.connect()
self.mqtt_core_mock.connect.assert_called_once_with(DEFAULT_KEEPALIVE_SEC)
def test_iot_mqtt_client_connect_async_default_keepalive(self):
self.iot_mqtt_client.connectAsync()
self.mqtt_core_mock.connect_async.assert_called_once_with(DEFAULT_KEEPALIVE_SEC, None)
def test_iot_mqtt_client_alpn_auto_enable_when_use_cert_over_443(self):
self.mqtt_core_mock.use_wss.return_value = False
self.iot_mqtt_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_443)
self.mqtt_core_mock.configure_alpn_protocols.assert_called_once()
def test_iot_mqtt_client_alpn_auto_disable_when_use_wss(self):
self.mqtt_core_mock.use_wss.return_value = True
self.iot_mqtt_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_443)
assert self.mqtt_core_mock.configure_alpn_protocols.called is False
def test_iot_mqtt_client_alpn_auto_disable_when_use_cert_over_8883(self):
self.mqtt_core_mock.use_wss.return_value = False
self.iot_mqtt_client.configureEndpoint(hostName=DUMMY_HOST, portNumber=PORT_8883)
assert self.mqtt_core_mock.configure_alpn_protocols.called is False
def test_iot_mqtt_client_configure_last_will(self):
self.iot_mqtt_client.configureLastWill(topic=DUMMY_TOPIC, payload=DUMMY_PAYLOAD, QoS=DUMMY_QOS)
self.mqtt_core_mock.configure_last_will.assert_called_once_with(DUMMY_TOPIC, DUMMY_PAYLOAD, DUMMY_QOS, False)
def test_iot_mqtt_client_clear_last_will(self):
self.iot_mqtt_client.clearLastWill()
self.mqtt_core_mock.clear_last_will.assert_called_once()
def test_iot_mqtt_client_configure_endpoint(self):
self.iot_mqtt_client.configureEndpoint(DUMMY_HOST, PORT_8883)
self.mqtt_core_mock.configure_endpoint.assert_called_once()
def test_iot_mqtt_client_configure_iam_credentials(self):
self.iot_mqtt_client.configureIAMCredentials(DUMMY_AWS_ACCESS_KEY_ID, DUMMY_AWS_SECRET_KEY, DUMMY_AWS_TOKEN)
self.mqtt_core_mock.configure_iam_credentials.assert_called_once()
def test_iot_mqtt_client_configure_credentials(self):
self.iot_mqtt_client.configureCredentials(DUMMY_CA_PATH, DUMMY_KEY_PATH, DUMMY_CERT_PATH)
self.mqtt_core_mock.configure_cert_credentials.assert_called_once()
def test_iot_mqtt_client_configure_auto_reconnect_backoff(self):
self.iot_mqtt_client.configureAutoReconnectBackoffTime(DUMMY_BASE_RECONNECT_BACKOFF_SEC,
DUMMY_MAX_RECONNECT_BACKOFF_SEC,
DUMMY_STABLE_CONNECTION_SEC)
self.mqtt_core_mock.configure_reconnect_back_off.assert_called_once_with(DUMMY_BASE_RECONNECT_BACKOFF_SEC,
DUMMY_MAX_RECONNECT_BACKOFF_SEC,
DUMMY_STABLE_CONNECTION_SEC)
def test_iot_mqtt_client_configure_offline_publish_queueing(self):
self.iot_mqtt_client.configureOfflinePublishQueueing(DUMMY_QUEUE_SIZE)
self.mqtt_core_mock.configure_offline_requests_queue.assert_called_once_with(DUMMY_QUEUE_SIZE, DROP_NEWEST)
def test_iot_mqtt_client_configure_draining_frequency(self):
self.iot_mqtt_client.configureDrainingFrequency(DUMMY_DRAINING_FREQUENCY)
self.mqtt_core_mock.configure_draining_interval_sec.assert_called_once_with(1/float(DUMMY_DRAINING_FREQUENCY))
def test_iot_mqtt_client_configure_connect_disconnect_timeout(self):
self.iot_mqtt_client.configureConnectDisconnectTimeout(DUMMY_TIMEOUT_SEC)
self.mqtt_core_mock.configure_connect_disconnect_timeout_sec.assert_called_once_with(DUMMY_TIMEOUT_SEC)
def test_iot_mqtt_client_configure_mqtt_operation_timeout(self):
self.iot_mqtt_client.configureMQTTOperationTimeout(DUMMY_TIMEOUT_SEC)
self.mqtt_core_mock.configure_operation_timeout_sec.assert_called_once_with(DUMMY_TIMEOUT_SEC)
def test_iot_mqtt_client_configure_user_name_password(self):
self.iot_mqtt_client.configureUsernamePassword(DUMMY_USER_NAME, DUMMY_PASSWORD)
self.mqtt_core_mock.configure_username_password.assert_called_once_with(DUMMY_USER_NAME, DUMMY_PASSWORD)
def test_iot_mqtt_client_enable_metrics_collection(self):
self.iot_mqtt_client.enableMetricsCollection()
self.mqtt_core_mock.enable_metrics_collection.assert_called_once()
def test_iot_mqtt_client_disable_metrics_collection(self):
self.iot_mqtt_client.disableMetricsCollection()
self.mqtt_core_mock.disable_metrics_collection.assert_called_once()
def test_iot_mqtt_client_callback_registration_upon_connect(self):
fake_on_online_callback = MagicMock()
fake_on_offline_callback = MagicMock()
fake_on_message_callback = MagicMock()
self.iot_mqtt_client.onOnline = fake_on_online_callback
self.iot_mqtt_client.onOffline = fake_on_offline_callback
self.iot_mqtt_client.onMessage = fake_on_message_callback
self.iot_mqtt_client.connect()
assert self.mqtt_core_mock.on_online == fake_on_online_callback
assert self.mqtt_core_mock.on_offline == fake_on_offline_callback
assert self.mqtt_core_mock.on_message == fake_on_message_callback
self.mqtt_core_mock.connect.assert_called_once()
def test_iot_mqtt_client_disconnect(self):
self.iot_mqtt_client.disconnect()
self.mqtt_core_mock.disconnect.assert_called_once()
def test_iot_mqtt_client_publish(self):
self.iot_mqtt_client.publish(DUMMY_TOPIC, DUMMY_PAYLOAD, DUMMY_QOS)
self.mqtt_core_mock.publish.assert_called_once_with(DUMMY_TOPIC, DUMMY_PAYLOAD, DUMMY_QOS, False)
def test_iot_mqtt_client_subscribe(self):
message_callback = MagicMock()
self.iot_mqtt_client.subscribe(DUMMY_TOPIC, DUMMY_QOS, message_callback)
self.mqtt_core_mock.subscribe.assert_called_once_with(DUMMY_TOPIC, DUMMY_QOS, message_callback)
def test_iot_mqtt_client_unsubscribe(self):
self.iot_mqtt_client.unsubscribe(DUMMY_TOPIC)
self.mqtt_core_mock.unsubscribe.assert_called_once_with(DUMMY_TOPIC)
def test_iot_mqtt_client_connect_async(self):
connack_callback = MagicMock()
self.iot_mqtt_client.connectAsync(ackCallback=connack_callback)
self.mqtt_core_mock.connect_async.assert_called_once_with(DEFAULT_KEEPALIVE_SEC, connack_callback)
def test_iot_mqtt_client_disconnect_async(self):
disconnect_callback = MagicMock()
self.iot_mqtt_client.disconnectAsync(ackCallback=disconnect_callback)
self.mqtt_core_mock.disconnect_async.assert_called_once_with(disconnect_callback)
def test_iot_mqtt_client_publish_async(self):
puback_callback = MagicMock()
self.iot_mqtt_client.publishAsync(DUMMY_TOPIC, DUMMY_PAYLOAD, DUMMY_QOS, puback_callback)
self.mqtt_core_mock.publish_async.assert_called_once_with(DUMMY_TOPIC, DUMMY_PAYLOAD, DUMMY_QOS,
False, puback_callback)
def test_iot_mqtt_client_subscribe_async(self):
suback_callback = MagicMock()
message_callback = MagicMock()
self.iot_mqtt_client.subscribeAsync(DUMMY_TOPIC, DUMMY_QOS, suback_callback, message_callback)
self.mqtt_core_mock.subscribe_async.assert_called_once_with(DUMMY_TOPIC, DUMMY_QOS,
suback_callback, message_callback)
def test_iot_mqtt_client_unsubscribe_async(self):
unsuback_callback = MagicMock()
self.iot_mqtt_client.unsubscribeAsync(DUMMY_TOPIC, unsuback_callback)
self.mqtt_core_mock.unsubscribe_async.assert_called_once_with(DUMMY_TOPIC, unsuback_callback)