-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
to support a feature of content filtered topic #1
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -24,6 +24,8 @@ extern "C" | |||
#include "rcl/error_handling.h" | ||||
#include "rcl/node.h" | ||||
#include "rcutils/logging_macros.h" | ||||
#include "rcutils/strdup.h" | ||||
#include "rcutils/types/string_array.h" | ||||
#include "rmw/error_handling.h" | ||||
#include "rmw/validate_full_topic_name.h" | ||||
#include "tracetools/tracetools.h" | ||||
|
@@ -92,6 +94,7 @@ rcl_subscription_init( | |||
sizeof(rcl_subscription_impl_t), allocator->state); | ||||
RCL_CHECK_FOR_NULL_WITH_MSG( | ||||
subscription->impl, "allocating memory failed", ret = RCL_RET_BAD_ALLOC; goto cleanup); | ||||
subscription->impl->options = rcl_subscription_get_default_options(); | ||||
// Fill out the implemenation struct. | ||||
// rmw_handle | ||||
// TODO(wjwwood): pass allocator once supported in rmw api. | ||||
|
@@ -116,8 +119,11 @@ rcl_subscription_init( | |||
} | ||||
subscription->impl->actual_qos.avoid_ros_namespace_conventions = | ||||
options->qos.avoid_ros_namespace_conventions; | ||||
// options | ||||
subscription->impl->options = *options; | ||||
ret = rcl_subscription_options_copy(options, &subscription->impl->options); | ||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think at this time, calling There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
The concrete rmw(rmw_connextdds) had already directly used rcl/rcl/src/rcl/subscription.c Line 101 in 6ef26c8
I thought that these two parameters should be backed up into rcl_subscription_impl_t , so it can prevent from accessing a pointer that might be changed outside.
Yes. |
||||
if (RCL_RET_OK != ret) { | ||||
goto fail; | ||||
} | ||||
|
||||
RCUTILS_LOG_DEBUG_NAMED(ROS_PACKAGE_NAME, "Subscription initialized"); | ||||
ret = RCL_RET_OK; | ||||
TRACEPOINT( | ||||
|
@@ -139,6 +145,12 @@ rcl_subscription_init( | |||
} | ||||
} | ||||
|
||||
ret = rcl_subscription_options_fini(&subscription->impl->options); | ||||
if (RCL_RET_OK != ret) { | ||||
RCUTILS_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str); | ||||
RCUTILS_SAFE_FWRITE_TO_STDERR("\n"); | ||||
} | ||||
|
||||
allocator->deallocate(subscription->impl, allocator->state); | ||||
subscription->impl = NULL; | ||||
} | ||||
|
@@ -175,6 +187,13 @@ rcl_subscription_fini(rcl_subscription_t * subscription, rcl_node_t * node) | |||
RCL_SET_ERROR_MSG(rmw_get_error_string().str); | ||||
result = RCL_RET_ERROR; | ||||
} | ||||
rcl_ret_t rcl_ret = rcl_subscription_options_fini(&subscription->impl->options); | ||||
if (RCL_RET_OK != rcl_ret) { | ||||
RCUTILS_SAFE_FWRITE_TO_STDERR(rcl_get_error_string().str); | ||||
RCUTILS_SAFE_FWRITE_TO_STDERR("\n"); | ||||
result = RCL_RET_ERROR; | ||||
} | ||||
|
||||
allocator.deallocate(subscription->impl, allocator.state); | ||||
subscription->impl = NULL; | ||||
} | ||||
|
@@ -194,6 +213,164 @@ rcl_subscription_get_default_options() | |||
return default_options; | ||||
} | ||||
|
||||
RCL_PUBLIC | ||||
RCL_WARN_UNUSED | ||||
rcl_ret_t | ||||
rcl_subscription_options_copy( | ||||
const rcl_subscription_options_t * src, | ||||
rcl_subscription_options_t * dst | ||||
) | ||||
{ | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(src, RCL_RET_INVALID_ARGUMENT); | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(dst, RCL_RET_INVALID_ARGUMENT); | ||||
const rcl_allocator_t * allocator = &src->allocator; | ||||
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT); | ||||
|
||||
rcl_ret_t ret = RCL_RET_OK; | ||||
dst->qos = src->qos; | ||||
dst->allocator = src->allocator; | ||||
// copy rmw_subscription_options_t | ||||
dst->rmw_subscription_options.rmw_specific_subscription_payload = | ||||
src->rmw_subscription_options.rmw_specific_subscription_payload; | ||||
dst->rmw_subscription_options.ignore_local_publications = | ||||
src->rmw_subscription_options.ignore_local_publications; | ||||
if (src->rmw_subscription_options.filter_expression) { | ||||
dst->rmw_subscription_options.filter_expression = | ||||
rcutils_strdup(src->rmw_subscription_options.filter_expression, *allocator); | ||||
if (!dst->rmw_subscription_options.filter_expression) { | ||||
RMW_SET_ERROR_MSG("failed to allocate memory for filter expression"); | ||||
ret = RCL_RET_BAD_ALLOC; | ||||
goto clean; | ||||
} | ||||
|
||||
// set expression parameters only if filter expression is valid | ||||
if (src->rmw_subscription_options.expression_parameters) { | ||||
rcutils_string_array_t * parameters = | ||||
(rcutils_string_array_t *)allocator->allocate( | ||||
sizeof(rcutils_string_array_t), | ||||
allocator->state); | ||||
if (!parameters) { | ||||
RMW_SET_ERROR_MSG("failed to allocate memory for expression parameters"); | ||||
ret = RCL_RET_BAD_ALLOC; | ||||
goto clean; | ||||
} | ||||
|
||||
dst->rmw_subscription_options.expression_parameters = parameters; | ||||
|
||||
rcutils_ret_t ret = rcutils_string_array_init( | ||||
parameters, src->rmw_subscription_options.expression_parameters->size, allocator); | ||||
if (RCUTILS_RET_OK != ret) { | ||||
ret = RCL_RET_BAD_ALLOC; | ||||
goto clean; | ||||
} | ||||
|
||||
for (size_t i = 0; i < src->rmw_subscription_options.expression_parameters->size; ++i) { | ||||
char * parameter = rcutils_strdup( | ||||
src->rmw_subscription_options.expression_parameters->data[i], *allocator); | ||||
if (!parameter) { | ||||
RMW_SET_ERROR_MSG("failed to allocate memory for expression parameter"); | ||||
ret = RCL_RET_BAD_ALLOC; | ||||
goto clean; | ||||
} | ||||
parameters->data[i] = parameter; | ||||
} | ||||
} | ||||
} | ||||
|
||||
return RCL_RET_OK; | ||||
|
||||
clean: | ||||
if (RCL_RET_OK != rcl_subscription_options_fini(dst)) { | ||||
RCL_SET_ERROR_MSG("Error while finalizing rcl subscription options due to another error"); | ||||
} | ||||
return ret; | ||||
} | ||||
|
||||
RCL_PUBLIC | ||||
RCL_WARN_UNUSED | ||||
rcl_ret_t | ||||
rcl_subscription_options_fini(rcl_subscription_options_t * option) | ||||
{ | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(option, RCL_RET_INVALID_ARGUMENT); | ||||
// fini rmw_subscription_options_t | ||||
const rcl_allocator_t * allocator = &option->allocator; | ||||
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT); | ||||
if (option->rmw_subscription_options.filter_expression) { | ||||
allocator->deallocate(option->rmw_subscription_options.filter_expression, allocator->state); | ||||
option->rmw_subscription_options.filter_expression = NULL; | ||||
} | ||||
|
||||
if (option->rmw_subscription_options.expression_parameters) { | ||||
rcutils_ret_t ret = rcutils_string_array_fini( | ||||
option->rmw_subscription_options.expression_parameters); | ||||
if (RCUTILS_RET_OK != ret) { | ||||
RCUTILS_SAFE_FWRITE_TO_STDERR("Failed to fini string array.\n"); | ||||
} | ||||
allocator->deallocate(option->rmw_subscription_options.expression_parameters, allocator->state); | ||||
option->rmw_subscription_options.expression_parameters = NULL; | ||||
} | ||||
return RCL_RET_OK; | ||||
} | ||||
|
||||
bool | ||||
rcl_subscription_is_cft_supported(const rcl_subscription_t * subscription) | ||||
{ | ||||
if (!rcl_subscription_is_valid(subscription)) { | ||||
rcl_reset_error(); | ||||
return false; | ||||
} | ||||
return subscription->impl->rmw_handle->is_cft_supported; | ||||
} | ||||
|
||||
rcl_ret_t | ||||
rcl_subscription_set_cft_expression_parameters( | ||||
const rcl_subscription_t * subscription, | ||||
const char * filter_expression, | ||||
const rcutils_string_array_t * expression_parameters | ||||
) | ||||
{ | ||||
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID); | ||||
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT); | ||||
|
||||
if (!rcl_subscription_is_valid(subscription)) { | ||||
return RCL_RET_SUBSCRIPTION_INVALID; | ||||
} | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT); | ||||
rmw_ret_t ret = rmw_subscription_set_cft_expression_parameters( | ||||
subscription->impl->rmw_handle, filter_expression, expression_parameters); | ||||
|
||||
if (ret != RMW_RET_OK) { | ||||
RCL_SET_ERROR_MSG(rmw_get_error_string().str); | ||||
return rcl_convert_rmw_ret_to_rcl_ret(ret); | ||||
} | ||||
return RCL_RET_OK; | ||||
} | ||||
|
||||
rcl_ret_t | ||||
rcl_subscription_get_cft_expression_parameters( | ||||
const rcl_subscription_t * subscription, | ||||
char ** filter_expression, | ||||
rcutils_string_array_t * expression_parameters | ||||
) | ||||
{ | ||||
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_SUBSCRIPTION_INVALID); | ||||
RCUTILS_CAN_RETURN_WITH_ERROR_OF(RCL_RET_INVALID_ARGUMENT); | ||||
|
||||
if (!rcl_subscription_is_valid(subscription)) { | ||||
return RCL_RET_SUBSCRIPTION_INVALID; | ||||
} | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(filter_expression, RCL_RET_INVALID_ARGUMENT); | ||||
RCL_CHECK_ARGUMENT_FOR_NULL(expression_parameters, RCL_RET_INVALID_ARGUMENT); | ||||
rmw_ret_t ret = rmw_subscription_get_cft_expression_parameters( | ||||
subscription->impl->rmw_handle, filter_expression, expression_parameters); | ||||
|
||||
if (ret != RMW_RET_OK) { | ||||
RCL_SET_ERROR_MSG(rmw_get_error_string().str); | ||||
return rcl_convert_rmw_ret_to_rcl_ret(ret); | ||||
} | ||||
return RCL_RET_OK; | ||||
} | ||||
|
||||
rcl_ret_t | ||||
rcl_take( | ||||
const rcl_subscription_t * subscription, | ||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How we can
reset
the filtering on subscription? i think that there should be a way to do that, since the application is likely to do the following.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking as following,
rcl_subscription_set_cft_expression_parameters
, such as "data MATCH 'Hello World: 5'"There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, i see and it works
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry. After checking and trying to reproduce the above case, I found it's not supported.
Currently,
User must set content filter topic with a non-empty string for subscription at the beginning, and then update with a new filter (not empty string).
it means if users create a subscription without a content filter expression, this subscription can't subscribe to the content filter topic anymore.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't be 😄 we just trying some new stuff, there will be something like this.
okay that is said, the followings should be done in user application aspect?
i am not sure if anything like this feasible, but if we do, it just appears to be the filtering expression and parameters interfaces only for user application. in other words, it can conceal creating/destroying contentfilteredtopic to user application, but rmw should take care of.