Skip to content

Commit

Permalink
Now accepts different orbit epochs and origins
Browse files Browse the repository at this point in the history
  • Loading branch information
akoumjian committed Aug 7, 2024
1 parent d7519eb commit 741c1e3
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 23 deletions.
95 changes: 94 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,105 @@

**Table of Contents**


- [Installation](#installation)
- [License](#license)
- [Usage](#usage)
- [Propagating Orbits](#propagating-orbits)
- [Generating Ephemerides](#generating-ephemerides)


## Overview
`adam-assist` is a pluggable propagator class for the `adam-core` package that uses [ASSIST](https://github.com/matthewholman/assist) for propagating orbits.


## Installation

```console
pip install adam-assist
```

You will need to download the JPL ephemeris files. It is recommended to use the built in utility for this.

```python
from adam_core.propagator.adam_assist import download_jpl_ephemeris_files

download_jpl_ephemeris_files()
```

## Usage

### Propagating Orbits

Here we initialize a set of `adam_core.orbit.Orbit` objects from the JPL Small Bodies Database and propagate them using the `AdamAssistPropagator` class. You can manually initialize the orbits as well.

```python
from adam_core.orbits.query.sbdb import query_sbdb
from adam_core.time import Timestamp
from adam_core.propagator.adam_assist import ASSISTPropagator

# Query the JPL Small Bodies Database for a set of orbits
sbdb_orbits = query_sbdb(["2020 AV2", "A919 FB", "1993 SB"])
times = Timestamp.from_mjd([60000, 60365, 60730], scale="tdb")


propagator = ASSISTPropagator()

propagated = propagator.propagate_orbits(sbdb_orbits, times)
```

Of course you can define your own orbits as well.

```python
import pyarrow as pa
from adam_core.orbits import Orbit
from adam_core.coordinates import CartesianCoordinates, Origin
from adam_core.time import Timestamp
from adam_core.propagator.adam_assist import ASSISTPropagator

# Define an orbit
orbits = Orbit.from_kwargs(
orbit_id=["1", "2", "3"],
coordinates=CartesianCoordinates.from_kwargs(
# use realistic cartesian coords in AU and AU/day
x=[-1.0, 0.0, 1.0],
y=[-1.0, 0.0, 1.0],
z=[-1.0, 0.0, 1.0],
vx=[-0.1, 0.0, 0.1],
vy=[-0.1, 0.0, 0.1],
vz=[-0.1, 0.0, 0.1],
time=Timestamp.from_mjd([60000, 60365, 60730], scale="tdb"),
origin=Origin.from_kwargs(code=pa.repeat("SUN", 3)),
frame="eliptic"
),
)

propagator = ASSISTPropagator()

propagated = propagator.propagate_orbits(orbits)
```

### Generating Ephemerides

The `ASSISTPropagator` class uses the `adam-core` default ephemeris generator to generate ephemerides from the `ASSIST` propagated orbits. The default ephemeris generator accounts for light travel time and aberration. See `adam_core.propagator.propagator.EphemerisMixin` for implementation details.


```python
from adam_core.orbits.query.sbdb import query_sbdb
from adam_core.time import Timestamp
from adam_core.observers import Observers
from adam_core.propagator.adam_assist import ASSISTPropagator

# Query the JPL Small Bodies Database for a set of orbits
sbdb_orbits = query_sbdb(["2020 AV2", "A919 FB", "1993 SB"])
times = Timestamp.from_mjd([60000, 60365, 60730], scale="utc")
observers = Observers.from_code("399", times)
propagator = ASSISTPropagator()

ephemerides = propagator.generate_ephemeris(sbdb_orbits, observers)
```






1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ dev = [
"mypy",
"pytest",
"pytest-cov",
"pytest-mock",
"pytest-benchmark",
"black",
"isort",
Expand Down
76 changes: 54 additions & 22 deletions src/adam_core/propagator/adam_assist.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,29 +90,72 @@ def hash_orbit_ids_to_uint32(

class ASSISTPropagator(Propagator, ImpactMixin): # type: ignore


def _propagate_orbits(self, orbits: OrbitType, times: TimestampType) -> OrbitType:
# Assert that the time for each orbit definition is the same for the simulator to work
assert len(pc.unique(orbits.coordinates.time.mjd())) == 1
"""
Propagate the orbits to the specified times.
"""

# Record the original origin and frame to use for the final results
original_frame = orbits.coordinates.frame

# The coordinate frame is the equatorial International Celestial Reference Frame (ICRF).
# This is also the native coordinate system for the JPL binary files.
# For units we use solar masses, astronomical units, and days.
# The time coordinate is Barycentric Dynamical Time (TDB) in Julian days.

# Record the original origin and frame to use for the final results
original_origin = orbits.coordinates.origin
original_frame = orbits.coordinates.frame

# Convert coordinates to ICRF using TDB time
coords = transform_coordinates(
transformed_coords = transform_coordinates(
orbits.coordinates,
origin_out=OriginCodes.SOLAR_SYSTEM_BARYCENTER,
frame_out="equatorial",
)
input_orbit_times = coords.time.rescale("tdb")
coords = coords.set_column("time", input_orbit_times)
orbits = orbits.set_column("coordinates", coords)
transformed_input_orbit_times = transformed_coords.time.rescale("tdb")
transformed_coords = transformed_coords.set_column("time", transformed_input_orbit_times)
transformed_orbits = orbits.set_column("coordinates", transformed_coords)

# Group orbits by unique time, then propagate them
results = None
unique_times = transformed_orbits.coordinates.time.unique()
for epoch in unique_times:
mask = transformed_orbits.coordinates.time.equals(epoch)
epoch_orbits = transformed_orbits.apply_mask(mask)
propagated_orbits = self._propagate_orbits_inner(epoch_orbits, times)
if results is None:
results = propagated_orbits
else:
results = concatenate([results, propagated_orbits])

# Sanity check that the results are of the correct type
assert isinstance(results, OrbitType)

# Return the results with the original origin and frame
# Preserve the original output origin for the input orbits
# by orbit id
final_results = None
unique_origins = pc.unique(orbits.coordinates.origin.code)
for origin_code in unique_origins:
origin_orbits = orbits.select("coordinates.origin.code", origin_code)
result_origin_orbits = results.where(pc.field("orbit_id").isin(origin_orbits.orbit_id))
partial_results = result_origin_orbits.set_column(
"coordinates",
transform_coordinates(
result_origin_orbits.coordinates,
origin_out=OriginCodes[origin_code.as_py()],
frame_out=original_frame,
),
)
if final_results is None:
final_results = partial_results
else:
final_results = concatenate([final_results, partial_results])

return final_results


def _propagate_orbits_inner(self, orbits: OrbitType, times: TimestampType) -> OrbitType:
"""
Propagates one or more orbits with the same epoch to the specified times.
"""
root_dir = pathlib.Path(DATA_DIR).expanduser()
ephem = assist.Ephem(
planets_path=str(root_dir.joinpath("linux_p1550p2650.440")),
Expand Down Expand Up @@ -244,17 +287,6 @@ def _propagate_orbits(self, orbits: OrbitType, times: TimestampType) -> OrbitTyp
else:
results = concatenate([results, time_step_results])

assert isinstance(results, OrbitType)

results = results.set_column(
"coordinates",
transform_coordinates(
results.coordinates,
origin_out=original_origin.as_OriginCodes(),
frame_out=original_frame,
),
)

return results

def _detect_impacts(
Expand Down
66 changes: 66 additions & 0 deletions tests/test_propagate.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
import pyarrow as pa
import pyarrow.compute as pc
import pytest
from adam_core.coordinates import CartesianCoordinates, Origin
from adam_core.coordinates.residuals import Residuals
from adam_core.orbits import Orbits
from adam_core.orbits.query.horizons import query_horizons
from adam_core.orbits.query.sbdb import query_sbdb
from adam_core.time import Timestamp
Expand Down Expand Up @@ -181,3 +183,67 @@ def test_propagate():

np.testing.assert_array_less(absolute_position, pos_tol, f"Failed position for {object_id}")
np.testing.assert_array_less(absolute_velocity, vel_tol, f"Failed velocity for {object_id}")


def test_propagate_different_origins():
"""
Test that we are returning propagated orbits with their original origins
"""
download_jpl_ephemeris_files()
prop = ASSISTPropagator()
orbits = Orbits.from_kwargs(
orbit_id=["1", "2"],
object_id=["1", "2"],
coordinates=CartesianCoordinates.from_kwargs(
x=[1, 1],
y=[1, 1],
z=[1, 1],
vx=[1, 1],
vy=[1, 1],
vz=[1, 1],
time=Timestamp.from_mjd([60000, 60000], scale="tdb"),
frame="ecliptic",
origin=Origin.from_kwargs(code=["SOLAR_SYSTEM_BARYCENTER", "EARTH_MOON_BARYCENTER"]),
),
)

propagated_orbits = prop.propagate_orbits(orbits, Timestamp.from_mjd([60001, 60002, 60003], scale="tdb"))
orbit_one_results = propagated_orbits.select("orbit_id", "1")
orbit_two_results = propagated_orbits.select("orbit_id", "2")
# Assert that the origin codes for each set of results is unique
# and that it matches the original input
assert len(orbit_one_results.coordinates.origin.code.unique()) == 1
assert orbit_one_results.coordinates.origin.code.unique()[0].as_py() == "SOLAR_SYSTEM_BARYCENTER"
assert len(orbit_two_results.coordinates.origin.code.unique()) == 1
assert orbit_two_results.coordinates.origin.code.unique()[0].as_py() == "EARTH_MOON_BARYCENTER"


def test_propagate_different_input_times(mocker):
"""
Ensure that we can pass in vectors with different epochs
"""
download_jpl_ephemeris_files()
prop = ASSISTPropagator()
watched_propagate_orbits_inner = mocker.spy(prop, "_propagate_orbits_inner")
orbits = Orbits.from_kwargs(
orbit_id=["1", "2", "3", "4"],
object_id=["1", "2", "3", "4"],
coordinates=CartesianCoordinates.from_kwargs(
x=[1, 1, 1, 1],
y=[1, 1, 1, 1],
z=[1, 1, 1, 1],
vx=[1, 1, 1, 1],
vy=[1, 1, 1, 1],
vz=[1, 1, 1, 1],
time=Timestamp.from_mjd([60000, 60000, 60000, 60001], scale="tdb"),
frame="ecliptic",
origin=Origin.from_kwargs(code=pa.repeat("SOLAR_SYSTEM_BARYCENTER", 4)),
),
)

propagated_orbits = prop.propagate_orbits(orbits, Timestamp.from_mjd([60005, 60006], scale="tdb"))

assert watched_propagate_orbits_inner.call_count == 2, "Inner function should be called once for each unique input epoch"

assert len(propagated_orbits.coordinates.time.unique()) == 2
assert len(propagated_orbits) == 8, "Should have input orbits x epochs number of results"

0 comments on commit 741c1e3

Please sign in to comment.