Skip to content

Commit

Permalink
Remove queues from the C api and allow getting serialized messages in…
Browse files Browse the repository at this point in the history
… callbacks

Use allocators in more cases
  • Loading branch information
matt-attack committed Feb 15, 2025
1 parent 4cc0ab3 commit 018e070
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 279 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,4 @@ tests/CMakeFiles/*
Makefile
cmake_install.cmake
build/
html/
4 changes: 2 additions & 2 deletions examples/simple_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ int main()

// Create the publisher
struct ps_pub_t string_pub;
ps_node_create_publisher(&node, "/data"/*topic name*/,
ps_node_create_publisher_ex(&node, "/data"/*topic name*/,
&pubsub__String_def/*message definition*/,
&string_pub,
true/*true to "latch" the topic*/);
true/*true to "latch" the topic*/, 1);

// User is responsible for lifetime of the message they publish
// Publish does a copy internally if necessary
Expand Down
23 changes: 11 additions & 12 deletions examples/simple_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

#include <pubsub/String.msg.h>

struct ps_sub_t string_sub;
void callback(void* message, unsigned int size, void* cbdata, const struct ps_msg_info_t* info)
{
// user is responsible for freeing the message and its arrays
struct pubsub__String* data = (struct pubsub__String*)message;
printf("Got message: %s\n", data->value);
pubsub__String_free(string_sub.allocator, data);
}

int main()
{
// Create the node
Expand All @@ -23,10 +32,10 @@ int main()
ps_node_add_transport(&node, &tcp_transport);

// Create the subscriber
struct ps_sub_t string_sub;
struct ps_subscriber_options options;
ps_subscriber_options_init(&options);
options.preferred_transport = PUBSUB_TCP_TRANSPORT;// sets preferred transport to TCP
options.cb = callback;
ps_node_create_subscriber_adv(&node, "/data", &pubsub__String_def, &string_sub, &options);

// Loop and spin
Expand All @@ -36,18 +45,8 @@ int main()
// Used to prevent this from using 100% CPU, but you can do that through other means
ps_node_wait(&node, 1000/*maximum wait time in ms*/);

// Updates the node, which will queue up any received messages
// Updates the node, which will receive messages and call any callbacks as they come in
ps_node_spin(&node);

// our sub has a message definition, so the queue contains real messages
struct pubsub__String* data;
while (data = (struct pubsub__String*)ps_sub_deque(&string_sub))
{
// user is responsible for freeing the message and its arrays
printf("Got message: %s\n", data->value);
free(data->value);
free(data);
}
}

// Shutdown the node to free resources
Expand Down
15 changes: 5 additions & 10 deletions include/pubsub/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ typedef void(*ps_param_confirm_cb_t)(const char* name, double value, void* data)
struct ps_node_t
{
const char* name;
const char* description;
unsigned int num_pubs;
struct ps_pub_t** pubs;
unsigned int num_subs;
Expand Down Expand Up @@ -195,24 +196,18 @@ void ps_node_create_publisher(struct ps_node_t* node, const char* topic, const s

void ps_node_create_publisher_ex(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_pub_t* pub, bool latched, unsigned int recommended_transport);

void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
unsigned int queue_size,//make >= 1
struct ps_allocator_t* allocator,//give null to use default
bool ignore_local);// if ignore local is set, this node ignores publications from itself
// this facilitiates passing messages through shared memory


typedef void(*ps_subscriber_fn_cb_t)(void* message, unsigned int size, void* data, const struct ps_msg_info_t* info);
struct ps_subscriber_options
{
unsigned int queue_size;
bool ignore_local;
struct ps_allocator_t* allocator;
unsigned int skip;// skips to every nth message for throttling
ps_subscriber_fn_cb_t cb;
ps_subscriber_fn_cb_t cb_raw;
void* cb_data;
int32_t preferred_transport;// falls back to udp otherwise
int32_t preferred_transport;// falls back to udp otherwise
const char* description;
};

void ps_subscriber_options_init(struct ps_subscriber_options* options);
Expand All @@ -221,7 +216,7 @@ void ps_node_create_subscriber_adv(struct ps_node_t* node, const char* topic, co
struct ps_sub_t* sub,
const struct ps_subscriber_options* options);

void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
ps_subscriber_fn_cb_t cb,
void* cb_data,
Expand Down
13 changes: 2 additions & 11 deletions include/pubsub/Subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,13 @@ struct ps_sub_t

struct ps_allocator_t* allocator;

// used instead of a queue optionally
ps_subscriber_fn_cb_t cb;
ps_subscriber_fn_cb_t cb_raw;
void* cb_data;

int preferred_transport;// udp or tcp, or -1 for no preference

unsigned int skip;

// queue is implemented as a deque
int queue_start;// start index of items in the queue (loops around on positive side)
int queue_size;// maximum size of the queue
int queue_len;// current queue size
void** queue;// pointers to each of the queue items
};

#pragma pack(push)
Expand All @@ -58,10 +52,7 @@ struct ps_sub_req_header_t
};
#pragma pack(pop)

void ps_sub_enqueue(struct ps_sub_t* sub, void* message, int data_size, const struct ps_msg_info_t* message_info);

// if the subscriber was initialized with a type this returns decoded messages
void* ps_sub_deque(struct ps_sub_t* sub);
void ps_sub_receive(struct ps_sub_t* sub, void* encoded_message, int data_size, bool is_reference, const struct ps_msg_info_t* message_info);

void ps_sub_destroy(struct ps_sub_t* sub);

Expand Down
36 changes: 11 additions & 25 deletions include/pubsub/TCPTransport.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,6 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
}
struct ps_tcp_client_t* new_client = &impl->clients[impl->num_clients - 1];
new_client->socket = socket;
new_client->socket = socket;
new_client->needs_removal = false;
new_client->current_packet_size = 0;
new_client->desired_packet_size = 0;
Expand Down Expand Up @@ -517,7 +516,7 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
connection->waiting_for_header = false;
connection->packet_size = *(uint32_t*)&buf[1];
//printf("Incoming message with %i bytes\n", impl->connections[i].packet_size);
connection->packet_data = (char*)malloc(connection->packet_size);
connection->packet_data = (char*)connection->subscriber->allocator->alloc(connection->packet_size, connection->subscriber->allocator->context);

connection->current_size = 0;
}
Expand Down Expand Up @@ -561,7 +560,7 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
}
}

free(connection->packet_data);
connection->subscriber->allocator->free(connection->packet_data, connection->subscriber->allocator->context);
}
else if (connection->packet_type == PS_TCP_PROTOCOL_DATA)
{
Expand All @@ -570,32 +569,17 @@ int ps_tcp_transport_spin(struct ps_transport_t* transport, struct ps_node_t* no
struct ps_msg_info_t message_info;
message_info.address = connection->endpoint.address;
message_info.port = connection->endpoint.port;

void* out_data;
if (connection->subscriber->type)
{
out_data = connection->subscriber->type->decode(connection->packet_data, connection->subscriber->allocator);
free(connection->packet_data);
}
else
{
out_data = connection->packet_data;
}

// remove the reference to packet data so we dont try and double free it on destroy
// it is the queue's responsibility now

ps_sub_receive(connection->subscriber, connection->packet_data, connection->packet_size, false, &message_info);

// remove the reference to packet data so we don't try and double free it on destroy
connection->packet_data = 0;
ps_sub_enqueue(connection->subscriber,
out_data,
connection->packet_size,
&message_info);

message_count++;
}
else
{
// unhandled packet id
free(connection->packet_data);
connection->subscriber->allocator->free(connection->packet_data, connection->subscriber->allocator->context);
}
connection->waiting_for_header = true;
}
Expand Down Expand Up @@ -860,7 +844,8 @@ void ps_tcp_transport_unsubscribe(struct ps_transport_t* transport, struct ps_su

if (!impl->connections[i].waiting_for_header)
{
free(impl->connections[i].packet_data);
struct ps_sub_t* sub = impl->connections[i].subscriber;
sub->allocator->free(impl->connections[i].packet_data, sub->allocator->context);
}
ps_event_set_remove_socket(&impl->node->events, impl->connections[i].socket);
#ifdef _WIN32
Expand All @@ -885,7 +870,8 @@ void ps_tcp_transport_destroy(struct ps_transport_t* transport)
{
if (!impl->connections[i].waiting_for_header)
{
free(impl->connections[i].packet_data);
struct ps_sub_t* sub = impl->connections[i].subscriber;
sub->allocator->free(impl->connections[i].packet_data, sub->allocator->context);
}
ps_event_set_remove_socket(&impl->node->events, impl->connections[i].socket);
#ifdef _WIN32
Expand Down
6 changes: 0 additions & 6 deletions include/pubsub_cpp/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ class Subscriber: public SubscriberBase
struct ps_subscriber_options options;
ps_subscriber_options_init(&options);

options.queue_size = 0;
options.cb = cb2;
options.cb_data = this;
options.allocator = 0;
Expand Down Expand Up @@ -639,11 +638,6 @@ class Subscriber: public SubscriberBase
node_ = 0;
}

T* deque()
{
return (T*)ps_sub_deque(&subscriber_);
}

const std::string& getQualifiedTopic()
{
return remapped_topic_;
Expand Down
9 changes: 9 additions & 0 deletions include/pubsub_cpp/array_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ class ArrayVector
}
data_ = new_data;
}

