diff --git a/flox/core.py b/flox/core.py index 1c9599a1..a8d543fd 100644 --- a/flox/core.py +++ b/flox/core.py @@ -113,6 +113,12 @@ # _simple_combine. DUMMY_AXIS = -2 +# Thresholds below which we will automatically rechunk to blockwise if it makes sense +# 1. Fractional change in number of chunks after rechunking +BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD = 0.25 +# 2. Fractional change in max chunk size after rechunking +BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD = 0.15 + logger = logging.getLogger("flox") @@ -151,8 +157,11 @@ def identity(x: T) -> T: return x -def _issorted(arr: np.ndarray) -> bool: - return bool((arr[:-1] <= arr[1:]).all()) +def _issorted(arr: np.ndarray, ascending=True) -> bool: + if ascending: + return bool((arr[:-1] <= arr[1:]).all()) + else: + return bool((arr[:-1] >= arr[1:]).all()) def _is_arg_reduction(func: T_Agg) -> bool: @@ -230,6 +239,8 @@ def _get_optimal_chunks_for_groups(chunks, labels): Δl = abs(c - l) if c == 0 or newchunkidx[-1] > l: continue + f = f.item() # noqa + l = l.item() # noqa if Δf < Δl and f > newchunkidx[-1]: newchunkidx.append(f) else: @@ -628,7 +639,9 @@ def rechunk_for_cohorts( return array.rechunk({axis: newchunks}) -def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray: +def rechunk_for_blockwise( + array: DaskArray, axis: T_Axis, labels: np.ndarray, *, force: bool = True +) -> DaskArray: """ Rechunks array so that group boundaries line up with chunk boundaries, allowing embarrassingly parallel group reductions. 
@@ -651,13 +664,27 @@ def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray Rechunked array """ - labels = factorize_((labels,), axes=())[0] chunks = array.chunks[axis] + if len(chunks) == 1: + return array + + labels = factorize_((labels,), axes=())[0] newchunks = _get_optimal_chunks_for_groups(chunks, labels) if newchunks == chunks: return array - else: + + Δn = abs(len(newchunks) - len(chunks)) + if force or ( + (Δn / len(chunks) < BLOCKWISE_RECHUNK_NUM_CHUNKS_THRESHOLD) + and ( + abs(max(newchunks) - max(chunks)) / max(chunks) < BLOCKWISE_RECHUNK_CHUNK_SIZE_THRESHOLD + ) + ): + logger.debug("Rechunking to enable blockwise.") + # Either rechunking was forced, or the change in both chunk count and max chunk size is below the thresholds. return array.rechunk({axis: newchunks}) + else: + return array def reindex_( @@ -2468,6 +2495,17 @@ def groupby_reduce( has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_) has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_) + if ( + method is None + and is_duck_dask_array(array) + and not any_by_dask + and by_.ndim == 1 + and _issorted(by_, ascending=True) + ): + # Let's try rechunking for sorted 1D by. + (single_axis,) = axis_ + array = rechunk_for_blockwise(array, single_axis, by_, force=False) + if _is_first_last_reduction(func): if has_dask and nax != 1: raise ValueError(