-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmavlink_test.py
90 lines (72 loc) · 3.02 KB
/
mavlink_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python3
import time
from pymavlink import mavutil
from trunk import *
#from colored import fg, bg, attr
from colored import fg, bg, attr
#if get_Setting('mainLoopStatus', 'status.json', 0) == "closed":
# print("Warning: Manual override enabled")
# set_Setting('mainLoopStatus', 'manual', 'status.json', 1)
print("Hello, Launching MavLink viewer...")
timer1 = time.time()  # launch timestamp (not read again in the visible script)

# Alternative transport, kept for reference: listen on a UDP port instead of
# probing the serial ports below.
#the_connection = mavutil.mavlink_connection('udpin:localhost:14540')
time.sleep(1)
print("--->Looking for ports, please wait... Can take up to a minute...")
print("")

# Candidate serial devices. The loop alternates between the two until one of
# them both opens and delivers a heartbeat.
port1 = '/dev/ttyACM1'
port2 = '/dev/ttyACM2'
current = port1
while True:
    try:
        the_connection = mavutil.mavlink_connection(current)
        time.sleep(1)
        the_connection.wait_heartbeat()
        break
    except Exception:
        # Best-effort probing: a failure on this port simply means "try the
        # other one". Catching Exception instead of using a bare `except:`
        # leaves KeyboardInterrupt uncaught, so Ctrl+C can abort the search.
        pass
    current = port2 if current == port1 else port1
    time.sleep(1)

print("Connected to port: " + current)
# The second field is the component id, not the system id repeated.
print("Heartbeat from System %u, Component %u" % (the_connection.target_system, the_connection.target_component))
time.sleep(4)
# Ask the connected system/component to stream MAV_DATA_STREAM_ALL.
# Stream id reference:
# https://mavlink.io/en/messages/common.html#MAV_DATA_STREAM_EXTENDED_STATUS
# The request is sent three times in a row (presumably as insurance against a
# dropped packet - TODO confirm).
for _attempt in range(3):
    the_connection.mav.request_data_stream_send(
        the_connection.target_system,
        the_connection.target_component,
        mavutil.mavlink.MAV_DATA_STREAM_ALL,
        4,  # requested message rate
        1,  # start/stop flag: 1 requests streaming to start
    )
def _imu_row(index, imu):
    """Return one tab-separated row of the IMU table.

    Args:
        index: Row label printed before the ``|`` separator.
        imu: A message object exposing ``xacc/yacc/zacc``,
            ``xgyro/ygyro/zgyro`` and ``xmag/ymag/zmag``, or ``None`` when
            that sensor has not reported.

    Returns:
        The formatted row; ``"<index>|\\tINOP"`` when ``imu`` is ``None``.
    """
    if imu is None:
        return f"{index}|\tINOP"
    groups = (
        (imu.xacc, imu.yacc, imu.zacc),
        (imu.xgyro, imu.ygyro, imu.zgyro),
        (imu.xmag, imu.ymag, imu.zmag),
    )
    columns = "\t|".join(
        "\t".join(f"{value:.0f}" for value in group) for group in groups
    )
    return f"{index}|\t{columns}"


# pevent.trigger() gates how often the table is reprinted; recv_msg() is
# called on every pass so the connection's `messages` cache stays current.
pevent = mavutil.periodic_event(5)
while True:
    the_connection.recv_msg()
    if pevent.trigger():
        latest = the_connection.messages
        try:
            # Look everything up before printing, so a missing mandatory
            # message produces a single "Error" line and no partial table.
            raw_imu = latest['RAW_IMU']
            imu2 = latest['SCALED_IMU2']
            try:
                imu3 = latest['SCALED_IMU3']
            except KeyError:
                # The third IMU is optional: show an INOP row instead of
                # failing the whole refresh.
                imu3 = None
            pressure1 = latest['SCALED_PRESSURE']
            gps_raw = latest['GPS_RAW_INT']

            print("\tAx\tAy\tAz\t|Gx\tGy\tGz\t|Mx\tMy\tMz")
            print(_imu_row(0, raw_imu))
            print(_imu_row(1, imu2))
            print(_imu_row(2, imu3))
            print(f"Pressure1 Abs: {pressure1.press_abs:.2f} \tDif: {pressure1.press_diff:.1f} \tTemp: {pressure1.temperature:.0f}" + "\tSats in View: " + str(gps_raw.satellites_visible))
        except Exception:
            # Exception (not a bare `except:`) so Ctrl+C is never swallowed.
            print("Error")

        try:
            pressure2 = latest['SCALED_PRESSURE2']
            print(f"Pressure2 Abs: {pressure2.press_abs:.2f} \tDif: {pressure2.press_diff:.1f} \tTemp: {pressure2.temperature:.0f}")
        except Exception:
            # Second pressure sensor has not reported (or the message could
            # not be formatted): flag it as inoperative.
            print("Pressure2 Abs: INOP \tDif: INOP \tTemp: INOP")

        print('%s ...Press ctrl+c to exit... %s' % (fg(3), attr(0)))
        print(" ")
    # Short pause between receive polls so the loop does not spin the CPU.
    time.sleep(.005)

# Debug aid: dump every incoming message instead of the table above.
#ALL = the_connection.recv_match(blocking=True)
#print(ALL)