From bd2555a5b3e24525d706118f74b4b6efc3c9e22a Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Wed, 22 Nov 2023 16:39:02 -0500
Subject: [PATCH 1/6] Initialize ray from adam_core

---
 .gitignore                   |  1 +
 setup.cfg                    |  3 ++-
 thor/clusters.py             | 17 +++--------------
 thor/config.py               |  3 ++-
 thor/main.py                 | 19 ++++---------------
 thor/observations/filters.py | 10 +++-------
 thor/orbit.py                | 15 ++++++++++-----
 thor/orbits/attribution.py   | 19 ++++---------------
 thor/orbits/iod.py           | 17 +++--------------
 thor/orbits/od.py            | 21 +++------------------
 thor/tests/test_main.py      | 16 +++++++++++++---
 11 files changed, 48 insertions(+), 93 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8b5f4f0b..221494b4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,6 +132,7 @@ venv/
 ENV/
 env.bak/
 venv.bak/
+.python-version
 
 # Spyder project settings
 .spyderproject
diff --git a/setup.cfg b/setup.cfg
index 2b77279c..b8d28855 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -31,7 +31,7 @@ setup_requires =
     wheel
     setuptools_scm >= 6.0
 install_requires =
-    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@main
+    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@75a9cb077870c557a628101ca05e8ed57659ee97#egg=adam_core
    astropy >= 5.3.1
     astroquery
     difi
@@ -40,6 +40,7 @@ install_requires =
     numpy
     numba
     pandas
+    psutil
     pyarrow >= 14.0.0
     pydantic < 2.0.0
     pyyaml >= 5.1
diff --git a/thor/clusters.py b/thor/clusters.py
index 8fbae0c7..b4392e95 100644
--- a/thor/clusters.py
+++ b/thor/clusters.py
@@ -11,6 +11,7 @@
 import quivr as qv
 import ray
 from adam_core.propagator import _iterate_chunks
+from adam_core.ray_cluster import initialize_use_ray
 
 from .range_and_transform import TransformedDetections
 
