Skip to content

Commit

Permalink
chore: adding lm
Browse files Browse the repository at this point in the history
  • Loading branch information
AtticusZeller committed Jun 1, 2024
1 parent e1b9dfe commit 9084fa2
Show file tree
Hide file tree
Showing 11 changed files with 381 additions and 143 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,6 @@
[submodule "thirdparty/AB_GICP_c"]
path = thirdparty/AB_GICP_c
url = [email protected]:Atticuszz/AB_GICP_c.git
[submodule "thirdparty/torchimize"]
path = thirdparty/torchimize
url = [email protected]:SupaVision/torchimize.git
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ open3d = "^0.18.0"
tqdm = "^4.66.4"
scipy = "^1.13.0"
wandb = "^0.17.0"
torchimize = "^0.0.16"
#small-gicp = "^0.0.6"
torch = "^2.3.0"



Expand Down
15 changes: 13 additions & 2 deletions src/component/eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def calculate_rotation_error(
# 计算旋转角度
trace_value = np.trace(delta_R)
cos_theta = (trace_value - 1) / 2
# cos_theta = np.clip(cos_theta, -1, 1) # 确保值在合法范围内
cos_theta = np.clip(cos_theta, -1, 1) # 确保值在合法范围内
theta = np.arccos(cos_theta)

# 返回以度为单位的旋转误差
Expand Down Expand Up @@ -180,6 +180,17 @@ class RegistrationConfig(NamedTuple):
voxel_downsampling_resolutions: float | None = None
grid_downsample_resolution: int | None = None

def as_dict(self):
return {key: val for key, val in self._asdict().items() if val is not None}


class WandbConfig(NamedTuple):
algorithm: str = "GICP"
implementation: str = "small_gicp"
dataset: str = "Replica"
sub_set: str = "office0"
description: str = "GICP on Replica dataset"


class Experiment:
def __init__(
Expand All @@ -197,7 +208,7 @@ def __init__(
)
if extra_config is None:
extra_config = {}
wandb_config = registration_config._asdict()
wandb_config = registration_config.as_dict()
wandb_config.update({"name": name, **extra_config})
self.grid_downsample = registration_config.grid_downsample_resolution
self.logger = WandbLogger(run_name=name, config=wandb_config)
Expand Down
36 changes: 34 additions & 2 deletions src/component/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,41 @@
import small_gicp
from numpy.typing import NDArray

from src.gicp.optimizer import lm_optimize
from src.gicp.pcd import PointClouds


# TODO: add registration base class and registration result class


class GICP:
def __init__(self):
self.previous_pcd: PointClouds | None = None
# every frame pose
self.T_world_camera = np.identity(4) # World to camera transformation

def align_pcd_gt_pose(
self,
raw_points: NDArray[np.float64],
init_gt_pose: NDArray[np.float64] | None = None,
T_last_current: NDArray[np.float64] = np.identity(4),
):
raw_points = PointClouds(raw_points, np.zeros_like(raw_points))
raw_points.preprocess(20)
# first frame
if self.previous_pcd is None:
self.previous_pcd = raw_points
self.T_world_camera = (
init_gt_pose if init_gt_pose is not None else np.identity(4)
)
return init_gt_pose

result = lm_optimize(T_last_current, self.previous_pcd, raw_points)
# Update the world transformation matrix
self.T_world_camera = self.T_world_camera @ result
return result


class Scan2ScanICP:
"""
Scan-to-Scan ICP class for depth image-derived point clouds using small_gicp library.
Expand Down Expand Up @@ -173,5 +205,5 @@ def align_pcd_gt_pose(
self.previous_pcd = downsampled
self.previous_tree = tree

# return self.T_world_camera
return result
return self.T_world_camera
# return result
209 changes: 152 additions & 57 deletions src/gicp/factor.py
Original file line number Diff line number Diff line change
@@ -1,93 +1,188 @@
import numpy as np
import torch
from numpy._typing import NDArray
from numpy.typing import NDArray
from torch import Tensor

from .pcd import PointClouds
from ..utils import to_tensor


def gicp_error_optimized(
point_clouds_target: PointClouds, point_clouds_source: PointClouds, T: torch.Tensor
):
"""
Compute the GICP error using nearest neighbor search with a k-d tree.
def compute_geometric_residuals(
target: NDArray[np.float64] | Tensor,
trans_src: NDArray[np.float64] | Tensor,
T: NDArray[np.float64] | Tensor,
cov_target: NDArray[np.float64] | Tensor,
cov_src: NDArray[np.float64] | Tensor,
) -> float | Tensor:

residual = target[:3] - trans_src[:3]
# inf error init
error = float("inf")
if isinstance(target, np.ndarray):
# Rotation part of the transformation matrix
combined_covariance = cov_target + np.dot(np.dot(T, cov_src), T.T)
inv_combined_covariance = np.linalg.inv(combined_covariance[:3, :3])

# Calculate Mahalanobis distance
error = 0.5 * np.dot(residual.T, np.dot(inv_combined_covariance, residual))
return error
elif torch.is_tensor(target):
combined_covariance = cov_target + torch.matmul(
torch.matmul(T, cov_src), T.transpose(0, 1)
)
inv_combined_covariance = torch.inverse(combined_covariance[:3, :3])

# Calculate Mahalanobis distance
error = 0.5 * torch.matmul(
torch.matmul(residual.unsqueeze(0), inv_combined_covariance),
residual.unsqueeze(1),
)
return error


:param point_clouds_target: Target point clouds object
:param point_clouds_source: Source point clouds object
:param T: Transformation matrix [4, 4]
:return: Total error as a single float
# NOTE: pure math version
def gicp_error_total(T: torch.Tensor, pcd_target: PointClouds, pcd_src: PointClouds):
"""
# Transform the source points
pcd_source = to_tensor(point_clouds_source.points)
transformed_source = torch.matmul(T, pcd_source.T).T[:, :3]
pcd_target = to_tensor(point_clouds_target.points)
# Use k-d tree to find nearest neighbors in the target
_, indices = point_clouds_target.kdtree.query(transformed_source.numpy(), k=1)
nearest_target_points = pcd_target[indices.flatten()]

# Compute residuals
residuals = nearest_target_points - transformed_source

# Calculate the Mahalanobis distance for each matched pair
total_error = 0.0
for i, idx in enumerate(indices.flatten()):
cov_source = point_clouds_source.cov(i)
cov_target = point_clouds_target.cov(idx)
RCR = cov_target + torch.matmul(
torch.matmul(T[:3, :3], cov_source[:3, :3]), T[:3, :3].T
Compute the total GICP error for all points in a source point cloud against a target point cloud
using tensors for all computations.
Parameters:
----------
pcd_target : Target point clouds object
pcd_src : Source point clouds object
T : Transformation matrix [4, 4] as a PyTorch tensor
Returns:
----------
total_error : float, sum of computed Mahalanobis distances for all points
"""
# Transform the entire source point cloud using the transformation matrix T
transformed_sources = torch.matmul(
T, to_tensor(pcd_src.points).T
).T # Apply transformation

# Initialize total error
total_error = []

# Iterate over each transformed source point
for idx in range(transformed_sources.shape[0]):
transformed_source = transformed_sources[idx]

# Assuming a method to find the nearest neighbor and its index
found, nearest_idx, _ = pcd_target.kdtree.nearest_neighbor_search(
transformed_source.detach().cpu().numpy()[:3]
)
mahalanobis_weight = torch.inverse(RCR)
error = torch.matmul(
torch.matmul(residuals[i].unsqueeze(0), mahalanobis_weight),
residuals[i].unsqueeze(1),
if not found:
continue # If no nearest neighbor is found, skip this point

nearest_target_point = to_tensor(pcd_target.point(nearest_idx))
cov_target = to_tensor(pcd_target.cov(nearest_idx))
cov_source = to_tensor(pcd_src.cov(idx))

# Calculate the Mahalanobis distance for the matched pair using provided function
error = compute_geometric_residuals(
target=nearest_target_point,
trans_src=transformed_source,
T=T,
cov_target=cov_target,
cov_src=cov_source,
)
total_error += 0.5 * error

return total_error.squeeze()
total_error.append(error)

return to_tensor(total_error)


def gicp_single_point_error(
point_clouds_target: PointClouds,
point_source: NDArray[np.float64],
# NOTE: small_gicp version
def gicp_error_np(
pcd_target: PointClouds,
pcd_src: PointClouds,
src_idx: int,
T: NDArray[np.float64],
):
"""
Compute the GICP error for a single source point against a target point cloud using k-d tree.
Assumes source and target points include homogeneous coordinates [x, y, z, 1].
:param point_clouds_target: Target point clouds object
:param point_source: Source point as a tensor [4] (homogeneous coordinates)
:param T: Transformation matrix [4, 4]
:return: Total error as a single float, Index of the nearest target point
Parameters
----------
pcd_target: Target point clouds object
pcd_src: Source point clouds object
src_idx: Index of the source point
T: Transformation matrix [4, 4]
Returns
----------
error: float, computed Mahalanobis distance or -1 if no neighbor found
index: int, index of the nearest neighbor or -1 if no neighbor found
"""
# Transform the source point using the full 4x4 transformation matrix
transformed_source = T @ point_source
transformed_source = T @ pcd_src.point(src_idx)

# Use k-d tree to find the nearest neighbor in the target
found, index, sq_dist = point_clouds_target.kdtree.nearest_neighbor_search(
found, index, sq_dist = pcd_target.kdtree.nearest_neighbor_search(
transformed_source
)
if not found:
return False

nearest_target_point = point_clouds_target.point(index)
nearest_target_point = pcd_target.point(index)

# Compute residual for the first 3 dimensions (x, y, z)
residual = nearest_target_point[:3] - transformed_source[:3]
assert np.allclose(residual, sq_dist)
assert np.allclose(residual, sq_dist), "Residuals do not match!"

error = compute_geometric_residuals(
nearest_target_point,
transformed_source,
T,
pcd_target.cov(index),
pcd_src.cov(src_idx),
)

return error, index


def gicp_error_tensor(
pcd_target: PointClouds,
pcd_src: PointClouds,
src_idx: int,
T: Tensor,
):
"""
Compute the GICP error for a single source point against a target point cloud using k-d tree.
Assumes source and target points include homogeneous coordinates [x, y, z, 1].
# Retrieve the covariance of the nearest target point
cov_target = point_clouds_target.cov(index)
Parameters:
----------
pcd_target : Target point clouds object
pcd_src : Source point clouds object
src_idx : Index of the source point
T : Transformation matrix [4, 4]
Returns:
----------
error : float, computed Mahalanobis distance or -1 if no neighbor found
index : int, index of the nearest neighbor or -1 if no neighbor found
"""
src_point = to_tensor(pcd_src.point(src_idx))
transformed_source = torch.matmul(T, src_point)

# TODO: finish loss func
# Calculate the combined covariance matrix for the Mahalanobis distance
RCR = cov_target + torch.matmul(torch.matmul(T[:3, :3], cov_source), T[:3, :3].T)
mahalanobis_weight = torch.inverse(RCR)
# Assuming the nearest neighbor search is done on the CPU and results are obtained
found, index, sq_dist = pcd_target.kdtree.nearest_neighbor_search(
transformed_source.cpu().numpy()
)
if not found:
return -1, -1

# Calculate the Mahalanobis distance error
error = (
0.5
* (residual.unsqueeze(0) @ mahalanobis_weight @ residual.unsqueeze(1)).item()
nearest_target_point = to_tensor(pcd_target.point(index))
residual = nearest_target_point[:3] - transformed_source[:3]
assert torch.allclose(residual, to_tensor(sq_dist)), "Residuals do not match!"

error = compute_geometric_residuals(
nearest_target_point,
transformed_source,
T,
to_tensor(pcd_target.cov(index)),
to_tensor(pcd_src.cov(src_idx)),
)

return error, nearest_target_index
return error, index
Loading

0 comments on commit 9084fa2

Please sign in to comment.