Skip to content

Commit 9456fef

Browse files
authored
Merge pull request #3161 from danswer-ai/hotfix/v0.13-indexing-redux
enhanced logging for indexing and increased indexing timeouts
2 parents 574ef47 + cc3c080 commit 9456fef

14 files changed: +210 additions, −107 deletions

backend/danswer/background/celery/apps/primary.py

+21
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,14 @@
1414
import danswer.background.celery.apps.app_base as app_base
1515
from danswer.background.celery.apps.app_base import task_logger
1616
from danswer.background.celery.celery_utils import celery_is_worker_primary
17+
from danswer.background.celery.tasks.vespa.tasks import get_unfenced_index_attempt_ids
1718
from danswer.configs.constants import CELERY_PRIMARY_WORKER_LOCK_TIMEOUT
1819
from danswer.configs.constants import DanswerRedisLocks
1920
from danswer.configs.constants import POSTGRES_CELERY_WORKER_PRIMARY_APP_NAME
21+
from danswer.db.engine import get_session_with_default_tenant
2022
from danswer.db.engine import SqlEngine
23+
from danswer.db.index_attempt import get_index_attempt
24+
from danswer.db.index_attempt import mark_attempt_failed
2125
from danswer.redis.redis_connector_credential_pair import RedisConnectorCredentialPair
2226
from danswer.redis.redis_connector_delete import RedisConnectorDelete
2327
from danswer.redis.redis_connector_index import RedisConnectorIndex
@@ -134,6 +138,23 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
134138

135139
RedisConnectorStop.reset_all(r)
136140

141+
# mark orphaned index attempts as failed
142+
with get_session_with_default_tenant() as db_session:
143+
unfenced_attempt_ids = get_unfenced_index_attempt_ids(db_session, r)
144+
for attempt_id in unfenced_attempt_ids:
145+
attempt = get_index_attempt(db_session, attempt_id)
146+
if not attempt:
147+
continue
148+
149+
failure_reason = (
150+
f"Orphaned index attempt found on startup: "
151+
f"index_attempt={attempt.id} "
152+
f"cc_pair={attempt.connector_credential_pair_id} "
153+
f"search_settings={attempt.search_settings_id}"
154+
)
155+
logger.warning(failure_reason)
156+
mark_attempt_failed(attempt.id, db_session, failure_reason)
157+
137158

138159
@worker_ready.connect
139160
def on_worker_ready(sender: Any, **kwargs: Any) -> None:

backend/danswer/background/celery/tasks/connector_deletion/tasks.py

+2 −2
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from datetime import datetime
22
from datetime import timezone
33

4-
import redis
54
from celery import Celery
65
from celery import shared_task
76
from celery import Task
87
from celery.exceptions import SoftTimeLimitExceeded
98
from redis import Redis
9+
from redis.lock import Lock as RedisLock
1010
from sqlalchemy.orm import Session
1111

1212
from danswer.background.celery.apps.app_base import task_logger
@@ -87,7 +87,7 @@ def try_generate_document_cc_pair_cleanup_tasks(
8787
cc_pair_id: int,
8888
db_session: Session,
8989
r: Redis,
90-
lock_beat: redis.lock.Lock,
90+
lock_beat: RedisLock,
9191
tenant_id: str | None,
9292
) -> int | None:
9393
"""Returns an int if syncing is needed. The int represents the number of sync tasks generated.

backend/danswer/background/celery/tasks/indexing/tasks.py

+24 −7
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@
33
from http import HTTPStatus
44
from time import sleep
55

6-
import redis
76
import sentry_sdk
87
from celery import Celery
98
from celery import shared_task
109
from celery import Task
1110
from celery.exceptions import SoftTimeLimitExceeded
1211
from redis import Redis
12+
from redis.exceptions import LockError
13+
from redis.lock import Lock as RedisLock
1314
from sqlalchemy.orm import Session
1415

1516
from danswer.background.celery.apps.app_base import task_logger
@@ -44,7 +45,7 @@
4445
from danswer.natural_language_processing.search_nlp_models import EmbeddingModel
4546
from danswer.natural_language_processing.search_nlp_models import warm_up_bi_encoder
4647
from danswer.redis.redis_connector import RedisConnector
47-
from danswer.redis.redis_connector_index import RedisConnectorIndexingFenceData
48+
from danswer.redis.redis_connector_index import RedisConnectorIndexPayload
4849
from danswer.redis.redis_pool import get_redis_client
4950
from danswer.utils.logger import setup_logger
5051
from danswer.utils.variable_functionality import global_version
@@ -61,22 +62,38 @@ def __init__(
6162
self,
6263
stop_key: str,
6364
generator_progress_key: str,
64-
redis_lock: redis.lock.Lock,
65+
redis_lock: RedisLock,
6566
redis_client: Redis,
6667
):
6768
super().__init__()
68-
self.redis_lock: redis.lock.Lock = redis_lock
69+
self.redis_lock: RedisLock = redis_lock
6970
self.stop_key: str = stop_key
7071
self.generator_progress_key: str = generator_progress_key
7172
self.redis_client = redis_client
73+
self.started: datetime = datetime.now(timezone.utc)
74+
self.redis_lock.reacquire()
75+
76+
self.last_lock_reacquire: datetime = datetime.now(timezone.utc)
7277

7378
def should_stop(self) -> bool:
7479
if self.redis_client.exists(self.stop_key):
7580
return True
7681
return False
7782

7883
def progress(self, amount: int) -> None:
79-
self.redis_lock.reacquire()
84+
try:
85+
self.redis_lock.reacquire()
86+
self.last_lock_reacquire = datetime.now(timezone.utc)
87+
except LockError:
88+
logger.exception(
89+
f"RunIndexingCallback - lock.reacquire exceptioned. "
90+
f"lock_timeout={self.redis_lock.timeout} "
91+
f"start={self.started} "
92+
f"last_reacquired={self.last_lock_reacquire} "
93+
f"now={datetime.now(timezone.utc)}"
94+
)
95+
raise
96+
8097
self.redis_client.incrby(self.generator_progress_key, amount)
8198

8299

@@ -325,7 +342,7 @@ def try_creating_indexing_task(
325342
redis_connector_index.generator_clear()
326343

327344
# set a basic fence to start
328-
payload = RedisConnectorIndexingFenceData(
345+
payload = RedisConnectorIndexPayload(
329346
index_attempt_id=None,
330347
started=None,
331348
submitted=datetime.now(timezone.utc),
@@ -368,7 +385,7 @@ def try_creating_indexing_task(
368385
redis_connector_index.set_fence(payload)
369386

370387
except Exception:
371-
redis_connector_index.set_fence(payload)
388+
redis_connector_index.set_fence(None)
372389
task_logger.exception(
373390
f"Unexpected exception: "
374391
f"tenant={tenant_id} "

0 commit comments

Comments (0)