@@ -34,12 +34,25 @@ def __init__(self):
 
     @staticmethod
     def to_es_bbox(bbox_array):
+        # lon = x, lat = y
+        # bbox order: lon, lat, lon, lat
+        # x (lon) can run from 170 to -170 when the bbox crosses the antimeridian,
+        # e.g. 170, 0, -170, 10
+        minX, minY, maxX, maxY = bbox_array
+
+        # Ensure the values are properly sorted; lon is left as-is so antimeridian-crossing boxes survive
+        # if minX > maxX:
+        #     minX, maxX = maxX, minX
+        if minY > maxY:
+            minY, maxY = maxY, minY
+
         return {
             "type": "envelope",
-            "coordinates": [
-                [bbox_array[0], bbox_array[3]],  # Top-left corner (minLon, maxLat)
-                [bbox_array[2], bbox_array[1]]   # Bottom-right corner (maxLon, minLat)
-            ]
+            "coordinates": [[minX, maxY], [maxX, minY]],
+            # "coordinates": [
+            #     [bbox_array[0], bbox_array[3]],  # Top-left corner (minLon, maxLat)
+            #     [bbox_array[2], bbox_array[1]]   # Bottom-right corner (maxLon, minLat)
+            # ]
         }
 
     @staticmethod
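The helper converts a [minLon, minLat, maxLon, maxLat] array into the Elasticsearch "envelope" shape, which expects an upper-left and a lower-right corner. Latitude is re-sorted, but longitude is intentionally left alone so a bbox that crosses the antimeridian (170 to -170) passes through unchanged. A rough sketch of the expected behaviour; the enclosing class name is assumed for illustration only:

# Hedged sketch of the expected output; GranulesIndexMapping is an assumed class
# name standing in for whichever class defines to_es_bbox.
bbox = [170, 10, -170, 0]                       # lat values arrive out of order
envelope = GranulesIndexMapping.to_es_bbox(bbox)
# {'type': 'envelope', 'coordinates': [[170, 10], [-170, 0]]}
# lat is swapped into (maxY=10, minY=0); lon keeps the antimeridian crossing.
print(envelope)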
@@ -152,16 +165,20 @@ def create_new_index(self, tenant, tenant_venue, es_mapping: dict):
         self.__es.swap_index_for_alias(write_perc_alias_name, current_perc_index_name, new_perc_index_name)
         try:
             self.__es.migrate_index_data(current_perc_index_name, new_perc_index_name)
-        except Exception as e:
+        except Exception:
             LOGGER.exception(f'failed to migrate index data: {(current_perc_index_name, new_perc_index_name)}')
         return
 
-    def get_latest_index(self, tenant, tenant_venue):
+    def get_latest_index_name(self, tenant, tenant_venue):
         write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
         write_alias_name = self.__es.get_alias(write_alias_name)
         if len(write_alias_name) != 1:
             raise ValueError(f'missing index for {tenant}_{tenant_venue}. {write_alias_name}')
         latest_index_name = [k for k in write_alias_name.keys()][0]
+        return latest_index_name
+
+    def get_latest_index(self, tenant, tenant_venue):
+        latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
         index_mapping = self.__es.get_index_mapping(latest_index_name)
         if index_mapping is None:
             raise ValueError(f'missing index: {latest_index_name}')
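The extracted get_latest_index_name relies on the write alias pointing at exactly one concrete index, so the latest index name is simply the single key of the alias lookup. A minimal sketch of that assumption; the index and alias names below are made up:

# Minimal sketch, assuming get_alias returns {index_name: {...}} for the write alias.
write_alias_name = {'unity_granule_uds_dev__v003': {'aliases': {'granules_write_uds_dev': {}}}}
if len(write_alias_name) != 1:
    raise ValueError('zero or multiple indices behind the write alias')
latest_index_name = [k for k in write_alias_name.keys()][0]
print(latest_index_name)  # -> 'unity_granule_uds_dev__v003'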
@@ -201,53 +218,106 @@ def get_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
             raise ValueError(f"no such granule: {doc_id}")
         return result
 
-    def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
+    def __query_by_id_local(self, tenant: str, tenant_venue: str, doc_id: str, ):
         read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
-        result = self.__es.query({
+        dsl = {
             'size': 9999,
-            'query': {'term': {'_id': doc_id}}
-        }, read_alias_name)
-        if result is None:
-            raise ValueError(f"no such granule: {doc_id}")
-        for each_granule in result['hits']['hits']:
+            'sort': [
+                {'properties.datetime': {'order': 'desc'}},
+                {'id': {'order': 'asc'}}
+            ],
+            'query': {
+                'term': {'_id': doc_id}
+            }
+        }
+        result = self.__es.query(dsl, read_alias_name)
+        if result is None or len(result['hits']['hits']) < 1:
+            return []
+        return result['hits']['hits']
+
+    def __delete_old_entries(self, dsl_result):
+        for each_granule in dsl_result:
             LOGGER.debug(f"deleting {each_granule['_id']} from {each_granule['_index']}")
             delete_result = self.__es.delete_by_query({
                 'query': {'term': {'id': each_granule['_id']}}
             }, each_granule['_index'])
             LOGGER.debug(f'delete_result: {delete_result}')
             if delete_result is None:
                 raise ValueError(f"error deleting {each_granule}")
+        return
+
+    def delete_entry(self, tenant: str, tenant_venue: str, doc_id: str, ):
+        result = self.__query_by_id_local(tenant, tenant_venue, doc_id)
+        if len(result) < 1:
+            raise ValueError(f"no such granule: {doc_id}")
+        self.__delete_old_entries(result)
         return result
 
     def update_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
+        # find existing doc_id
+        # if not found, throw error. Cannot update.
+        # if found, check which index it lives in:
+        #   if the latest index, proceed with an in-place update
+        #   if an older index, get + delete the old copy,
+        #   tweak the metadata locally, and re-index it
         write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
         json_body['event_time'] = TimeUtils.get_current_unix_milli()
-        self.__es.update_one(json_body, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
-        LOGGER.debug(f'custom_metadata indexed')
+        existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
+        if len(existing_entries) < 1:
+            raise ValueError(f'unable to update {doc_id} as it is not found.')
+        latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
+        existing_entry = existing_entries[0]
+        if existing_entry['_index'] == latest_index_name:
+            LOGGER.debug(f'{doc_id} in latest index: {latest_index_name}. continuing with update')
+            self.__es.update_one(json_body, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
+            self.__delete_old_entries(existing_entries[1:])
+            return
+        LOGGER.debug(f'{doc_id} in older index: {latest_index_name} v. {existing_entry["_index"]}')
+        new_doc = {**existing_entry['_source'], **json_body}
+        self.__es.index_one(new_doc, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
+        self.__delete_old_entries(existing_entries)
         return
 
     def add_entry(self, tenant: str, tenant_venue: str, json_body: dict, doc_id: str, ):
+        # find existing doc_id
+        # if not found, add it
+        # if found, and it is in the latest index, add it
+        # if found, and it is in an older index, add the current one and delete the older one
+
         write_alias_name = f'{DBConstants.granules_write_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
         json_body['event_time'] = TimeUtils.get_current_unix_milli()
-        # TODO validate custom metadata vs the latest index to filter extra items
+        existing_entries = self.__query_by_id_local(tenant, tenant_venue, doc_id)
+        if len(existing_entries) < 1:
+            self.__es.index_one(json_body, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
+            return
+        latest_index_name = self.get_latest_index_name(tenant, tenant_venue)
+        existing_entry = existing_entries[0]
+        if existing_entry['_index'] == latest_index_name:
+            self.__es.index_one(json_body, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
+            self.__delete_old_entries(existing_entries[1:])
+            return
         self.__es.index_one(json_body, doc_id, index=write_alias_name)  # TODO assuming granule_id is prefixed with collection id
-        LOGGER.debug(f'custom_metadata indexed')
+        self.__delete_old_entries(existing_entries)
+        # TODO validate custom metadata vs the latest index to filter extra items
         return
 
     def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
         read_alias_name = f'{DBConstants.granules_read_alias_prefix}_{tenant}_{tenant_venue}'.lower().strip()
-        if 'sort' not in search_dsl:
-            search_result = self.__es.query(search_dsl,
-                                            querying_index=read_alias_name) if 'sort' in search_dsl else self.__es.query(
-                search_dsl, querying_index=read_alias_name)
+        if 'sort' not in search_dsl:  # we cannot paginate w/o sort, so the max is 10k items
+            # this also assumes "size" is part of search_dsl
+            search_result = self.__es.query(search_dsl, querying_index=read_alias_name)
             LOGGER.debug(f'search_finished: {len(search_result["hits"]["hits"])}')
             return search_result
+        # we can run a paginated search
         original_size = search_dsl['size'] if 'size' in search_dsl else 20
+        total_size = -999
         result = []
         duplicates = set([])
         while len(result) < original_size:
             search_dsl['size'] = (original_size - len(result)) * 2
-            search_result = self.__es.query_pages(search_dsl, querying_index=read_alias_name) if 'sort' in search_dsl else self.__es.query(search_dsl, querying_index=read_alias_name)
+            search_result = self.__es.query_pages(search_dsl, querying_index=read_alias_name)
+            if total_size == -999:
+                total_size = self.__es.get_result_size(search_result)
             if len(search_result['hits']['hits']) < 1:
                 break
             for each in search_result['hits']['hits']:
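The bulk of this hunk is a rollover-aware upsert: both add_entry and update_entry first look the document up across every index behind the read alias (newest first), then either write through the write alias and clean up duplicates, or merge the old _source with the incoming body, re-index it into the latest index, and delete every stale copy. A condensed, hedged sketch of that shared flow follows; it glosses over the differences between the two methods (update_entry raises when nothing exists and uses update_one, add_entry just indexes) and only uses ES wrapper calls that appear in this diff:

# Condensed sketch of the shared flow; `es` stands in for the self.__es wrapper.
def _upsert_sketch(es, existing_entries, latest_index_name, write_alias_name, doc_id, json_body):
    if len(existing_entries) < 1:
        es.index_one(json_body, doc_id, index=write_alias_name)    # brand-new granule
        return
    newest = existing_entries[0]
    if newest['_index'] == latest_index_name:
        es.update_one(json_body, doc_id, index=write_alias_name)   # already in the latest index
        stale = existing_entries[1:]                                # keep the fresh copy
    else:
        merged = {**newest['_source'], **json_body}                 # incoming fields win
        es.index_one(merged, doc_id, index=write_alias_name)        # re-home into the latest index
        stale = existing_entries                                     # every old copy is now stale
    for entry in stale:
        es.delete_by_query({'query': {'term': {'id': entry['_id']}}}, entry['_index'])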
@@ -257,11 +327,16 @@ def dsl_search(self, tenant: str, tenant_venue: str, search_dsl: dict):
             search_dsl['search_after'] = search_result['hits']['hits'][-1]['sort']
 
         LOGGER.debug(f'search_finished: {len(result)}')
+        if len(result) > original_size:
+            result = result[:original_size]
         return {
             'hits': {
                 "total": {
-                    "value": len(result)
+                    "value": total_size,
                 },
                 'hits': result
             }
         }
+
+
+
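With this change dsl_search has two modes: without a sort clause it runs a single query (so results stay within the usual 10k window), and with a sort clause it pages through query_pages via search_after, de-duplicates, trims the result to the requested size, and reports the real total from get_result_size rather than the page length. A hedged usage sketch; the instance name and query values are illustrative only:

# Hedged usage sketch; granules_index is an assumed instance of this class.
dsl = {
    'size': 50,
    'sort': [
        {'properties.datetime': {'order': 'desc'}},
        {'id': {'order': 'asc'}},
    ],
    'query': {'match_all': {}},
}
paged = granules_index.dsl_search('uds', 'dev', dsl)
print(paged['hits']['total']['value'])  # total matching documents, not just this page
print(len(paged['hits']['hits']))       # at most 50 unique hits, trimmed to the requested size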