Skip to content

Commit 8f063e1

Browse files
authored
Merge pull request #2990 from danswer-ai/hotfix/v0.11-worker-process-init
Merge hotfix/v0.11-worker-process-init into release/v0.11
2 parents 6ff559a + 67be4bb commit 8f063e1

File tree

4 files changed

+8
-16
lines changed

4 files changed

+8
-16
lines changed

backend/danswer/background/celery/apps/beat.py

+2
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,8 @@
2121
@beat_init.connect
2222
def on_beat_init(sender: Any, **kwargs: Any) -> None:
2323
logger.info("beat_init signal received.")
24+
25+
# celery beat shouldn't touch the db at all. But just setting a low minimum here.
2426
SqlEngine.set_app_name(POSTGRES_CELERY_BEAT_APP_NAME)
2527
SqlEngine.init_engine(pool_size=2, max_overflow=0)
2628
app_base.wait_for_redis(sender, **kwargs)

backend/danswer/background/celery/apps/heavy.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
5858
logger.info(f"Multiprocessing start method: {multiprocessing.get_start_method()}")
5959

6060
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_HEAVY_APP_NAME)
61-
SqlEngine.init_engine(pool_size=8, max_overflow=0)
61+
SqlEngine.init_engine(pool_size=4, max_overflow=12)
6262

6363
app_base.wait_for_redis(sender, **kwargs)
6464
app_base.on_secondary_worker_init(sender, **kwargs)

backend/danswer/background/celery/apps/primary.py

-13
Original file line number | Diff line number | Diff line change
@@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
166166
r.delete(key)
167167

168168

169-
# @worker_process_init.connect
170-
# def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
171-
# """This only runs inside child processes when the worker is in pool=prefork mode.
172-
# This may be technically unnecessary since we're finding prefork pools to be
173-
# unstable and currently aren't planning on using them."""
174-
# logger.info("worker_process_init signal received.")
175-
# SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
176-
# SqlEngine.init_engine(pool_size=5, max_overflow=0)
177-
178-
# # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
179-
# SqlEngine.get_engine().dispose(close=False)
180-
181-
182169
@worker_ready.connect
183170
def on_worker_ready(sender: Any, **kwargs: Any) -> None:
184171
app_base.on_worker_ready(sender, **kwargs)

backend/danswer/background/indexing/job_client.py

+5-2
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,8 @@
1111
from typing import Literal
1212
from typing import Optional
1313

14-
from danswer.db.engine import get_sqlalchemy_engine
14+
from danswer.configs.constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
15+
from danswer.db.engine import SqlEngine
1516
from danswer.utils.logger import setup_logger
1617

1718
logger = setup_logger()
@@ -37,7 +38,9 @@ def _initializer(
3738
if kwargs is None:
3839
kwargs = {}
3940

40-
get_sqlalchemy_engine().dispose(close=False)
41+
logger.info("Initializing spawned worker child process.")
42+
SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
43+
SqlEngine.init_engine(pool_size=4, max_overflow=12, pool_recycle=60)
4144
return func(*args, **kwargs)
4245

4346

0 commit comments

Comments
 (0)