@@ -388,7 +388,12 @@ def connector_indexing_proxy_task(
     tenant_id: str | None,
 ) -> None:
     """celery tasks are forked, but forking is unstable. This proxies work to a spawned task."""
-
+    task_logger.info(
+        f"Indexing proxy - starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+    )
     client = SimpleJobClient()
 
     job = client.submit(
@@ -402,29 +407,56 @@ def connector_indexing_proxy_task(
     )
 
     if not job:
+        task_logger.info(
+            f"Indexing proxy - spawn failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id} "
+        )
         return
 
+    task_logger.info(
+        f"Indexing proxy - spawn succeeded: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+    )
+
     while True:
         sleep(10)
-        with get_session_with_tenant(tenant_id) as db_session:
-            index_attempt = get_index_attempt(
-                db_session=db_session, index_attempt_id=index_attempt_id
-            )
 
-            # do nothing for ongoing jobs that haven't been stopped
-            if not job.done():
+        # do nothing for ongoing jobs that haven't been stopped
+        if not job.done():
+            with get_session_with_tenant(tenant_id) as db_session:
+                index_attempt = get_index_attempt(
+                    db_session=db_session, index_attempt_id=index_attempt_id
+                )
+
                 if not index_attempt:
                     continue
 
                 if not index_attempt.is_finished():
                     continue
 
-            if job.status == "error":
-                logger.error(job.exception())
+        if job.status == "error":
+            task_logger.error(
+                f"Indexing proxy - spawned task exceptioned: "
+                f"attempt={index_attempt_id} "
+                f"tenant={tenant_id} "
+                f"cc_pair={cc_pair_id} "
+                f"search_settings={search_settings_id} "
+                f"error={job.exception()} "
+            )
 
-            job.release()
-            break
+        job.release()
+        break
 
+    task_logger.info(
+        f"Indexing proxy - finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+    )
     return
 
 
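The docstring's rationale (forked celery workers are unstable, so real work is delegated to a spawned process that the proxy polls) maps onto Python's standard spawn-and-poll pattern. A minimal sketch, assuming SimpleJobClient wraps something like multiprocessing's spawn context; the names below are illustrative, not the repo's actual internals:

```python
import multiprocessing
from time import sleep

def _spawned_entrypoint(index_attempt_id: int) -> None:
    # runs in a fresh interpreter, so nothing unstable is inherited
    # from the celery worker's forked state
    print(f"indexing attempt {index_attempt_id}")

if __name__ == "__main__":
    # "spawn" starts a clean child process instead of forking the parent
    ctx = multiprocessing.get_context("spawn")
    job = ctx.Process(target=_spawned_entrypoint, args=(1,))
    job.start()

    # poll the child the same way the proxy task polls job.done()
    while job.is_alive():
        sleep(10)

    print(f"spawned task exited with code {job.exitcode}")
```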
@@ -446,7 +478,17 @@ def connector_indexing_task(
 
     Returns None if the task did not run (possibly due to a conflict).
     Otherwise, returns an int >= 0 representing the number of indexed docs.
+
+    NOTE: if an exception is raised out of this task, the primary worker will detect
+    that the task transitioned to a "READY" state but the generator_complete_key doesn't exist.
+    This will cause the primary worker to abort the indexing attempt and clean up.
     """
+    logger.info(
+        f"Indexing spawned task starting: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+    )
 
     attempt = None
     n_final_progress = 0
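The new NOTE describes the failure protocol: a crashed spawned task ends in a terminal celery state without ever writing generator_complete_key, and the primary worker uses that combination to abort the attempt. A hypothetical sketch of such a check (spawned_task_failed is not a function in this diff):

```python
import redis
from celery.result import AsyncResult

def spawned_task_failed(
    celery_task_id: str, generator_complete_key: str, r: redis.Redis
) -> bool:
    # ready() is True once the task hits a terminal state
    # (SUCCESS, FAILURE, or REVOKED)
    result = AsyncResult(celery_task_id)
    if not result.ready():
        return False
    # terminal state but no completion signal in redis means the task
    # raised (or died) before it could write generator_complete_key
    return r.exists(generator_complete_key) == 0
```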
@@ -485,19 +527,19 @@ def connector_indexing_task(
                 cast(str, fence_json)
             )
         except ValueError:
-            task_logger.exception(
+            logger.exception(
                 f"connector_indexing_task: fence_data not decodeable: fence={rci.fence_key}"
             )
             raise
 
         if fence_data.index_attempt_id is None or fence_data.celery_task_id is None:
-            task_logger.info(
+            logger.info(
                 f"connector_indexing_task - Waiting for fence: fence={rci.fence_key}"
            )
             sleep(1)
             continue
 
-        task_logger.info(
+        logger.info(
             f"connector_indexing_task - Fence found, continuing...: fence={rci.fence_key}"
         )
         break
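The fence loop above waits for the producer to fill in index_attempt_id and celery_task_id before proceeding, since the fence payload can exist in a partially-populated state. A stripped-down sketch of the same wait, using a hypothetical dict-based payload instead of the repo's RedisConnectorIndexingFenceData model:

```python
import json
from time import sleep

import redis

def wait_for_fence(r: redis.Redis, fence_key: str, max_tries: int = 60) -> dict:
    for _ in range(max_tries):
        raw = r.get(fence_key)
        if raw is not None:
            data = json.loads(raw)
            # the producer may write the fence before filling in the ids,
            # so keep polling until both are present
            if data.get("index_attempt_id") is not None and data.get("celery_task_id") is not None:
                return data
        sleep(1)
    raise TimeoutError(f"fence never became ready: {fence_key}")
```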
@@ -509,7 +551,7 @@ def connector_indexing_task(
 
     acquired = lock.acquire(blocking=False)
     if not acquired:
-        task_logger.warning(
+        logger.warning(
             f"Indexing task already running, exiting...: "
             f"cc_pair={cc_pair_id} search_settings={search_settings_id} "
         )
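The non-blocking acquire here is what lets a duplicate task for the same cc_pair/search_settings exit immediately rather than queue behind the current holder. A minimal sketch with redis-py's built-in lock; the key name and timeout are illustrative:

```python
import redis

r = redis.Redis()
# timeout bounds how long a crashed holder can wedge the lock
lock = r.lock("indexing:cc_pair=1:search_settings=1", timeout=3600)

# blocking=False returns False immediately if another worker holds the lock
if not lock.acquire(blocking=False):
    print("Indexing task already running, exiting...")
else:
    try:
        pass  # the actual indexing work would go here
    finally:
        # owned() guards against releasing a lock that expired and was
        # re-acquired by another worker
        if lock.owned():
            lock.release()
```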
@@ -552,6 +594,13 @@ def connector_indexing_task(
             rcs.fence_key, rci.generator_progress_key, lock, r
         )
 
+        logger.info(
+            f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id} "
+        )
+
         run_indexing_entrypoint(
             index_attempt_id,
             tenant_id,
@@ -570,7 +619,12 @@ def connector_indexing_task(
 
         r.set(rci.generator_complete_key, HTTPStatus.OK.value)
     except Exception as e:
-        task_logger.exception(f"Indexing failed: cc_pair={cc_pair_id} ")
+        logger.exception(
+            f"Indexing spawned task failed: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id} "
+        )
         if attempt:
             with get_session_with_tenant(tenant_id) as db_session:
                 mark_attempt_failed(attempt, db_session, failure_reason=str(e))
@@ -584,4 +638,10 @@ def connector_indexing_task(
         if lock.owned():
             lock.release()
 
+    logger.info(
+        f"Indexing spawned task finished: attempt={index_attempt_id} "
+        f"tenant={tenant_id} "
+        f"cc_pair={cc_pair_id} "
+        f"search_settings={search_settings_id} "
+    )
     return n_final_progress