forked from NevPalmer/ob_inst_survey
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlog_nmea_to_file.py
176 lines (152 loc) · 5.73 KB
/
log_nmea_to_file.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
"""Log NMEA stream to a text file."""
import sys
from argparse import ArgumentParser
from datetime import datetime, timedelta, timezone
from pathlib import Path
from queue import Empty, Queue
from time import sleep
from typing import Optional, Union

import ob_inst_survey as obsurv
# Default file-name prefix handed to the output-prefix CLI parser.
DFLT_PREFIX = "NMEA"
# Default output directory (under the user's home) handed to the
# output-path CLI parser.
DFLT_PATH = Path.home().joinpath("logs/nmea/")
def main():
    """Initialise NMEA data stream and log to text file.

    Command-line options select the output directory, file prefix, IP
    connection parameters, file-split interval and an optional replay
    file. Sentences are pulled from a queue that is handed to either
    ``obsurv.nmea_replay_textfile`` (when a replay file is given) or
    ``obsurv.nmea_ip_stream``. Each accepted sentence is appended to the
    current output file and echoed to stdout. Sentences that fail the
    checksum (unless ``--ignorechksum`` is set) or carry an invalid time
    are recorded via ``log_invalid_nmea_str`` and skipped.

    A new output file, named from the sentence time, is started whenever
    that time falls into a later ``filesplit``-hour interval than the
    previous one. The loop runs until interrupted from the keyboard or
    until ``get_next_sentence`` exits the process.

    Raises:
        SystemExit: On KeyboardInterrupt, with an end-of-logging message.
    """
    # Default CLI arguments.
    ip_param = obsurv.IpParam()

    # Retrieve CLI arguments.
    helpdesc: str = (
        "Receives an NMEA stream via either UDP or TCP over an IP connection "
        "and logs the received stream to a text file located in the directory "
        "specified. "
        "If an input file is specified (containg NMEA text sentences) then "
        "all IP parameters will be ignored and instead the specified file will "
        "be replayed as if it is a live stream."
    )
    parser = ArgumentParser(
        parents=[
            obsurv.out_filepath_parser(DFLT_PATH),
            obsurv.out_fileprefix_parser(DFLT_PREFIX),
            obsurv.ip_arg_parser(ip_param),
            obsurv.file_split_parser(),
            obsurv.replayfile_parser(None),
        ],
        description=helpdesc,
    )
    parser.add_argument(
        "--ignorechksum",
        action="store_true",
        help=("Ignore the NMEA checksum and use all sentences."),
    )
    args = parser.parse_args()
    outfilepath: Path = args.outfilepath
    outfileprefix: str = args.outfileprefix
    ip_param = obsurv.IpParam(
        port=args.ipport,
        addr=args.ipaddr,
        prot=args.ipprot,
        buffer=args.ipbuffer,
    )
    file_split_hours: int = args.filesplit
    # Index of the split interval of the current output file; 0 doubles as
    # "no output file has been chosen yet".
    last_file_split = 0
    replay_file: Path = args.replayfile
    replay_start: datetime = args.replaystart
    replay_speed: float = args.replayspeed
    ignorechksum: bool = args.ignorechksum

    # Create directory for logging.
    outfilepath.mkdir(parents=True, exist_ok=True)
    print(f"Logging NMEA to directory {outfilepath}")

    nmea_q: Queue[str] = Queue()
    if replay_file:
        obsurv.nmea_replay_textfile(
            filename=replay_file,
            nmea_q=nmea_q,
            spd_fctr=replay_speed,
            timestamp_start=replay_start,
        )
    else:
        obsurv.nmea_ip_stream(
            ip_conn=ip_param,
            nmea_q=nmea_q,
        )

    try:
        count_no_time = 0
        while True:
            sentence = get_next_sentence(nmea_q)
            if not sentence:
                # Back off only while idle: prevents the idle loop from
                # using 100% of a CPU thread without throttling the drain
                # rate when sentences are already waiting in the queue.
                sleep(0.001)
                continue
            if not ignorechksum and not obsurv.nmea_checksum(sentence):
                log_invalid_nmea_str(
                    outfilepath,
                    sentence,
                    "Checksum for NMEA line is invalid!",
                )
                continue

            nmea_time = time_from_nmea(sentence)
            if not nmea_time:
                # Sentence carries no time field. Once an output file exists
                # it is simply appended to that file below.
                if last_file_split == 0:
                    count_no_time += 1
                    if count_no_time > 5:
                        # Use system time to name file if NMEA has no time
                        # stamp after 6 sentences.
                        nmea_time = datetime.now(timezone.utc)
                    else:
                        continue
            if nmea_time:
                if nmea_time == "invalid_time":
                    log_invalid_nmea_str(
                        outfilepath, sentence, "NMEA Timestamp is invalid!"
                    )
                    continue
                # Number of whole split intervals since the epoch; a larger
                # value than before means it is time to start a new file.
                curr_file_split = int(
                    nmea_time.timestamp() / (file_split_hours * 3600)
                )
                if curr_file_split > last_file_split:
                    file_timestamp = nmea_time.strftime("%Y-%m-%d_%H-%M")
                    outfilename = outfilepath / f"{outfileprefix}_{file_timestamp}.txt"
                    last_file_split = curr_file_split

            # The file is reopened for every sentence, so each line is
            # closed out to disk before the next one is handled.
            with open(outfilename, "a+", newline="", encoding="utf-8") as nmea_file:
                nmea_file.write(f"{sentence}\n")
                print(sentence)

    except KeyboardInterrupt:
        sys.exit("*** End NMEA Logging ***")
