|
| 1 | +// Copyright (c) Microsoft Corporation. All rights reserved. |
| 2 | +// Licensed under the MIT License. |
| 3 | + |
| 4 | +#include "AdHocConnectionImpl.hpp" |
| 5 | +#include "Utils.hpp" |
| 6 | +#include "ThreadUtils.hpp" |
| 7 | +#include "../serial_com/Port.h" |
| 8 | +#include "../serial_com/SerialPort.hpp" |
| 9 | +#include "../serial_com/UdpClientPort.hpp" |
| 10 | +#include "../serial_com/TcpClientPort.hpp" |
| 11 | + |
| 12 | +using namespace mavlink_utils; |
| 13 | +using namespace mavlinkcom_impl; |
| 14 | + |
| 15 | +AdHocConnectionImpl::AdHocConnectionImpl() |
| 16 | +{ |
| 17 | + closed = true; |
| 18 | + ::memset(&mavlink_intermediate_status_, 0, sizeof(mavlink_status_t)); |
| 19 | + ::memset(&mavlink_status_, 0, sizeof(mavlink_status_t)); |
| 20 | +} |
| 21 | +std::string AdHocConnectionImpl::getName() { |
| 22 | + return name; |
| 23 | +} |
| 24 | + |
| 25 | +AdHocConnectionImpl::~AdHocConnectionImpl() |
| 26 | +{ |
| 27 | + con_.reset(); |
| 28 | + close(); |
| 29 | +} |
| 30 | + |
| 31 | +std::shared_ptr<AdHocConnection> AdHocConnectionImpl::createConnection(const std::string& nodeName, std::shared_ptr<Port> port) |
| 32 | +{ |
| 33 | + // std::shared_ptr<MavLinkCom> owner, const std::string& nodeName |
| 34 | + std::shared_ptr<AdHocConnection> con = std::make_shared<AdHocConnection>(); |
| 35 | + con->startListening(nodeName, port); |
| 36 | + return con; |
| 37 | +} |
| 38 | + |
| 39 | +std::shared_ptr<AdHocConnection> AdHocConnectionImpl::connectLocalUdp(const std::string& nodeName, std::string localAddr, int localPort) |
| 40 | +{ |
| 41 | + std::shared_ptr<UdpClientPort> socket = std::make_shared<UdpClientPort>(); |
| 42 | + |
| 43 | + socket->connect(localAddr, localPort, "", 0); |
| 44 | + |
| 45 | + return createConnection(nodeName, socket); |
| 46 | +} |
| 47 | + |
| 48 | +std::shared_ptr<AdHocConnection> AdHocConnectionImpl::connectRemoteUdp(const std::string& nodeName, std::string localAddr, std::string remoteAddr, int remotePort) |
| 49 | +{ |
| 50 | + std::string local = localAddr; |
| 51 | + // just a little sanity check on the local address, if remoteAddr is localhost then localAddr must be also. |
| 52 | + if (remoteAddr == "127.0.0.1") { |
| 53 | + local = "127.0.0.1"; |
| 54 | + } |
| 55 | + |
| 56 | + std::shared_ptr<UdpClientPort> socket = std::make_shared<UdpClientPort>(); |
| 57 | + |
| 58 | + socket->connect(local, 0, remoteAddr, remotePort); |
| 59 | + |
| 60 | + return createConnection(nodeName, socket); |
| 61 | +} |
| 62 | + |
| 63 | +std::shared_ptr<AdHocConnection> AdHocConnectionImpl::connectTcp(const std::string& nodeName, std::string localAddr, const std::string& remoteIpAddr, int remotePort) |
| 64 | +{ |
| 65 | + std::string local = localAddr; |
| 66 | + // just a little sanity check on the local address, if remoteAddr is localhost then localAddr must be also. |
| 67 | + if (remoteIpAddr == "127.0.0.1") { |
| 68 | + local = "127.0.0.1"; |
| 69 | + } |
| 70 | + |
| 71 | + std::shared_ptr<TcpClientPort> socket = std::make_shared<TcpClientPort>(); |
| 72 | + |
| 73 | + socket->connect(local, 0, remoteIpAddr, remotePort); |
| 74 | + |
| 75 | + return createConnection(nodeName, socket); |
| 76 | +} |
| 77 | + |
| 78 | +std::shared_ptr<AdHocConnection> AdHocConnectionImpl::connectSerial(const std::string& nodeName, std::string name, int baudRate, const std::string initString) |
| 79 | +{ |
| 80 | + std::shared_ptr<SerialPort> serial = std::make_shared<SerialPort>(); |
| 81 | + |
| 82 | + int hr = serial->connect(name.c_str(), baudRate); |
| 83 | + if (hr != 0) |
| 84 | + throw std::runtime_error(Utils::stringf("Could not open the serial port %s, error=%d", name.c_str(), hr)); |
| 85 | + |
| 86 | + // send this right away just in case serial link is not already configured |
| 87 | + if (initString.size() > 0) { |
| 88 | + serial->write(reinterpret_cast<const uint8_t*>(initString.c_str()), static_cast<int>(initString.size())); |
| 89 | + } |
| 90 | + |
| 91 | + return createConnection(nodeName, serial); |
| 92 | +} |
| 93 | + |
| 94 | +void AdHocConnectionImpl::startListening(std::shared_ptr<AdHocConnection> parent, const std::string& nodeName, std::shared_ptr<Port> connectedPort) |
| 95 | +{ |
| 96 | + name = nodeName; |
| 97 | + con_ = parent; |
| 98 | + close(); |
| 99 | + closed = false; |
| 100 | + port = connectedPort; |
| 101 | + |
| 102 | + Utils::cleanupThread(read_thread); |
| 103 | + read_thread = std::thread{ &AdHocConnectionImpl::readPackets, this }; |
| 104 | + Utils::cleanupThread(publish_thread_); |
| 105 | + publish_thread_ = std::thread{ &AdHocConnectionImpl::publishPackets, this }; |
| 106 | +} |
| 107 | + |
| 108 | +void AdHocConnectionImpl::close() |
| 109 | +{ |
| 110 | + closed = true; |
| 111 | + if (port != nullptr) { |
| 112 | + port->close(); |
| 113 | + port = nullptr; |
| 114 | + } |
| 115 | + |
| 116 | + if (read_thread.joinable()) { |
| 117 | + read_thread.join(); |
| 118 | + } |
| 119 | + if (publish_thread_.joinable()) { |
| 120 | + msg_available_.post(); |
| 121 | + publish_thread_.join(); |
| 122 | + } |
| 123 | +} |
| 124 | + |
| 125 | +bool AdHocConnectionImpl::isOpen() |
| 126 | +{ |
| 127 | + return !closed; |
| 128 | +} |
| 129 | + |
| 130 | +int AdHocConnectionImpl::getTargetComponentId() |
| 131 | +{ |
| 132 | + return this->other_component_id; |
| 133 | +} |
| 134 | +int AdHocConnectionImpl::getTargetSystemId() |
| 135 | +{ |
| 136 | + return this->other_system_id; |
| 137 | +} |
| 138 | + |
| 139 | +void AdHocConnectionImpl::sendMessage(const std::vector<uint8_t>& msg) |
| 140 | +{ |
| 141 | + if (closed) { |
| 142 | + return; |
| 143 | + } |
| 144 | + |
| 145 | + try { |
| 146 | + port->write(msg.data(), static_cast<int>(msg.size())); |
| 147 | + } |
| 148 | + catch (std::exception& e) { |
| 149 | + throw std::runtime_error(Utils::stringf("AdHocConnectionImpl: Error sending message on connection '%s', details: %s", name.c_str(), e.what())); |
| 150 | + } |
| 151 | +} |
| 152 | + |
| 153 | + |
| 154 | +int AdHocConnectionImpl::subscribe(AdHocMessageHandler handler) |
| 155 | +{ |
| 156 | + MessageHandlerEntry entry = { static_cast<int>(listeners.size() + 1), handler = handler }; |
| 157 | + std::lock_guard<std::mutex> guard(listener_mutex); |
| 158 | + listeners.push_back(entry); |
| 159 | + snapshot_stale = true; |
| 160 | + return entry.id; |
| 161 | +} |
| 162 | +void AdHocConnectionImpl::unsubscribe(int id) |
| 163 | +{ |
| 164 | + std::lock_guard<std::mutex> guard(listener_mutex); |
| 165 | + for (auto ptr = listeners.begin(), end = listeners.end(); ptr != end; ptr++) |
| 166 | + { |
| 167 | + if ((*ptr).id == id) |
| 168 | + { |
| 169 | + listeners.erase(ptr); |
| 170 | + snapshot_stale = true; |
| 171 | + break; |
| 172 | + } |
| 173 | + } |
| 174 | +} |
| 175 | + |
| 176 | +void AdHocConnectionImpl::readPackets() |
| 177 | +{ |
| 178 | + //CurrentThread::setMaximumPriority(); |
| 179 | + std::shared_ptr<Port> safePort = this->port; |
| 180 | + const int MAXBUFFER = 512; |
| 181 | + uint8_t* buffer = new uint8_t[MAXBUFFER]; |
| 182 | + int channel = 0; |
| 183 | + int hr = 0; |
| 184 | + while (hr == 0 && con_ != nullptr && !closed) |
| 185 | + { |
| 186 | + int read = 0; |
| 187 | + if (safePort->isClosed()) |
| 188 | + { |
| 189 | + // hmmm, wait till it is opened? |
| 190 | + std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| 191 | + continue; |
| 192 | + } |
| 193 | + |
| 194 | + int count = safePort->read(buffer, MAXBUFFER); |
| 195 | + if (count <= 0) { |
| 196 | + // error? well let's try again, but we should be careful not to spin too fast and kill the CPU |
| 197 | + std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| 198 | + continue; |
| 199 | + } |
| 200 | + |
| 201 | + if (count >= MAXBUFFER) { |
| 202 | + |
| 203 | + std::cerr << "GAH KM911 message size (" << std::to_string(count) << ") is bigger than max buffer size! Time to support frame breaks, Moffitt" << std::endl; |
| 204 | + |
| 205 | + // error? well let's try again, but we should be careful not to spin too fast and kill the CPU |
| 206 | + std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
| 207 | + continue; |
| 208 | + } |
| 209 | + |
| 210 | + // queue event for publishing. |
| 211 | + { |
| 212 | + std::lock_guard<std::mutex> guard(msg_queue_mutex_); |
| 213 | + std::vector<uint8_t> message(count); |
| 214 | + memcpy(message.data(), buffer, count); |
| 215 | + msg_queue_.push(message); |
| 216 | + } |
| 217 | + |
| 218 | + if (waiting_for_msg_) { |
| 219 | + msg_available_.post(); |
| 220 | + } |
| 221 | + |
| 222 | + } //while |
| 223 | + |
| 224 | + delete[] buffer; |
| 225 | + |
| 226 | +} //readPackets |
| 227 | + |
| 228 | +void AdHocConnectionImpl::drainQueue() |
| 229 | +{ |
| 230 | + std::vector<uint8_t> message; |
| 231 | + bool hasMsg = true; |
| 232 | + while (hasMsg) { |
| 233 | + hasMsg = false; |
| 234 | + { |
| 235 | + std::lock_guard<std::mutex> guard(msg_queue_mutex_); |
| 236 | + if (!msg_queue_.empty()) { |
| 237 | + message = msg_queue_.front(); |
| 238 | + msg_queue_.pop(); |
| 239 | + hasMsg = true; |
| 240 | + } |
| 241 | + } |
| 242 | + if (!hasMsg) |
| 243 | + { |
| 244 | + return; |
| 245 | + } |
| 246 | + // publish the message from this thread, this is safer than publishing from the readPackets thread |
| 247 | + // as it ensures we don't lose messages if the listener is slow. |
| 248 | + if (snapshot_stale) { |
| 249 | + // this is tricky, the clear has to be done outside the lock because it is destructing the handlers |
| 250 | + // and the handler might try and call unsubscribe, which needs to be able to grab the lock, otherwise |
| 251 | + // we would get a deadlock. |
| 252 | + snapshot.clear(); |
| 253 | + |
| 254 | + std::lock_guard<std::mutex> guard(listener_mutex); |
| 255 | + snapshot = listeners; |
| 256 | + snapshot_stale = false; |
| 257 | + } |
| 258 | + auto end = snapshot.end(); |
| 259 | + |
| 260 | + auto startTime = std::chrono::system_clock::now(); |
| 261 | + std::shared_ptr<AdHocConnection> sharedPtr = std::shared_ptr<AdHocConnection>(this->con_); |
| 262 | + for (auto ptr = snapshot.begin(); ptr != end; ptr++) |
| 263 | + { |
| 264 | + try { |
| 265 | + (*ptr).handler(sharedPtr, message); |
| 266 | + } |
| 267 | + catch (std::exception& e) { |
| 268 | + Utils::log(Utils::stringf("AdHocConnectionImpl: Error handling message on connection '%s', details: %s", |
| 269 | + name.c_str(), e.what()), Utils::kLogLevelError); |
| 270 | + } |
| 271 | + } |
| 272 | + } |
| 273 | +} |
| 274 | + |
| 275 | +void AdHocConnectionImpl::publishPackets() |
| 276 | +{ |
| 277 | + //CurrentThread::setMaximumPriority(); |
| 278 | + while (!closed) { |
| 279 | + |
| 280 | + drainQueue(); |
| 281 | + |
| 282 | + waiting_for_msg_ = true; |
| 283 | + msg_available_.wait(); |
| 284 | + waiting_for_msg_ = false; |
| 285 | + } |
| 286 | +} |
| 287 | + |
| 288 | + |
0 commit comments