Skip to content

Commit

Permalink
Refactor kinematics test and functions
Browse files Browse the repository at this point in the history
  • Loading branch information
lochhh committed Jan 24, 2024
1 parent a9f7ebb commit aca1a42
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 47 deletions.
107 changes: 77 additions & 30 deletions movement/analysis/kinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,70 @@
import xarray as xr


def compute_velocity(
data: xr.DataArray, method: str = "euclidean"
) -> np.ndarray:
"""Compute the instantaneous velocity of a single keypoint from
def displacement(data: xr.DataArray) -> np.ndarray:
"""Compute the displacement between consecutive locations
of a single keypoint from a single individual.
Parameters
----------
data : xarray.DataArray
The input data, assumed to be of shape (..., 2), where the last
dimension contains the x and y coordinates.
Returns
-------
numpy.ndarray
A numpy array containing the computed magnitude and
direction of the displacement.
"""
displacement_vector = np.diff(data, axis=0, prepend=data[0:1])
magnitude = np.linalg.norm(displacement_vector, axis=1)
direction = np.arctan2(
displacement_vector[..., 1], displacement_vector[..., 0]
)
return np.stack((magnitude, direction), axis=1)


def distance(data: xr.DataArray) -> np.ndarray:
"""Compute the distances between consecutive locations of
a single keypoint from a single individual.
Parameters
----------
data : xarray.DataArray
The input data, assumed to be of shape (..., 2), where the last
dimension contains the x and y coordinates.
Returns
-------
numpy.ndarray
A numpy array containing the computed distance.
"""
return displacement(data)[:, 0]


def velocity(data: xr.DataArray) -> np.ndarray:
"""Compute the velocity of a single keypoint from
a single individual.
Parameters
----------
data : xarray.DataArray
The input data, assumed to be of shape (..., 2), where the last
dimension contains the x and y coordinates.
method : str
The method to use for computing velocity. Can be "euclidean" or
"numerical".
Returns
-------
numpy.ndarray
A numpy array containing the computed velocity.
"""
if method == "euclidean":
return compute_euclidean_velocity(data)
return approximate_derivative(data, order=1)


def compute_euclidean_velocity(data: xr.DataArray) -> np.ndarray:
def speed(data: xr.DataArray) -> np.ndarray:
"""Compute velocity based on the Euclidean norm (magnitude) of the
differences between consecutive points, i.e. the straight-line
distance travelled.
distance travelled, assuming equidistant time spacing.
Parameters
----------
Expand All @@ -43,46 +78,58 @@ def compute_euclidean_velocity(data: xr.DataArray) -> np.ndarray:
numpy.ndarray
A numpy array containing the computed velocity.
"""
time_diff = data["time"].diff(dim="time")
space_diff = np.linalg.norm(np.diff(data.values, axis=0), axis=1)
velocity = space_diff / time_diff
# Pad with zero to match the original shape of the data
velocity = np.concatenate([np.zeros((1,) + velocity.shape[1:]), velocity])
return velocity
return velocity(data)[:, 0]


def acceleration(data: xr.DataArray) -> np.ndarray:
"""Compute the acceleration of a single keypoint from
a single individual.
Parameters
----------
data : xarray.DataArray
The input data, assumed to be of shape (..., 2), where the last
dimension contains the x and y coordinates.
Returns
-------
numpy.ndarray
A numpy array containing the computed acceleration.
"""
return approximate_derivative(data, order=2)


def approximate_derivative(data: xr.DataArray, order: int = 0) -> np.ndarray:
"""Compute displacement, velocity, or acceleration using numerical
differentiation, assuming equidistant time spacing.
def approximate_derivative(data: xr.DataArray, order: int = 1) -> np.ndarray:
"""Compute velocity or acceleration using numerical differentiation,
assuming equidistant time spacing.
Parameters
----------
data : xarray.DataArray
The input data, assumed to be of shape (..., 2), where the last
dimension contains the x and y coordinates.
order : int
The order of the derivative. 0 for displacement, 1 for velocity, 2 for
acceleration.
The order of the derivative. 1 for velocity, 2 for
acceleration. Default is 1.
Returns
-------
numpy.ndarray
A numpy array containing the computed kinematic variable.
A numpy array containing the computed magnitudes and directions of
the kinematic variable.
"""
if order == 0: # Compute displacement
result = np.diff(data, axis=0)
# Pad with zeros to match the original shape of the data
result = np.concatenate([np.zeros((1,) + result.shape[1:]), result])
if order <= 0:
raise ValueError("order must be a positive integer.")
else:
result = data
dt = data["time"].diff(dim="time").values[0]
for _ in range(order):
result = np.gradient(result, dt, axis=0)
# Pad with zeros to match the output of compute_euclidean_velocity
# Prepend with zeros to match match output to the input shape
result = np.pad(result[1:], ((1, 0), (0, 0)), "constant")
magnitude = np.linalg.norm(result, axis=-1)
# direction = np.arctan2(result[..., 1], result[..., 0])
return magnitude
direction = np.arctan2(result[..., 1], result[..., 0])
return np.stack((magnitude, direction), axis=1)


