Skip to content

Commit acec981

Browse files
authored
Shared state pattern (spin-off of #427) (#430)
* Migrate Node to shared state pattern Signed-off-by: Michael X. Grey <[email protected]> * Migrate primitives to state pattern Signed-off-by: Michael X. Grey <[email protected]> * Fix examples Signed-off-by: Michael X. Grey <[email protected]> * Fix merge errors Signed-off-by: Michael X. Grey <[email protected]> * Fix style Signed-off-by: Michael X. Grey <[email protected]> * Fix doc Signed-off-by: Michael X. Grey <[email protected]> --------- Signed-off-by: Michael X. Grey <[email protected]> Signed-off-by: Michael X. Grey <[email protected]>
1 parent becc938 commit acec981

17 files changed

+177
-107
lines changed

examples/minimal_pub_sub/src/minimal_two_nodes.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use anyhow::{Error, Result};
88

99
struct MinimalSubscriber {
1010
num_messages: AtomicU32,
11-
node: Arc<Node>,
12-
subscription: Mutex<Option<Arc<Subscription<std_msgs::msg::String>>>>,
11+
node: Node,
12+
subscription: Mutex<Option<Subscription<std_msgs::msg::String>>>,
1313
}
1414

1515
impl MinimalSubscriber {

examples/rust_pubsub/src/simple_publisher.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use rclrs::*;
2-
use std::{sync::Arc, thread, time::Duration};
2+
use std::{thread, time::Duration};
33
use std_msgs::msg::String as StringMsg;
44

55
struct SimplePublisher {
6-
publisher: Arc<Publisher<StringMsg>>,
6+
publisher: Publisher<StringMsg>,
77
}
88

99
impl SimplePublisher {
@@ -24,12 +24,11 @@ impl SimplePublisher {
2424

2525
fn main() -> Result<(), RclrsError> {
2626
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
27-
let publisher = Arc::new(SimplePublisher::new(&executor).unwrap());
28-
let publisher_other_thread = Arc::clone(&publisher);
27+
let publisher = SimplePublisher::new(&executor).unwrap();
2928
let mut count: i32 = 0;
3029
thread::spawn(move || loop {
3130
thread::sleep(Duration::from_millis(1000));
32-
count = publisher_other_thread.publish_data(count).unwrap();
31+
count = publisher.publish_data(count).unwrap();
3332
});
3433
executor.spin(SpinOptions::default()).first_error()
3534
}

examples/rust_pubsub/src/simple_subscriber.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::{
77
use std_msgs::msg::String as StringMsg;
88

99
pub struct SimpleSubscriptionNode {
10-
_subscriber: Arc<Subscription<StringMsg>>,
10+
_subscriber: Subscription<StringMsg>,
1111
data: Arc<Mutex<Option<StringMsg>>>,
1212
}
1313

@@ -34,11 +34,10 @@ impl SimpleSubscriptionNode {
3434
}
3535
fn main() -> Result<(), RclrsError> {
3636
let mut executor = Context::default_from_env().unwrap().create_basic_executor();
37-
let subscription = Arc::new(SimpleSubscriptionNode::new(&executor).unwrap());
38-
let subscription_other_thread = Arc::clone(&subscription);
37+
let subscription = SimpleSubscriptionNode::new(&executor).unwrap();
3938
thread::spawn(move || loop {
4039
thread::sleep(Duration::from_millis(1000));
41-
subscription_other_thread.data_callback().unwrap()
40+
subscription.data_callback().unwrap()
4241
});
4342
executor.spin(SpinOptions::default()).first_error()
4443
}

rclrs/src/client.rs

+22-9
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,25 @@ type RequestId = i64;
6565

6666
/// Main class responsible for sending requests to a ROS service.
6767
///
68-
/// The only available way to instantiate clients is via [`Node::create_client`][1], this is to
69-
/// ensure that [`Node`][2]s can track all the clients that have been created.
68+
/// Create a client using [`Node::create_client`][1].
7069
///
71-
/// [1]: crate::Node::create_client
72-
/// [2]: crate::Node
73-
pub struct Client<T>
70+
/// Receiving responses requires the node's executor to [spin][2].
71+
///
72+
/// [1]: crate::NodeState::create_client
73+
/// [2]: crate::spin
74+
pub type Client<T> = Arc<ClientState<T>>;
75+
76+
/// The inner state of a [`Client`].
77+
///
78+
/// This is public so that you can choose to create a [`Weak`][1] reference to it
79+
/// if you want to be able to refer to a [`Client`] in a non-owning way. It is
80+
/// generally recommended to manage the `ClientState` inside of an [`Arc`],
81+
/// and [`Client`] is provided as a convenience alias for that.
82+
///
83+
/// The public API of the [`Client`] type is implemented via `ClientState`.
84+
///
85+
/// [1]: std::sync::Weak
86+
pub struct ClientState<T>
7487
where
7588
T: rosidl_runtime_rs::Service,
7689
{
@@ -80,16 +93,16 @@ where
8093
/// Ensure the parent node remains alive as long as the subscription is held.
8194
/// This implementation will change in the future.
8295
#[allow(unused)]
83-
node: Arc<Node>,
96+
node: Node,
8497
}
8598

86-
impl<T> Client<T>
99+
impl<T> ClientState<T>
87100
where
88101
T: rosidl_runtime_rs::Service,
89102
{
90103
/// Creates a new client.
91104
pub(crate) fn new<'a>(
92-
node: &Arc<Node>,
105+
node: &Node,
93106
options: impl Into<ClientOptions<'a>>,
94107
) -> Result<Self, RclrsError>
95108
// This uses pub(crate) visibility to avoid instantiating this struct outside
@@ -319,7 +332,7 @@ impl<'a, T: IntoPrimitiveOptions<'a>> From<T> for ClientOptions<'a> {
319332
}
320333
}
321334

322-
impl<T> ClientBase for Client<T>
335+
impl<T> ClientBase for ClientState<T>
323336
where
324337
T: rosidl_runtime_rs::Service,
325338
{

rclrs/src/executor.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
2-
rcl_bindings::rcl_context_is_valid, Context, ContextHandle, IntoNodeOptions, Node, RclrsError,
3-
WaitSet,
2+
rcl_bindings::rcl_context_is_valid, Context, ContextHandle, IntoNodeOptions, Node, NodeState,
3+
RclrsError, WaitSet,
44
};
55
use std::{
66
sync::{Arc, Mutex, Weak},
@@ -10,15 +10,15 @@ use std::{
1010
/// Single-threaded executor implementation.
1111
pub struct Executor {
1212
context: Arc<ContextHandle>,
13-
nodes_mtx: Mutex<Vec<Weak<Node>>>,
13+
nodes_mtx: Mutex<Vec<Weak<NodeState>>>,
1414
}
1515

1616
impl Executor {
1717
/// Create a [`Node`] that will run on this Executor.
1818
pub fn create_node<'a>(
1919
&'a self,
2020
options: impl IntoNodeOptions<'a>,
21-
) -> Result<Arc<Node>, RclrsError> {
21+
) -> Result<Node, RclrsError> {
2222
let options = options.into_node_options();
2323
let node = options.build(&self.context)?;
2424
self.nodes_mtx.lock().unwrap().push(Arc::downgrade(&node));

rclrs/src/node.rs

+31-21
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@ use std::{
1919
use rosidl_runtime_rs::Message;
2020

2121
use crate::{
22-
rcl_bindings::*, Client, ClientBase, ClientOptions, Clock, ContextHandle, GuardCondition,
23-
LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant, Parameters,
24-
Publisher, PublisherOptions, RclrsError, Service, ServiceBase, ServiceOptions, Subscription,
25-
SubscriptionBase, SubscriptionCallback, SubscriptionOptions, TimeSource, ToLogParams,
26-
ENTITY_LIFECYCLE_MUTEX,
22+
rcl_bindings::*, Client, ClientBase, ClientOptions, ClientState, Clock, ContextHandle,
23+
GuardCondition, LogParams, Logger, ParameterBuilder, ParameterInterface, ParameterVariant,
24+
Parameters, Publisher, PublisherOptions, PublisherState, RclrsError, Service, ServiceBase,
25+
ServiceOptions, ServiceState, Subscription, SubscriptionBase, SubscriptionCallback,
26+
SubscriptionOptions, SubscriptionState, TimeSource, ToLogParams, ENTITY_LIFECYCLE_MUTEX,
2727
};
2828

2929
// SAFETY: The functions accessing this type, including drop(), shouldn't care about the thread
@@ -59,7 +59,7 @@ unsafe impl Send for rcl_node_t {}
5959
/// In that sense, the parameters to the node creation functions are only the _default_ namespace and
6060
/// name.
6161
/// See also the [official tutorial][1] on the command line arguments for ROS nodes, and the
62-
/// [`Node::namespace()`][3] and [`Node::name()`][4] functions for examples.
62+
/// [`NodeState::namespace()`] and [`NodeState::name()`] functions for examples.
6363
///
6464
/// ## Rules for valid names
6565
/// The rules for valid node names and node namespaces are explained in
@@ -72,7 +72,17 @@ unsafe impl Send for rcl_node_t {}
7272
/// [5]: crate::NodeOptions::new
7373
/// [6]: crate::NodeOptions::namespace
7474
/// [7]: crate::Executor::create_node
75-
pub struct Node {
75+
pub type Node = Arc<NodeState>;
76+
77+
/// The inner state of a [`Node`].
78+
///
79+
/// This is public so that you can choose to put it inside a [`Weak`] if you
80+
/// want to be able to refer to a [`Node`] in a non-owning way. It is generally
81+
/// recommended to manage the [`NodeState`] inside of an [`Arc`], and [`Node`]
82+
/// is provided as convenience alias for that.
83+
///
84+
/// The public API of the [`Node`] type is implemented via `NodeState`.
85+
pub struct NodeState {
7686
pub(crate) clients_mtx: Mutex<Vec<Weak<dyn ClientBase>>>,
7787
pub(crate) guard_conditions_mtx: Mutex<Vec<Weak<GuardCondition>>>,
7888
pub(crate) services_mtx: Mutex<Vec<Weak<dyn ServiceBase>>>,
@@ -128,23 +138,23 @@ impl Drop for NodeHandle {
128138
}
129139
}
130140

131-
impl Eq for Node {}
141+
impl Eq for NodeState {}
132142

133-
impl PartialEq for Node {
143+
impl PartialEq for NodeState {
134144
fn eq(&self, other: &Self) -> bool {
135145
Arc::ptr_eq(&self.handle, &other.handle)
136146
}
137147
}
138148

139-
impl fmt::Debug for Node {
149+
impl fmt::Debug for NodeState {
140150
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
141151
f.debug_struct("Node")
142152
.field("fully_qualified_name", &self.fully_qualified_name())
143153
.finish()
144154
}
145155
}
146156

147-
impl Node {
157+
impl NodeState {
148158
/// Returns the clock associated with this node.
149159
pub fn get_clock(&self) -> Clock {
150160
self.time_source.get_clock()
@@ -202,7 +212,7 @@ impl Node {
202212
/// Returns the fully qualified name of the node.
203213
///
204214
/// The fully qualified name of the node is the node namespace combined with the node name.
205-
/// It is subject to the remappings shown in [`Node::name()`] and [`Node::namespace()`].
215+
/// It is subject to the remappings shown in [`NodeState::name()`] and [`NodeState::namespace()`].
206216
///
207217
/// # Example
208218
/// ```
@@ -266,11 +276,11 @@ impl Node {
266276
pub fn create_client<'a, T>(
267277
self: &Arc<Self>,
268278
options: impl Into<ClientOptions<'a>>,
269-
) -> Result<Arc<Client<T>>, RclrsError>
279+
) -> Result<Client<T>, RclrsError>
270280
where
271281
T: rosidl_runtime_rs::Service,
272282
{
273-
let client = Arc::new(Client::<T>::new(self, options)?);
283+
let client = Arc::new(ClientState::<T>::new(self, options)?);
274284
{ self.clients_mtx.lock().unwrap() }.push(Arc::downgrade(&client) as Weak<dyn ClientBase>);
275285
Ok(client)
276286
}
@@ -349,11 +359,11 @@ impl Node {
349359
pub fn create_publisher<'a, T>(
350360
&self,
351361
options: impl Into<PublisherOptions<'a>>,
352-
) -> Result<Arc<Publisher<T>>, RclrsError>
362+
) -> Result<Publisher<T>, RclrsError>
353363
where
354364
T: Message,
355365
{
356-
let publisher = Arc::new(Publisher::<T>::new(Arc::clone(&self.handle), options)?);
366+
let publisher = Arc::new(PublisherState::<T>::new(Arc::clone(&self.handle), options)?);
357367
Ok(publisher)
358368
}
359369

@@ -403,12 +413,12 @@ impl Node {
403413
self: &Arc<Self>,
404414
options: impl Into<ServiceOptions<'a>>,
405415
callback: F,
406-
) -> Result<Arc<Service<T>>, RclrsError>
416+
) -> Result<Service<T>, RclrsError>
407417
where
408418
T: rosidl_runtime_rs::Service,
409419
F: Fn(&rmw_request_id_t, T::Request) -> T::Response + 'static + Send,
410420
{
411-
let service = Arc::new(Service::<T>::new(self, options, callback)?);
421+
let service = Arc::new(ServiceState::<T>::new(self, options, callback)?);
412422
{ self.services_mtx.lock().unwrap() }
413423
.push(Arc::downgrade(&service) as Weak<dyn ServiceBase>);
414424
Ok(service)
@@ -459,11 +469,11 @@ impl Node {
459469
self: &Arc<Self>,
460470
options: impl Into<SubscriptionOptions<'a>>,
461471
callback: impl SubscriptionCallback<T, Args>,
462-
) -> Result<Arc<Subscription<T>>, RclrsError>
472+
) -> Result<Subscription<T>, RclrsError>
463473
where
464474
T: Message,
465475
{
466-
let subscription = Arc::new(Subscription::<T>::new(self, options, callback)?);
476+
let subscription = Arc::new(SubscriptionState::<T>::new(self, options, callback)?);
467477
{ self.subscriptions_mtx.lock() }
468478
.unwrap()
469479
.push(Arc::downgrade(&subscription) as Weak<dyn SubscriptionBase>);
@@ -583,7 +593,7 @@ impl Node {
583593
}
584594
}
585595

586-
impl<'a> ToLogParams<'a> for &'a Node {
596+
impl<'a> ToLogParams<'a> for &'a NodeState {
587597
fn to_log_params(self) -> LogParams<'a> {
588598
self.logger().to_log_params()
589599
}

rclrs/src/node/graph.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
ffi::{CStr, CString},
44
};
55

6-
use crate::{rcl_bindings::*, Node, RclrsError, ToResult};
6+
use crate::{rcl_bindings::*, NodeState, RclrsError, ToResult};
77

88
impl Drop for rmw_names_and_types_t {
99
fn drop(&mut self) {
@@ -57,7 +57,7 @@ pub struct TopicEndpointInfo {
5757
pub topic_type: String,
5858
}
5959

60-
impl Node {
60+
impl NodeState {
6161
/// Returns a list of topic names and types for publishers associated with a node.
6262
pub fn get_publisher_names_and_types_by_node(
6363
&self,

rclrs/src/node/node_options.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,9 @@ use std::{
55
};
66

77
use crate::{
8-
rcl_bindings::*, ClockType, ContextHandle, Logger, Node, NodeHandle, ParameterInterface,
9-
QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX, QOS_PROFILE_CLOCK,
8+
rcl_bindings::*, ClockType, ContextHandle, Logger, Node, NodeHandle, NodeState,
9+
ParameterInterface, QoSProfile, RclrsError, TimeSource, ToResult, ENTITY_LIFECYCLE_MUTEX,
10+
QOS_PROFILE_CLOCK,
1011
};
1112

1213
/// This trait helps to build [`NodeOptions`] which can be passed into
@@ -285,7 +286,7 @@ impl<'a> NodeOptions<'a> {
285286
///
286287
/// Only used internally. Downstream users should call
287288
/// [`Executor::create_node`].
288-
pub(crate) fn build(self, context: &Arc<ContextHandle>) -> Result<Arc<Node>, RclrsError> {
289+
pub(crate) fn build(self, context: &Arc<ContextHandle>) -> Result<Node, RclrsError> {
289290
let node_name = CString::new(self.name).map_err(|err| RclrsError::StringContainsNul {
290291
err,
291292
s: self.name.to_owned(),
@@ -355,7 +356,7 @@ impl<'a> NodeOptions<'a> {
355356
}
356357
};
357358

358-
let node = Arc::new(Node {
359+
let node = Arc::new(NodeState {
359360
clients_mtx: Mutex::default(),
360361
guard_conditions_mtx: Mutex::default(),
361362
services_mtx: Mutex::default(),

rclrs/src/parameter.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ enum DeclaredValue {
8484
}
8585

8686
/// Builder used to declare a parameter. Obtain this by calling
87-
/// [`crate::Node::declare_parameter`].
87+
/// [`crate::NodeState::declare_parameter`].
8888
#[must_use]
8989
pub struct ParameterBuilder<'a, T: ParameterVariant> {
9090
name: Arc<str>,
@@ -789,7 +789,7 @@ impl ParameterInterface {
789789
}
790790
}
791791

792-
pub(crate) fn create_services(&self, node: &Arc<Node>) -> Result<(), RclrsError> {
792+
pub(crate) fn create_services(&self, node: &Node) -> Result<(), RclrsError> {
793793
*self.services.lock().unwrap() =
794794
Some(ParameterService::new(node, self.parameter_map.clone())?);
795795
Ok(())

0 commit comments

Comments
 (0)