diff --git a/cubed/runtime/executors/local.py b/cubed/runtime/executors/local.py index a62c07a4..21ef55fe 100644 --- a/cubed/runtime/executors/local.py +++ b/cubed/runtime/executors/local.py @@ -18,6 +18,7 @@ from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent from cubed.runtime.utils import ( execution_stats, + execution_timing, handle_callbacks, handle_operation_start_callbacks, profile_memray, @@ -61,9 +62,14 @@ def execute_dag( [callback.on_task_end(event) for callback in callbacks] +@execution_timing +def run_func_threads(input, func=None, config=None, name=None, compute_id=None): + return func(input, config=config) + + @profile_memray @execution_stats -def run_func(input, func=None, config=None, name=None, compute_id=None): +def run_func_processes(input, func=None, config=None, name=None, compute_id=None): return func(input, config=config) @@ -142,7 +148,11 @@ def create_futures_func_multiprocessing(input, **kwargs): def pipeline_to_stream( - concurrent_executor: Executor, name: str, pipeline: CubedPipeline, **kwargs + concurrent_executor: Executor, + run_func: Callable, + name: str, + pipeline: CubedPipeline, + **kwargs, ) -> Stream: return stream.iterate( map_unordered( @@ -200,15 +210,17 @@ async def async_execute_dag( mp_context=context, max_tasks_per_child=max_tasks_per_child, ) + run_func = run_func_processes else: concurrent_executor = ThreadPoolExecutor(max_workers=max_workers) + run_func = run_func_threads try: if not compute_arrays_in_parallel: # run one pipeline at a time for name, node in visit_nodes(dag, resume=resume): handle_operation_start_callbacks(callbacks, name) st = pipeline_to_stream( - concurrent_executor, name, node["pipeline"], **kwargs + concurrent_executor, run_func, name, node["pipeline"], **kwargs ) async with st.stream() as streamer: async for _, stats in streamer: @@ -218,7 +230,7 @@ async def async_execute_dag( # run pipelines in the same topological generation in parallel by merging their streams 
streams = [ pipeline_to_stream( - concurrent_executor, name, node["pipeline"], **kwargs + concurrent_executor, run_func, name, node["pipeline"], **kwargs ) for name, node in gen ] diff --git a/cubed/runtime/utils.py b/cubed/runtime/utils.py index 4591842a..8c611c8d 100644 --- a/cubed/runtime/utils.py +++ b/cubed/runtime/utils.py @@ -40,12 +40,33 @@ def execute_with_stats(function, *args, **kwargs): ) +def execute_with_timing(function, *args, **kwargs): + """Invoke function and measure timing information. + + Returns the result of the function call and a stats dictionary holding ``function_start_tstamp`` and ``function_end_tstamp`` (``time.time()`` values taken just before and just after the call). + """ + + function_start_tstamp = time.time() + result = function(*args, **kwargs) + function_end_tstamp = time.time() + return result, dict( + function_start_tstamp=function_start_tstamp, + function_end_tstamp=function_end_tstamp, + ) + + def execution_stats(func): """Decorator to measure timing information and peak memory usage of a function call.""" return partial(execute_with_stats, func) +def execution_timing(func): + """Decorator to measure timing information (but not memory usage) of a function call.""" + + return partial(execute_with_timing, func) + + def execute_with_memray(function, input, **kwargs): # only run memray if installed, and only for first input (for operations that run on block locations) if (