Skip to content

Commit

Permalink
Merge branch 'lepton' into next
Browse files Browse the repository at this point in the history
  • Loading branch information
Blackcatn13 committed Jun 13, 2018
2 parents 4e1a439 + 4fdf06d commit 9f3ce70
Show file tree
Hide file tree
Showing 28 changed files with 1,831 additions and 52 deletions.
4 changes: 4 additions & 0 deletions BundleAgent/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
include(../aDTNPlusConfig.cmake.in)

if(LEPTON)
add_definitions(-DLEPTON)
endif(LEPTON)

add_subdirectory(Bundle)
add_subdirectory(Utils)
add_subdirectory(ExternTools)
Expand Down
25 changes: 20 additions & 5 deletions BundleAgent/Node/BundleProcessor/BundleProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "Bundle/PayloadBlock.h"
#include "Utils/globals.h"
#include "Utils/Logger.h"
#include "Utils/PerfLogger.h"
#include "Utils/TimestampManager.h"
#include "Utils/Functions.h"
#include "Utils/Socket.h"
Expand Down Expand Up @@ -206,7 +207,8 @@ void BundleProcessor::receiveMessage(Socket sock) {
std::unique_ptr<BundleContainer> bc = createBundleContainer(
std::move(b));
// Save the bundleContainer to disk
LOG(42) << "Saving bundle " << bc->getBundle().getId() << " to disk";
std::string bundleId = bc->getBundle().getId();
LOG(42) << "Saving bundle " << bundleId << " to disk";
m_bundleQueue->saveBundleToDisk(m_config.getDataPath(), *bc);
// Execute process control
processControl(*bc);
Expand All @@ -221,15 +223,15 @@ void BundleProcessor::receiveMessage(Socket sock) {

} catch (const DroppedBundleQueueException &e) {
std::stringstream ss;
ss << m_config.getDataPath() << bc->getBundle().getId()
<< ".bundle";
ss << m_config.getDataPath() << bundleId << ".bundle";
int success = std::remove(ss.str().c_str());
if (success != 0) {
LOG(3) << "Cannot delete bundle " << ss.str();
}
LOG(40) << e.what();
drop();
ack = static_cast<uint8_t>(BundleACK::QUEUE_FULL);
PERF(PerfMessages::MESSAGE_DROPPED) << bundleId;
} catch (const InBundleQueueException &e) {
LOG(40) << e.what();
ack = static_cast<uint8_t>(BundleACK::ALREADY_IN_QUEUE);
Expand All @@ -239,6 +241,13 @@ void BundleProcessor::receiveMessage(Socket sock) {
if (!(sock << ack)) {
LOG(3) << "Cannot write to socket, reason: " << sock.getLastError();
}
if (srcNodeId == "_ADTN_LIB_") {
PERF(PerfMessages::MESSAGE_CREATED) << bundleId;
} else if (ack == static_cast<uint8_t>(BundleACK::CORRECT_RECEIVED)) {
PERF(PerfMessages::MESSAGE_RELAYED_FROM) << bundleId << " "
<< sock.getPeerName()
<< " " << bundleLength;
}
sock.close();
} catch (const BundleCreationException &e) {
LOG(3) << "Error constructing received bundle, reason: " << e.what();
Expand Down Expand Up @@ -275,6 +284,9 @@ void BundleProcessor::delivery(BundleContainer &bundleContainer,
bundleContainer, true);
}
}
PERF(MESSAGE_RECEIVED)
<< bundleContainer.getBundle().getId() << " " << payloadSize << " "
<< BundleInfo(bundleContainer.getBundle()).getCreationTimestamp();
} catch (const TableException &e) {
LOG(3) << "Error getting appId, reason: " << e.what();
LOG(11) << "Saving not delivered bundle to disk.";
Expand All @@ -289,11 +301,12 @@ void BundleProcessor::forward(Bundle bundle, std::vector<std::string> nextHop) {
std::string bundleRaw = bundle.toRaw();
// Bundle length, this will limit the max length of a bundle to 2^32 ~ 4GB
uint32_t bundleLength = bundleRaw.length();
std::string bundleId = bundle.getId();
if (bundleLength <= 0) {
LOG(3) << "The bundle to forward has a length of 0, aborting forward.";
} else {
auto forwardFunction =
[this, bundleRaw, bundleLength](std::string &nh) {
[this, bundleRaw, bundleLength, bundleId](std::string &nh) {
LOG(45) << "Forwarding bundle to " << nh;
LOG(50) << "Bundle to forward " << bundleRaw;
std::shared_ptr<Neighbour> nb = m_neighbourTable->getValue(nh);
Expand Down Expand Up @@ -367,11 +380,12 @@ void BundleProcessor::forward(Bundle bundle, std::vector<std::string> nextHop) {
static_cast<uint8_t>(NetworkError::SOCKET_RECEIVE_ERROR));
} else {
LOG(46) << "Received bundle ACK: " << static_cast<unsigned int>(ack);
if (ack == static_cast<uint8_t>(BundleACK::CORRECT_RECEIVED)) {
if (ack == static_cast<uint8_t>(BundleACK::CORRECT_RECEIVED) || ack == static_cast<uint8_t>(BundleACK::QUEUE_FULL)) {
LOG(11) << "A bundle of length " << bundleLength
<< " has been sent to " << nb->getNodeAddress()
<< ":" << nb->getNodePort() << " from "
<< s.getPeerName();
PERF(MESSAGE_RELAYED) << bundleId << " " << s.getPeerName() << " " << bundleLength;
} else {
std::stringstream ss;
uint8_t error;
Expand Down Expand Up @@ -424,6 +438,7 @@ void BundleProcessor::discard(
LOG(3) << "Cannot delete bundle " << ss.str();
}
LOG(51) << "Deleting bundleContainer.";
PERF(PerfMessages::MESSAGE_REMOVED) << bundleContainer->getBundle().getId();
bundleContainer.reset();
m_bundleQueue->resetLast();
}
Expand Down
101 changes: 69 additions & 32 deletions BundleAgent/Node/Neighbour/NeighbourDiscovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,27 @@ NeighbourDiscovery::NeighbourDiscovery(
: m_config(config),
m_neighbourTable(neighbourTable),
m_listeningEndpointsTable(listeningEndpointsTable) {
#ifdef LEPTON
// If using the platform with LEPTON init the socket for the threads before.
s = Socket(false);
if (!s) {
LOG(1) << "Cannot create socket into sendBeacons thread, reason: "
<< s.getLastError();
g_stop = true;
}
if (!s.setReuseAddress()) {
LOG(1) << "Cannot set flag REUSEADDR to socket, reason: "
<< s.getLastError();
g_stop = true;
}
if (!s.bind(m_config.getDiscoveryAddress(), 0)) {
LOG(1) << "Cannot bind socket to " << m_config.getDiscoveryAddress()
<< ", reason: " << strerror(errno);
g_stop = true;
}
s.setRcvTimeOut(m_config.getSocketTimeout());
s.setDestination(m_config.getDiscoveryAddress(), m_config.getDiscoveryPort());
#endif
std::thread t = std::thread(&NeighbourDiscovery::cleanNeighbours, this);
t.detach();
t = std::thread(&NeighbourDiscovery::sendBeacons, this);
Expand All @@ -67,136 +88,152 @@ void NeighbourDiscovery::sendBeacons() {
std::string nodeId = m_config.getNodeId();
std::string nodeAddress = m_config.getNodeAddress();
uint16_t nodePort = m_config.getNodePort();
#ifndef LEPTON
// Generate this node address information.
LOG(64) << "Starting socket into " << nodeAddress << ":0";
// Create the socket.
Socket s = Socket(false);
if (!s) {
// Stop the application
LOG(1) << "Cannot create socket into sendBeacons thread, reason: "
<< s.getLastError();
<< s.getLastError();
g_stop = true;
} else {
// Bind the socket.
if (!s.bind(nodeAddress, 0)) {
// Stop the application
LOG(1) << "Cannot bind socket to " << nodeAddress << ", reason: "
<< strerror(errno);
<< s.getLastError();
g_stop = true;
} else {
// Generate the destination address.
s.setDestination(m_config.getDiscoveryAddress(),
m_config.getDiscoveryPort());
#endif
// Create the beacon with our information.
LOG(64) << "Sending beacons to " << m_config.getDiscoveryAddress() << ":"
<< m_config.getDiscoveryPort();
LOG(64) << "Sending beacons to " << m_config.getDiscoveryAddress()
<< ":" << m_config.getDiscoveryPort();
int sleepTime = m_config.getDiscoveryPeriod();
g_startedThread++;
while (!g_stop.load()) {
std::this_thread::sleep_for(std::chrono::seconds(sleepTime));
Beacon b = Beacon(nodeId, nodeAddress, nodePort,
m_listeningEndpointsTable->getValues());
LOG(21) << "Sending beacon from " << nodeId << " " << nodeAddress << ":"
<< nodePort;
<< nodePort;
std::string rawBeacon = b.getRaw();
if (!(s << rawBeacon)) {
LOG(4) << "Error sending beacon " << strerror(errno);
if (s.canSend(m_config.getSocketTimeout())) {
if (!(s << rawBeacon)) {
LOG(4) << "Error sending beacon " << s.getLastError();
}
}
}
s.close();
#ifndef LEPTON
}
#endif
LOG(14) << "Exit Beacon sender thread.";
g_stopped++;
#ifndef LEPTON
}
#endif
}

void NeighbourDiscovery::receiveBeacons() {
// Get node configuration
Logger::getInstance()->setThreadName(std::this_thread::get_id(),
"Beacon Receiver");
"Beacon Receiver");
LOG(15) << "Starting receiver beacon thread";
std::string nodeId = m_config.getNodeId();
std::string nodeAddress = m_config.getNodeAddress();
uint16_t discoveryPort = m_config.getDiscoveryPort();
std::string discoveryAddress = m_config.getDiscoveryAddress();
bool testMode = m_config.getNeighbourTestMode();
#ifndef LEPTON
// Generate this node address information.
LOG(65) << "Starting socket into " << discoveryAddress << ":"
<< discoveryPort;
<< discoveryPort;
// Create the socket.
Socket s = Socket(false);
if (!s) {
// Stop the application
LOG(1) << "Cannot create socket into receiveBeacons thread reason: "
<< s.getLastError();
<< s.getLastError();
g_stop = true;
} else {
if (!s.setReuseAddress()) {
// Stop the application
LOG(1) << "Cannot set flag REUSEADDR to socket, reason: "
<< s.getLastError();
<< s.getLastError();
g_stop = true;
} else {
// Bind the socket.
if (!s.bind(discoveryAddress, discoveryPort)) {
// Stop the application
LOG(1) << "Cannot bind the socket to " << discoveryAddress << ":"
<< discoveryPort << " reason: " << s.getLastError();
<< discoveryPort << " reason: " << s.getLastError();
g_stop = true;
} else {
// Join the multicast group.
if (!s.joinMulticastGroup(nodeAddress)) {
// Stop the application
LOG(1) << "Cannot join the multicast group, reason: "
<< s.getLastError();
<< s.getLastError();
g_stop = true;
} else {
// Add a timeout to the recv socket
s.setRcvTimeOut(m_config.getSocketTimeout());
#endif
g_startedThread++;
while (!g_stop.load()) {
uint32_t beaconLength = 65507;
std::string buffer;
StringWithSize sws = StringWithSize(buffer, beaconLength);
s >> sws;
// Create a thread to add the new neighbour and let this
// receiving more beacons
Beacon b = Beacon(buffer);
if (b.getNodeId() != nodeId || testMode) {
LOG(21) << "Received beacon from " << b.getNodeId() << " "
<< b.getNodeAddress() << ":" << b.getNodePort();
std::thread([b, this]() {
m_neighbourTable->update(std::make_shared<Neighbour>(
b.getNodeId(),
b.getNodeAddress(),
b.getNodePort(),
b.getEndpoints()));
}).detach();
if (s.canRead(m_config.getSocketTimeout())) {
s >> sws;
// Create a thread to add the new neighbour and let this
// receiving more beacons
Beacon b = Beacon(buffer);
if (b.getNodeId() != nodeId || testMode) {
LOG(21) << "Received beacon from " << b.getNodeId()
<< " " << b.getNodeAddress() << ":" << b.getNodePort();
std::thread([b, this]() {
m_neighbourTable->update(std::make_shared<Neighbour>(
b.getNodeId(),
b.getNodeAddress(),
b.getNodePort(),
b.getEndpoints()));
}).detach();
}
}
}

// Leave from the multicast group
if (!s.leaveMulticastGroup()) {
if (false /*!s.leaveMulticastGroup()*/) {
LOG(4) << "Cannot leave the multicast group, reason: "
<< s.getLastError();
<< s.getLastError();
}
#ifndef LEPTON
}
}
}
#endif
s.close();
#ifndef LEPTON
}
#endif
LOG(15) << "Exit Beacon receiver thread.";
g_stopped++;
}

void NeighbourDiscovery::cleanNeighbours() {
Logger::getInstance()->setThreadName(std::this_thread::get_id(),
"Neighbour cleaner");
"Neighbour cleaner");
int sleepTime = m_config.getNeighbourCleanerTime();
int expirationTime = m_config.getNeighbourExpirationTime();
LOG(16) << "Starting Cleaner thread cleaning every " << sleepTime
<< "s all the nodes with inactivity for a period of "
<< expirationTime << "s";
<< "s all the nodes with inactivity for a period of "
<< expirationTime << "s";
g_startedThread++;
while (!g_stop.load()) {
std::this_thread::sleep_for(std::chrono::seconds(sleepTime));
Expand Down
10 changes: 10 additions & 0 deletions BundleAgent/Node/Neighbour/NeighbourDiscovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include <memory>
#include "Node/Config.h"
#include "Node/Neighbour/NeighbourTable.h"
#ifdef LEPTON
#include "Utils/Socket.h"
#endif

class ListeningEndpointsTable;

Expand Down Expand Up @@ -90,6 +93,13 @@ class NeighbourDiscovery {

std::shared_ptr<ListeningEndpointsTable> m_listeningEndpointsTable;

#ifdef LEPTON
/**
* Socket to read and send beacons if using the platform with LEPTON
*/
Socket s;
#endif

private:
/**
* Function to send beacons.
Expand Down
5 changes: 4 additions & 1 deletion BundleAgent/Node/Neighbour/NeighbourTable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <algorithm>
#include "Utils/Logger.h"
#include "Utils/globals.h"
#include "Utils/PerfLogger.h"

NeighbourTable::NeighbourTable() {
}
Expand Down Expand Up @@ -66,6 +67,7 @@ void NeighbourTable::update(std::shared_ptr<Neighbour> neighbour) {
g_queueProcessEvents++;
std::unique_lock<std::mutex> lck(g_processorMutex);
g_processorConditionVariable.notify_one();
PERF(NEIGH_APPEAR) << neighbour->getId();
}
m_mutex.unlock();
}
Expand Down Expand Up @@ -127,7 +129,8 @@ void NeighbourTable::clean(int expirationTime) {
m_mutex.lock();
for (auto it = m_neigbours.begin(); it != m_neigbours.end();) {
if ((*it).second->getElapsedActivityTime() >= expirationTime) {
LOG(21) << "Neighbour " << (*it).second->getId() << " has disappeared";
LOG(21) << "Neighbour " << it->second->getId() << " has disappeared";
PERF(NEIGH_DISAPPEAR) << it->second->getId();
remove(it->second->getEndpoints(), it->second->getId());
it = m_neigbours.erase(it);
} else {
Expand Down
4 changes: 4 additions & 0 deletions BundleAgent/Node/Node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "Node/BundleProcessor/BundleProcessor.h"
#include "Node/BundleQueue/BundleContainer.h"
#include "Utils/Logger.h"
#include "Utils/PerfLogger.h"
#include "Utils/Functions.h"
#include "Utils/globals.h"
#include "Node/JsonFacades/NodeStateJson.h"
Expand All @@ -48,6 +49,8 @@ Node::Node(std::string filename) {
Logger::getInstance()->setLogLevel(m_config.getLogLevel());
Logger::getInstance()->setThreadName(std::this_thread::get_id(), "Main");
LOG(6) << "Starting Node...";
PerfLogger::getInstance()->setConfigAndStart("/tmp/" + m_config.getNodeId());
PERF(PLATFORM_START) << m_config.getNodeId() << " " << m_config.getNodeAddress();
g_queueProcessEvents = 0;
m_neighbourTable = std::unique_ptr<NeighbourTable>(new NeighbourTable());
m_listeningAppsTable = std::shared_ptr<ListeningEndpointsTable>(
Expand Down Expand Up @@ -123,5 +126,6 @@ Node::~Node() {
dlclose(m_handle);
}
delete Logger::getInstance();
delete PerfLogger::getInstance();
}

Loading

0 comments on commit 9f3ce70

Please sign in to comment.