# Locomotion Features
Expand Down
55 changes: 38 additions & 17 deletions tests/test_unit/test_kinematics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,52 @@
class TestKinematics:
"""Test suite for the kinematics module."""

def test_compute_displacement(self, valid_pose_dataset):
"""Test the `approximate_derivative` function for
calculating displacement."""
def test_distance(self, valid_pose_dataset):
"""Test distance calculation."""
# Select a single keypoint from a single individual
data = valid_pose_dataset.pose_tracks.isel(keypoints=0, individuals=0)
# Compute displacement
displacement = kinematics.approximate_derivative(data)
result = kinematics.distance(data)
expected = np.pad([5.0] * 9, (1, 0), "constant")
assert np.allclose(displacement, expected)
assert np.allclose(result, expected)

@pytest.mark.parametrize("method", ["euclidean", "numerical"])
def test_compute_velocity(self, valid_pose_dataset, method):
"""Test the `compute_velocity` function."""
def test_displacement(self, valid_pose_dataset):
"""Test displacement calculation."""
# Select a single keypoint from a single individual
data = valid_pose_dataset.pose_tracks.isel(keypoints=0, individuals=0)
result = kinematics.displacement(data)
expected_magnitude = np.pad([5.0] * 9, (1, 0), "constant")
expected_direction = np.concatenate(([0], np.full(9, 0.92729522)))
expected = np.stack((expected_magnitude, expected_direction), axis=1)
assert np.allclose(result, expected)

def test_velocity(self, valid_pose_dataset):
"""Test velocity calculation."""
# Select a single keypoint from a single individual
data = valid_pose_dataset.pose_tracks.isel(keypoints=0, individuals=0)
# Compute velocity
velocity = kinematics.compute_velocity(data, method=method)
result = kinematics.velocity(data)
expected_magnitude = np.pad([5.0] * 9, (1, 0), "constant")
expected_direction = np.concatenate(([0], np.full(9, 0.92729522)))
expected = np.stack((expected_magnitude, expected_direction), axis=1)
assert np.allclose(result, expected)

def test_speed(self, valid_pose_dataset):
"""Test velocity calculation."""
# Select a single keypoint from a single individual
data = valid_pose_dataset.pose_tracks.isel(keypoints=0, individuals=0)
result = kinematics.speed(data)
expected = np.pad([5.0] * 9, (1, 0), "constant")
assert np.allclose(velocity, expected)
assert np.allclose(result, expected)

def test_compute_acceleration(self, valid_pose_dataset):
"""Test the `approximate_derivative` function for
calculating acceleration."""
def test_acceleration(self, valid_pose_dataset):
"""Test acceleration calculation."""
# Select a single keypoint from a single individual
data = valid_pose_dataset.pose_tracks.isel(keypoints=0, individuals=0)
# Compute acceleration
acceleration = kinematics.approximate_derivative(data, order=2)
assert np.allclose(acceleration, np.zeros(10))
result = kinematics.acceleration(data)
assert np.allclose(result, np.zeros((10, 2)))

def test_approximate_derivative_with_nonpositive_order(self):
"""Test that an error is raised when the order is non-positive."""
data = np.arange(10)
with pytest.raises(ValueError):
kinematics.approximate_derivative(data, order=0)

0 comments on commit aca1a42

Please sign in to comment.