diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 31018729dc..f9ee600ed7 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -25,7 +25,7 @@ from ... import opcodes as OperandDef from ...config import options from ...core.custom_log import redirect_custom_log -from ...core import ENTITY_TYPE, OutputType +from ...core import ENTITY_TYPE, OutputType, recursive_tile from ...core.context import get_context from ...core.operand import OperandStage from ...serialization.serializables import ( @@ -64,6 +64,8 @@ _support_get_group_without_as_index = pd_release_version[:2] > (1, 0) +_FUNCS_PREFER_SHUFFLE = {"nunique"} + class SizeRecorder: def __init__(self): @@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin): method = StringField("method") use_inf_as_na = BoolField("use_inf_as_na") + map_on_shuffle = AnyField("map_on_shuffle") + # for chunk combine_size = Int32Field("combine_size") chunk_store_limit = Int64Field("chunk_store_limit") @@ -421,10 +425,29 @@ def _tile_with_shuffle( in_df: TileableType, out_df: TileableType, func_infos: ReductionSteps, + agg_chunks: List[ChunkType] = None, ): - # First, perform groupby and aggregation on each chunk. - agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos) - return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) + if op.map_on_shuffle is None: + op.map_on_shuffle = all( + agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs + ) + + if not op.map_on_shuffle: + groupby_params = op.groupby_params.copy() + selection = groupby_params.pop("selection", None) + groupby = in_df.groupby(**groupby_params) + if selection: + groupby = groupby[selection] + result = groupby.transform( + op.raw_func, _call_agg=True, index=out_df.index_value + ) + return (yield from recursive_tile(result)) + else: + # First, perform groupby and aggregation on each chunk. + agg_chunks = agg_chunks or cls._gen_map_chunks( + op, in_df.chunks, out_df, func_infos + ) + return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) @classmethod def _perform_shuffle( @@ -624,8 +647,10 @@ def _tile_auto( else: # otherwise, use shuffle logger.debug("Choose shuffle method for groupby operand %s", op) - return cls._perform_shuffle( - op, chunks + left_chunks, in_df, out_df, func_infos + return ( + yield from cls._tile_with_shuffle( + op, in_df, out_df, func_infos, chunks + left_chunks + ) ) @classmethod @@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"): func_infos = cls._compile_funcs(op, in_df) if op.method == "auto": - if len(in_df.chunks) <= op.combine_size: + if set(op.func) & _FUNCS_PREFER_SHUFFLE: + return ( + yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos) + ) + elif len(in_df.chunks) <= op.combine_size: return cls._tile_with_tree(op, in_df, out_df, func_infos) else: return (yield from cls._tile_auto(op, in_df, out_df, func_infos)) if op.method == "shuffle": - return cls._tile_with_shuffle(op, in_df, out_df, func_infos) + return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)) elif op.method == "tree": return cls._tile_with_tree(op, in_df, out_df, func_infos) else: # pragma: no cover @@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"): pd.reset_option("mode.use_inf_as_na") -def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): +def agg( + groupby, + func=None, + method="auto", + combine_size=None, + map_on_shuffle=None, + *args, + **kwargs, +): """ Aggregate using one or more operations on grouped data. @@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): in distributed mode and use 'tree' in local mode. combine_size : int The number of chunks to combine when method is 'tree' - + map_on_shuffle : bool + When not specified, will decide whether to perform aggregation on the + map stage of shuffle (currently no aggregation when there is custom + reduction in functions). Otherwise, whether to call map on map stage + of shuffle is determined by the value. Returns ------- @@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): combine_size=combine_size or options.combine_size, chunk_store_limit=options.chunk_store_limit, use_inf_as_na=use_inf_as_na, + map_on_shuffle=map_on_shuffle, ) return agg_op(groupby) diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py index 83960da53f..50a5041da5 100644 --- a/mars/dataframe/groupby/tests/test_groupby.py +++ b/mars/dataframe/groupby/tests/test_groupby.py @@ -476,24 +476,3 @@ def test_groupby_fill(): assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - - -def test_groupby_nunique(): - df1 = pd.DataFrame( - [ - [1, 1, 10], - [1, 1, np.nan], - [1, 1, np.nan], - [1, 2, np.nan], - [1, 2, 20], - [1, 2, np.nan], - [1, 3, np.nan], - [1, 3, np.nan], - ], - columns=["one", "two", "three"], - ) - mdf = md.DataFrame(df1, chunk_size=3) - - r = tile(mdf.groupby(["one", "two"]).nunique()) - assert len(r.chunks) == 1 - assert isinstance(r.chunks[0].op, DataFrameGroupByAgg) diff --git a/mars/dataframe/merge/concat.py b/mars/dataframe/merge/concat.py index 7bb3cab721..e82d986b61 100644 --- a/mars/dataframe/merge/concat.py +++ b/mars/dataframe/merge/concat.py @@ -324,7 +324,10 @@ def _auto_concat_dataframe_chunks(chunk, inputs): ) if chunk.op.axis is not None: - return xdf.concat(inputs, axis=op.axis) + try: + return xdf.concat(inputs, axis=op.axis) + except: + raise # auto generated concat when executing a DataFrame if len(inputs) == 1: diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 77dfd871cc..3ff1854e56 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -25,7 +25,7 @@ from ...config import options from ...serialization.serializables import BoolField from ...utils import lazy_import -from ..arrays import ArrowListArray, ArrowListDtype +from ..arrays import ArrowListArray from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction cp = lazy_import("cupy", globals=globals(), rename="cp") @@ -52,18 +52,24 @@ def _get_modules(self): def _drop_duplicates(self, value, explode=False, agg=False): xp, xdf = self._get_modules() + use_arrow_dtype = self._use_arrow_dtype and xp is not cp if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"): value = value.to_numpy() else: value = value.values if explode: + if len(value) == 0: + if not use_arrow_dtype: + return [xp.array([], dtype=object)] + else: + return [ArrowListArray([])] value = xp.concatenate(value) value = xdf.unique(value) if not agg: - if not self._use_arrow_dtype or xp is cp: + if not use_arrow_dtype: return [value] else: try: @@ -78,15 +84,16 @@ def _drop_duplicates(self, value, explode=False, agg=False): def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xp, xdf = self._get_modules() + out_dtype = object if not self._use_arrow_dtype or xp is cp else None if isinstance(in_data, xdf.Series): unique_values = self._drop_duplicates(in_data) - return xdf.Series(unique_values, name=in_data.name, dtype=object) + return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): data[d] = self._drop_duplicates(v) - df = xdf.DataFrame(data, copy=False, dtype=object) + df = xdf.DataFrame(data, copy=False, dtype=out_dtype) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): @@ -95,15 +102,16 @@ def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xp, xdf = self._get_modules() + out_dtype = object if not self._use_arrow_dtype or xp is cp else None if isinstance(in_data, xdf.Series): unique_values = self._drop_duplicates(in_data, explode=True) - return xdf.Series(unique_values, name=in_data.name, dtype=object) + return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): data[d] = self._drop_duplicates(v, explode=True) - df = xdf.DataFrame(data, copy=False, dtype=object) + df = xdf.DataFrame(data, copy=False, dtype=out_dtype) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows():