Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refinements #27

Open
wants to merge 27 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
fbf334c
Add ability to recommend transports from the publisher
matt-attack Jul 8, 2024
91c8cdc
Add free to msg_def struct to fix memory leak and make it easier to f…
matt-attack Jul 8, 2024
34266c2
Fix some missing files
matt-attack Sep 18, 2024
f2dc92a
Run tests on linux
matt-attack Sep 18, 2024
214fe6a
Fix random include breaking things
matt-attack Sep 18, 2024
e6ccaaa
Add timeout for tests
matt-attack Sep 18, 2024
aef549c
Fix missing dependency on windows
matt-attack Sep 18, 2024
893b6d9
Avoid some copies in TCP transport and reduce number of send calls
matt-attack Oct 27, 2024
4cc0ab3
Fix tcp transport queuing
matt-attack Oct 27, 2024
018e070
Remove queues from the C api and allow getting serialized messages in…
matt-attack Feb 15, 2025
ab3c7be
Fix some errors and warnings
matt-attack Feb 15, 2025
6009673
Fix some windows errors
matt-attack Feb 15, 2025
016014b
Fix warnings and tests
matt-attack Feb 15, 2025
50a3cd9
Add a message copy test and add a test timeout
matt-attack Feb 15, 2025
62fe6cc
Check network config
matt-attack Feb 15, 2025
bd984f4
Discover broadcast address
matt-attack Feb 16, 2025
a53e783
Fix windows
matt-attack Feb 16, 2025
0d81430
Fix things
matt-attack Feb 16, 2025
b461895
Speed up CI
matt-attack Feb 16, 2025
f96fa59
Remove mac CI. Was an attempt.
matt-attack Feb 16, 2025
7e8d15f
Add some additional tests and fix things as necessary
matt-attack Feb 22, 2025
2a7f1ad
Add test for queue size and correct subscriber count for local subscr…
matt-attack Feb 22, 2025
0f3df95
Support arrays in structs
matt-attack Feb 24, 2025
de2dd7d
Fix enumerations in structs
matt-attack Feb 25, 2025
7ea7230
Cleanup a bit
matt-attack Feb 26, 2025
5c9215d
Add allocator support to C++ message generation
matt-attack Feb 28, 2025
07b52d9
Integrate allocators into C++
matt-attack Mar 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions .github/workflows/ccpp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ jobs:
- name: cmake configure
run: cmake .
- name: cmake build
run: cmake --build .

run: cmake --build . -j4
linux-build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: cmake configure
run: cmake .
- name: cmake build
run: cmake --build .
# - name: Run Tests
# run: cd tests && ctest --output-on-failure
run: cmake --build . -j4
- name: network
run: sudo apt install net-tools && ifconfig
- name: Run Tests
run: cd tests && ctest -VV
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,4 @@ tests/CMakeFiles/*
Makefile
cmake_install.cmake
build/
html/
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ add_library (pubsub
"src/Serialization.c"
"src/System.c"
"src/UDPTransport.c"
"src/Parameter.c"
)
if (UNIX)
target_link_libraries(pubsub pubsub_msgs)
else()
target_link_libraries(pubsub winmm)
target_link_libraries(pubsub winmm pubsub_msgs)
endif()
set_target_properties(pubsub PROPERTIES DEBUG_POSTFIX "d")
target_include_directories(pubsub PUBLIC include/)
Expand Down
49 changes: 0 additions & 49 deletions CMakeSettings.json

This file was deleted.

6 changes: 0 additions & 6 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,3 @@ set_property(TARGET simple_sub_cpp PROPERTY CXX_STANDARD 11)
add_executable(read_param "read_param.c")
target_link_libraries(read_param pubsub pubsub_msgs)
add_dependencies(read_param pubsub pubsub_msgs)

#install(TARGETS pubsubtest
# ARCHIVE DESTINATION "s"
# LIBRARY DESTINATION "s"
# RUNTIME DESTINATION "s"
#)
3 changes: 1 addition & 2 deletions examples/simple_logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ int main()

// okay, since we are publishing with shared pointer we actually need to allocate the string properly
auto shared = pubsub::msg::StringSharedPtr(new pubsub::msg::String);
shared->value = new char[strlen(msg.value) + 1];
strcpy(shared->value, msg.value);
shared->value = msg.value;
string_pub.publish(shared);

msg.value = 0;// so it doesnt get freed by the destructor since we allocated it ourself
Expand Down
4 changes: 2 additions & 2 deletions examples/simple_pub.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ int main()

// Create the publisher
struct ps_pub_t string_pub;
ps_node_create_publisher(&node, "/data"/*topic name*/,
ps_node_create_publisher_ex(&node, "/data"/*topic name*/,
&pubsub__String_def/*message definition*/,
&string_pub,
true/*true to "latch" the topic*/);
true/*true to "latch" the topic*/, 1);

// User is responsible for lifetime of the message they publish
// Publish does a copy internally if necessary
Expand Down
2 changes: 1 addition & 1 deletion examples/simple_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ int main()
});

// Wait for the spinner to exit (on control-c)
spinner.wait();
spinner.run();

return 0;
}
Expand Down
23 changes: 11 additions & 12 deletions examples/simple_sub.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@

#include <pubsub/String.msg.h>

struct ps_sub_t string_sub;
void callback(void* message, unsigned int size, void* cbdata, const struct ps_msg_info_t* info)
{
// user is responsible for freeing the message and its arrays
struct pubsub__String* data = (struct pubsub__String*)message;
printf("Got message: %s\n", data->value);
pubsub__String_free(data, string_sub.allocator);
}

int main()
{
// Create the node
Expand All @@ -23,10 +32,10 @@ int main()
ps_node_add_transport(&node, &tcp_transport);

// Create the subscriber
struct ps_sub_t string_sub;
struct ps_subscriber_options options;
ps_subscriber_options_init(&options);
options.preferred_transport = PUBSUB_TCP_TRANSPORT;// sets preferred transport to TCP
options.cb = callback;
ps_node_create_subscriber_adv(&node, "/data", &pubsub__String_def, &string_sub, &options);

// Loop and spin
Expand All @@ -36,18 +45,8 @@ int main()
// Used to prevent this from using 100% CPU, but you can do that through other means
ps_node_wait(&node, 1000/*maximum wait time in ms*/);

// Updates the node, which will queue up any received messages
// Updates the node, which will receive messages and call any callbacks as they come in
ps_node_spin(&node);

// our sub has a message definition, so the queue contains real messages
struct pubsub__String* data;
while (data = (struct pubsub__String*)ps_sub_deque(&string_sub))
{
// user is responsible for freeing the message and its arrays
printf("Got message: %s\n", data->value);
free(data->value);
free(data);
}
}

// Shutdown the node to free resources
Expand Down
2 changes: 1 addition & 1 deletion examples/simple_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ int main()
// Create the subscriber, the provided callback will be called each time a message comes in
pubsub::Subscriber<pubsub::msg::String> subscriber(node, "data"/*topic name*/,
[](const pubsub::msg::StringSharedPtr& msg) {
printf("Got message %s\n", msg->value);
printf("Got message %s\n", msg->value.c_str());
}, 10/*maximum queue size, after this many messages build up the oldest will get dropped*/);

// Create the "spinner" which executes callbacks and timers in a background thread
Expand Down
35 changes: 17 additions & 18 deletions include/pubsub/Node.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ struct ps_endpoint_t;
struct ps_client_t;
struct ps_subscribe_req_t;
struct ps_allocator_t;
typedef void(*ps_transport_fn_pub_t)(struct ps_transport_t* transport, struct ps_pub_t* publisher, struct ps_client_t* client, const void* message, uint32_t length);
struct ps_msg_ref_t;
typedef void(*ps_transport_fn_pub_t)(struct ps_transport_t* transport, struct ps_pub_t* publisher, struct ps_client_t* client, struct ps_msg_ref_t* message);
typedef int(*ps_transport_fn_spin_t)(struct ps_transport_t* transport, struct ps_node_t* node);
typedef void(*ps_transport_fn_add_publisher_t)(struct ps_transport_t* transport, struct ps_pub_t* publisher);
typedef void(*ps_transport_fn_remove_publisher_t)(struct ps_transport_t* transport, struct ps_pub_t* publisher);
Expand Down Expand Up @@ -66,6 +67,7 @@ typedef void(*ps_param_confirm_cb_t)(const char* name, double value, void* data)
struct ps_node_t
{
const char* name;
const char* description;
unsigned int num_pubs;
struct ps_pub_t** pubs;
unsigned int num_subs;
Expand Down Expand Up @@ -129,10 +131,9 @@ struct ps_msg_info_t
struct ps_msg_header
{
uint8_t pid;//packet type id
uint32_t length;//message length
uint32_t id;//stream id
uint16_t seq;//sequence number
uint8_t index;
uint8_t count;
};
#pragma pack(pop)

Expand Down Expand Up @@ -161,20 +162,22 @@ struct ps_advertise_req_t
uint32_t type_hash;// to see if the type is correct
uint32_t group_id;// unique (hopefully) id that indicates which process this node is a part of
};
#pragma pack(pop)

#pragma pack(push)
#pragma pack(1)
struct ps_subscribe_req_t
{
uint8_t id;
int32_t addr;
uint16_t port;
};
#pragma pack(pop)

#pragma pack(push)
#pragma pack(1)
struct ps_unsubscribe_req_t
{
uint8_t id;
uint32_t addr;
uint16_t port;
uint32_t stream_id;
};

struct ps_subscribe_accept_t
{
uint8_t pid;// packet type identifier
Expand All @@ -193,24 +196,20 @@ void ps_node_init_ex(struct ps_node_t* node, const char* name, const char* ip, b

void ps_node_create_publisher(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_pub_t* pub, bool latched);

void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
unsigned int queue_size,//make >= 1
struct ps_allocator_t* allocator,//give null to use default
bool ignore_local);// if ignore local is set, this node ignores publications from itself
// this facilitiates passing messages through shared memory
void ps_node_create_publisher_ex(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type, struct ps_pub_t* pub, bool latched, unsigned int recommended_transport, struct ps_allocator_t* allocator);


typedef void(*ps_subscriber_fn_cb_t)(void* message, unsigned int size, void* data, const struct ps_msg_info_t* info);
struct ps_subscriber_options
{
unsigned int queue_size;
bool ignore_local;
struct ps_allocator_t* allocator;
unsigned int skip;// skips to every nth message for throttling
ps_subscriber_fn_cb_t cb;
ps_subscriber_fn_cb_t cb_raw;
void* cb_data;
uint32_t preferred_transport;// falls back to udp otherwise
int32_t preferred_transport;// falls back to udp otherwise
const char* description;
};

void ps_subscriber_options_init(struct ps_subscriber_options* options);
Expand All @@ -219,7 +218,7 @@ void ps_node_create_subscriber_adv(struct ps_node_t* node, const char* topic, co
struct ps_sub_t* sub,
const struct ps_subscriber_options* options);

void ps_node_create_subscriber_cb(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
void ps_node_create_subscriber(struct ps_node_t* node, const char* topic, const struct ps_message_definition_t* type,
struct ps_sub_t* sub,
ps_subscriber_fn_cb_t cb,
void* cb_data,
Expand Down
40 changes: 40 additions & 0 deletions include/pubsub/Parameter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#pragma once

#ifdef __cplusplus
extern "C"
{
#endif

//#include <cstdlib>
#include <stdio.h>

#include <pubsub/Node.h>
#include <pubsub/Publisher.h>
#include <pubsub/Subscriber.h>
#include <pubsub/System.h>

#include <pubsub/Parameters.msg.h>

#include <math.h>

typedef void(*ps_param_fancy_cb_t)(const char* name, double value, void* data);

struct ps_parameters
{
struct pubsub__Parameters msg;
struct ps_pub_t param_pub;
ps_param_fancy_cb_t callback;
void* cb_data;
};

void ps_create_parameters(struct ps_node_t* node, struct ps_parameters* params_out, ps_param_fancy_cb_t callback, void* data);

void ps_destroy_parameters(struct ps_parameters* params);

void ps_add_parameter_double(struct ps_parameters* params,
const char* name, const char* description,
double value, double min, double max);

#ifdef __cplusplus
}
#endif
21 changes: 12 additions & 9 deletions include/pubsub/Publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,20 @@ struct ps_message_definition_t;

struct ps_endpoint_t
{
unsigned short port;
int address;
unsigned short port;
int address;
//bool multicast;// this is probably unnecessary
};

// publisher client to network to
struct ps_client_t
{
struct ps_endpoint_t endpoint;
unsigned short sequence_number;// sequence of the networked packets, incremented with each one
unsigned long long last_keepalive;// timestamp of the last keepalive message, used to know when to deactiveate this connection
unsigned int stream_id;// user-unique identifier of what topic this came from
unsigned int modulo;
struct ps_transport_t* transport;
struct ps_endpoint_t endpoint;
unsigned short sequence_number;// sequence of the networked packets, incremented with each one
unsigned long long last_keepalive;// timestamp of the last keepalive message, used to know when to deactiveate this connection
unsigned int stream_id;// user-unique identifier of what topic this came from
unsigned int modulo;
struct ps_transport_t* transport;
};

struct ps_pub_t
Expand All @@ -39,10 +39,13 @@ struct ps_pub_t
struct ps_node_t* node;
unsigned int num_clients;
struct ps_client_t* clients;

struct ps_allocator_t* allocator;

bool latched;// todo make this an enum of options if we add more
uint8_t recommended_transport;

struct ps_msg_t last_message;//only used if latched
struct ps_msg_ref_t* last_message;//only used if latched
unsigned int sequence_number;
};

Expand Down
Loading
Loading