diff --git a/nodelet/include/nodelet/detail/callback_queue.h b/nodelet/include/nodelet/detail/callback_queue.h index f4aa6edf..a37b529d 100644 --- a/nodelet/include/nodelet/detail/callback_queue.h +++ b/nodelet/include/nodelet/detail/callback_queue.h @@ -49,34 +49,33 @@ namespace detail class CallbackQueueManager; -class NODELETLIB_DECL CallbackQueue : public ros::CallbackQueueInterface, - public boost::enable_shared_from_this +class NODELETLIB_DECL CallbackQueue : + public ros::CallbackQueueInterface, + public boost::enable_shared_from_this { public: - CallbackQueue(CallbackQueueManager* parent, - const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr()); - ~CallbackQueue(); + CallbackQueue(CallbackQueueManager* parent, const ros::VoidConstPtr& tracked_object = ros::VoidConstPtr()); + ~CallbackQueue(); - virtual void addCallback(const ros::CallbackInterfacePtr& callback, uint64_t owner_id = 0); - virtual void removeByID(uint64_t owner_id); + virtual void addCallback(const ros::CallbackInterfacePtr& callback, uint64_t owner_id = 0); + virtual void removeByID(uint64_t owner_id); - uint32_t callOne(); + uint32_t callOne(); - /** + /** * \brief Enable the queue (queue is enabled by default) */ - void enable(); - /** + void enable(); + /** * \brief Disable the queue, meaning any calls to addCallback() will have no effect */ - void disable(); - + void disable(); private: - CallbackQueueManager* parent_; - ros::CallbackQueue queue_; - ros::VoidConstWPtr tracked_object_; - bool has_tracked_object_; + CallbackQueueManager* parent_; + ros::CallbackQueue queue_; + ros::VoidConstWPtr tracked_object_; + bool has_tracked_object_; }; } // namespace detail diff --git a/nodelet/include/nodelet/detail/callback_queue_manager.h b/nodelet/include/nodelet/detail/callback_queue_manager.h index bcf341c3..4c2f77f2 100644 --- a/nodelet/include/nodelet/detail/callback_queue_manager.h +++ b/nodelet/include/nodelet/detail/callback_queue_manager.h @@ -65,101 +65,104 @@ typedef boost::shared_ptr CallbackQueuePtr; class NODELETLIB_DECL CallbackQueueManager { public: - /** + /** * \brief Constructor * * By default, uses the number of hardware threads available on the current system. */ - CallbackQueueManager(uint32_t num_worker_threads = 0); - ~CallbackQueueManager(); + CallbackQueueManager(uint32_t num_worker_threads = 0); + ~CallbackQueueManager(); - void addQueue(const CallbackQueuePtr& queue, bool threaded); - void removeQueue(const CallbackQueuePtr& queue); - void callbackAdded(const CallbackQueuePtr& queue); + void addQueue(const CallbackQueuePtr& queue, bool threaded); + void removeQueue(const CallbackQueuePtr& queue); + void callbackAdded(const CallbackQueuePtr& queue); - uint32_t getNumWorkerThreads(); + uint32_t getNumWorkerThreads(); - void stop(); + void stop(); private: - void managerThread(); - struct ThreadInfo; - void workerThread(ThreadInfo*); - - ThreadInfo* getSmallestQueue(); - - struct QueueInfo - { - QueueInfo() - : threaded(false) - , thread_index(0xffffffff) - , in_thread(0) - {} - - CallbackQueuePtr queue; - bool threaded; - - // Only used if threaded == false - boost::mutex st_mutex; - /// @todo Could get rid of st_mutex by updating [thread_index|in_thread] atomically - uint32_t thread_index; - uint32_t in_thread; - }; - typedef boost::shared_ptr QueueInfoPtr; - - typedef boost::unordered_map M_Queue; - M_Queue queues_; - boost::mutex queues_mutex_; - - /// @todo SRMW lockfree queue. waiting_mutex_ has the potential for a lot of contention - typedef std::vector V_Queue; - V_Queue waiting_; - boost::mutex waiting_mutex_; - boost::condition_variable waiting_cond_; - boost::thread_group tg_; - - struct ThreadInfo - { - ThreadInfo() - : calling(0) - {} - - /// @todo SRSW lockfree queue - boost::mutex queue_mutex; - boost::condition_variable queue_cond; - std::vector > queue; - boost::detail::atomic_count calling; + void managerThread(); + struct ThreadInfo; + void workerThread(ThreadInfo*); -#ifdef NODELET_QUEUE_DEBUG - struct Record - { - Record(double stamp, uint32_t tasks, bool threaded) - : stamp(stamp), tasks(tasks), threaded(threaded) - {} + ThreadInfo* getSmallestQueue(); - double stamp; - uint32_t tasks; - bool threaded; + struct QueueInfo + { + QueueInfo() + : threaded(false) + , thread_index(0xffffffff) + , in_thread(0) + { + } + + CallbackQueuePtr queue; + bool threaded; + + // Only used if threaded == false + boost::mutex st_mutex; + /// @todo Could get rid of st_mutex by updating [thread_index|in_thread] atomically + uint32_t thread_index; + uint32_t in_thread; }; + typedef boost::shared_ptr QueueInfoPtr; - std::vector history; + typedef boost::unordered_map M_Queue; + M_Queue queues_; + boost::mutex queues_mutex_; + + /// @todo SRMW lockfree queue. waiting_mutex_ has the potential for a lot of contention + typedef std::vector V_Queue; + V_Queue waiting_; + boost::mutex waiting_mutex_; + boost::condition_variable waiting_cond_; + boost::thread_group tg_; + + struct ThreadInfo + { + ThreadInfo() + : calling(0) + { + } + + /// @todo SRSW lockfree queue + boost::mutex queue_mutex; + boost::condition_variable queue_cond; + std::vector > queue; + boost::detail::atomic_count calling; + +#ifdef NODELET_QUEUE_DEBUG + struct Record + { + Record(double stamp, uint32_t tasks, bool threaded) + : stamp(stamp), tasks(tasks), threaded(threaded) + { + } + + double stamp; + uint32_t tasks; + bool threaded; + }; + + std::vector history; #endif - // Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing. - // This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though. - static const int ACTUAL_SIZE = - sizeof(boost::mutex) + - sizeof(boost::condition_variable) + - sizeof(std::vector >) + - sizeof(boost::detail::atomic_count); - uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE]; - }; - /// @todo Use cache-aligned allocator for thread_info_ - typedef boost::scoped_array V_ThreadInfo; - V_ThreadInfo thread_info_; - - bool running_; - uint32_t num_worker_threads_; + // Pad sizeof(ThreadInfo) to be a multiple of 64 (cache line size) to avoid false sharing. + // This still doesn't guarantee ThreadInfo is actually allocated on a cache line boundary though. + static const int ACTUAL_SIZE = + sizeof(boost::mutex) + + sizeof(boost::condition_variable) + + sizeof(std::vector >) + + sizeof(boost::detail::atomic_count); + uint8_t pad[((ACTUAL_SIZE + 63) & ~63) - ACTUAL_SIZE]; + }; + /// @todo Use cache-aligned allocator for thread_info_ + typedef boost::scoped_array V_ThreadInfo; + V_ThreadInfo thread_info_; + + bool running_; + uint32_t num_worker_threads_; }; } // namespace detail diff --git a/nodelet/include/nodelet/exception.h b/nodelet/include/nodelet/exception.h index 29d153aa..2ad989c3 100644 --- a/nodelet/include/nodelet/exception.h +++ b/nodelet/include/nodelet/exception.h @@ -39,13 +39,12 @@ namespace nodelet class Exception : public std::runtime_error { public: - Exception(const std::string& what) - : std::runtime_error(what) - {} + Exception(const std::string& what) + : std::runtime_error(what) + { + } }; } // namespace nodelet #endif // NODELET_EXCEPTION_H - - diff --git a/nodelet/include/nodelet/loader.h b/nodelet/include/nodelet/loader.h index 00cd87f4..1d87dbd3 100644 --- a/nodelet/include/nodelet/loader.h +++ b/nodelet/include/nodelet/loader.h @@ -62,35 +62,34 @@ typedef std::vector V_string; class NODELETLIB_DECL Loader { public: - /** \brief Construct the nodelet loader with optional ros API at default location of NodeHandle("~")*/ - Loader(bool provide_ros_api = true); - /** \brief Construct the nodelet loader with optional ros API in namespace of passed NodeHandle */ - Loader(const ros::NodeHandle& server_nh); - /** + /** \brief Construct the nodelet loader with optional ros API at default location of NodeHandle("~")*/ + Loader(bool provide_ros_api = true); + /** \brief Construct the nodelet loader with optional ros API in namespace of passed NodeHandle */ + Loader(const ros::NodeHandle& server_nh); + /** * \brief Construct the nodelet loader without ros API, using non-standard factory function to * create nodelet instances */ - Loader(const boost::function (const std::string& lookup_name)>& create_instance); + Loader(const boost::function(const std::string&)>& create_instance); - ~Loader(); + ~Loader(); - /** \brief Load a nodelet */ - bool load(const std::string& name, const std::string& type, const M_string& remappings, - const V_string& my_argv); + /** \brief Load a nodelet */ + bool load(const std::string& name, const std::string& type, const M_string& remappings, const V_string& my_argv); - /** \brief Unload a nodelet */ - bool unload(const std::string& name); + /** \brief Unload a nodelet */ + bool unload(const std::string& name); - /** \brief Clear all nodelets from this loader */ - bool clear(); + /** \brief Clear all nodelets from this loader */ + bool clear(); - /**\brief List the names of all loaded nodelets */ - std::vector listLoadedNodelets(); + /**\brief List the names of all loaded nodelets */ + std::vector listLoadedNodelets(); private: - boost::mutex lock_; /// impl_; + boost::mutex lock_; /// impl_; }; } // namespace nodelet diff --git a/nodelet/include/nodelet/nodelet.h b/nodelet/include/nodelet/nodelet.h index ae94892e..c20a5b1f 100644 --- a/nodelet/include/nodelet/nodelet.h +++ b/nodelet/include/nodelet/nodelet.h @@ -44,7 +44,7 @@ namespace ros { class NodeHandle; class CallbackQueueInterface; -} +} // namespace ros #define NODELET_DEBUG(...) ROS_DEBUG_NAMED(getName(), __VA_ARGS__) #define NODELET_DEBUG_STREAM(...) ROS_DEBUG_STREAM_NAMED(getName(), __VA_ARGS__) @@ -163,79 +163,82 @@ typedef boost::shared_ptr NodeHandlePtr; typedef std::map M_string; typedef std::vector V_string; -namespace detail { - class CallbackQueue; +namespace detail +{ +class CallbackQueue; } class UninitializedException : public Exception { public: - UninitializedException(const std::string& method_name) - : Exception("Calling [" + method_name + "] before the Nodelet is initialized is not allowed.") - {} + UninitializedException(const std::string& method_name) + : Exception("Calling [" + method_name + "] before the Nodelet is initialized is not allowed.") + { + } }; class MultipleInitializationException : public Exception { public: - MultipleInitializationException() - : Exception("Initialized multiple times") - {} + MultipleInitializationException() + : Exception("Initialized multiple times") + { + } }; class NODELETLIB_DECL Nodelet { - // Protected data fields for use by the subclass. + // Protected data fields for use by the subclass. protected: - inline const std::string& getName() const { return nodelet_name_; } - inline std::string getSuffixedName(const std::string& suffix) const - { - return nodelet_name_ + "." + suffix; - } - inline const V_string& getMyArgv() const { return my_argv_; } - inline const M_string& getRemappingArgs() const { return remapping_args_; } - - ros::NodeHandle& getNodeHandle() const; - ros::NodeHandle& getPrivateNodeHandle() const; - ros::NodeHandle& getMTNodeHandle() const; - ros::NodeHandle& getMTPrivateNodeHandle() const; - - ros::CallbackQueueInterface& getSTCallbackQueue() const; - ros::CallbackQueueInterface& getMTCallbackQueue() const; - - - // Internal storage; + inline const std::string& getName() const { return nodelet_name_; } + inline std::string getSuffixedName(const std::string& suffix) const + { + return nodelet_name_ + "." + suffix; + } + inline const V_string& getMyArgv() const { return my_argv_; } + inline const M_string& getRemappingArgs() const { return remapping_args_; } + + ros::NodeHandle& getNodeHandle() const; + ros::NodeHandle& getPrivateNodeHandle() const; + ros::NodeHandle& getMTNodeHandle() const; + ros::NodeHandle& getMTPrivateNodeHandle() const; + + ros::CallbackQueueInterface& getSTCallbackQueue() const; + ros::CallbackQueueInterface& getMTCallbackQueue() const; + + // Internal storage; private: - bool inited_; + bool inited_; - std::string nodelet_name_; + std::string nodelet_name_; - NodeHandlePtr nh_; - NodeHandlePtr private_nh_; - NodeHandlePtr mt_nh_; - NodeHandlePtr mt_private_nh_; - V_string my_argv_; - M_string remapping_args_; + NodeHandlePtr nh_; + NodeHandlePtr private_nh_; + NodeHandlePtr mt_nh_; + NodeHandlePtr mt_private_nh_; + V_string my_argv_; + M_string remapping_args_; - // Method to be overridden by subclass when starting up. - virtual void onInit() = 0; + // Method to be overridden by subclass when starting up. + virtual void onInit() = 0; - // Public API used for launching + // Public API used for launching public: - /**\brief Empty constructor required for dynamic loading */ - Nodelet(); + /**\brief Empty constructor required for dynamic loading */ + Nodelet(); - /**\brief Init function called at startup + /**\brief Init function called at startup * \param name The name of the nodelet * \param remapping_args The remapping args in a map for the nodelet * \param my_argv The commandline arguments for this nodelet stripped of special arguments such as ROS arguments */ - void init(const std::string& name, const M_string& remapping_args, const V_string& my_argv, - ros::CallbackQueueInterface* st_queue = NULL, - ros::CallbackQueueInterface* mt_queue = NULL); + void init(const std::string& name, const M_string& remapping_args, const V_string& my_argv, + ros::CallbackQueueInterface* st_queue = NULL, + ros::CallbackQueueInterface* mt_queue = NULL); - virtual ~Nodelet(); + virtual ~Nodelet(); }; -} +} // namespace nodelet + #endif //NODELET_NODELET_H diff --git a/nodelet/include/nodelet/nodeletdecl.h b/nodelet/include/nodelet/nodeletdecl.h index 5ea2e84b..f042ddec 100644 --- a/nodelet/include/nodelet/nodeletdecl.h +++ b/nodelet/include/nodelet/nodeletdecl.h @@ -30,16 +30,18 @@ #include -#ifdef ROS_BUILD_SHARED_LIBS // ros is being built around shared libraries - #ifdef nodeletlib_EXPORTS // we are building a shared lib/dll - #define NODELETLIB_DECL ROS_HELPER_EXPORT - #else // we are using shared lib/dll - #define NODELETLIB_DECL ROS_HELPER_IMPORT - #endif -#else // ros is being built around static libraries - #define NODELETLIB_DECL +#ifdef ROS_BUILD_SHARED_LIBS + // ros is being built around shared libraries + #ifdef nodeletlib_EXPORTS + // we are building a shared lib/dll + #define NODELETLIB_DECL ROS_HELPER_EXPORT + #else + // we are using shared lib/dll + #define NODELETLIB_DECL ROS_HELPER_IMPORT + #endif +#else + // ros is being built around static libraries + #define NODELETLIB_DECL #endif #endif // NODELETDECL_H - - diff --git a/nodelet/src/callback_queue.cpp b/nodelet/src/callback_queue.cpp index 2883b068..524d011a 100644 --- a/nodelet/src/callback_queue.cpp +++ b/nodelet/src/callback_queue.cpp @@ -39,9 +39,9 @@ namespace detail CallbackQueue::CallbackQueue(CallbackQueueManager* parent, const ros::VoidConstPtr& tracked_object) -: parent_(parent) -, tracked_object_(tracked_object) -, has_tracked_object_(tracked_object) + : parent_(parent) + , tracked_object_(tracked_object) + , has_tracked_object_(tracked_object) { } @@ -51,43 +51,42 @@ CallbackQueue::~CallbackQueue() void CallbackQueue::addCallback(const ros::CallbackInterfacePtr& cb, uint64_t owner_id) { - if (queue_.isEnabled()) - { - queue_.addCallback(cb, owner_id); - parent_->callbackAdded(shared_from_this()); - } + if (queue_.isEnabled()) + { + queue_.addCallback(cb, owner_id); + parent_->callbackAdded(shared_from_this()); + } } void CallbackQueue::removeByID(uint64_t owner_id) { - queue_.removeByID(owner_id); + queue_.removeByID(owner_id); } uint32_t CallbackQueue::callOne() { - // Don't try to call the callback after its nodelet has been destroyed! - ros::VoidConstPtr tracker; - if (has_tracked_object_) - { - tracker = tracked_object_.lock(); + // Don't try to call the callback after its nodelet has been destroyed! + ros::VoidConstPtr tracker; + if (has_tracked_object_) + { + tracker = tracked_object_.lock(); - if (!tracker) - return ros::CallbackQueue::Disabled; - } + if (!tracker) + return ros::CallbackQueue::Disabled; + } - return queue_.callOne(); + return queue_.callOne(); } void CallbackQueue::enable() { - queue_.enable(); + queue_.enable(); } void CallbackQueue::disable() { - queue_.disable(); + queue_.disable(); } - } // namespace detail } // namespace nodelet diff --git a/nodelet/src/callback_queue_manager.cpp b/nodelet/src/callback_queue_manager.cpp index 234ebb30..7c3f61a2 100644 --- a/nodelet/src/callback_queue_manager.cpp +++ b/nodelet/src/callback_queue_manager.cpp @@ -44,247 +44,246 @@ namespace detail { CallbackQueueManager::CallbackQueueManager(uint32_t num_worker_threads) -: running_(true), - num_worker_threads_(num_worker_threads) + : running_(true), + num_worker_threads_(num_worker_threads) { - if (num_worker_threads_ == 0) - num_worker_threads_ = boost::thread::hardware_concurrency(); + if (num_worker_threads_ == 0) + num_worker_threads_ = boost::thread::hardware_concurrency(); - tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this)); + tg_.create_thread(boost::bind(&CallbackQueueManager::managerThread, this)); - size_t num_threads = getNumWorkerThreads(); - thread_info_.reset( new ThreadInfo[num_threads] ); - for (size_t i = 0; i < num_threads; ++i) - { - tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i])); - } + size_t num_threads = getNumWorkerThreads(); + thread_info_.reset(new ThreadInfo[num_threads]); + for (size_t i = 0; i < num_threads; ++i) + { + tg_.create_thread(boost::bind(&CallbackQueueManager::workerThread, this, &thread_info_[i])); + } } CallbackQueueManager::~CallbackQueueManager() { - stop(); + stop(); #ifdef NODELET_QUEUE_DEBUG - // Write out task assignment histories for each thread - typedef ThreadInfo::Record Record; - for (size_t i = 0; i < num_threads; ++i) - { - char filename[128]; - sprintf(filename, "thread_history_%d.txt", (int)i); - FILE* file = fopen(filename, "w"); - fprintf(file, "# timestamps tasks threaded\n"); - const std::vector& history = thread_info_[i].history; - for (int j = 0; j < (int)history.size(); ++j) + // Write out task assignment histories for each thread + typedef ThreadInfo::Record Record; + for (size_t i = 0; i < num_threads; ++i) { - Record r = history[j]; - fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded); + char filename[128]; + sprintf(filename, "thread_history_%d.txt", (int)i); + FILE* file = fopen(filename, "w"); + fprintf(file, "# timestamps tasks threaded\n"); + const std::vector& history = thread_info_[i].history; + for (int j = 0; j < (int)history.size(); ++j) + { + Record r = history[j]; + fprintf(file, "%.6f %u %d\n", r.stamp, r.tasks, (int)r.threaded); + } + fclose(file); } - fclose(file); - } #endif } void CallbackQueueManager::stop() { - running_ = false; - { - boost::mutex::scoped_lock lock(waiting_mutex_); - waiting_cond_.notify_all(); - } + running_ = false; + { + boost::mutex::scoped_lock lock(waiting_mutex_); + waiting_cond_.notify_all(); + } - size_t num_threads = getNumWorkerThreads(); - for (size_t i = 0; i < num_threads; ++i) - { - boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex); - thread_info_[i].queue_cond.notify_all(); - } + size_t num_threads = getNumWorkerThreads(); + for (size_t i = 0; i < num_threads; ++i) + { + boost::mutex::scoped_lock lock(thread_info_[i].queue_mutex); + thread_info_[i].queue_cond.notify_all(); + } - tg_.join_all(); + tg_.join_all(); } uint32_t CallbackQueueManager::getNumWorkerThreads() { - return num_worker_threads_; + return num_worker_threads_; } void CallbackQueueManager::addQueue(const CallbackQueuePtr& queue, bool threaded) { - boost::mutex::scoped_lock lock(queues_mutex_); + boost::mutex::scoped_lock lock(queues_mutex_); - QueueInfoPtr& info = queues_[queue.get()]; - ROS_ASSERT(!info); - info.reset(new QueueInfo); - info->queue = queue; - info->threaded = threaded; + QueueInfoPtr& info = queues_[queue.get()]; + ROS_ASSERT(!info); + info.reset(new QueueInfo); + info->queue = queue; + info->threaded = threaded; } void CallbackQueueManager::removeQueue(const CallbackQueuePtr& queue) { - boost::mutex::scoped_lock lock(queues_mutex_); - ROS_ASSERT(queues_.find(queue.get()) != queues_.end()); + boost::mutex::scoped_lock lock(queues_mutex_); + ROS_ASSERT(queues_.find(queue.get()) != queues_.end()); - queues_.erase(queue.get()); + queues_.erase(queue.get()); } void CallbackQueueManager::callbackAdded(const CallbackQueuePtr& queue) { - { - boost::mutex::scoped_lock lock(waiting_mutex_); - waiting_.push_back(queue); - } + { + boost::mutex::scoped_lock lock(waiting_mutex_); + waiting_.push_back(queue); + } - waiting_cond_.notify_all(); + waiting_cond_.notify_all(); } CallbackQueueManager::ThreadInfo* CallbackQueueManager::getSmallestQueue() { - size_t smallest = std::numeric_limits::max(); - uint32_t smallest_index = 0xffffffff; - for (unsigned i = 0; i < num_worker_threads_; ++i) - { - ThreadInfo& ti = thread_info_[i]; - - size_t size = ti.calling; - if (size == 0) + size_t smallest = std::numeric_limits::max(); + uint32_t smallest_index = 0xffffffff; + for (unsigned i = 0; i < num_worker_threads_; ++i) { - return &ti; - } + ThreadInfo& ti = thread_info_[i]; - if (size < smallest) - { - smallest = size; - smallest_index = i; + size_t size = ti.calling; + if (size == 0) + { + return &ti; + } + + if (size < smallest) + { + smallest = size; + smallest_index = i; + } } - } - return &thread_info_[smallest_index]; + return &thread_info_[smallest_index]; } void CallbackQueueManager::managerThread() { - V_Queue local_waiting; + V_Queue local_waiting; - while (running_) - { + while (running_) { - boost::mutex::scoped_lock lock(waiting_mutex_); - - while (waiting_.empty() && running_) - { - waiting_cond_.wait(lock); - } - - if (!running_) - { - return; - } - - local_waiting.swap(waiting_); - } - - { - boost::mutex::scoped_lock lock(queues_mutex_); - - V_Queue::iterator it = local_waiting.begin(); - V_Queue::iterator end = local_waiting.end(); - for (; it != end; ++it) - { - CallbackQueuePtr& queue = *it; - - M_Queue::iterator it = queues_.find(queue.get()); - if (it != queues_.end()) { - QueueInfoPtr& info = it->second; - ThreadInfo* ti = 0; - if (info->threaded) - { - // If this queue is thread-safe we immediately add it to the thread with the least work queued - ti = getSmallestQueue(); - } - else - { - // If this queue is non-thread-safe and has no in-progress calls happening, we add it to the - // thread with the least work queued. If the queue already has calls in-progress we add it - // to the thread it's already being called from - boost::mutex::scoped_lock lock(info->st_mutex); - - if (info->in_thread == 0) + boost::mutex::scoped_lock lock(waiting_mutex_); + + while (waiting_.empty() && running_) { - ti = getSmallestQueue(); - info->thread_index = ti - thread_info_.get(); + waiting_cond_.wait(lock); } - else + + if (!running_) { - ti = &thread_info_[info->thread_index]; + return; } - ++info->in_thread; - } + local_waiting.swap(waiting_); + } - { - boost::mutex::scoped_lock lock(ti->queue_mutex); - ti->queue.push_back(std::make_pair(queue, info)); - ++ti->calling; + { + boost::mutex::scoped_lock lock(queues_mutex_); + + V_Queue::iterator it = local_waiting.begin(); + V_Queue::iterator end = local_waiting.end(); + for (; it != end; ++it) + { + CallbackQueuePtr& queue = *it; + + M_Queue::iterator it = queues_.find(queue.get()); + if (it != queues_.end()) + { + QueueInfoPtr& info = it->second; + ThreadInfo* ti = 0; + if (info->threaded) + { + // If this queue is thread-safe we immediately add it to the thread with the least work queued + ti = getSmallestQueue(); + } + else + { + // If this queue is non-thread-safe and has no in-progress calls happening, we add it to the + // thread with the least work queued. If the queue already has calls in-progress we add it + // to the thread it's already being called from + boost::mutex::scoped_lock lock(info->st_mutex); + + if (info->in_thread == 0) + { + ti = getSmallestQueue(); + info->thread_index = ti - thread_info_.get(); + } + else + { + ti = &thread_info_[info->thread_index]; + } + + ++info->in_thread; + } + + { + boost::mutex::scoped_lock lock(ti->queue_mutex); + ti->queue.push_back(std::make_pair(queue, info)); + ++ti->calling; #ifdef NODELET_QUEUE_DEBUG - double stamp = ros::WallTime::now().toSec(); - uint32_t tasks = ti->calling; - ti->history.push_back(ThreadInfo::Record(stamp, tasks, true)); + double stamp = ros::WallTime::now().toSec(); + uint32_t tasks = ti->calling; + ti->history.push_back(ThreadInfo::Record(stamp, tasks, true)); #endif - } + } - ti->queue_cond.notify_all(); + ti->queue_cond.notify_all(); + } + } } - } - } - local_waiting.clear(); - } + local_waiting.clear(); + } } void CallbackQueueManager::workerThread(ThreadInfo* info) { - std::vector > local_queues; + std::vector > local_queues; - while (running_) - { + while (running_) { - boost::mutex::scoped_lock lock(info->queue_mutex); + { + boost::mutex::scoped_lock lock(info->queue_mutex); - while (info->queue.empty() && running_) - { - info->queue_cond.wait(lock); - } + while (info->queue.empty() && running_) + { + info->queue_cond.wait(lock); + } - if (!running_) - { - return; - } + if (!running_) + { + return; + } - info->queue.swap(local_queues); - } + info->queue.swap(local_queues); + } - std::vector >::iterator it = local_queues.begin(); - std::vector >::iterator end = local_queues.end(); - for (; it != end; ++it) - { - CallbackQueuePtr& queue = it->first; - QueueInfoPtr& qi = it->second; - if (queue->callOne() == ros::CallbackQueue::TryAgain) - { - callbackAdded(queue); - } - --info->calling; - - if (!qi->threaded) - { - boost::mutex::scoped_lock lock(qi->st_mutex); - --qi->in_thread; - } - } + std::vector >::iterator it = local_queues.begin(); + std::vector >::iterator end = local_queues.end(); + for (; it != end; ++it) + { + CallbackQueuePtr& queue = it->first; + QueueInfoPtr& qi = it->second; + if (queue->callOne() == ros::CallbackQueue::TryAgain) + { + callbackAdded(queue); + } + --info->calling; - local_queues.clear(); + if (!qi->threaded) + { + boost::mutex::scoped_lock lock(qi->st_mutex); + --qi->in_thread; + } + } - } + local_queues.clear(); + } } } // namespace detail diff --git a/nodelet/src/loader.cpp b/nodelet/src/loader.cpp index 2ebf1d74..3c187772 100644 --- a/nodelet/src/loader.cpp +++ b/nodelet/src/loader.cpp @@ -73,299 +73,297 @@ typedef boost::shared_ptr NodeletPtr; class LoaderROS { public: - LoaderROS(Loader* parent, const ros::NodeHandle& nh) - : parent_(parent) - , nh_(nh) - , bond_spinner_(1, &bond_callback_queue_) - { - load_server_ = nh_.advertiseService("load_nodelet", &LoaderROS::serviceLoad, this); - unload_server_ = nh_.advertiseService("unload_nodelet", &LoaderROS::serviceUnload, this); - list_server_ = nh_.advertiseService("list", &LoaderROS::serviceList, this); - - bond_spinner_.start(); - } - -private: - bool serviceLoad(nodelet::NodeletLoad::Request &req, - nodelet::NodeletLoad::Response &res) - { - boost::mutex::scoped_lock lock(lock_); - - // build map - M_string remappings; - if (req.remap_source_args.size() != req.remap_target_args.size()) + LoaderROS(Loader* parent, const ros::NodeHandle& nh) + : parent_(parent) + , nh_(nh) + , bond_spinner_(1, &bond_callback_queue_) { - ROS_ERROR("Bad remapppings provided, target and source of different length"); + load_server_ = nh_.advertiseService("load_nodelet", &LoaderROS::serviceLoad, this); + unload_server_ = nh_.advertiseService("unload_nodelet", &LoaderROS::serviceUnload, this); + list_server_ = nh_.advertiseService("list", &LoaderROS::serviceList, this); + + bond_spinner_.start(); } - else + +private: + bool serviceLoad(nodelet::NodeletLoad::Request& req, nodelet::NodeletLoad::Response& res) { - for (size_t i = 0; i < req.remap_source_args.size(); ++i) - { - remappings[ros::names::resolve(req.remap_source_args[i])] = ros::names::resolve(req.remap_target_args[i]); - ROS_DEBUG("%s:%s\n", ros::names::resolve(req.remap_source_args[i]).c_str(), remappings[ros::names::resolve(req.remap_source_args[i])].c_str()); - } + boost::mutex::scoped_lock lock(lock_); + + // build map + M_string remappings; + if (req.remap_source_args.size() != req.remap_target_args.size()) + { + ROS_ERROR("Bad remapppings provided, target and source of different length"); + } + else + { + for (size_t i = 0; i < req.remap_source_args.size(); ++i) + { + remappings[ros::names::resolve(req.remap_source_args[i])] = ros::names::resolve(req.remap_target_args[i]); + ROS_DEBUG("%s:%s\n", ros::names::resolve(req.remap_source_args[i]).c_str(), remappings[ros::names::resolve(req.remap_source_args[i])].c_str()); + } + } + + res.success = parent_->load(req.name, req.type, remappings, req.my_argv); + + // If requested, create bond to sister process + if (res.success && !req.bond_id.empty()) + { + bond::Bond* bond = new bond::Bond(nh_.getNamespace() + "/bond", req.bond_id); + bond_map_.insert(req.name, bond); + bond->setCallbackQueue(&bond_callback_queue_); + bond->setBrokenCallback(boost::bind(&LoaderROS::unload, this, req.name)); + bond->start(); + } + return res.success; } - res.success = parent_->load(req.name, req.type, remappings, req.my_argv); - - // If requested, create bond to sister process - if (res.success && !req.bond_id.empty()) + bool serviceUnload(nodelet::NodeletUnload::Request& req, nodelet::NodeletUnload::Response& res) { - bond::Bond* bond = new bond::Bond(nh_.getNamespace() + "/bond", req.bond_id); - bond_map_.insert(req.name, bond); - bond->setCallbackQueue(&bond_callback_queue_); - bond->setBrokenCallback(boost::bind(&LoaderROS::unload, this, req.name)); - bond->start(); + res.success = unload(req.name); + return res.success; } - return res.success; - } - - bool serviceUnload(nodelet::NodeletUnload::Request &req, - nodelet::NodeletUnload::Response &res) - { - res.success = unload(req.name); - return res.success; - } - - bool unload(const std::string& name) - { - boost::mutex::scoped_lock lock(lock_); - bool success = parent_->unload(name); - if (!success) + bool unload(const std::string& name) { - ROS_ERROR("Failed to find nodelet with name '%s' to unload.", name.c_str()); - return success; + boost::mutex::scoped_lock lock(lock_); + + bool success = parent_->unload(name); + if (!success) + { + ROS_ERROR("Failed to find nodelet with name '%s' to unload.", name.c_str()); + return success; + } + + // break the bond, if there is one + M_stringToBond::iterator it = bond_map_.find(name); + if (it != bond_map_.end()) + { + // disable callback for broken bond, as we are breaking it intentially now + it->second->setBrokenCallback(boost::function()); + // erase (and break) bond + bond_map_.erase(name); + } + + return success; } - // break the bond, if there is one - M_stringToBond::iterator it = bond_map_.find(name); - if (it != bond_map_.end()) { - // disable callback for broken bond, as we are breaking it intentially now - it->second->setBrokenCallback(boost::function()); - // erase (and break) bond - bond_map_.erase(name); + bool serviceList(nodelet::NodeletList::Request&, nodelet::NodeletList::Response& res) + { + res.nodelets = parent_->listLoadedNodelets(); + return true; } - return success; - } - - bool serviceList(nodelet::NodeletList::Request &, - nodelet::NodeletList::Response &res) - { - res.nodelets = parent_->listLoadedNodelets(); - return true; - } - - Loader* parent_; - ros::NodeHandle nh_; - ros::ServiceServer load_server_; - ros::ServiceServer unload_server_; - ros::ServiceServer list_server_; + Loader* parent_; + ros::NodeHandle nh_; + ros::ServiceServer load_server_; + ros::ServiceServer unload_server_; + ros::ServiceServer list_server_; - boost::mutex lock_; + boost::mutex lock_; - ros::CallbackQueue bond_callback_queue_; - ros::AsyncSpinner bond_spinner_; - typedef boost::ptr_map M_stringToBond; - M_stringToBond bond_map_; + ros::CallbackQueue bond_callback_queue_; + ros::AsyncSpinner bond_spinner_; + typedef boost::ptr_map M_stringToBond; + M_stringToBond bond_map_; }; // Owns a Nodelet and its callback queues struct ManagedNodelet : boost::noncopyable { - detail::CallbackQueuePtr st_queue; - detail::CallbackQueuePtr mt_queue; - NodeletPtr nodelet; // destroyed before the queues - detail::CallbackQueueManager* callback_manager; - - /// @todo Maybe addQueue/removeQueue should be done by CallbackQueue - ManagedNodelet(const NodeletPtr& nodelet, detail::CallbackQueueManager* cqm) - : st_queue(new detail::CallbackQueue(cqm, nodelet)) - , mt_queue(new detail::CallbackQueue(cqm, nodelet)) - , nodelet(nodelet) - , callback_manager(cqm) - { - // NOTE: Can't do this in CallbackQueue constructor because the shared_ptr to - // it doesn't exist then. - callback_manager->addQueue(st_queue, false); - callback_manager->addQueue(mt_queue, true); - } - - ~ManagedNodelet() - { - callback_manager->removeQueue(st_queue); - callback_manager->removeQueue(mt_queue); - } + detail::CallbackQueuePtr st_queue; + detail::CallbackQueuePtr mt_queue; + NodeletPtr nodelet; // destroyed before the queues + detail::CallbackQueueManager* callback_manager; + + /// @todo Maybe addQueue/removeQueue should be done by CallbackQueue + ManagedNodelet(const NodeletPtr& nodelet, detail::CallbackQueueManager* cqm) + : st_queue(new detail::CallbackQueue(cqm, nodelet)) + , mt_queue(new detail::CallbackQueue(cqm, nodelet)) + , nodelet(nodelet) + , callback_manager(cqm) + { + // NOTE: Can't do this in CallbackQueue constructor because the shared_ptr to + // it doesn't exist then. + callback_manager->addQueue(st_queue, false); + callback_manager->addQueue(mt_queue, true); + } + + ~ManagedNodelet() + { + callback_manager->removeQueue(st_queue); + callback_manager->removeQueue(mt_queue); + } }; struct Loader::Impl { - boost::shared_ptr services_; - - boost::function (const std::string& lookup_name)> create_instance_; - boost::function refresh_classes_; - boost::shared_ptr callback_manager_; // Must outlive nodelets_ - - typedef boost::ptr_map M_stringToNodelet; - M_stringToNodelet nodelets_; /// Loader; - boost::shared_ptr loader(new Loader("nodelet", "nodelet::Nodelet")); - // create_instance_ is self-contained; it owns a copy of the loader shared_ptr - create_instance_ = boost::bind(&Loader::createInstance, loader, _1); - refresh_classes_ = boost::bind(&Loader::refreshDeclaredClasses, loader); - } - - Impl(const boost::function (const std::string& lookup_name)>& create_instance) - : create_instance_(create_instance) - { - } - - void advertiseRosApi(Loader* parent, const ros::NodeHandle& server_nh) - { - int num_threads_param; - server_nh.param("num_worker_threads", num_threads_param, 0); - callback_manager_.reset(new detail::CallbackQueueManager(num_threads_param)); - ROS_INFO("Initializing nodelet with %d worker threads.", (int)callback_manager_->getNumWorkerThreads()); - - services_.reset(new LoaderROS(parent, server_nh)); - } + boost::shared_ptr services_; + + boost::function(const std::string&)> create_instance_; + boost::function refresh_classes_; + boost::shared_ptr callback_manager_; // Must outlive nodelets_ + + typedef boost::ptr_map M_stringToNodelet; + M_stringToNodelet nodelets_; /// Loader; + boost::shared_ptr loader(new Loader("nodelet", "nodelet::Nodelet")); + // create_instance_ is self-contained; it owns a copy of the loader shared_ptr + create_instance_ = boost::bind(&Loader::createInstance, loader, _1); + refresh_classes_ = boost::bind(&Loader::refreshDeclaredClasses, loader); + } + + Impl(const boost::function(const std::string&)>& create_instance) + : create_instance_(create_instance) + { + } + + void advertiseRosApi(Loader* parent, const ros::NodeHandle& server_nh) + { + int num_threads_param; + server_nh.param("num_worker_threads", num_threads_param, 0); + callback_manager_.reset(new detail::CallbackQueueManager(num_threads_param)); + ROS_INFO("Initializing nodelet with %d worker threads.", (int)callback_manager_->getNumWorkerThreads()); + + services_.reset(new LoaderROS(parent, server_nh)); + } }; /// @todo Instance of ROS API-related constructors, just take #threads for the manager Loader::Loader(bool provide_ros_api) - : impl_(new Impl) + : impl_(new Impl) { - if (provide_ros_api) - impl_->advertiseRosApi(this, ros::NodeHandle("~")); - else - impl_->callback_manager_.reset(new detail::CallbackQueueManager); + if (provide_ros_api) + impl_->advertiseRosApi(this, ros::NodeHandle("~")); + else + impl_->callback_manager_.reset(new detail::CallbackQueueManager); } Loader::Loader(const ros::NodeHandle& server_nh) - : impl_(new Impl) + : impl_(new Impl) { - impl_->advertiseRosApi(this, server_nh); + impl_->advertiseRosApi(this, server_nh); } -Loader::Loader(const boost::function (const std::string& lookup_name)>& create_instance) - : impl_(new Impl(create_instance)) +Loader::Loader(const boost::function(const std::string&)>& create_instance) + : impl_(new Impl(create_instance)) { - impl_->callback_manager_.reset(new detail::CallbackQueueManager); + impl_->callback_manager_.reset(new detail::CallbackQueueManager); } Loader::~Loader() { } -bool Loader::load(const std::string &name, const std::string& type, const ros::M_string& remappings, - const std::vector & my_argv) +bool Loader::load(const std::string& name, const std::string& type, const ros::M_string& remappings, const std::vector& my_argv) { - boost::mutex::scoped_lock lock(lock_); - if (impl_->nodelets_.count(name) > 0) - { - ROS_ERROR("Cannot load nodelet %s for one exists with that name already", name.c_str()); - return false; - } - - NodeletPtr p; - try - { - p = impl_->create_instance_(type); - } - catch (std::runtime_error& e) - { - // If we cannot refresh the nodelet cache, fail immediately - if(!impl_->refresh_classes_) + boost::mutex::scoped_lock lock(lock_); + if (impl_->nodelets_.count(name) > 0) { - ROS_ERROR("Failed to load nodelet [%s] of type [%s]: %s", name.c_str(), type.c_str(), e.what()); - return false; + ROS_ERROR("Cannot load nodelet %s for one exists with that name already", name.c_str()); + return false; } - // otherwise, refresh the cache and try again. + NodeletPtr p; try { - impl_->refresh_classes_(); - p = impl_->create_instance_(type); + p = impl_->create_instance_(type); } - catch (std::runtime_error& e2) + catch (std::runtime_error& e) { - // dlopen() can return inconsistent results currently (see - // https://sourceware.org/bugzilla/show_bug.cgi?id=17833), so make sure - // that we display the messages of both exceptions to the user. - ROS_ERROR("Failed to load nodelet [%s] of type [%s] even after refreshing the cache: %s", name.c_str(), type.c_str(), e2.what()); - ROS_ERROR("The error before refreshing the cache was: %s", e.what()); - return false; + // If we cannot refresh the nodelet cache, fail immediately + if (!impl_->refresh_classes_) + { + ROS_ERROR("Failed to load nodelet [%s] of type [%s]: %s", name.c_str(), type.c_str(), e.what()); + return false; + } + + // otherwise, refresh the cache and try again. + try + { + impl_->refresh_classes_(); + p = impl_->create_instance_(type); + } + catch (std::runtime_error& e2) + { + // dlopen() can return inconsistent results currently (see + // https://sourceware.org/bugzilla/show_bug.cgi?id=17833), so make sure + // that we display the messages of both exceptions to the user. + ROS_ERROR("Failed to load nodelet [%s] of type [%s] even after refreshing the cache: %s", name.c_str(), type.c_str(), e2.what()); + ROS_ERROR("The error before refreshing the cache was: %s", e.what()); + return false; + } } - } - if (!p) - { - return false; - } - ROS_DEBUG("Done loading nodelet %s", name.c_str()); + if (!p) + { + return false; + } + ROS_DEBUG("Done loading nodelet %s", name.c_str()); - ManagedNodelet* mn = new ManagedNodelet(p, impl_->callback_manager_.get()); - impl_->nodelets_.insert(const_cast(name), mn); // mn now owned by boost::ptr_map - try { - mn->st_queue->disable(); - mn->mt_queue->disable(); + ManagedNodelet* mn = new ManagedNodelet(p, impl_->callback_manager_.get()); + impl_->nodelets_.insert(const_cast(name), mn); // mn now owned by boost::ptr_map + try + { + mn->st_queue->disable(); + mn->mt_queue->disable(); - p->init(name, remappings, my_argv, mn->st_queue.get(), mn->mt_queue.get()); + p->init(name, remappings, my_argv, mn->st_queue.get(), mn->mt_queue.get()); - mn->st_queue->enable(); - mn->mt_queue->enable(); + mn->st_queue->enable(); + mn->mt_queue->enable(); + ROS_DEBUG("Done initing nodelet %s", name.c_str()); + } + catch (...) + { + Impl::M_stringToNodelet::iterator it = impl_->nodelets_.find(name); + if (it != impl_->nodelets_.end()) + { + impl_->nodelets_.erase(it); + ROS_DEBUG("Failed to initialize nodelet %s", name.c_str()); + return (false); + } + } + return true; +} - ROS_DEBUG("Done initing nodelet %s", name.c_str()); - } catch(...) { +bool Loader::unload(const std::string& name) +{ + boost::mutex::scoped_lock lock(lock_); Impl::M_stringToNodelet::iterator it = impl_->nodelets_.find(name); if (it != impl_->nodelets_.end()) { - impl_->nodelets_.erase(it); - ROS_DEBUG ("Failed to initialize nodelet %s", name.c_str ()); - return (false); + impl_->nodelets_.erase(it); + ROS_DEBUG("Done unloading nodelet %s", name.c_str()); + return (true); } - } - return true; -} -bool Loader::unload (const std::string & name) -{ - boost::mutex::scoped_lock lock (lock_); - Impl::M_stringToNodelet::iterator it = impl_->nodelets_.find(name); - if (it != impl_->nodelets_.end()) - { - impl_->nodelets_.erase(it); - ROS_DEBUG ("Done unloading nodelet %s", name.c_str ()); - return (true); - } - - return (false); + return (false); } -bool Loader::clear () +bool Loader::clear() { - boost::mutex::scoped_lock lock(lock_); - impl_->nodelets_.clear(); - return true; + boost::mutex::scoped_lock lock(lock_); + impl_->nodelets_.clear(); + return true; }; std::vector Loader::listLoadedNodelets() { - boost::mutex::scoped_lock lock (lock_); - std::vector output; - Impl::M_stringToNodelet::iterator it = impl_->nodelets_.begin(); - for (; it != impl_->nodelets_.end(); ++it) - { - output.push_back(it->first); - } - return output; + boost::mutex::scoped_lock lock(lock_); + std::vector output; + Impl::M_stringToNodelet::iterator it = impl_->nodelets_.begin(); + for (; it != impl_->nodelets_.end(); ++it) + { + output.push_back(it->first); + } + return output; } } // namespace nodelet - diff --git a/nodelet/src/nodelet.cpp b/nodelet/src/nodelet.cpp index 47a5f9c8..cf83bb6c 100644 --- a/nodelet/src/nodelet.cpp +++ b/nodelet/src/nodelet.cpp @@ -42,6 +42,7 @@ @b nodeletcpp is a tool for loading/unloading nodelets to/from a Nodelet manager. **/ + #include #include @@ -53,33 +54,33 @@ #include "nodelet/NodeletUnload.h" #ifndef _WIN32 -# include + #include #else -# include + #include #endif std::string genId() { #ifndef _WIN32 - uuid_t uuid; - uuid_generate_random(uuid); - char uuid_str[40]; - uuid_unparse(uuid, uuid_str); - return std::string(uuid_str); + uuid_t uuid; + uuid_generate_random(uuid); + char uuid_str[40]; + uuid_unparse(uuid, uuid_str); + return std::string(uuid_str); #else - UUID uuid; - UuidCreate(&uuid); - RPC_CSTR str; - UuidToStringA(&uuid, &str); - std::string return_string(reinterpret_cast(str)); - RpcStringFreeA(&str); - return return_string; + UUID uuid; + UuidCreate(&uuid); + RPC_CSTR str; + UuidToStringA(&uuid, &str); + std::string return_string(reinterpret_cast(str)); + RpcStringFreeA(&str); + return return_string; #endif } class NodeletArgumentParsing { - private: +private: std::string command_; std::string type_; std::string name_; @@ -88,185 +89,191 @@ class NodeletArgumentParsing std::vector local_args_; bool is_bond_enabled_; - public: +public: //NodeletArgumentParsing() { }; bool - parseArgs(int argc, char** argv) + parseArgs(int argc, char** argv) { - is_bond_enabled_ = true; - std::vector non_ros_args; - ros::removeROSArgs (argc, argv, non_ros_args); - size_t used_args = 0; - - if (non_ros_args.size() > 1) - command_ = non_ros_args[1]; - else - return false; - - if (command_ == "load" && non_ros_args.size() > 3) - { - type_ = non_ros_args[2]; - manager_ = non_ros_args[3]; - used_args = 4; - - if (non_ros_args.size() > used_args) + is_bond_enabled_ = true; + std::vector non_ros_args; + ros::removeROSArgs(argc, argv, non_ros_args); + size_t used_args = 0; + + if (non_ros_args.size() > 1) { - if (non_ros_args[used_args] == "--no-bond") - { - is_bond_enabled_ = false; - ++used_args; - } + command_ = non_ros_args[1]; + } + else + { + return false; } - } - - else if (command_ == "unload" && non_ros_args.size() > 3) - { - type_ = "nodelet_unloader"; - name_ = non_ros_args[2]; - manager_ = non_ros_args[3]; - used_args = 4; - } - else if (command_ == "standalone" && non_ros_args.size() > 2) - { - type_ = non_ros_args[2]; - printf("type is %s\n", type_.c_str()); - used_args = 3; - } - if (command_ == "manager") - used_args = 2; + if (command_ == "load" && non_ros_args.size() > 3) + { + type_ = non_ros_args[2]; + manager_ = non_ros_args[3]; + used_args = 4; + + if (non_ros_args.size() > used_args) + { + if (non_ros_args[used_args] == "--no-bond") + { + is_bond_enabled_ = false; + ++used_args; + } + } + } - for (size_t i = used_args; i < non_ros_args.size(); i++) - local_args_.push_back(non_ros_args[i]); + else if (command_ == "unload" && non_ros_args.size() > 3) + { + type_ = "nodelet_unloader"; + name_ = non_ros_args[2]; + manager_ = non_ros_args[3]; + used_args = 4; + } + else if (command_ == "standalone" && non_ros_args.size() > 2) + { + type_ = non_ros_args[2]; + printf("type is %s\n", type_.c_str()); + used_args = 3; + } + if (command_ == "manager") + { + used_args = 2; + } - if (used_args > 0) return true; - else return false; + for (size_t i = used_args; i < non_ros_args.size(); i++) + { + local_args_.push_back(non_ros_args[i]); + } + return (used_args > 0); }; - - std::string getCommand () const { return (command_); } - std::string getType () const { return (type_); } - std::string getName () const { return (name_); } - std::string getManager() const { return (manager_); } + std::string getCommand() const { return command_; } + std::string getType() const { return type_; } + std::string getName() const { return name_; } + std::string getManager() const { return manager_; } bool isBondEnabled() const { return is_bond_enabled_; } - std::vector getMyArgv () const {return local_args_;}; + std::vector getMyArgv() const { return local_args_; }; std::string getDefaultName() { - std::string s = type_; - replace(s.begin(), s.end(), '/', '_'); - return s; + std::string s = type_; + replace(s.begin(), s.end(), '/', '_'); + return s; }; - }; - class NodeletInterface { - public: +public: //////////////////////////////////////////////////////////////////////////////// /** \brief Unload the nodelet */ bool - unloadNodelet (const std::string &name, const std::string &manager) + unloadNodelet(const std::string& name, const std::string& manager) { - ROS_INFO_STREAM ("Unloading nodelet " << name << " from manager " << manager); - - std::string service_name = manager + "/unload_nodelet"; - // Check if the service exists and is available - if (!ros::service::exists (service_name, true)) - { - // Probably the manager has shut down already, which is fine - ROS_WARN("Couldn't find service %s, perhaps the manager is already shut down", - service_name.c_str()); - return (false); - } - - ros::ServiceClient client = n_.serviceClient (service_name); - //client.waitForExistence (); - - // Call the service - nodelet::NodeletUnload srv; - srv.request.name = name; - if (!client.call (srv)) - { - // Maybe service shut down in the meantime, which isn't an error - if (ros::service::exists(service_name, false)) - ROS_FATAL_STREAM("Failed to unload nodelet '" << name << "` from manager `" << manager << "'"); - return (false); - } - return (true); + ROS_INFO_STREAM("Unloading nodelet " << name << " from manager " << manager); + + std::string service_name = manager + "/unload_nodelet"; + // Check if the service exists and is available + if (!ros::service::exists(service_name, true)) + { + // Probably the manager has shut down already, which is fine + ROS_WARN("Couldn't find service %s, perhaps the manager is already shut down", + service_name.c_str()); + return false; + } + + ros::ServiceClient client = n_.serviceClient(service_name); + //client.waitForExistence (); + + // Call the service + nodelet::NodeletUnload srv; + srv.request.name = name; + if (!client.call(srv)) + { + // Maybe service shut down in the meantime, which isn't an error + if (ros::service::exists(service_name, false)) + { + ROS_FATAL_STREAM("Failed to unload nodelet '" << name << "` from manager `" << manager << "'"); + } + return false; + } + return true; } //////////////////////////////////////////////////////////////////////////////// /** \brief Load the nodelet */ bool - loadNodelet (const std::string &name, const std::string &type, - const std::string &manager, const std::vector &args, - const std::string &bond_id) + loadNodelet(const std::string& name, const std::string& type, + const std::string& manager, const std::vector& args, + const std::string& bond_id) { - ros::M_string remappings = ros::names::getRemappings (); - std::vector sources (remappings.size ()), targets (remappings.size ()); - ROS_INFO_STREAM ("Loading nodelet " << name << " of type " << type << " to manager " << manager << " with the following remappings:"); - int i = 0; - for (ros::M_string::iterator it = remappings.begin (); it != remappings.end (); ++it, ++i) - { - sources[i] = (*it).first; - targets[i] = (*it).second; - ROS_INFO_STREAM (sources[i] << " -> " << targets[i]); - } - - // Get and set the parameters - XmlRpc::XmlRpcValue param; - std::string node_name = ros::this_node::getName (); - n_.getParam (node_name, param); - n_.setParam (name, param); - - std::string service_name = std::string (manager) + "/load_nodelet"; - - // Wait until the service is advertised - ROS_DEBUG ("Waiting for service %s to be available...", service_name.c_str ()); - ros::ServiceClient client = n_.serviceClient (service_name); - client.waitForExistence (); - - // Call the service - nodelet::NodeletLoad srv; - srv.request.name = std::string (name); - srv.request.type = std::string (type); - srv.request.remap_source_args = sources; - srv.request.remap_target_args = targets; - srv.request.my_argv = args; - srv.request.bond_id = bond_id; - if (!client.call (srv)) - { - ROS_FATAL_STREAM("Failed to load nodelet '" << name << "` of type `" << type << "` to manager `" << manager << "'"); - return false; - } - return true; + ros::M_string remappings = ros::names::getRemappings(); + std::vector sources(remappings.size()), targets(remappings.size()); + ROS_INFO_STREAM("Loading nodelet " << name << " of type " << type << " to manager " << manager << " with the following remappings:"); + int i = 0; + for (ros::M_string::iterator it = remappings.begin(); it != remappings.end(); ++it, ++i) + { + sources[i] = (*it).first; + targets[i] = (*it).second; + ROS_INFO_STREAM(sources[i] << " -> " << targets[i]); + } + + // Get and set the parameters + XmlRpc::XmlRpcValue param; + std::string node_name = ros::this_node::getName(); + n_.getParam(node_name, param); + n_.setParam(name, param); + + std::string service_name = std::string(manager) + "/load_nodelet"; + + // Wait until the service is advertised + ROS_DEBUG("Waiting for service %s to be available...", service_name.c_str()); + ros::ServiceClient client = n_.serviceClient(service_name); + client.waitForExistence(); + + // Call the service + nodelet::NodeletLoad srv; + srv.request.name = std::string(name); + srv.request.type = std::string(type); + srv.request.remap_source_args = sources; + srv.request.remap_target_args = targets; + srv.request.my_argv = args; + srv.request.bond_id = bond_id; + if (!client.call(srv)) + { + ROS_FATAL_STREAM("Failed to load nodelet '" << name << "` of type `" << type << "` to manager `" << manager << "'"); + return false; + } + return true; } - private: + +private: ros::NodeHandle n_; }; void print_usage(int argc, char** argv) { - printf("Your usage: \n"); - for (int i = 0; i < argc; i++) - printf("%s ", argv[i]); - printf("\nnodelet usage:\n"); - printf("nodelet load pkg/Type manager [--no-bond] - Launch a nodelet of type pkg/Type on manager manager\n"); - printf("nodelet standalone pkg/Type - Launch a nodelet of type pkg/Type in a standalone node\n"); - printf("nodelet unload name manager - Unload a nodelet by name from manager\n"); - printf("nodelet manager - Launch a nodelet manager node\n"); - + printf("Your usage: \n"); + for (int i = 0; i < argc; i++) + { + printf("%s ", argv[i]); + } + printf("\nnodelet usage:\n"); + printf("nodelet load pkg/Type manager [--no-bond] - Launch a nodelet of type pkg/Type on manager manager\n"); + printf("nodelet standalone pkg/Type - Launch a nodelet of type pkg/Type in a standalone node\n"); + printf("nodelet unload name manager - Unload a nodelet by name from manager\n"); + printf("nodelet manager - Launch a nodelet manager node\n"); }; sig_atomic_t volatile request_shutdown = 0; void nodeletLoaderSigIntHandler(int) { - request_shutdown = 1; + request_shutdown = 1; } // Shutdown can be triggered externally by an XML-RPC call, this is how "rosnode kill" @@ -275,116 +282,125 @@ void nodeletLoaderSigIntHandler(int) // handler for a "shutdown" XML-RPC call. void shutdownCallback(XmlRpc::XmlRpcValue& params, XmlRpc::XmlRpcValue& result) { - int num_params = 0; - if (params.getType() == XmlRpc::XmlRpcValue::TypeArray) - num_params = params.size(); - if (num_params > 1) - { - std::string reason = params[1]; - ROS_WARN("Shutdown request received. Reason: [%s]", reason.c_str()); - request_shutdown = 1; - } + int num_params = 0; + if (params.getType() == XmlRpc::XmlRpcValue::TypeArray) + { + num_params = params.size(); + } + if (num_params > 1) + { + std::string reason = params[1]; + ROS_WARN("Shutdown request received. Reason: [%s]", reason.c_str()); + request_shutdown = 1; + } - result = ros::xmlrpc::responseInt(1, "", 0); + result = ros::xmlrpc::responseInt(1, "", 0); } -/* ---[ */ -int - main (int argc, char** argv) +int main(int argc, char** argv) { - NodeletArgumentParsing arg_parser; - - if (!arg_parser.parseArgs(argc, argv)) - { - print_usage(argc, argv); - return (-1); - } - std::string command = arg_parser.getCommand(); - - - if (command == "manager") - { - ros::init (argc, argv, "manager"); - nodelet::Loader n; - ros::spin (); - } - else if (command == "standalone") - { - ros::init (argc, argv, arg_parser.getDefaultName()); - - ros::NodeHandle nh; - nodelet::Loader n(false); - ros::M_string remappings; //Remappings are already applied by ROS no need to generate them. - std::string nodelet_name = ros::this_node::getName (); - std::string nodelet_type = arg_parser.getType(); - if(!n.load(nodelet_name, nodelet_type, remappings, arg_parser.getMyArgv())) - return -1; - - ROS_DEBUG("Successfully loaded nodelet of type '%s' into name '%s'\n", nodelet_name.c_str(), nodelet_name.c_str()); - - ros::spin(); - } - else if (command == "load") - { - ros::init (argc, argv, arg_parser.getDefaultName (), ros::init_options::NoSigintHandler); - NodeletInterface ni; - ros::NodeHandle nh; - std::string name = ros::this_node::getName (); - std::string type = arg_parser.getType(); - std::string manager = arg_parser.getManager(); - std::string bond_id; - if (arg_parser.isBondEnabled()) - bond_id = name + "_" + genId(); - bond::Bond bond(manager + "/bond", bond_id); - if (!ni.loadNodelet(name, type, manager, arg_parser.getMyArgv(), bond_id)) - return -1; - - // Override default exit handlers for roscpp - signal(SIGINT, nodeletLoaderSigIntHandler); - ros::XMLRPCManager::instance()->unbind("shutdown"); - ros::XMLRPCManager::instance()->bind("shutdown", shutdownCallback); - - if (arg_parser.isBondEnabled()) - bond.start(); - // Spin our own loop - ros::AsyncSpinner spinner(1); - spinner.start(); - while (!request_shutdown) + NodeletArgumentParsing arg_parser; + + if (!arg_parser.parseArgs(argc, argv)) + { + print_usage(argc, argv); + return -1; + } + std::string command = arg_parser.getCommand(); + + if (command == "manager") + { + ros::init(argc, argv, "manager"); + nodelet::Loader n; + ros::spin(); + } + else if (command == "standalone") { - if (arg_parser.isBondEnabled() && bond.isBroken()) - { - ROS_INFO("Bond broken, exiting"); - goto shutdown; - } + ros::init(argc, argv, arg_parser.getDefaultName()); + ros::NodeHandle nh; + nodelet::Loader n(false); + ros::M_string remappings; //Remappings are already applied by ROS no need to generate them. + std::string nodelet_name = ros::this_node::getName(); + std::string nodelet_type = arg_parser.getType(); + if (!n.load(nodelet_name, nodelet_type, remappings, arg_parser.getMyArgv())) + { + return -1; + } + ROS_DEBUG("Successfully loaded nodelet of type '%s' into name '%s'\n", nodelet_name.c_str(), nodelet_name.c_str()); + + ros::spin(); + } + else if (command == "load") + { + ros::init(argc, argv, arg_parser.getDefaultName(), ros::init_options::NoSigintHandler); + NodeletInterface ni; + ros::NodeHandle nh; + std::string name = ros::this_node::getName(); + std::string type = arg_parser.getType(); + std::string manager = arg_parser.getManager(); + std::string bond_id; + if (arg_parser.isBondEnabled()) + { + bond_id = name + "_" + genId(); + } + bond::Bond bond(manager + "/bond", bond_id); + if (!ni.loadNodelet(name, type, manager, arg_parser.getMyArgv(), bond_id)) + { + return -1; + } + + // Override default exit handlers for roscpp + signal(SIGINT, nodeletLoaderSigIntHandler); + ros::XMLRPCManager::instance()->unbind("shutdown"); + ros::XMLRPCManager::instance()->bind("shutdown", shutdownCallback); + + if (arg_parser.isBondEnabled()) + { + bond.start(); + } + // Spin our own loop + ros::AsyncSpinner spinner(1); + spinner.start(); + while (!request_shutdown) + { + if (arg_parser.isBondEnabled() && bond.isBroken()) + { + ROS_INFO("Bond broken, exiting"); + goto shutdown; + } #ifndef _WIN32 - usleep(100000); + usleep(100000); #else - Sleep(100); + Sleep(100); #endif + } + // Attempt to unload the nodelet before shutting down ROS + ni.unloadNodelet(name, manager); + if (arg_parser.isBondEnabled()) + { + bond.breakBond(); + } + + shutdown: + ros::shutdown(); + } + else if (command == "unload") + { + ros::init(argc, argv, arg_parser.getDefaultName()); + std::string name = arg_parser.getName(); + std::string manager = arg_parser.getManager(); + NodeletInterface ni; + if (!ni.unloadNodelet(name, manager)) + { + return -1; + } + } + else + { + ros::init(argc, argv, "nodelet"); + ROS_ERROR("Command %s unknown", command.c_str()); + return -1; } - // Attempt to unload the nodelet before shutting down ROS - ni.unloadNodelet(name, manager); - if (arg_parser.isBondEnabled()) - bond.breakBond(); - shutdown: - ros::shutdown(); - } - else if (command == "unload") - { - ros::init (argc, argv, arg_parser.getDefaultName ()); - std::string name = arg_parser.getName (); - std::string manager = arg_parser.getManager(); - NodeletInterface ni; - if (!ni.unloadNodelet(name, manager)) - return -1; - } - else - { - ros::init(argc, argv, "nodelet"); - ROS_ERROR("Command %s unknown", command.c_str()); - return -1; - } - - return (0); + + return 0; } -/* ]--- */ diff --git a/nodelet/src/nodelet_class.cpp b/nodelet/src/nodelet_class.cpp index e53d0d4a..95fae390 100644 --- a/nodelet/src/nodelet_class.cpp +++ b/nodelet/src/nodelet_class.cpp @@ -37,9 +37,9 @@ namespace nodelet { -Nodelet::Nodelet () -: inited_(false) -, nodelet_name_("uninitialized") +Nodelet::Nodelet() + : inited_(false) + , nodelet_name_("uninitialized") { } @@ -47,91 +47,92 @@ Nodelet::~Nodelet() { } -ros::CallbackQueueInterface& Nodelet::getSTCallbackQueue () const +ros::CallbackQueueInterface& Nodelet::getSTCallbackQueue() const { - if (!inited_) - { - throw UninitializedException("getSTCallbackQueue"); - } + if (!inited_) + { + throw UninitializedException("getSTCallbackQueue"); + } - return *nh_->getCallbackQueue(); + return *nh_->getCallbackQueue(); } -ros::CallbackQueueInterface& Nodelet::getMTCallbackQueue () const +ros::CallbackQueueInterface& Nodelet::getMTCallbackQueue() const { - if (!inited_) - { - throw UninitializedException("getMTCallbackQueue"); - } + if (!inited_) + { + throw UninitializedException("getMTCallbackQueue"); + } - return *mt_nh_->getCallbackQueue(); + return *mt_nh_->getCallbackQueue(); } ros::NodeHandle& Nodelet::getNodeHandle() const { - if (!inited_) - { - throw UninitializedException("getNodeHandle"); - } + if (!inited_) + { + throw UninitializedException("getNodeHandle"); + } - return *nh_; + return *nh_; } ros::NodeHandle& Nodelet::getPrivateNodeHandle() const { - if (!inited_) - { - throw UninitializedException("getPrivateNodeHandle"); - } + if (!inited_) + { + throw UninitializedException("getPrivateNodeHandle"); + } - return *private_nh_; + return *private_nh_; } ros::NodeHandle& Nodelet::getMTNodeHandle() const { - if (!inited_) - { - throw UninitializedException("getMTNodeHandle"); - } + if (!inited_) + { + throw UninitializedException("getMTNodeHandle"); + } - return *mt_nh_; + return *mt_nh_; } ros::NodeHandle& Nodelet::getMTPrivateNodeHandle() const { - if (!inited_) - { - throw UninitializedException("getMTPrivateNodeHandle"); - } + if (!inited_) + { + throw UninitializedException("getMTPrivateNodeHandle"); + } - return *mt_private_nh_; + return *mt_private_nh_; } void Nodelet::init(const std::string& name, const M_string& remapping_args, const V_string& my_argv, - ros::CallbackQueueInterface* st_queue, ros::CallbackQueueInterface* mt_queue) + ros::CallbackQueueInterface* st_queue, + ros::CallbackQueueInterface* mt_queue) { - if (inited_) - { - throw MultipleInitializationException(); - } - - nodelet_name_ = name; - remapping_args_ = remapping_args; - my_argv_ = my_argv; - - // Set up NodeHandles with correct namespaces - private_nh_.reset(new ros::NodeHandle(name, remapping_args)); - nh_.reset(new ros::NodeHandle(ros::names::parentNamespace(name), remapping_args)); - mt_private_nh_.reset(new ros::NodeHandle(name, remapping_args)); - mt_nh_.reset(new ros::NodeHandle(ros::names::parentNamespace(name), remapping_args)); - - // Use the provided callback queues (or the global queue if they're NULL). - // This allows Loader and CallbackQueueManager to spread nodelets over multiple threads. - private_nh_->setCallbackQueue(st_queue); - nh_->setCallbackQueue(st_queue); - mt_private_nh_->setCallbackQueue(mt_queue); - mt_nh_->setCallbackQueue(mt_queue); - - NODELET_DEBUG ("Nodelet initializing"); - inited_ = true; - this->onInit (); + if (inited_) + { + throw MultipleInitializationException(); + } + + nodelet_name_ = name; + remapping_args_ = remapping_args; + my_argv_ = my_argv; + + // Set up NodeHandles with correct namespaces + private_nh_.reset(new ros::NodeHandle(name, remapping_args)); + nh_.reset(new ros::NodeHandle(ros::names::parentNamespace(name), remapping_args)); + mt_private_nh_.reset(new ros::NodeHandle(name, remapping_args)); + mt_nh_.reset(new ros::NodeHandle(ros::names::parentNamespace(name), remapping_args)); + + // Use the provided callback queues (or the global queue if they're NULL). + // This allows Loader and CallbackQueueManager to spread nodelets over multiple threads. + private_nh_->setCallbackQueue(st_queue); + nh_->setCallbackQueue(st_queue); + mt_private_nh_->setCallbackQueue(mt_queue); + mt_nh_->setCallbackQueue(mt_queue); + + NODELET_DEBUG("Nodelet initializing"); + inited_ = true; + this->onInit(); } } // namespace nodelet