forked from eclipse-paho/paho.mqtt.embedded-c
-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathPubSub.ino
181 lines (166 loc) · 5.43 KB
/
PubSub.ino
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
/*
*******************************************************************************
*
* Purpose: Example of using the Arduino MqttClient
* Project URL: https://github.com/monstrenyatko/ArduinoMqtt
*
*******************************************************************************
* Copyright Oleg Kovalenko 2016.
*
* Distributed under the MIT License.
* (See accompanying file LICENSE or copy at http://opensource.org/licenses/MIT)
*******************************************************************************
*/
#include <Arduino.h>
#include <SoftwareSerial.h>
// Enable MqttClient logs
#define MQTT_LOG_ENABLED 1
// Include library
#include <MqttClient.h>
// Logs one printf-style formatted line via printfln_P(); the format string
// literal is wrapped in PSTR() before being passed on.
#define LOG_PRINTFLN(fmt, ...) printfln_P(PSTR(fmt), ##__VA_ARGS__)
// Size in bytes of the buffer each log line is formatted into
#define LOG_SIZE_MAX 128
void printfln_P(const char *fmt, ...) {
char buf[LOG_SIZE_MAX];
va_list ap;
va_start(ap, fmt);
vsnprintf_P(buf, LOG_SIZE_MAX, fmt, ap);
va_end(ap);
Serial.println(buf);
}
// Baud rate of the hardware serial port opened in setup()
#define HW_UART_SPEED 57600L
// MQTT client identifier; also embedded into the topic names below
#define MQTT_ID "TEST-ID"
// Topic the example subscribes to
const char* MQTT_TOPIC_SUB = "test/" MQTT_ID "/sub";
// Topic the example publishes to
const char* MQTT_TOPIC_PUB = "test/" MQTT_ID "/pub";
// MQTT client instance; stays NULL until setup() creates it
MqttClient *mqtt = NULL;
// ============== Object to supply system functions ================================
/**
 * System services object handed to MqttClient.
 * Supplies the current time by forwarding to the global ::millis().
 */
class System: public MqttClient::System {

public:
	/** @return the value reported by the global ::millis() */
	unsigned long millis() const {
		const unsigned long nowMs = ::millis();
		return nowMs;
	}
};
// ============== Object to implement network connectivity =====================
// Current example assumes the network TCP stack is connected using serial
// interface to pins 10(RX) and 11(TX). The SoftwareSerial library is used
// for actual communication.
// Receive pin passed to the SoftwareSerial constructor
#define SW_UART_PIN_RX 10
// Transmit pin passed to the SoftwareSerial constructor
#define SW_UART_PIN_TX 11
// Baud rate passed to SoftwareSerial::begin()
#define SW_UART_SPEED 9600L
class Network {
public:
Network() {
mNet = new SoftwareSerial(SW_UART_PIN_RX, SW_UART_PIN_TX);
mNet->begin(SW_UART_SPEED);
}
int connect(const char* hostname, int port) {
// TCP connection is already established otherwise do it here
return 0;
}
int read(unsigned char* buffer, int len, unsigned long timeoutMs) {
mNet->setTimeout(timeoutMs);
return mNet->readBytes((char*) buffer, len);
}
int write(unsigned char* buffer, int len, unsigned long timeoutMs) {
mNet->setTimeout(timeoutMs);
for (int i = 0; i < len; ++i) {
mNet->write(buffer[i]);
}
mNet->flush();
return len;
}
int disconnect() {
// Implement TCP network disconnect here
return 0;
}
private:
SoftwareSerial *mNet;
} *network = NULL;
// ============== Subscription callback ========================================
/**
 * Subscription callback: logs the QoS, flags, packet id and payload of a
 * received message.
 *
 * The payload is copied into a local buffer and NUL-terminated so it can be
 * printed with %s. The buffer has a fixed size of LOG_SIZE_MAX bytes instead
 * of being sized from the incoming length: printfln_P() formats every log
 * line into a LOG_SIZE_MAX byte buffer, so payload bytes beyond that could
 * never be printed anyway, and the stack usage no longer depends on data
 * received from the network.
 *
 * @param md received message data; only md.message is read
 */
