forked from wklenk/lmic-rpi-lora-gps-hat
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHelperClasses.hpp
94 lines (80 loc) · 2.85 KB
/
HelperClasses.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#ifndef MQTTDataStreamer_HELPERCLASSES_H
#define MQTTDataStreamer_HELPERCLASSES_H
#include <atomic>
#include <string>
#include <iostream>
#include "mqtt/async_client.h"
/**
 * Abstract description of one MQTT topic together with the handler that
 * consumes messages arriving on it.
 *
 * Subclasses implement processMessage(); MqttCallback invokes it for every
 * incoming message whose topic string equals `name`.
 */
class TopicsToHandle {
public:
	/// Flag initialised to false. Nothing in this header writes to it;
	/// presumably concrete handlers set it from processMessage() — confirm
	/// against the subclasses.
	std::atomic<bool> message_received{false};

	/// Topic string used both for subscribing and for matching incoming messages.
	std::string name;

	/// Quality-of-service level passed to the subscribe call for this topic.
	uint8_t QoS;

	/**
	 * @param name_ Topic string to subscribe to.
	 * @param QoS_  Quality-of-service level requested for the subscription.
	 */
	TopicsToHandle(const std::string& name_,
	               uint8_t QoS_) : name(name_), QoS(QoS_) {}

	/// Virtual so that destroying a handler through a base-class pointer
	/// runs the derived destructor.
	virtual ~TopicsToHandle() = default;

	/**
	 * Handles one message received on this topic.
	 *
	 * @param msg_ The received message.
	 */
	virtual void processMessage(mqtt::const_message_ptr msg_) = 0;
};
/**
 * Action listener that logs the outcome of an MQTT action and remembers
 * that the action has finished.
 *
 * Both the failure and the success notification print one line naming the
 * first topic carried by the token (when there is one) and then flip the
 * completion flag that isDone() reports.
 */
class ActionListener : public virtual mqtt::iaction_listener {
	std::string name;
	std::atomic<bool> done;

	/// Shared tail of both notifications: logs `outcome` for the token's
	/// first topic if the token has any topics, then marks the action done.
	void finish(const mqtt::token& tok, const char* outcome) {
		auto topic_list = tok.get_topics();
		const bool has_topic = topic_list && !topic_list->empty();
		if (has_topic) {
			std::cout << "\t" << name << outcome << (*topic_list)[0] << '\n';
		}
		// Set unconditionally: the action is over even when no topic was logged.
		done.store(true);
	}

	/// Called when the action failed.
	void on_failure(const mqtt::token& tok) override {
		finish(tok, " failure for ");
	}

	/// Called when the action succeeded.
	void on_success(const mqtt::token& tok) override {
		finish(tok, " success for ");
	}

public:
	/// @param name_ Label printed at the start of every log line.
	ActionListener(const std::string& name_)
		: name(name_), done(false) {}

	/// @return true once either notification has been delivered.
	bool isDone() const { return done.load(); }
};
/**
 * MQTT client callback that subscribes to a fixed list of topics whenever
 * the connection is established and routes incoming messages to the
 * matching TopicsToHandle handlers. All events are logged to std::cout.
 */
class MqttCallback : public virtual mqtt::callback {
	std::shared_ptr<mqtt::async_client> mqtt_async_client;
	std::vector<std::shared_ptr<TopicsToHandle>> topics_to_handle;
	ActionListener listener{"subscribe"};

	/**
	 * Issues a subscribe request for every configured topic.
	 *
	 * @param cause Supplied by the client library; not used here.
	 */
	void connected(const std::string& cause) override {
		std::cout << "\tConnected!\n";
		for (const auto& topic : topics_to_handle) {
			// QoS is a uint8_t; without the cast the stream would emit it as
			// a raw character (e.g. byte 0x01) instead of the number.
			std::cout << "\t\tSubscribing to '" << topic->name
			          << "' using QoS '" << static_cast<int>(topic->QoS) << "'\n";
			// The token returned by subscribe() is discarded; the outcome is
			// reported through `listener`.
			mqtt_async_client->subscribe(topic->name,
			                             topic->QoS, nullptr, listener);
		}
		// Printed once all requests have been issued; the returned tokens
		// are not waited on here.
		std::cout << "\tSubscription complete!\n";
	}

	/**
	 * Logs that the connection dropped, including the cause when one is given.
	 *
	 * @param cause Reason text; may be empty.
	 */
	void connection_lost(const std::string& cause) override {
		std::cout << "\tConnection lost ... ";
		if (!cause.empty())
			std::cout << cause << "\n";
		else
			std::cout << "no cause found!\n";
	}

	/**
	 * Logs the arrival and hands the message to every handler whose topic
	 * name equals the message topic exactly.
	 *
	 * @param msg The received message.
	 */
	void message_arrived(mqtt::const_message_ptr msg) override {
		const auto& arrived_topic = msg->get_topic();
		std::cout << "\tMessage arrived on " << arrived_topic << "\n";
		for (const auto& topic : topics_to_handle) {
			if (topic->name == arrived_topic)
				topic->processMessage(msg);
		}
	}

	/**
	 * Logs the topic of a message whose delivery has completed.
	 *
	 * @param tok Delivery token for the published message.
	 */
	void delivery_complete(mqtt::delivery_token_ptr tok) override {
		// Guard against an empty token or a token without an attached
		// message instead of dereferencing a null pointer.
		if (!tok)
			return;
		const auto message = tok->get_message();
		if (!message)
			return;
		std::cout << "\tDelivery on " << message->get_topic() << " complete!\n";
	}

public:
	/**
	 * @param mqtt_async_client_ Client used to issue the subscribe requests.
	 * @param topics_to_handle_  Topics to subscribe to and their handlers;
	 *                           the vector is copied.
	 */
	MqttCallback(std::shared_ptr<mqtt::async_client> mqtt_async_client_,
	             const std::vector<std::shared_ptr<TopicsToHandle>>& topics_to_handle_) :
		mqtt_async_client(mqtt_async_client_),
		topics_to_handle(topics_to_handle_) {}
};
#endif