File tree 4 files changed +8
-16
lines changed
backend/danswer/background
4 files changed +8
-16
lines changed Original file line number Diff line number Diff line change 21
21
@beat_init .connect
22
22
def on_beat_init (sender : Any , ** kwargs : Any ) -> None :
23
23
logger .info ("beat_init signal received." )
24
+
25
+ # celery beat shouldn't touch the db at all. But just setting a low minimum here.
24
26
SqlEngine .set_app_name (POSTGRES_CELERY_BEAT_APP_NAME )
25
27
SqlEngine .init_engine (pool_size = 2 , max_overflow = 0 )
26
28
app_base .wait_for_redis (sender , ** kwargs )
Original file line number Diff line number Diff line change @@ -58,7 +58,7 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
58
58
logger .info (f"Multiprocessing start method: { multiprocessing .get_start_method ()} " )
59
59
60
60
SqlEngine .set_app_name (POSTGRES_CELERY_WORKER_HEAVY_APP_NAME )
61
- SqlEngine .init_engine (pool_size = 8 , max_overflow = 0 )
61
+ SqlEngine .init_engine (pool_size = 4 , max_overflow = 12 )
62
62
63
63
app_base .wait_for_redis (sender , ** kwargs )
64
64
app_base .on_secondary_worker_init (sender , ** kwargs )
Original file line number Diff line number Diff line change @@ -166,19 +166,6 @@ def on_worker_init(sender: Any, **kwargs: Any) -> None:
166
166
r .delete (key )
167
167
168
168
169
- # @worker_process_init.connect
170
- # def on_worker_process_init(sender: Any, **kwargs: Any) -> None:
171
- # """This only runs inside child processes when the worker is in pool=prefork mode.
172
- # This may be technically unnecessary since we're finding prefork pools to be
173
- # unstable and currently aren't planning on using them."""
174
- # logger.info("worker_process_init signal received.")
175
- # SqlEngine.set_app_name(POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME)
176
- # SqlEngine.init_engine(pool_size=5, max_overflow=0)
177
-
178
- # # https://stackoverflow.com/questions/43944787/sqlalchemy-celery-with-scoped-session-error
179
- # SqlEngine.get_engine().dispose(close=False)
180
-
181
-
182
169
@worker_ready .connect
183
170
def on_worker_ready (sender : Any , ** kwargs : Any ) -> None :
184
171
app_base .on_worker_ready (sender , ** kwargs )
Original file line number Diff line number Diff line change 11
11
from typing import Literal
12
12
from typing import Optional
13
13
14
- from danswer .db .engine import get_sqlalchemy_engine
14
+ from danswer .configs .constants import POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME
15
+ from danswer .db .engine import SqlEngine
15
16
from danswer .utils .logger import setup_logger
16
17
17
18
logger = setup_logger ()
@@ -37,7 +38,9 @@ def _initializer(
37
38
if kwargs is None :
38
39
kwargs = {}
39
40
40
- get_sqlalchemy_engine ().dispose (close = False )
41
+ logger .info ("Initializing spawned worker child process." )
42
+ SqlEngine .set_app_name (POSTGRES_CELERY_WORKER_INDEXING_CHILD_APP_NAME )
43
+ SqlEngine .init_engine (pool_size = 4 , max_overflow = 12 , pool_recycle = 60 )
41
44
return func (* args , ** kwargs )
42
45
43
46
You can’t perform that action at this time.
0 commit comments