def log_invalid_nmea_str(outfilepath, nmea_sentence, message):
    """Record a rejected NMEA sentence in the invalid-sentence log.

    Appends two lines to ``invalid_nmea_log.txt`` inside ``outfilepath``:
    the current UTC system time followed by ``message``, then the
    sentence itself. The same information is echoed to stdout.

    Args:
        outfilepath: Directory (path object) that holds the log file.
        nmea_sentence: The sentence that was rejected.
        message: Reason the sentence was rejected.
    """
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d_%H-%M-%S")
    log_path = outfilepath / "invalid_nmea_log.txt"
    entry = (f"{stamp}:- {message}\n", f"{nmea_sentence}\n")
    with open(log_path, "a+", newline="", encoding="utf-8") as log_handle:
        log_handle.writelines(entry)
    print(f"{message} Sentence has been ignored:\n{nmea_sentence}")
def get_next_sentence(nmea_q: Queue) -> Optional[str]:
    """Return next sentence from NMEA queue.

    Args:
        nmea_q: Queue of NMEA sentences. The strings "TimeoutError" and
            "EOF" are treated as end-of-stream markers.

    Returns:
        The next queued sentence, or None if nothing is queued right now.

    Raises:
        SystemExit: If the dequeued item is "TimeoutError" or "EOF".
    """
    # Attempt the non-blocking get directly instead of checking empty()
    # first: a single call cannot be invalidated between check and use.
    try:
        nmea_str = nmea_q.get(block=False)
    except Empty:
        return None
    if nmea_str in ("TimeoutError", "EOF"):
        sys.exit(f"*** NMEA: {nmea_str} ***")
    return nmea_str
def time_from_nmea(sentence: str) -> Union[datetime, int, str]:
    """Return the time from an NMEA sentence.

    Characters 7-12 (zero-based) of the sentence are read as ``hhmmss``.
    The sentence supplies no date here, so the date is taken from the
    current UTC system clock.

    Args:
        sentence: NMEA sentence text.

    Returns:
        * ``0`` if the characters at the expected position are not
          integers (the sentence has no time field there).
        * The string ``"invalid_time"`` if they are integers but do not
          form a valid time of day.
        * Otherwise a timezone-aware UTC ``datetime``.
    """
    try:
        nmea_hr = int(sentence[7:9])
        nmea_min = int(sentence[9:11])
        nmea_sec = int(sentence[11:13])
    except ValueError:
        # This NMEA sentence does not contain a time field.
        return 0

    sys_time = datetime.now(timezone.utc)
    try:
        nmea_time = datetime(
            sys_time.year,
            sys_time.month,
            sys_time.day,
            nmea_hr,
            nmea_min,
            nmea_sec,
            tzinfo=timezone.utc,
        )
    except ValueError:
        return "invalid_time"

    # Around midnight the NMEA time and the system clock can sit on
    # opposite sides of the date change; shift the borrowed system date
    # by one day so it matches the NMEA hour.
    if nmea_hr == 0 and sys_time.hour == 23:
        nmea_time += timedelta(days=1)
    elif nmea_hr == 23 and sys_time.hour == 0:
        nmea_time -= timedelta(days=1)
    return nmea_time
# Start logging only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()