// reliquinquishes the held pointer without freeing
T* reset()
{
auto out = data_;
data_ = 0;
length_ = 0;
return out;
}

void clear()
{
Expand Down
68 changes: 5 additions & 63 deletions src/Node.c
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,11 @@ struct ps_allocator_t ps_default_allocator = { ps_malloc_alloc, ps_malloc_free,

void ps_subscriber_options_init(struct ps_subscriber_options* options)
{
options->queue_size = 1;
options->ignore_local = false;
options->allocator = 0;
options->skip = 0;
options->cb = 0;
options->cb_raw = 0;
options->cb_data = 0;
options->preferred_transport = -1;// no preference
}
Expand Down Expand Up @@ -564,57 +564,15 @@ void ps_node_create_subscriber_adv(struct ps_node_t* node, const char* topic, co
sub->received_message_def.hash = 0;
sub->received_message_def.num_fields = 0;

// force queue size to be > 0
unsigned int queue_size = options->queue_size;
if (options->cb)
{
sub->cb = options->cb;
sub->cb_data = options->cb_data;
sub->queue_size = 0;
sub->queue_len = 0;
sub->queue_start = 0;
sub->queue = 0;
}
else
{
if (queue_size <= 0)
{
queue_size = 1;
}

// allocate queue data
sub->queue_len = 0;
sub->queue_start = 0;
sub->queue_size = queue_size;
sub->queue = (void**)malloc(sizeof(void*) * queue_size);

for (unsigned int i = 0; i < queue_size; i++)
{
sub->queue[i] = 0;
}
}
sub->cb = options->cb;
sub->cb_raw = options->cb_raw;
sub->cb_data = options->cb_data;

// send out the subscription query while we are at it
ps_node_subscribe_query(sub);
}

void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
unsigned int queue_size,
struct ps_allocator_t* allocator,
bool ignore_local)
{
struct ps_subscriber_options options;
ps_subscriber_options_init(&options);

options.queue_size = queue_size;
options.allocator = allocator;
options.ignore_local = ignore_local;

ps_node_create_subscriber_adv(node, topic, type, sub, &options);
}

void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
ps_subscriber_fn_cb_t cb,
void* cb_data,
Expand All @@ -625,7 +583,6 @@ void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, con
struct ps_subscriber_options options;
ps_subscriber_options_init(&options);

options.queue_size = 0;
options.cb = cb;
options.cb_data = cb_data;
options.allocator = allocator;
Expand Down Expand Up @@ -822,22 +779,7 @@ int ps_node_spin(struct ps_node_t* node)
// queue up the data, and copy :/ (can make zero copy for arduino version)
int data_size = received_bytes - sizeof(struct ps_msg_header);

// also todo fastpath for PoD message types

// okay, if we have the message definition, deserialize and output in a message
void* out_data;
if (sub->type)
{
//theres a leak if you use this and the queue fills up with complex types
out_data = sub->type->decode(data + sizeof(struct ps_msg_header), sub->allocator);
}
else
{
out_data = sub->allocator->alloc(data_size, sub->allocator->context);
memcpy(out_data, data + sizeof(struct ps_msg_header), data_size);
}

ps_sub_enqueue(sub, out_data, data_size, &message_info);
ps_sub_receive(sub, data + sizeof(struct ps_msg_header), data_size, true, &message_info);

#ifdef PUBSUB_VERBOSE
//printf("Got message, queue len %i\n", sub->queue_len);
Expand Down
Loading

0 comments on commit 018e070

Please sign in to comment.