-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
213 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from packaging.version import Version | ||
from packaging.version import parse as parse_version | ||
|
||
from pytest_mqtt import __version__ | ||
|
||
|
||
def test_app_version(): | ||
assert isinstance(parse_version(__version__), Version) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
from pytest_mqtt.capmqtt import MqttClientAdapter | ||
from pytest_mqtt.util import delay | ||
|
||
|
||
def test_mqtt_client_adapter(mosquitto): | ||
mqtt_client = MqttClientAdapter() | ||
mqtt_client.start() | ||
|
||
# Submit MQTT message. | ||
message_info = mqtt_client.publish("foo", "bar") | ||
message_info.wait_for_publish(timeout=0.5) | ||
assert message_info.is_published() is True | ||
|
||
delay() # Only needed to let the coverage tracker fulfil its job. | ||
mqtt_client.stop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import pytest | ||
|
||
from pytest_mqtt.model import MqttMessage | ||
|
||
|
||
def test_basic_submit_text_receive_binary(mosquitto, capmqtt): | ||
""" | ||
Basic submit/receive roundtrip, with ASCII text payload (`str`). | ||
Without further ado, the payloads will be received as `bytes`. | ||
""" | ||
# Submit two MQTT messages. | ||
capmqtt.publish(topic="foo", payload="bar") | ||
capmqtt.publish(topic="baz", payload="qux") | ||
|
||
# Demonstrate the `messages` property. | ||
assert capmqtt.messages == [ | ||
MqttMessage(topic="foo", payload=b"bar", userdata=None), | ||
MqttMessage(topic="baz", payload=b"qux", userdata=None), | ||
] | ||
|
||
# Demonstrate the `records` property. | ||
assert capmqtt.records == [ | ||
("foo", b"bar", None), | ||
("baz", b"qux", None), | ||
] | ||
|
||
|
||
def test_basic_submit_and_receive_binary(mosquitto, capmqtt): | ||
""" | ||
Basic submit/receive roundtrip, with binary payload (`bytes`). | ||
""" | ||
|
||
# Submit two MQTT messages. | ||
capmqtt.publish(topic="foo", payload=b"bar") | ||
capmqtt.publish(topic="baz", payload=b"qux") | ||
|
||
# Demonstrate the `messages` property. | ||
assert capmqtt.messages == [ | ||
MqttMessage(topic="foo", payload=b"bar", userdata=None), | ||
MqttMessage(topic="baz", payload=b"qux", userdata=None), | ||
] | ||
|
||
# Demonstrate the `records` property. | ||
assert capmqtt.records == [ | ||
("foo", b"bar", None), | ||
("baz", b"qux", None), | ||
] | ||
|
||
|
||
@pytest.mark.capmqtt_decode_utf8 | ||
def test_basic_submit_text_receive_text_marker(mosquitto, capmqtt): | ||
""" | ||
Basic submit/receive roundtrip, with ASCII text payload (`str`). | ||
By using the `capmqtt_decode_utf8` marker, the payloads will also be received | ||
as `str`, after decoding them from `utf-8`. | ||
""" | ||
|
||
# Submit two MQTT messages. | ||
capmqtt.publish(topic="foo", payload="bar") | ||
capmqtt.publish(topic="baz", payload="qux") | ||
|
||
# Demonstrate the `messages` property. | ||
assert capmqtt.messages == [ | ||
MqttMessage(topic="foo", payload="bar", userdata=None), | ||
MqttMessage(topic="baz", payload="qux", userdata=None), | ||
] | ||
|
||
# Demonstrate the `records` property. | ||
assert capmqtt.records == [ | ||
("foo", "bar", None), | ||
("baz", "qux", None), | ||
] | ||
|
||
|
||
@pytest.fixture | ||
def configure_capmqtt_decode_utf8(pytestconfig): | ||
pytestconfig.option.capmqtt_decode_utf8 = True | ||
|
||
|
||
def test_basic_submit_text_receive_text_config(configure_capmqtt_decode_utf8, mosquitto, capmqtt): | ||
""" | ||
Basic submit/receive roundtrip, with ASCII text payload (`str`). | ||
By using the global `capmqtt_decode_utf8` config option, the payloads | ||
will also be received as `str`, after decoding them from `utf-8`. | ||
""" | ||
|
||
# Submit two MQTT messages. | ||
capmqtt.publish(topic="foo", payload="bar") | ||
capmqtt.publish(topic="baz", payload="qux") | ||
|
||
# Demonstrate the `messages` property. | ||
assert capmqtt.messages == [ | ||
MqttMessage(topic="foo", payload="bar", userdata=None), | ||
MqttMessage(topic="baz", payload="qux", userdata=None), | ||
] | ||
|
||
# Demonstrate the `records` property. | ||
assert capmqtt.records == [ | ||
("foo", "bar", None), | ||
("baz", "qux", None), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
from pytest_mqtt.model import MqttMessage | ||
|
||
# Configure `capmqtt` to return `MqttMessage.payload` as `str`, decoded from `utf-8`. | ||
capmqtt_decode_utf8 = True | ||
|
||
|
||
def test_basic_submit_text_receive_text(mosquitto, capmqtt): | ||
""" | ||
Basic submit/receive roundtrip, with ASCII text payload (`str`). | ||
By using the module-wide `capmqtt_decode_utf8` setting, the payloads | ||
will also be received as `str`, after decoding them from `utf-8`. | ||
""" | ||
|
||
# Submit MQTT message. | ||
capmqtt.publish(topic="foo", payload="bar") | ||
|
||
# Demonstrate `messages` property. | ||
assert capmqtt.messages == [ | ||
MqttMessage(topic="foo", payload="bar", userdata=None), | ||
] | ||
|
||
# Demonstrate `records` property. | ||
assert capmqtt.records == [ | ||
("foo", "bar", None), | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
import pytest | ||
|
||
from pytest_mqtt.util import delay | ||
from testing.util import DummyServer | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
@pytest.mark.early | ||
def no_mqtt_broker(request): | ||
server = DummyServer("localhost", 1883) | ||
server.start() | ||
delay() | ||
request.addfinalizer(server.shutdown) | ||
return server | ||
|
||
|
||
@pytest.fixture(scope="session") | ||
@pytest.mark.late | ||
def mosquitto_mqtt_broker(mosquitto): | ||
return mosquitto | ||
|
||
|
||
@pytest.mark.skip(reason="Unable to run together with other test cases") | ||
@pytest.mark.run(order=1) | ||
def test_mosquitto_running(no_mqtt_broker, mosquitto_mqtt_broker): | ||
assert mosquitto_mqtt_broker == ("localhost", 1883) | ||
# no_mqtt_broker.shutdown() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from pytest_mqtt.util import probe_tcp_connect | ||
|
||
|
||
def test_probe_tcp_connect_available(): | ||
assert probe_tcp_connect("9.9.9.9", 443) is True | ||
|
||
|
||
def test_probe_tcp_connect_unavailable(): | ||
assert probe_tcp_connect("9.9.9.9", 444) is False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
import socketserver | ||
import threading | ||
|
||
|
||
class DummyServer(threading.Thread): | ||
class TcpServer(socketserver.TCPServer): | ||
allow_reuse_address = True | ||
|
||
class TCPHandler(socketserver.BaseRequestHandler): | ||
def handle(self): | ||
pass | ||
|
||
def __init__(self, host, port): | ||
super().__init__() | ||
self.host = host | ||
self.port = port | ||
self.server = None | ||
|
||
def run(self): | ||
self.server = self.TcpServer((self.host, self.port), self.TCPHandler) | ||
self.server.serve_forever(poll_interval=0.01) | ||
|
||
def shutdown(self): | ||
if self.server is not None: | ||
# scdsc | ||
# threading.Thread(target=self.server.shutdown).start() | ||
self.server.shutdown() | ||
self.join() |