
Commit 593b18f

Merge pull request #3071 from danswer-ai/hotfix/v0.12-stale-tasks

Hotfix/v0.12 stale tasks

2 parents b2c55eb + 93ec2a6

4 files changed, +53 -39 lines


backend/danswer/background/celery/tasks/indexing/tasks.py (+37 -28)

@@ -173,7 +173,9 @@ def check_for_indexing(self: Task, *, tenant_id: str | None) -> int | None:
                     )
                     if attempt_id:
                         task_logger.info(
-                            f"Indexing queued: cc_pair={cc_pair.id} index_attempt={attempt_id}"
+                            f"Indexing queued: index_attempt={attempt_id} "
+                            f"cc_pair={cc_pair.id} "
+                            f"search_settings={search_settings_instance.id} "
                         )
                         tasks_created += 1
     except SoftTimeLimitExceeded:
@@ -489,7 +491,7 @@ def connector_indexing_task(
         f"search_settings={search_settings_id}"
     )
 
-    attempt = None
+    attempt_found = False
     n_final_progress: int | None = None
 
     redis_connector = RedisConnector(tenant_id, cc_pair_id)
@@ -529,6 +531,13 @@ def connector_indexing_task(
             sleep(1)
             continue
 
+        if payload.index_attempt_id != index_attempt_id:
+            raise ValueError(
+                f"connector_indexing_task - id mismatch. Task may be left over from previous run.: "
+                f"task_index_attempt={index_attempt_id} "
+                f"payload_index_attempt={payload.index_attempt_id}"
+            )
+
         logger.info(
             f"connector_indexing_task - Fence found, continuing...: fence={redis_connector_index.fence_key}"
         )
@@ -557,6 +566,7 @@ def connector_indexing_task(
                 raise ValueError(
                     f"Index attempt not found: index_attempt={index_attempt_id}"
                 )
+            attempt_found = True
 
             cc_pair = get_connector_credential_pair_from_id(
                 cc_pair_id=cc_pair_id,
@@ -576,44 +586,43 @@ def connector_indexing_task(
                     f"Credential not found: cc_pair={cc_pair_id} credential={cc_pair.credential_id}"
                 )
 
-        # define a callback class
-        callback = RunIndexingCallback(
-            redis_connector.stop.fence_key,
-            redis_connector_index.generator_progress_key,
-            lock,
-            r,
-        )
+        # define a callback class
+        callback = RunIndexingCallback(
+            redis_connector.stop.fence_key,
+            redis_connector_index.generator_progress_key,
+            lock,
+            r,
+        )
 
-        logger.info(
-            f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
-            f"tenant={tenant_id} "
-            f"cc_pair={cc_pair_id} "
-            f"search_settings={search_settings_id}"
-        )
+        logger.info(
+            f"Indexing spawned task running entrypoint: attempt={index_attempt_id} "
+            f"tenant={tenant_id} "
+            f"cc_pair={cc_pair_id} "
+            f"search_settings={search_settings_id}"
+        )
 
-        run_indexing_entrypoint(
-            index_attempt_id,
-            tenant_id,
-            cc_pair_id,
-            is_ee,
-            callback=callback,
-        )
+        run_indexing_entrypoint(
+            index_attempt_id,
+            tenant_id,
+            cc_pair_id,
+            is_ee,
+            callback=callback,
+        )
 
-        # get back the total number of indexed docs and return it
-        n_final_progress = redis_connector_index.get_progress()
-        redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
+        # get back the total number of indexed docs and return it
+        n_final_progress = redis_connector_index.get_progress()
+        redis_connector_index.set_generator_complete(HTTPStatus.OK.value)
     except Exception as e:
         logger.exception(
             f"Indexing spawned task failed: attempt={index_attempt_id} "
             f"tenant={tenant_id} "
             f"cc_pair={cc_pair_id} "
             f"search_settings={search_settings_id}"
         )
-        if attempt:
+        if attempt_found:
             with get_session_with_tenant(tenant_id) as db_session:
-                mark_attempt_failed(attempt, db_session, failure_reason=str(e))
+                mark_attempt_failed(index_attempt_id, db_session, failure_reason=str(e))
 
-        redis_connector_index.reset()
         raise e
     finally:
         if lock.owned():
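
The substance of this file's change is a guard against stale spawned tasks: before doing any work, the task now compares its own index_attempt_id against the id recorded in the Redis fence payload and fails fast on a mismatch, and it tracks a plain attempt_found flag so the except handler can mark the attempt failed by id rather than through a possibly detached ORM object. Below is a minimal, self-contained sketch of that guard; FencePayload and stale_task_guard are illustrative stand-ins for this note, not danswer APIs.

from dataclasses import dataclass


@dataclass
class FencePayload:
    """Hypothetical stand-in for the payload stored under the Redis fence key."""
    index_attempt_id: int


def stale_task_guard(task_index_attempt_id: int, payload: FencePayload | None) -> None:
    """Raise if this task does not match the attempt the fence was set for.

    Mirrors the check added in connector_indexing_task: a task left over from a
    previous run will see a fence whose payload points at a different index
    attempt, and should fail loudly instead of indexing against the wrong one.
    """
    if payload is None:
        raise ValueError("fence payload missing - nothing to index against")
    if payload.index_attempt_id != task_index_attempt_id:
        raise ValueError(
            "id mismatch, task may be left over from a previous run: "
            f"task_index_attempt={task_index_attempt_id} "
            f"payload_index_attempt={payload.index_attempt_id}"
        )


if __name__ == "__main__":
    stale_task_guard(42, FencePayload(index_attempt_id=42))  # passes silently
    try:
        stale_task_guard(42, FencePayload(index_attempt_id=41))
    except ValueError as e:
        print(f"rejected stale task: {e}")

Failing loudly on the mismatch is what keeps a worker slot reused after a restart from writing progress against the wrong attempt.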

backend/danswer/background/celery/tasks/vespa/tasks.py (+12 -7)

@@ -610,7 +610,7 @@ def monitor_ccpair_indexing_taskset(
             index_attempt = get_index_attempt(db_session, payload.index_attempt_id)
             if index_attempt:
                 mark_attempt_failed(
-                    index_attempt=index_attempt,
+                    index_attempt_id=payload.index_attempt_id,
                     db_session=db_session,
                     failure_reason="Connector indexing aborted or exceptioned.",
                 )
@@ -690,13 +690,18 @@ def monitor_vespa_sync(self: Task, tenant_id: str | None) -> bool:
 
     for a in attempts:
         # if attempts exist in the db but we don't detect them in redis, mark them as failed
-        failure_reason = f"Unknown index attempt {a.id}. Might be left over from a process restart."
-        if not r.exists(
-            RedisConnectorIndex.fence_key_with_ids(
-                a.connector_credential_pair_id, a.search_settings_id
+        fence_key = RedisConnectorIndex.fence_key_with_ids(
+            a.connector_credential_pair_id, a.search_settings_id
+        )
+        if not r.exists(fence_key):
+            failure_reason = (
+                f"Unknown index attempt. Might be left over from a process restart: "
+                f"index_attempt={a.id} "
+                f"cc_pair={a.connector_credential_pair_id} "
+                f"search_settings={a.search_settings_id}"
             )
-        ):
-            mark_attempt_failed(a, db_session, failure_reason=failure_reason)
+            task_logger.warning(failure_reason)
+            mark_attempt_failed(a.id, db_session, failure_reason=failure_reason)
 
     lock_beat.reacquire()
     if r.exists(RedisConnectorCredentialPair.get_fence_key()):
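
The second hunk is a reconciliation pass for orphaned attempts: for each in-progress attempt in the database it derives the expected Redis fence key, and if that key is absent the attempt is logged and marked failed by id. A rough sketch of the same loop, with an in-memory set standing in for Redis and a dataclass standing in for the IndexAttempt row; the fence-key format shown is an assumption for this sketch, and the print call stands in for task_logger.warning plus mark_attempt_failed.

from dataclasses import dataclass


@dataclass
class Attempt:
    """Illustrative stand-in for an in-progress IndexAttempt row."""
    id: int
    cc_pair_id: int
    search_settings_id: int


def fence_key(cc_pair_id: int, search_settings_id: int) -> str:
    # Same role as RedisConnectorIndex.fence_key_with_ids; the exact string
    # format here is assumed, not taken from danswer.
    return f"connectorindexing_fence_{cc_pair_id}/{search_settings_id}"


def reconcile(attempts: list[Attempt], existing_keys: set[str]) -> list[int]:
    """Return ids of attempts whose fence is missing, i.e. orphaned attempts."""
    orphaned: list[int] = []
    for a in attempts:
        key = fence_key(a.cc_pair_id, a.search_settings_id)
        if key not in existing_keys:
            print(
                "Unknown index attempt. Might be left over from a process restart: "
                f"index_attempt={a.id} cc_pair={a.cc_pair_id} "
                f"search_settings={a.search_settings_id}"
            )
            orphaned.append(a.id)
    return orphaned


if __name__ == "__main__":
    live = {fence_key(1, 10)}
    attempts = [Attempt(100, 1, 10), Attempt(101, 2, 10)]
    print(reconcile(attempts, live))  # -> [101]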

backend/danswer/background/indexing/run_indexing.py (+2 -2)

@@ -337,7 +337,7 @@ def _run_indexing(
                 or index_attempt.status != IndexingStatus.IN_PROGRESS
             ):
                 mark_attempt_failed(
-                    index_attempt,
+                    index_attempt.id,
                     db_session,
                     failure_reason=str(e),
                     full_exception_trace=traceback.format_exc(),
@@ -372,7 +372,7 @@ def _run_indexing(
             and index_attempt_md.num_exceptions >= batch_num
         ):
             mark_attempt_failed(
-                index_attempt,
+                index_attempt.id,
                 db_session,
                 failure_reason="All batches exceptioned.",
             )

backend/danswer/db/index_attempt.py (+2 -2)

@@ -219,15 +219,15 @@ def mark_attempt_partially_succeeded(
 
 
 def mark_attempt_failed(
-    index_attempt: IndexAttempt,
+    index_attempt_id: int,
     db_session: Session,
     failure_reason: str = "Unknown",
     full_exception_trace: str | None = None,
 ) -> None:
     try:
         attempt = db_session.execute(
             select(IndexAttempt)
-            .where(IndexAttempt.id == index_attempt.id)
+            .where(IndexAttempt.id == index_attempt_id)
             .with_for_update()
         ).scalar_one()
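
This signature change is what the other three files adapt to: mark_attempt_failed now takes an index_attempt_id and re-selects the row with a FOR UPDATE lock inside the caller's session, instead of accepting an IndexAttempt object that may be detached or stale by the time the failure is recorded. A self-contained sketch of that lock-by-id pattern with SQLAlchemy follows; the model is a simplified stand-in, not danswer's actual schema.

# Minimal sketch of "look up and lock by primary key" failure marking.
from sqlalchemy import create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class IndexAttempt(Base):
    # Simplified stand-in table: id + status + error_msg only.
    __tablename__ = "index_attempt"
    id: Mapped[int] = mapped_column(primary_key=True)
    status: Mapped[str] = mapped_column(default="in_progress")
    error_msg: Mapped[str | None] = mapped_column(default=None)


def mark_attempt_failed(index_attempt_id: int, db_session: Session,
                        failure_reason: str = "Unknown") -> None:
    # Re-select the row inside the caller's session and take a row lock,
    # instead of trusting an ORM instance that may be detached or stale.
    attempt = db_session.execute(
        select(IndexAttempt)
        .where(IndexAttempt.id == index_attempt_id)
        .with_for_update()
    ).scalar_one()
    attempt.status = "failed"
    attempt.error_msg = failure_reason
    db_session.commit()


if __name__ == "__main__":
    engine = create_engine("sqlite://")  # FOR UPDATE is ignored on SQLite
    Base.metadata.create_all(engine)
    with Session(engine) as s:
        s.add(IndexAttempt(id=1))
        s.commit()
        mark_attempt_failed(1, s, failure_reason="demo failure")
        print(s.get(IndexAttempt, 1).status)  # -> failed

Passing the primary key keeps the helper safe to call from except blocks and monitoring loops, where any previously loaded ORM instance may belong to a session that has already closed.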
