From 018e07044df802170390e5333dea9552c5de521b Mon Sep 17 00:00:00 2001 From: Matthew B Date: Fri, 14 Feb 2025 21:15:10 -0800 Subject: [PATCH] Remove queues from the C api and allow getting serialized messages in callbacks Use allocators in more cases --- .gitignore | 1 + examples/simple_pub.c | 4 +- examples/simple_sub.c | 23 ++++---- include/pubsub/Node.h | 15 ++--- include/pubsub/Subscriber.h | 13 +---- include/pubsub/TCPTransport.h | 36 ++++-------- include/pubsub_cpp/Node.h | 6 -- include/pubsub_cpp/array_vector.h | 9 +++ src/Node.c | 68 ++-------------------- src/Subscriber.c | 96 +++++++++---------------------- tests/test_pubsub_c.cpp | 68 ---------------------- tools/PubSubTest.cpp | 15 +++-- tools/pubsub.cpp | 7 +-- 13 files changed, 82 insertions(+), 279 deletions(-) diff --git a/.gitignore b/.gitignore index 823ba94..9b631a8 100644 --- a/.gitignore +++ b/.gitignore @@ -272,3 +272,4 @@ tests/CMakeFiles/* Makefile cmake_install.cmake build/ +html/ diff --git a/examples/simple_pub.c b/examples/simple_pub.c index 79f4f76..fa269e6 100644 --- a/examples/simple_pub.c +++ b/examples/simple_pub.c @@ -26,10 +26,10 @@ int main() // Create the publisher struct ps_pub_t string_pub; - ps_node_create_publisher(&node, "/data"/*topic name*/, + ps_node_create_publisher_ex(&node, "/data"/*topic name*/, &pubsub__String_def/*message definition*/, &string_pub, - true/*true to "latch" the topic*/); + true/*true to "latch" the topic*/, 1); // User is responsible for lifetime of the message they publish // Publish does a copy internally if necessary diff --git a/examples/simple_sub.c b/examples/simple_sub.c index 8f7fae3..0d52f2c 100644 --- a/examples/simple_sub.c +++ b/examples/simple_sub.c @@ -9,6 +9,15 @@ #include +struct ps_sub_t string_sub; +void callback(void* message, unsigned int size, void* cbdata, const struct ps_msg_info_t* info) +{ + // user is responsible for freeing the message and its arrays + struct pubsub__String* data = (struct pubsub__String*)message; + printf("Got message: %s\n", data->value); + pubsub__String_free(string_sub.allocator, data); +} + int main() { // Create the node @@ -23,10 +32,10 @@ int main() ps_node_add_transport(&node, &tcp_transport); // Create the subscriber - struct ps_sub_t string_sub; struct ps_subscriber_options options; ps_subscriber_options_init(&options); options.preferred_transport = PUBSUB_TCP_TRANSPORT;// sets preferred transport to TCP + options.cb = callback; ps_node_create_subscriber_adv(&node, "/data", &pubsub__String_def, &string_sub, &options); // Loop and spin @@ -36,18 +45,8 @@ int main() // Used to prevent this from using 100% CPU, but you can do that through other means ps_node_wait(&node, 1000/*maximum wait time in ms*/); - // Updates the node, which will queue up any received messages + // Updates the node, which will receive messages and call any callbacks as they come in ps_node_spin(&node); - - // our sub has a message definition, so the queue contains real messages - struct pubsub__String* data; - while (data = (struct pubsub__String*)ps_sub_deque(&string_sub)) - { - // user is responsible for freeing the message and its arrays - printf("Got message: %s\n", data->value); - free(data->value); - free(data); - } } // Shutdown the node to free resources diff --git a/include/pubsub/Node.h b/include/pubsub/Node.h index 228582f..8a572f9 100644 --- a/include/pubsub/Node.h +++ b/include/pubsub/Node.h @@ -67,6 +67,7 @@ typedef void(*ps_param_confirm_cb_t)(const char* name, double value, void* data) struct ps_node_t { const char* name; + const char* description; unsigned int num_pubs; struct ps_pub_t** pubs; unsigned int num_subs; @@ -195,24 +196,18 @@ void ps_node_create_publisher(struct ps_node_t* node, const char* topic, const s void ps_node_create_publisher_ex(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_pub_t* pub, bool latched, unsigned int recommended_transport); -void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, - struct ps_sub_t* sub, - unsigned int queue_size,//make >= 1 - struct ps_allocator_t* allocator,//give null to use default - bool ignore_local);// if ignore local is set, this node ignores publications from itself - // this facilitiates passing messages through shared memory - typedef void(*ps_subscriber_fn_cb_t)(void* message, unsigned int size, void* data, const struct ps_msg_info_t* info); struct ps_subscriber_options { - unsigned int queue_size; bool ignore_local; struct ps_allocator_t* allocator; unsigned int skip;// skips to every nth message for throttling ps_subscriber_fn_cb_t cb; + ps_subscriber_fn_cb_t cb_raw; void* cb_data; - int32_t preferred_transport;// falls back to udp otherwise + int32_t preferred_transport;// falls back to udp otherwise + const char* description; }; void ps_subscriber_options_init(struct ps_subscriber_options* options); @@ -221,7 +216,7 @@ void ps_node_create_subscriber_adv(struct ps_node_t* node, const char* topic, co struct ps_sub_t* sub, const struct ps_subscriber_options* options); -void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, +void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_sub_t* sub, ps_subscriber_fn_cb_t cb, void* cb_data, diff --git a/include/pubsub/Subscriber.h b/include/pubsub/Subscriber.h index 5dbd7d5..549bc58 100644 --- a/include/pubsub/Subscriber.h +++ b/include/pubsub/Subscriber.h @@ -31,19 +31,13 @@ struct ps_sub_t struct ps_allocator_t* allocator; - // used instead of a queue optionally ps_subscriber_fn_cb_t cb; + ps_subscriber_fn_cb_t cb_raw; void* cb_data; int preferred_transport;// udp or tcp, or -1 for no preference unsigned int skip; - - // queue is implemented as a deque - int queue_start;// start index of items in the queue (loops around on positive side) - int queue_size;// maximum size of the queue - int queue_len;// current queue size - void** queue;// pointers to each of the queue items }; #pragma pack(push) @@ -58,10 +52,7 @@ struct ps_sub_req_header_t }; #pragma pack(pop) -void ps_sub_enqueue(struct ps_sub_t* sub, void* message, int data_size, const struct ps_msg_info_t* message_info); - -// if the subscriber was initialized with a type this returns decoded messages -void* ps_sub_deque(struct ps_sub_t* sub); +void ps_sub_receive(struct ps_sub_t* sub, void* encoded_message, int data_size, bool is_reference, const struct ps_msg_info_t* message_info); void ps_sub_destroy(struct ps_sub_t* sub); diff --git a/include/pubsub/TCPTransport.h b/include/pubsub/TCPTransport.h index 9ea8e5d..1280650 100644 --- a/include/pubsub/TCPTransport.h +++ b/include/pubsub/TCPTransport.h @@ -215,7 +215,6 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no } struct ps_tcp_client_t* new_client = &impl->clients[impl->num_clients - 1]; new_client->socket = socket; - new_client->socket = socket; new_client->needs_removal = false; new_client->current_packet_size = 0; new_client->desired_packet_size = 0; @@ -517,7 +516,7 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no connection->waiting_for_header = false; connection->packet_size = *(uint32_t*)&buf[1]; //printf("Incoming message with %i bytes\n", impl->connections[i].packet_size); - connection->packet_data = (char*)malloc(connection->packet_size); + connection->packet_data = (char*)connection->subscriber->allocator->alloc(connection->packet_size, connection->subscriber->allocator->context); connection->current_size = 0; } @@ -561,7 +560,7 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no } } - free(connection->packet_data); + connection->subscriber->allocator->free(connection->packet_data, connection->subscriber->allocator->context); } else if (connection->packet_type == PS_TCP_PROTOCOL_DATA) { @@ -570,32 +569,17 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no struct ps_msg_info_t message_info; message_info.address = connection->endpoint.address; message_info.port = connection->endpoint.port; - - void* out_data; - if (connection->subscriber->type) - { - out_data = connection->subscriber->type->decode(connection->packet_data, connection->subscriber->allocator); - free(connection->packet_data); - } - else - { - out_data = connection->packet_data; - } - - // remove the reference to packet data so we dont try and double free it on destroy - // it is the queue's responsibility now + + ps_sub_receive(connection->subscriber, connection->packet_data, connection->packet_size, false, &message_info); + + // remove the reference to packet data so we don't try and double free it on destroy connection->packet_data = 0; - ps_sub_enqueue(connection->subscriber, - out_data, - connection->packet_size, - &message_info); - message_count++; } else { // unhandled packet id - free(connection->packet_data); + connection->subscriber->allocator->free(connection->packet_data, connection->subscriber->allocator->context); } connection->waiting_for_header = true; } @@ -860,7 +844,8 @@ void ps_tcp_transport_unsubscribe(struct ps_transport_t* transport, struct ps_su if (!impl->connections[i].waiting_for_header) { - free(impl->connections[i].packet_data); + struct ps_sub_t* sub = impl->connections[i].subscriber; + sub->allocator->free(impl->connections[i].packet_data, sub->allocator->context); } ps_event_set_remove_socket(&impl->node->events, impl->connections[i].socket); #ifdef _WIN32 @@ -885,7 +870,8 @@ void ps_tcp_transport_destroy(struct ps_transport_t* transport) { if (!impl->connections[i].waiting_for_header) { - free(impl->connections[i].packet_data); + struct ps_sub_t* sub = impl->connections[i].subscriber; + sub->allocator->free(impl->connections[i].packet_data, sub->allocator->context); } ps_event_set_remove_socket(&impl->node->events, impl->connections[i].socket); #ifdef _WIN32 diff --git a/include/pubsub_cpp/Node.h b/include/pubsub_cpp/Node.h index af862d7..bfbff1d 100644 --- a/include/pubsub_cpp/Node.h +++ b/include/pubsub_cpp/Node.h @@ -572,7 +572,6 @@ class Subscriber: public SubscriberBase struct ps_subscriber_options options; ps_subscriber_options_init(&options); - options.queue_size = 0; options.cb = cb2; options.cb_data = this; options.allocator = 0; @@ -639,11 +638,6 @@ class Subscriber: public SubscriberBase node_ = 0; } - T* deque() - { - return (T*)ps_sub_deque(&subscriber_); - } - const std::string& getQualifiedTopic() { return remapped_topic_; diff --git a/include/pubsub_cpp/array_vector.h b/include/pubsub_cpp/array_vector.h index 84e5e03..426646f 100644 --- a/include/pubsub_cpp/array_vector.h +++ b/include/pubsub_cpp/array_vector.h @@ -93,6 +93,15 @@ class ArrayVector } data_ = new_data; } + + // reliquinquishes the held pointer without freeing + T* reset() + { + auto out = data_; + data_ = 0; + length_ = 0; + return out; + } void clear() { diff --git a/src/Node.c b/src/Node.c index 017a65e..73de257 100644 --- a/src/Node.c +++ b/src/Node.c @@ -522,11 +522,11 @@ struct ps_allocator_t ps_default_allocator = { ps_malloc_alloc, ps_malloc_free, void ps_subscriber_options_init(struct ps_subscriber_options* options) { - options->queue_size = 1; options->ignore_local = false; options->allocator = 0; options->skip = 0; options->cb = 0; + options->cb_raw = 0; options->cb_data = 0; options->preferred_transport = -1;// no preference } @@ -564,57 +564,15 @@ void ps_node_create_subscriber_adv(struct ps_node_t* node, const char* topic, co sub->received_message_def.hash = 0; sub->received_message_def.num_fields = 0; - // force queue size to be > 0 - unsigned int queue_size = options->queue_size; - if (options->cb) - { - sub->cb = options->cb; - sub->cb_data = options->cb_data; - sub->queue_size = 0; - sub->queue_len = 0; - sub->queue_start = 0; - sub->queue = 0; - } - else - { - if (queue_size <= 0) - { - queue_size = 1; - } - - // allocate queue data - sub->queue_len = 0; - sub->queue_start = 0; - sub->queue_size = queue_size; - sub->queue = (void**)malloc(sizeof(void*) * queue_size); - - for (unsigned int i = 0; i < queue_size; i++) - { - sub->queue[i] = 0; - } - } + sub->cb = options->cb; + sub->cb_raw = options->cb_raw; + sub->cb_data = options->cb_data; // send out the subscription query while we are at it ps_node_subscribe_query(sub); } void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, - struct ps_sub_t* sub, - unsigned int queue_size, - struct ps_allocator_t* allocator, - bool ignore_local) -{ - struct ps_subscriber_options options; - ps_subscriber_options_init(&options); - - options.queue_size = queue_size; - options.allocator = allocator; - options.ignore_local = ignore_local; - - ps_node_create_subscriber_adv(node, topic, type, sub, &options); -} - -void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_sub_t* sub, ps_subscriber_fn_cb_t cb, void* cb_data, @@ -625,7 +583,6 @@ void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, con struct ps_subscriber_options options; ps_subscriber_options_init(&options); - options.queue_size = 0; options.cb = cb; options.cb_data = cb_data; options.allocator = allocator; @@ -822,22 +779,7 @@ int ps_node_spin(struct ps_node_t* node) // queue up the data, and copy :/ (can make zero copy for arduino version) int data_size = received_bytes - sizeof(struct ps_msg_header); - // also todo fastpath for PoD message types - - // okay, if we have the message definition, deserialize and output in a message - void* out_data; - if (sub->type) - { -//theres a leak if you use this and the queue fills up with complex types - out_data = sub->type->decode(data + sizeof(struct ps_msg_header), sub->allocator); - } - else - { - out_data = sub->allocator->alloc(data_size, sub->allocator->context); - memcpy(out_data, data + sizeof(struct ps_msg_header), data_size); - } - - ps_sub_enqueue(sub, out_data, data_size, &message_info); + ps_sub_receive(sub, data + sizeof(struct ps_msg_header), data_size, true, &message_info); #ifdef PUBSUB_VERBOSE //printf("Got message, queue len %i\n", sub->queue_len); diff --git a/src/Subscriber.c b/src/Subscriber.c index 3ea0c4d..241baeb 100644 --- a/src/Subscriber.c +++ b/src/Subscriber.c @@ -7,45 +7,42 @@ #ifndef ANDROID #include #endif +#include #include -void ps_sub_enqueue(struct ps_sub_t* sub, void* data, int data_size, const struct ps_msg_info_t* message_info) +void ps_sub_receive(struct ps_sub_t* sub, void* encoded_message, int data_size, bool is_reference, const struct ps_msg_info_t* message_info) { - // Implement a LIFO queue. Is this the best option? - int new_start = sub->queue_start - 1; - if (new_start < 0) - { - new_start += sub->queue_size; - } + // if is_reference is true, we must make a copy for the subscriber to own + // okay, so how do we let the callback specify if it wants a copy or not? - // If no queue size, just run the callback immediately - if (sub->queue_size == 0) + //todo make this not always require owning the data + //how do I release a "loaned" message? also, how do I loan? + if (sub->cb_raw) { - sub->cb(data, data_size, sub->cb_data, message_info); - } - // Handle replacement if the queue is full - else if (sub->queue_size == sub->queue_len) - { - // we'll replace the item at the back by shifting the queue around - if (sub->type) + // todo can avoid the copy if the allocator is used + void* out_data; + if (is_reference) { - sub->type->free(sub->allocator, sub->queue[sub->queue_start]); + out_data = sub->allocator->alloc(data_size, sub->allocator->context); + memcpy(out_data, encoded_message, data_size); } else { - free(sub->queue[sub->queue_start]); - } - // add at the front - sub->queue[new_start] = data; - sub->queue_start = new_start; + out_data = encoded_message; + } + sub->cb_raw(out_data, data_size, sub->cb_data, message_info); } - else + + if (sub->cb) + { + void* out_data = sub->type->decode(encoded_message, sub->allocator); + sub->cb(out_data, data_size, sub->cb_data, message_info); + } + + if (!is_reference && !sub->cb_raw) { - // add to the front - sub->queue_len++; - sub->queue[new_start] = data; - sub->queue_start = new_start; + sub->allocator->free(encoded_message, sub->allocator->context); } } @@ -58,7 +55,7 @@ void ps_sub_destroy(struct ps_sub_t* sub) for (unsigned int i = 0; i < sub->node->num_transports; i++) { sub->node->transports[i].unsubscribe(&sub->node->transports[i], sub); - } + } //remove it from my list of subs sub->node->num_subs--; @@ -84,46 +81,5 @@ void ps_sub_destroy(struct ps_sub_t* sub) } } free(old_subs); - } - - // free any queued up received messages and the queue itself - for (int i = 0; i < sub->queue_len; i++) - { - int index = (sub->queue_start + i)%sub->queue_size; - if (sub->queue[index] != 0) - { - if (sub->type) - { - sub->type->free(sub->allocator, sub->queue[index]); - } - else - { - free(sub->queue[index]); - } - } - } - free(sub->queue); -} - -void* ps_sub_deque(struct ps_sub_t* sub) -{ - if (sub->queue_len == 0) - { - //printf("Warning: dequeued when there was nothing in queue\n"); - return 0; - } - - // we are dequeueing, so remove the newest first (from the front) - sub->queue_len--; - - void* data = sub->queue[sub->queue_start]; - sub->queue[sub->queue_start] = 0; - - int new_start = sub->queue_start+1; - if (new_start >= sub->queue_size) - { - new_start -= sub->queue_size; - } - sub->queue_start = new_start; - return data; + } } diff --git a/tests/test_pubsub_c.cpp b/tests/test_pubsub_c.cpp index 745f639..9225ff7 100644 --- a/tests/test_pubsub_c.cpp +++ b/tests/test_pubsub_c.cpp @@ -45,7 +45,6 @@ TEST(test_publish_subscribe_generic, []() { struct ps_subscriber_options options; ps_subscriber_options_init(&options); //options.skip = skip; - options.queue_size = 0; options.allocator = 0; options.ignore_local = false; @@ -77,72 +76,6 @@ TEST(test_publish_subscribe_generic, []() { ps_node_destroy(&node); }); -void latch_test(bool broadcast, bool tcp) -{ - struct ps_node_t node; - ps_node_init(&node, "test_node", "", broadcast); - - struct ps_transport_t tcp_transport; - ps_tcp_transport_init(&tcp_transport, &node); - ps_node_add_transport(&node, &tcp_transport); - - struct ps_pub_t string_pub; - ps_node_create_publisher(&node, "/data", &pubsub__String_def, &string_pub, true); - - struct ps_sub_t string_sub; - - struct ps_subscriber_options options; - ps_subscriber_options_init(&options); - options.preferred_transport = tcp ? 1 : 0;// tcp yo - ps_node_create_subscriber_adv(&node, "/data", &pubsub__String_def, &string_sub, &options); - - // come up with the latched topic - struct pubsub__String rmsg; - rmsg.value = "Hello"; - ps_pub_publish_ez(&string_pub, &rmsg); - - bool got_message = false; - - // now spin and wait for us to get the published message - while (ps_okay()) - { - ps_node_spin(&node);// todo blocking wait first - - struct pubsub__String* data; - while (data = (struct pubsub__String*)ps_sub_deque(&string_sub)) - { - // user is responsible for freeing the message and its arrays - printf("Got message: %s\n", data->value); - EXPECT(strcmp(data->value, rmsg.value) == 0); - got_message = true; - free(data->value); - free(data);//todo use allocator free - goto done; - } - ps_sleep(1); - } - -done: - EXPECT(got_message); - ps_node_destroy(&node); -} - -TEST(test_publish_subscribe_latched_multicast, []() { - latch_test(false, false); -}); - -TEST(test_publish_subscribe_latched_broadcast, []() { - latch_test(true, false); -}); - -TEST(test_publish_subscribe_latched_multicast_tcp, []() { - latch_test(false, true); -}); - -TEST(test_publish_subscribe_latched_broadcast_tcp, []() { - latch_test(true, true); -}); - void latch_test_cb(bool broadcast, bool tcp) { struct ps_node_t node; @@ -231,7 +164,6 @@ TEST(test_publish_subscribe_large, []() { struct ps_subscriber_options options; ps_subscriber_options_init(&options); //options.skip = skip; - options.queue_size = 0; options.allocator = 0; options.ignore_local = false; diff --git a/tools/PubSubTest.cpp b/tools/PubSubTest.cpp index 05cb4ee..cc77a3c 100644 --- a/tools/PubSubTest.cpp +++ b/tools/PubSubTest.cpp @@ -23,7 +23,13 @@ int main() ps_node_create_publisher(&node, "/joy", &pubsub__Joy_def, &adv_pub, false); ps_sub_t string_sub; - ps_node_create_subscriber(&node, "/data", &pubsub__String_def, &string_sub, 10, 0, false); + auto cb = [](void* message, unsigned int size, void* cbdata, const ps_msg_info_t* info) + { + pubsub__String* data = (pubsub__String*)message; + free(data->value); + free(data);//todo use allocator free + }; + ps_node_create_subscriber(&node, "/data", &pubsub__String_def, &string_sub, cb, 0, 0, false); // wait until we get the subscription request while (ps_pub_get_subscriber_count(&string_pub) == 0) @@ -61,13 +67,6 @@ int main() ps_node_spin(&node); //while (ps_node_spin(&node) == 0) { Sleep(1); } - // our sub has a message definition, so the queue contains real messages - while (pubsub__String* data = (pubsub__String*)ps_sub_deque(&string_sub)) - { - printf("Got message: %s\n", data->value); - free(data->value); - free(data);//todo use allocator free - } printf("Num subs: %i %i\n", ps_pub_get_subscriber_count(&string_pub), ps_pub_get_subscriber_count(&adv_pub)); ps_sleep(1000); diff --git a/tools/pubsub.cpp b/tools/pubsub.cpp index 56f24f7..4cfa7d4 100644 --- a/tools/pubsub.cpp +++ b/tools/pubsub.cpp @@ -320,13 +320,12 @@ int topic_echo(int num_args, char** args, ps_node_t* _node) struct ps_subscriber_options options; ps_subscriber_options_init(&options); options.skip = skip; - options.queue_size = 0; options.allocator = 0; options.ignore_local = false; options.preferred_transport = -1; options.preferred_transport = parser.GetBool("tcp") ? 1 : options.preferred_transport; options.preferred_transport = parser.GetBool("udp") ? 0 : options.preferred_transport; - options.cb = [](void* message, unsigned int size, void* data, const ps_msg_info_t* info) + options.cb_raw = [](void* message, unsigned int size, void* data, const ps_msg_info_t* info) { // get and deserialize the messages if (sub.received_message_def.fields == 0) @@ -357,7 +356,7 @@ int topic_echo(int num_args, char** args, ps_node_t* _node) } ps_deserialize_print(message, &sub.received_message_def, no_arr ? 10 : 0, field_name); printf("-------------\n"); - free(message); + free(message);// todo use allocator if (++count >= n) { // need to commit sudoku here.. @@ -889,7 +888,7 @@ int main(int num_args_real, char** args) ps_subscriber_options opts; ps_subscriber_options_init(&opts); - opts.cb = cb; + opts.cb_raw = cb; opts.cb_data = &message_times; opts.preferred_transport = -1; opts.preferred_transport = parser.GetBool("tcp") ? 1 : opts.preferred_transport;