void processMessage(MqttClient::MessageData& md) {
	const MqttClient::Message& msg = md.message;
	char payload[LOG_SIZE_MAX];

	// Treat a missing payload as empty and clamp to the space available
	size_t copyLen = (msg.payload != NULL) ? (size_t) msg.payloadLen : 0;
	if (copyLen > sizeof(payload) - 1) {
		copyLen = sizeof(payload) - 1;
	}
	if (copyLen > 0) {
		memcpy(payload, msg.payload, copyLen);
	}
	payload[copyLen] = '\0';

	LOG_PRINTFLN(
		"Message arrived: qos %d, retained %d, dup %d, packetid %d, payload:[%s]",
		msg.qos, msg.retained, msg.dup, msg.id, payload
	);
}
// ============== Setup all objects ============================================
void setup() {
// Setup hardware serial for logging
Serial.begin(HW_UART_SPEED);
while (!Serial);
// Setup network
network = new Network;
// Setup MqttClient
MqttClient::System *mqttSystem = new System;
MqttClient::Logger *mqttLogger = new MqttClient::LoggerImpl<HardwareSerial>(Serial);
MqttClient::Network * mqttNetwork = new MqttClient::NetworkImpl<Network>(*network, *mqttSystem);
//// Make 128 bytes send buffer
MqttClient::Buffer *mqttSendBuffer = new MqttClient::ArrayBuffer<128>();
//// Make 128 bytes receive buffer
MqttClient::Buffer *mqttRecvBuffer = new MqttClient::ArrayBuffer<128>();
//// Allow up to 2 subscriptions simultaneously
MqttClient::MessageHandlers *mqttMessageHandlers = new MqttClient::MessageHandlersImpl<2>();
//// Configure client options
MqttClient::Options mqttOptions;
////// Set command timeout to 10 seconds
mqttOptions.commandTimeoutMs = 10000;
//// Make client object
mqtt = new MqttClient (
mqttOptions, *mqttLogger, *mqttSystem, *mqttNetwork, *mqttSendBuffer,
*mqttRecvBuffer, *mqttMessageHandlers
);
// Setup message handlers
mqttMessageHandlers->set(MQTT_TOPIC_SUB, processMessage);
}
// ============== Main loop ====================================================
void loop() {
// Check connection status
if (!mqtt->isConnected()) {
// Re-establish TCP connection with MQTT broker
network->disconnect();
network->connect("mymqttserver.com", 1883);
// Start new MQTT connection
LOG_PRINTFLN("Connecting");
MqttClient::ConnectResult connectResult;
// Connect
{
MQTTPacket_connectData options = MQTTPacket_connectData_initializer;
options.MQTTVersion = 4;
options.clientID.cstring = (char*)MQTT_ID;
options.cleansession = true;
options.keepAliveInterval = 15; // 15 seconds
MqttClient::Error::type rc = mqtt->connect(options, connectResult);
if (rc != MqttClient::Error::SUCCESS) {
LOG_PRINTFLN("Connection error: %i", rc);
return;
}
}
// Subscribe
{
MqttClient::Error::type rc = mqtt->subscribe(MQTT_TOPIC_SUB, MqttClient::QOS0);
if (rc != MqttClient::Error::SUCCESS) {
LOG_PRINTFLN("Subscribe error: %i", rc);
LOG_PRINTFLN("Drop connection");
mqtt->disconnect();
return;
}
}
} else {
// Publish
{
const char* buf = "Hello";
MqttClient::Message message;
message.qos = MqttClient::QOS0;
message.retained = false;
message.dup = false;
message.payload = (void*) buf;
message.payloadLen = strlen(buf);
mqtt->publish(MQTT_TOPIC_PUB, message);
}
// Idle for 30 seconds
mqtt->yield(30000L);
}
}