Skip to content

Commit

Permalink
Fix tcp transport queuing
Browse files Browse the repository at this point in the history
Add test for very large messages
  • Loading branch information
matt-attack committed Oct 27, 2024
1 parent 893b6d9 commit 4cc0ab3
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 3 deletions.
13 changes: 10 additions & 3 deletions include/pubsub/TCPTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
while (client->queued_message != 0)
{
int to_send = client->queued_message_length - client->queued_message_written;
int sent = send(client->socket, &client->queued_message[client->queued_message_written], to_send, 0);
uint8_t* data = (uint8_t*)client->queued_message->data;
int sent = send(client->socket, &data[client->queued_message_written], to_send, 0);
if (sent > 0)
{
client->queued_message_written += sent;
Expand All @@ -292,8 +293,11 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
else if (sent < 0 && errno != EAGAIN)
#endif
{
//printf("needs removal %i\n", errno);
client->needs_removal = true;
}

//printf("Sending more: %i to make %i of %i\n", sent, client->queued_message_written, client->queued_message_length);

if (client->queued_message_written == client->queued_message_length)
{
Expand Down Expand Up @@ -533,9 +537,9 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
}
else if (len > 0)
{
//printf("Read %i bytes of message\n", len);
connection->current_size += len;

//printf("Read %i bytes of message, so far: %i\n", len, connection->current_size);

if (connection->current_size == connection->packet_size)
{
//printf("message finished type %x\n", connection->packet_type);
Expand Down Expand Up @@ -678,6 +682,7 @@ void ps_tcp_transport_pub(struct ps_transport_t* transport, struct ps_pub_t* pub
// the message header is already filled out with the packet id and length

int32_t desired_len = sizeof(struct ps_msg_header) + length;
//printf("trying to send message of %i bytes\n", desired_len);
int32_t c = send(socket, message, desired_len, 0);
if (c < desired_len && c >= 0)
{
Expand Down Expand Up @@ -712,6 +717,8 @@ void ps_tcp_transport_pub(struct ps_transport_t* transport, struct ps_pub_t* pub
// add a reference count and put it in our queue
ps_msg_ref_add(msg);

//printf("Wrote %i bytes\n", tclient->queued_message_written);

tclient->queued_message = msg;
tclient->queued_message_length = length + sizeof(struct ps_msg_header);
ps_event_set_add_socket_write(&publisher->node->events, socket);
Expand Down
59 changes: 59 additions & 0 deletions tests/test_pubsub_c.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <pubsub/TCPTransport.h>

#include <pubsub/String.msg.h>
#include <pubsub/PointCloud.msg.h>

#include "mini_mock.hpp"
/*add test to make sure mismatched messages are detected and not received
Expand Down Expand Up @@ -205,4 +206,62 @@ TEST(test_publish_subscribe_latched_cb_broadcast_tcp, []() {
latch_test_cb(true, true);
});

// test sending a very large message
TEST(test_publish_subscribe_large, []() {
struct ps_node_t node;
ps_node_init(&node, "test_node", "", true);

struct ps_transport_t tcp_transport;
ps_tcp_transport_init(&tcp_transport, &node);
ps_node_add_transport(&node, &tcp_transport);

struct ps_pub_t string_pub;
ps_node_create_publisher(&node, "/data", &pubsub__PointCloud_def, &string_pub, true);

// come up with the latched topic
static struct pubsub__PointCloud rmsg;
rmsg.num_points = 100000000;// 10 million points!
rmsg.point_type = pubsub::msg::PointCloud::POINT_XYZ;
rmsg.data_length = rmsg.num_points*4*3;//3 floats per point
rmsg.data = (uint8_t*)malloc(rmsg.data_length);
ps_pub_publish_ez(&string_pub, &rmsg);

struct ps_sub_t string_sub;

struct ps_subscriber_options options;
ps_subscriber_options_init(&options);
//options.skip = skip;
options.queue_size = 0;
options.allocator = 0;
options.ignore_local = false;

static bool got_message = false;
options.preferred_transport = 1;
options.cb = [](void* message, unsigned int size, void* data2, const ps_msg_info_t* info)
{
got_message = true;
printf("Got message\n");
// todo need to also assert we have the message type
// which is tricky for udp...
auto data = (struct pubsub__PointCloud*)pubsub__PointCloud_decode(message, &ps_default_allocator);
printf("Decoded message\n");
EXPECT(data->num_points == rmsg.num_points);
free(data->data);
free(data);
};
ps_node_create_subscriber_adv(&node, "/data", 0, &string_sub, &options);

// now spin and wait for us to get the published message
while (ps_okay() && !got_message)
{
ps_node_spin(&node);// todo blocking wait first

ps_sleep(1);
}

done:
EXPECT(got_message);
ps_node_destroy(&node);
});

CREATE_MAIN_ENTRY_POINT();

0 comments on commit 4cc0ab3

Please sign in to comment.