diff --git a/ros2_ws/src/python_workspace/README.md b/ros2_ws/src/python_package/README.md
similarity index 97%
rename from ros2_ws/src/python_workspace/README.md
rename to ros2_ws/src/python_package/README.md
index fa86601..a23611c 100644
--- a/ros2_ws/src/python_workspace/README.md
+++ b/ros2_ws/src/python_package/README.md
@@ -1,104 +1,104 @@
-
-### List of Nodes
-
-| Node Type | Subscribes To | Publishes To | Example command |
-|-------------------|----------------|-----------------------------------|------|
-| Picture Node | - | `/input_image` | `ros2 run python_workspace picture_node --ros-args -p static_image_path:='./../assets/maize' -p loop:=-1 -p frame_rate:=1`|
-| Inference Node | `/input_image` | `/inference_out`, `/output_img` | `ros2 run python_workspace inference_node --ros-args -p weights_path:='../models/maize/Maize.pt'`|
-| Extermination Node | `/inference_out` | external binary | `ros2 run python_workspace extermination_node`|
-
-
-### List of Topics and Data Types
-
-| Topic Name | Description | Data Type |
-|-----------------------------|--------------------------------------|--------------------|
-| `/input_image` | custom msg that sends raw image, processed image and velocity | `custom_interface/msg/ImageInput`|
-| `/inference_out` | custom msg that passes inference output and raw image to extermination node | `custom_interface/msg/InferenceOutput` |
-| `/output_img` | Processed output image | `sensor_msgs/Image`|
-
-### Other commands
-#### Running the camera node:
-`ros2 run python_workspace camera_node --ros-args -p static_image_path:='/home/user/Desktop/ROS/Models/Maize Model/sample_maize_images' -p loop:=-1 -p frame_rate:=10 -p model_dimensions:=[640,640]`
-
-#### Running the jetson_node
-ros2 run python_workspace jetson_node
-
-#### Running the extermination node
-```ros2 run python_workspace extermination_node```
-
-Without any additional arguments, this will also show a window with the drawn bounding boxes.
-To turn it off,
-add the argument `--ros-args -p use_display_node:=false` to the command.
-#### Running the picture node:
-File paths are relative to the working directory
-
-`ros2 run python_workspace picture_node --ros-args -p static_image_path:='./../assets/maize' -p loop:=-1 -p frame_rate:=1`
-
-#### Running the inference node:
-
-The path for weights (.pt) is relative to the ROS/workspace_python/ros2_ws directory.
-
-
-```bash
-ros2 run python_workspace inference_node --ros-args -p weights_path:='../models/maize/Maize.pt'
-```
-
-### Compiling new Changes
-```bash
-colcon build --packages-select custom_interface python_workspace
-source install/setup.bash
-```
-
-### using RQT to view
-#### Installing RQT on ROS 2 Humble
-
-To install RQT and its plugins on ROS 2 Humble, use the following command:
-
-```bash
-sudo apt update
-sudo apt install ~nros-humble-rqt*
-```
-
-This command will install RQT along with all available plugins for ROS 2 Humble.
-
-#### Running RQT
-
-To start RQT, simply run:
-
-```bash
-rqt
-```
-
-You can now use RQT to visualize and interact with various ROS topics and nodes.
-
-To view the image topic :
-
-1. go to plugins/visualization/image view. a new box will open up
-2. click refresh topic
-3. select the name of the image topic you want to see
-
-
-
-## Testing the performance of the zed ros2 wrapper:
-
-### using top:
-ros2 node list
-top -c -p $(pgrep -d',' -f name_of_command)
-
-
-### using rqt:
-install rqt with
-
-`sudo apt install ros-humble-rqt*`
-sudo apt install ros-${ROS_DISTRO}-rqt-top
-to launch, just type `rqt` into the terminal
-
-
-### visualizing the image topics
-source: https://ros2jsguy.medium.com/4-data-visualization-using-ros-2-rviz2-pub-sub-communications-and-javascript-typescript-6e43cde75029
-
-
-
-
-### looking at the rviz2 display
-`ros2 launch zed_display_rviz2 display_zed_cam.launch.py camera_model:=zed2`
+
+### List of Nodes
+
+| Node Type | Subscribes To | Publishes To | Example command |
+|-------------------|----------------|-----------------------------------|------|
+| Picture Node | - | `/input_image` | `ros2 run python_package picture_node --ros-args -p static_image_path:='./../assets/maize' -p loop:=-1 -p frame_rate:=1`|
+| Inference Node | `/input_image` | `/inference_out`, `/output_img` | `ros2 run python_package inference_node --ros-args -p weights_path:='../models/maize/Maize.pt'`|
+| Extermination Node | `/inference_out` | external binary | `ros2 run python_package extermination_node`|
+
+
+### List of Topics and Data Types
+
+| Topic Name | Description | Data Type |
+|-----------------------------|--------------------------------------|--------------------|
+| `/input_image` | Custom message carrying the raw image, the preprocessed image, and the velocity | `custom_interfaces/msg/ImageInput`|
+| `/inference_out` | Custom message passing the inference output and raw image to the extermination node | `custom_interfaces/msg/InferenceOutput` |
+| `/output_img` | Processed output image | `sensor_msgs/Image`|
+
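+The exact message definitions live in the `custom_interfaces` package. A sketch of their fields, inferred from how the nodes in this package populate them (field names and types here are assumptions, not the authoritative definitions):
+
+```text
+# ImageInput.msg (sketch)
+std_msgs/Header header
+sensor_msgs/Image raw_image
+sensor_msgs/Image preprocessed_image
+float32 velocity
+
+# InferenceOutput.msg (sketch)
+int32 num_boxes
+sensor_msgs/Image raw_image
+sensor_msgs/Image preprocessed_image
+float32 velocity
+std_msgs/Float32MultiArray bounding_boxes
+std_msgs/Float32MultiArray confidences
+```
+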
+### Other commands
+#### Running the camera node:
+The camera node reads frames from the ZED camera. Its declared parameters are `camera_side`, `roi_dimensions`, and `source_type`:
+
+`ros2 run python_package camera_node --ros-args -p camera_side:=left -p roi_dimensions:='[0,0,100,100]'`
+
+#### Running the jetson_node
+`ros2 run python_package jetson_node`
+
+#### Running the extermination node
+
+```bash
+ros2 run python_package extermination_node
+```
+
+Without any additional arguments, this will also show a window with the drawn bounding boxes. To turn the display off, add the argument `--ros-args -p use_display_node:=false` to the command, as shown below.
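+
+A sketch of the combined command:
+
+```bash
+ros2 run python_package extermination_node --ros-args -p use_display_node:=false
+```
+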
+#### Running the picture node:
+File paths are relative to the working directory.
+
+`ros2 run python_package picture_node --ros-args -p static_image_path:='./../assets/maize' -p loop:=-1 -p frame_rate:=1`
+
+#### Running the inference node:
+
+The weights path (`.pt` file) is relative to the `ROS/workspace_python/ros2_ws` directory.
+
+
+```bash
+ros2 run python_package inference_node --ros-args -p weights_path:='../models/maize/Maize.pt'
+```
+
+### Compiling new changes
+```bash
+colcon build --packages-select custom_interfaces python_package
+source install/setup.bash
+```
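+
+After building and sourcing, the whole pipeline can also be started from the launch files (a sketch, assuming the launch files are installed by the package's `setup.py`):
+
+```bash
+ros2 launch python_package launch.py
+# or the picture-based debugging pipeline:
+ros2 launch python_package debugging.py
+```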
+
+### Using RQT to view topics
+#### Installing RQT on ROS 2 Humble
+
+To install RQT and its plugins on ROS 2 Humble, use the following command:
+
+```bash
+sudo apt update
+sudo apt install ~nros-humble-rqt*
+```
+
+This command will install RQT along with all available plugins for ROS 2 Humble.
+
+#### Running RQT
+
+To start RQT, simply run:
+
+```bash
+rqt
+```
+
+You can now use RQT to visualize and interact with various ROS topics and nodes.
+
+To view an image topic:
+
+1. Go to Plugins > Visualization > Image View. A new panel will open.
+2. Click the refresh-topics button.
+3. Select the name of the image topic you want to see.
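+
+Alternatively, the image view plugin can be run standalone, passing the topic name directly:
+
+```bash
+ros2 run rqt_image_view rqt_image_view /output_img
+```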
+
+
+
+## Testing the performance of the ZED ROS 2 wrapper
+
+### Using top
+
+```bash
+ros2 node list
+top -c -p $(pgrep -d',' -f name_of_command)
+```
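+
+For example, to watch the ZED wrapper process (the process name here is an assumption; check the `ros2 node list` output first):
+
+```bash
+top -c -p $(pgrep -d',' -f zed_wrapper)
+```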
+
+
+### Using rqt
+Install rqt and the process-monitor plugin with:
+
+```bash
+sudo apt install ros-humble-rqt*
+sudo apt install ros-${ROS_DISTRO}-rqt-top
+```
+
+To launch it, type `rqt` into the terminal.
+
+
+### Visualizing the image topics
+Source: https://ros2jsguy.medium.com/4-data-visualization-using-ros-2-rviz2-pub-sub-communications-and-javascript-typescript-6e43cde75029
+
+
+
+
+### Looking at the RViz2 display
+`ros2 launch zed_display_rviz2 display_zed_cam.launch.py camera_model:=zed2`
diff --git a/ros2_ws/src/python_workspace/launch/README.md b/ros2_ws/src/python_package/launch/README.md
similarity index 100%
rename from ros2_ws/src/python_workspace/launch/README.md
rename to ros2_ws/src/python_package/launch/README.md
diff --git a/ros2_ws/src/python_workspace/launch/debugging.py b/ros2_ws/src/python_package/launch/debugging.py
similarity index 93%
rename from ros2_ws/src/python_workspace/launch/debugging.py
rename to ros2_ws/src/python_package/launch/debugging.py
index 569a27b..3304eb0 100644
--- a/ros2_ws/src/python_workspace/launch/debugging.py
+++ b/ros2_ws/src/python_package/launch/debugging.py
@@ -1,54 +1,54 @@
-from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch_ros.actions import Node
-from launch.substitutions import LaunchConfiguration
-
-def generate_launch_description():
- return LaunchDescription([
- DeclareLaunchArgument('static_image_path', default_value='/home/user/ROS/assets/maize/IMG_1822_14.JPG',description='The path to the static image for CameraNode'),
- DeclareLaunchArgument('loop', default_value='-1', description='0 = do not loop, >0 = # of loops, -1 = loop forever'),
- DeclareLaunchArgument('frame_rate', default_value='30', description='Frame rate for CameraNode publishing'),
- DeclareLaunchArgument('precision', default_value='fp32', description='Precision for the inference data input'),
- DeclareLaunchArgument('weights_path', default_value='/home/user/ROS/models/maize/Maize.onnx', description='Path to the model weights file (must be absolute!)'),
- DeclareLaunchArgument('camera_side', default_value='left', description='Side of the Zed Camera for inference'),
- DeclareLaunchArgument('use_display_node', default_value='False', description='Toggle for using the display'),
-
- # Picture Node
- Node(
- package='python_workspace',
- executable='picture_node',
- name='picture_node',
- parameters=[
- {'static_image_path': LaunchConfiguration('static_image_path')},
- {'loop': LaunchConfiguration('loop')},
- {'frame_rate': LaunchConfiguration('frame_rate')},
- ],
- output='screen'
- ),
-
- # Inference Node
- Node(
- package='python_workspace',
- executable='inference_node',
- name='inference_node',
- parameters=[
- {'weights_path': LaunchConfiguration('weights_path')},
- {'precision': LaunchConfiguration('precision')},
- {'camera_side': LaunchConfiguration('camera_side')}
- ],
- output='screen'
- ),
-
- # Extermination Node
- Node(
- package='python_workspace',
- executable='extermination_node',
- name='extermination_node',
- parameters=[
- {'use_display_node': LaunchConfiguration('use_display_node')},
- {'camera_side': LaunchConfiguration('camera_side')}
- ],
- output='screen'
- ),
-
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch_ros.actions import Node
+from launch.substitutions import LaunchConfiguration
+
+def generate_launch_description():
+ return LaunchDescription([
+        DeclareLaunchArgument('static_image_path', default_value='/home/user/ROS/assets/maize/IMG_1822_14.JPG', description='The path to the static image for CameraNode'),
+ DeclareLaunchArgument('loop', default_value='-1', description='0 = do not loop, >0 = # of loops, -1 = loop forever'),
+ DeclareLaunchArgument('frame_rate', default_value='30', description='Frame rate for CameraNode publishing'),
+ DeclareLaunchArgument('precision', default_value='fp32', description='Precision for the inference data input'),
+ DeclareLaunchArgument('weights_path', default_value='/home/user/ROS/models/maize/Maize.onnx', description='Path to the model weights file (must be absolute!)'),
+ DeclareLaunchArgument('camera_side', default_value='left', description='Side of the Zed Camera for inference'),
+ DeclareLaunchArgument('use_display_node', default_value='False', description='Toggle for using the display'),
+
+ # Picture Node
+ Node(
+ package='python_package',
+ executable='picture_node',
+ name='picture_node',
+ parameters=[
+ {'static_image_path': LaunchConfiguration('static_image_path')},
+ {'loop': LaunchConfiguration('loop')},
+ {'frame_rate': LaunchConfiguration('frame_rate')},
+ ],
+ output='screen'
+ ),
+
+ # Inference Node
+ Node(
+ package='python_package',
+ executable='inference_node',
+ name='inference_node',
+ parameters=[
+ {'weights_path': LaunchConfiguration('weights_path')},
+ {'precision': LaunchConfiguration('precision')},
+ {'camera_side': LaunchConfiguration('camera_side')}
+ ],
+ output='screen'
+ ),
+
+ # Extermination Node
+ Node(
+ package='python_package',
+ executable='extermination_node',
+ name='extermination_node',
+ parameters=[
+ {'use_display_node': LaunchConfiguration('use_display_node')},
+ {'camera_side': LaunchConfiguration('camera_side')}
+ ],
+ output='screen'
+ ),
+
])
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/launch/launch.py b/ros2_ws/src/python_package/launch/launch.py
similarity index 89%
rename from ros2_ws/src/python_workspace/launch/launch.py
rename to ros2_ws/src/python_package/launch/launch.py
index 831cfb7..3552f27 100644
--- a/ros2_ws/src/python_workspace/launch/launch.py
+++ b/ros2_ws/src/python_package/launch/launch.py
@@ -1,73 +1,73 @@
-from launch import LaunchDescription
-from launch.actions import DeclareLaunchArgument
-from launch.substitutions import LaunchConfiguration
-from launch_ros.actions import Node
-
-def generate_launch_description():
- # Declare launch arguments to allow for parameter substitution
- return LaunchDescription([
- # Camera Node arguments
- DeclareLaunchArgument('camera_side', default_value='left', description='Choose left or right camera'),
- DeclareLaunchArgument('shift_constant', default_value='0', description='Shift value for ROI'),
- DeclareLaunchArgument('roi_dimensions', default_value='[0, 0, 100, 100]', description='ROI dimensions as [x1, y1, x2, y2]'),
-
- # Proxy Node argument
- DeclareLaunchArgument('usb_port', default_value='/dev/ttyACM0', description='USB port for the serial connection'),
-
- # Inference Node arguments
- DeclareLaunchArgument('weights_path', default_value='./src/python_workspace/python_workspace/scripts/best.onnx', description='Path to the weights file'),
- DeclareLaunchArgument('precision', default_value='fp32', description='Precision for the inference model (e.g., fp32, fp16)'),
-
- # Extermination Node arguments
- DeclareLaunchArgument('use_display_node', default_value='True', description='Enable display node for visualization'),
- DeclareLaunchArgument('camera_side_exterm', default_value='left', description='Camera side for extermination node (left or right)'),
-
- # Camera Node
- Node(
- package='python_workspace',
- executable='camera_node',
- name='camera_node',
- parameters=[
- {'camera_side': LaunchConfiguration('camera_side')},
- {'shift_constant': LaunchConfiguration('shift_constant')},
- {'roi_dimensions': LaunchConfiguration('roi_dimensions')}
- ],
- output='screen'
- ),
-
- # Proxy Node
- Node(
- package='python_workspace',
- executable='proxy_node',
- name='proxy_node',
- parameters=[
- {'usb_port': LaunchConfiguration('usb_port')}
- ],
- output='screen'
- ),
-
- # Inference Node
- Node(
- package='python_workspace',
- executable='inference_node',
- name='inference_node',
- parameters=[
- {'weights_path': LaunchConfiguration('weights_path')},
- {'precision': LaunchConfiguration('precision')},
- {'camera_side': LaunchConfiguration('camera_side')}
- ],
- output='screen'
- ),
-
- # Extermination Node
- Node(
- package='python_workspace',
- executable='extermination_node',
- name='extermination_node',
- parameters=[
- {'use_display_node': LaunchConfiguration('use_display_node')},
- {'camera_side': LaunchConfiguration('camera_side_exterm')}
- ],
- output='screen'
- )
+from launch import LaunchDescription
+from launch.actions import DeclareLaunchArgument
+from launch.substitutions import LaunchConfiguration
+from launch_ros.actions import Node
+
+def generate_launch_description():
+ # Declare launch arguments to allow for parameter substitution
+ return LaunchDescription([
+ # Camera Node arguments
+ DeclareLaunchArgument('camera_side', default_value='left', description='Choose left or right camera'),
+ DeclareLaunchArgument('shift_constant', default_value='0', description='Shift value for ROI'),
+ DeclareLaunchArgument('roi_dimensions', default_value='[0, 0, 100, 100]', description='ROI dimensions as [x1, y1, x2, y2]'),
+
+ # Proxy Node argument
+ DeclareLaunchArgument('usb_port', default_value='/dev/ttyACM0', description='USB port for the serial connection'),
+
+ # Inference Node arguments
+ DeclareLaunchArgument('weights_path', default_value='./src/python_package/python_package/scripts/best.onnx', description='Path to the weights file'),
+ DeclareLaunchArgument('precision', default_value='fp32', description='Precision for the inference model (e.g., fp32, fp16)'),
+
+ # Extermination Node arguments
+ DeclareLaunchArgument('use_display_node', default_value='True', description='Enable display node for visualization'),
+ DeclareLaunchArgument('camera_side_exterm', default_value='left', description='Camera side for extermination node (left or right)'),
+
+ # Camera Node
+ Node(
+ package='python_package',
+ executable='camera_node',
+ name='camera_node',
+ parameters=[
+ {'camera_side': LaunchConfiguration('camera_side')},
+ {'shift_constant': LaunchConfiguration('shift_constant')},
+ {'roi_dimensions': LaunchConfiguration('roi_dimensions')}
+ ],
+ output='screen'
+ ),
+
+ # Proxy Node
+ Node(
+ package='python_package',
+ executable='proxy_node',
+ name='proxy_node',
+ parameters=[
+ {'usb_port': LaunchConfiguration('usb_port')}
+ ],
+ output='screen'
+ ),
+
+ # Inference Node
+ Node(
+ package='python_package',
+ executable='inference_node',
+ name='inference_node',
+ parameters=[
+ {'weights_path': LaunchConfiguration('weights_path')},
+ {'precision': LaunchConfiguration('precision')},
+ {'camera_side': LaunchConfiguration('camera_side')}
+ ],
+ output='screen'
+ ),
+
+ # Extermination Node
+ Node(
+ package='python_package',
+ executable='extermination_node',
+ name='extermination_node',
+ parameters=[
+ {'use_display_node': LaunchConfiguration('use_display_node')},
+ {'camera_side': LaunchConfiguration('camera_side_exterm')}
+ ],
+ output='screen'
+ )
])
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/package.xml b/ros2_ws/src/python_package/package.xml
similarity index 94%
rename from ros2_ws/src/python_workspace/package.xml
rename to ros2_ws/src/python_package/package.xml
index 44fc016..47a6c67 100644
--- a/ros2_ws/src/python_workspace/package.xml
+++ b/ros2_ws/src/python_package/package.xml
@@ -1,30 +1,30 @@
-<?xml version="1.0"?>
-<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
-<package format="3">
-  <name>python_workspace</name>
-  <version>0.0.1</version>
-  <description>ROS2 Pure Python MVP</description>
-  <maintainer>Ishaan Datta</maintainer>
-  <license>Apache License 2.0</license>
-
-  <buildtool_depend>ament_python</buildtool_depend>
-
-  <build_depend>builtin_interfaces</build_depend>
-  <build_depend>std_msgs</build_depend>
-
-  <exec_depend>rclpy</exec_depend>
-  <exec_depend>std_msgs</exec_depend>
-  <exec_depend>sensor_msgs</exec_depend>
-  <exec_depend>builtin_interfaces</exec_depend>
-
-  <test_depend>ament_copyright</test_depend>
-  <test_depend>ament_flake8</test_depend>
-  <test_depend>ament_pep257</test_depend>
-  <test_depend>python3-pytest</test_depend>
-
-  <depend>custom_interfaces</depend>
-
-  <export>
-    <build_type>ament_python</build_type>
-  </export>
-</package>
+<?xml version="1.0"?>
+<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
+<package format="3">
+  <name>python_package</name>
+  <version>0.0.1</version>
+  <description>ROS2 Pure Python MVP</description>
+  <maintainer>Ishaan Datta</maintainer>
+  <license>Apache License 2.0</license>
+
+  <buildtool_depend>ament_python</buildtool_depend>
+
+  <build_depend>builtin_interfaces</build_depend>
+  <build_depend>std_msgs</build_depend>
+
+  <exec_depend>rclpy</exec_depend>
+  <exec_depend>std_msgs</exec_depend>
+  <exec_depend>sensor_msgs</exec_depend>
+  <exec_depend>builtin_interfaces</exec_depend>
+
+  <test_depend>ament_copyright</test_depend>
+  <test_depend>ament_flake8</test_depend>
+  <test_depend>ament_pep257</test_depend>
+  <test_depend>python3-pytest</test_depend>
+
+  <depend>custom_interfaces</depend>
+
+  <export>
+    <build_type>ament_python</build_type>
+  </export>
+</package>
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/python_workspace/__init__.py b/ros2_ws/src/python_package/python_package/__init__.py
similarity index 100%
rename from ros2_ws/src/python_workspace/python_workspace/__init__.py
rename to ros2_ws/src/python_package/python_package/__init__.py
diff --git a/ros2_ws/src/python_workspace/python_workspace/camera_node.py b/ros2_ws/src/python_package/python_package/camera_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/camera_node.py
rename to ros2_ws/src/python_package/python_package/camera_node.py
index 24a80fb..a6fa842 100644
--- a/ros2_ws/src/python_workspace/python_workspace/camera_node.py
+++ b/ros2_ws/src/python_package/python_package/camera_node.py
@@ -1,129 +1,129 @@
-import time
-import cv2
-import pyzed.sl as sl
-
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from rclpy.executors import MultiThreadedExecutor
-from cv_bridge import CvBridge
-
-from std_msgs.msg import Header
-from sensor_msgs.msg import Image
-from custom_interfaces.msg import ImageInput
-
-from .scripts.utils import ModelInference
-from .scripts.image_source import ImageSource
-
-class CameraNode(Node):
- def __init__(self):
- super().__init__('camera_node')
-
- self.get_logger().info("Initializing Camera Node")
-
- self.declare_parameter('camera_side', 'left')
- self.declare_parameter('roi_dimensions', (0, 0, 100, 100))
- self.declare_parameter('source_type', 'zed')
- self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
- self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value
- self.source_type = self.get_parameter('source_type').get_parameter_value().string_value
-
- # path = ""
-
- # # initialize image_source type
- # image = ImageSource(self.source_type, path)
-
- self.shift_constant = 0
- self.frame_rate = 1
- self.model_dimensions = (640, 640)
- self.velocity = 0.0
- self.bridge = CvBridge()
- self.left_camera_serial_number = 26853647
- self.right_camera_serial_number = 0
-
- self.input_image_publisher = self.create_publisher(ImageInput, f'{self.camera_side}_image_input', 10)
-
- self.publish_zed_frames()
-
- def publish_zed_frames(self):
- zed = sl.Camera()
- init = sl.InitParameters()
- init.camera_resolution = sl.RESOLUTION.HD1080
- init.camera_fps = self.frame_rate
-
- if self.camera_side == 'left':
- init.set_from_serial_number(self.left_camera_serial_number)
- elif self.camera_side == 'right':
- init.set_from_serial_number(self.right_camera_serial_number)
- self.shift_constant *= -1
-
- if not zed.is_opened():
- self.get_logger().info("Initializing Zed Camera")
- status = zed.open(init)
- if status != sl.ERROR_CODE.SUCCESS:
- self.get_logger().error(f"Failed to open ZED camera: {str(status)}")
- return
-
- runtime = sl.RuntimeParameters()
- image_zed = sl.Mat()
-
- while rclpy.ok():
- err = zed.grab(runtime)
- if err == sl.ERROR_CODE.SUCCESS:
- zed.retrieve_image(image_zed, sl.VIEW.LEFT_UNRECTIFIED)
- if self.camera_side == 'left':
- zed.retrieve_image(image_zed, sl.VIEW.LEFT_UNRECTIFIED)
- else:
- zed.retrieve_image(image_zed, sl.VIEW.RIGHT_UNRECTIFIED)
-
- cv_image = image_zed.get_data()
- self.preprocess_image(cv_image)
- time.sleep(1.0 / self.frame_rate)
-
- else:
- self.get_logger().error("Failed to grab ZED camera frame: {str(err)}")
-
- zed.close()
- self.get_logger().info("Closing Zed Camera")
-
- def preprocess_image(self, image):
- tic = time.perf_counter_ns()
- roi_x1, roi_y1, roi_x2, roi_y2 = self.roi_dimensions
- shifted_x1 = roi_x1 + self.shift_constant
- shifted_x2 = roi_x2 + self.shift_constant
-
- preprocessed_img = image[int(roi_y1):int(roi_y2), int(shifted_x1):int(shifted_x2), :3] # replace w/ util
- preprocessed_img = cv2.resize(preprocessed_img, self.model_dimensions) # check if necessary?
- preprocessed_img_msg = self.bridge.cv2_to_imgmsg(preprocessed_img, encoding='rgb8')
-
- raw_image = image[:, :, :3]
- raw_img_msg = self.bridge.cv2_to_imgmsg(raw_image, encoding='rgb8')
-
- image_input = ImageInput()
- image_input.header = Header()
- image_input.header.frame_id = 'static_image' # fix
- image_input.raw_image = raw_img_msg
- image_input.preprocessed_image = preprocessed_img_msg
- image_input.velocity = self.velocity
- self.input_image_publisher.publish(image_input)
- toc = time.perf_counter_ns()
- self.get_logger().info(f"Preprocessing: {(toc-tic)/1e6} ms")
- time.sleep(1/self.frame_rate)
-
-def main(args=None):
- rclpy.init(args=args)
- camera_node = CameraNode()
- executor = MultiThreadedExecutor(num_threads=1)
- executor.add_node(camera_node)
-
- try:
- executor.spin()
- except KeyboardInterrupt:
- print("Shutting down camera node")
- finally:
- executor.shutdown()
- camera_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
- main()
+import time
+import cv2
+import pyzed.sl as sl
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from cv_bridge import CvBridge
+
+from std_msgs.msg import Header
+from sensor_msgs.msg import Image
+from custom_interfaces.msg import ImageInput
+
+from .scripts.utils import ModelInference
+from .scripts.image_source import ImageSource
+
+class CameraNode(Node):
+ def __init__(self):
+ super().__init__('camera_node')
+
+ self.get_logger().info("Initializing Camera Node")
+
+ self.declare_parameter('camera_side', 'left')
+        self.declare_parameter('roi_dimensions', [0, 0, 100, 100])
+ self.declare_parameter('source_type', 'zed')
+ self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
+ self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value
+ self.source_type = self.get_parameter('source_type').get_parameter_value().string_value
+
+ # path = ""
+
+ # # initialize image_source type
+ # image = ImageSource(self.source_type, path)
+
+ self.shift_constant = 0
+ self.frame_rate = 1
+ self.model_dimensions = (640, 640)
+ self.velocity = 0.0
+ self.bridge = CvBridge()
+ self.left_camera_serial_number = 26853647
+ self.right_camera_serial_number = 0
+
+ self.input_image_publisher = self.create_publisher(ImageInput, f'{self.camera_side}_image_input', 10)
+
+ self.publish_zed_frames()
+
+ def publish_zed_frames(self):
+ zed = sl.Camera()
+ init = sl.InitParameters()
+ init.camera_resolution = sl.RESOLUTION.HD1080
+ init.camera_fps = self.frame_rate
+
+ if self.camera_side == 'left':
+ init.set_from_serial_number(self.left_camera_serial_number)
+ elif self.camera_side == 'right':
+ init.set_from_serial_number(self.right_camera_serial_number)
+ self.shift_constant *= -1
+
+ if not zed.is_opened():
+ self.get_logger().info("Initializing Zed Camera")
+ status = zed.open(init)
+ if status != sl.ERROR_CODE.SUCCESS:
+ self.get_logger().error(f"Failed to open ZED camera: {str(status)}")
+ return
+
+ runtime = sl.RuntimeParameters()
+ image_zed = sl.Mat()
+
+ while rclpy.ok():
+ err = zed.grab(runtime)
+ if err == sl.ERROR_CODE.SUCCESS:
+                if self.camera_side == 'left':
+                    zed.retrieve_image(image_zed, sl.VIEW.LEFT_UNRECTIFIED)
+                else:
+                    zed.retrieve_image(image_zed, sl.VIEW.RIGHT_UNRECTIFIED)
+
+ cv_image = image_zed.get_data()
+ self.preprocess_image(cv_image)
+ time.sleep(1.0 / self.frame_rate)
+
+ else:
+                self.get_logger().error(f"Failed to grab ZED camera frame: {str(err)}")
+
+ zed.close()
+ self.get_logger().info("Closing Zed Camera")
+
+ def preprocess_image(self, image):
+ tic = time.perf_counter_ns()
+ roi_x1, roi_y1, roi_x2, roi_y2 = self.roi_dimensions
+ shifted_x1 = roi_x1 + self.shift_constant
+ shifted_x2 = roi_x2 + self.shift_constant
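+        # shift_constant moves the ROI horizontally; publish_zed_frames() flips its sign for the right camera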
+
+ preprocessed_img = image[int(roi_y1):int(roi_y2), int(shifted_x1):int(shifted_x2), :3] # replace w/ util
+ preprocessed_img = cv2.resize(preprocessed_img, self.model_dimensions) # check if necessary?
+ preprocessed_img_msg = self.bridge.cv2_to_imgmsg(preprocessed_img, encoding='rgb8')
+
+ raw_image = image[:, :, :3]
+ raw_img_msg = self.bridge.cv2_to_imgmsg(raw_image, encoding='rgb8')
+
+ image_input = ImageInput()
+ image_input.header = Header()
+ image_input.header.frame_id = 'static_image' # fix
+ image_input.raw_image = raw_img_msg
+ image_input.preprocessed_image = preprocessed_img_msg
+ image_input.velocity = self.velocity
+ self.input_image_publisher.publish(image_input)
+ toc = time.perf_counter_ns()
+ self.get_logger().info(f"Preprocessing: {(toc-tic)/1e6} ms")
+ time.sleep(1/self.frame_rate)
+
+def main(args=None):
+ rclpy.init(args=args)
+ camera_node = CameraNode()
+ executor = MultiThreadedExecutor(num_threads=1)
+ executor.add_node(camera_node)
+
+ try:
+ executor.spin()
+ except KeyboardInterrupt:
+ print("Shutting down camera node")
+ finally:
+ executor.shutdown()
+ camera_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
+ main()
diff --git a/ros2_ws/src/python_workspace/python_workspace/extermination_node.py b/ros2_ws/src/python_package/python_package/extermination_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/extermination_node.py
rename to ros2_ws/src/python_package/python_package/extermination_node.py
index 3c1227b..6fb982c 100644
--- a/ros2_ws/src/python_workspace/python_workspace/extermination_node.py
+++ b/ros2_ws/src/python_package/python_package/extermination_node.py
@@ -1,83 +1,83 @@
-import cv2
-
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from rclpy.executors import MultiThreadedExecutor
-from cv_bridge import CvBridge
-
-from sensor_msgs.msg import Image
-from std_msgs.msg import Header, Int8, String
-from custom_interfaces.msg import InferenceOutput
-
-from .scripts.utils import ModelInference
-
-class ExterminationNode(Node):
- def __init__(self):
- super().__init__('extermination_node')
-
- self.get_logger().info("Initializing Extermination Node")
-
- self.declare_parameter('use_display_node', True)
- self.declare_parameter('camera_side', 'left')
- self.use_display_node = self.get_parameter('use_display_node').get_parameter_value().bool_value
- self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
-
- self.window = "Left Camera" if self.camera_side == "left" else "Right Camera" if self.use_display_node else None
-
- self.publishing_rate = 1.0
- self.lower_range = [78, 158, 124]
- self.upper_range = [60, 255, 255]
- self.minimum_area = 100
- self.minimum_confidence = 0.5
- self.boxes_present = 0
- self.model = ModelInference()
- self.bridge = CvBridge()
- self.boxes_msg = Int8()
- self.boxes_msg.data = 0
-
- self.inference_subscription = self.create_subscription(InferenceOutput, f'{self.camera_side}_inference_output', self.inference_callback, 10)
- self.box_publisher = self.create_publisher(Int8, f'{self.camera_side}_extermination_output', 10)
- self.timer = self.create_timer(self.publishing_rate, self.timer_callback)
-
- def inference_callback(self, msg):
- self.get_logger().info("Received Bounding Boxes")
-
- preprocessed_image = self.bridge.imgmsg_to_cv2(msg.preprocessed_image, desired_encoding='passthrough') # what is this needed for?
- raw_image = self.bridge.imgmsg_to_cv2(msg.raw_image, desired_encoding='passthrough')
- bounding_boxes = self.model.postprocess(msg.confidences.data,msg.bounding_boxes.data, raw_image,msg.velocity)
- final_image = self.model.draw_boxes(raw_image,bounding_boxes,velocity=msg.velocity)
-
- if self.use_display_node:
- cv2.imshow(self.window, final_image)
- cv2.waitKey(10)
-
- if len(bounding_boxes) > 0:
- self.boxes_present = 1
- else:
- self.boxes_present = 0
-
- self.boxes_msg = Int8()
- self.boxes_msg.data = self.boxes_present
-
- def timer_callback(self):
- self.box_publisher.publish(self.boxes_msg)
- self.get_logger().info("Published results to Proxy Node")
-
-
-def main(args=None):
- rclpy.init(args=args)
- extermination_node = ExterminationNode()
- executor = MultiThreadedExecutor(num_threads=1)
- executor.add_node(extermination_node)
- try:
- executor.spin()
- except KeyboardInterrupt:
- print("Shutting down extermination node")
- finally:
- executor.shutdown()
- extermination_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
+import cv2
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from cv_bridge import CvBridge
+
+from sensor_msgs.msg import Image
+from std_msgs.msg import Header, Int8, String
+from custom_interfaces.msg import InferenceOutput
+
+from .scripts.utils import ModelInference
+
+class ExterminationNode(Node):
+ def __init__(self):
+ super().__init__('extermination_node')
+
+ self.get_logger().info("Initializing Extermination Node")
+
+ self.declare_parameter('use_display_node', True)
+ self.declare_parameter('camera_side', 'left')
+ self.use_display_node = self.get_parameter('use_display_node').get_parameter_value().bool_value
+ self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
+
+        self.window = ("Left Camera" if self.camera_side == "left" else "Right Camera") if self.use_display_node else None
+
+ self.publishing_rate = 1.0
+ self.lower_range = [78, 158, 124]
+ self.upper_range = [60, 255, 255]
+ self.minimum_area = 100
+ self.minimum_confidence = 0.5
+ self.boxes_present = 0
+ self.model = ModelInference()
+ self.bridge = CvBridge()
+ self.boxes_msg = Int8()
+ self.boxes_msg.data = 0
+
+ self.inference_subscription = self.create_subscription(InferenceOutput, f'{self.camera_side}_inference_output', self.inference_callback, 10)
+ self.box_publisher = self.create_publisher(Int8, f'{self.camera_side}_extermination_output', 10)
+ self.timer = self.create_timer(self.publishing_rate, self.timer_callback)
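+        # the timer re-publishes the most recent detection flag at a fixed rate so the proxy node receives a steady signal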
+
+ def inference_callback(self, msg):
+ self.get_logger().info("Received Bounding Boxes")
+
+ preprocessed_image = self.bridge.imgmsg_to_cv2(msg.preprocessed_image, desired_encoding='passthrough') # what is this needed for?
+ raw_image = self.bridge.imgmsg_to_cv2(msg.raw_image, desired_encoding='passthrough')
+ bounding_boxes = self.model.postprocess(msg.confidences.data,msg.bounding_boxes.data, raw_image,msg.velocity)
+ final_image = self.model.draw_boxes(raw_image,bounding_boxes,velocity=msg.velocity)
+
+ if self.use_display_node:
+ cv2.imshow(self.window, final_image)
+ cv2.waitKey(10)
+
+ if len(bounding_boxes) > 0:
+ self.boxes_present = 1
+ else:
+ self.boxes_present = 0
+
+ self.boxes_msg = Int8()
+ self.boxes_msg.data = self.boxes_present
+
+ def timer_callback(self):
+ self.box_publisher.publish(self.boxes_msg)
+ self.get_logger().info("Published results to Proxy Node")
+
+
+def main(args=None):
+ rclpy.init(args=args)
+ extermination_node = ExterminationNode()
+ executor = MultiThreadedExecutor(num_threads=1)
+ executor.add_node(extermination_node)
+ try:
+ executor.spin()
+ except KeyboardInterrupt:
+ print("Shutting down extermination node")
+ finally:
+ executor.shutdown()
+ extermination_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
main()
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/python_workspace/inference_node.py b/ros2_ws/src/python_package/python_package/inference_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/inference_node.py
rename to ros2_ws/src/python_package/python_package/inference_node.py
index 16ec616..e6aa495 100644
--- a/ros2_ws/src/python_workspace/python_workspace/inference_node.py
+++ b/ros2_ws/src/python_package/python_package/inference_node.py
@@ -1,76 +1,76 @@
-import time
-import os
-
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from rclpy.executors import MultiThreadedExecutor
-from cv_bridge import CvBridge
-
-from sensor_msgs.msg import Image
-from std_msgs.msg import Int32MultiArray, Float32MultiArray
-from custom_interfaces.msg import ImageInput, InferenceOutput
-
-from .scripts.utils import ModelInference
-
-class InferenceNode(Node):
- def __init__(self):
- super().__init__('inference_node')
-
- self.get_logger().info("Initializing Inference Node")
-
- self.declare_parameter('weights_path', '/home/user/ROS/models/maize/Maize.onnx')
- self.declare_parameter('precision', 'fp32') # fp32, fp16 # todo: do something with strip_weights and precision
- self.declare_parameter('camera_side', 'left')
- self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
- self.weights_path = self.get_parameter('weights_path').get_parameter_value().string_value
- self.precision = self.get_parameter('precision').get_parameter_value().string_value
-
- self.model = ModelInference(self.weights_path, self.precision)
- self.bridge = CvBridge()
-
- self.image_subscription = self.create_subscription(ImageInput, f'{self.camera_side}_image_input', self.image_callback, 10)
- self.box_publisher = self.create_publisher(InferenceOutput,f'{self.camera_side}_inference_output', 10)
-
- def image_callback(self, msg):
- self.get_logger().info("Received Image")
- opencv_img = self.bridge.imgmsg_to_cv2(msg.preprocessed_image, desired_encoding='passthrough')
- tic = time.perf_counter_ns()
- output_img, confidences, boxes = self.model.inference(opencv_img)
- toc = time.perf_counter_ns()
-
- output_msg = InferenceOutput()
- output_msg.num_boxes = len(boxes)
- output_msg.raw_image = msg.raw_image
- output_msg.velocity = msg.velocity
- output_msg.preprocessed_image = msg.preprocessed_image
- bounding_boxes = Float32MultiArray()
- confidences_msg = Float32MultiArray()
- bounding_boxes.data = []
- confidences_msg.data = []
- if len(boxes) != 0:
- bounding_boxes.data = boxes
- confidences_msg.data = confidences
- output_msg.bounding_boxes = bounding_boxes
- output_msg.confidences = confidences_msg
- self.box_publisher.publish(output_msg)
-
- self.get_logger().info(f"Inference: {(toc-tic)/1e6} ms")
-
-def main(args=None):
- rclpy.init(args=args)
- inference_node = InferenceNode()
- executor = MultiThreadedExecutor(num_threads=1)
- executor.add_node(inference_node)
-
- try:
- executor.spin()
- except KeyboardInterrupt:
- print("Shutting down inference node")
- finally:
- executor.shutdown()
- inference_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
+import time
+import os
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from cv_bridge import CvBridge
+
+from sensor_msgs.msg import Image
+from std_msgs.msg import Int32MultiArray, Float32MultiArray
+from custom_interfaces.msg import ImageInput, InferenceOutput
+
+from .scripts.utils import ModelInference
+
+class InferenceNode(Node):
+ def __init__(self):
+ super().__init__('inference_node')
+
+ self.get_logger().info("Initializing Inference Node")
+
+ self.declare_parameter('weights_path', '/home/user/ROS/models/maize/Maize.onnx')
+ self.declare_parameter('precision', 'fp32') # fp32, fp16 # todo: do something with strip_weights and precision
+ self.declare_parameter('camera_side', 'left')
+ self.camera_side = self.get_parameter('camera_side').get_parameter_value().string_value
+ self.weights_path = self.get_parameter('weights_path').get_parameter_value().string_value
+ self.precision = self.get_parameter('precision').get_parameter_value().string_value
+
+ self.model = ModelInference(self.weights_path, self.precision)
+ self.bridge = CvBridge()
+
+ self.image_subscription = self.create_subscription(ImageInput, f'{self.camera_side}_image_input', self.image_callback, 10)
+ self.box_publisher = self.create_publisher(InferenceOutput,f'{self.camera_side}_inference_output', 10)
+
+ def image_callback(self, msg):
+ self.get_logger().info("Received Image")
+ opencv_img = self.bridge.imgmsg_to_cv2(msg.preprocessed_image, desired_encoding='passthrough')
+ tic = time.perf_counter_ns()
+ output_img, confidences, boxes = self.model.inference(opencv_img)
+ toc = time.perf_counter_ns()
+
+ output_msg = InferenceOutput()
+ output_msg.num_boxes = len(boxes)
+ output_msg.raw_image = msg.raw_image
+ output_msg.velocity = msg.velocity
+ output_msg.preprocessed_image = msg.preprocessed_image
+ bounding_boxes = Float32MultiArray()
+ confidences_msg = Float32MultiArray()
+ bounding_boxes.data = []
+ confidences_msg.data = []
+ if len(boxes) != 0:
+ bounding_boxes.data = boxes
+ confidences_msg.data = confidences
+ output_msg.bounding_boxes = bounding_boxes
+ output_msg.confidences = confidences_msg
+ self.box_publisher.publish(output_msg)
+
+ self.get_logger().info(f"Inference: {(toc-tic)/1e6} ms")
+
+def main(args=None):
+ rclpy.init(args=args)
+ inference_node = InferenceNode()
+ executor = MultiThreadedExecutor(num_threads=1)
+ executor.add_node(inference_node)
+
+ try:
+ executor.spin()
+ except KeyboardInterrupt:
+ print("Shutting down inference node")
+ finally:
+ executor.shutdown()
+ inference_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
main()
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/python_workspace/picture_node.py b/ros2_ws/src/python_package/python_package/picture_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/picture_node.py
rename to ros2_ws/src/python_package/python_package/picture_node.py
index 23fec8d..695a1d1 100644
--- a/ros2_ws/src/python_workspace/python_workspace/picture_node.py
+++ b/ros2_ws/src/python_package/python_package/picture_node.py
@@ -1,151 +1,151 @@
-import time, os
-import cv2
-import random
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from rclpy.executors import MultiThreadedExecutor
-from sensor_msgs.msg import Image
-from std_msgs.msg import Header, String, Int8
-from cv_bridge import CvBridge
-import queue
-import numpy as np
-
-from .scripts.utils import ModelInference
-
-from custom_interfaces.msg import ImageInput # CHANGE
-# node to publish static image data to the /image topic.
-# used for testing the inference pipelines.
-class PictureNode(Node):
- def __init__(self):
- super().__init__('picture_node')
-
- self.declare_parameter('static_image_path', '/home/user/ROS/assets/maize/IMG_1822_14.JPG')
- self.declare_parameter('loop', -1) # 0 = don't loop, >0 = # of loops, -1 = loop forever
- self.declare_parameter('frame_rate', 1) # Desired frame rate for publishing
-
- self.static_image_path = self.get_parameter('static_image_path').get_parameter_value().string_value
- # Resolve the path programmatically
- if not os.path.isabs(self.static_image_path):
- self.static_image_path = os.path.join(os.getcwd(), self.static_image_path)
-
- self.loop = self.get_parameter('loop').get_parameter_value().integer_value
- self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value
-
- self.bridge = CvBridge()
- self.model = ModelInference()
-
- self.image_list = self.get_images()
- self.loop_count = 0
- self.image_counter = 0
-
- self.input_image_publisher = self.create_publisher(ImageInput, 'input_image', 10)
- timer_period = 1/self.frame_rate # publish every 0.5 seconds
- self.timer = self.create_timer(timer_period * 2, self.publish_static_image)
-
-
- def get_images(self)-> list[np.ndarray]:
- """
- Returns a list of images in the form of cv images
- from the path specified by the static_image_path parameter.
- This is to simulate having a stream of camera images
- """
- # You can technically read the img file as binary data rather than using cv2 to read it and then convert it into a ROS2 image message... and then converting it back to a numpy array in the inference node. But since the picture node is only for testing the MVP and we're not using the GPU yet the overhead here should not matter.
- if not os.path.exists(self.static_image_path):
- self.get_logger().error(f"Static image not found at {self.static_image_path}")
- raise FileNotFoundError(f"Static image not found at {self.static_image_path}")
-
- filename = self.static_image_path
-
- image_paths = []
- if os.path.isfile(self.static_image_path) \
- and (filename.endswith('.JPG') or filename.endswith('.png')):
- filename = os.path.join(self.static_image_path, filename)
- image_paths.append(filename)
- elif os.path.isdir(self.static_image_path):
- for filename in os.listdir(self.static_image_path):
- if filename.endswith('.JPG') or filename.endswith('.png'):
- filename = os.path.join(self.static_image_path, filename)
- image_paths.append(filename)
-
- if len(image_paths) == 0:
- self.get_logger().error(f"No images found at {self.static_image_path}")
- return
-
- images = []
- for filename in image_paths:
- # read images
- image = cv2.imread(filename, cv2.IMREAD_COLOR)
- if image is None:
- self.get_logger().error(f"Failed to read image: {filename}")
- raise FileNotFoundError(f"Failed to read image: {filename}")
-
- # add to the list of images
- images.append(image)
-
- return images
-
-
- def publish_static_image(self):
- """
- Publishes static images to the /image topic
- """
- array_size = len(self.image_list)
-
- # publish the image on the current pos to the image topic
- if self.loop == -1 or self.loop_count < self.loop:
- #get image from list of uploaded
- position = self.image_counter % array_size
- self.get_logger().info(f"Publishing image {position + 1}/{array_size}")
-
- raw_image = self.image_list[position]
-
- #todo: create the message to publish
- postprocessed_img = self.model.preprocess(raw_image)
- postprocessed_img_msg = self.bridge.cv2_to_imgmsg(postprocessed_img, encoding='rgb8')
- raw_img_msg = self.bridge.cv2_to_imgmsg(raw_image, encoding='rgb8')
-
- image_input = ImageInput()
- image_input.header = Header()
- image_input.header.frame_id = 'static_image'
- image_input.raw_image = raw_img_msg
- image_input.preprocessed_image = postprocessed_img_msg
- image_input.velocity = random.uniform(0,1)
-
- # publish image and increment whatever is needed
- self.input_image_publisher.publish(image_input)
- self.get_logger().info(f"Published image {self.image_counter}")
- self.image_counter += 1
- self.loop_count = self.image_counter // array_size
-
- else:
- # stop the timer/quit the node
- self.timer.cancel()
- self.get_logger().info("Finished publishing images")
- self.destroy_node()
-
-
- def crop_with_velocity(self, image: np.ndarray, velocity: float):
- """
- Takes in an image and crops it with the velocity shift
- """
- return image
-
-def main(args=None):
- rclpy.init(args=args)
- picture_node = PictureNode()
- executor = MultiThreadedExecutor(num_threads=1)
- executor.add_node(picture_node)
-
- try:
- executor.spin()
- except KeyboardInterrupt:
- print("Shutting down picture node")
- return
- finally:
- executor.shutdown()
- picture_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
+import time, os
+import cv2
+import random
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from sensor_msgs.msg import Image
+from std_msgs.msg import Header, String, Int8
+from cv_bridge import CvBridge
+import queue
+import numpy as np
+
+from .scripts.utils import ModelInference
+
+from custom_interfaces.msg import ImageInput # CHANGE
+# Node that publishes static image data to the input_image topic.
+# Used for testing the inference pipelines.
+class PictureNode(Node):
+ def __init__(self):
+ super().__init__('picture_node')
+
+ self.declare_parameter('static_image_path', '/home/user/ROS/assets/maize/IMG_1822_14.JPG')
+ self.declare_parameter('loop', -1) # 0 = don't loop, >0 = # of loops, -1 = loop forever
+ self.declare_parameter('frame_rate', 1) # Desired frame rate for publishing
+
+ self.static_image_path = self.get_parameter('static_image_path').get_parameter_value().string_value
+ # Resolve the path programmatically
+ if not os.path.isabs(self.static_image_path):
+ self.static_image_path = os.path.join(os.getcwd(), self.static_image_path)
+
+ self.loop = self.get_parameter('loop').get_parameter_value().integer_value
+ self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value
+
+ self.bridge = CvBridge()
+ self.model = ModelInference()
+
+ self.image_list = self.get_images()
+ self.loop_count = 0
+ self.image_counter = 0
+
+ self.input_image_publisher = self.create_publisher(ImageInput, 'input_image', 10)
+        timer_period = 1.0 / self.frame_rate  # seconds between publishes, derived from the frame_rate parameter
+        self.timer = self.create_timer(timer_period, self.publish_static_image)
+
+
+ def get_images(self)-> list[np.ndarray]:
+ """
+ Returns a list of images in the form of cv images
+ from the path specified by the static_image_path parameter.
+ This is to simulate having a stream of camera images
+ """
+ # You can technically read the img file as binary data rather than using cv2 to read it and then convert it into a ROS2 image message... and then converting it back to a numpy array in the inference node. But since the picture node is only for testing the MVP and we're not using the GPU yet the overhead here should not matter.
+ if not os.path.exists(self.static_image_path):
+ self.get_logger().error(f"Static image not found at {self.static_image_path}")
+ raise FileNotFoundError(f"Static image not found at {self.static_image_path}")
+
+        image_paths = []
+        if os.path.isfile(self.static_image_path) \
+                and (self.static_image_path.endswith('.JPG') or self.static_image_path.endswith('.png')):
+            # a single image file was given; use it directly
+            image_paths.append(self.static_image_path)
+ elif os.path.isdir(self.static_image_path):
+ for filename in os.listdir(self.static_image_path):
+ if filename.endswith('.JPG') or filename.endswith('.png'):
+ filename = os.path.join(self.static_image_path, filename)
+ image_paths.append(filename)
+
+        if len(image_paths) == 0:
+            self.get_logger().error(f"No images found at {self.static_image_path}")
+            raise FileNotFoundError(f"No images found at {self.static_image_path}")
+
+ images = []
+ for filename in image_paths:
+ # read images
+ image = cv2.imread(filename, cv2.IMREAD_COLOR)
+ if image is None:
+ self.get_logger().error(f"Failed to read image: {filename}")
+ raise FileNotFoundError(f"Failed to read image: {filename}")
+
+ # add to the list of images
+ images.append(image)
+
+ return images
+
+
+ def publish_static_image(self):
+ """
+        Publishes static images to the input_image topic
+ """
+ array_size = len(self.image_list)
+
+ # publish the image on the current pos to the image topic
+ if self.loop == -1 or self.loop_count < self.loop:
+ #get image from list of uploaded
+ position = self.image_counter % array_size
+ self.get_logger().info(f"Publishing image {position + 1}/{array_size}")
+
+ raw_image = self.image_list[position]
+
+            # build the ImageInput message to publish
+            preprocessed_img = self.model.preprocess(raw_image)
+            preprocessed_img_msg = self.bridge.cv2_to_imgmsg(preprocessed_img, encoding='rgb8')
+ raw_img_msg = self.bridge.cv2_to_imgmsg(raw_image, encoding='rgb8')
+
+ image_input = ImageInput()
+ image_input.header = Header()
+ image_input.header.frame_id = 'static_image'
+ image_input.raw_image = raw_img_msg
+            image_input.preprocessed_image = preprocessed_img_msg
+ image_input.velocity = random.uniform(0,1)
+
+ # publish image and increment whatever is needed
+ self.input_image_publisher.publish(image_input)
+ self.get_logger().info(f"Published image {self.image_counter}")
+ self.image_counter += 1
+ self.loop_count = self.image_counter // array_size
+
+ else:
+ # stop the timer/quit the node
+ self.timer.cancel()
+ self.get_logger().info("Finished publishing images")
+ self.destroy_node()
+
+
+ def crop_with_velocity(self, image: np.ndarray, velocity: float):
+ """
+ Takes in an image and crops it with the velocity shift
+ """
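+        # Not implemented yet: the velocity shift is ignored and the image is returned unchanged.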
+ return image
+
+def main(args=None):
+ rclpy.init(args=args)
+ picture_node = PictureNode()
+ executor = MultiThreadedExecutor(num_threads=1)
+ executor.add_node(picture_node)
+
+ try:
+ executor.spin()
+ except KeyboardInterrupt:
+ print("Shutting down picture node")
+ return
+ finally:
+ executor.shutdown()
+ picture_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
main()
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/python_workspace/proxy_node.py b/ros2_ws/src/python_package/python_package/proxy_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/proxy_node.py
rename to ros2_ws/src/python_package/python_package/proxy_node.py
index c5c60e0..b8b2e8b 100644
--- a/ros2_ws/src/python_workspace/python_workspace/proxy_node.py
+++ b/ros2_ws/src/python_package/python_package/proxy_node.py
@@ -1,62 +1,62 @@
-import serial
-
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from std_msgs.msg import Bool
-from rclpy.executors import MultiThreadedExecutor
-from std_msgs.msg import String,Header, Int8
-
-class ProxyNode(Node):
- def __init__(self):
- super().__init__('proxy_node')
-
- self.declare_parameter('usb_port', '/dev/ttyACM0')
- self.port = self.get_parameter('usb_port').get_parameter_value().string_value
-
- try:
- self.ser = serial.Serial(self.port, 115200, timeout=1)
- except Exception as e:
- self.get_logger().error(f"Failed to connect to serial port: {e}")
-
- self.left_subscription = self.create_subscription(Int8, 'left_extermination_output', self.left_callback, 10)
- self.right_subscription = self.create_subscription(Int8, 'right_extermination_output', self.right_callback, 10)
-
- def left_callback(self, msg):
- result = str(msg.data)
- if result == "0":
- serialized_msg = str(0) + '\n'
- self.ser.wrie(serialized_msg.encode())
- self.get_logger().info(f'Sent to Arduino: 0 (left off)')
- else:
- serialized_msg = str(1) + '\n'
- self.ser.wrie(serialized_msg.encode())
- self.get_logger().info(f'Sent to Arduino: 1 (left on)')
-
- def right_callback(self, msg):
- result = str(msg.data)
- if result == "0":
- serialized_msg = str(2) + '\n'
- self.ser.wrie(serialized_msg.encode())
- self.get_logger().info(f'Sent to Arduino: 2 (right off)')
- else:
- serialized_msg = str(3) + '\n'
- self.ser.wrie(serialized_msg.encode())
- self.get_logger().info(f'Sent to Arduino: 3 (right on)')
-
-def main(args=None):
- rclpy.init(args=args)
- proxy_node = ProxyNode()
- executor = MultiThreadedExecutor(num_threads=2)
- executor.add_node(proxy_node)
- try:
- executor.spin()
- except KeyboardInterrupt:
- print("Shutting down proxy node")
- finally:
- executor.shutdown()
- proxy_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
+import serial
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from std_msgs.msg import Bool
+from rclpy.executors import MultiThreadedExecutor
+from std_msgs.msg import String,Header, Int8
+
+class ProxyNode(Node):
+ def __init__(self):
+ super().__init__('proxy_node')
+
+ self.declare_parameter('usb_port', '/dev/ttyACM0')
+ self.port = self.get_parameter('usb_port').get_parameter_value().string_value
+
+ try:
+ self.ser = serial.Serial(self.port, 115200, timeout=1)
+ except Exception as e:
+ self.get_logger().error(f"Failed to connect to serial port: {e}")
+
+ self.left_subscription = self.create_subscription(Int8, 'left_extermination_output', self.left_callback, 10)
+ self.right_subscription = self.create_subscription(Int8, 'right_extermination_output', self.right_callback, 10)
+
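+    # Serial protocol (inferred from the callbacks below): one integer per line;
+    # 0/1 switch the left side off/on, 2/3 switch the right side off/on.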
+    def left_callback(self, msg):
+        result = str(msg.data)
+        if result == "0":
+            serialized_msg = str(0) + '\n'
+            self.ser.write(serialized_msg.encode())
+            self.get_logger().info('Sent to Arduino: 0 (left off)')
+        else:
+            serialized_msg = str(1) + '\n'
+            self.ser.write(serialized_msg.encode())
+            self.get_logger().info('Sent to Arduino: 1 (left on)')
+
+    def right_callback(self, msg):
+        result = str(msg.data)
+        if result == "0":
+            serialized_msg = str(2) + '\n'
+            self.ser.write(serialized_msg.encode())
+            self.get_logger().info('Sent to Arduino: 2 (right off)')
+        else:
+            serialized_msg = str(3) + '\n'
+            self.ser.write(serialized_msg.encode())
+            self.get_logger().info('Sent to Arduino: 3 (right on)')
+
+def main(args=None):
+ rclpy.init(args=args)
+ proxy_node = ProxyNode()
+ executor = MultiThreadedExecutor(num_threads=2)
+ executor.add_node(proxy_node)
+ try:
+ executor.spin()
+ except KeyboardInterrupt:
+ print("Shutting down proxy node")
+ finally:
+ executor.shutdown()
+ proxy_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
main()
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/python_workspace/scripts/__init__.py b/ros2_ws/src/python_package/python_package/scripts/__init__.py
similarity index 100%
rename from ros2_ws/src/python_workspace/python_workspace/scripts/__init__.py
rename to ros2_ws/src/python_package/python_package/scripts/__init__.py
diff --git a/ros2_ws/src/python_workspace/python_workspace/scripts/image_source.py b/ros2_ws/src/python_package/python_package/scripts/image_source.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/scripts/image_source.py
rename to ros2_ws/src/python_package/python_package/scripts/image_source.py
index ebc2e19..c517a03 100644
--- a/ros2_ws/src/python_workspace/python_workspace/scripts/image_source.py
+++ b/ros2_ws/src/python_package/python_package/scripts/image_source.py
@@ -1,112 +1,112 @@
-import cv2
-import os
-import numpy as np
-import logging
-
-
-class ImageSource():
- default_folder_path = ""
- default_image_path = ""
- default_video_path = ""
- logging.basicConfig(format='%(message)s', level=logging.INFO)
-
- def __init__(self, source_type="picture", source_path="/usr/ROS/assets/maize", camera_side="left", serial_number=26853647):
- if self.source_type == "zed":
- import pyzed as sl
- self.camera_side = 0
- self.serial_number = 0
- self.initialize_camera()
-
- elif self.source_type == "static_image":
- if not os.path.exists(self.source_path):
- raise ValueError(f"Image file not found at {self.source_path}")
-
- if not (self.source_path.endswith(".jpg") or self.source_path.endswith(".JPG") or self.source_path.endswith(".png") or self.source_path.endswith(".PNG")):
- raise ValueError(f"Image file at {self.source_path} is not a valid image file. Supported formats are .JPG and .PNG")
-
- elif self.source_type == "image_folder":
- if not os.path.exists(self.source_path):
- raise ValueError(f"Folder not found at {self.source_path}")
-
- if not os.path.isdir(self.source_path):
- raise ValueError(f"Path at {self.source_path} is not a folder")
-
- if len(os.listdir(self.source_path)) == 0:
- raise ValueError(f"Folder at {self.source_path} is empty")
-
- valid_images = [".jpg", ".JPG", ".png", ".PNG"]
- image_files = [f for f in os.listdir(self.source_path) if any(f.endswith(ext) for ext in valid_images)]
-
- if len(image_files) == 0:
- raise ValueError(f"No image files found in {self.source_path}. Supported formats are .JPG and .PNG")
-
- elif self.source_type == "video":
- if not os.path.exists(self.source_path):
- raise ValueError(f"Video file not found at {self.source_path}")
-
- if not self.source_path.endswith(".mp4"):
- raise ValueError(f"Video file at {self.source_path} is not a valid video file. Supported format is .mp4")
-
- else:
- logging.info(f"{self.source_type} is not a valid image source, options are: zed, static_image, image_folder, video")
- raise ValueError(f"{self.source_type} is not a valid image source, options are: zed, static_image, image_folder, video")
-
- def validate_paths(self):
- if not os.path.exists(self.image_path):
- raise ValueError(f"Images folder not found at {self.image_path}")
-
- if len(os.listdir(self.image_path)) == 0:
- raise ValueError(f"Images folder is empty")
-
- files = []
- os.chdir(self.image_path)
- for filename in os.listdir(self.image_path):
- if filename.endswith(".jpg") or filename.endswith(".JPG") or filename.endswith(".png"):
- files.append(self.image_path + '/' + filename)
-
- if len(files) == 0:
- raise ValueError(f"No images files found in {self.image_path}")
-
- logging.info(f"{len(files)} from {self.image_path} loaded successfully")
- return files
-
- def initialize_camera(self):
- import pyzed as sl
- self.zed = sl.Camera()
- init = sl.InitParameters()
- init.camera_resolution = sl.RESOLUTION.HD1080
- init.camera_fps = 30
-
- init.set_from_serial_number(self.serial_number)
-
- if not self.zed.is_opened():
- logging.info("Initializing Zed Camera")
- status = self.zed.open(init)
- if status != sl.ERROR_CODE.SUCCESS:
- logging.error(f"Failed to open ZED camera: {str(status)}")
- return
-
- # should parse directories
- def retrieve_images():
- pass
-
- def capture_frames(self):
- import pyzed as sl
- runtime = sl.RuntimeParameters()
- image_zed = sl.Mat()
-
- err = self.zed.grab(runtime)
- if err == sl.ERROR_CODE.SUCCESS:
- if self.camera_side == 'left':
- self.zed.retrieve_image(image_zed, sl.VIEW.LEFT_UNRECTIFIED)
- else:
- self.zed.retrieve_image(image_zed, sl.VIEW.RIGHT_UNRECTIFIED)
- cv_image = image_zed.get_data()
- yield(cv_image)
- else:
- logging.error("Failed to grab ZED camera frame: {str(err)}")
-
- # should be a generator that gets images
- def parse_images(self):
- for image in self.images:
+import cv2
+import os
+import numpy as np
+import logging
+
+
+class ImageSource():
+ default_folder_path = ""
+ default_image_path = ""
+ default_video_path = ""
+ logging.basicConfig(format='%(message)s', level=logging.INFO)
+
+    def __init__(self, source_type="picture", source_path="/usr/ROS/assets/maize", camera_side="left", serial_number=26853647):
+        # store the constructor arguments before they are used below
+        self.source_type = source_type
+        self.source_path = source_path
+        self.camera_side = camera_side
+        self.serial_number = serial_number
+
+        if self.source_type == "zed":
+            self.initialize_camera()
+
+ elif self.source_type == "static_image":
+ if not os.path.exists(self.source_path):
+ raise ValueError(f"Image file not found at {self.source_path}")
+
+            if not self.source_path.lower().endswith((".jpg", ".png")):
+                raise ValueError(f"Image file at {self.source_path} is not a valid image file. Supported formats are .JPG and .PNG")
+
+ elif self.source_type == "image_folder":
+ if not os.path.exists(self.source_path):
+ raise ValueError(f"Folder not found at {self.source_path}")
+
+ if not os.path.isdir(self.source_path):
+ raise ValueError(f"Path at {self.source_path} is not a folder")
+
+ if len(os.listdir(self.source_path)) == 0:
+ raise ValueError(f"Folder at {self.source_path} is empty")
+
+ valid_images = [".jpg", ".JPG", ".png", ".PNG"]
+ image_files = [f for f in os.listdir(self.source_path) if any(f.endswith(ext) for ext in valid_images)]
+
+ if len(image_files) == 0:
+ raise ValueError(f"No image files found in {self.source_path}. Supported formats are .JPG and .PNG")
+
+ elif self.source_type == "video":
+ if not os.path.exists(self.source_path):
+ raise ValueError(f"Video file not found at {self.source_path}")
+
+ if not self.source_path.endswith(".mp4"):
+ raise ValueError(f"Video file at {self.source_path} is not a valid video file. Supported format is .mp4")
+
+        else:
+            raise ValueError(f"{self.source_type} is not a valid image source, options are: zed, static_image, image_folder, video")
+
+    def validate_paths(self):
+        if not os.path.exists(self.source_path):
+            raise ValueError(f"Images folder not found at {self.source_path}")
+
+        if len(os.listdir(self.source_path)) == 0:
+            raise ValueError("Images folder is empty")
+
+        files = []
+        for filename in os.listdir(self.source_path):
+            if filename.lower().endswith((".jpg", ".png")):
+                files.append(self.source_path + '/' + filename)
+
+        if len(files) == 0:
+            raise ValueError(f"No image files found in {self.source_path}")
+
+        logging.info(f"{len(files)} files from {self.source_path} loaded successfully")
+        return files
+
+ def initialize_camera(self):
+        import pyzed.sl as sl
+ self.zed = sl.Camera()
+ init = sl.InitParameters()
+ init.camera_resolution = sl.RESOLUTION.HD1080
+ init.camera_fps = 30
+
+ init.set_from_serial_number(self.serial_number)
+
+ if not self.zed.is_opened():
+ logging.info("Initializing Zed Camera")
+ status = self.zed.open(init)
+ if status != sl.ERROR_CODE.SUCCESS:
+ logging.error(f"Failed to open ZED camera: {str(status)}")
+ return
+
+    # should parse directories
+    def retrieve_images(self):
+        pass
+
+    def capture_frames(self):
+        import pyzed.sl as sl
+        runtime = sl.RuntimeParameters()
+        image_zed = sl.Mat()
+
+        err = self.zed.grab(runtime)
+        if err == sl.ERROR_CODE.SUCCESS:
+            if self.camera_side == 'left':
+                self.zed.retrieve_image(image_zed, sl.VIEW.LEFT_UNRECTIFIED)
+            else:
+                self.zed.retrieve_image(image_zed, sl.VIEW.RIGHT_UNRECTIFIED)
+            cv_image = image_zed.get_data()
+            yield cv_image
+        else:
+            logging.error(f"Failed to grab ZED camera frame: {str(err)}")
+
+ # should be a generator that gets images
+ def parse_images(self):
+ for image in self.images:
pass
\ No newline at end of file
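For a quick sanity check of the folder-validation path above, `ImageSource` can be driven on its own. A minimal sketch, assuming a hypothetical local folder of `.jpg`/`.png` files and that the package is importable:

```python
# Sketch: list the images ImageSource would serve from a folder
# (hypothetical path; run from an environment where the package is installed).
from python_package.scripts.image_source import ImageSource

source = ImageSource(source_type="image_folder", source_path="./assets/maize")
for path in source.validate_paths():
    print(path)
```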
diff --git a/ros2_ws/src/python_workspace/python_workspace/scripts/utils.py b/ros2_ws/src/python_package/python_package/scripts/utils.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/scripts/utils.py
rename to ros2_ws/src/python_package/python_package/scripts/utils.py
index c0a7ecd..a97dd45 100644
--- a/ros2_ws/src/python_workspace/python_workspace/scripts/utils.py
+++ b/ros2_ws/src/python_package/python_package/scripts/utils.py
@@ -1,312 +1,312 @@
-import cv2
-import os
-import numpy as np
-import torch
-from ultralytics import YOLO
-from ultralytics.engine.results import Results
-
-#utility class for the python workspace
-class ModelInference:
- ROI_ROWS = 300
- ROI_COLS = 300
- CONTOUR_AREA_THRESHOLD = 500
- POSTPROCESS_OUTPUT_SHAPE = (640, 640)
-
- """
- class that performs inference using a model
- """
- def __init__(self, weights_path=None, precision=None):
- if weights_path is None or precision is None:
- self.yolo = None
- return
- # Initialize instance variables
- else:
- self.precision = precision
- if not os.path.exists(weights_path):
- print(f"weights file not found at {weights_path}")
- raise FileNotFoundError(f"weights file not found at {weights_path}")
-
- self.yolo = YOLO(weights_path)
-
- def preprocess(self, image: np.ndarray):
- """
- Takes in a numpy array that has been preprocessed
- No preprocessing is nededed for YOLO
- """
- resized_up = cv2.resize(image, self.POSTPROCESS_OUTPUT_SHAPE)
-
- return resized_up
-
- def initialise_model(self, weights_path):
- """
- Initialize the YOLO model with the given weights.
- Args:
- weights_path (str): The file path to the YOLO weights.
- Raises:
- FileNotFoundError: If the weights file does not exist at the specified path.
- Returns:
- None
- """
-
- if not os.path.exists(weights_path):
- print(f"weights file not found at {weights_path}")
- raise FileNotFoundError(f"weights file not found at {weights_path}")
-
- self.yolo = YOLO(weights_path)
-
- def inference(self, image_array: np.ndarray) -> Results:
- # https://docs.ultralytics.com/modes/predict/#inference-arguments
- """
- Perform inference on the provided image array using the YOLO model.
-
- Args:
- image_array (np.ndarray): The input image array on which inference is to be performed.
-
- Returns:
- tuple: A tuple containing:
- - out_img (np.ndarray): The output image with bounding boxes drawn.
- - confidences (list): A list of confidence scores for each detected object.
- - boxes (list): A list of bounding boxes in the format [x1, y1, x2, y2] normalized from 0 to 1.
- """
- if self.yolo is None:
- raise Exception("Model was not initialised, inference cannot be done")
- height, width, _ = image_array.shape
- size = max(height, width)
- results =self.yolo.predict(image_array,imgsz=size) #we are only predicting one image
-
-
- if results[0] is not None: # todo: change this so the message only sends bb
- result = results[0]
- out_img = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB) # Convert BGR to RGB
-
- # print(f"Bounding boxes (xyxyn format): {result.boxes.xyxyn}")
- boxes = np.array(result.boxes.xyxyn.tolist()).reshape(-1)
- confidences = np.array(result.boxes.conf.tolist()).reshape(-1)
- return out_img, list(confidences), list(boxes)
-
- return image_array, [], []
-
- def postprocess(self, confidence, bbox_array,raw_image: np.ndarray, velocity=0):
- """
- Postprocesses the bounding boxes to convert them from normalized coordinates (xyxyn) to pixel coordinates (xyxy).
- Applies color segmentation to refine the bounding boxes and adjusts them to fit within a shifted region of interest (ROI).
-
- Args:
- confidence (list): List of confidence scores for each bounding box.
- bbox_array (list): List of bounding boxes in normalized coordinates (xyxyn).
- raw_image (np.ndarray): The original input image.
- velocity (int, optional): The velocity to shift the ROI. Default is 0.
-
- Returns:
- list: A list of refined bounding boxes in pixel coordinates (xyxy).
- """
- velocity = int(velocity)
- detections = []
- if len(bbox_array) > 0 and type(bbox_array[0]) != tuple: #CONVERT TO TUPLE
- for offset in range(int(len(bbox_array) / 4)):
- x1 = int(bbox_array[offset * 4] * raw_image.shape[1])
- y1 = int(bbox_array[offset * 4 + 1] * raw_image.shape[0])
- x2 = int(bbox_array[offset * 4 + 2] * raw_image.shape[1])
- y2 = int(bbox_array[offset * 4 + 3] * raw_image.shape[0])
- detections.append((x1, y1, x2, y2))
- else:
- for bbox in bbox_array: #resize so it first the original image
- x1,y1,x2,y2 = bbox
-
- x1 = int(x1 * raw_image.shape[1])
- y1 = int(y1 * raw_image.shape[0])
- x2 = int(x2 * raw_image.shape[1])
- y2 = int(y2 * raw_image.shape[0])
-
- detections.append((x1, y1, x2, y2))
-
-
- detections = self.object_filter(raw_image, detections) #color segmentation
- detections = self.verify_object(raw_image, detections,velocity)
-
- return detections
-
-
- # creates a color segmentation mask and filters small areas within bounding box frame
- def object_filter(self, image, bboxes):
- """
- Uses Color segmentation to create better boxes
- Filters objects in an image based on bounding boxes and color thresholds.
- Args:
- image (numpy.ndarray): The input image in which objects are to be filtered.
- bboxes (list of tuples): A list of bounding boxes, where each bounding box is represented
- as a tuple of four integers (x1, y1, x2, y2).
- Returns:
- list of tuples: A list of filtered bounding boxes, where each bounding box is represented
- as a tuple of four integers (x1, y1, x2, y2) corresponding to the coordinates
- of the top-left and bottom-right corners.
- """
-
- detections = []
- for bbox in bboxes:
- x1, y1, x2, y2 = map(int, bbox)
-
- roi = image[y1:y2, x1:x2] # Extract Region of interest (bounding box)
- bbox_offset_y = y1
- bbox_offset_x = x1
-
- roi = image[y1:y2,x1:x2,:] # Extract Region of interest (bounding box)
-
- hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) #convrt opencv bgr to hsv
-
- lower_mask = hsv[:,:,0] > 35 #refer to hue channel (in the colorbar)
- upper_mask = hsv[:,:,0] < 80 #refer to transparency channel (in the colorbar)
- saturation_mask = hsv[:,:,1] >50
-
- mask = upper_mask*lower_mask*saturation_mask
- blue = roi[:,:,0]*mask
- green = roi[:,:,1]*mask
- red = roi[:,:,2]*mask
-
- masked_brg = np.dstack((blue,green,red))
- gray_image = cv2.cvtColor(masked_brg, cv2.COLOR_BGR2GRAY)
-
-
- ret, thresh = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY)
-
- contours, hiearchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
-
-
- for cnt in contours:
- area = cv2.contourArea(cnt)
- if area > self.CONTOUR_AREA_THRESHOLD:
- print(area)
- x, y, w, h = cv2.boundingRect(cnt)
- detections.append((x + bbox_offset_x, y + bbox_offset_y, x + w + bbox_offset_x, y + h + bbox_offset_y))
-
- return detections
-
-
- def verify_object(self, raw_image, bboxes, velocity=0):
- """
- Adjusts bounding boxes based on the region of interest (ROI) and velocity.
- This function takes an image, a list of bounding boxes, and an optional velocity parameter.
- It adjusts the bounding boxes to ensure they are within the shifted ROI
- Parameters:
- raw_image (numpy.ndarray): The input image.
- bboxes (list of list of int): A list of bounding boxes, where each bounding box is represented
- as [x1, y1, x2, y2].
- velocity (int, optional): The velocity to shift the ROI. Default is 0.
- Returns:
- list of list of int: A list of adjusted bounding boxes that are within the shifted ROI.
- """
-
- velocity = int(velocity)
- roi_x1,roi_y1,roi_x2,roi_y2 = self.get_roi_coordinates(image=raw_image)
- shifted_roi_x1 = roi_x1 - velocity
- shifted_roi_y1 = roi_y1
- shifted_roi_x2 = roi_x2 - velocity
- shifted_roi_y2 = roi_y2
-
- width = raw_image.shape[1]
- height = raw_image.shape[0]
-
- adjusted_bboxes = []
- for bbox in bboxes:
- x1, y1, x2, y2 = bbox
- # # # Ensure the bounding box doesn't exceed the original image dimensions
- # x1 = max(0, min(x1, width))
- # x2 = max(0, min(x2, width))
- # y1 = max(0, min(y1, height))
- # y2 = max(0, min(y2, height))
-
- if ((x1 < shifted_roi_x1 and x2 < shifted_roi_x1) #remove boxes that are out of point
- or (x1 > shifted_roi_x2 and x2 > shifted_roi_x2)
- or (y1 < shifted_roi_y1 and y2 < shifted_roi_y1)
- or (y1 > shifted_roi_y2 and y2 > shifted_roi_y2)
- ):
- pass
- else:
- x1 = max(shifted_roi_x1, min(x1, shifted_roi_x2)) #set the coordinates to be inside roi
- x2 = max(shifted_roi_x1, min(x2, shifted_roi_x2))
- y1 = max(shifted_roi_y1, min(y1, shifted_roi_y2))
- y2 = max(shifted_roi_y1, min(y2, shifted_roi_y2))
-
- adjusted_bboxes.append([x1, y1, x2, y2])
-
- return adjusted_bboxes
-
-
- def draw_boxes(self, image: np.ndarray, bboxes: list, with_roi =True, with_roi_shift = True, velocity = 0) -> np.ndarray:
- """
- Given array of bounding box tuples and an image, draw the bouding boxes into the image.
- If with_roi and with_roi shift is set to true, the ROI areas will also be drawn in.
- velocity must be set if roi_shift is True
-
- Parameters:
- image (np.ndarray): The image on which to draw the bounding boxes.
- bboxes (list): A list of bounding boxes, where each bounding box is represented as a list of four values [x1, y1, x2, y2].
-
- Returns:
- np.ndarray: The image with bounding boxes drawn.
- """
- velocity = int(velocity)
-
- if with_roi:
- x1,y1,x2,y2 = self.get_roi_coordinates(image)
- overlay = image.copy()
- cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 128, 255), -1)
- alpha = 0.3 # Transparency factor.
- image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
- cv2.putText(image, 'Original ROI', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2)
-
- if with_roi_shift and velocity != 0:
- x1_shifted = x1 - velocity if x1 > velocity else 0
- x2_shifted = x2 - velocity if x2 > velocity else 0
- image = cv2.rectangle(image, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), 2)
- overlay = image.copy()
- cv2.rectangle(overlay, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), -1)
- alpha = 0.5 # Transparency factor.
- image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
- cv2.putText(image, 'Shifted ROI', (x1_shifted, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2)
-
-
-
- color = tuple(np.random.randint(0, 256, 3).tolist()) # Generate a random color
- for bbox in bboxes:
- x1, y1, x2, y2 = map(int, bbox)
-
- print(f"Bounding box: ({x1}, {y1}), ({x2}, {y2})")
-
- image = cv2.rectangle(image, (x1, y1), (x2, y2),(255, 0, 0), 2)
-
-
-
- return image
-
- def get_roi_coordinates(self, image:np.ndarray):
- """
- Calculate the coordinates of the Region of Interest (ROI) in the given image.
- Args:
- image (np.ndarray): The input image as a NumPy array.
- Returns:
- list: A list containing the coordinates of the top-left and bottom-right corners of the ROI
- in the format [top_left_x, top_left_y, bottom_right_x, bottom_right_y].
- """
- rows = image.shape[0]
- cols = image.shape[1]
-
- top_left_y_coord = (int(rows/2) - int(self.ROI_ROWS/2))
- top_left_x_coord = (int(cols/2) - int(self.ROI_COLS/2))
-
- bottom_right_y = top_left_y_coord + self.ROI_COLS
- bottom_right_x = top_left_x_coord + self.ROI_ROWS
-
- if cols < self.ROI_COLS:
- bottom_right_x = cols
- top_left_x_coord = 0
-
- if rows < self.ROI_ROWS:
- bottom_right_y = rows
- top_left_y_coord = 0
-
-
- return [top_left_x_coord,top_left_y_coord,bottom_right_x,bottom_right_y]
-
- def print_info(self):
+import cv2
+import os
+import numpy as np
+import torch
+from ultralytics import YOLO
+from ultralytics.engine.results import Results
+
+# utility class for the python package
+class ModelInference:
+    """
+    Class that performs inference using a YOLO model.
+    """
+    ROI_ROWS = 300
+    ROI_COLS = 300
+    CONTOUR_AREA_THRESHOLD = 500
+    POSTPROCESS_OUTPUT_SHAPE = (640, 640)
+    def __init__(self, weights_path=None, precision=None):
+        if weights_path is None or precision is None:
+            self.yolo = None
+            return
+
+        # initialize instance variables
+        self.precision = precision
+        if not os.path.exists(weights_path):
+            raise FileNotFoundError(f"weights file not found at {weights_path}")
+
+        self.yolo = YOLO(weights_path)
+
+    def preprocess(self, image: np.ndarray):
+        """
+        Resizes the input image to the model's expected shape.
+        No other preprocessing is needed for YOLO.
+        """
+        resized_up = cv2.resize(image, self.POSTPROCESS_OUTPUT_SHAPE)
+
+        return resized_up
+
+ def initialise_model(self, weights_path):
+ """
+ Initialize the YOLO model with the given weights.
+ Args:
+ weights_path (str): The file path to the YOLO weights.
+ Raises:
+ FileNotFoundError: If the weights file does not exist at the specified path.
+ Returns:
+ None
+ """
+
+        if not os.path.exists(weights_path):
+            raise FileNotFoundError(f"weights file not found at {weights_path}")
+
+ self.yolo = YOLO(weights_path)
+
+    def inference(self, image_array: np.ndarray) -> tuple:
+ # https://docs.ultralytics.com/modes/predict/#inference-arguments
+ """
+ Perform inference on the provided image array using the YOLO model.
+
+ Args:
+ image_array (np.ndarray): The input image array on which inference is to be performed.
+
+ Returns:
+ tuple: A tuple containing:
+ - out_img (np.ndarray): The output image with bounding boxes drawn.
+ - confidences (list): A list of confidence scores for each detected object.
+ - boxes (list): A list of bounding boxes in the format [x1, y1, x2, y2] normalized from 0 to 1.
+ """
+ if self.yolo is None:
+ raise Exception("Model was not initialised, inference cannot be done")
+ height, width, _ = image_array.shape
+ size = max(height, width)
+        results = self.yolo.predict(image_array, imgsz=size)  # we are only predicting one image
+
+
+ if results[0] is not None: # todo: change this so the message only sends bb
+ result = results[0]
+ out_img = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB) # Convert BGR to RGB
+
+ # print(f"Bounding boxes (xyxyn format): {result.boxes.xyxyn}")
+ boxes = np.array(result.boxes.xyxyn.tolist()).reshape(-1)
+ confidences = np.array(result.boxes.conf.tolist()).reshape(-1)
+ return out_img, list(confidences), list(boxes)
+
+ return image_array, [], []
+
+ def postprocess(self, confidence, bbox_array,raw_image: np.ndarray, velocity=0):
+ """
+ Postprocesses the bounding boxes to convert them from normalized coordinates (xyxyn) to pixel coordinates (xyxy).
+ Applies color segmentation to refine the bounding boxes and adjusts them to fit within a shifted region of interest (ROI).
+
+ Args:
+ confidence (list): List of confidence scores for each bounding box.
+ bbox_array (list): List of bounding boxes in normalized coordinates (xyxyn).
+ raw_image (np.ndarray): The original input image.
+ velocity (int, optional): The velocity to shift the ROI. Default is 0.
+
+ Returns:
+ list: A list of refined bounding boxes in pixel coordinates (xyxy).
+ """
+ velocity = int(velocity)
+ detections = []
+        if len(bbox_array) > 0 and not isinstance(bbox_array[0], tuple):  # flat list: convert to tuples
+ for offset in range(int(len(bbox_array) / 4)):
+ x1 = int(bbox_array[offset * 4] * raw_image.shape[1])
+ y1 = int(bbox_array[offset * 4 + 1] * raw_image.shape[0])
+ x2 = int(bbox_array[offset * 4 + 2] * raw_image.shape[1])
+ y2 = int(bbox_array[offset * 4 + 3] * raw_image.shape[0])
+ detections.append((x1, y1, x2, y2))
+ else:
+            for bbox in bbox_array:  # rescale so it fits the original image
+ x1,y1,x2,y2 = bbox
+
+ x1 = int(x1 * raw_image.shape[1])
+ y1 = int(y1 * raw_image.shape[0])
+ x2 = int(x2 * raw_image.shape[1])
+ y2 = int(y2 * raw_image.shape[0])
+
+ detections.append((x1, y1, x2, y2))
+
+
+ detections = self.object_filter(raw_image, detections) #color segmentation
+ detections = self.verify_object(raw_image, detections,velocity)
+
+ return detections
+
+
+ # creates a color segmentation mask and filters small areas within bounding box frame
+ def object_filter(self, image, bboxes):
+ """
+ Uses Color segmentation to create better boxes
+ Filters objects in an image based on bounding boxes and color thresholds.
+ Args:
+ image (numpy.ndarray): The input image in which objects are to be filtered.
+ bboxes (list of tuples): A list of bounding boxes, where each bounding box is represented
+ as a tuple of four integers (x1, y1, x2, y2).
+ Returns:
+ list of tuples: A list of filtered bounding boxes, where each bounding box is represented
+ as a tuple of four integers (x1, y1, x2, y2) corresponding to the coordinates
+ of the top-left and bottom-right corners.
+ """
+
+ detections = []
+ for bbox in bboxes:
+ x1, y1, x2, y2 = map(int, bbox)
+
+            roi = image[y1:y2, x1:x2]  # extract region of interest (bounding box)
+            bbox_offset_y = y1
+            bbox_offset_x = x1
+
+            hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV)  # convert OpenCV BGR to HSV
+
+            lower_mask = hsv[:,:,0] > 35  # hue channel lower bound (green range)
+            upper_mask = hsv[:,:,0] < 80  # hue channel upper bound (green range)
+            saturation_mask = hsv[:,:,1] > 50  # drop washed-out, low-saturation pixels
+
+            mask = upper_mask * lower_mask * saturation_mask
+            blue = roi[:,:,0] * mask
+            green = roi[:,:,1] * mask
+            red = roi[:,:,2] * mask
+
+            masked_bgr = np.dstack((blue, green, red))
+            gray_image = cv2.cvtColor(masked_bgr, cv2.COLOR_BGR2GRAY)
+
+
+            ret, thresh = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY)
+
+            contours, hierarchy = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
+
+
+ for cnt in contours:
+ area = cv2.contourArea(cnt)
+ if area > self.CONTOUR_AREA_THRESHOLD:
+ x, y, w, h = cv2.boundingRect(cnt)
+ detections.append((x + bbox_offset_x, y + bbox_offset_y, x + w + bbox_offset_x, y + h + bbox_offset_y))
+
+ return detections
+
+
+ def verify_object(self, raw_image, bboxes, velocity=0):
+ """
+ Adjusts bounding boxes based on the region of interest (ROI) and velocity.
+ This function takes an image, a list of bounding boxes, and an optional velocity parameter.
+ It adjusts the bounding boxes to ensure they are within the shifted ROI
+ Parameters:
+ raw_image (numpy.ndarray): The input image.
+ bboxes (list of list of int): A list of bounding boxes, where each bounding box is represented
+ as [x1, y1, x2, y2].
+ velocity (int, optional): The velocity to shift the ROI. Default is 0.
+ Returns:
+ list of list of int: A list of adjusted bounding boxes that are within the shifted ROI.
+ """
+
+ velocity = int(velocity)
+ roi_x1,roi_y1,roi_x2,roi_y2 = self.get_roi_coordinates(image=raw_image)
+ shifted_roi_x1 = roi_x1 - velocity
+ shifted_roi_y1 = roi_y1
+ shifted_roi_x2 = roi_x2 - velocity
+ shifted_roi_y2 = roi_y2
+
+ width = raw_image.shape[1]
+ height = raw_image.shape[0]
+
+ adjusted_bboxes = []
+ for bbox in bboxes:
+ x1, y1, x2, y2 = bbox
+ # # # Ensure the bounding box doesn't exceed the original image dimensions
+ # x1 = max(0, min(x1, width))
+ # x2 = max(0, min(x2, width))
+ # y1 = max(0, min(y1, height))
+ # y2 = max(0, min(y2, height))
+
+            if ((x1 < shifted_roi_x1 and x2 < shifted_roi_x1)  # drop boxes entirely outside the shifted ROI
+ or (x1 > shifted_roi_x2 and x2 > shifted_roi_x2)
+ or (y1 < shifted_roi_y1 and y2 < shifted_roi_y1)
+ or (y1 > shifted_roi_y2 and y2 > shifted_roi_y2)
+ ):
+ pass
+ else:
+ x1 = max(shifted_roi_x1, min(x1, shifted_roi_x2)) #set the coordinates to be inside roi
+ x2 = max(shifted_roi_x1, min(x2, shifted_roi_x2))
+ y1 = max(shifted_roi_y1, min(y1, shifted_roi_y2))
+ y2 = max(shifted_roi_y1, min(y2, shifted_roi_y2))
+
+ adjusted_bboxes.append([x1, y1, x2, y2])
+
+ return adjusted_bboxes
+
+
+    def draw_boxes(self, image: np.ndarray, bboxes: list, with_roi=True, with_roi_shift=True, velocity=0) -> np.ndarray:
+        """
+        Given an array of bounding box tuples and an image, draw the bounding boxes onto the image.
+        If with_roi and with_roi_shift are set to True, the ROI areas will also be drawn in.
+        velocity must be set if with_roi_shift is True.
+
+ Parameters:
+ image (np.ndarray): The image on which to draw the bounding boxes.
+ bboxes (list): A list of bounding boxes, where each bounding box is represented as a list of four values [x1, y1, x2, y2].
+
+ Returns:
+ np.ndarray: The image with bounding boxes drawn.
+ """
+ velocity = int(velocity)
+
+ if with_roi:
+ x1,y1,x2,y2 = self.get_roi_coordinates(image)
+ overlay = image.copy()
+ cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 128, 255), -1)
+ alpha = 0.3 # Transparency factor.
+ image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
+ cv2.putText(image, 'Original ROI', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2)
+
+ if with_roi_shift and velocity != 0:
+ x1_shifted = x1 - velocity if x1 > velocity else 0
+ x2_shifted = x2 - velocity if x2 > velocity else 0
+ image = cv2.rectangle(image, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), 2)
+ overlay = image.copy()
+ cv2.rectangle(overlay, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), -1)
+ alpha = 0.5 # Transparency factor.
+ image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0)
+ cv2.putText(image, 'Shifted ROI', (x1_shifted, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2)
+
+
+
+        color = tuple(np.random.randint(0, 256, 3).tolist())  # random color for this batch of boxes
+        for bbox in bboxes:
+            x1, y1, x2, y2 = map(int, bbox)
+            image = cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
+
+
+
+ return image
+
+ def get_roi_coordinates(self, image:np.ndarray):
+ """
+ Calculate the coordinates of the Region of Interest (ROI) in the given image.
+ Args:
+ image (np.ndarray): The input image as a NumPy array.
+ Returns:
+ list: A list containing the coordinates of the top-left and bottom-right corners of the ROI
+ in the format [top_left_x, top_left_y, bottom_right_x, bottom_right_y].
+ """
+ rows = image.shape[0]
+ cols = image.shape[1]
+
+ top_left_y_coord = (int(rows/2) - int(self.ROI_ROWS/2))
+ top_left_x_coord = (int(cols/2) - int(self.ROI_COLS/2))
+
+        bottom_right_y = top_left_y_coord + self.ROI_ROWS
+        bottom_right_x = top_left_x_coord + self.ROI_COLS
+
+ if cols < self.ROI_COLS:
+ bottom_right_x = cols
+ top_left_x_coord = 0
+
+ if rows < self.ROI_ROWS:
+ bottom_right_y = rows
+ top_left_y_coord = 0
+
+
+ return [top_left_x_coord,top_left_y_coord,bottom_right_x,bottom_right_y]
+
+ def print_info(self):
print(self.yolo.info())
\ No newline at end of file
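Taken together, the intended flow through `ModelInference` is: `inference` on the raw frame, `postprocess` to rescale and clip the boxes, then `draw_boxes` for visualization. A minimal end-to-end sketch, assuming hypothetical paths for the weights file and a test image:

```python
# Sketch of the ModelInference pipeline (hypothetical file paths).
import cv2
from python_package.scripts.utils import ModelInference

model = ModelInference(weights_path='../models/maize/Maize.pt', precision='fp32')
frame = cv2.imread('./assets/maize/example.jpg')  # hypothetical test image

out_img, confidences, boxes = model.inference(frame)                # boxes: flattened xyxyn
refined = model.postprocess(confidences, boxes, frame, velocity=0)  # pixel xyxy, ROI-clipped
annotated = model.draw_boxes(frame, refined, with_roi=True, with_roi_shift=False)

cv2.imwrite('annotated.jpg', annotated)
```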
diff --git a/ros2_ws/src/python_workspace/python_workspace/video_node.py b/ros2_ws/src/python_package/python_package/video_node.py
similarity index 97%
rename from ros2_ws/src/python_workspace/python_workspace/video_node.py
rename to ros2_ws/src/python_package/python_package/video_node.py
index ebbd809..b34ac8f 100644
--- a/ros2_ws/src/python_workspace/python_workspace/video_node.py
+++ b/ros2_ws/src/python_package/python_package/video_node.py
@@ -1,148 +1,148 @@
-import time, os
-import cv2
-import pycuda.driver as cuda
-import cupy as cp
-import queue
-
-import rclpy
-from rclpy.time import Time
-from rclpy.node import Node
-from rclpy.executors import MultiThreadedExecutor
-from sensor_msgs.msg import Image
-from std_msgs.msg import Header, String
-from cv_bridge import CvBridge
-
-class VideoNode(Node):
- def __init__(self):
- super().__init__('video_node')
-
- cuda.init()
- device = cuda.Device(0)
- self.cuda_driver_context = device.make_context()
- self.stream = cuda.Stream()
-
- self.declare_parameter('video_path', '/home/usr/Desktop/ROS/assets/video.mp4')
- self.declare_parameter('loop', 0) # 0 = don't loop, >0 = # of loops, -1 = loop forever
- self.declare_parameter('frame_rate', 30) # Desired frame rate for publishing
- self.declare_parameter('model_dimensions', (448, 1024))
- self.declare_parameter('shift_constant', 1)
- self.declare_parameter('roi_dimensions', [0, 0, 100, 100])
- self.declare_paramter('precision', 'fp32')
-
- self.video_path = self.get_parameter('video_path').get_parameter_value().string_value
- self.loop = self.get_parameter('loop').get_parameter_value().integer_value
- self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value
- self.dimensions = tuple(self.get_parameter('model_dimensions').get_parameter_value().integer_array_value)
- self.shift_constant = self.get_parameter('shift_constant').get_parameter_value().integer_value
- self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value
- self.precision = self.get_parameter('precision').get_parameter_value().string_value
-
- self.pointer_publisher = self.create_publisher(String, 'preprocessing_done', 10)
- self.image_service = self.create_service(Image, 'image_data', self.image_callback)
-
- self.velocity = [0, 0, 0]
- self.image_queue = queue.Queue()
- self.bridge = CvBridge()
- self.publish_video_frames()
- # propagate fp16 option (.fp16 datatype for cupy)
- if self.precision == 'fp32':
- pass
- elif self.precision == 'fp16':
- pass
- else:
- self.get_logger().error(f"Invalid precision: {self.precision}")
-
- def image_callback(self, response):
- self.get_logger().info("Received image request")
- if not self.image_queue.empty():
- image_data = self.image_queue.get() # Get the image from the queue
- cv_image, velocity = image_data # unpack tuple (image, velocity)
-
- # Convert OpenCV image to ROS2 Image message using cv_bridge
- ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='rgb8')
- # Create a new header and include velocity in the stamp fields
- header = Header()
- current_time = self.get_clock().now().to_msg()
- header.stamp = current_time # Set timestamp to current time
- header.frame_id = str(velocity) # Set frame ID to velocity
-
- ros_image.header = header # Attach the header to the ROS image message
- response.image = ros_image # Set the response's image
- response.image = ros_image # Set the response's image
- return response
-
- else:
- self.get_logger().error("Image queue is empty")
- return response
-
- def publish_video_frames(self):
- if not os.path.exists(self.video_path):
- self.get_logger().error(f"Video file not found at {self.video_path}")
- return
-
- cap = cv2.VideoCapture(self.video_path)
- if not cap.isOpened():
- self.get_logger().error(f"Failed to open video: {self.video_path}")
- return
-
- loops = 0
- while rclpy.ok() and (self.loop == -1 or loops < self.loop):
- while cap.isOpened() and rclpy.ok():
- ret, frame = cap.read()
- if not ret:
- break
- self.image_queue.put((frame, self.velocity[0]))
- self.preprocess_image(frame)
- time.sleep(1.0 / self.frame_rate)
-
- if self.loop > 0:
- loops += 1
-
- if self.loop != -1:
- cap.set(cv2.CAP_PROP_POS_FRAMES, 0) # restart video
- cap.release()
-
- def preprocess_image(self, image):
- tic = time.perf_counter_ns()
-
- roi_x, roi_y, roi_w, roi_h = self.roi_dimensions
- shifted_x = roi_x + abs(self.velocity[0]) * self.shift_constant
-
- gpu_image = cv2.cuda_GpuMat()
- gpu_image.upload(image)
-
- gpu_image = cv2.cuda.cvtColor(gpu_image, cv2.COLOR_RGBA2RGB) # remove alpha channel
- gpu_image = gpu_image[roi_y:(roi_y+roi_h), shifted_x:(shifted_x+roi_w)] # crop the image to ROI
- gpu_image = cv2.cuda.resize(gpu_image, self.dimensions) # resize to model dimensions
-
- input_data = cp.asarray(gpu_image) # Now the image is on GPU memory as CuPy array
- input_data = input_data.astype(cp.float32) / 255.0 # normalize to [0, 1]
- input_data = cp.transpose(input_data, (2, 0, 1)) # Transpose from HWC (height, width, channels) to CHW (channels, height, width)
- input_data = cp.ravel(input_data) # flatten the array
-
- d_input_ptr = input_data.data.ptr # Get device pointer of the CuPy array
- ipc_handle = cuda.mem_get_ipc_handle(d_input_ptr) # Create the IPC handle
-
- toc = time.perf_counter_ns()
- self.get_logger().info(f"Preprocessing: {(toc-tic)/1e6} ms")
-
- # Publish the IPC handle as a string (sending the handle reference as a string)
- ipc_handle_msg = String()
- ipc_handle_msg.data = str(ipc_handle.handle)
- self.pointer_publisher.publish(ipc_handle_msg)
-
-def main(args=None):
- rclpy.init(args=args)
- video_node = VideoNode()
- executor = MultiThreadedExecutor(num_threads=1)
- executor.add_node(video_node)
-
- try:
- executor.spin()
- finally:
- executor.shutdown()
- video_node.destroy_node()
- rclpy.shutdown()
-
-if __name__ == '__main__':
+import time, os
+import cv2
+import pycuda.driver as cuda
+import cupy as cp
+import queue
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from sensor_msgs.msg import Image
+from std_msgs.msg import Header, String
+from cv_bridge import CvBridge
+
+class VideoNode(Node):
+ def __init__(self):
+ super().__init__('video_node')
+
+ cuda.init()
+ device = cuda.Device(0)
+ self.cuda_driver_context = device.make_context()
+ self.stream = cuda.Stream()
+
+ self.declare_parameter('video_path', '/home/usr/Desktop/ROS/assets/video.mp4')
+ self.declare_parameter('loop', 0) # 0 = don't loop, >0 = # of loops, -1 = loop forever
+ self.declare_parameter('frame_rate', 30) # Desired frame rate for publishing
+ self.declare_parameter('model_dimensions', (448, 1024))
+ self.declare_parameter('shift_constant', 1)
+ self.declare_parameter('roi_dimensions', [0, 0, 100, 100])
+        self.declare_parameter('precision', 'fp32')
+
+ self.video_path = self.get_parameter('video_path').get_parameter_value().string_value
+ self.loop = self.get_parameter('loop').get_parameter_value().integer_value
+ self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value
+ self.dimensions = tuple(self.get_parameter('model_dimensions').get_parameter_value().integer_array_value)
+ self.shift_constant = self.get_parameter('shift_constant').get_parameter_value().integer_value
+ self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value
+ self.precision = self.get_parameter('precision').get_parameter_value().string_value
+
+ self.pointer_publisher = self.create_publisher(String, 'preprocessing_done', 10)
+ self.image_service = self.create_service(Image, 'image_data', self.image_callback)
+
+ self.velocity = [0, 0, 0]
+ self.image_queue = queue.Queue()
+ self.bridge = CvBridge()
+ self.publish_video_frames()
+ # propagate fp16 option (.fp16 datatype for cupy)
+ if self.precision == 'fp32':
+ pass
+ elif self.precision == 'fp16':
+ pass
+ else:
+ self.get_logger().error(f"Invalid precision: {self.precision}")
+
+    def image_callback(self, request, response):
+ self.get_logger().info("Received image request")
+ if not self.image_queue.empty():
+ image_data = self.image_queue.get() # Get the image from the queue
+ cv_image, velocity = image_data # unpack tuple (image, velocity)
+
+ # Convert OpenCV image to ROS2 Image message using cv_bridge
+ ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='rgb8')
+ # Create a new header and include velocity in the stamp fields
+ header = Header()
+ current_time = self.get_clock().now().to_msg()
+ header.stamp = current_time # Set timestamp to current time
+ header.frame_id = str(velocity) # Set frame ID to velocity
+
+ ros_image.header = header # Attach the header to the ROS image message
+            response.image = ros_image  # set the response's image
+ return response
+
+ else:
+ self.get_logger().error("Image queue is empty")
+ return response
+
+ def publish_video_frames(self):
+ if not os.path.exists(self.video_path):
+ self.get_logger().error(f"Video file not found at {self.video_path}")
+ return
+
+ cap = cv2.VideoCapture(self.video_path)
+ if not cap.isOpened():
+ self.get_logger().error(f"Failed to open video: {self.video_path}")
+ return
+
+ loops = 0
+ while rclpy.ok() and (self.loop == -1 or loops < self.loop):
+ while cap.isOpened() and rclpy.ok():
+ ret, frame = cap.read()
+ if not ret:
+ break
+ self.image_queue.put((frame, self.velocity[0]))
+ self.preprocess_image(frame)
+ time.sleep(1.0 / self.frame_rate)
+
+ if self.loop > 0:
+ loops += 1
+
+            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # rewind so the next loop replays the video
+ cap.release()
+
+ def preprocess_image(self, image):
+ tic = time.perf_counter_ns()
+
+ roi_x, roi_y, roi_w, roi_h = self.roi_dimensions
+ shifted_x = roi_x + abs(self.velocity[0]) * self.shift_constant
+
+ gpu_image = cv2.cuda_GpuMat()
+ gpu_image.upload(image)
+
+ gpu_image = cv2.cuda.cvtColor(gpu_image, cv2.COLOR_RGBA2RGB) # remove alpha channel
+ gpu_image = gpu_image[roi_y:(roi_y+roi_h), shifted_x:(shifted_x+roi_w)] # crop the image to ROI
+ gpu_image = cv2.cuda.resize(gpu_image, self.dimensions) # resize to model dimensions
+
+ input_data = cp.asarray(gpu_image) # Now the image is on GPU memory as CuPy array
+ input_data = input_data.astype(cp.float32) / 255.0 # normalize to [0, 1]
+ input_data = cp.transpose(input_data, (2, 0, 1)) # Transpose from HWC (height, width, channels) to CHW (channels, height, width)
+ input_data = cp.ravel(input_data) # flatten the array
+
+ d_input_ptr = input_data.data.ptr # Get device pointer of the CuPy array
+ ipc_handle = cuda.mem_get_ipc_handle(d_input_ptr) # Create the IPC handle
+
+ toc = time.perf_counter_ns()
+ self.get_logger().info(f"Preprocessing: {(toc-tic)/1e6} ms")
+
+ # Publish the IPC handle as a string (sending the handle reference as a string)
+ ipc_handle_msg = String()
+ ipc_handle_msg.data = str(ipc_handle.handle)
+ self.pointer_publisher.publish(ipc_handle_msg)
+
+def main(args=None):
+ rclpy.init(args=args)
+ video_node = VideoNode()
+ executor = MultiThreadedExecutor(num_threads=1)
+ executor.add_node(video_node)
+
+ try:
+ executor.spin()
+ finally:
+ executor.shutdown()
+ video_node.destroy_node()
+ rclpy.shutdown()
+
+if __name__ == '__main__':
main()
\ No newline at end of file
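The GPU preprocessing in `preprocess_image` maps one-to-one onto a plain NumPy pipeline, which is useful for checking output shapes on machines without CUDA. A CPU-side sketch of the same steps, assuming the node's default ROI and the declared model dimensions:

```python
# CPU mirror of VideoNode.preprocess_image for shape-checking without CUDA
# (default roi_dimensions and the declared model_dimensions are assumed).
import cv2
import numpy as np

roi_x, roi_y, roi_w, roi_h = 0, 0, 100, 100   # default roi_dimensions
model_dimensions = (448, 1024)                # (width, height) for cv2.resize
velocity, shift_constant = 0, 1

frame = np.zeros((480, 640, 3), dtype=np.uint8)  # stand-in video frame
shifted_x = roi_x + abs(velocity) * shift_constant

cropped = frame[roi_y:roi_y + roi_h, shifted_x:shifted_x + roi_w]
resized = cv2.resize(cropped, model_dimensions)
tensor = resized.astype(np.float32) / 255.0      # normalize to [0, 1]
tensor = np.transpose(tensor, (2, 0, 1))         # HWC -> CHW
flat = np.ravel(tensor)                          # flattened, ready for the engine

print(flat.shape)  # (1376256,) == 3 * 1024 * 448
```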
diff --git a/ros2_ws/src/python_workspace/resource/python_workspace b/ros2_ws/src/python_package/resource/python_package
similarity index 100%
rename from ros2_ws/src/python_workspace/resource/python_workspace
rename to ros2_ws/src/python_package/resource/python_package
diff --git a/ros2_ws/src/python_package/setup.cfg b/ros2_ws/src/python_package/setup.cfg
new file mode 100644
index 0000000..e91952c
--- /dev/null
+++ b/ros2_ws/src/python_package/setup.cfg
@@ -0,0 +1,4 @@
+[develop]
+script_dir=$base/lib/python_package
+[install]
+install_scripts=$base/lib/python_package
\ No newline at end of file
diff --git a/ros2_ws/src/python_workspace/setup.py b/ros2_ws/src/python_package/setup.py
similarity index 63%
rename from ros2_ws/src/python_workspace/setup.py
rename to ros2_ws/src/python_package/setup.py
index ab8ba93..2919697 100644
--- a/ros2_ws/src/python_workspace/setup.py
+++ b/ros2_ws/src/python_package/setup.py
@@ -1,33 +1,33 @@
-import os
-from glob import glob
-from setuptools import setup
-
-package_name = 'python_workspace'
-subfolder = f'{package_name}/scripts'
-
-setup(
- name=package_name,
- version='0.0.0',
- packages=[package_name,subfolder],
- data_files=[
- ('share/ament_index/resource_index/packages',
- ['resource/' + package_name]),
- (os.path.join('share', package_name), ['package.xml']),
- (os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*.launch.py'))),
- ],
- zip_safe=True,
- maintainer='ishaan_datta',
- maintainer_email='ishaandatta737@gmail.com',
- description='Experimental ROS 2 Python Package',
- license='Apache License 2.0',
- entry_points={
- 'console_scripts': [
- 'camera_node = python_workspace.camera_node:main',
- 'inference_node = python_workspace.inference_node:main',
- 'extermination_node = python_workspace.extermination_node:main',
- 'proxy_node = python_workspace.proxy_node:main',
- 'picture_node = python_workspace.picture_node:main' # remove later
- ],
- },
-
+import os
+from glob import glob
+from setuptools import setup
+
+package_name = 'python_package'
+subfolder = f'{package_name}/scripts'
+
+setup(
+ name=package_name,
+ version='0.0.0',
+    packages=[package_name, subfolder],
+ data_files=[
+ ('share/ament_index/resource_index/packages',
+ ['resource/' + package_name]),
+ (os.path.join('share', package_name), ['package.xml']),
+ (os.path.join('share', package_name, 'launch'), glob(os.path.join('launch', '*.launch.py'))),
+ ],
+    install_requires=['setuptools'],
+    zip_safe=True,
+ maintainer='ishaan_datta',
+ maintainer_email='ishaandatta737@gmail.com',
+ description='Experimental ROS 2 Python Package',
+ license='Apache License 2.0',
+ entry_points={
+ 'console_scripts': [
+ 'camera_node = python_package.camera_node:main',
+ 'inference_node = python_package.inference_node:main',
+ 'extermination_node = python_package.extermination_node:main',
+ 'proxy_node = python_package.proxy_node:main',
+ 'picture_node = python_package.picture_node:main' # remove later
+ ],
+ },
+
)
\ No newline at end of file
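With the rename in place, each console script above resolves through the new package name, e.g. `ros2 run python_package proxy_node` or `ros2 run python_package inference_node`.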
diff --git a/ros2_ws/src/python_workspace/test/test_copyright.py b/ros2_ws/src/python_package/test/test_copyright.py
similarity index 97%
rename from ros2_ws/src/python_workspace/test/test_copyright.py
rename to ros2_ws/src/python_package/test/test_copyright.py
index 282981a..97a3919 100644
--- a/ros2_ws/src/python_workspace/test/test_copyright.py
+++ b/ros2_ws/src/python_package/test/test_copyright.py
@@ -1,25 +1,25 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_copyright.main import main
-import pytest
-
-
-# Remove the `skip` decorator once the source file(s) have a copyright header
-@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
-@pytest.mark.copyright
-@pytest.mark.linter
-def test_copyright():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found errors'
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_copyright.main import main
+import pytest
+
+
+# Remove the `skip` decorator once the source file(s) have a copyright header
+@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
+@pytest.mark.copyright
+@pytest.mark.linter
+def test_copyright():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found errors'
diff --git a/ros2_ws/src/python_workspace/test/test_flake8.py b/ros2_ws/src/python_package/test/test_flake8.py
similarity index 97%
rename from ros2_ws/src/python_workspace/test/test_flake8.py
rename to ros2_ws/src/python_package/test/test_flake8.py
index 625324e..27ee107 100644
--- a/ros2_ws/src/python_workspace/test/test_flake8.py
+++ b/ros2_ws/src/python_package/test/test_flake8.py
@@ -1,25 +1,25 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_flake8.main import main_with_errors
-import pytest
-
-
-@pytest.mark.flake8
-@pytest.mark.linter
-def test_flake8():
- rc, errors = main_with_errors(argv=[])
- assert rc == 0, \
- 'Found %d code style errors / warnings:\n' % len(errors) + \
- '\n'.join(errors)
+# Copyright 2017 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_flake8.main import main_with_errors
+import pytest
+
+
+@pytest.mark.flake8
+@pytest.mark.linter
+def test_flake8():
+ rc, errors = main_with_errors(argv=[])
+ assert rc == 0, \
+ 'Found %d code style errors / warnings:\n' % len(errors) + \
+ '\n'.join(errors)
diff --git a/ros2_ws/src/python_workspace/test/test_pep257.py b/ros2_ws/src/python_package/test/test_pep257.py
similarity index 97%
rename from ros2_ws/src/python_workspace/test/test_pep257.py
rename to ros2_ws/src/python_package/test/test_pep257.py
index 4569717..b234a38 100644
--- a/ros2_ws/src/python_workspace/test/test_pep257.py
+++ b/ros2_ws/src/python_package/test/test_pep257.py
@@ -1,23 +1,23 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_pep257.main import main
-import pytest
-
-
-@pytest.mark.linter
-@pytest.mark.pep257
-def test_pep257():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found code style errors / warnings'
+# Copyright 2015 Open Source Robotics Foundation, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ament_pep257.main import main
+import pytest
+
+
+@pytest.mark.linter
+@pytest.mark.pep257
+def test_pep257():
+ rc = main(argv=['.', 'test'])
+ assert rc == 0, 'Found code style errors / warnings'
diff --git a/ros2_ws/src/python_workspace/setup.cfg b/ros2_ws/src/python_workspace/setup.cfg
deleted file mode 100644
index 45d0d2f..0000000
--- a/ros2_ws/src/python_workspace/setup.cfg
+++ /dev/null
@@ -1,4 +0,0 @@
-[develop]
-script_dir=$base/lib/python_workspace
-[install]
-install_scripts=$base/lib/python_workspace
\ No newline at end of file