-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunitree_h1_interface.py
154 lines (139 loc) · 5.97 KB
/
unitree_h1_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import mujoco
import numpy as np
from unitree_sdk2py.core.channel import ChannelFactoryInitialize, ChannelSubscriber, ChannelPublisher
from unitree_sdk2py.idl.unitree_go.msg.dds_ import SportModeState_
from unitree_sdk2py.idl.default import unitree_go_msg_dds__SportModeState_
from unitree_sdk2py.utils.thread import RecurrentThread
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowCmd_
from unitree_sdk2py.idl.unitree_go.msg.dds_ import LowState_
from unitree_sdk2py.idl.default import unitree_go_msg_dds__LowState_ as LowState_default
TOPIC_LOWCMD = 'rt/lowcmd'
TOPIC_LOWSTATE = 'rt/lowstate'
TOPIC_HIGHSTATE = 'rt/sportmodestate'
MOTOR_SENSOR_NUM = 3
class SimInterface:
def __init__(self, model, data):
# record mujoco model & data
self.model = model
self.data = data
# initialize state parameters
self.num_motor = self.model.nu
self.dim_motor_sensor = MOTOR_SENSOR_NUM * self.num_motor
self.dt = self.model.opt.timestep
# check sensor
self.have_imu = False
self.have_frame_sensor = False
for i in range(self.dim_motor_sensor, self.model.nsensor):
name = mujoco.mj_id2name(
self.model, mujoco._enums.mjtObj.mjOBJ_SENSOR, i
)
if name == 'imu_quat':
self.have_imu = True
if name == 'frame_pos':
self.have_frame_sensor = True
# initialize channel
ChannelFactoryInitialize(id=0, networkInterface='lo')
# publish low state
self.low_state = LowState_default()
self.low_state_puber = ChannelPublisher(TOPIC_LOWSTATE, LowState_)
self.low_state_puber.Init()
self.lowStateThread = RecurrentThread(
interval=self.dt, target=self.PublishLowState, name='sim_lowstate'
)
self.lowStateThread.Start()
# publish high state
self.high_state = unitree_go_msg_dds__SportModeState_()
self.high_state_puber = ChannelPublisher(TOPIC_HIGHSTATE, SportModeState_)
self.high_state_puber.Init()
self.HighStateThread = RecurrentThread(
interval=self.dt, target=self.PublishHighState, name='sim_highstate'
)
self.HighStateThread.Start()
# subscribe to low command
self.low_cmd_suber = ChannelSubscriber(TOPIC_LOWCMD, LowCmd_)
self.low_cmd_suber.Init(self.LowCmdHandler, 10)
def PublishLowState(self):
if self.data is not None:
# wrote motor state
for i in range(self.num_motor):
self.low_state.motor_state[i].q = self.data.sensordata[i]
self.low_state.motor_state[i].dq = self.data.sensordata[
i + self.num_motor
]
self.low_state.motor_state[i].tau_est = self.data.sensordata[
i + 2 * self.num_motor
]
if self.have_frame_sensor:
# write IMU data
self.low_state.imu_state.quaternion[0] = self.data.sensordata[
self.dim_motor_sensor + 0
]
self.low_state.imu_state.quaternion[1] = self.data.sensordata[
self.dim_motor_sensor + 1
]
self.low_state.imu_state.quaternion[2] = self.data.sensordata[
self.dim_motor_sensor + 2
]
self.low_state.imu_state.quaternion[3] = self.data.sensordata[
self.dim_motor_sensor + 3
]
# write gyroscope data
self.low_state.imu_state.gyroscope[0] = self.data.sensordata[
self.dim_motor_sensor + 4
]
self.low_state.imu_state.gyroscope[1] = self.data.sensordata[
self.dim_motor_sensor + 5
]
self.low_state.imu_state.gyroscope[2] = self.data.sensordata[
self.dim_motor_sensor + 6
]
# write accelerometer data
self.low_state.imu_state.accelerometer[0] = self.data.sensordata[
self.dim_motor_sensor + 7
]
self.low_state.imu_state.accelerometer[1] = self.data.sensordata[
self.dim_motor_sensor + 8
]
self.low_state.imu_state.accelerometer[2] = self.data.sensordata[
self.dim_motor_sensor + 9
]
# write to low state
self.low_state_puber.Write(self.low_state)
def PublishHighState(self):
if self.data is not None:
# write global posiiton
self.high_state.position[0] = self.data.sensordata[
self.dim_motor_sensor + 10
]
self.high_state.position[1] = self.data.sensordata[
self.dim_motor_sensor + 11
]
self.high_state.position[2] = self.data.sensordata[
self.dim_motor_sensor + 12
]
# write global velocity
self.high_state.velocity[0] = self.data.sensordata[
self.dim_motor_sensor + 13
]
self.high_state.velocity[1] = self.data.sensordata[
self.dim_motor_sensor + 14
]
self.high_state.velocity[2] = self.data.sensordata[
self.dim_motor_sensor + 15
]
# write to high state
self.high_state_puber.Write(self.high_state)
def LowCmdHandler(self, msg: LowCmd_):
if self.data is not None:
# apply control to each motor
for i in range(self.num_motor):
self.data.ctrl[i] = (
msg.motor_cmd[i].tau
+ msg.motor_cmd[i].kp
* (msg.motor_cmd[i].q - self.data.sensordata[i])
+ msg.motor_cmd[i].kd
* (
msg.motor_cmd[i].dq
- self.data.sensordata[i + self.num_motor]
)
)