diff --git a/rclrs/src/client.rs b/rclrs/src/client.rs index b308f1de..5b27fc66 100644 --- a/rclrs/src/client.rs +++ b/rclrs/src/client.rs @@ -11,7 +11,7 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -76,6 +76,10 @@ where pub(crate) handle: Arc, requests: Mutex>>, futures: Arc>>>, + /// Ensure the parent node remains alive as long as the subscription is held. + /// This implementation will change in the future. + #[allow(unused)] + node: Arc, } impl Client @@ -83,7 +87,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new client. - pub(crate) fn new(node_handle: Arc, topic: &str) -> Result + pub(crate) fn new(node: &Arc, topic: &str) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_client`], see the struct's documentation for the rationale where @@ -102,7 +106,7 @@ where let client_options = unsafe { rcl_client_get_default_options() }; { - let rcl_node = node_handle.rcl_node.lock().unwrap(); + let rcl_node = node.handle.rcl_node.lock().unwrap(); let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); // SAFETY: @@ -126,7 +130,7 @@ where let handle = Arc::new(ClientHandle { rcl_client: Mutex::new(rcl_client), - node_handle, + node_handle: Arc::clone(&node.handle), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); @@ -136,6 +140,7 @@ where futures: Arc::new(Mutex::new( HashMap::>::new(), )), + node: Arc::clone(node), }) } diff --git a/rclrs/src/node.rs b/rclrs/src/node.rs index 0adefadd..21532c8e 100644 --- a/rclrs/src/node.rs +++ b/rclrs/src/node.rs @@ -225,11 +225,11 @@ impl Node { /// /// [1]: crate::Client // TODO: make client's lifetime depend on node's lifetime - pub fn create_client(&self, topic: &str) -> Result>, RclrsError> + pub fn create_client(self: &Arc, topic: &str) -> Result>, RclrsError> where T: rosidl_runtime_rs::Service, { - let client = Arc::new(Client::::new(Arc::clone(&self.handle), topic)?); + let client = Arc::new(Client::::new(self, topic)?); { self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak); Ok(client) } @@ -292,7 +292,7 @@ impl Node { /// /// [1]: crate::Service pub fn create_service( - &self, + self: &Arc, topic: &str, callback: F, ) -> Result>, RclrsError> @@ -300,11 +300,7 @@ impl Node { T: rosidl_runtime_rs::Service, F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send, { - let service = Arc::new(Service::::new( - Arc::clone(&self.handle), - topic, - callback, - )?); + let service = Arc::new(Service::::new(self, topic, callback)?); { self.services_mtx.lock().unwrap() } .push(Arc::downgrade(&service) as Weak); Ok(service) @@ -314,7 +310,7 @@ impl Node { /// /// [1]: crate::Subscription pub fn create_subscription( - &self, + self: &Arc, topic: &str, qos: QoSProfile, callback: impl SubscriptionCallback, @@ -322,12 +318,7 @@ impl Node { where T: Message, { - let subscription = Arc::new(Subscription::::new( - Arc::clone(&self.handle), - topic, - qos, - callback, - )?); + let subscription = Arc::new(Subscription::::new(self, topic, qos, callback)?); { self.subscriptions_mtx.lock() } .unwrap() .push(Arc::downgrade(&subscription) as Weak); diff --git a/rclrs/src/parameter.rs b/rclrs/src/parameter.rs index 87ff192d..083735ed 100644 --- a/rclrs/src/parameter.rs +++ b/rclrs/src/parameter.rs @@ -789,7 +789,7 @@ impl ParameterInterface { } } - pub(crate) fn create_services(&self, node: &Node) -> Result<(), RclrsError> { + pub(crate) fn create_services(&self, node: &Arc) -> Result<(), RclrsError> { *self.services.lock().unwrap() = Some(ParameterService::new(node, self.parameter_map.clone())?); Ok(()) diff --git a/rclrs/src/parameter/service.rs b/rclrs/src/parameter/service.rs index 465275f7..7d22cb44 100644 --- a/rclrs/src/parameter/service.rs +++ b/rclrs/src/parameter/service.rs @@ -239,7 +239,7 @@ fn set_parameters_atomically( impl ParameterService { pub(crate) fn new( - node: &Node, + node: &Arc, parameter_map: Arc>, ) -> Result { let fqn = node.fully_qualified_name(); diff --git a/rclrs/src/service.rs b/rclrs/src/service.rs index ac43e51a..f901f16a 100644 --- a/rclrs/src/service.rs +++ b/rclrs/src/service.rs @@ -9,7 +9,7 @@ use rosidl_runtime_rs::Message; use crate::{ error::{RclReturnCode, ToResult}, rcl_bindings::*, - MessageCow, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + MessageCow, Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; // SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread @@ -73,6 +73,10 @@ where pub(crate) handle: Arc, /// The callback function that runs when a request was received. pub callback: Mutex>, + /// Ensure the parent node remains alive as long as the subscription is held. + /// This implementation will change in the future. + #[allow(unused)] + node: Arc, } impl Service @@ -80,11 +84,7 @@ where T: rosidl_runtime_rs::Service, { /// Creates a new service. - pub(crate) fn new( - node_handle: Arc, - topic: &str, - callback: F, - ) -> Result + pub(crate) fn new(node: &Arc, topic: &str, callback: F) -> Result // This uses pub(crate) visibility to avoid instantiating this struct outside // [`Node::create_service`], see the struct's documentation for the rationale where @@ -104,7 +104,7 @@ where let service_options = unsafe { rcl_service_get_default_options() }; { - let rcl_node = node_handle.rcl_node.lock().unwrap(); + let rcl_node = node.handle.rcl_node.lock().unwrap(); let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); unsafe { // SAFETY: @@ -127,12 +127,13 @@ where let handle = Arc::new(ServiceHandle { rcl_service: Mutex::new(rcl_service), - node_handle, + node_handle: Arc::clone(&node.handle), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); Ok(Self { handle, + node: Arc::clone(node), callback: Mutex::new(Box::new(callback)), }) } diff --git a/rclrs/src/subscription.rs b/rclrs/src/subscription.rs index 645f9019..7f0f24eb 100644 --- a/rclrs/src/subscription.rs +++ b/rclrs/src/subscription.rs @@ -10,7 +10,7 @@ use crate::{ error::{RclReturnCode, ToResult}, qos::QoSProfile, rcl_bindings::*, - NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, + Node, NodeHandle, RclrsError, ENTITY_LIFECYCLE_MUTEX, }; mod callback; @@ -84,6 +84,10 @@ where pub(crate) handle: Arc, /// The callback function that runs when a message was received. pub callback: Mutex>, + /// Ensure the parent node remains alive as long as the subscription is held. + /// This implementation will change in the future. + #[allow(unused)] + node: Arc, message: PhantomData, } @@ -93,7 +97,7 @@ where { /// Creates a new subscription. pub(crate) fn new( - node_handle: Arc, + node: &Arc, topic: &str, qos: QoSProfile, callback: impl SubscriptionCallback, @@ -117,7 +121,7 @@ where subscription_options.qos = qos.into(); { - let rcl_node = node_handle.rcl_node.lock().unwrap(); + let rcl_node = node.handle.rcl_node.lock().unwrap(); let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap(); unsafe { // SAFETY: @@ -139,13 +143,14 @@ where let handle = Arc::new(SubscriptionHandle { rcl_subscription: Mutex::new(rcl_subscription), - node_handle, + node_handle: Arc::clone(&node.handle), in_use_by_wait_set: Arc::new(AtomicBool::new(false)), }); Ok(Self { handle, callback: Mutex::new(callback.into_callback()), + node: Arc::clone(node), message: PhantomData, }) } @@ -396,4 +401,41 @@ mod tests { ); Ok(()) } + + #[test] + fn test_node_subscription_raii() { + use crate::*; + use std::sync::atomic::Ordering; + + let mut executor = Context::default().create_basic_executor(); + + let triggered = Arc::new(AtomicBool::new(false)); + let inner_triggered = Arc::clone(&triggered); + let callback = move |_: msg::Empty| { + inner_triggered.store(true, Ordering::Release); + }; + + let (_subscription, publisher) = { + let node = executor + .create_node(&format!("test_node_subscription_raii_{}", line!())) + .unwrap(); + + let qos = QoSProfile::default().keep_all().reliable(); + let subscription = node + .create_subscription::("test_topic", qos, callback) + .unwrap(); + let publisher = node + .create_publisher::("test_topic", qos) + .unwrap(); + + (subscription, publisher) + }; + + publisher.publish(msg::Empty::default()).unwrap(); + let start_time = std::time::Instant::now(); + while !triggered.load(Ordering::Acquire) { + assert!(executor.spin(SpinOptions::spin_once()).is_empty()); + assert!(start_time.elapsed() < std::time::Duration::from_secs(10)); + } + } }