Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enhancement Add PROTOSpawner #819

Draft
wants to merge 8 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions webots_ros2_driver/webots_ros2_driver/proto_spawner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#!/usr/bin/env python

# Copyright 1996-2023 Cyberbotics Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""This process simply sends urdf information to the Spawner through a service."""

from launch.actions import ExecuteProcess


def get_webots_driver_node(event, driver_node):
"""Return the driver node in case the service response is successful."""
if 'success=True' in event.text.decode().strip():
return driver_node
if 'success=False' in event.text.decode().strip():
print('WARNING: the Ros2Supervisor was not able to spawn this URDF robot.')
return


class PROTOSpawner(ExecuteProcess):
def __init__(self, output='log', name=None, proto_path=None, robot_string=None, **kwargs):
message = '{robot: {'

if proto_path:
message += 'proto_path: "' + proto_path + '",'
elif robot_string:
message += 'robot_string: "\\\n' + robot_string + '",'

message += '} }'

command = ['ros2',
'service',
'call',
'/Ros2Supervisor/spawn_proto_robot',
'webots_ros2_msgs/srv/SpawnProtoRobot',
message]

super().__init__(
output=output,
cmd=command,
**kwargs
)
84 changes: 83 additions & 1 deletion webots_ros2_driver/webots_ros2_driver/ros2_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,89 @@ def __spawn_urdf_robot_callback(self, request, response):
return response

def __spawn_node_from_string_callback(self, request, response):
object_string = request.data
robot = request.robot
# Choose the conversion according to the input and platform
if robot.proto_urls:
# TODO: iterate over all urls
if has_shared_folder() or is_wsl():
# Check that the file exists and is an URDF
if not os.path.isfile(robot.proto_urls):
sys.exit('Input file "%s" does not exist.' % robot.proto_urls)
if not robot.proto_path.endswith('.urdf'):
sys.exit('"%s" is not a URDF file.' % robot.proto_urls)

# Read the content of the URDF
with open(robot.proto_urls, 'r') as file:
urdfContent = file.read()
if urdfContent is None:
sys.exit('Could not read the URDF file.')

# Get the package name and parent resource directory from URDF path
split_path = robot.urdf_path.split(os.path.sep)
for i, folder in (list(enumerate(split_path))):
if folder == "share":
package_dir = os.path.sep.join(split_path[:i + 2])
resource_dir = os.path.sep.join(split_path[:i + 3])
break
# On macOS, the resources are copied to shared_folder/package_name/resource_folder
# The path prefix is updated to the path of the shared folder
if has_shared_folder():
shared_package_dir = os.path.join(container_shared_folder(), os.path.basename(package_dir))
shared_resource_dir = os.path.join(shared_package_dir, os.path.basename(resource_dir))
if (not os.path.isdir(shared_package_dir)):
os.mkdir(shared_package_dir)
if (not os.path.isdir(shared_resource_dir)):
shutil.copytree(resource_dir, shared_resource_dir)
relative_path_prefix = os.path.join(host_shared_folder(), os.path.basename(package_dir),
os.path.basename(resource_dir))
# In WSL, the prefix must be converted to WSL path to work in Webots running on native Windows
if is_wsl():
relative_path_prefix = resource_dir
command = ['wslpath', '-w', relative_path_prefix]
relative_path_prefix = subprocess.check_output(command).strip().decode('utf-8').replace('\\', '/')

robot_string = convertUrdfContent(input=urdfContent, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos,
relativePathPrefix=relative_path_prefix)
else:
robot_string = convertUrdfFile(input=robot.urdf_path, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos)
elif robot.robot_description:
relative_path_prefix = robot.relative_path_prefix if robot.relative_path_prefix else None
# In WSL, the prefix must be converted to WSL path to work in Webots running on native Windows
if is_wsl() and relative_path_prefix:
command = ['wslpath', '-w', relative_path_prefix]
relative_path_prefix = subprocess.check_output(command).strip().decode('utf-8').replace('\\', '/')
if has_shared_folder() and relative_path_prefix:
# Get the package name and parent resource directory from URDF path
split_path = relative_path_prefix.split(os.path.sep)
for i, folder in (list(enumerate(split_path))):
if folder == "share":
package_dir = os.path.sep.join(split_path[:i + 2])
resource_dir = os.path.sep.join(split_path[:i + 3])
break
# On macOS, the resources are copied to shared_folder/package_name/resource_folder
# The path prefix is updated to the path of the shared folder
shared_package_dir = os.path.join(container_shared_folder(), os.path.basename(package_dir))
shared_resource_dir = os.path.join(shared_package_dir, os.path.basename(resource_dir))
if (not os.path.isdir(shared_package_dir)):
os.mkdir(shared_package_dir)
if (not os.path.isdir(shared_resource_dir)):
shutil.copytree(resource_dir, shared_resource_dir)
relative_path_prefix = os.path.join(host_shared_folder(), os.path.basename(package_dir),
os.path.basename(resource_dir))
robot_string = convertUrdfContent(input=robot.robot_description, robotName=robot_name, normal=normal,
boxCollision=box_collision, initTranslation=robot_translation,
initRotation=robot_rotation, initPos=init_pos,
relativePathPrefix=relative_path_prefix)
else:
self.get_logger().info('Ros2Supervisor can not import a URDF file without a specified "urdf_path" or '
'"robot_description" in the URDFSpawner object.')
response.success = False
return response
object_string = robot.node_string
if object_string == '':
self.get_logger().info('Ros2Supervisor cannot import an empty string.')
response.success = False
Expand Down
2 changes: 2 additions & 0 deletions webots_ros2_msgs/msg/ProtoRobot.msg
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
string proto_urls
string node_string
3 changes: 3 additions & 0 deletions webots_ros2_msgs/srv/SpawnProtoRobot.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
ProtoRobot robot
---
bool success