From d2f10743a6baa9441b49a29657700cc607cc1813 Mon Sep 17 00:00:00 2001 From: Joachim Moeyens Date: Wed, 29 Nov 2023 19:07:07 -0800 Subject: [PATCH] Loop over orbits first then collect results before running next chunk of observations in attribute_observations --- thor/orbits/attribution.py | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/thor/orbits/attribution.py b/thor/orbits/attribution.py index 83d1fc4c..522baf9b 100644 --- a/thor/orbits/attribution.py +++ b/thor/orbits/attribution.py @@ -24,7 +24,7 @@ __all__ = ["Attributions", "attribute_observations", "merge_and_extend_orbits"] -LATLOT_INDEX = np.array([2, 1]) +LATLON_INDEX = np.array([2, 1]) class Attributions(qv.Table): @@ -182,8 +182,8 @@ def attribution_worker( coords_predicted = ephemeris_i.coordinates # Haversine metric requires latitude first then longitude... - coords_latlon = np.radians(coords.values[:, LATLOT_INDEX]) - coords_predicted_latlon = np.radians(coords_predicted.values[:, LATLOT_INDEX]) + coords_latlon = np.radians(coords.values[:, LATLON_INDEX]) + coords_predicted_latlon = np.radians(coords_predicted.values[:, LATLON_INDEX]) num_obs = len(coords_predicted) k = np.minimum(3, num_obs) @@ -296,11 +296,15 @@ def attribute_observations( refs_to_free.append(observations_ref) logger.info("Placed observations in the object store.") - futures = [] - for orbit_id_chunk in _iterate_chunks(orbit_ids, orbits_chunk_size): - for observations_indices_chunk in _iterate_chunks( - observation_indices, observations_chunk_size - ): + # For each chunk of observations run attribution with all orbits. + # We wait for each chunk of orbits to finish before starting the next + # chunk of observations to reduce the memory pressure. If not, the number + # of expected futures will be large (num_orbits / orbit_chunk_size * num_observation_chunks) + for observations_indices_chunk in _iterate_chunks( + observation_indices, observations_chunk_size + ): + futures = [] + for orbit_id_chunk in _iterate_chunks(orbit_ids, orbits_chunk_size): futures.append( attribution_worker_remote.remote( orbit_id_chunk, @@ -313,9 +317,9 @@ def attribute_observations( ) ) - while futures: - finished, futures = ray.wait(futures, num_returns=1) - attributions_list.append(ray.get(finished[0])) + while futures: + finished, futures = ray.wait(futures, num_returns=1) + attributions_list.append(ray.get(finished[0])) if len(refs_to_free) > 0: ray.internal.free(refs_to_free)