Skip to content

Commit

Permalink
add implementation of Generating Action Sequences for Robot Task Plan…
Browse files Browse the repository at this point in the history
…s using LLM

Signed-off-by: Zeeland <[email protected]>

update code

Signed-off-by: Zeeland <[email protected]>

add implementation of Generating Action Sequences for Robot Task Plans using LLM

Signed-off-by: Zeeland <[email protected]>

add dockerfile

Signed-off-by: Zeeland <[email protected]>

remove some useless

Signed-off-by: Zeeland <[email protected]>

update code

Signed-off-by: Zeeland <[email protected]>

update code

Signed-off-by: Zeeland <[email protected]>

add proposal

Signed-off-by: Zeeland <[email protected]>

update code

Signed-off-by: Zeeland <[email protected]>
  • Loading branch information
Undertone0809 authored and Zeeland committed Oct 30, 2023
1 parent 124dcad commit 75595b6
Show file tree
Hide file tree
Showing 32 changed files with 7,702 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
.idea/*
.vscode
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
build/**
install/**
log/**
src/navigation
src/cartographer_code/cartographer
src/cartographer_code/cartographer_ros
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[submodule "src/navigation"]
path = src/navigation
url = https://github.com/fishros/navigation2.git
branch = main
[submodule "src/cartographer_code/cartographer"]
path = src/cartographer_code/cartographer
url = https://github.com/ros2/cartographer.git
[submodule "src/cartographer_code/cartographer_ros"]
path = src/cartographer_code/cartographer_ros
url = https://github.com/ros2/cartographer_ros.git
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM ros:humble

WORKDIR /ros2_ws

COPY . /ros2_ws/src

RUN apt-get update && apt-get install -y \
python3-pip \
python3-rosdep \
python3-vcstool \
&& rm -rf /var/lib/apt/lists/*


RUN rosdep init && rosdep update
RUN rosdep install --from-paths src -y

RUN colcon build

ENV ROS_DOMAIN_ID=0

CMD ["bash"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Generating Action Sequences for Robot Task Plans using LLM

## Environment

- Ubuntu 22.04
- ROS2 humble
- gpt-3.5-turbo
- gazebo

## Setup

**Clone the community/sig-robotics respository**

```shell
git clone https://github.com/kubeedge/community
```

**Install relational packages and build the project**

```shell
cd ./sig-robotics/examples/Generating-Action-Sequences-for-Robot-Task-Plans-using-LLM/
rosdep install --from-paths src -y

cd /src/bot_description/
pip3 install -r requirements.txt

cd ./sig-robotics/examples/Generating-Action-Sequences-for-Robot-Task-Plans-using-LLM/
colcon build
```

**Simulation in gazebo**

```shell
source install/setup.bash
ros2 launch bot_description gazebo.launch.py
ros2 run bot_description user_client
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
from typing import List
from collections import deque

from promptulate.llms import ChatOpenAI, BaseLLM
from promptulate.agents import BaseAgent
from promptulate.output_formatter import OutputFormatter
from promptulate.schema import MessageSet, SystemMessage, UserMessage, BaseMessage
from promptulate.utils.color_print import print_text

from bot_description.prompt import (
SYSTEM_PROMPT_TEMPLATE,
GENERATE_PLAN_SYSTEM_PROMPT,
ENVIRONMENT_SUMMARY_SYSTEM_PROMPT,
SECURITY_CHECK_SYSTEM_PROMPT,
)
from bot_description.schema import (
Task,
GeneratePlanResponse,
CommandResponse,
Operator,
Sensor,
)


class RobotController:
def __init__(self, operators: List[Operator]) -> None:
self.operators: List[Operator] = operators

if self.get_operator("stop") is None:
raise ValueError("Please provide stop operator")

def execute_action(self, action_name: str, *args, **kwargs) -> None:
"""Execute action by action_name."""
operator = self.get_operator(action_name)
print_text(f"[command] run {action_name} {str(args)} {str(kwargs)}", "green")
operator.execute_action(*args, **kwargs)

def get_operator(self, action_name: str) -> Operator:
return next(
(operator for operator in self.operators if operator.name == action_name),
None,
)


class RobotObserver:
def __init__(self, sensors: List[Sensor]) -> None:
self.sensors: List[Sensor] = sensors

def get_data(self, sensor_name: str) -> None:
sensor = self.get_sensor(sensor_name)
return sensor.get_data()

def get_sensor(self, sensor_name: str) -> Sensor:
return next(
(sensor for sensor in self.sensors if sensor.name == sensor_name),
None,
)

def show_all_data(self):
return "\n".join([sensor.get_data() for sensor in self.sensors])


class RobotAgent(BaseAgent):
def __init__(
self, controller: RobotController, observer: RobotObserver, *args, **kwargs
):
super().__init__(*args, **kwargs)
self.llm = ChatOpenAI(temperature=0.0)
self.robot_controller: RobotController = controller
self.robot_observer: RobotObserver = observer
self.user_demand: str = ""
self.pending_tasks: deque = deque()
self.completed_tasks: deque = deque()

def get_llm(self) -> BaseLLM:
return self.llm

def _run(self, prompt: str, *args, **kwargs) -> str:
"""Main loop for RobotAgent"""
self.user_demand = prompt

# generate plan
self.generate_tasks()

# build system prompt
environment = self.llm(
ENVIRONMENT_SUMMARY_SYSTEM_PROMPT.format(
environment=self.robot_observer.show_all_data()
)
)
system_prompt: str = SYSTEM_PROMPT_TEMPLATE.format(
skills=str(self.robot_controller.operators),
environment=environment,
user_demand=self.user_demand,
pending_tasks=f"{str(self.pending_tasks)}",
)
formatter = OutputFormatter(CommandResponse)
instruction = formatter.get_formatted_instructions()
system_prompt = f"{system_prompt}\n{instruction}"

messages: MessageSet = MessageSet(
messages=[
SystemMessage(content=system_prompt),
]
)

counter = 0
while True:
self.robot_controller.execute_action("stop")

llm_output: BaseMessage = self.llm.predict(messages)
messages.add_message(llm_output)
messages.add_user_message("next")

try:
# get current task
# TODO add retry if failed
task: Task = formatter.formatting_result(llm_output.content).task
print_text(f"[task] current task: {task.dict()}", "blue")

if task.name == "stop" or not self.pending_tasks:
self.robot_controller.execute_action("stop")
return

# run task
task = self.pending_tasks.popleft()
self.completed_tasks.append(task)
self.robot_controller.execute_action(
action_name=task.name, **task.parameters
)

except Exception as e:
print_text(f"[error] {e}", "red")
self.robot_controller.execute_action("stop")
return

counter += 1

def generate_tasks(self):
"""Generate tasks based on user demand."""
# build conversation
formatter = OutputFormatter(GeneratePlanResponse)
instruction: str = formatter.get_formatted_instructions()
messages: MessageSet = MessageSet(
messages=[
SystemMessage(content=f"{GENERATE_PLAN_SYSTEM_PROMPT}\n{instruction}"),
UserMessage(content=f"##User input:\n{self.user_demand}"),
]
)

llm_output: str = self.llm.predict(messages).content

response: GeneratePlanResponse = formatter.formatting_result(llm_output)
self.pending_tasks = deque(response.tasks)
print_text(f"Generate plans: {self.pending_tasks}", "green")


def example():
operators = []
sensors = []
controller = RobotController(operators)
observer = RobotObserver(sensors)
RobotAgent(controller=controller, observer=observer)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from nav_msgs.msg import Odometry
from math import sqrt


class OdomUserClientNode(Node):
def __init__(self):
super().__init__("go_front")
self.publisher_ = self.create_publisher(Twist, "cmd_vel", 10)
self.subscription_ = self.create_subscription(
Odometry, "odom", self.odometry_callback, 10
)
self.subscription_ # prevent unused variable warning
self.distance_to_travel = 0
self.distance_traveled = 0

def run(self):
while True:
target_distance: float = float(str(input("Please input dir: ")))
self.go_front(target_distance)

def odometry_callback(self, msg):
self.distance_traveled = sqrt(
msg.pose.pose.position.x**2 + msg.pose.pose.position.y**2
)

def go_front(self, distance):
self.distance_to_travel = distance
self.distance_traveled = 0

twist = Twist()
twist.linear.x = 0.5 # Adjust the linear velocity as needed
twist.angular.z = 0.0

while self.distance_traveled < self.distance_to_travel:
self.publisher_.publish(twist)
self.get_logger().info(f"Distance traveled: {self.distance_traveled}")
rclpy.spin_once(self)

twist.linear.x = 0.0
self.publisher_.publish(twist)
self.get_logger().info("Finished moving forward")


def main(args=None):
rclpy.init(args=args)
node = OdomUserClientNode()
node.run()
rclpy.shutdown()
Loading

0 comments on commit 75595b6

Please sign in to comment.