Skip to content

Commit

Permalink
Add some additional tests and fix things as necessary
Browse files Browse the repository at this point in the history
Unify argument order for generated message C functions
  • Loading branch information
matt-attack committed Feb 22, 2025
1 parent f96fa59 commit 7e8d15f
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 106 deletions.
14 changes: 8 additions & 6 deletions include/pubsub/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,20 +162,22 @@ struct ps_advertise_req_t
uint32_t type_hash;// to see if the type is correct
uint32_t group_id;// unique (hopefully) id that indicates which process this node is a part of
};
#pragma pack(pop)

#pragma pack(push)
#pragma pack(1)
struct ps_subscribe_req_t
{
uint8_t id;
int32_t addr;
uint16_t port;
};
#pragma pack(pop)

#pragma pack(push)
#pragma pack(1)
struct ps_unsubscribe_req_t
{
uint8_t id;
uint32_t addr;
uint16_t port;
uint32_t stream_id;
};

struct ps_subscribe_accept_t
{
uint8_t pid;// packet type identifier
Expand Down
4 changes: 2 additions & 2 deletions include/pubsub/Serialization.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,9 @@ extern "C"
void ps_msg_ref_free(struct ps_msg_ref_t* msg);

struct ps_allocator_t;
typedef struct ps_msg_t(*ps_fn_encode_t)(struct ps_allocator_t* allocator, const void* msg);
typedef struct ps_msg_t(*ps_fn_encode_t)(const void* msg, struct ps_allocator_t* allocator);
typedef void*(*ps_fn_decode_t)(const void* data, struct ps_allocator_t* allocator);// allocates the message
typedef void (*ps_fn_free_t)(struct ps_allocator_t* allocator, void* msg);// frees the message
typedef void (*ps_fn_free_t)(void* msg, struct ps_allocator_t* allocator);// frees the message
struct ps_message_definition_t
{
unsigned int hash;
Expand Down
104 changes: 54 additions & 50 deletions include/pubsub_cpp/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ static std::multimap<std::string, SubscriberBase*> _subscribers;
// ns should not have a leading slash, topic should if it is absolute
inline std::string handle_remap(const std::string& topic, const std::string& ns)
{
//printf("Handling remap of %s in ns %s\n", topic.c_str(), ns.c_str());
//printf("Handling remap of %s in ns %s\n", topic.c_str(), ns.c_str());
// we need at least one character
if (topic.length() == 0)
{
Expand Down Expand Up @@ -87,10 +87,14 @@ inline std::string handle_remap(const std::string& topic, const std::string& ns)
}

// okay, we had no remappings, use our namespace
if (ns.length())
return "/" + ns + "/" + topic;
else
return "/" + topic;
if (ns.length())
{
return "/" + ns + "/" + topic;
}
else
{
return "/" + topic;
}
}

// not thread safe
Expand Down Expand Up @@ -119,7 +123,7 @@ inline void initialize(const char** args, const int argc)
// valid names must be all lowercase and only
inline std::string validate_name(const std::string& name, bool remove_leading_slashes = false)
{
//printf("Validating %s\n", name.c_str());
//printf("Validating %s\n", name.c_str());
for (size_t i = 0; i < name.length(); i++)
{
if (name[i] >= 'A' && name[i] <= 'Z')
Expand All @@ -138,8 +142,8 @@ inline std::string validate_name(const std::string& name, bool remove_leading_sl

return name.substr(i);
}
else
{
else
{
// remove any duplicate slashes we may have
std::string out;
if (name.length())
Expand All @@ -155,7 +159,7 @@ inline std::string validate_name(const std::string& name, bool remove_leading_sl
out += name[i];
}
return out;
}
}
return name;
}

Expand Down Expand Up @@ -211,12 +215,14 @@ class Node

std::string getQualifiedName()
{
if (namespace_.length())
return "/" + namespace_ + "/" + real_name_;
else
{
return "/" + real_name_;
}
if (namespace_.length())
{
return "/" + namespace_ + "/" + real_name_;
}
else
{
return "/" + real_name_;
}
}

const std::string& getName()
Expand All @@ -239,10 +245,10 @@ class Node
return ps_node_spin(&node_);
}

inline void setEventSet(ps_event_set_t* set)
{
event_set_ = set;
}
inline void setEventSet(ps_event_set_t* set)
{
event_set_ = set;
}

inline ps_event_set_t* getEventSet()
{
Expand Down Expand Up @@ -311,14 +317,14 @@ class Publisher: public PublisherBase
_publisher_mutex.lock();
_publishers.insert(std::pair<std::string, PublisherBase*>(remapped_topic_, this));

// look for any matching subscribers and add them to our list
auto iterpair = _subscribers.equal_range(topic);
for (auto it = iterpair.first; it != iterpair.second; ++it)
{
node.lock_.lock();
subs_.push_back(it->second);
node.lock_.unlock();
}
// look for any matching subscribers and add them to our list
auto iterpair = _subscribers.equal_range(topic);
for (auto it = iterpair.first; it != iterpair.second; ++it)
{
node.lock_.lock();
subs_.push_back(it->second);
node.lock_.unlock();
}
_publisher_mutex.unlock();
}

Expand Down Expand Up @@ -367,10 +373,8 @@ class Publisher: public PublisherBase
// loop through shared subscribers
node_->lock_.lock();
// now go through my local subscriber list
for (size_t i = 0; i < subs_.size(); i++)
for (auto& sub: subs_)
{
auto sub = subs_[i];

//printf("Publishing locally with no copy..\n");

auto specific_sub = (Subscriber<T>*)sub;
Expand All @@ -394,18 +398,17 @@ class Publisher: public PublisherBase
void publish(const T& msg)
{
std::shared_ptr<T> copy;
if (latched_) {
if (latched_)
{
copy = std::shared_ptr<T>(new T);
*copy = msg;
// save for later
latched_msg_ = copy;
}
node_->lock_.lock();
// now go through my local subscriber list
for (size_t i = 0; i < subs_.size(); i++)
for (auto& sub: subs_)
{
auto sub = subs_[i];

//printf("Publishing locally with a copy..\n");
if (!copy)
{
Expand Down Expand Up @@ -481,7 +484,7 @@ class SubscriberBase
}
}

_subscribers.insert(std::pair<std::string, SubscriberBase*>(topic, sub));
_subscribers.insert(std::pair<std::string, SubscriberBase*>(topic, sub));

_publisher_mutex.unlock();
}
Expand All @@ -499,21 +502,23 @@ class SubscriberBase
// remove me from its list if im there
auto pos = std::find(it->second->subs_.begin(), it->second->subs_.end(), sub);
if (pos != it->second->subs_.end())
{
it->second->subs_.erase(pos);
}
it->second->GetNode()->lock_.unlock();
}
}

//remove me from the subscriber list
auto subiterpair = _subscribers.equal_range(topic);
for (auto it = subiterpair.first; it != subiterpair.second; ++it)
{
if (it->second == sub)
{
_subscribers.erase(it);
break;
}
}
//remove me from the subscriber list
auto subiterpair = _subscribers.equal_range(topic);
for (auto it = subiterpair.first; it != subiterpair.second; ++it)
{
if (it->second == sub)
{
_subscribers.erase(it);
break;
}
}

_publisher_mutex.unlock();
}
Expand All @@ -528,8 +533,6 @@ class SubscriberBase
virtual bool CallOne() = 0;
};



