Skip to content

Commit

Permalink
Add draft p2p logic
Browse files Browse the repository at this point in the history
Signed-off-by: aapozd <[email protected]>
  • Loading branch information
aaletov committed May 18, 2024
1 parent 639201f commit 8672cee
Showing 1 changed file with 162 additions and 38 deletions.
200 changes: 162 additions & 38 deletions open_pcc_metric/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def __init__(
):
self.origin_cloud = origin_cloud
self.reconst_cloud = reconst_cloud
if not self.origin_cloud.has_normals():
self.origin_cloud.estimate_normals()
if not self.reconst_cloud.has_normals():
self.reconst_cloud.estimate_normals()
self._origin_tree = o3d.geometry.KDTreeFlann(self.origin_cloud)
self._reconst_tree = o3d.geometry.KDTreeFlann(self.reconst_cloud)
self._origin_neigh_cloud, self._origin_neigh_dists = CloudPair.get_neighbour_cloud(
Expand Down Expand Up @@ -112,6 +116,12 @@ class AbstractMetric(abc.ABC):
def __str__(self) -> str:
return "{label}: {value}".format(label=self.label, value=str(self.value))

def get_metrics_by_label(
metrics: typing.List[AbstractMetric],
label: str,
) -> typing.List[AbstractMetric]:
return list(filter(lambda m: m.label == label, metrics))

class PrimaryMetric(AbstractMetric):
@abc.abstractmethod
def calculate(self, cloud_pair: CloudPair):
Expand All @@ -120,9 +130,6 @@ def calculate(self, cloud_pair: CloudPair):
class DirectionalMetric(PrimaryMetric):
is_left: bool

def __init__(self, is_left: bool):
self.is_left = is_left

def __str__(self) -> str:
order = "left" if self.is_left else "right"
return "{label}({order}): {value}".format(
Expand All @@ -135,19 +142,91 @@ class SecondaryMetric(AbstractMetric):
def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
raise NotImplementedError("calculate is not implmented")

class ErrorVector(DirectionalMetric):
label = "ErrorVector"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, cloud_pair: CloudPair):
if self.is_left:
self.value = np.substract(
cloud_pair.origin_cloud,
cloud_pair._origin_neigh_cloud,
)
else:
self.value = np.subtract(
cloud_pair.reconst_cloud,
cloud_pair._reconst_neigh_cloud,
)

class PointNormals(DirectionalMetric):
label = "PointNormals"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, cloud_pair: CloudPair):
if self.is_left:
self.value = np.asarray(cloud_pair.origin_cloud.normals)
else:
self.value = np.asarray(cloud_pair.reconst_cloud.normals)

class EuclideanDistance(DirectionalMetric):
label = "EuclideanDistance"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, cloud_pair: CloudPair):
if self.is_left:
self.value = cloud_pair._origin_neigh_dists
else:
self.value = cloud_pair._reconst_neigh_dists

class EuclideanDistanceToPlane(SecondaryMetric, DirectionalMetric):
label = "EuclideanDistanceToPlane"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
res = get_metrics_by_label(metrics, "ErrorVector")
errs = next(filter(lambda m: m.is_left == self.is_left, res), None)
if errs is None:
raise RuntimeError("No corresponding ErrorVector found")
if self.is_left:
self.value = 0
res = get_metrics_by_label(metrics, "PointNormals")
if errs is None:
raise RuntimeError("No corresponding PointNormals found")
normals = next(filter(lambda m: m.is_left != self.is_left, res), None)

dists = np.zeros(shape=(errs.shape[0],))
for i in range(errs.shape[0]):
dists[i] = np.dot(errs[i], normals[i])
self.value = dists
return True

class DistanceDependantMetric(SecondaryMetric, DirectionalMetric):
distance_label: str

def __str__(self) -> str:
order = "left" if self.is_left else "right"
return "{label}({distance})({order}): {value}".format(
label=self.label,
distance=self.distance_label,
order=order,
value=self.value,
)

class BoundarySqrtDistances(PrimaryMetric):
label = "BoundarySqrtDistances"

def calculate(self, cloud_pair: CloudPair):
inner_dists = cloud_pair.origin_cloud.compute_nearest_neighbor_distance()
self.value = (np.min(inner_dists), np.max(inner_dists))

def get_metrics_by_label(
metrics: typing.List[AbstractMetric],
label: str,
) -> typing.List[AbstractMetric]:
return list(filter(lambda m: m.label == label, metrics))

class MinSqrtDistance(SecondaryMetric):
label = "MinSqrtDistance"

Expand All @@ -174,21 +253,31 @@ def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
self.value = boundary_metric.value[1]
return True

class GeoMSE(DirectionalMetric):
class GeoMSE(DistanceDependantMetric):
label = "GeoMSE"

def calculate(self, cloud_pair: CloudPair):
sse = 0
if self.is_left:
sse = np.sum(cloud_pair._origin_neigh_dists, axis=0)
else:
sse = np.sum(cloud_pair._reconst_neigh_dists, axis=0)
n = cloud_pair._origin_neigh_dists.shape[0]
def __init__(self, is_left: bool, distance_label: str):
self.is_left = is_left
self.distance_label = distance_label

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
res = get_metrics_by_label(metrics, self.distance_label)
if len(res) == 0:
return False
if len(res) > 2:
raise RuntimeError("Must be two or less: %s" % self.distance_label)
dists = next(filter(lambda m: m.is_left == self.is_left, res), None).value
n = dists.shape[0]
sse = np.sum(dists, axis=0)
self.value = sse / n
return True

