Replies: 2 comments
-
The default multiprocess executor spinning up new processes for each step is the likely cause of latency with empty ops. There is an option to switch from spawning clean processes to forking a template process using the forkserver multiprocessing start method: https://docs.dagster.io/concepts/ops-jobs-graphs/job-execution#controlling-job-execution. I would expect this to drastically improve the measured performance of the benchmark you described. The "in process" executor avoids this overhead, but it is currently not concurrent, so it is likely not suitable. It is theoretically possible to write your own executor, for example a threaded one that meets your specific needs: https://docs.dagster.io/_apidocs/internals#executors-experimental. The tracking issue for us adding that is #4041
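For reference, the forkserver switch described above is a run-config option on the multiprocess executor. A minimal sketch of the run config, following the linked docs (exact keys and defaults may vary by Dagster version):

```yaml
execution:
  config:
    multiprocess:
      max_concurrent: 8        # cap on concurrently running step processes
      start_method:
        forkserver:            # fork a warmed template process instead of spawning clean ones
```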
-
Thanks a lot for your answer!
1. I've tried forkserver using the config from the documentation section you linked, but got an EOFError (see below). I'm using Ubuntu under WSL2 on Windows 10.
2. Are there any expectations about when an async executor might become available?
"""
EOFError: unexpected EOF
File
"/home/ameledin/miniconda3/lib/python3.10/site-packages/dagster/_core/execution/api.py",
line 766, in job_execution_iterator
for event in job_context.executor.execute(job_context, execution_plan):
File
"/home/ameledin/miniconda3/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py",
line 249, in execute
event_or_none = next(step_iter)
File
"/home/ameledin/miniconda3/lib/python3.10/site-packages/dagster/_core/executor/multiprocess.py",
line 360, in execute_step_out_of_process
for ret in execute_child_process_command(multiproc_ctx, command):
File
"/home/ameledin/miniconda3/lib/python3.10/site-packages/dagster/_core/executor/child_process_executor.py",
line 157, in execute_child_process_command
process.start()
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/process.py", line
121, in start
self._popen = self._Popen(self)
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/context.py", line
300, in _Popen
return Popen(process_obj)
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/popen_forkserver.py",
line 35, in __init__
super().__init__(process_obj)
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/popen_fork.py",
line 19, in __init__
self._launch(process_obj)
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/popen_forkserver.py",
line 59, in _launch
self.pid = forkserver.read_signed(self.sentinel)
File
"/home/ameledin/miniconda3/lib/python3.10/multiprocessing/forkserver.py",
line 328, in read_signed
raise EOFError('unexpected EOF')
"""
-
Hello,
We're looking for a suitable open-source replacement for a legacy "home-made" workload automation solution. We've tested Airflow and Dagster for this, both with empty ops/assets: 130 assets, 35 of them running over static partition sets of 150 instances each. The total number of instances to run is approximately 6000.
The problem is that executing this empty DAG/job (the op code is empty) takes 1 hour with 8 parallel processes on a simple workstation (Core i5 with 12 logical CPUs, 32 GB RAM). Airflow on the same DAG is much slower with static partitions, and about the same with dynamic partitions. We stopped with Airflow because of this and because of the complexity of the dynamic-mapping code when generating a DAG; Dagster is much simpler in that respect.
As we estimate, execution is slow for two reasons: (a) traversing a "big" graph is slow (at least in Airflow); (b) for every step in every partition, a Python interpreter starts and loads libraries.
In our case we don't plan to run Python code inside @op/asset bodies. All we need is to invoke a local/remote process under nohup and fetch event logs from it, so an op instance is lightweight. In Airflow we planned to use the Triggerer to free worker slots while the log is being read.
So, we are thinking in the direction of removing @op code execution in Python. For local/remote process execution, the @op/asset name and partition key seem to be enough for invocation.
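For illustration, the kind of detached invocation we have in mind could be sketched like this (plain Python, not Dagster API; the `run_step.sh` launcher named in the comment is hypothetical):

```python
import subprocess

def launch_detached(cmd):
    """Start cmd detached from the current session (nohup-style):
    the child keeps running even if the caller exits."""
    return subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,  # POSIX: setsid, the equivalent of nohup
    )

# An op body would then reduce to something like:
#   launch_detached(["run_step.sh", op_name, partition_key])
# where run_step.sh is our hypothetical external launcher.
```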
What is the best way to do that with Dagster?
Can you provide some guidance? Thank you!