From 75d88670aac1f4b3debc6265540652b5df0a941b Mon Sep 17 00:00:00 2001 From: Vitor Date: Fri, 18 Apr 2025 17:39:15 -0300 Subject: [PATCH 1/2] Stationary test for GNSS system --- README.md | 40 +++++++++++++ scripts/plot_gps_from_csv.py | 107 +++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 scripts/plot_gps_from_csv.py diff --git a/README.md b/README.md index f996929..4e4a0d2 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,42 @@ # DataAnalysisForRobotics Data Analysis for Robotics Projects with ROS + +## Data analysis for GNSS systems +Tools to plot and verify the data from a GNSS system, checking RTK status and random walk. + +### Stationary test +Check the deviation of a GNSS system. + +#### What it does +* It will draw one circle within the specified range, defined by the user as a kind of random walk limit + +* It will draw an ellipse containing 95% of the points to check it's distribution while stationary. + +#### How to run it +* Collect a `/fix` rosbag. +* Rename it to `run_.bag` (e.g `run_42.bag`) +* Convert the bag into a csv file with `bag_to_csv.py` + From the repository folder, run: + ```bash + python scripts/bag_to_csv.py --folder /path/to/bag/folder --num_bags 1 + ``` + Arguments: + ---------- + --folder : Path to the folder containing the .bag files. + + --num_bags : Number of bag files to process. Practicaly, it should always be one, as there is no reasson to merge more than one bag, unless it was a interrupted stationary test or something similar + +* Run the script to generate the stationary plot, with the csv generated as input + ```bash + python --csv <.csv file path> --max-distance + ``` + Arguments: + ---------- + --csv Path of the csv file(usually inside the `run_` folder) + + --max-distance Distance of the max distance threshold(Circle) + + e.g. + ``` + python3 scripts/plot_gps_from_csv.py --csv run_0/ublox-fix.csv --max-distance 0.2 + ``` \ No newline at end of file diff --git a/scripts/plot_gps_from_csv.py b/scripts/plot_gps_from_csv.py new file mode 100644 index 0000000..03e7b7c --- /dev/null +++ b/scripts/plot_gps_from_csv.py @@ -0,0 +1,107 @@ +# plot_gps_from_csv.py +# +# Static‑position scatter for a single /fix CSV, saving into the same run folder. +# ------------------------------------------------------------- + +import argparse +from pathlib import Path + +import numpy as np +import pandas as pd +import matplotlib +matplotlib.use("Agg") # head‑less backend +import matplotlib.pyplot as plt +from matplotlib.patches import Ellipse, Circle + + +def load_fix_csv(csv_path: Path) -> pd.DataFrame: + df = pd.read_csv(csv_path) + needed = {"latitude", "longitude", "status.status"} + if not needed.issubset(df.columns): + missing = needed - set(df.columns) + raise ValueError(f"{csv_path.name} is missing columns: {missing}") + return df + + +def plot_static_scatter(df: pd.DataFrame, out_png: Path, max_distance: float): + lat = df["latitude"].values + lon = df["longitude"].values + status = df["status.status"].values + + fig, ax = plt.subplots(figsize=(10, 8)) + + # non‑RTK vs RTK + rtk = status == 2 + ax.scatter(lon[~rtk], lat[~rtk], c="blue", alpha=0.7, s=50, label="non‑RTK") + ax.scatter(lon[rtk], lat[rtk], c="red", alpha=0.9, s=70, marker="^", label="RTK fix") + + # mean position + μlat, μlon = lat.mean(), lon.mean() + + # 95% error ellipse + dx = (lon - μlon) * 111_320 + dy = (lat - μlat) * 110_540 + if len(dx) > 1: + cov = np.cov(np.vstack([dx, dy])) + vals, vecs = np.linalg.eig(cov) + χ = np.sqrt(5.991) + w_m, h_m = χ * np.sqrt(vals) + angle = np.degrees(np.arctan2(vecs[1, 0], vecs[0, 0])) + ax.add_patch(Ellipse( + (μlon, μlat), + width=w_m/111_320, + height=h_m/110_540, + angle=angle, + fc="none", ec="black", lw=2, + label="95 % error ellipse" + )) + + # max‑distance circle + ax.add_patch(Circle( + (μlon, μlat), + radius=max_distance/111_320, + fc="none", ec="black", ls="--", lw=2, + label=f"{max_distance} m radius" + )) + + # axis limits + lat_span = np.ptp(lat) + lon_span = np.ptp(lon) + ax.set_xlim(lon.min() - lon_span*0.2, lon.max() + lon_span*0.2) + ax.set_ylim(lat.min() - lat_span*0.2, lat.max() + lat_span*0.2) + + ax.set_xlabel("Longitude") + ax.set_ylabel("Latitude") + ax.set_title("Static accuracy – position scatter") + ax.set_aspect("equal", adjustable="datalim") + ax.grid(True) + ax.legend(fontsize=8, loc="best") + + fig.savefig(out_png, dpi=150) + plt.close(fig) + print(f"Saved scatter plot to {out_png.resolve()}") + + +def main(): + p = argparse.ArgumentParser( + description="Static GPS scatter plot from a /fix CSV, saved into the same run folder." + ) + p.add_argument( + "-i", "--csv", type=Path, required=True, + help="Path to the ublox-fix CSV (e.g. ../run_0/ublox-fix.csv)" + ) + p.add_argument( + "--max-distance", type=float, default=1.0, + help="Radius of dashed circle in metres (default: 1.0)" + ) + args = p.parse_args() + + df = load_fix_csv(args.csv) + run_dir = args.csv.parent + out_png = run_dir / "static_scatter.png" + + plot_static_scatter(df, out_png, args.max_distance) + + +if __name__ == "__main__": + main() From 800e830890e575344d0ffa52902a4e563e98599a Mon Sep 17 00:00:00 2001 From: Vitor Date: Sun, 20 Apr 2025 17:12:37 -0300 Subject: [PATCH 2/2] Added moving test for laps --- README.md | 41 ++++++- scripts/plot_gps_laps_from_csv.py | 171 ++++++++++++++++++++++++++++++ 2 files changed, 211 insertions(+), 1 deletion(-) create mode 100644 scripts/plot_gps_laps_from_csv.py diff --git a/README.md b/README.md index 4e4a0d2..1b331fb 100644 --- a/README.md +++ b/README.md @@ -39,4 +39,43 @@ Check the deviation of a GNSS system. e.g. ``` python3 scripts/plot_gps_from_csv.py --csv run_0/ublox-fix.csv --max-distance 0.2 - ``` \ No newline at end of file + ``` + +### Moving (Lap) Test +Segment a trajectory into laps and plot continuous, colored paths for each run. + +#### What it does +* Scans every `run_*` folder under a user‑provided root directory. +* For each `run_X`, reads all `*-fix.csv` files (e.g. `ublox_F9P-fix.csv`, `blabla-fix.csv`). +* Segments each file’s latitude/longitude data into laps based on: + - **distance threshold** (meters to close a lap) + - **minimum duration** (seconds before considering a lap closed) +* Generates: + 1. **Per‑sensor plots**: `run_X__continuous_laps.png` (one PNG per sensor per run). + 2. **Combined plot**: `run_X_combined_continuous_laps.png`, overlaying all sensors’ laps. + +All plots are saved in the `plots/` folder under the specified root. + +#### How to run it +From the `GNSS/scripts/` directory: +```bash +python plot_gps_laps_from_csv.py \ + --data_dir /path/to/GNSS \ + [--run_distance_threshold 5.0] \ + [--min_run_duration 30.0] +``` + +**Arguments** +- `--data_dir` : Root folder containing `run_*` subdirectories (required). +- `--run_distance_threshold`: Meters to close a lap (default: `5.0`). +- `--min_run_duration` : Min lap duration in seconds (default: `30.0`). + +**Example** +```bash +python plot_gps_laps_from_csv.py \ + --data_dir ~/Documents/VAL/GNSS/git/DataAnalysisForRobotics \ + --run_distance_threshold 3.0 \ + --min_run_duration 20.0 +``` + +After running, check the `plots/` folder at the project root for all generated PNGs. \ No newline at end of file diff --git a/scripts/plot_gps_laps_from_csv.py b/scripts/plot_gps_laps_from_csv.py new file mode 100644 index 0000000..33b562d --- /dev/null +++ b/scripts/plot_gps_laps_from_csv.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +plot_gps_laps_from_csv.py + +Scans all `run_*` folders in a specified root directory, reads `*-fix.csv` files in each, +segments GPS fixes into laps, and generates: + - One PNG per sensor per run (named `run_X_sensor_continuous_laps.png`). + - One combined PNG per run (named `run_X_combined_continuous_laps.png`). + +All outputs go into a `plots/` folder inside the specified root (created if missing). + +Usage: + cd GNSS/scripts + python plot_gps_laps_from_csv.py \ + --data_dir /path/to/GNSS \ + [--run_distance_threshold 5.0] [--min_run_duration 30.0] +""" +import os +import glob +import argparse +from math import sqrt + +import pandas as pd +import numpy as np +import matplotlib +matplotlib.use('Agg') +import matplotlib.pyplot as plt + + +def segment_runs(times: np.ndarray, + lat: np.ndarray, + lon: np.ndarray, + run_distance_threshold: float, + min_run_duration: float): + segments = [] + if len(times) == 0: + return segments + + current = [0] + ref_lat, ref_lon = lat[0], lon[0] + start_time = times[0] + + for i in range(1, len(times)): + dx = (lon[i] - ref_lon) * 111320.0 + dy = (lat[i] - ref_lat) * 110540.0 + dist = sqrt(dx*dx + dy*dy) + current.append(i) + elapsed = times[i] - start_time + if dist < run_distance_threshold and elapsed > min_run_duration: + segments.append(current.copy()) + current = [i] + start_time = times[i] + ref_lat, ref_lon = lat[i], lon[i] + + if current: + segments.append(current) + return segments + + +def plot_runs(sensor_id: str, df: pd.DataFrame, segments: list, plots_dir: str, run_name: str): + lat0, lon0 = df['latitude'].iloc[0], df['longitude'].iloc[0] + xs = (df['longitude'].to_numpy() - lon0) * 111320.0 + ys = (df['latitude'].to_numpy() - lat0) * 110540.0 + + plt.figure(figsize=(10, 8)) + cmap = plt.get_cmap('tab10') + for idx, seg in enumerate(segments): + seg_x = xs[seg] + seg_y = ys[seg] + color = cmap(idx % 10) + plt.plot(seg_x, seg_y, marker='o', linestyle='-', label=f'Lap {idx+1}', color=color) + plt.plot(seg_x[0], seg_y[0], marker='s', markersize=8, markeredgecolor='k', color=color) + + plt.xlabel('Relative X (m)') + plt.ylabel('Relative Y (m)') + plt.title(f'{run_name} - {sensor_id}') + plt.legend() + plt.grid(True) + plt.gca().set_aspect('equal', adjustable='datalim') + + out = os.path.join(plots_dir, f'{run_name}_{sensor_id}_continuous_laps.png') + plt.savefig(out) + plt.close() + print(f'Saved: {out}') + + +def plot_combined(run_name: str, sensor_data: dict, plots_dir: str): + plt.figure(figsize=(12, 10)) + cmap = plt.get_cmap('tab10') + markers = ['o','s','^','x','D','v','*','p','H','+'] + + for s_idx, (sensor_id, (df, segments)) in enumerate(sensor_data.items()): + lat0, lon0 = df['latitude'].iloc[0], df['longitude'].iloc[0] + xs = (df['longitude'].to_numpy() - lon0) * 111320.0 + ys = (df['latitude'].to_numpy() - lat0) * 110540.0 + base_color = cmap(s_idx % 10) + for l_idx, seg in enumerate(segments): + seg_x = xs[seg] + seg_y = ys[seg] + marker = markers[l_idx % len(markers)] + plt.plot(seg_x, seg_y, marker=marker, linestyle='-', label=f'{sensor_id} Lap {l_idx+1}', color=base_color) + + plt.xlabel('Relative X (m)') + plt.ylabel('Relative Y (m)') + plt.title(f'{run_name} - Combined') + plt.legend(fontsize='small', loc='best', ncol=2) + plt.grid(True) + plt.gca().set_aspect('equal', adjustable='datalim') + + out = os.path.join(plots_dir, f'{run_name}_combined_continuous_laps.png') + plt.savefig(out) + plt.close() + print(f'Saved: {out}') + + +def main(): + parser = argparse.ArgumentParser( + description='Analyze GPS fix CSV runs and plot laps.') + parser.add_argument( + '--data_dir', type=str, required=True, + help='Root folder containing run_* subdirectories.') + parser.add_argument( + '--run_distance_threshold', type=float, default=5.0, + help='Meters to close a lap (default: 5.0)') + parser.add_argument( + '--min_run_duration', type=float, default=30.0, + help='Min lap duration in seconds (default: 30.0)') + args = parser.parse_args() + + data_root = os.path.abspath(args.data_dir) + plots_dir = os.path.join(data_root, 'plots') + os.makedirs(plots_dir, exist_ok=True) + + run_dirs = sorted(glob.glob(os.path.join(data_root, 'run_*'))) + if not run_dirs: + print(f'No run_* folders under {data_root}.') + return + + for run_path in run_dirs: + run_name = os.path.basename(run_path) + print(f'=== Processing {run_name} ===') + fix_files = glob.glob(os.path.join(run_path, '*-fix.csv')) + if not fix_files: + print(f' No fix CSVs in {run_name}, skipping.') + continue + + sensor_data = {} + for fp in sorted(fix_files): + sensor_id = os.path.basename(fp).split('-fix.csv')[0] + print(f' Sensor: {sensor_id}') + df = pd.read_csv(fp) + if not {'Time','latitude','longitude'}.issubset(df.columns): + print(f' Missing cols, skipping.') + continue + times = df['Time'].to_numpy() + lat = df['latitude'].to_numpy() + lon = df['longitude'].to_numpy() + + segments = segment_runs( + times, lat, lon, + args.run_distance_threshold, + args.min_run_duration) + print(f' Detected {len(segments)} lap(s)') + plot_runs(sensor_id, df, segments, plots_dir, run_name) + sensor_data[sensor_id] = (df, segments) + + if sensor_data: + plot_combined(run_name, sensor_data, plots_dir) + +if __name__ == '__main__': + main()