Skip to content

Commit

Permalink
Use shuffle when nunique is calculated
Browse files Browse the repository at this point in the history
  • Loading branch information
wjsi committed May 7, 2022
1 parent 22ff5ae commit 7a1b55d
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 32 deletions.
42 changes: 21 additions & 21 deletions benchmarks/tpch/run_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,28 +995,28 @@ def run_queries(data_folder: str):
mars.execute([lineitem, orders, customer, nation, region, supplier, part, partsupp])
print("Reading time (s): ", time.time() - t1)

q01(lineitem)
q02(part, partsupp, supplier, nation, region)
q03(lineitem, orders, customer)
q04(lineitem, orders)
q05(lineitem, orders, customer, nation, region, supplier)
q06(lineitem)
q07(lineitem, supplier, orders, customer, nation)
q08(part, lineitem, supplier, orders, customer, nation, region)
q09(lineitem, orders, part, nation, partsupp, supplier)
q10(lineitem, orders, customer, nation)
q11(partsupp, supplier, nation)
q12(lineitem, orders)
q13(customer, orders)
q14(lineitem, part)
q15(lineitem, supplier)
q16(part, partsupp, supplier)
q17(lineitem, part)
q18(lineitem, orders, customer)
q19(lineitem, part)
q20(lineitem, part, nation, partsupp, supplier)
# q01(lineitem)
# q02(part, partsupp, supplier, nation, region)
# q03(lineitem, orders, customer)
# q04(lineitem, orders)
# q05(lineitem, orders, customer, nation, region, supplier)
# q06(lineitem)
# q07(lineitem, supplier, orders, customer, nation)
# q08(part, lineitem, supplier, orders, customer, nation, region)
# q09(lineitem, orders, part, nation, partsupp, supplier)
# q10(lineitem, orders, customer, nation)
# q11(partsupp, supplier, nation)
# q12(lineitem, orders)
# q13(customer, orders)
# q14(lineitem, part)
# q15(lineitem, supplier)
# q16(part, partsupp, supplier)
# q17(lineitem, part)
# q18(lineitem, orders, customer)
# q19(lineitem, part)
# q20(lineitem, part, nation, partsupp, supplier)
q21(lineitem, orders, supplier, nation)
q22(customer, orders)
# q22(customer, orders)
print("Total Query time (s): ", time.time() - t1)


Expand Down
62 changes: 52 additions & 10 deletions mars/dataframe/groupby/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from ... import opcodes as OperandDef
from ...config import options
from ...core.custom_log import redirect_custom_log
from ...core import ENTITY_TYPE, OutputType
from ...core import ENTITY_TYPE, OutputType, recursive_tile
from ...core.context import get_context
from ...core.operand import OperandStage
from ...serialization.serializables import (
Expand Down Expand Up @@ -64,6 +64,8 @@

_support_get_group_without_as_index = pd_release_version[:2] > (1, 0)

_FUNCS_PREFER_SHUFFLE = {"nunique"}


class SizeRecorder:
def __init__(self):
Expand Down Expand Up @@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin):
method = StringField("method")
use_inf_as_na = BoolField("use_inf_as_na")

map_on_shuffle = AnyField("map_on_shuffle")

# for chunk
combine_size = Int32Field("combine_size")
chunk_store_limit = Int64Field("chunk_store_limit")
Expand Down Expand Up @@ -421,10 +425,29 @@ def _tile_with_shuffle(
in_df: TileableType,
out_df: TileableType,
func_infos: ReductionSteps,
agg_chunks: List[ChunkType] = None,
):
# First, perform groupby and aggregation on each chunk.
agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos)
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)
if op.map_on_shuffle is None:
op.map_on_shuffle = all(
agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs
)

if not op.map_on_shuffle:
groupby_params = op.groupby_params.copy()
selection = groupby_params.pop("selection", None)
groupby = in_df.groupby(**groupby_params)
if selection:
groupby = groupby[selection]
result = groupby.transform(
op.raw_func, _call_agg=True, index=out_df.index_value
)
return (yield from recursive_tile(result))
else:
# First, perform groupby and aggregation on each chunk.
agg_chunks = agg_chunks or cls._gen_map_chunks(
op, in_df.chunks, out_df, func_infos
)
return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)

@classmethod
def _perform_shuffle(
Expand Down Expand Up @@ -624,8 +647,10 @@ def _tile_auto(
else:
# otherwise, use shuffle
logger.debug("Choose shuffle method for groupby operand %s", op)
return cls._perform_shuffle(
op, chunks + left_chunks, in_df, out_df, func_infos
return (
yield from cls._tile_with_shuffle(
op, in_df, out_df, func_infos, chunks + left_chunks
)
)

@classmethod
Expand All @@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"):
func_infos = cls._compile_funcs(op, in_df)

if op.method == "auto":
if len(in_df.chunks) <= op.combine_size:
if set(op.func) & _FUNCS_PREFER_SHUFFLE:
return (
yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)
)
elif len(in_df.chunks) <= op.combine_size:
return cls._tile_with_tree(op, in_df, out_df, func_infos)
else:
return (yield from cls._tile_auto(op, in_df, out_df, func_infos))
if op.method == "shuffle":
return cls._tile_with_shuffle(op, in_df, out_df, func_infos)
return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos))
elif op.method == "tree":
return cls._tile_with_tree(op, in_df, out_df, func_infos)
else: # pragma: no cover
Expand Down Expand Up @@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"):
pd.reset_option("mode.use_inf_as_na")


def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
def agg(
groupby,
func=None,
method="auto",
combine_size=None,
map_on_shuffle=None,
*args,
**kwargs,
):
"""
Aggregate using one or more operations on grouped data.
Expand All @@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
in distributed mode and use 'tree' in local mode.
combine_size : int
The number of chunks to combine when method is 'tree'
map_on_shuffle : bool
When not specified, will decide whether to perform aggregation on the
map stage of shuffle (currently no aggregation when there is custom
reduction in functions). Otherwise, whether to call map on map stage
of shuffle is determined by the value.
Returns
-------
Expand Down Expand Up @@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
combine_size=combine_size or options.combine_size,
chunk_store_limit=options.chunk_store_limit,
use_inf_as_na=use_inf_as_na,
map_on_shuffle=map_on_shuffle,
)
return agg_op(groupby)
5 changes: 4 additions & 1 deletion mars/dataframe/reduction/nunique.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def _drop_duplicates(self, value, explode=False, agg=False):
value = value.values

if explode:
if len(value) == 0:
return [xp.array([], dtype=object)]
value = xp.concatenate(value)

value = xdf.unique(value)
Expand All @@ -79,7 +81,8 @@ def _drop_duplicates(self, value, explode=False, agg=False):
def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ
xp, xdf = self._get_modules()
if isinstance(in_data, xdf.Series):
unique_values = self._drop_duplicates(in_data)
# unique_values = self._drop_duplicates(in_data)
unique_values = [in_data.values]
return xdf.Series(unique_values, name=in_data.name, dtype=object)
else:
if self._axis == 0:
Expand Down

0 comments on commit 7a1b55d

Please sign in to comment.