class GeoPSNR(SecondaryMetric, DirectionalMetric):
label = "GeoPSNR"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
res = get_metrics_by_label(metrics, "MaxSqrtDistance")
if len(res) == 0:
Expand All @@ -203,9 +292,26 @@ def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
self.value = 10 * np.log10(max_neigh_dist**2 / geo_mse.value)
return True

class ColorMSE(DirectionalMetric):
label = "ColorMSE"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, cloud_pair: CloudPair):
diff = 0
if self.is_left:
diff = np.subtract(cloud_pair.origin_cloud.colors, cloud_pair._origin_neigh_cloud.colors)
else:
diff = np.subtract(cloud_pair.reconst_cloud.colors, cloud_pair._reconst_neigh_cloud.colors)
self.value = np.mean(diff**2, axis=0)

class ColorPSNR(SecondaryMetric, DirectionalMetric):
label = "ColorPSNR"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
peak = 255
res = get_metrics_by_label(metrics, "ColorMSE")
Expand All @@ -215,29 +321,29 @@ def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
self.value = 10 * np.log10(peak**2 / geo_mse.value)
return True

class ColorMSE(DirectionalMetric):
label = "ColorMSE"

def calculate(self, cloud_pair: CloudPair):
diff = 0
if self.is_left:
diff = np.subtract(cloud_pair.origin_cloud.colors, cloud_pair._origin_neigh_cloud.colors)
else:
diff = np.subtract(cloud_pair.reconst_cloud.colors, cloud_pair._reconst_neigh_cloud.colors)
self.value = np.mean(diff**2, axis=0)

class GeoHausdorffDistance(DirectionalMetric):
class GeoHausdorffDistance(DistanceDependantMetric, DirectionalMetric):
label = "GeoHausdorffDistance"

def calculate(self, cloud_pair: CloudPair):
if self.is_left:
self.value = np.max(cloud_pair._origin_neigh_dists, axis=0)
else:
self.value = np.max(cloud_pair._reconst_neigh_dists, axis=0)
def __init__(self, is_left: bool, distance_label: str):
self.is_left = is_left
self.distance_label = distance_label

def calculate(self, metrics: typing.List[AbstractMetric]):
res = get_metrics_by_label(metrics, self.distance_label)
if len(res) == 0:
return False
if len(res) > 2:
raise RuntimeError("Must be two or less: %s" % self.distance_label)
dists = next(filter(lambda m: m.is_left == self.is_left, res), None).value
self.value = np.max(dists, axis=0)
return True

class GeoHausdorffDistancePSNR(SecondaryMetric, DirectionalMetric):
label = "GeoHausdorffDistancePSNR"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
res = get_metrics_by_label(metrics, "MaxSqrtDistance")
if len(res) == 0:
Expand All @@ -255,6 +361,9 @@ def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
class ColorHausdorffDistance(DirectionalMetric):
label = "ColorHausdorffDistance"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, cloud_pair: CloudPair):
diff = None
if self.is_left:
Expand All @@ -267,6 +376,9 @@ def calculate(self, cloud_pair: CloudPair):
class ColorHausdorffDistancePSNR(SecondaryMetric, DirectionalMetric):
label = "ColorHausdorffDistancePSNR"

def __init__(self, is_left: bool):
self.is_left = is_left

def calculate(self, metrics: typing.List[AbstractMetric]) -> bool:
peak = 255
res = get_metrics_by_label(metrics, "ColorHausdorffDistance")
Expand Down Expand Up @@ -351,18 +463,30 @@ def calculate_from_files(
cloud_pair = CloudPair(ocloud, pcloud)
primary_metrics = [
BoundarySqrtDistances(),
GeoMSE(is_left=True),
GeoMSE(is_left=False),
ErrorVector(is_left=True),
ErrorVector(is_left=False),
PointNormals(is_left=True),
PointNormals(is_left=False),
ColorMSE(is_left=True),
ColorMSE(is_left=False),
GeoHausdorffDistance(is_left=True),
GeoHausdorffDistance(is_left=False),
ColorHausdorffDistance(is_left=True),
ColorHausdorffDistance(is_left=False),
]
secondary_metrics = [
MinSqrtDistance(),
MaxSqrtDistance(),
EuclideanDistance(is_left=True),
EuclideanDistance(is_left=False),
EuclideanDistanceToPlane(is_left=True),
EuclideanDistanceToPlane(is_left=False),
GeoMSE(is_left=True, distance_label="EuclideanDistance"),
GeoMSE(is_left=False, distance_label="EuclideanDistance"),
GeoHausdorffDistance(is_left=True, distance_label="EuclideanDistance"),
GeoHausdorffDistance(is_left=False, distance_label="EuclideanDistance"),
GeoMSE(is_left=True, distance_label="EuclideanDistance"),
GeoMSE(is_left=False, distance_label="EuclideanDistance"),
GeoHausdorffDistance(is_left=True, distance_label="EuclideanDistance"),
GeoHausdorffDistance(is_left=False, distance_label="EuclideanDistance"),
GeoPSNR(is_left=True),
GeoPSNR(is_left=False),
ColorPSNR(is_left=True),
Expand Down

0 comments on commit 8672cee

Please sign in to comment.