@@ -507,7 +508,6 @@ def cluster_velocity(
     if len(clusters) == 0:
         return Clusters.empty(), ClusterMembers.empty()
     else:
-
         cluster_ids = []
         cluster_num_obs = []
         cluster_members_cluster_ids = []
@@ -633,9 +633,6 @@ def cluster_and_link(
         Algorithm to use. Can be "dbscan" or "hotspot_2d".
     num_jobs : int, optional
         Number of jobs to launch.
-    parallel_backend : str, optional
-        Which parallelization backend to use {'ray', 'mp', 'cf'}.
-        Defaults to using Python's concurrent futures module ('cf').
 
     Returns
     -------
@@ -691,14 +688,8 @@ def cluster_and_link(
         mjd0 = mjd[first][0]
         dt = mjd - mjd0
 
-        if max_processes is None or max_processes > 1:
-
-            if not ray.is_initialized():
-                logger.info(
-                    f"Ray is not initialized. Initializing with {max_processes}..."
-                )
-                ray.init(address="auto", num_cpus=max_processes)
-
+        use_ray = initialize_use_ray(num_cpus=max_processes)
+        if use_ray:
             # Put all arrays (which can be large) in ray's
             # local object store ahead of time
             obs_ids_ref = ray.put(obs_ids)
@@ -742,7 +733,6 @@ def cluster_and_link(
             )
 
         else:
-
             for vxi_chunk, vyi_chunk in zip(
                 _iterate_chunks(vxx, chunk_size), _iterate_chunks(vyy, chunk_size)
             ):
@@ -776,7 +766,6 @@ def cluster_and_link(
         )
 
     else:
-
         clusters = Clusters.empty()
         cluster_members = ClusterMembers.empty()
 
diff --git a/thor/config.py b/thor/config.py
index 7584ded2..02afdc78 100644
--- a/thor/config.py
+++ b/thor/config.py
@@ -9,7 +9,8 @@
 
 class Config(BaseModel):
     max_processes: Optional[int] = None
-    propagator: Literal["PYOORB"] = "PYOORB"
+    ray_memory_bytes: int = 0
+    propagator: str = "PYOORB"
     cell_radius: float = 10
     vx_min: float = -0.1
     vx_max: float = 0.1
diff --git a/thor/main.py b/thor/main.py
index c2feb06b..ba7fdf8e 100644
--- a/thor/main.py
+++ b/thor/main.py
@@ -8,6 +8,7 @@
 import quivr as qv
 import ray
 from adam_core.propagator import PYOORB
+from adam_core.ray_cluster import initialize_use_ray
 
 from .checkpointing import create_checkpoint_data, load_initial_checkpoint_values
 from .clusters import cluster_and_link
@@ -25,20 +26,6 @@
 logger = logging.getLogger("thor")
 
 
-def initialize_use_ray(config: Config) -> bool:
-    use_ray = False
-    if config.max_processes is None or config.max_processes > 1:
-        # Initialize ray
-        if not ray.is_initialized():
-            logger.info(
-                f"Ray is not initialized. Initializing with {config.max_processes} cpus..."
-            )
-            ray.init(num_cpus=config.max_processes)
-
-        use_ray = True
-    return use_ray
-
-
 def initialize_test_orbit(
     test_orbit: TestOrbits,
     working_dir: Optional[str] = None,
@@ -132,7 +119,9 @@ def link_test_orbit(
     else:
         raise ValueError(f"Unknown propagator: {config.propagator}")
 
-    use_ray = initialize_use_ray(config)
+    use_ray = initialize_use_ray(
+        num_cpus=config.max_processes, object_store_bytes=config.ray_memory_bytes
+    )
 
     refs_to_free = []
     if (
diff --git a/thor/observations/filters.py b/thor/observations/filters.py
index 6740ab4d..2d155542 100644
--- a/thor/observations/filters.py
+++ b/thor/observations/filters.py
@@ -7,6 +7,7 @@
 import quivr as qv
 import ray
 from adam_core.coordinates import SphericalCoordinates
+from adam_core.ray_cluster import initialize_use_ray
 
 from thor.config import Config
 from thor.observations.observations import Observations
@@ -160,13 +161,8 @@ def apply(
         ephemeris = test_orbit.generate_ephemeris_from_observations(observations)
 
         filtered_observations_list = []
-        if max_processes is None or max_processes > 1:
-            if not ray.is_initialized():
-                logger.info(
-                    f"Ray is not initialized. Initializing with {max_processes}..."
-                )
-                ray.init(num_cpus=max_processes)
-
+        use_ray = initialize_use_ray(num_cpus=max_processes)
+        if use_ray:
             refs_to_free = []
             if observations_ref is None:
                 observations_ref = ray.put(observations)
diff --git a/thor/orbit.py b/thor/orbit.py
index 126811c9..4002efbe 100644
--- a/thor/orbit.py
+++ b/thor/orbit.py
@@ -38,7 +38,6 @@
 
 
 class RangedPointSourceDetections(qv.Table):
-
     id = qv.StringColumn()
     exposure_id = qv.StringColumn()
     coordinates = SphericalCoordinates.as_column()
@@ -46,7 +45,6 @@
 
 
 class TestOrbitEphemeris(qv.Table):
-
     id = qv.Int64Column()
     ephemeris = Ephemeris.as_column()
     observer = Observers.as_column()
@@ -97,7 +95,6 @@ def range_observations_worker(
 
 
 class TestOrbits(qv.Table):
-
     orbit_id = qv.StringColumn(default=lambda: uuid.uuid4().hex)
     object_id = qv.StringColumn(nullable=True)
     bundle_id = qv.Int64Column(nullable=True)
@@ -199,7 +196,11 @@ def propagate(
             The test orbit propagated to the given times.
         """
         return propagator.propagate_orbits(
-            self.to_orbits(), times, max_processes=max_processes, chunk_size=1
+            self.to_orbits(),
+            times,
+            max_processes=max_processes,
+            chunk_size=1,
+            parallel_backend="ray",
         )
 
     def generate_ephemeris(
@@ -226,7 +227,11 @@ def generate_ephemeris(
             The ephemeris of the test orbit at the given observers.
         """
         return propagator.generate_ephemeris(
-            self.to_orbits(), observers, max_processes=max_processes, chunk_size=1
+            self.to_orbits(),
+            observers,
+            max_processes=max_processes,
+            chunk_size=1,
+            parallel_backend="ray",
         )
 
     def generate_ephemeris_from_observations(
diff --git a/thor/orbits/attribution.py b/thor/orbits/attribution.py
index b6083b03..15289d78 100644
--- a/thor/orbits/attribution.py
+++ b/thor/orbits/attribution.py
@@ -12,6 +12,7 @@
 from adam_core.orbits import Orbits
 from adam_core.propagator import PYOORB
 from adam_core.propagator.utils import _iterate_chunks
+from adam_core.ray_cluster import initialize_use_ray
 from sklearn.neighbors import BallTree
 
 from ..observations.observations import Observations
@@ -165,7 +166,6 @@ def attribution_worker(
     radius_rad = np.radians(radius)
     residuals = []
     for _, ephemeris_i, observations_i in linkage.iterate():
-
         # Extract the observation IDs and times
         obs_ids = observations_i.id.to_numpy(zero_copy_only=False)
         obs_times = observations_i.coordinates.time.mjd().to_numpy(zero_copy_only=False)
@@ -279,12 +279,8 @@ def attribute_observations(
     observation_indices = np.arange(0, len(observations))
 
     attributions_list = []
-    if max_processes is None or max_processes > 1:
-
-        if not ray.is_initialized():
-            logger.info(f"Ray is not initialized. Initializing with {max_processes}...")
-            ray.init(address="auto", max_processes=max_processes)
-
+    use_ray = initialize_use_ray(num_cpus=max_processes)
+    if use_ray:
         refs_to_free = []
         if orbits_ref is None:
             orbits_ref = ray.put(orbits)
@@ -421,14 +417,8 @@ def merge_and_extend_orbits(
     odp_orbits_list = []
     odp_orbit_members_list = []
     if len(orbits_iter) > 0 and len(observations_iter) > 0:
-
+        use_ray = initialize_use_ray(num_cpus=max_processes)
         if use_ray:
-            if not ray.is_initialized():
-                logger.info(
-                    f"Ray is not initialized. Initializing with {max_processes}..."
-                )
-                ray.init(address="auto", max_processes=max_processes)
-
             refs_to_free = []
             if observations_ref is None:
                 observations_ref = ray.put(observations)
@@ -437,7 +427,6 @@ def merge_and_extend_orbits(
 
         converged = False
         while not converged:
-
             if use_ray:
                 # Orbits will change with differential correction so we need to add them
                 # to the object store at the start of each iteration (we cannot simply
diff --git a/thor/orbits/iod.py b/thor/orbits/iod.py
index d7ac70d0..b18b568d 100644
--- a/thor/orbits/iod.py
+++ b/thor/orbits/iod.py
@@ -12,6 +12,7 @@
 from adam_core.coordinates.residuals import Residuals
 from adam_core.propagator import PYOORB, Propagator
 from adam_core.propagator.utils import _iterate_chunks
+from adam_core.ray_cluster import initialize_use_ray
 
 from ..clusters import ClusterMembers
 from ..observations.observations import Observations
@@ -126,13 +127,11 @@ def iod_worker(
     propagator: Type[Propagator] = PYOORB,
     propagator_kwargs: dict = {},
 ) -> Tuple[FittedOrbits, FittedOrbitMembers]:
-
     prop = propagator(**propagator_kwargs)
 
     iod_orbits_list = []
     iod_orbit_members_list = []
     for linkage_id in linkage_ids:
-
         time_start = time.time()
         logger.debug(f"Finding initial orbit for linkage {linkage_id}...")
 
@@ -379,7 +378,6 @@ def iod(
         # belonging to one object yield a good initial orbit but the presence of outlier
         # observations is skewing the sum total of the residuals and chi2
         elif num_outliers > 0:
-
             logger.debug("Attempting to identify possible outliers.")
             for o in range(num_outliers):
                 # Select i highest observations that contribute to
@@ -424,11 +422,9 @@ def iod(
                 j += 1
 
     if not converged or not processable:
-
         return FittedOrbits.empty(), FittedOrbitMembers.empty()
 
     else:
-
         orbit = FittedOrbits.from_kwargs(
             orbit_id=orbit_sol.orbit_id,
             object_id=orbit_sol.object_id,
@@ -574,18 +570,11 @@ def initial_orbit_determination(
     iod_orbits_list = []
     iod_orbit_members_list = []
     if len(observations) > 0 and len(linkage_members) > 0:
-
         # Extract linkage IDs
         linkage_ids = linkage_members.column(linkage_id_col).unique()
 
-        if max_processes is None or max_processes > 1:
-
-            if not ray.is_initialized():
-                logger.info(
-                    f"Ray is not initialized. Initializing with {max_processes}..."
-                )
-                ray.init(address="auto", num_cpus=max_processes)
-
+        use_ray = initialize_use_ray(num_cpus=max_processes)
+        if use_ray:
             refs_to_free = []
             if linkage_members_ref is None:
                 linkage_members_ref = ray.put(linkage_members)
diff --git a/thor/orbits/od.py b/thor/orbits/od.py
index 25ab50c8..d30fbaf7 100644
--- a/thor/orbits/od.py
+++ b/thor/orbits/od.py
@@ -11,6 +11,7 @@
 from adam_core.coordinates.residuals import Residuals
 from adam_core.orbits import Orbits
 from adam_core.propagator import PYOORB, _iterate_chunks
+from adam_core.ray_cluster import initialize_use_ray
 from scipy.linalg import solve
 
 from ..observations.observations import Observations
@@ -37,12 +38,10 @@ def od_worker(
     propagator: Literal["PYOORB"] = "PYOORB",
     propagator_kwargs: dict = {},
 ) -> Tuple[FittedOrbits, FittedOrbitMembers]:
-
     od_orbits_list = []
     od_orbit_members_list = []
 
     for orbit_id in orbit_ids:
-
         time_start = time.time()
         logger.debug(f"Differentially correcting orbit {orbit_id}...")
 
@@ -96,7 +95,6 @@ def od(
     propagator: Literal["PYOORB"] = "PYOORB",
     propagator_kwargs: dict = {},
 ) -> Tuple[FittedOrbits, FittedOrbitMembers]:
-
     if propagator == "PYOORB":
         prop = PYOORB(**propagator_kwargs)
     else:
@@ -229,7 +227,6 @@ def od(
         # Modify each component of the state by a small delta
         d = np.zeros((1, 7))
         for i in range(num_params):
-
             # zero the delta vector
             d *= 0.0
 
@@ -272,7 +269,6 @@ def od(
             delta_denom = d[0, i]
 
             if method == "central":
-
                 # Modify component i of the orbit by a small delta
                 cartesian_elements_n = orbit_prev.coordinates.values - d[0, :6]
                 orbit_iter_n = Orbits.from_kwargs(
@@ -426,7 +422,6 @@ def od(
             if (
                 (rchi2_iter < rchi2_prev) or first_solution
             ) and arc_length >= min_arc_length:
-
                 if first_solution:
                     logger.debug(
                         "Storing first successful differential correction iteration for these observations."
@@ -454,7 +449,6 @@ def od(
                 and iterations > max_iter_i
                 and not solution_found
             ):
-
                 logger.debug("Attempting to identify possible outliers.")
                 # Previous fits have failed, lets reset the current best fit orbit back to its original
                 # state and re-run fitting, this time removing outliers
@@ -503,12 +497,10 @@ def od(
     logger.debug("First solution: {}".format(first_solution))
 
     if not solution_found or not processable or first_solution:
-
         od_orbit = FittedOrbits.empty()
         od_orbit_members = FittedOrbitMembers.empty()
 
     else:
-
         obs_times = observations.coordinates.time.mjd().to_numpy()[ids_mask]
         arc_length_ = obs_times.max() - obs_times.min()
         assert arc_length == arc_length_
@@ -615,19 +607,13 @@ def differential_correction(
     observations_ref = None
 
     if len(orbits) > 0 and len(orbit_members) > 0:
-
         orbit_ids = orbits.orbit_id.to_numpy(zero_copy_only=False)
 
         od_orbits_list = []
         od_orbit_members_list = []
 
-        if max_processes is None or max_processes > 1:
-
-            if not ray.is_initialized():
-                logger.info(
-                    f"Ray is not initialized. Initializing with {max_processes}..."
-                )
-                ray.init(address="auto", num_cpus=max_processes)
+        use_ray = initialize_use_ray(num_cpus=max_processes)
+        if use_ray:
             refs_to_free = []
             if orbits_ref is None:
                 orbits_ref = ray.put(orbits)
@@ -678,7 +664,6 @@ def differential_correction(
             )
 
         else:
-
             for orbit_ids_chunk in _iterate_chunks(orbit_ids, chunk_size):
                 od_orbits_chunk, od_orbit_members_chunk = od_worker(
                     orbit_ids_chunk,
diff --git a/thor/tests/test_main.py b/thor/tests/test_main.py
index 9f6ec586..0e1d4439 100644
--- a/thor/tests/test_main.py
+++ b/thor/tests/test_main.py
@@ -84,7 +84,7 @@ def integration_config():
 def ray_cluster():
     import ray
 
-    ray_initialized = initialize_use_ray(Config())
+    ray_initialized = initialize_use_ray()
     yield
     if ray_initialized:
         ray.shutdown()
@@ -232,7 +232,12 @@ def test_link_test_orbit(
     else:
         integration_config.max_processes = 1
 
-    (test_orbit, observations, obs_ids_expected, integration_config,) = setup_test_data(
+    (
+        test_orbit,
+        observations,
+        obs_ids_expected,
+        integration_config,
+    ) = setup_test_data(
         object_id, orbits, observations, integration_config, max_arc_length=14
     )
 
@@ -259,7 +264,12 @@ def test_benchmark_link_test_orbit(
     else:
         integration_config.max_processes = 1
 
-    (test_orbit, observations, obs_ids_expected, integration_config,) = setup_test_data(
+    (
+        test_orbit,
+        observations,
+        obs_ids_expected,
+        integration_config,
+    ) = setup_test_data(
         object_id, orbits, observations, integration_config, max_arc_length=14
     )
 

From 03d75ea06bc3986c2d31859067dec2f0a0890c5f Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Thu, 23 Nov 2023 19:28:08 -0500
Subject: [PATCH 2/6] update adam_core

---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index b8d28855..0917d27b 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -31,7 +31,7 @@ setup_requires =
     wheel
     setuptools_scm >= 6.0
 install_requires =
-    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@75a9cb077870c557a628101ca05e8ed57659ee97#egg=adam_core
+    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@8e214881a8ef8383fcc53acc2a763836bcc7b13a#egg=adam_core
     astropy >= 5.3.1
     astroquery
     difi

From 2e0cef797e68ba9fa5ccbf67b6e1617b094da54b Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Fri, 24 Nov 2023 09:50:29 -0500
Subject: [PATCH 3/6] change chunk size, having memory usage issues

---
 thor/orbit_selection.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/thor/orbit_selection.py b/thor/orbit_selection.py
index da1dc133..c14b0099 100644
--- a/thor/orbit_selection.py
+++ b/thor/orbit_selection.py
@@ -232,7 +232,7 @@ def generate_test_orbits(
         start_time,
         max_processes=max_processes,
         parallel_backend="ray",
-        chunk_size=1000,
+        chunk_size=500,
     )
     propagation_end_time = time.perf_counter()
     logger.info(

From 7dffcfa6e71402d7b5f1582a20090f5780713067 Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Mon, 27 Nov 2023 09:59:06 -0500
Subject: [PATCH 4/6] point back to main

---
 setup.cfg | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/setup.cfg b/setup.cfg
index 0917d27b..802372bb 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -31,7 +31,7 @@ setup_requires =
     wheel
     setuptools_scm >= 6.0
 install_requires =
-    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@8e214881a8ef8383fcc53acc2a763836bcc7b13a#egg=adam_core
+    adam-core @ git+https://github.com/B612-Asteroid-Institute/adam_core@main#egg=adam_core
     astropy >= 5.3.1
     astroquery
     difi

From 8127c3de1383761380ca0f642ae608107ef40d2d Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Mon, 27 Nov 2023 13:38:38 -0500
Subject: [PATCH 5/6] linting

---
 .pre-commit-config.yaml | 36 ++++++++++++++++++------------------
 thor/config.py          |  2 +-
 thor/tests/test_main.py | 14 ++------------
 3 files changed, 21 insertions(+), 31 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index a66e368d..5a222b25 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,32 +1,32 @@
 # See https://pre-commit.com for more information
 # See https://pre-commit.com/hooks.html for more hooks
 default_language_version:
-  python: python3.10
+  python: python3.11
 repos:
--   repo: https://github.com/pre-commit/pre-commit-hooks
+  - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v3.2.0
     hooks:
-    -   id: trailing-whitespace
+      - id: trailing-whitespace
         args: [--markdown-linebreak-ext=md]
-    -   id: end-of-file-fixer
-    -   id: check-yaml
-    -   id: check-added-large-files
--   repo: https://github.com/PyCQA/isort
+      - id: end-of-file-fixer
+      - id: check-yaml
+      - id: check-added-large-files
+  - repo: https://github.com/PyCQA/isort
     rev: 5.12.0
     hooks:
-    -   id: isort
+      - id: isort
         additional_dependencies:
-        - toml
--   repo: https://github.com/psf/black
+          - toml
+  - repo: https://github.com/psf/black
     rev: 22.10.0
     hooks:
-    -   id: black
--   repo: https://github.com/pre-commit/mirrors-mypy
+      - id: black
+  - repo: https://github.com/pre-commit/mirrors-mypy
     rev: v1.1.1
     hooks:
-    -   id: mypy
+      - id: mypy
         exclude: bench/
         additional_dependencies:
-        - 'types-pyyaml'
-        - 'types-requests'
-        - 'types-python-dateutil'
+          - "types-pyyaml"
+          - "types-requests"
+          - "types-python-dateutil"
diff --git a/thor/config.py b/thor/config.py
index 02afdc78..f6e53fdc 100644
--- a/thor/config.py
+++ b/thor/config.py
@@ -10,7 +10,7 @@
 class Config(BaseModel):
     max_processes: Optional[int] = None
     ray_memory_bytes: int = 0
-    propagator: str = "PYOORB"
+    propagator: Literal["PYOORB"] = "PYOORB"
     cell_radius: float = 10
     vx_min: float = -0.1
     vx_max: float = 0.1
diff --git a/thor/tests/test_main.py b/thor/tests/test_main.py
index 0e1d4439..fb7e31cc 100644
--- a/thor/tests/test_main.py
+++ b/thor/tests/test_main.py
@@ -232,12 +232,7 @@ def test_link_test_orbit(
     else:
         integration_config.max_processes = 1
 
-    (
-        test_orbit,
-        observations,
-        obs_ids_expected,
-        integration_config,
-    ) = setup_test_data(
+    (test_orbit, observations, obs_ids_expected, integration_config,) = setup_test_data(
         object_id, orbits, observations, integration_config, max_arc_length=14
     )
 
@@ -264,12 +259,7 @@ def test_benchmark_link_test_orbit(
     else:
         integration_config.max_processes = 1
 
-    (
-        test_orbit,
-        observations,
-        obs_ids_expected,
-        integration_config,
-    ) = setup_test_data(
+    (test_orbit, observations, obs_ids_expected, integration_config,) = setup_test_data(
         object_id, orbits, observations, integration_config, max_arc_length=14
     )
 

From 666a75628fb1036a5fb5b35e612c56bbe320325a Mon Sep 17 00:00:00 2001
From: Alec Koumjian
Date: Mon, 27 Nov 2023 13:45:52 -0500
Subject: [PATCH 6/6] remove default language version for precommit

---
 .pre-commit-config.yaml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 5a222b25..dcaaaf7a 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,7 +1,5 @@
 # See https://pre-commit.com for more information
 # See https://pre-commit.com/hooks.html for more hooks
-default_language_version:
-  python: python3.11
 repos:
   - repo: https://github.com/pre-commit/pre-commit-hooks
     rev: v3.2.0