@@ -143,16 +143,8 @@ def load_initial_checkpoint_values(
143
143
logger .info ("Found filtered observations" )
144
144
filtered_observations = Observations .from_parquet (filtered_observations_path )
145
145
146
- # Unfortunately we have to reinitialize the times to set the attribute
147
- # correctly.
148
- filtered_observations = qv .defragment (filtered_observations )
149
- filtered_observations = filtered_observations .sort_by (
150
- [
151
- "coordinates.time.days" ,
152
- "coordinates.time.nanos" ,
153
- "coordinates.origin.code" ,
154
- ]
155
- )
146
+ if filtered_observations .fragmented ():
147
+ filtered_observations = qv .defragment (filtered_observations )
156
148
157
149
# If the pipeline was started but we have recovered_orbits already, we
158
150
# are done and should exit early.
@@ -169,15 +161,10 @@ def load_initial_checkpoint_values(
169
161
recovered_orbit_members_path
170
162
)
171
163
172
- # Unfortunately we have to reinitialize the times to set the attribute
173
- # correctly.
174
- recovered_orbits = qv .defragment (recovered_orbits )
175
- recovered_orbits = recovered_orbits .sort_by (
176
- [
177
- "coordinates.time.days" ,
178
- "coordinates.time.nanos" ,
179
- ]
180
- )
164
+ if recovered_orbits .fragmented ():
165
+ recovered_orbits = qv .defragment (recovered_orbits )
166
+ if recovered_orbit_members .fragmented ():
167
+ recovered_orbit_members = qv .defragment (recovered_orbit_members )
181
168
182
169
return create_checkpoint_data (
183
170
"complete" ,
@@ -196,15 +183,10 @@ def load_initial_checkpoint_values(
196
183
od_orbits = FittedOrbits .from_parquet (od_orbits_path )
197
184
od_orbit_members = FittedOrbitMembers .from_parquet (od_orbit_members_path )
198
185
199
- # Unfortunately we have to reinitialize the times to set the attribute
200
- # correctly.
201
- od_orbits = qv .defragment (od_orbits )
202
- od_orbits = od_orbits .sort_by (
203
- [
204
- "coordinates.time.days" ,
205
- "coordinates.time.nanos" ,
206
- ]
207
- )
186
+ if od_orbits .fragmented ():
187
+ od_orbits = qv .defragment (od_orbits )
188
+ if od_orbit_members .fragmented ():
189
+ od_orbit_members = qv .defragment (od_orbit_members )
208
190
209
191
return create_checkpoint_data (
210
192
"recover_orbits" ,
@@ -222,15 +204,10 @@ def load_initial_checkpoint_values(
222
204
iod_orbits = FittedOrbits .from_parquet (iod_orbits_path )
223
205
iod_orbit_members = FittedOrbitMembers .from_parquet (iod_orbit_members_path )
224
206
225
- # Unfortunately we have to reinitialize the times to set the attribute
226
- # correctly.
227
- iod_orbits = qv .defragment (iod_orbits )
228
- iod_orbits = iod_orbits .sort_by (
229
- [
230
- "coordinates.time.days" ,
231
- "coordinates.time.nanos" ,
232
- ]
233
- )
207
+ if iod_orbits .fragmented ():
208
+ iod_orbits = qv .defragment (iod_orbits )
209
+ if iod_orbit_members .fragmented ():
210
+ iod_orbit_members = qv .defragment (iod_orbit_members )
234
211
235
212
return create_checkpoint_data (
236
213
"differential_correction" ,
@@ -246,6 +223,11 @@ def load_initial_checkpoint_values(
246
223
clusters = Clusters .from_parquet (clusters_path )
247
224
cluster_members = ClusterMembers .from_parquet (cluster_members_path )
248
225
226
+ if clusters .fragmented ():
227
+ clusters = qv .defragment (clusters )
228
+ if cluster_members .fragmented ():
229
+ cluster_members = qv .defragment (cluster_members )
230
+
249
231
return create_checkpoint_data (
250
232
"initial_orbit_determination" ,
251
233
filtered_observations = filtered_observations ,
0 commit comments