From e3b80926c69e78ccad73597742a5997b9314d7b1 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 2 Aug 2024 11:06:31 -0600
Subject: [PATCH 1/3] Auto rechunk to enable blockwise reduction

Done when
1. `method` is None
2. Grouping and reducing by a 1D array

We gate this on fractional change in number of chunks and change in
size of largest chunk.

Closes #359
---
 flox/core.py | 25 +++++++++++++++++++++++--
 1 file changed, 23 insertions(+), 2 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index 1c9599a1..bab117f1 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -113,6 +113,12 @@
 # _simple_combine.
 DUMMY_AXIS = -2
 
+# Thresholds below which we will automatically rechunk to blockwise if it makes sense
+# 1. Fractional change in number of chunks after rechunking
+BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25
+# 2. Fractional change in max chunk size after rechunking
+BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.15
+
 logger = logging.getLogger("flox")
 
 
@@ -230,6 +236,8 @@ def _get_optimal_chunks_for_groups(chunks, labels):
         Δl = abs(c - l)
         if c == 0 or newchunkidx[-1] > l:
             continue
+        f = f.item()  # noqa
+        l = l.item()  # noqa
         if Δf < Δl and f > newchunkidx[-1]:
             newchunkidx.append(f)
         else:
@@ -651,12 +659,20 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) ->
     DaskArray
         Rechunked array
     """
-    labels = factorize_((labels,), axes=())[0]
     chunks = array.chunks[axis]
+    if len(chunks) == 1:
+        return array
+
+    labels = factorize_((labels,), axes=())[0]
     newchunks = _get_optimal_chunks_for_groups(chunks, labels)
     if newchunks == chunks:
         return array
-    else:
+
+    Δn = abs(len(newchunks) - len(chunks))
+    if (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) and (
+        abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+    ):
+        # Less than 25% change in number of chunks, let's do it
         return array.rechunk({axis: newchunks})
 
 
@@ -2468,6 +2484,11 @@ def groupby_reduce(
     has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
     has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
 
+    if method is None and not any_by_dask and by_.ndim == 1 and _issorted(by_):
+        # Let's try rechunking for sorted 1D by.
+        (single_axis,) = axis_
+        array = rechunk_for_blockwise(array, single_axis, by_)
+
     if _is_first_last_reduction(func):
         if has_dask and nax != 1:
             raise ValueError(

From b6d767ded67f2c6cebaa3c661f56dbf19fb2b107 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 2 Aug 2024 11:23:49 -0600
Subject: [PATCH 2/3] Fix

---
 flox/core.py | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index bab117f1..eb6ed13b 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -157,8 +157,11 @@ def identity(x: T) -> T:
     return x
 
 
-def _issorted(arr: np.ndarray) -> bool:
-    return bool((arr[:-1] <= arr[1:]).all())
+def _issorted(arr: np.ndarray, ascending=True) -> bool:
+    if ascending:
+        return bool((arr[:-1] <= arr[1:]).all())
+    else:
+        return bool((arr[:-1] >= arr[1:]).all())
 
 
 def _is_arg_reduction(func: T_Agg) -> bool:
@@ -2484,7 +2487,13 @@ def groupby_reduce(
     has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
     has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
 
-    if method is None and not any_by_dask and by_.ndim == 1 and _issorted(by_):
+    if (
+        method is None
+        and is_duck_dask_array(array)
+        and not any_by_dask
+        and by_.ndim == 1
+        and _issorted(by_, ascending=True)
+    ):
         # Let's try rechunking for sorted 1D by.
         (single_axis,) = axis_
         array = rechunk_for_blockwise(array, single_axis, by_)

From 0c0b0e5f130dbfd80335d9d6a958c94440d90ec1 Mon Sep 17 00:00:00 2001
From: Deepak Cherian
Date: Fri, 2 Aug 2024 13:20:34 -0600
Subject: [PATCH 3/3] fix tests

---
 flox/core.py | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)

diff --git a/flox/core.py b/flox/core.py
index eb6ed13b..a8d543fd 100644
--- a/flox/core.py
+++ b/flox/core.py
@@ -639,7 +639,9 @@ def rechunk_for_cohorts(
     return array.rechunk({axis: newchunks})
 
 
-def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
+def rechunk_for_blockwise(
+    array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True
+) -> DaskArray:
     """
     Rechunks array so that group boundaries line up with chunk boundaries, allowing
     embarrassingly parallel group reductions.
@@ -672,11 +674,17 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) ->
         return array
 
     Δn = abs(len(newchunks) - len(chunks))
-    if (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) and (
-        abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+    if force or (
+        (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD)
+        and (
+            abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD
+        )
     ):
+        logger.debug("Rechunking to enable blockwise.")
         # Less than 25% change in number of chunks, let's do it
         return array.rechunk({axis: newchunks})
+    else:
+        return array
 
 
 def reindex_(
@@ -2496,7 +2504,7 @@ def groupby_reduce(
     ):
         # Let's try rechunking for sorted 1D by.
         (single_axis,) = axis_
-        array = rechunk_for_blockwise(array, single_axis, by_)
+        array = rechunk_for_blockwise(array, single_axis, by_, force=False)
 
     if _is_first_last_reduction(func):
         if has_dask and nax != 1:
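
For reference, a minimal usage sketch of the behaviour these patches target (not part of the patch series). It uses only `flox.core.rechunk_for_blockwise` and `flox.core.groupby_reduce`, which appear in the diff; the array, the sorted integer labels, the chunk sizes, and the printed chunk tuple are illustrative assumptions, and the automatic rechunk inside `groupby_reduce` only fires when the change stays below the two `BLOCKWISE_RECHUNK_*` thresholds (`force=False`).

    import dask.array as da
    import numpy as np

    from flox.core import groupby_reduce, rechunk_for_blockwise

    # Sorted 1D labels: three groups of sizes 3, 4, and 3 along a length-10 axis.
    by = np.array([0, 0, 0, 1, 1, 1, 1, 2, 2, 2])

    # Chunks of 4 split groups 1 and 2 across chunk boundaries.
    array = da.ones((10,), chunks=4)

    # Explicit rechunk: chunk boundaries are moved onto group boundaries, so each
    # chunk holds whole groups and the group reduction can run blockwise.
    rechunked = rechunk_for_blockwise(array, axis=-1, labels=by)
    print(rechunked.chunks)  # e.g. ((3, 4, 3),)

    # With these patches, method=None on a dask array grouped by a sorted 1D `by`
    # may trigger the same rechunk automatically, gated by the thresholds above.
    result, groups = groupby_reduce(array, by, func="sum", method=None)
    print(groups, result.compute())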