template<class T>
class Subscriber: public SubscriberBase
{
Expand All @@ -545,7 +548,7 @@ class Subscriber: public SubscriberBase

public:

Subscriber(Node& node, const std::string& topic, std::function<void(const std::shared_ptr<T>&)> cb, unsigned int queue_size = 1, int preferred_transport = -1) : cb_(cb), queue_size_(queue_size)
Subscriber(Node& node, const std::string& topic, std::function<void(const std::shared_ptr<T>&)> cb, unsigned int queue_size = 1, int preferred_transport = -1, int skip = 0) : cb_(cb), queue_size_(queue_size)
{
node_ = &node;

Expand All @@ -571,14 +574,13 @@ class Subscriber: public SubscriberBase

struct ps_subscriber_options options;
ps_subscriber_options_init(&options);

options.skip = skip;
options.cb = cb2;
options.cb_data = this;
options.allocator = 0;
options.ignore_local = true;
options.preferred_transport = preferred_transport;


node.lock_.lock();
ps_node_create_subscriber_adv(node.getNode(), remapped_topic_.c_str(), T::GetDefinition(), &subscriber_, &options);
node.subscribers_.push_back(this);
Expand Down Expand Up @@ -631,7 +633,9 @@ class Subscriber: public SubscriberBase
node_->lock_.lock();
auto it = std::find(node_->subscribers_.begin(), node_->subscribers_.end(), this);
if (it != node_->subscribers_.end())
{
node_->subscribers_.erase(it);
}
ps_sub_destroy(&subscriber_);
node_->lock_.unlock();

Expand Down
34 changes: 20 additions & 14 deletions include/pubsub_cpp/Spinners.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class BlockingSpinnerWithTimers
BlockingSpinnerWithTimers(int num_threads = 1) : running_(false), node_(0)
{
//ps_event_set_create(&events_);
stop(true);
}

~BlockingSpinnerWithTimers()
Expand All @@ -138,7 +139,7 @@ class BlockingSpinnerWithTimers

void setNode(Node& node)
{
node_ = &node;
node_ = &node;
//list_mutex_.lock();

// build a wait list for all nodes
Expand All @@ -154,13 +155,12 @@ class BlockingSpinnerWithTimers
{
while (running_ && ps_okay())
{
//printf("Entering thread\n");
list_mutex_.lock();
if (node_ == 0)//ps_event_set_count(&events_) == 0)
{
printf("Waiting for events\n");
printf("Waiting for events\n");
ps_sleep(10);
continue;
continue;
}
else
{
Expand All @@ -184,15 +184,15 @@ class BlockingSpinnerWithTimers

// this line is necessary anyways, but happens to work around the above bug
timeout = std::min<int>(timeout, 1000000);// make sure we dont block too long
//printf("setting timeout to %i\n", timeout);
//printf("setting timeout to %i\n", timeout);
}
list_mutex_.unlock();
//ps_node_wait(node_->getNode(), 0);
//ps_node_wait(node_->getNode(), 0);
// todo allow finer grained waits
//if (timeout > 2)
{
// allows for fine grained waits
ps_event_set_set_timer(&node_->getNode()->events, timeout);// in us
ps_event_set_set_timer(&node_->getNode()->events, timeout);// in us
ps_node_wait(node_->getNode(), 1000);
//ps_event_set_wait(&events_, timeout);
}
Expand All @@ -205,15 +205,15 @@ class BlockingSpinnerWithTimers
Time now = Time::now();
if (now >= timer.next_trigger)
{
//printf("Calling timer\n");
//printf("Calling timer\n");
timer.next_trigger = timer.next_trigger + timer.period;
timer.fn();
}
}

// check all nodes
//for (auto node : nodes_)
if (node_)
if (node_)
{
node_->lock_.lock();
if (ps_node_spin(node_->getNode()) || node_->marked())
Expand Down Expand Up @@ -249,7 +249,12 @@ class BlockingSpinnerWithTimers

void wait()
{
if (!running_)
thread_.join();
}

void run()
{
if (!running_)
{
start();
}
Expand All @@ -265,8 +270,10 @@ class BlockingSpinnerWithTimers
}

running_ = false;
if (join && thread_.joinable())
thread_.join();// wait for it to stop
if (join && thread_.joinable())
{
thread_.join();// wait for it to stop
}
}
};

Expand Down Expand Up @@ -301,7 +308,7 @@ class Spinner
int res = 0;
if (res = ps_node_spin(node->getNode()) || node->marked())
{
printf("Received %i messages\n", res);
printf("Received %i messages\n", res);
// we got a message, now call a subscriber
// todo how to make this not scale with subscriber count...
for (size_t i = 0; i < node->subscribers_.size(); i++)
Expand All @@ -311,7 +318,6 @@ class Spinner
}
}


node->lock_.unlock();
}
list_mutex_.unlock();
Expand Down
Loading

0 comments on commit 7e8d15f

Please sign in to comment.