From e550ae408128b24db9ac72d915c6df4928d64a30 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Sat, 7 May 2022 18:21:36 +0800 Subject: [PATCH] [BACKPORT] Fix incorrect result for df.sort_values when specifying multiple ascending (#2984) (#3006) Co-authored-by: He Kaisheng --- mars/core/operand/base.py | 4 +- mars/core/operand/tests/test_core.py | 2 + mars/dataframe/sort/core.py | 7 +- mars/dataframe/sort/psrs.py | 224 +++++------------- mars/dataframe/sort/sort_values.py | 7 + .../sort/tests/test_sort_execution.py | 37 ++- mars/oscar/backends/mars/pool.py | 9 +- mars/services/task/analyzer/analyzer.py | 3 +- 8 files changed, 124 insertions(+), 169 deletions(-) diff --git a/mars/core/operand/base.py b/mars/core/operand/base.py index 3d2cc8425c..b745e2b245 100644 --- a/mars/core/operand/base.py +++ b/mars/core/operand/base.py @@ -147,8 +147,8 @@ def __init__(self: OperandType, *args, **kwargs): extra_names = ( set(kwargs) - set(self._FIELDS) - set(SchedulingHint.all_hint_names) ) - extras = AttributeDict((k, kwargs.pop(k)) for k in extra_names) - kwargs["extra_params"] = kwargs.pop("extra_params", extras) + extras = dict((k, kwargs.pop(k)) for k in extra_names) + kwargs["extra_params"] = AttributeDict(kwargs.pop("extra_params", extras)) self._extract_scheduling_hint(kwargs) super().__init__(*args, **kwargs) diff --git a/mars/core/operand/tests/test_core.py b/mars/core/operand/tests/test_core.py index 0500c92887..363befcd8d 100644 --- a/mars/core/operand/tests/test_core.py +++ b/mars/core/operand/tests/test_core.py @@ -73,6 +73,8 @@ class MyOperand5(MyOperand4): def test_execute(): + op = MyOperand(extra_params={"my_extra_params": 1}) + assert op.extra_params["my_extra_params"] == 1 MyOperand.register_executor(lambda *_: 2) assert execute(dict(), MyOperand(_key="1")) == 2 assert execute(dict(), MyOperand2(_key="1")) == 2 diff --git a/mars/dataframe/sort/core.py b/mars/dataframe/sort/core.py index 99857b453d..2d2152bf3f 100644 --- a/mars/dataframe/sort/core.py +++ b/mars/dataframe/sort/core.py @@ -19,11 +19,12 @@ from ...core.operand import OperandStage from ...serialization.serializables import ( FieldTypes, + AnyField, + BoolField, Int32Field, Int64Field, - StringField, ListField, - BoolField, + StringField, ) from ...utils import ceildiv from ..operands import DataFrameOperand @@ -32,7 +33,7 @@ class DataFrameSortOperand(DataFrameOperand): _axis = Int32Field("axis") - _ascending = BoolField("ascending") + _ascending = AnyField("ascending") _inplace = BoolField("inplace") _kind = StringField("kind") _na_position = StringField("na_position") diff --git a/mars/dataframe/sort/psrs.py b/mars/dataframe/sort/psrs.py index 5defef7f24..c4426f210f 100644 --- a/mars/dataframe/sort/psrs.py +++ b/mars/dataframe/sort/psrs.py @@ -20,7 +20,13 @@ from ... import opcodes as OperandDef from ...core.operand import OperandStage, MapReduceOperand from ...utils import lazy_import -from ...serialization.serializables import Int32Field, ListField, StringField, BoolField +from ...serialization.serializables import ( + AnyField, + Int32Field, + ListField, + StringField, + BoolField, +) from ...tensor.base.psrs import PSRSOperandMixin from ..core import IndexValue, OutputType from ..utils import standardize_range_index, parse_index, is_cudf @@ -48,6 +54,23 @@ def __gt__(self, other): _largest = _Largest() +class _ReversedValue: + def __init__(self, value): + self._value = value + + def __lt__(self, other): + if type(other) is _ReversedValue: + # may happen when call searchsorted + return self._value >= other._value + return self._value >= other + + def __gt__(self, other): + return self._value <= other + + def __repr__(self): + return repr(self._value) + + class DataFramePSRSOperandMixin(DataFrameOperandMixin, PSRSOperandMixin): @classmethod def _collect_op_properties(cls, op): @@ -377,90 +400,23 @@ def execute_sort_index(data, op, inplace=None): class DataFramePSRSChunkOperand(DataFrameOperand): # sort type could be 'sort_values' or 'sort_index' - _sort_type = StringField("sort_type") + sort_type = StringField("sort_type") - _axis = Int32Field("axis") - _by = ListField("by") - _ascending = BoolField("ascending") - _inplace = BoolField("inplace") - _kind = StringField("kind") - _na_position = StringField("na_position") + axis = Int32Field("axis") + by = ListField("by", default=None) + ascending = AnyField("ascending") + inplace = BoolField("inplace") + kind = StringField("kind") + na_position = StringField("na_position") # for sort_index - _level = ListField("level") - _sort_remaining = BoolField("sort_remaining") - - _n_partition = Int32Field("n_partition") - - def __init__( - self, - sort_type=None, - by=None, - axis=None, - ascending=None, - inplace=None, - kind=None, - na_position=None, - level=None, - sort_remaining=None, - n_partition=None, - output_types=None, - **kw - ): - super().__init__( - _sort_type=sort_type, - _by=by, - _axis=axis, - _ascending=ascending, - _inplace=inplace, - _kind=kind, - _na_position=na_position, - _level=level, - _sort_remaining=sort_remaining, - _n_partition=n_partition, - _output_types=output_types, - **kw - ) + level = ListField("level") + sort_remaining = BoolField("sort_remaining") - @property - def sort_type(self): - return self._sort_type + n_partition = Int32Field("n_partition") - @property - def axis(self): - return self._axis - - @property - def by(self): - return self._by - - @property - def ascending(self): - return self._ascending - - @property - def inplace(self): - return self._inplace - - @property - def kind(self): - return self._kind - - @property - def na_position(self): - return self._na_position - - @property - def level(self): - return self._level - - @property - def sort_remaining(self): - return self._sort_remaining - - @property - def n_partition(self): - return self._n_partition + def __init__(self, output_types=None, **kw): + super().__init__(_output_types=output_types, **kw) class DataFramePSRSSortRegularSample(DataFramePSRSChunkOperand, DataFrameOperandMixin): @@ -564,92 +520,25 @@ def execute(cls, ctx, op): class DataFramePSRSShuffle(MapReduceOperand, DataFrameOperandMixin): _op_type_ = OperandDef.PSRS_SHUFFLE - _sort_type = StringField("sort_type") + sort_type = StringField("sort_type") # for shuffle map - _axis = Int32Field("axis") - _by = ListField("by") - _ascending = BoolField("ascending") - _inplace = BoolField("inplace") - _na_position = StringField("na_position") - _n_partition = Int32Field("n_partition") + axis = Int32Field("axis") + by = ListField("by") + ascending = AnyField("ascending") + inplace = BoolField("inplace") + na_position = StringField("na_position") + n_partition = Int32Field("n_partition") # for sort_index - _level = ListField("level") - _sort_remaining = BoolField("sort_remaining") + level = ListField("level") + sort_remaining = BoolField("sort_remaining") # for shuffle reduce - _kind = StringField("kind") - - def __init__( - self, - sort_type=None, - by=None, - axis=None, - ascending=None, - n_partition=None, - na_position=None, - inplace=None, - kind=None, - level=None, - sort_remaining=None, - output_types=None, - **kw - ): - super().__init__( - _sort_type=sort_type, - _by=by, - _axis=axis, - _ascending=ascending, - _n_partition=n_partition, - _na_position=na_position, - _inplace=inplace, - _kind=kind, - _level=level, - _sort_remaining=sort_remaining, - _output_types=output_types, - **kw - ) - - @property - def sort_type(self): - return self._sort_type - - @property - def by(self): - return self._by - - @property - def axis(self): - return self._axis - - @property - def ascending(self): - return self._ascending + kind = StringField("kind") - @property - def inplace(self): - return self._inplace - - @property - def na_position(self): - return self._na_position - - @property - def level(self): - return self._level - - @property - def sort_remaining(self): - return self._sort_remaining - - @property - def n_partition(self): - return self._n_partition - - @property - def kind(self): - return self._kind + def __init__(self, output_types=None, **kw): + super().__init__(_output_types=output_types, **kw) @property def output_limit(self): @@ -657,6 +546,23 @@ def output_limit(self): @staticmethod def _calc_poses(src_cols, pivots, ascending=True): + if isinstance(ascending, list): + for asc, col in zip(ascending, pivots.columns): + # Make pivots available to use ascending order when mixed order specified + if not asc: + if pd.api.types.is_numeric_dtype(pivots.dtypes[col]): + # for numeric dtypes, convert to negative is more efficient + pivots[col] = -pivots[col] + src_cols[col] = -src_cols[col] + else: + # for other types, convert to ReversedValue + pivots[col] = pivots[col].map( + lambda x: x + if type(x) is _ReversedValue + else _ReversedValue(x) + ) + ascending = True + records = src_cols.to_records(index=False) p_records = pivots.to_records(index=False) if ascending: diff --git a/mars/dataframe/sort/sort_values.py b/mars/dataframe/sort/sort_values.py index 3bf94ed893..6aca000e3c 100644 --- a/mars/dataframe/sort/sort_values.py +++ b/mars/dataframe/sort/sort_values.py @@ -252,6 +252,13 @@ def dataframe_sort_values( raise NotImplementedError("Only support sort on axis 0") psrs_kinds = _validate_sort_psrs_kinds(psrs_kinds) by = by if isinstance(by, (list, tuple)) else [by] + if isinstance(ascending, list): # pragma: no cover + if all(ascending): + # all are True, convert to True + ascending = True + elif not any(ascending): + # all are False, convert to False + ascending = False op = DataFrameSortValues( by=by, axis=axis, diff --git a/mars/dataframe/sort/tests/test_sort_execution.py b/mars/dataframe/sort/tests/test_sort_execution.py index 70e5c2b19b..d7b6fbe9d5 100644 --- a/mars/dataframe/sort/tests/test_sort_execution.py +++ b/mars/dataframe/sort/tests/test_sort_execution.py @@ -27,10 +27,9 @@ "distinct_opt", ["0"] if sys.platform.lower().startswith("win") else ["0", "1"] ) def test_sort_values_execution(setup, distinct_opt): + ns = np.random.RandomState(0) os.environ["PSRS_DISTINCT_COL"] = distinct_opt - df = pd.DataFrame( - np.random.rand(100, 10), columns=["a" + str(i) for i in range(10)] - ) + df = pd.DataFrame(ns.rand(100, 10), columns=["a" + str(i) for i in range(10)]) # test one chunk mdf = DataFrame(df) @@ -67,6 +66,38 @@ def test_sort_values_execution(setup, distinct_opt): pd.testing.assert_frame_equal(result, expected) + # test ascending is a list + result = ( + mdf.sort_values(["a3", "a4", "a5", "a6"], ascending=[False, True, True, False]) + .execute() + .fetch() + ) + expected = df.sort_values( + ["a3", "a4", "a5", "a6"], ascending=[False, True, True, False] + ) + pd.testing.assert_frame_equal(result, expected) + + in_df = pd.DataFrame( + { + "col1": ns.choice([f"a{i}" for i in range(5)], size=(100,)), + "col2": ns.choice([f"b{i}" for i in range(5)], size=(100,)), + "col3": ns.choice([f"c{i}" for i in range(5)], size=(100,)), + "col4": ns.randint(10, 20, size=(100,)), + } + ) + mdf = DataFrame(in_df, chunk_size=10) + result = ( + mdf.sort_values( + ["col1", "col4", "col3", "col2"], ascending=[False, False, True, False] + ) + .execute() + .fetch() + ) + expected = in_df.sort_values( + ["col1", "col4", "col3", "col2"], ascending=[False, False, True, False] + ) + pd.testing.assert_frame_equal(result, expected) + # test multiindex df2 = df.copy(deep=True) df2.columns = pd.MultiIndex.from_product([list("AB"), list("CDEFG")]) diff --git a/mars/oscar/backends/mars/pool.py b/mars/oscar/backends/mars/pool.py index 872b885303..f6558dbabe 100644 --- a/mars/oscar/backends/mars/pool.py +++ b/mars/oscar/backends/mars/pool.py @@ -238,7 +238,14 @@ async def kill_sub_pool( await asyncio.to_thread(process.join, 5) async def is_sub_pool_alive(self, process: multiprocessing.Process): - return await asyncio.to_thread(process.is_alive) + try: + return await asyncio.to_thread(process.is_alive) + except RuntimeError as ex: # pragma: no cover + if "shutdown" not in str(ex): + # when atexit is triggered, the default pool might be shutdown + # and to_thread will fail + raise + return process.is_alive() async def recover_sub_pool(self, address: str): process_index = self._config.get_process_index(address) diff --git a/mars/services/task/analyzer/analyzer.py b/mars/services/task/analyzer/analyzer.py index 4d0c0abc6b..dbf542dfcd 100644 --- a/mars/services/task/analyzer/analyzer.py +++ b/mars/services/task/analyzer/analyzer.py @@ -305,8 +305,9 @@ def gen_subtask_graph(self) -> SubtaskGraph: # assign expect workers for those specified with `expect_worker` # skip `start_ops`, which have been assigned before + start_ops_set = set(start_ops) for chunk in self._chunk_graph: - if chunk not in start_ops and chunk.op.expect_worker is not None: + if chunk not in start_ops_set and chunk.op.expect_worker is not None: chunk_to_bands[chunk] = self._to_band(chunk.op.expect_worker) # fuse node