
Commit d472e6a
Don't measure peak mem with threads executor
tomwhite committed Nov 22, 2024
1 parent caf1351
Showing 2 changed files with 37 additions and 4 deletions.
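The rationale behind the commit title: with a thread pool executor every task runs in the same process, so a process-wide peak-memory reading attributes allocations from concurrently running tasks to whichever task takes the reading. A minimal sketch of the problem (an illustration, not cubed's measurement code), using the Unix-only resource module; note that ru_maxrss units are platform-dependent (kilobytes on Linux, bytes on macOS):

import resource
from concurrent.futures import ThreadPoolExecutor


def task(nbytes):
    data = bytearray(nbytes)  # allocate nbytes of memory in this thread
    # ru_maxrss is a process-wide peak, so under a thread pool it can include
    # allocations made by other tasks running concurrently in other threads
    peak = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return len(data), peak


with ThreadPoolExecutor(max_workers=3) as executor:
    sizes = [1_000_000, 200_000_000, 1_000_000]
    for size, peak in executor.map(task, sizes):
        # the small tasks may report the large task's peak as their own
        print(f"allocated {size} bytes, process peak RSS reading: {peak}")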
20 changes: 16 additions & 4 deletions cubed/runtime/executors/local.py
@@ -18,6 +18,7 @@
 from cubed.runtime.types import Callback, CubedPipeline, DagExecutor, TaskEndEvent
 from cubed.runtime.utils import (
     execution_stats,
+    execution_timing,
     handle_callbacks,
     handle_operation_start_callbacks,
     profile_memray,
@@ -61,9 +62,14 @@ def execute_dag(
                 [callback.on_task_end(event) for callback in callbacks]
 
 
+@execution_timing
+def run_func_threads(input, func=None, config=None, name=None, compute_id=None):
+    return func(input, config=config)
+
+
 @profile_memray
 @execution_stats
-def run_func(input, func=None, config=None, name=None, compute_id=None):
+def run_func_processes(input, func=None, config=None, name=None, compute_id=None):
     return func(input, config=config)
 
 
@@ -142,7 +148,11 @@ def create_futures_func_multiprocessing(input, **kwargs):
 
 
 def pipeline_to_stream(
-    concurrent_executor: Executor, name: str, pipeline: CubedPipeline, **kwargs
+    concurrent_executor: Executor,
+    run_func: Callable,
+    name: str,
+    pipeline: CubedPipeline,
+    **kwargs,
 ) -> Stream:
     return stream.iterate(
         map_unordered(
@@ -200,15 +210,17 @@ async def async_execute_dag(
                 mp_context=context,
                 max_tasks_per_child=max_tasks_per_child,
             )
+        run_func = run_func_processes
     else:
         concurrent_executor = ThreadPoolExecutor(max_workers=max_workers)
+        run_func = run_func_threads
     try:
         if not compute_arrays_in_parallel:
             # run one pipeline at a time
             for name, node in visit_nodes(dag, resume=resume):
                 handle_operation_start_callbacks(callbacks, name)
                 st = pipeline_to_stream(
-                    concurrent_executor, name, node["pipeline"], **kwargs
+                    concurrent_executor, run_func, name, node["pipeline"], **kwargs
                 )
                 async with st.stream() as streamer:
                     async for _, stats in streamer:
@@ -218,7 +230,7 @@
             # run pipelines in the same topological generation in parallel by merging their streams
             streams = [
                 pipeline_to_stream(
-                    concurrent_executor, name, node["pipeline"], **kwargs
+                    concurrent_executor, run_func, name, node["pipeline"], **kwargs
                 )
                 for name, node in gen
             ]
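Taken together, the two task wrappers let the executor choose instrumentation to match the pool type: timing plus memory profiling for processes, timing only for threads. A self-contained sketch of the threads path (the helper bodies are simplified stand-ins for the cubed implementations, and square is a made-up task function):

import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def execute_with_timing(function, *args, **kwargs):
    # simplified stand-in for the helper added in cubed/runtime/utils.py below
    start = time.time()
    result = function(*args, **kwargs)
    return result, dict(function_start_tstamp=start, function_end_tstamp=time.time())


def execution_timing(func):
    return partial(execute_with_timing, func)


@execution_timing
def run_func_threads(input, func=None, config=None, name=None, compute_id=None):
    return func(input, config=config)


def square(x, config=None):  # made-up task function
    return x * x


with ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(run_func_threads, i, func=square) for i in range(3)]
    for future in futures:
        result, stats = future.result()
        # each task yields (result, timing stats) with no peak-memory fields
        print(result, stats)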
21 changes: 21 additions & 0 deletions cubed/runtime/utils.py
@@ -40,12 +40,33 @@ def execute_with_stats(function, *args, **kwargs):
     )
 
 
+def execute_with_timing(function, *args, **kwargs):
+    """Invoke function and measure timing information.
+
+    Returns the result of the function call and a stats dictionary.
+    """
+
+    function_start_tstamp = time.time()
+    result = function(*args, **kwargs)
+    function_end_tstamp = time.time()
+    return result, dict(
+        function_start_tstamp=function_start_tstamp,
+        function_end_tstamp=function_end_tstamp,
+    )
+
+
 def execution_stats(func):
     """Decorator to measure timing information and peak memory usage of a function call."""
 
     return partial(execute_with_stats, func)
 
 
+def execution_timing(func):
+    """Decorator to measure timing information of a function call."""
+
+    return partial(execute_with_timing, func)
+
+
 def execute_with_memray(function, input, **kwargs):
     # only run memray if installed, and only for first input (for operations that run on block locations)
     if (
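A plausible reason the decorators are built with functools.partial rather than nested closures (an inference, not stated in the commit): a partial over module-level functions can be pickled and shipped to a worker process, while a closure-based wrapper cannot. A sketch:

import pickle
from functools import partial


def execute_with_timing(function, *args, **kwargs):
    return function(*args, **kwargs)  # simplified stand-in


def task(x):
    return x


def closure_decorator(func):
    def wrapper(*args, **kwargs):  # defined inside a function, so not picklable
        return func(*args, **kwargs)
    return wrapper


partial_wrapped = partial(execute_with_timing, task)
closure_wrapped = closure_decorator(task)

pickle.dumps(partial_wrapped)  # works: both referenced functions are module-level

try:
    pickle.dumps(closure_wrapped)
except (AttributeError, pickle.PicklingError) as e:
    print("closure-based wrapper is not picklable:", e)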
