[BACKPORT] Fix consistency between tensor metadata and real outputs (#1085) (#1087)

Co-authored-by: 继盛 <[email protected]>
Co-authored-by: hekaisheng <[email protected]>
3 people authored Mar 21, 2020
1 parent 520d9ef commit ad5292f
Showing 19 changed files with 149 additions and 70 deletions.
1 change: 1 addition & 0 deletions mars/dataframe/align.py
@@ -579,6 +579,7 @@ def _gen_dataframe_chunks(splits, out_shape, left_or_right, df):
                 chunk_kw = {
                     'index_value': chunk.index_value if splits[0].isdummy() else None,
                     'columns_value': chunk.columns_value if splits[1].isdummy() else None,
+                    'dtypes': chunk.dtypes if splits[1].isdummy() else None
                 }
                 align_op = DataFrameIndexAlign(
                     stage=OperandStage.map, index_min_max=index_min_max,
42 changes: 31 additions & 11 deletions mars/dataframe/arithmetic/core.py
@@ -196,11 +196,12 @@ def _tile_scalar(cls, op):
             out_chunks.append(out_chunk)

         new_op = op.copy()
+        out = op.outputs[0]
         if isinstance(df, SERIES_TYPE):
-            return new_op.new_seriess(op.inputs, df.shape, nsplits=tileable.nsplits, dtype=df.dtype,
+            return new_op.new_seriess(op.inputs, df.shape, nsplits=tileable.nsplits, dtype=out.dtype,
                                       index_value=df.index_value, name=df.name, chunks=out_chunks)
         else:
-            return new_op.new_dataframes(op.inputs, df.shape, nsplits=tileable.nsplits, dtypes=df.dtypes,
+            return new_op.new_dataframes(op.inputs, df.shape, nsplits=tileable.nsplits, dtypes=out.dtypes,
                                          index_value=df.index_value, columns_value=df.columns_value,
                                          chunks=out_chunks)

@@ -234,11 +235,12 @@ def _tile_with_tensor(cls, op):
             out_chunks.append(out_chunk)

         new_op = op.copy()
+        out = op.outputs[0]
         if isinstance(other, SERIES_TYPE):
-            return new_op.new_seriess(op.inputs, other.shape, nsplits=other.nsplits, dtype=other.dtype,
-                                      index_value=other.index_value, name=other.name, chunks=out_chunks)
+            return new_op.new_seriess(op.inputs, other.shape, nsplits=other.nsplits, dtype=out.dtype,
+                                      index_value=other.index_value, chunks=out_chunks)
         else:
-            return new_op.new_dataframes(op.inputs, other.shape, nsplits=other.nsplits, dtypes=other.dtypes,
+            return new_op.new_dataframes(op.inputs, other.shape, nsplits=other.nsplits, dtypes=out.dtypes,
                                          index_value=other.index_value, columns_value=other.columns_value,
                                          chunks=out_chunks)

@@ -295,8 +297,17 @@ def _operator(self):
     def _calc_properties(cls, x1, x2=None, axis='columns'):
         if isinstance(x1, (DATAFRAME_TYPE, DATAFRAME_CHUNK_TYPE)) \
                 and (x2 is None or np.isscalar(x2) or isinstance(x2, TENSOR_TYPE)):
-            # FIXME infer the dtypes of result df properly
-            return {'shape': x1.shape, 'dtypes': x1.dtypes,
+            if x2 is None:
+                dtypes = x1.dtypes
+            elif np.isscalar(x2):
+                dtypes = infer_dtypes(x1.dtypes, pd.Series(np.array(x2).dtype), cls._operator)
+            elif x1.dtypes is not None and isinstance(x2, TENSOR_TYPE):
+                dtypes = pd.Series(
+                    [infer_dtype(dt, x2.dtype, cls._operator) for dt in x1.dtypes],
+                    index=x1.dtypes.index)
+            else:
+                dtypes = x1.dtypes
+            return {'shape': x1.shape, 'dtypes': dtypes,
                     'columns_value': x1.columns_value, 'index_value': x1.index_value}

         if isinstance(x1, (SERIES_TYPE, SERIES_CHUNK_TYPE)) \
@@ -311,7 +322,9 @@ def _calc_properties(cls, x1, x2=None, axis='columns'):

         if x1.columns_value is not None and x2.columns_value is not None and \
                 x1.columns_value.key == x2.columns_value.key:
-            dtypes = x1.dtypes
+            dtypes = pd.Series([infer_dtype(dt1, dt2, cls._operator) for dt1, dt2
+                                in zip(x1.dtypes, x2.dtypes)],
+                               index=x1.dtypes.index)
             columns = copy.copy(x1.columns_value)
             columns.value.should_be_monotonic = False
             column_shape = len(dtypes)
@@ -343,11 +356,12 @@ def _calc_properties(cls, x1, x2=None, axis='columns'):
                 column_shape, dtypes, columns = np.nan, None, None
                 if x1.columns_value is not None and x1.index_value is not None:
                     if x1.columns_value.key == x2.index_value.key:
-                        dtypes = x1.dtypes
+                        dtypes = pd.Series([infer_dtype(dt, x2.dtype, cls._operator) for dt in x1.dtypes],
+                                           index=x1.dtypes.index)
                         columns = copy.copy(x1.columns_value)
                         columns.value.should_be_monotonic = False
                         column_shape = len(dtypes)
-                    else:
+                    else:  # pragma: no cover
                         dtypes = x1.dtypes  # FIXME
                         columns = infer_index_value(x1.columns_value, x2.index_value)
                         columns.value.should_be_monotonic = True
@@ -360,10 +374,16 @@ def _calc_properties(cls, x1, x2=None, axis='columns'):
             index_shape, index = np.nan, None
             if x1.index_value is not None and x1.index_value is not None:
                 if x1.index_value.key == x2.index_value.key:
-                    index = copy.copy(x1.columns_value)
+                    dtypes = pd.Series([infer_dtype(dt, x2.dtype, cls._operator) for dt in x1.dtypes],
+                                       index=x1.dtypes.index)
+                    index = copy.copy(x1.index_value)
                     index.value.should_be_monotonic = False
                     index_shape = x1.shape[0]
                 else:
+                    if x1.dtypes is not None:
+                        dtypes = pd.Series(
+                            [infer_dtype(dt, x2.dtype, cls._operator) for dt in x1.dtypes],
+                            index=x1.dtypes.index)
                     index = infer_index_value(x1.index_value, x2.index_value)
                     index.value.should_be_monotonic = True
                     index_shape = np.nan
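A note on the dtype inference used throughout the hunks above: `infer_dtype`/`infer_dtypes` are Mars helpers that are not part of this diff. A minimal standalone sketch of the same idea — learning the result dtype by applying the operator to empty arrays, so metadata can be computed without touching real data — assuming only NumPy's standard promotion rules:

```python
import operator

import numpy as np
import pandas as pd


def sketch_infer_dtype(dt1, dt2, op):
    # Apply the operator to zero-length arrays: no data is computed,
    # but the result carries the dtype the operator would produce.
    return op(np.empty(0, dtype=dt1), np.empty(0, dtype=dt2)).dtype


dtypes = pd.Series({'a': np.dtype('int64'), 'b': np.dtype('float32')})
scalar_dtype = np.array(2.5).dtype  # float64
inferred = pd.Series(
    [sketch_infer_dtype(dt, scalar_dtype, operator.add) for dt in dtypes],
    index=dtypes.index)
print(inferred)  # a: float64, b: float64 — not simply the input dtypes
```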
3 changes: 2 additions & 1 deletion mars/dataframe/indexing/getitem.py
@@ -276,10 +276,11 @@ def tile_with_mask(cls, op):
         out_chunks = []
         for idx, df_chunk in zip(out_chunk_indexes, df_chunks):
             mask_chunk = mask_chunks[df_chunk.index[0]]
+            index_value = parse_index(out_df.index_value.to_pandas(), df_chunk)
             out_chunk = op.copy().reset_key().new_chunk([df_chunk, mask_chunk], index=idx,
                                                         shape=(np.nan, df_chunk.shape[1]),
                                                         dtypes=df_chunk.dtypes,
-                                                        index_value=df_chunk.index_value,
+                                                        index_value=index_value,
                                                         columns_value=df_chunk.columns_value)
             out_chunks.append(out_chunk)

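Why the chunk-level `index_value` has to be re-derived here: boolean masking filters rows, so the output chunk's index no longer matches the input chunk's. A small plain-pandas illustration:

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3]})
filtered = df[df['a'] > 1]
# The filtered index ([1, 2]) differs from the input index ([0, 1, 2]) and its
# length cannot be known up front, so reusing the input chunk's index metadata
# would mis-describe the output.
print(filtered.index.tolist())  # [1, 2]
```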
2 changes: 1 addition & 1 deletion mars/dataframe/indexing/index_lib.py
@@ -425,7 +425,7 @@ def set_chunk_index_info(cls, context, index_info, chunk_index,
             assert index_info.input_axis == 0, \
                 'bool indexing on axis columns cannot be tensor'

-            index_value = parse_index(chunk_input.index_value.to_pandas(),
+            index_value = parse_index(pd.Index([], chunk_input.index_value.to_pandas().dtype),
                                       chunk_input, index, store_data=False)

             info = ChunkIndexAxisInfo(output_axis_index=output_axis_index,
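Same theme as the previous file: with a tensor-based boolean index the output row labels are unknown, so the index metadata is built from an empty index that only carries the dtype. A sketch of the pattern:

```python
import pandas as pd

src = pd.Index([10, 20, 30])
# Keep only the dtype; the actual labels of the filtered result are unknown.
meta = pd.Index([], dtype=src.dtype)
print(meta.dtype)  # int64
```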
2 changes: 1 addition & 1 deletion mars/dataframe/merge/concat.py
@@ -269,7 +269,7 @@ def _call_series(self, objs):
             else:
                 index_value = parse_index(index)
             return self.new_series(objs, shape=(row_length,), dtype=objs[0].dtype,
-                                   index_value=index_value)
+                                   index_value=index_value, name=objs[0].name)
         else:
             self._object_type = ObjectType.dataframe
             col_length = 0
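The `name=objs[0].name` fix mirrors pandas, which preserves a shared series name through concatenation:

```python
import pandas as pd

s1 = pd.Series([1, 2], name='x')
s2 = pd.Series([3], name='x')
print(pd.concat([s1, s2]).name)  # 'x'
```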
41 changes: 31 additions & 10 deletions mars/dataframe/reduction/core.py
@@ -21,7 +21,7 @@
 from ...operands import OperandStage
 from ...utils import lazy_import
 from ...serialize import BoolField, AnyField, DataTypeField, Int32Field
-from ..utils import parse_index, build_empty_df
+from ..utils import parse_index, build_empty_df, build_empty_series
 from ..operands import DataFrameOperandMixin, DataFrameOperand, ObjectType, DATAFRAME_TYPE
 from ..merge import DataFrameConcat

@@ -114,20 +114,26 @@ class DataFrameReductionMixin(DataFrameOperandMixin):
     @classmethod
     def _tile_one_chunk(cls, op):
         df = op.outputs[0]
-        params = df.params

         chk = op.inputs[0].chunks[0]
         chunk_params = {k: v for k, v in chk.params.items()
                         if k in df.params}
         chunk_params['shape'] = df.shape
         chunk_params['index'] = chk.index
+        if op.object_type == ObjectType.series:
+            chunk_params.update(dict(dtype=df.dtype, index_value=df.index_value))
+        elif op.object_type == ObjectType.dataframe:
+            chunk_params.update(dict(dtypes=df.dtypes, index_value=df.index_value,
+                                     columns_value=df.columns_value))
+        else:
+            chunk_params.update(dict(dtype=df.dtype))
         new_chunk_op = op.copy().reset_key()
         chunk = new_chunk_op.new_chunk(op.inputs[0].chunks, kws=[chunk_params])

         new_op = op.copy()
         nsplits = tuple((s,) for s in chunk.shape)
-        params['chunks'] = [chunk]
-        params['nsplits'] = nsplits
+        params = df.params.copy()
+        params.update(dict(chunks=[chunk], nsplits=nsplits))
         return new_op.new_tileables(op.inputs, kws=[params])

     @classmethod
@@ -403,6 +409,7 @@ def execute(cls, ctx, op):
     def _call_dataframe(self, df):
         axis = getattr(self, 'axis', None)
         level = getattr(self, 'level', None)
+        skipna = getattr(self, 'skipna', None)
         numeric_only = getattr(self, 'numeric_only', None)
         if axis == 'index':
             axis = 0
@@ -416,23 +423,37 @@ def _call_dataframe(self, df):
             self._axis = 0

         empty_df = build_empty_df(df.dtypes)
-        reduced_df = getattr(empty_df, getattr(self, '_func_name'))(axis=axis, level=level,
-                                                                    numeric_only=numeric_only)
+        func_name = getattr(self, '_func_name')
+        if func_name == 'count':
+            reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, numeric_only=numeric_only)
+        else:
+            reduced_df = getattr(empty_df, func_name)(axis=axis, level=level, skipna=skipna,
+                                                      numeric_only=numeric_only)
         reduced_shape = (df.shape[0],) if axis == 1 else reduced_df.shape
         return self.new_series([df], shape=reduced_shape, dtype=reduced_df.dtype,
-                               index_value=parse_index(reduced_df.index))
+                               index_value=parse_index(reduced_df.index, store_data=axis == 0))

     def _call_series(self, series):
         level = getattr(self, 'level', None)
         axis = getattr(self, 'axis', None)
+        skipna = getattr(self, 'skipna', None)
+        numeric_only = getattr(self, 'numeric_only', None)
         if axis == 'index':
             axis = 0
         self._axis = axis
         # TODO: enable specify level if we support groupby
         if level is not None:
             raise NotImplementedError('Not support specified level now')

-        return self.new_scalar([series], dtype=series.dtype)
+        empty_series = build_empty_series(series.dtype)
+        func_name = getattr(self, '_func_name')
+        if func_name == 'count':
+            reduced_series = empty_series.count(level=level)
+        else:
+            reduced_series = getattr(empty_series, func_name)(axis=axis, level=level, skipna=skipna,
+                                                              numeric_only=numeric_only)
+
+        return self.new_scalar([series], dtype=np.array(reduced_series).dtype)

     def __call__(self, a):
         if isinstance(a, DATAFRAME_TYPE):
@@ -445,7 +466,7 @@ class DataFrameCumReductionMixin(DataFrameOperandMixin):
     @classmethod
     def _tile_one_chunk(cls, op):
         df = op.outputs[0]
-        params = df.params
+        params = df.params.copy()

         chk = op.inputs[0].chunks[0]
         chunk_params = {k: v for k, v in chk.params.items()
@@ -532,7 +553,7 @@ def _tile_series(cls, op):
         new_op = op.copy().reset_key()
         return new_op.new_tileables(op.inputs, shape=in_series.shape, nsplits=in_series.nsplits,
                                     chunks=output_chunks, dtype=series.dtype,
-                                    index_value=series.index_value)
+                                    index_value=series.index_value, name=series.name)

     @classmethod
     def tile(cls, op):
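The pattern above — running the reduction on an empty object to learn the output dtype instead of assuming it equals the input dtype — can be sketched standalone. This is not Mars' `build_empty_series` itself, just the same idea; `count` is special-cased because pandas' `count` takes no `skipna` argument:

```python
import numpy as np
import pandas as pd


def sketch_reduced_dtype(dtype, func_name, skipna=True):
    # Reduce a zero-length series of the given dtype; the scalar result's
    # dtype is what the real reduction would produce.
    empty = pd.Series([], dtype=dtype)
    if func_name == 'count':
        reduced = empty.count()
    else:
        reduced = getattr(empty, func_name)(skipna=skipna)
    return np.array(reduced).dtype


print(sketch_reduced_dtype(np.dtype('int64'), 'sum'))   # int64
print(sketch_reduced_dtype(np.dtype('int64'), 'mean'))  # float64 — not int64
print(sketch_reduced_dtype(np.dtype('int64'), 'count'))  # platform int
```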
22 changes: 14 additions & 8 deletions mars/dataframe/statistics/quantile.py
@@ -95,19 +95,23 @@ def _call_dataframe(self, a, inputs):
         if isinstance(self._q, TENSOR_TYPE):
             q_val = self._q
             pd_index = pd.Index([], dtype=q_val.dtype)
+            name = None
+            store_index_value = False
         else:
             q_val = np.asanyarray(self._q)
             pd_index = pd.Index(q_val)
+            name = self._q if q_val.size == 1 else None
+            store_index_value = True
         tokenize_objects = (a, q_val, self._interpolation, type(self).__name__)

         if q_val.ndim == 0 and self._axis == 0:
             self._object_type = ObjectType.series
-            index_value = parse_index(dtypes.index, store_data=True)
+            index_value = parse_index(dtypes.index, store_data=store_index_value)
             shape = (len(dtypes),)
             # calc dtype
             dtype = self._calc_dtype_on_axis_1(a, dtypes)
             return self.new_series(inputs, shape=shape, dtype=dtype,
-                                   index_value=index_value, name=dtypes.index.name)
+                                   index_value=index_value, name=name or dtypes.index.name)
         elif q_val.ndim == 0 and self._axis == 1:
             self._object_type = ObjectType.series
             index_value = a.index_value
@@ -117,11 +121,11 @@ def _call_dataframe(self, a, inputs):
                 self._q, interpolation=self._interpolation,
                 handle_non_numeric=not self._numeric_only).dtype
             return self.new_series(inputs, shape=shape, dtype=dt,
-                                   index_value=index_value, name=index_value.name)
+                                   index_value=index_value, name=name or index_value.name)
         elif q_val.ndim == 1 and self._axis == 0:
             self._object_type = ObjectType.dataframe
             shape = (len(q_val), len(dtypes))
-            index_value = parse_index(pd_index, *tokenize_objects, store_data=True)
+            index_value = parse_index(pd_index, *tokenize_objects, store_data=store_index_value)
             dtype_list = []
             for name in dtypes.index:
                 dtype_list.append(
@@ -136,7 +140,7 @@ def _call_dataframe(self, a, inputs):
             assert q_val.ndim == 1 and self._axis == 1
             self._object_type = ObjectType.dataframe
             shape = (len(q_val), a.shape[0])
-            index_value = parse_index(pd_index, *tokenize_objects, store_data=True)
+            index_value = parse_index(pd_index, *tokenize_objects, store_data=store_index_value)
             pd_columns = a.index_value.to_pandas()
             dtype_list = np.full(len(pd_columns), self._calc_dtype_on_axis_1(a, dtypes))
             dtypes = pd.Series(dtype_list, index=pd_columns)
@@ -150,9 +154,11 @@ def _call_series(self, a, inputs):
         if isinstance(self._q, TENSOR_TYPE):
             q_val = self._q
             index_val = pd.Index([], dtype=q_val.dtype)
+            store_index_value = False
         else:
             q_val = np.asanyarray(self._q)
             index_val = pd.Index(q_val)
+            store_index_value = True

         # get dtype by tensor
         a_t = astensor(a)
@@ -168,7 +174,7 @@ def _call_series(self, a, inputs):
         return self.new_series(
             inputs, shape=q_val.shape, dtype=dtype,
             index_value=parse_index(index_val, a, q_val, self._interpolation,
-                                    type(self).__name__, store_data=True),
+                                    type(self).__name__, store_data=store_index_value),
             name=a.name)

     def __call__(self, a, q_input=None):
@@ -194,12 +200,12 @@ def _tile_dataframe(cls, op):
                                          handle_non_numeric=not op.numeric_only)
                 ts.append(t)
             try:
-                dtype = np.result_type([it.dtype for it in ts])
+                dtype = np.result_type(*[it.dtype for it in ts])
             except TypeError:
                 dtype = np.dtype(object)
             stack_op = TensorStack(axis=0, dtype=dtype)
             tr = stack_op(ts)
-            r = series_from_tensor(tr, index=op.input.columns_value.to_pandas(),
+            r = series_from_tensor(tr, index=df.index_value.to_pandas(),
                                    name=np.asscalar(ts[0].op.q))
         else:
             assert op.axis == 1
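Two fixes worth calling out in the last hunk: the chunk series index now comes from the output's own metadata (`df.index_value`), and `np.result_type` must receive the dtypes as separate arguments rather than as one list. A quick check of the second point:

```python
import numpy as np

dtypes = [np.dtype('int64'), np.dtype('float32')]
# Splatting promotes across all of the dtypes:
print(np.result_type(*dtypes))  # float64
# np.result_type(dtypes) would instead treat the list as a single array-like
# argument, which does not perform the intended element-wise promotion.
```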
6 changes: 4 additions & 2 deletions mars/dataframe/window/rolling/aggregation.py
@@ -125,6 +125,8 @@ def __call__(self, rolling):
         params = rolling.params.copy()
         if params['win_type'] == 'freq':
             params['win_type'] = None
+        if self._func != 'count':
+            empty_df = empty_df._get_numeric_data()
         test_df = empty_df.rolling(**params).agg(self._func)
         if self._axis == 0:
             index_value = inp.index_value
@@ -134,8 +136,8 @@ def __call__(self, rolling):
                                       store_data=False)
             self._object_type = ObjectType.dataframe
             return self.new_dataframe(
-                [inp], shape=inp.shape, dtypes=test_df.dtypes,
-                index_value=index_value,
+                [inp], shape=(inp.shape[0], test_df.shape[1]),
+                dtypes=test_df.dtypes, index_value=index_value,
                 columns_value=parse_index(test_df.columns, store_data=True))
         else:
             pd_index = inp.index_value.to_pandas()
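`_get_numeric_data()` is the (private) pandas method the diff itself calls: for aggregations other than `count`, rolling results only cover numeric columns, so the probe frame — and with it the inferred shape and dtypes — must drop the non-numeric ones to match the real output:

```python
import pandas as pd

df = pd.DataFrame({'a': [1.0, 2.0, 3.0], 'b': list('xyz')})
# Rolling sums exist only for numeric data, so the object column drops out;
# probing on _get_numeric_data() keeps the inferred metadata consistent.
print(df._get_numeric_data().columns.tolist())  # ['a']
```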
12 changes: 6 additions & 6 deletions mars/executor.py
@@ -509,6 +509,7 @@ def execute(self, retval=True):
 class Executor(object):
     _op_runners = {}
     _op_size_estimators = {}
+    _graph_execution_cls = GraphExecution

     class SyncProviderType(enum.Enum):
         THREAD = 0
@@ -604,12 +605,11 @@ def execute_graph(self, graph, keys, n_parallel=None, print_progress=False,

         executed_keys = list(itertools.chain(*[v[1] for v in self.stored_tileables.values()]))
         chunk_result = self._chunk_result if chunk_result is None else chunk_result
-        graph_execution = GraphExecution(chunk_result, optimized_graph,
-                                         keys, executed_keys, self._sync_provider,
-                                         n_parallel=n_parallel, engine=self._engine,
-                                         prefetch=self._prefetch, print_progress=print_progress,
-                                         mock=mock, mock_max_memory=self._mock_max_memory,
-                                         fetch_keys=fetch_keys, no_intermediate=no_intermediate)
+        graph_execution = self._graph_execution_cls(
+            chunk_result, optimized_graph, keys, executed_keys, self._sync_provider,
+            n_parallel=n_parallel, engine=self._engine, prefetch=self._prefetch,
+            print_progress=print_progress, mock=mock, mock_max_memory=self._mock_max_memory,
+            fetch_keys=fetch_keys, no_intermediate=no_intermediate)
         res = graph_execution.execute(retval)
         self._mock_max_memory = max(self._mock_max_memory, graph_execution._mock_max_memory)
         if mock:
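The `_graph_execution_cls` hook makes the execution class pluggable: a subclass can swap in its own implementation without copying `execute_graph`. A minimal sketch of the pattern — only `Executor` and `GraphExecution` are names from the diff; the logging classes are hypothetical:

```python
class GraphExecution:
    def __init__(self, graph):
        self.graph = graph

    def execute(self, retval):
        return 'executed'


class Executor:
    _graph_execution_cls = GraphExecution

    def execute_graph(self, graph):
        # Instantiates whichever class the (sub)class configured.
        return self._graph_execution_cls(graph).execute(retval=True)


class LoggingGraphExecution(GraphExecution):
    def execute(self, retval):
        print('about to execute')  # hypothetical extra behavior
        return super().execute(retval)


class LoggingExecutor(Executor):
    _graph_execution_cls = LoggingGraphExecution  # the only override needed


print(LoggingExecutor().execute_graph(graph=object()))
```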
10 remaining changed files not shown.
