Skip to content

Commit

Permalink
Add test for queue size and correct subscriber count for local subscr…
Browse files Browse the repository at this point in the history
…ibers
  • Loading branch information
matt-attack committed Feb 22, 2025
1 parent 7e8d15f commit 2a7f1ad
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 29 deletions.
3 changes: 1 addition & 2 deletions include/pubsub_cpp/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ class Publisher: public PublisherBase

unsigned int getNumSubscribers()
{
return ps_pub_get_subscriber_count(&publisher_);
return ps_pub_get_subscriber_count(&publisher_) + subs_.size();
}

void addCustomEndpoint(const int ip_addr, const short port, const unsigned int stream_id)
Expand Down Expand Up @@ -475,7 +475,6 @@ class SubscriberBase
// if its latched, get the message from it
if (it->second->latched_)
{
// hmm, this should just queue not call
cb(it->second);
}
// add me to its sub list
Expand Down
1 change: 1 addition & 0 deletions msg/Int.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
int64 value
66 changes: 39 additions & 27 deletions tests/test_pubsub_cpp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <pubsub_cpp/Node.h>
#include <pubsub_cpp/Spinners.h>

#include <pubsub/Int.msg.h>
#include <pubsub/String.msg.h>

#include "mini_mock.hpp"
Expand All @@ -25,6 +26,9 @@ TEST(test_publish_subscribe_latched_cpp, []() {
EXPECT(strcmp(omsg.value, msg->value) == 0);
got_message = true;
spinner.stop();

// make sure subscriber count is correct
EXPECT(string_pub.getNumSubscribers() == 1);
}, 10);

spinner.run();
Expand All @@ -51,12 +55,47 @@ TEST(test_publish_subscribe_zero_copy, []() {
got_message = true;
EXPECT(msg.get() == omsg.get());
spinner.stop();

// make sure subscriber count is correct
EXPECT(string_pub.getNumSubscribers() == 1);
}, 10);

spinner.run();
EXPECT(got_message);
});

TEST(test_publish_subscribe_queue_behavior, []() {
// test that the queue drops messages as expected and we only get the newest
pubsub::Node node("simple_publisher");

pubsub::Publisher<pubsub::msg::Int> int_pub(node, "/data");

pubsub::BlockingSpinnerWithTimers spinner;
spinner.setNode(node);

std::vector<int> received;
pubsub::Subscriber<pubsub::msg::Int> subscriber(node, "/data", [&](const pubsub::msg::IntSharedPtr& msg) {
printf("Got message %i in sub1\n", msg->value);
received.push_back(msg->value);
spinner.stop();
}, 10);

for (int i = 0; i < 100; i++)
{
pubsub::msg::Int omsg;
omsg.value = i;
int_pub.publish(omsg);
}

// spin after publishing so the queue fills up
spinner.run();
EXPECT(received.size() == 10);
for (int i = 0; i < 10; i++)
{
EXPECT(received[i] == 90 + i);
}
});

TEST(test_publish_subscribe_nodelets, []() {
// test that data gets passed through without copying between multiple nodes
pubsub::Node nodep("simple_publisher");
Expand Down Expand Up @@ -104,33 +143,6 @@ TEST(test_publisher_subscriber_close_cpp, []() {
sub.close();
});

/*TEST(test_publish_subscribe_latched_cpp, []() {
// test that latched topics make it through the local message passing between nodes
pubsub::Node node("simple_publisher");
pubsub::Publisher<pubsub::msg::String> string_pub(node, "/data", true);
pubsub::msg::String omsg;
omsg.value = "Hello";
string_pub.publish(omsg);
pubsub::BlockingSpinnerWithTimers spinner;
spinner.setNode(node);
bool got_message = false;
pubsub::Subscriber<pubsub::msg::String> subscriber(node, "/data", [&](const pubsub::msg::StringSharedPtr& msg) {
printf("Got message %s in sub1\n", msg->value);
EXPECT(strcmp(omsg.value, msg->value) == 0);
got_message = true;
spinner.stop();
}, 10);
spinner.wait();
EXPECT(got_message);
});*/


TEST(test_publish_subscribe_cpp, []() {
// test that normal messages make it through message passing
pubsub::Node node("simple_publisher");
Expand Down
1 change: 1 addition & 0 deletions tools/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ generate_messages(FILES
Log.msg
Path2D.msg
Odom.msg
Int.msg
)


Expand Down

0 comments on commit 2a7f1ad

Please sign in to comment.